小男孩‘自慰网亚洲一区二区,亚洲一级在线播放毛片,亚洲中文字幕av每天更新,黄aⅴ永久免费无码,91成人午夜在线精品,色网站免费在线观看,亚洲欧洲wwwww在线观看

分享

WCF之旅(13):創(chuàng)建基于MSMQ的Responsive Service

 waywin 2008-02-16
一、One-way MEP V.S. Responsible Service

我們知道MSMQ天生就具有異步的特性,它只能以O(shè)ne-way的MEP(Message Exchange Pattern)進行通信。Client和Service之間采用One-way MEP的話就意味著Client調(diào)用Service之后立即返回,它無法獲得Service的執(zhí)行結(jié)果,也無法捕捉Service運行的Exception。下圖簡單表述了基于MSMQ的WCF Service中Client和Service的交互。


但是在有些場景 中,這是無法容忍的。再拿我在上一篇文章的Order Delivery的例子來說。Client向Service提交了Order,卻無法確認(rèn)該Order是否被Service正確處理,這顯然是不能接受的。我們今天就來討論一下,如何創(chuàng)建一個Responsive Service來解決這個問題:Client不再是對Service的執(zhí)行情況一無所知,它可以獲知Order是否被Service正確處理了。

二、 Solution

雖然我們的目的很簡單:當(dāng)Client向Service遞交了Order之后,能以某種方式獲知Order的執(zhí)行結(jié)果;對于Service端來說,在正確把Order從Message Queue中獲取出來、并正確處理之后,能夠向Order的遞交者發(fā)送一個Acknowledge Message。為了簡單起見,這個Acknowledge Message包含兩組信息:

  • Order No.: 被處理的Order的一個能夠為一標(biāo)志它的ID。
  • Exception: 如果處理失敗的Exception,如果成功處理為null。

要在WCF中實現(xiàn)這樣的目的,對于Request/Reply MEP來說是簡單而直接的:Client向Service遞交Order,并等待Service的Response,Service在處理接收到Order之后直接將處理結(jié)果 返回給Client就可以了。但是我們說過MSMQ天生就是異步的,我們只有采取一種間接的方式實現(xiàn)“曲線救國”。

我們的解決方案是:在每個Client Domain也創(chuàng)建一個基于MSMQ的本地的WCF Service,用于接收來自O(shè)rder處理端發(fā)送的Acknowledge Message。對于處理Order 的Service來說,在正確處理Order之后,想對應(yīng)的Client發(fā)送Acknowledge Message。下圖簡單演示整個過程:


三、Implementation

了解了上面的Solution之后,我們來看看該Solution在真正實現(xiàn)過程中有什么樣的困難。對于處理Order的Service來說,在向Client端發(fā)送Acknowledge Message的時候,它必須要知道該Order對應(yīng)的Client的Response Service的MSMQ的Address以及其他和Operation相關(guān)的Context信息(在這里我們不需要,不過考慮到擴展性,我們把包括了address的Context的信息 封裝到一個了Class中,在這里叫做:OrderResponseContext)。而這些Context卻不能在Configuration中進行配置,因為他可以同時面臨著很多個Client:比如每個Client用于接收Response 的Message Queue的address都不一樣。所以這個OrderResponseContext必須通過對應(yīng)的Client來提供?;诖?,我們具有兩面兩種解決方式:

方式一、修改Service Contract,把OrderResponseContext當(dāng)成是Operation的一個參數(shù)

這是我們最容易想到的,比如我們原來的Operation這樣定義:

namespace Artech.ResponsiveQueuedService.Contract
{
    [ServiceContract]
    [ServiceKnownType(
typeof(Order))]
    
public interface IOrderProcessor
    
{
        [OperationContract(IsOneWay 
= true)]
        
void Submit(Order order);
    }

}

現(xiàn)在變成:

namespace Artech.ResponsiveQueuedService.Contract
{
    [ServiceContract]
    [ServiceKnownType(
typeof(Order))]
    
public interface IOrderProcessor
    
{
        [OperationContract(IsOneWay 
= true)]
        
void Submit(Order order, OrderResponseContext responseContext);
    }

}

雖然這種方式看起來不錯,但是卻不值得推薦。在一般情況下,我們的Contract需要是很穩(wěn)定的,一經(jīng)確定就不能輕易更改,因為Contract是被交互的多方共同支持的,牽一發(fā)動全身;此外,從Service Contract代表的是Service的一個Interface,他是對業(yè)務(wù)邏輯的抽象、和具體實現(xiàn)無關(guān),而對于我們的例子來說,我們僅僅是定義一個遞交Order的Operation,從業(yè)務(wù)邏輯來看,OrderResponseContext和抽象的業(yè)務(wù)邏輯毫無關(guān)系?;诖?,我們需要尋求一種和Service Contract無關(guān)的解決方式:

方式二、將OrderResponseContext放到Soap Message 的Header中

其實我們要解決的問題很簡單,就是要把OrderResponseContext的信息置于Soap Message中發(fā)送到Service。而我們知道,Soap的Header具有極強的可伸縮性,原則上,我們可以把任何控制信息置于Header中?;赪CF的編程模式很容易地幫助我們實現(xiàn)對Soap Header的插入和獲?。?/p>

我們可以通過下面的方式獲得當(dāng)前Operation Context的Incoming Message Headers和Outgoing Message Headers

OperationContext.Current.IncomingMessageHeaders
OperationContext.Current.OutgoingMessageHeaders

如果我們要把一個OrderResponseContext 對象插入到當(dāng)前Operation Context的Outgoing Message Headers中,我們可以通過下面的代碼來實現(xiàn):

OrderResponseContext context = new OrderResponseContext();
MessageHeader
<OrderResponseContext> header = new MessageHeader<OrderResponseContext>( context);
OperationContext.Current.OutgoingMessageHeaders.Add(header.GetUntypedHeader(
"name""namespace"));

相應(yīng)的,我們可以通過下面的代碼從Outgoing Message Headers OrderResponseContext的數(shù)據(jù)獲取的內(nèi)容:

OrderResponseContext context = OperationContext.Current.IncomingMessageHeaders.GetHeader<OrderResponseContext>("name""namespace"));

四、Sample

我們照例給出一個完整的Sample,下面是整個Solution的結(jié)構(gòu):


除了一貫使用的4層結(jié)構(gòu)(Contract-Service-Hosting-Client),還為ResponseService增加了下面兩層:

  • Localservice: 作為Client Domain的ResponseService。
  • LocalHosting:Host Localservice。

1.Contract:  Artech.ResponsiveQueuedService.Contract

Service Contract: Artech.ResponsiveQueuedService.Contract. IOrderProcessor

using System;
using System.Collections.Generic;
using System.Text;
using System.ServiceModel;

namespace Artech.ResponsiveQueuedService.Contract
{
    [ServiceContract]
    [ServiceKnownType(
typeof(Order))]
    
public interface IOrderProcessor
    
{
        [OperationContract(IsOneWay 
= true)]
        
void Submit(Order order);
    }

}

Service Contract: Artech.ResponsiveQueuedService.Contract.IOrderRessponse

using System;
using System.Collections.Generic;
using System.Text;
using System.ServiceModel;

namespace Artech.ResponsiveQueuedService.Contract
{
    [ServiceContract]
    
public interface  IOrderRessponse
    
{
        [OperationContract(IsOneWay 
=true)]
        
void SubmitOrderResponse(Guid orderNo,FaultException exception);
    }

}

接收來自O(shè)rder processing端的Response:Order No.和Exception。

Data Contract: Artech.ResponsiveQueuedService.Contract.Order

using System;
using System.Collections.Generic;
using System.Text;
using System.Runtime.Serialization;

namespace Artech.ResponsiveQueuedService.Contract
{
    [DataContract]
    
public class Order
    
{
        
Private Fields

        
Constructors

        
Public Properties

        
Public Methods
    }

}

對Order的封裝。

Data Contract:Artech.ResponsiveQueuedService.Contract. OrderResponseContext

using System;
using System.Collections.Generic;
using System.Text;
using System.Runtime.Serialization;
using System.ServiceModel;

namespace Artech.ResponsiveQueuedService.Contract
{    
    [DataContract]
    
public class OrderResponseContext
    
{
        
private Uri _responseAddress;

        [DataMember]
        
public Uri ResponseAddress
        
{
            
get return _responseAddress; }
            
set { _responseAddress = value; }
        }


        
public static OrderResponseContext Current
        
{
            
get
            
{
                
if (OperationContext.Current == null)
                
{
                    
return null;
                }


                
return OperationContext.Current.IncomingMessageHeaders.GetHeader<OrderResponseContext>("OrderResponseContext""Artech.ResponsiveQueuedService.Contract");
            }

            
set
            
{
                MessageHeader
<OrderResponseContext> header = new MessageHeader<OrderResponseContext>(value);
                OperationContext.Current.OutgoingMessageHeaders.Add(header.GetUntypedHeader(
"OrderResponseContext""Artech.ResponsiveQueuedService.Contract"));
            }

        }

    }

}

ResponseAddress代表Host在Client Domain的Response Service的Address。同過Current把OrderResponseContext插入到Outgoing Message Headers中、以及從Ingoing Message Headers取出OrderResponseContext對象。

2.Order Processing Service:Artech.ResponsiveQueuedService.Service

using System;
using System.Collections.Generic;
using System.Text;
using Artech.ResponsiveQueuedService.Contract;
using System.ServiceModel;
using System.Net.Security;

namespace Artech.ResponsiveQueuedService.Service
{
    
public class OrderProcessorService:IOrderProcessor
    
{
        
private void ProcessOrder(Order order)
        
{

            
if (order.OrderDate < DateTime.Today)
            
{
                
throw new Exception();
            }

        }


        
IOrderProcessor Members
    }

}

在這里我們模擬了這樣的場景:先通過Order Date判斷Order是否過期,如果過期創(chuàng)建一個FaultException,否則正確處理該Order,然后通過OrderResponseContext.Current從Incoming Message Header中獲取封裝在OrderResponseContext對象中的Response Address,創(chuàng)建Binding并調(diào)用Response Service.

3. Order Processing Service Hosting: Artech.ResponsiveQueuedService.Hosting

Configuration

<?xml version="1.0" encoding="utf-8" ?>
<configuration>
  
<appSettings>
    
<add key="msmqPath" value=".\private$\orderprocessor"/>
  
</appSettings>
  
<system.serviceModel>
    
<bindings>
      
<netMsmqBinding>
        
<binding name="MsmqBinding" exactlyOnce="false" useActiveDirectory="false">
          
<security>
            
<transport msmqAuthenticationMode="None" msmqProtectionLevel="None" />
          
</security>
        
</binding>
      
</netMsmqBinding>
    
</bindings>
    
<services>
      
<service name="Artech.ResponsiveQueuedService.Service.OrderProcessorService">
        
<endpoint address="net.msmq://localhost/private/orderprocessor" binding="netMsmqBinding"
            bindingConfiguration
="MsmqBinding" contract="Artech.ResponsiveQueuedService.Contract.IOrderProcessor" />
      
</service>
    
</services>
  
</system.serviceModel>
</configuration>

Program

using System;
using System.Collections.Generic;
using System.Text;
using Artech.ResponsiveQueuedService.Service;
using System.ServiceModel;
using System.Configuration;
using System.Messaging;

namespace Artech.ResponsiveQueuedService.Hosting
{
    
class Program
    
{
        
static void Main(string[] args)
        
{
            
string path = ConfigurationManager.AppSettings["msmqPath"];
            
if (!MessageQueue.Exists(path))
            
{
                MessageQueue.Create(path);
            }


            
using (ServiceHost host = new ServiceHost(typeof(OrderProcessorService)))
            
{
                host.Opened 
+= delegate
                
{
                    Console.WriteLine(
"The Order Processor service has begun to listen");
                }
;

                host.Open();

                Console.Read();
            }

        }

    }

}

4. Response Service: Artech.ResponsiveQueuedService.LocalService.OrderRessponseService

using System;
using System.Collections.Generic;
using System.Text;
using Artech.ResponsiveQueuedService.Contract;
using System.ServiceModel;

namespace Artech.ResponsiveQueuedService.LocalService
{
    
public class OrderRessponseService : IOrderRessponse
    
{
        
IOrderRessponse Members
    }

}


5. Response Service Hosting: Artech.ResponsiveQueuedService.LocalhHosting

Configuration

<?xml version="1.0" encoding="utf-8" ?>
<configuration>
  
<appSettings>
    
<add key="msmqPath" value=".\private$\orderresponse"/>
  
</appSettings>
  
<system.serviceModel>
    
<bindings>
      
<netMsmqBinding>
        
<binding name="msmqBinding" exactlyOnce="false">
          
<security>
            
<transport msmqAuthenticationMode="None" msmqProtectionLevel="None" />
          
</security>
        
</binding>
      
</netMsmqBinding>
    
</bindings>
    
<services>
      
<service name="Artech.ResponsiveQueuedService.LocalService.OrderRessponseService">
        
<endpoint address="net.msmq://localhost/private/orderresponse" binding="netMsmqBinding"
            bindingConfiguration
="msmqBinding" contract="Artech.ResponsiveQueuedService.Contract.IOrderRessponse" />
      
</service>
    
</services>
  
</system.serviceModel>
</configuration>

Program

using System;
using System.Collections.Generic;
using System.Text;
using Artech.ResponsiveQueuedService.LocalService;
using System.Configuration;
using System.ServiceModel;
using System.Messaging;

namespace Artech.ResponsiveQueuedService.LocalhHosting
{
    
class Program
    
{
        
static void Main(string[] args)
        
{
            
string path = ConfigurationManager.AppSettings["msmqPath"];
            
if (!MessageQueue.Exists(path))
            
{
                MessageQueue.Create(path);
            }


            
using (ServiceHost host = new ServiceHost(typeof(OrderRessponseService)))
            
{
                host.Opened 
+= delegate
                
{
                    Console.WriteLine(
"The Order Response service has begun to listen");
                }
;

                host.Open();

                Console.Read();
            }

        }

    }

}

6. Client: Artech.ResponsiveQueuedService.Client

Configuration:

<?xml version="1.0" encoding="utf-8" ?>
<configuration>
  
<appSettings>
    
<add key="msmqPath" value="net.msmq://localhost/private/orderresponse"/>
  
</appSettings>
  
<system.serviceModel>
    
<bindings>
      
<netMsmqBinding>
        
<binding name="MsmqBinding" exactlyOnce="false" useActiveDirectory="false">
          
<security>
            
<transport msmqAuthenticationMode="None" msmqProtectionLevel="None" />
          
</security>
        
</binding>
      
</netMsmqBinding>
    
</bindings>
    
<client>
      
<endpoint address="net.msmq://localhost/private/orderprocessor" binding="netMsmqBinding"
            bindingConfiguration
="MsmqBinding" contract="Artech.ResponsiveQueuedService.Contract.IOrderProcessor" name="defaultEndpoint" />
    
</client>
  
</system.serviceModel>
</configuration>

Program:

using System;
using System.Collections.Generic;
using System.Text;
using System.Configuration;
using System.ServiceModel;
using Artech.ResponsiveQueuedService.Contract;
using System.Messaging;

namespace Artech.ResponsiveQueuedService.Clinet
{
    
class Program
    
{
        
static void Main(string[] args)
        
{
            Order order1 
= new Order(Guid.NewGuid(), DateTime.Today.AddDays(5), Guid.NewGuid(), "Supplier A");
            Order order2 
= new Order(Guid.NewGuid(), DateTime.Today.AddDays(-5), Guid.NewGuid(), "Supplier A");

            
string path = ConfigurationManager.AppSettings["msmqPath"];
            Uri address 
= new Uri(path);
            OrderResponseContext context 
= new OrderResponseContext();
            context.ResponseAddress 
= address;

            ChannelFactory
<IOrderProcessor> channelFactory = new ChannelFactory<IOrderProcessor>("defaultEndpoint");
            IOrderProcessor orderProcessor 
= channelFactory.CreateChannel();

            
using (OperationContextScope contextScope = new OperationContextScope(orderProcessor as IContextChannel))
            
{
                Console.WriteLine(
"Submit the order of order No.: {0}", order1.OrderNo);
                OrderResponseContext.Current 
= context;
                orderProcessor.Submit(order1);
            }


            
using (OperationContextScope contextScope = new OperationContextScope(orderProcessor as IContextChannel))
            
{
                Console.WriteLine(
"Submit the order of order No.: {0}", order2.OrderNo);
                OrderResponseContext.Current 
= context;
                orderProcessor.Submit(order2);
            }


            Console.Read();
        }

    }

}

我創(chuàng)建了兩個Order對象, 其中一個已經(jīng)過期。從Configuration中取出Response Address并購建一個OrderResponseContext,然后分兩次將這兩個Order向Order Processing Service遞交。在調(diào)用Order Processing Order的Operation Context Scope中,通過OrderResponseContext.Current將OrderResponseContext對象插入Outcoming Message Header中。

我們現(xiàn)在運行一下整個程序,看看最終的輸出結(jié)果:

Client:


Order Processing:


Order Response:

    本站是提供個人知識管理的網(wǎng)絡(luò)存儲空間,所有內(nèi)容均由用戶發(fā)布,不代表本站觀點。請注意甄別內(nèi)容中的聯(lián)系方式、誘導(dǎo)購買等信息,謹(jǐn)防詐騙。如發(fā)現(xiàn)有害或侵權(quán)內(nèi)容,請點擊一鍵舉報。
    轉(zhuǎn)藏 分享 獻花(0

    0條評論

    發(fā)表

    請遵守用戶 評論公約

    類似文章 更多