|
一、One-way MEP V.S. Responsible Service
我們知道MSMQ天生就具有異步的特性,它只能以O(shè)ne-way的MEP(Message Exchange Pattern)進行通信。Client和Service之間采用One-way MEP的話就意味著Client調(diào)用Service之后立即返回,它無法獲得Service的執(zhí)行結(jié)果,也無法捕捉Service運行的Exception。下圖簡單表述了基于MSMQ的WCF Service中Client和Service的交互。
二、 Solution 雖然我們的目的很簡單:當(dāng)Client向Service遞交了Order之后,能以某種方式獲知Order的執(zhí)行結(jié)果;對于Service端來說,在正確把Order從Message Queue中獲取出來、并正確處理之后,能夠向Order的遞交者發(fā)送一個Acknowledge Message。為了簡單起見,這個Acknowledge Message包含兩組信息:
要在WCF中實現(xiàn)這樣的目的,對于Request/Reply MEP來說是簡單而直接的:Client向Service遞交Order,并等待Service的Response,Service在處理接收到Order之后直接將處理結(jié)果 返回給Client就可以了。但是我們說過MSMQ天生就是異步的,我們只有采取一種間接的方式實現(xiàn)“曲線救國”。 我們的解決方案是:在每個Client Domain也創(chuàng)建一個基于MSMQ的本地的WCF Service,用于接收來自O(shè)rder處理端發(fā)送的Acknowledge Message。對于處理Order 的Service來說,在正確處理Order之后,想對應(yīng)的Client發(fā)送Acknowledge Message。下圖簡單演示整個過程:
了解了上面的Solution之后,我們來看看該Solution在真正實現(xiàn)過程中有什么樣的困難。對于處理Order的Service來說,在向Client端發(fā)送Acknowledge Message的時候,它必須要知道該Order對應(yīng)的Client的Response Service的MSMQ的Address以及其他和Operation相關(guān)的Context信息(在這里我們不需要,不過考慮到擴展性,我們把包括了address的Context的信息 封裝到一個了Class中,在這里叫做:OrderResponseContext)。而這些Context卻不能在Configuration中進行配置,因為他可以同時面臨著很多個Client:比如每個Client用于接收Response 的Message Queue的address都不一樣。所以這個OrderResponseContext必須通過對應(yīng)的Client來提供?;诖?,我們具有兩面兩種解決方式: 方式一、修改Service Contract,把OrderResponseContext當(dāng)成是Operation的一個參數(shù) 這是我們最容易想到的,比如我們原來的Operation這樣定義: namespace Artech.ResponsiveQueuedService.Contract![]() ![]() { [ServiceContract] [ServiceKnownType(typeof(Order))] public interface IOrderProcessor![]() { [OperationContract(IsOneWay = true)] void Submit(Order order); } }![]() 現(xiàn)在變成: namespace Artech.ResponsiveQueuedService.Contract![]() ![]() { [ServiceContract] [ServiceKnownType(typeof(Order))] public interface IOrderProcessor![]() { [OperationContract(IsOneWay = true)] void Submit(Order order, OrderResponseContext responseContext); } }![]() 雖然這種方式看起來不錯,但是卻不值得推薦。在一般情況下,我們的Contract需要是很穩(wěn)定的,一經(jīng)確定就不能輕易更改,因為Contract是被交互的多方共同支持的,牽一發(fā)動全身;此外,從Service Contract代表的是Service的一個Interface,他是對業(yè)務(wù)邏輯的抽象、和具體實現(xiàn)無關(guān),而對于我們的例子來說,我們僅僅是定義一個遞交Order的Operation,從業(yè)務(wù)邏輯來看,OrderResponseContext和抽象的業(yè)務(wù)邏輯毫無關(guān)系?;诖?,我們需要尋求一種和Service Contract無關(guān)的解決方式: 方式二、將OrderResponseContext放到Soap Message 的Header中 其實我們要解決的問題很簡單,就是要把OrderResponseContext的信息置于Soap Message中發(fā)送到Service。而我們知道,Soap的Header具有極強的可伸縮性,原則上,我們可以把任何控制信息置于Header中?;赪CF的編程模式很容易地幫助我們實現(xiàn)對Soap Header的插入和獲?。?/p> 我們可以通過下面的方式獲得當(dāng)前Operation Context的Incoming Message Headers和Outgoing Message Headers OperationContext.Current.IncomingMessageHeaders OperationContext.Current.OutgoingMessageHeaders![]() 如果我們要把一個OrderResponseContext 對象插入到當(dāng)前Operation Context的Outgoing Message Headers中,我們可以通過下面的代碼來實現(xiàn): OrderResponseContext context = new OrderResponseContext(); MessageHeader<OrderResponseContext> header = new MessageHeader<OrderResponseContext>( context); OperationContext.Current.OutgoingMessageHeaders.Add(header.GetUntypedHeader("name", "namespace"));![]() 相應(yīng)的,我們可以通過下面的代碼從Outgoing Message Headers OrderResponseContext的數(shù)據(jù)獲取的內(nèi)容: OrderResponseContext context = OperationContext.Current.IncomingMessageHeaders.GetHeader<OrderResponseContext>("name", "namespace"));四、Sample 我們照例給出一個完整的Sample,下面是整個Solution的結(jié)構(gòu):
1.Contract: Artech.ResponsiveQueuedService.Contract Service Contract: Artech.ResponsiveQueuedService.Contract. IOrderProcessor using System; using System.Collections.Generic; using System.Text; using System.ServiceModel;![]() namespace Artech.ResponsiveQueuedService.Contract![]() ![]() { [ServiceContract] [ServiceKnownType(typeof(Order))] public interface IOrderProcessor![]() { [OperationContract(IsOneWay = true)] void Submit(Order order); } }![]() Service Contract: Artech.ResponsiveQueuedService.Contract.IOrderRessponse using System; using System.Collections.Generic; using System.Text; using System.ServiceModel;![]() namespace Artech.ResponsiveQueuedService.Contract![]() ![]() { [ServiceContract] public interface IOrderRessponse![]() { [OperationContract(IsOneWay =true)] void SubmitOrderResponse(Guid orderNo,FaultException exception); } }![]() 接收來自O(shè)rder processing端的Response:Order No.和Exception。 Data Contract: Artech.ResponsiveQueuedService.Contract.Order using System; using System.Collections.Generic; using System.Text; using System.Runtime.Serialization;![]() namespace Artech.ResponsiveQueuedService.Contract![]() ![]() { [DataContract] public class Order![]() {![]() Private Fields#region Private Fields private Guid _orderNo; private DateTime _orderDate; private Guid _supplierID; private string _supplierName; #endregion![]() ![]() Constructors#region Constructors public Order(Guid orderNo, DateTime orderDate, Guid supplierID, string supplierName)![]() { this._orderNo = orderNo; this._orderDate = orderDate; this._supplierID = supplierID; this._supplierName = supplierName; }![]() #endregion![]() ![]() Public Properties#region Public Properties [DataMember] public Guid OrderNo![]() {![]() get { return _orderNo; }![]() set { _orderNo = value; } }![]() [DataMember] public DateTime OrderDate![]() {![]() get { return _orderDate; }![]() set { _orderDate = value; } }![]() [DataMember] public Guid SupplierID![]() {![]() get { return _supplierID; }![]() set { _supplierID = value; } }![]() [DataMember] public string SupplierName![]() {![]() get { return _supplierName; }![]() set { _supplierName = value; } } #endregion![]() ![]() Public Methods#region Public Methods public override string ToString()![]() { string description = string.Format("Order No.\t: {0}\n\tOrder Date\t: {1}\n\tSupplier No.\t: {2}\n\tSupplier Name\t: {3}", this._orderNo, this._orderDate.ToString("yyyy/MM/dd"), this._supplierID, this._supplierName); return description; } #endregion } }![]() 對Order的封裝。 Data Contract:Artech.ResponsiveQueuedService.Contract. OrderResponseContext using System; using System.Collections.Generic; using System.Text; using System.Runtime.Serialization; using System.ServiceModel;![]() namespace Artech.ResponsiveQueuedService.Contract![]() ![]() { [DataContract] public class OrderResponseContext![]() { private Uri _responseAddress;![]() [DataMember] public Uri ResponseAddress![]() {![]() get { return _responseAddress; }![]() set { _responseAddress = value; } }![]() public static OrderResponseContext Current![]() { get![]() { if (OperationContext.Current == null)![]() { return null; }![]() return OperationContext.Current.IncomingMessageHeaders.GetHeader<OrderResponseContext>("OrderResponseContext", "Artech.ResponsiveQueuedService.Contract"); } set![]() { MessageHeader<OrderResponseContext> header = new MessageHeader<OrderResponseContext>(value); OperationContext.Current.OutgoingMessageHeaders.Add(header.GetUntypedHeader("OrderResponseContext", "Artech.ResponsiveQueuedService.Contract")); } } } }![]() ResponseAddress代表Host在Client Domain的Response Service的Address。同過Current把OrderResponseContext插入到Outgoing Message Headers中、以及從Ingoing Message Headers取出OrderResponseContext對象。 2.Order Processing Service:Artech.ResponsiveQueuedService.Service using System; using System.Collections.Generic; using System.Text; using Artech.ResponsiveQueuedService.Contract; using System.ServiceModel; using System.Net.Security;![]() namespace Artech.ResponsiveQueuedService.Service![]() ![]() { public class OrderProcessorService:IOrderProcessor![]() { private void ProcessOrder(Order order)![]() {![]() if (order.OrderDate < DateTime.Today)![]() { throw new Exception(); } }![]() ![]() IOrderProcessor Members#region IOrderProcessor Members![]() public void Submit(Order order)![]() { Console.WriteLine("Begin to process the order of the order No.: {0}", order.OrderNo); FaultException exception= null; if (order.OrderDate < DateTime.Today)![]() { exception = new FaultException(new FaultReason("The order has expried"), new FaultCode("sender")); Console.WriteLine("It‘s fail to process the order.\n\tOrder No.: {0}\n\tReason:{1}", order.OrderNo, "The order has expried"); } else![]() { Console.WriteLine("It‘s successful to process the order.\n\tOrder No.: {0}", order.OrderNo); }![]() NetMsmqBinding binding = new NetMsmqBinding(); binding.ExactlyOnce = false; binding.Security.Transport.MsmqAuthenticationMode = MsmqAuthenticationMode.None; binding.Security.Transport.MsmqProtectionLevel = ProtectionLevel.None; ChannelFactory<IOrderRessponse> channelFacotry = new ChannelFactory<IOrderRessponse>(binding); OrderResponseContext responseContext = OrderResponseContext.Current; IOrderRessponse channel = channelFacotry.CreateChannel(new EndpointAddress(responseContext.ResponseAddress));![]() using (OperationContextScope contextScope = new OperationContextScope(channel as IContextChannel))![]() { channel.SubmitOrderResponse(order.OrderNo, exception); } } #endregion } }![]() 在這里我們模擬了這樣的場景:先通過Order Date判斷Order是否過期,如果過期創(chuàng)建一個FaultException,否則正確處理該Order,然后通過OrderResponseContext.Current從Incoming Message Header中獲取封裝在OrderResponseContext對象中的Response Address,創(chuàng)建Binding并調(diào)用Response Service. 3. Order Processing Service Hosting: Artech.ResponsiveQueuedService.Hosting Configuration <?xml version="1.0" encoding="utf-8" ?> <configuration> <appSettings> <add key="msmqPath" value=".\private$\orderprocessor"/> </appSettings> <system.serviceModel> <bindings> <netMsmqBinding> <binding name="MsmqBinding" exactlyOnce="false" useActiveDirectory="false"> <security> <transport msmqAuthenticationMode="None" msmqProtectionLevel="None" /> </security> </binding> </netMsmqBinding> </bindings> <services> <service name="Artech.ResponsiveQueuedService.Service.OrderProcessorService"> <endpoint address="net.msmq://localhost/private/orderprocessor" binding="netMsmqBinding" bindingConfiguration="MsmqBinding" contract="Artech.ResponsiveQueuedService.Contract.IOrderProcessor" /> </service> </services> </system.serviceModel> </configuration>![]() Program using System; using System.Collections.Generic; using System.Text; using Artech.ResponsiveQueuedService.Service; using System.ServiceModel; using System.Configuration; using System.Messaging;![]() namespace Artech.ResponsiveQueuedService.Hosting![]() ![]() { class Program![]() { static void Main(string[] args)![]() { string path = ConfigurationManager.AppSettings["msmqPath"]; if (!MessageQueue.Exists(path))![]() { MessageQueue.Create(path); }![]() using (ServiceHost host = new ServiceHost(typeof(OrderProcessorService)))![]() { host.Opened += delegate![]() { Console.WriteLine("The Order Processor service has begun to listen "); };![]() host.Open();![]() Console.Read(); } } } }![]() 4. Response Service: Artech.ResponsiveQueuedService.LocalService.OrderRessponseService using System; using System.Collections.Generic; using System.Text; using Artech.ResponsiveQueuedService.Contract; using System.ServiceModel;![]() namespace Artech.ResponsiveQueuedService.LocalService![]() ![]() { public class OrderRessponseService : IOrderRessponse![]() {![]() IOrderRessponse Members#region IOrderRessponse Members![]() public void SubmitOrderResponse(Guid orderNo, FaultException exception)![]() { if (exception == null)![]() { Console.WriteLine("It‘s successful to process the order!\n\tOrder No.: {0}",orderNo); } else![]() { Console.WriteLine("It‘s fail to process the order!\n\tOrder No.: {0}\n\tReason: {1}", orderNo, exception.Message); } }![]() #endregion } }![]()
Configuration <?xml version="1.0" encoding="utf-8" ?> <configuration> <appSettings> <add key="msmqPath" value=".\private$\orderresponse"/> </appSettings> <system.serviceModel> <bindings> <netMsmqBinding> <binding name="msmqBinding" exactlyOnce="false"> <security> <transport msmqAuthenticationMode="None" msmqProtectionLevel="None" /> </security> </binding> </netMsmqBinding> </bindings> <services> <service name="Artech.ResponsiveQueuedService.LocalService.OrderRessponseService"> <endpoint address="net.msmq://localhost/private/orderresponse" binding="netMsmqBinding" bindingConfiguration="msmqBinding" contract="Artech.ResponsiveQueuedService.Contract.IOrderRessponse" /> </service> </services> </system.serviceModel> </configuration>![]() Program using System; using System.Collections.Generic; using System.Text; using Artech.ResponsiveQueuedService.LocalService; using System.Configuration; using System.ServiceModel; using System.Messaging;![]() namespace Artech.ResponsiveQueuedService.LocalhHosting![]() ![]() { class Program![]() { static void Main(string[] args)![]() { string path = ConfigurationManager.AppSettings["msmqPath"]; if (!MessageQueue.Exists(path))![]() { MessageQueue.Create(path); }![]() using (ServiceHost host = new ServiceHost(typeof(OrderRessponseService)))![]() { host.Opened += delegate![]() { Console.WriteLine("The Order Response service has begun to listen "); };![]() host.Open();![]() Console.Read(); } } } }![]() 6. Client: Artech.ResponsiveQueuedService.Client Configuration: <?xml version="1.0" encoding="utf-8" ?> <configuration> <appSettings> <add key="msmqPath" value="net.msmq://localhost/private/orderresponse"/> </appSettings> <system.serviceModel> <bindings> <netMsmqBinding> <binding name="MsmqBinding" exactlyOnce="false" useActiveDirectory="false"> <security> <transport msmqAuthenticationMode="None" msmqProtectionLevel="None" /> </security> </binding> </netMsmqBinding> </bindings> <client> <endpoint address="net.msmq://localhost/private/orderprocessor" binding="netMsmqBinding" bindingConfiguration="MsmqBinding" contract="Artech.ResponsiveQueuedService.Contract.IOrderProcessor" name="defaultEndpoint" /> </client> </system.serviceModel> </configuration>![]() Program: using System; using System.Collections.Generic; using System.Text; using System.Configuration; using System.ServiceModel; using Artech.ResponsiveQueuedService.Contract; using System.Messaging;![]() namespace Artech.ResponsiveQueuedService.Clinet![]() ![]() { class Program![]() { static void Main(string[] args)![]() { Order order1 = new Order(Guid.NewGuid(), DateTime.Today.AddDays(5), Guid.NewGuid(), "Supplier A"); Order order2 = new Order(Guid.NewGuid(), DateTime.Today.AddDays(-5), Guid.NewGuid(), "Supplier A");![]() string path = ConfigurationManager.AppSettings["msmqPath"]; Uri address = new Uri(path); OrderResponseContext context = new OrderResponseContext(); context.ResponseAddress = address;![]() ChannelFactory<IOrderProcessor> channelFactory = new ChannelFactory<IOrderProcessor>("defaultEndpoint"); IOrderProcessor orderProcessor = channelFactory.CreateChannel();![]() using (OperationContextScope contextScope = new OperationContextScope(orderProcessor as IContextChannel))![]() { Console.WriteLine("Submit the order of order No.: {0}", order1.OrderNo); OrderResponseContext.Current = context; orderProcessor.Submit(order1); }![]() using (OperationContextScope contextScope = new OperationContextScope(orderProcessor as IContextChannel))![]() { Console.WriteLine("Submit the order of order No.: {0}", order2.OrderNo); OrderResponseContext.Current = context; orderProcessor.Submit(order2); }![]() Console.Read(); } } }![]() 我創(chuàng)建了兩個Order對象, 其中一個已經(jīng)過期。從Configuration中取出Response Address并購建一個OrderResponseContext,然后分兩次將這兩個Order向Order Processing Service遞交。在調(diào)用Order Processing Order的Operation Context Scope中,通過OrderResponseContext.Current將OrderResponseContext對象插入Outcoming Message Header中。 我們現(xiàn)在運行一下整個程序,看看最終的輸出結(jié)果: Client:
|
|
|