Extending Microsoft's Concurrency and Coordination Runtime with MSMQ
Outbound
The MSMQGateway.AddQueueOut function appears below.
public Port<MessageBody> AddQueueOut(string pathToQueue)
{
    // This ties everything together: it relates the MessageQueue
    // to a particular port, with a handler supplied by a component
    // inside of the application.
    MessageQueue queue = null;
    Port<MessageBody> port = new Port<MessageBody>();
    MessageQueueMediator mediator;

    queue = MSMQHelper.GetQ(pathToQueue);
    mediator = new MessageQueueMediator(queue);

    _msmqQueuesOut.Add(queue);
    _mediatorsOut.Add(mediator);
    _portsOut.Add(port);

    Arbiter.Activate(_ccrQueue,
        Arbiter.Receive(true, port, mediator.SendMessage)
    );

    return port;
}
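The code closely mirrors AddQueueIn, except that it configures the port for sending: the persistent receiver hands every MessageBody posted to the port to mediator.SendMessage, and the port is returned so that callers can post to it.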
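To make the persistent-receiver pattern concrete, here is a minimal sketch that is not taken from the article's source. It assumes a reference to the CCR assembly (Microsoft.Ccr.Core), and it swaps the article's MessageBody and mediator.SendMessage for a plain string and an anonymous handler. The names OutboundPortSketch, PostAndCount, SketchPool, and SketchQueue are hypothetical.

```csharp
using System;
using System.Threading;
using Microsoft.Ccr.Core; // assumption: the CCR assembly is referenced

public static class OutboundPortSketch
{
    // Posts each body to a port and returns how many the receiver handled.
    public static int PostAndCount(string[] bodies)
    {
        int handled = 0;

        using (ManualResetEvent allHandled = new ManualResetEvent(false))
        // A thread count of 0 leaves the choice of pool size to CCR.
        using (Dispatcher dispatcher = new Dispatcher(0, "SketchPool"))
        using (DispatcherQueue ccrQueue = new DispatcherQueue("SketchQueue", dispatcher))
        {
            Port<string> port = new Port<string>();

            // persist = true keeps the receiver registered after each item,
            // just as AddQueueOut does with mediator.SendMessage.
            Arbiter.Activate(ccrQueue,
                Arbiter.Receive<string>(true, port, delegate(string body)
                {
                    // Stand-in for the real send to MSMQ.
                    Console.WriteLine("Sending " + body);
                    if (Interlocked.Increment(ref handled) == bodies.Length)
                    {
                        allHandled.Set();
                    }
                }));

            // Posting is all a caller has to do; CCR schedules the handler.
            foreach (string body in bodies)
            {
                port.Post(body);
            }

            // Wait for the handlers before tearing down the dispatcher.
            if (bodies.Length > 0)
            {
                allHandled.WaitOne();
            }
        }

        return handled;
    }

    public static void Main()
    {
        int count = PostAndCount(new string[] { "first", "second", "third" });
        Console.WriteLine("Handled " + count + " messages");
    }
}
```

Handlers run on dispatcher threads, so the order of the "Sending" lines is not guaranteed. With the article's gateway, the equivalent caller code would simply post MessageBody instances to the port returned by AddQueueOut.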
MessageQueueMediator also handles sending a message. The SendMessage function appears below.
public void SendMessage(MessageBody body)
{
    System.Messaging.Message msg = new Message();
    MessageMapper mapper = new MessageMapper(msg);

    mapper.Body = body;

    Console.WriteLine("From Mediator Sending Message " +
        body.MessageId + " " +
        body.CreatedDateTime);

    _queue.Send(mapper.Message);
}
As with a receive operation, sending relies on the MessageMapper class. MessageMapper serializes the MessageBody so that its bytes can be written to the BodyStream of the Message class.
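The serialization step can be sketched as follows. This is not the article's MessageMapper: the serializer it uses is not shown on this page, so XmlSerializer is an assumption, a MemoryStream stands in for Message.BodyStream, and the MessageBody class models only the two members visible in SendMessage, with assumed types. BodyStreamSketch, ToStream, and FromStream are hypothetical names.

```csharp
using System;
using System.IO;
using System.Xml.Serialization;

// Hypothetical stand-in for the article's MessageBody; member types are assumed.
public class MessageBody
{
    public string MessageId { get; set; }
    public DateTime CreatedDateTime { get; set; }
}

public static class BodyStreamSketch
{
    // Serializes the body into a stream, as a mapper might do before
    // assigning the result to Message.BodyStream.
    public static Stream ToStream(MessageBody body)
    {
        XmlSerializer serializer = new XmlSerializer(typeof(MessageBody));
        MemoryStream stream = new MemoryStream();
        serializer.Serialize(stream, body);
        stream.Position = 0; // rewind so a reader starts at the first byte
        return stream;
    }

    // Rebuilds the body from a stream, as the receiving side would.
    public static MessageBody FromStream(Stream stream)
    {
        XmlSerializer serializer = new XmlSerializer(typeof(MessageBody));
        return (MessageBody)serializer.Deserialize(stream);
    }

    public static void Main()
    {
        MessageBody sent = new MessageBody
        {
            MessageId = "42",
            CreatedDateTime = new DateTime(2009, 8, 11, 0, 0, 0, DateTimeKind.Utc)
        };

        using (Stream stream = ToStream(sent))
        {
            MessageBody received = FromStream(stream);
            Console.WriteLine(received.MessageId + " " +
                received.CreatedDateTime.ToString("o"));
        }
    }
}
```

Writing to a stream rather than setting Message.Body directly keeps the choice of wire format in one place, so sender and receiver only need to agree on the mapper.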
Conclusion
Originally built for robotics, CCR is a library for coordinating tasks and handling concurrency in .NET Framework applications. CCR's capabilities can be extended to coordinate processes across multiple machines, but doing so requires a distributed transport. MSMQ is a good fit for that role.
This article was originally published on August 11, 2009