January 25, 2021
Hot Topics:

Extending Microsoft's Concurrency and Coordination Runtime with MSMQ

  • By Jeffrey Juday
  • Send Email »
  • More Articles »

Incoming Messages

The AddQueueIn function on the MSMQGateway class appears below.

public void AddQueueIn(string pathToQueue, 
   Port<MessageQueueMediatorVisitor> port)
    //This ties everything together, relates the MessageQueue to a
    //particular port
    MessageQueue queue = null;
    MessageQueueMediator mediator;
    queue = MSMQHelper.GetQ(pathToQueue);
    mediator = new MessageQueueMediator(queue);
    queue.PeekCompleted += AsyncMSMQCallBackFactory.Create(
       port, mediator);

Most of the code stores all data supporting MSMQ and the CCR. The AsyncMSMQCallBackFactory requires a Create method:

public static PeekCompletedEventHandler Create(
   Port<MessageQueueMediatorVisitor> portResult,
   MessageQueueMediator mediator)
   AsyncCallbackMethodAdapter builder =
      new AsyncCallbackMethodAdapter(portResult,mediator);
   return builder.PeekCompletedCallBack;
//This class wires the ports to the callback method
private sealed class AsyncCallbackMethodAdapter
   private Port<MessageQueueMediatorVisitor> _portResultPeek;
   private MessageQueueMediator _mediator = null;
   internal AsyncCallbackMethodAdapter(
      Port<MessageQueueMediatorVisitor> portResult, 
      MessageQueueMediator mediator)
      _portResultPeek = portResult;
      _mediator = mediator;

Create returns a function matching the PeekCompletedCallBack delegate. There are two ways to read messages from a MessageQueue. Peeking, getting the message without removing it, and Receiving, getting the message and receiving it from the queue. Doing a Peek and posting the Peeked message to the port ensures that exceptions during message handling don’t result in message loss. I’ll explain how the message is removed in a moment. The PeekCompletedEventHandler function returned by Create with exception handling code removed appears below.

internal void PeekCompletedCallBack(
   Object source, PeekCompletedEventArgs asyncResult)
      // Connect to the queue.
      MessageMapper mapper = null;
      MessageQueue mq = (MessageQueue)source;
      // EndPeek completes and returns the message.
      Message m = mq.EndPeek(asyncResult.AsyncResult);
      mapper = new MessageMapper(m);
      Console.WriteLine("PeekCompletedCallBack " + 
         mapper.Body.MessageId + " " + mapper.Body.CreatedDateTime);
      MessageQueueMediatorVisitor visitor = new 
         MessageQueueMediatorVisitor(_mediator, mapper.Body);
   catch (MessageQueueException ex)

As you’ll see throughout the sample application, a MessageMapper class serializes and deserializes into and out of the MessageBody class. To maximize flexibility I opted to handle my own serialization. So MessageMapper utilized the BodyStream properties on the message. The MessageMapper Body property appears below.

public MessageBody Body
      MessageBody body;
      body = MSMQHelper.DeSerializeBody(
      return body;
      Stream stream = null;
      stream = MSMQHelper.SerializeBody(value);
      _message.BodyStream = stream;

MessageQueueMediator handles MessageQueues on behalf of the MSMQGateway. Typically a class like MessageQueueMediator would be internal and therefore hidden inside an assembly. Somehow, a delegate processing a MessageBody class must indicate that the message has been successfully processed and the MessageQueueMediator can move on the next message. MessageQueueMediatorVisitor packages the MessageBody class and controls access to the MessageQueueMediator class.

As you can see in the code below, the MessageQueueMediatorVisitor's CompleteReceive function indicates to the MessageQueueMediator that the message has been processed. MessageQueueMediator calls the MessageQueue class's Receive function and then starts another Peek.

public void CompleteReceive()
public void ReceiveCompleted()
   Console.WriteLine("From Mediator ReceiveCompleted ");
   _queue.Receive();//clear the message out
public void ContinueWithMessages()

Receiving messages is a bit complicated, sending is much easier.

Page 2 of 3

This article was originally published on August 11, 2009

Enterprise Development Update

Don't miss an article. Subscribe to our newsletter below.

Thanks for your registration, follow us on our social networks to keep up-to-date