WCF Duplex Communication with multiple clients
Recently I have been working on a batch processing system that prints more than 100k documents per day from a queue. To make the system scalable, I divided it into two parts:
Producer: produces the documents. This is a time-consuming, CPU-intensive process.
Consumer: consumes the documents and prints, emails, or faxes them. It is fairly simple, with different handlers for printing, faxing, and emailing, and it is less CPU intensive.
The Producer and Consumer communicate through a WCF service using WCF's TCP binding (NetTcpBinding).
One of the Consumer's functions is reporting the status of a document (for example, an exception while printing) back to the Producer for auditing purposes. My first design approach was:
Run these two systems as a pair. The Producer calls a WCF service hosted in the Consumer when a document is ready for printing, and the Consumer calls a WCF service hosted in the Producer to report the status.
If the document volume is high, the user can configure and run multiple Producers with one Consumer. That means we need to provide a unique endpoint for each Producer instance. But each Producer communicates with a single Consumer for status reporting, which concentrates load on that one Consumer.
If you think in terms of scalability and deployment, you can visualize how complex the design would be. Yes, I also realized that my design was ridiculous. To make it more scalable, I had to redesign the communication between the Consumer and the Producer.
This time I thought of trying duplex communication. Initially I avoided duplex binding, first because I did not know it, and second because I was a bit scared of introducing complexity. In my mind duplex binding was complex and the approach I explained above was simple 🙂 (how ignorant I was).
So I decided to try duplex communication. But I needed to evaluate it first and did not want to mess with my current implementation, so I ended up writing a small tracer to experiment with it. The rest of the post goes through the tracer I created.
Note: My design is built around the proxy-less communication approach (channels are created through a channel factory rather than generated proxy classes).
Let’s start with the Service contract and Callback contract.
[ServiceContract(SessionMode = SessionMode.Required, CallbackContract = typeof(ICallbackInterface))]
public interface IServiceContract
{
    [OperationContract(IsOneWay = true)]
    void MessageToProcess(ServiceMessage message);
}
As you can see, I specified the callback contract type in the ServiceContract attribute. The contract above is implemented in the Consumer, and the Producer calls the Consumer through it whenever something is available for processing.
public interface ICallbackInterface
{
[OperationContract(IsOneWay=true)]
void ProcessedMessage(ServiceMessage message);
}
The callback interface above is implemented in the Producer. The Consumer calls back the Producer and updates the status through this contract.
ServiceMessage is just a value object to transfer data between consumer and producer. It has just one string field called Message.
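The post does not show ServiceMessage itself, so here is a minimal sketch of what such a class could look like. The DataContract and DataMember attributes are an assumption on my part; the post only says the class carries one string called Message.

```csharp
using System.Runtime.Serialization;

// Assumed shape of the value object described in the post:
// a single string member named Message.
// The [DataContract]/[DataMember] attributes are an assumption,
// used here to make the serialized members explicit.
[DataContract]
public class ServiceMessage
{
    [DataMember]
    public string Message { get; set; }
}
```

Both the Producer and the Consumer need to see the same type, so a class like this would normally live in an assembly shared by the two.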
Let’s implement the service contract in a class in the Consumer.
[ServiceBehavior(InstanceContextMode = InstanceContextMode.PerSession)]
public class MessageProcessingService : IServiceContract
{
    private ICallbackInterface GetCurrentCallback()
    {
        return OperationContext.Current.GetCallbackChannel<ICallbackInterface>();
    }

    public void MessageToProcess(ServiceMessage message)
    {
        ProcessRequest procRequest = new ProcessRequest(this.GetCurrentCallback(), message);
        ProcessQueue.Add(procRequest);
    }
}
We need to decorate MessageProcessingService with the ServiceBehavior attribute as shown above; it cannot be set in the config file.
The GetCurrentCallback function returns the callback channel that is part of the current service call.
The MessageToProcess function is called by the Producer once it has produced a message to process. As you can see, I create a new ProcessRequest object with the callback instance and the message, then add that ProcessRequest instance to a queue. I will explain why we need a queuing system.
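The post does not show how the Consumer hosts the service. A minimal self-hosting sketch could look like the following, assuming .NET Framework with a reference to System.ServiceModel. ConsumerHost is a hypothetical helper, and the address passed to it is whatever the Consumer is configured to listen on; neither comes from the original source.

```csharp
using System;
using System.ServiceModel;

// Hypothetical helper: builds a self-hosted service with one NetTcpBinding endpoint.
// The caller is responsible for calling Open() and, later, Close().
public static class ConsumerHost
{
    public static ServiceHost Create(Type serviceType, Type contractType, string address)
    {
        var host = new ServiceHost(serviceType);

        // NetTcpBinding supports sessions and duplex callbacks,
        // which the contract in this post requires.
        host.AddServiceEndpoint(contractType, new NetTcpBinding(), address);
        return host;
    }
}
```

In the tracer this might be called as ConsumerHost.Create(typeof(MessageProcessingService), typeof(IServiceContract), "net.tcp://localhost:9000/MessageProcessing"), followed by Open() on the returned host. The port and path here are placeholders.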
Why do we need a queue?
Think of a scenario where there are multiple Producers and a single Consumer. The single Consumer may not be able to process all the messages received from multiple Producers in real time. So every message is queued along with its callback instance. The Consumer processes the items in the queue one by one and notifies the Producer once the processing is done. With this implementation we can have any number of Producers and one Consumer.
ProcessRequest class
public class ProcessRequest
{
    private readonly ICallbackInterface _callbackInterface;
    private readonly ServiceMessage _message;

    public ProcessRequest(ICallbackInterface callbackInterface, ServiceMessage message)
    {
        _callbackInterface = callbackInterface;
        _message = message;
    }

    public ICallbackInterface GetCallback()
    {
        return _callbackInterface;
    }

    public ServiceMessage GetMessage()
    {
        return _message;
    }
}
The ProcessQueue implementation is as follows:
// Requires: using System.Collections.Generic; using System.Threading;
public static class ProcessQueue
{
    private static readonly Queue<ProcessRequest> _processQueue = new Queue<ProcessRequest>();

    // Signaled while items are queued; nothing in this tracer waits on it.
    private static readonly ManualResetEvent _enqueuedEvent = new ManualResetEvent(false);

    public static void Add(ProcessRequest request)
    {
        lock (_processQueue)
        {
            _processQueue.Enqueue(request);
            _enqueuedEvent.Set();
        }
        ProcessQueuedItem();
    }

    // The dequeued request is returned through an out parameter instead of a
    // shared static field, so concurrent callers cannot overwrite each other's
    // current request.
    public static bool Dequeue(out ProcessRequest request)
    {
        lock (_processQueue)
        {
            if (_processQueue.Count > 0)
            {
                _enqueuedEvent.Reset();
                request = _processQueue.Dequeue();
                return true;
            }
            request = null;
            return false;
        }
    }

    private static void ProcessQueuedItem()
    {
        ProcessRequest currentRequest;
        while (Dequeue(out currentRequest))
        {
            ServiceMessage message = currentRequest.GetMessage();
            message.Message += " {Processed}";
            currentRequest.GetCallback().ProcessedMessage(message);
        }
    }
}
The ProcessQueuedItem function processes the message; for my tracer I just append ‘{Processed}’ to the message.
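In the tracer, queued items are processed on whichever thread calls Add. If the processing should instead happen one item at a time on a dedicated thread, one possible variation is a queue built on BlockingCollection. This is a swapped-in technique, not the post's implementation; WorkerQueue is a hypothetical name, and swallowing handler exceptions is a simplification for the sketch.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Hypothetical alternative to ProcessQueue: a single background worker
// drains the queue in FIFO order and runs the handler for each item.
public sealed class WorkerQueue<T> : IDisposable
{
    private readonly BlockingCollection<T> _items = new BlockingCollection<T>();
    private readonly Task _worker;

    public WorkerQueue(Action<T> handler)
    {
        if (handler == null) throw new ArgumentNullException("handler");

        _worker = Task.Factory.StartNew(() =>
        {
            // Blocks while the queue is empty; ends after CompleteAdding().
            foreach (T item in _items.GetConsumingEnumerable())
            {
                try
                {
                    handler(item);
                }
                catch (Exception)
                {
                    // Simplification: one failed item (for example, a closed
                    // callback channel) should not stop the worker.
                }
            }
        }, TaskCreationOptions.LongRunning);
    }

    public void Add(T item)
    {
        _items.Add(item);
    }

    // Stops accepting items, waits for queued items to finish, then cleans up.
    public void Dispose()
    {
        _items.CompleteAdding();
        _worker.Wait();
        _items.Dispose();
    }
}
```

With a queue like this, MessageToProcess would only enqueue the ProcessRequest and return, and the handler passed to the constructor would do the processing and invoke the callback.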
We are done with the Consumer implementation. Let’s go through the Producer.
public partial class frmProducer : Form
{
    public frmProducer()
    {
        InitializeComponent();
        this.Text = this.Text + RandomGenerator.GetRandom().ToString();
    }

    private void btnSend_Click(object sender, EventArgs e)
    {
        ServiceMessage message = new ServiceMessage();
        message.Message = txtMessage.Text;

        InstanceContext instanceContext = new InstanceContext(new CallbackHandler());
        IServiceContract service = ServiceFactory.CreateInstance<IServiceContract>(instanceContext);
        service.MessageToProcess(message);
    }
}
As you can see, in btnSend_Click we need to create an InstanceContext with an instance of a class that implements the ICallbackInterface I created earlier. Then I call a factory class to get the channel to the Consumer. In the factory I use DuplexChannelFactory instead of ChannelFactory.
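The factory class is only available in the download, so here is a minimal sketch of what a DuplexChannelFactory-based ServiceFactory could look like, assuming .NET Framework with a reference to System.ServiceModel. The hard-coded endpoint address is a placeholder, not the value used in the original source.

```csharp
using System.ServiceModel;

// Sketch of a proxy-less channel factory for duplex contracts.
// The address below is an assumed placeholder; a real application
// would read it from configuration.
public static class ServiceFactory
{
    private const string ConsumerAddress = "net.tcp://localhost:9000/MessageProcessing";

    public static T CreateInstance<T>(InstanceContext callbackContext) where T : class
    {
        // DuplexChannelFactory takes the InstanceContext that wraps the
        // callback implementation, in addition to binding and address.
        var factory = new DuplexChannelFactory<T>(
            callbackContext,
            new NetTcpBinding(),
            new EndpointAddress(ConsumerAddress));

        return factory.CreateChannel();
    }
}
```

Creating a factory on every call keeps the sketch short, but it is relatively expensive and the factory is never closed here. Caching one factory per contract and closing channels when they are no longer needed would be a reasonable refinement.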
CallbackHandler implementation
public class CallbackHandler : ICallbackInterface
{
    public void ProcessedMessage(ServiceMessage message)
    {
        // Do the status reporting here.
    }
}
We are done with our Producer and Consumer implementations. We can test them by running several instances of the Producer and one instance of the Consumer and watching how the communication works.
Once my tracer was successful, I redesigned the communication between Producer and Consumer in my project with only a few code changes. With the new design, the user can add any number of Producers just by starting another Producer instance.
I hope this little post helps someone who is new to duplex communication in WCF.
Download tracer source code (WCF.TCPIP.Duplex.Service) here
This tutorial was a great help to me finding a direction to go with managing multiple clients and one service. However, I am having a problem where every time I call OperationContext.Current.GetCallbackChannel() I get the same object. So, when I scroll through my list of callbacks the last client to connect gets updated x number of times (x = number of clients connected to the service). What am I doing wrong?
TJ Redlich
September 20, 2013 at 12:50 am
Hi Redlich,
Sorry for the late reply; your comment was buried in spam :). If you can provide the code, I may be able to help you out.
Regards,
Sony Arouje
Sony Arouje
October 9, 2013 at 3:54 pm
To use “ServiceMessage”, do I need to create a class, add a reference, or something else?
Regards,
Kiki de Rooij
Kiki
May 25, 2015 at 3:05 pm
As I mentioned in the post, it is just a class with one string property. You can create a simple class and name it ServiceMessage.
Sony Arouje
May 26, 2015 at 1:52 pm
Hello,
I have a requirement for a duplex service where each client and the service have to maintain their own queues of requests. That is,
1. Each client will have a queue associated with it, and any request it wants to make to the server is just added to that queue. A separate queue runner takes care of dequeuing and forwarding the call to the service.
2. Similarly on the server, we need to maintain one queue for each client. If we have to call back a particular client, we have to retrieve the callback channel for that client and perform the callback by adding the request to that client's queue on the server.
Could you suggest or provide a design or solution for this?
dsagar078
January 28, 2016 at 9:52 pm
The reason to maintain a queue per client in the service is this: a client queries the server and then goes down before the server has responded. When the client comes back, the server needs to send the data that was already in that client’s queue.
dsagar078
January 28, 2016 at 11:01 pm
You might need to implement an acknowledgment on the client. For example, when the client receives the response from the server, it calls a server method, say Acknowledge(string guid). Once the server receives the acknowledgment, it looks up the request with that GUID and removes it from the list, so it will never be reprocessed.
Steps to do:
1. The server receives a request from the client.
2. Add a new property to ‘ProcessRequest’ that holds a GUID.
3. The server creates an instance of ProcessRequest, which in turn creates a GUID. This new ProcessRequest instance is added to a list.
4. Another class reads this list, processes each ProcessRequest, and calls back the caller using the callback interface.
5. The response from the server should also include the GUID stored in that ProcessRequest.
6. Once the client receives the response and processes it successfully, it calls the Acknowledge service method with the GUID received from the server.
7. Once the server receives this GUID via the Acknowledge call, it finds the ProcessRequest with that GUID in the list and removes it.
I hope the above steps make sense.
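The server-side bookkeeping in the steps above can be sketched roughly as follows. PendingRequests is a hypothetical helper that is not part of the tracer; it uses a dictionary keyed by GUID rather than a plain list, and a Guid value rather than the string parameter mentioned above, purely to keep the sketch short.

```csharp
using System;
using System.Collections.Concurrent;

// Hypothetical store of requests that have been accepted by the server
// but not yet acknowledged by the client.
public sealed class PendingRequests<TRequest>
{
    private readonly ConcurrentDictionary<Guid, TRequest> _pending =
        new ConcurrentDictionary<Guid, TRequest>();

    // Steps 2 and 3: assign a GUID to the request and remember it.
    // The returned GUID is what the server would send back with the response.
    public Guid Register(TRequest request)
    {
        Guid id = Guid.NewGuid();
        _pending[id] = request;
        return id;
    }

    // Step 7: remove the request once the client acknowledges it.
    // Returns false if the GUID is unknown or was already acknowledged.
    public bool Acknowledge(Guid id)
    {
        TRequest removed;
        return _pending.TryRemove(id, out removed);
    }

    // Number of requests still waiting for an acknowledgment.
    public int Count
    {
        get { return _pending.Count; }
    }
}
```

Anything still in the store when a client reconnects is a candidate for resending, which is the scenario described in the question.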
Regards,
Sony Arouje
Sony Arouje
February 1, 2016 at 11:25 pm
Hi Dayanand,
Sorry for the late reply.
1. Each client will have a queue associated with it and any request it wants to make to the server, it just adds to its queue and there will be another queuerunner that takes care of dequeuing and forwarding the call to the service.
[Sony: You can have a queue just like ‘ProcessQueue’ to handle this requirement. Your process writes the request to this queue, and another process keeps dequeuing and sending the requests to the server.]
2. Similarly on the server, we need to maintain one queue for each client and if we have to call back a particular client, we have to retrieve the callchannel for that particular client and perform the call back by adding the request back to queue of that client created in the server.
[Sony: It is already part of the source. If you look at ‘ProcessRequest’, it keeps a reference to the callback. Instead of keeping the ProcessRequest in a queue, you can store it in a list, so you can retrieve it based on your requirement.]
Regards,
Sony
Sony Arouje
February 1, 2016 at 11:17 pm