Apache ActiveMQ is a very powerful Java-based message queue system with publish/subscribe capabilities that conforms to the Java Message Service (JMS) specification, and also supports integration with .NET applications. Apache does provide basic API documentation—but without examples. However, there’s documentation for using the .NET Messaging Service (NMS) with Spring.NET. This article shows how to use the Apache ActiveMQ server together with NMS to create a simple publisher and subscriber.
Before creating the publisher/subscriber you need to download a copy of the Apache ActiveMQ server and the NMS DLLs. After downloading the ActiveMQ zip file, you will need to start the ActiveMQ server by navigating down to the bin folder and launching the activemq.bat batch file. Unfortunately, at the time of this article the NMS DLLs are available only in source code form, which you can download from http://activemq.apache.org/nms/source.html using a Subversion client. You’ll find instructions for building the DLLs on the download page. The two application examples listed below need references to these DLLs.
Creating the Publisher
Two types of queues are available: Queue and Topic. Choose a Queue when you have one or many publishers sending messages but only a single subscriber. Use a Topic when you have one or many publishers and one or many subscribers. The examples here use the Topic queue type. You can download the source code for the sample applications to follow along.
To begin, create a publisher as listed below:
static void Main(string[] args)
{
//Create the Connection
Factory IConnectionFactory factory = new
ConnectionFactory("tcp://localhost:61616/");
using (IConnection connection = factory.CreateConnection())
{
//Create the Session
using (ISession session = connection.CreateSession())
{
//Create the Producer for the topic/queue
IMessageProducer prod = session.CreateProducer(
new Apache.NMS.ActiveMQ.Commands.ActiveMQTopic(
"testing"));
//Send Messages
int i = 0;
while (!Console.KeyAvailable)
{
ITextMessage msg = prod.CreateTextMessage();
msg.Text = i.ToString();
Console.WriteLine("Sending: " + msg.Text);
prod.Send(msg, true, 1, TimeSpan.MinValue);
System.Threading.Thread.Sleep(250);
i++;
}
}
}
Console.ReadLine();
}
The preceding code first creates an IConnectionFactory object, which obtains connections to a specific server. Next, it creates the connection and an ISession object to use with that connection. It uses the session object to create a Producer for a specified Queue or Topic. To create the producer, you need to specify whether you’re connecting to a Queue or Topic, and provide a name. This example uses a Topic and specifies the topic name “testing.” To produce messages, the code uses a loop with a small delay. The loop exits when a user presses a key; the delay gives users enough time to press a key, so the loop doesn’t create a huge number of messages. Inside each loop iteration, the code creates an ITextMessage object from the Producer, populates the ITextMessage object with some text, and then sends it using the Producer. The send command provides a couple of useful parameters, including the option to persist the message on the server, the message priority, and the message TTL (Time to Live).
Creating the Subscriber
Now that you have a producer for the messages, you can create a consumer (a.k.a. subscriber) to receive the messages. Here’s the source code for the consumer application:
static void Main(string[] args)
{
//Create the Connection factory
IConnectionFactory factory = new
ConnectionFactory(“tcp://localhost:61616/”);
//Create the connection
using (IConnection connection =
factory.CreateConnection())
{
connection.ClientId = “testing listener”;
connection.Start();
//Create the Session
using (ISession session = connection.CreateSession())
{
//Create the Consumer
IMessageConsumer consumer = session.CreateDurableConsumer(
new Apache.NMS.ActiveMQ.Commands.ActiveMQTopic(
“testing”),”testing listener”,null,false);
consumer.Listener += new MessageListener(
consumer_Listener);
Console.ReadLine();
}
}
}
static void consumer_Listener(IMessage message)
{
Console.WriteLine(“Receive: ” +
((ITextMessage)message).Text);
}
Similar to the producer code shown earlier, you first need to create an IConnectionFactory object and an IConnection. The connection.ClientId specifies a unique identity for this client. Save this value, because the client application must reuse that string to ensure that it receives the appropriate messages. Next, the call to connection.Start instructs the connection to be prepared to start receiving messages. Create an ISession object and an IMessageConsumer object. You can create two types of consumers: durable and non-durable. A non-durable consumer will receive all messages sent after the consumer is created. A durable consumer will receive all messages sent since the durable consumer was first created. For durable consumers, ActiveMQ holds all messages sent when the consumer was disconnected. Using a durable consumer is the way to go for most uses.
With both the Publisher and Subscriber created, you’re ready to test the queue.
Running the Publisher and Subscriber
First, launch the Publisher console app (see Figure 1), which will start sending messages into the Topic queue named “testing.”
Next, start the Subscriber console app. You should see that it’s receiving messages (see Figure 2).
Conclusion
ActiveMQ provides excellent publish/subscribe capabilities and performs well under load. Adding Apache NMS makes it easy to incorporate ActiveMQ into .NET applications without the cost overhead of commercial solutions.
You’ve seen only very basic capabilities here, but be aware that ActiveMQ provides a wide range of configuration options, so it’s well worth your time to read through the configuration documentation, so you can configure it correctly for your specific applications.
Source Code
Download Source Code: NMSPublishSubscribe.zip
About the Author
Chris Bennett works with Crowe Horwath LLP, in their Indianapolis office. He can be reached at chris.bennett@crowehorwath.com