WCF Duplex Communication with multiple clients
For the past few days I have been working on a batch processing system that prints more than 100k documents per day from a queue. To make the system scalable, I divided it into two parts:
Producer: produces the document. This is a time-consuming, CPU-intensive process.
Consumer: consumes the document and prints, emails, or faxes it. It is fairly simple, with different handlers for printing, faxing, and emailing, and it is less CPU intensive.
The Producer and the Consumer communicate through a WCF service over TCP (NetTcpBinding).
One of the functions of the Consumer is to report the status of each document (for example, an exception while printing) back to the Producer for auditing purposes. My first design approach was:
Run these two systems as a pair. The Producer calls a WCF service hosted in the Consumer when a document is ready for printing, and the Consumer calls a WCF service hosted in the Producer to report the status.
If the document volume is high, the user can configure and run multiple Producers with one Consumer. That means we need to provide a unique endpoint for each Producer instance. But a Producer can communicate with any one Consumer for status reporting, thereby adding load to one of the Consumers.
If you think in terms of scalability and deployment, you can see how complex this design becomes. Yes, I also realized that my design was ridiculous. To make it more scalable, I had to redesign the communication between the Consumer and the Producer.
This time I thought of trying duplex communication for the first time. I had initially avoided duplex binding, first because I did not know it, and second because I was a bit scared of introducing complexity. In my mind duplex binding was complex and the approach I explained above was simple 🙂 (how ignorant I was).
So I decided to try duplex communication. But I needed to evaluate it first and did not want to mess with my current implementation, so I ended up writing a small tracer to experiment with duplex communication. The rest of the post goes through the tracer I created.
Note: My design is built around the proxy-less communication approach, where channels are created through a channel factory instead of generated proxy classes.
Let’s start with the Service contract and Callback contract.
using System.ServiceModel;

[ServiceContract(SessionMode = SessionMode.Required, CallbackContract = typeof(ICallbackInterface))]
public interface IServiceContract
{
    [OperationContract(IsOneWay = true)]
    void MessageToProcess(ServiceMessage message);
}
As you can see, I specified the type of the callback contract in the ServiceContract attribute. This contract is implemented in the Consumer, and the Producer calls the Consumer through it whenever something is available for processing.
public interface ICallbackInterface
{
[OperationContract(IsOneWay=true)]
void ProcessedMessage(ServiceMessage message);
}
The callback interface above is implemented in the Producer. The Consumer calls back the Producer through this contract to update the status.
ServiceMessage is just a value object that transfers data between the Consumer and the Producer. It has a single string field called Message.
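The post does not list ServiceMessage itself, so here is a minimal sketch of what such a value object could look like. The [DataContract] and [DataMember] attributes and the use of an auto-property are assumptions; the source only says the class carries one string called Message.

```csharp
using System.Runtime.Serialization;

// Value object passed between Producer and Consumer.
// Assumption: marked up as a data contract so WCF serializes it explicitly.
[DataContract]
public class ServiceMessage
{
    // The only payload: free text describing the document or its status.
    [DataMember]
    public string Message { get; set; }
}
```

Because Message is a plain settable property, the later snippets that assign to it or append to it with += work unchanged.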
Let’s implement the service contract in a class in the Consumer.
using System.ServiceModel;

[ServiceBehavior(InstanceContextMode = InstanceContextMode.PerSession)]
public class MessageProcessingService : IServiceContract
{
    // Returns the callback channel of the Producer that made the current call.
    private ICallbackInterface GetCurrentCallback()
    {
        return OperationContext.Current.GetCallbackChannel<ICallbackInterface>();
    }

    public void MessageToProcess(ServiceMessage message)
    {
        // Keep the callback together with the message so the status can be reported later.
        ProcessRequest procRequest = new ProcessRequest(this.GetCurrentCallback(), message);
        ProcessQueue.Add(procRequest);
    }
}
We need to decorate MessageProcessingService with the ServiceBehavior attribute as shown above; the instance context mode cannot be set in the config file.
The GetCurrentCallback function returns the callback instance that is part of the current service call.
The MessageToProcess function is called by the Producer once it has produced a message to process. As you can see, I create a new ProcessRequest object from the callback instance and the message, and then add it to a queue. I will explain why we need a queuing system.
Why do we need a queue?
Think of a scenario where there are multiple Producers and a single Consumer. The single Consumer may not be able to process all the messages received from multiple Producers in real time. So every message is queued along with its callback instance. The Consumer processes the items in the queue one by one and notifies the corresponding Producer once processing is done. With this implementation we can have any number of Producers and one Consumer.
ProcessRequest class
public class ProcessRequest
{
    private readonly ICallbackInterface _callbackInterface;
    private readonly ServiceMessage _message;

    public ProcessRequest(ICallbackInterface callbackInterface, ServiceMessage message)
    {
        _callbackInterface = callbackInterface;
        _message = message;
    }

    public ICallbackInterface GetCallback()
    {
        return _callbackInterface;
    }

    public ServiceMessage GetMessage()
    {
        return _message;
    }
}
The ProcessQueue implementation is as follows:
using System.Collections.Generic;
using System.Threading;

public static class ProcessQueue
{
    private static readonly Queue<ProcessRequest> _processQueue = new Queue<ProcessRequest>();
    private static readonly ManualResetEvent _enqueuedEvent = new ManualResetEvent(false);

    public static void Add(ProcessRequest request)
    {
        lock (_processQueue)
        {
            _processQueue.Enqueue(request);
            _enqueuedEvent.Set();
        }
        ProcessQueuedItem();
    }

    // Returns the next request through an out parameter rather than a shared
    // static field, so concurrent callers cannot overwrite each other's item.
    public static bool Dequeue(out ProcessRequest request)
    {
        lock (_processQueue)
        {
            if (_processQueue.Count > 0)
            {
                _enqueuedEvent.Reset();
                request = _processQueue.Dequeue();
                return true;
            }
            request = null;
            return false;
        }
    }

    private static void ProcessQueuedItem()
    {
        ProcessRequest request;
        while (Dequeue(out request))
        {
            ServiceMessage message = request.GetMessage();
            message.Message += " {Processed}";
            request.GetCallback().ProcessedMessage(message);
        }
    }
}
The ProcessQueuedItem function processes the message; for my tracer I just appended ‘{Processed}’ to the message.
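In the ProcessQueue above, the processing loop runs on whichever thread called Add. A possible variation, which is not what the tracer does, is to hand items to one dedicated worker thread so that Add returns immediately. The sketch below uses BlockingCollection from the .NET class library; the WorkerQueue name and its handler parameter are hypothetical, and it is generic so it can be tried without any WCF types.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

// Hypothetical alternative to ProcessQueue: a single background worker
// drains the queue in FIFO order, so Add never blocks on processing.
public sealed class WorkerQueue<T> : IDisposable
{
    private readonly BlockingCollection<T> _items = new BlockingCollection<T>();
    private readonly Thread _worker;

    public WorkerQueue(Action<T> handler)
    {
        _worker = new Thread(() =>
        {
            // Blocks while the queue is empty; the loop ends after CompleteAdding().
            foreach (T item in _items.GetConsumingEnumerable())
            {
                handler(item);
            }
        });
        _worker.IsBackground = true;
        _worker.Start();
    }

    public void Add(T item)
    {
        _items.Add(item);
    }

    // Stops accepting new items and waits until the queued ones are handled.
    public void Dispose()
    {
        _items.CompleteAdding();
        _worker.Join();
        _items.Dispose();
    }
}
```

In the Consumer this could be created once as a WorkerQueue of ProcessRequest, with a handler that appends the marker to the message and calls ProcessedMessage on the stored callback. That assumes the Producer keeps its channel open until the callback arrives, since the callback is then made after the original service call has returned.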
We are done with the Consumer implementation. Let’s go through the Producer.
using System;
using System.ServiceModel;
using System.Windows.Forms;

public partial class frmProducer : Form
{
    public frmProducer()
    {
        InitializeComponent();
        // Append a random number to the title to tell Producer instances apart.
        this.Text = this.Text + RandomGenerator.GetRandom().ToString();
    }

    private void btnSend_Click(object sender, EventArgs e)
    {
        ServiceMessage message = new ServiceMessage();
        message.Message = txtMessage.Text;

        // Wrap the callback handler so the Consumer can call back on this channel.
        InstanceContext instanceContext = new InstanceContext(new CallbackHandler());
        IServiceContract service = ServiceFactory.CreateInstance<IServiceContract>(instanceContext);
        service.MessageToProcess(message);
    }
}
As you can see in btnSend_Click, we need to create an InstanceContext from an instance of the class that implements the ICallbackInterface I created earlier. Then I call a factory class to get the channel to the Consumer. In the factory I used DuplexChannelFactory instead of ChannelFactory.
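The ServiceFactory class is only available in the download, so the following is a hedged reconstruction of what such a factory might look like. The address constant and the choice of a default NetTcpBinding are assumptions; the only thing the post confirms is that DuplexChannelFactory is used.

```csharp
using System.ServiceModel;

// Hypothetical reconstruction of the ServiceFactory helper used by the Producer.
public static class ServiceFactory
{
    // Assumed address; it has to match the endpoint the Consumer exposes.
    private const string ConsumerAddress = "net.tcp://localhost:9000/MessageProcessing";

    public static T CreateInstance<T>(InstanceContext callbackContext)
    {
        // DuplexChannelFactory ties the outgoing channel to the callback
        // object wrapped in callbackContext, which ChannelFactory cannot do.
        var factory = new DuplexChannelFactory<T>(
            callbackContext,
            new NetTcpBinding(),
            new EndpointAddress(ConsumerAddress));
        return factory.CreateChannel();
    }
}
```

A longer-lived application would probably cache the factory and close channels when they are no longer needed; this sketch leaves that out for brevity.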
CallbackHandler implementation
public class CallbackHandler : ICallbackInterface
{
    public void ProcessedMessage(ServiceMessage message)
    {
        // Do the status reporting here.
    }
}
We are done with our Producer and Consumer implementation. We can test it by running several instances of the Producer and one instance of the Consumer and watching how the communication works.
Once my tracer was successful, I redesigned the communication between the Producer and the Consumer in my project with only a few code changes. With the new design, the user can add any number of Producers just by starting another Producer instance, without any difficulty.
I hope this little post will help someone who is new to duplex communication in WCF.
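For readers who want to try the round trip without the WinForms tracer, here is a compact console-style sketch that hosts the service and calls it from the same process. All names prefixed with Demo, the address, and the port are assumptions, a plain string stands in for ServiceMessage, and it presumes the .NET Framework System.ServiceModel assembly is referenced.

```csharp
using System;
using System.ServiceModel;
using System.Threading;

[ServiceContract(SessionMode = SessionMode.Required, CallbackContract = typeof(IDemoCallback))]
public interface IDemoService
{
    [OperationContract(IsOneWay = true)]
    void MessageToProcess(string message);
}

public interface IDemoCallback
{
    [OperationContract(IsOneWay = true)]
    void ProcessedMessage(string message);
}

[ServiceBehavior(InstanceContextMode = InstanceContextMode.PerSession)]
public class DemoService : IDemoService
{
    public void MessageToProcess(string message)
    {
        // Call straight back on the channel the request arrived on.
        var callback = OperationContext.Current.GetCallbackChannel<IDemoCallback>();
        callback.ProcessedMessage(message + " {Processed}");
    }
}

public class DemoCallback : IDemoCallback
{
    public readonly ManualResetEvent Done = new ManualResetEvent(false);
    public string Result;

    public void ProcessedMessage(string message)
    {
        Result = message;
        Done.Set();
    }
}

public static class DuplexDemo
{
    public static string RoundTrip(string text)
    {
        const string address = "net.tcp://localhost:9000/demo"; // assumed free port
        using (var host = new ServiceHost(typeof(DemoService)))
        {
            host.AddServiceEndpoint(typeof(IDemoService), new NetTcpBinding(), address);
            host.Open();

            var handler = new DemoCallback();
            var factory = new DuplexChannelFactory<IDemoService>(
                new InstanceContext(handler), new NetTcpBinding(), new EndpointAddress(address));
            IDemoService channel = factory.CreateChannel();
            channel.MessageToProcess(text);

            // Wait for the one-way callback; safe here because no UI thread is blocked.
            handler.Done.WaitOne(TimeSpan.FromSeconds(10));
            factory.Close();
            return handler.Result;
        }
    }
}
```

Calling DuplexDemo.RoundTrip from a console Main should hand back the text with the processed marker appended. Blocking on the event like this is only reasonable outside a UI thread; in the WinForms Producer the callback simply arrives later in CallbackHandler.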
Download tracer source code (WCF.TCPIP.Duplex.Service) here