August 22, 2014
Hot Topics:
RSS RSS feed Download our iPhone app

Extending Microsoft's Concurrency and Coordination Runtime with MSMQ

  • August 11, 2009
  • By Jeffrey Juday
  • Send Email »
  • More Articles »

Incoming Messages

The AddQueueIn function on the MSMQGateway class appears below.

public void AddQueueIn(string pathToQueue, 
   Port<MessageQueueMediatorVisitor> port)
{
    //This ties everything together, relates the MessageQueue to a
    //particular port
    MessageQueue queue = null;
    MessageQueueMediator mediator;
    
    queue = MSMQHelper.GetQ(pathToQueue);
    mediator = new MessageQueueMediator(queue);
    queue.PeekCompleted += AsyncMSMQCallBackFactory.Create(
       port, mediator);
    _msmqQueuesIn.Add(queue);
    _mediatorsIn.Add(mediator);
    _portsIn.Add(port);
}

Most of the code stores all data supporting MSMQ and the CCR. The AsyncMSMQCallBackFactory requires a Create method:

public static PeekCompletedEventHandler Create(
   Port<MessageQueueMediatorVisitor> portResult,
   MessageQueueMediator mediator)
{
   AsyncCallbackMethodAdapter builder =
      new AsyncCallbackMethodAdapter(portResult,mediator);
   return builder.PeekCompletedCallBack;
}
//This class wires the ports to the callback method
private sealed class AsyncCallbackMethodAdapter
{
   private Port<MessageQueueMediatorVisitor> _portResultPeek;
   private MessageQueueMediator _mediator = null;
   internal AsyncCallbackMethodAdapter(
      Port<MessageQueueMediatorVisitor> portResult, 
      MessageQueueMediator mediator)
   {
      _portResultPeek = portResult;
      _mediator = mediator;
   }
   ...
}

Create returns a function matching the PeekCompletedCallBack delegate. There are two ways to read messages from a MessageQueue. Peeking, getting the message without removing it, and Receiving, getting the message and receiving it from the queue. Doing a Peek and posting the Peeked message to the port ensures that exceptions during message handling don’t result in message loss. I’ll explain how the message is removed in a moment. The PeekCompletedEventHandler function returned by Create with exception handling code removed appears below.

internal void PeekCompletedCallBack(
   Object source, PeekCompletedEventArgs asyncResult)
{
   try
   {
      // Connect to the queue.
      MessageMapper mapper = null;
      MessageQueue mq = (MessageQueue)source;
      // EndPeek completes and returns the message.
      Message m = mq.EndPeek(asyncResult.AsyncResult);
      mapper = new MessageMapper(m);
      Console.WriteLine("PeekCompletedCallBack " + 
         mapper.Body.MessageId + " " + mapper.Body.CreatedDateTime);
      MessageQueueMediatorVisitor visitor = new 
         MessageQueueMediatorVisitor(_mediator, mapper.Body);
      _portResultPeek.Post(visitor);
   }
   catch (MessageQueueException ex)
   {
   ...
   }
}

As you’ll see throughout the sample application, a MessageMapper class serializes and deserializes into and out of the MessageBody class. To maximize flexibility I opted to handle my own serialization. So MessageMapper utilized the BodyStream properties on the message. The MessageMapper Body property appears below.

public MessageBody Body
{
   get
   {
      MessageBody body;
      body = MSMQHelper.DeSerializeBody(
         _message.BodyStream);
      return body;
   }
   set
   {
      Stream stream = null;
      stream = MSMQHelper.SerializeBody(value);
      _message.BodyStream = stream;
   }
}

MessageQueueMediator handles MessageQueues on behalf of the MSMQGateway. Typically a class like MessageQueueMediator would be internal and therefore hidden inside an assembly. Somehow, a delegate processing a MessageBody class must indicate that the message has been successfully processed and the MessageQueueMediator can move on the next message. MessageQueueMediatorVisitor packages the MessageBody class and controls access to the MessageQueueMediator class.

As you can see in the code below, the MessageQueueMediatorVisitor's CompleteReceive function indicates to the MessageQueueMediator that the message has been processed. MessageQueueMediator calls the MessageQueue class's Receive function and then starts another Peek.

public void CompleteReceive()
{
   _mediator.ReceiveCompleted();
}
public void ReceiveCompleted()
{
   Console.WriteLine("From Mediator ReceiveCompleted ");
   _queue.Receive();//clear the message out
   ContinueWithMessages();
}
public void ContinueWithMessages()
{
   _queue.BeginPeek();
}

Receiving messages is a bit complicated, sending is much easier.


Tags: Windows, services, Microsoft, robotics, messaging



Page 2 of 3



Comment and Contribute

 


(Maximum characters: 1200). You have characters left.

 

 


Sitemap | Contact Us

Rocket Fuel