Introduction
If developers want to take advantage of multi-core processors, they will need to tackle threading and race conditions, collectively referred to as Concurrency. Concurrency is complicated, so Microsoft has been working to make it accessible to a wider range of developers by embedding Concurrency features in its languages and building APIs around Concurrency.
Concurrency and Coordination Runtime (CCR) is one of Microsoft’s Concurrency APIs, available via download to MSDN subscribers. CCR is part of the “CCR and DSS Toolkit 2008 R2 Standard Edition”. Although CCR was originally constructed for Robotics solutions, the patterns it employs can be applied to a broader set of domains. In fact, if you’re considering or currently using the .NET Framework ThreadPool or Threading, you might want to check out CCR. Before you do, some guidance may be helpful, so I’m going to show you how CCR can fit into your next Concurrency-dependent .NET Framework application.
.NET Asynchronous Programming Model (APM)
I’m making a big assumption that, if you’re looking at Threading and the ThreadPool, you’re probably also going to leverage some aspect of the .NET Framework Asynchronous Programming Model (APM). APM appears in many parts of the .NET Framework, for example, file reading and writing in the System.IO namespace. APM functions typically follow this pattern:
- There is a Begin____ (BeginRead, BeginWrite, etc.) operation. The Begin operation accepts a callback delegate and a “state” object; the callback receives a result parameter (an IAsyncResult) that carries the state.
- There is also an End___ operation. End is invoked inside the delegate passed to the Begin operation.
- There are typically two distinct execution results: the operation succeeded or the operation failed.
There may be some wrinkles in the pattern above, but that’s the general APM idea. APM is important if a developer is building a responsive application. Rather than blocking the main thread until an operation completes, APM lets the thread continue executing and wakes some other part of the application when the operation completes.
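To make the Begin/End pattern concrete, here is a minimal sketch using FileStream.BeginRead and EndRead from System.IO. The ApmReadSketch class, its ReadStart method, and the temporary file are names invented for this illustration; they are not part of the article's sample application.

```csharp
using System;
using System.IO;
using System.Text;
using System.Threading;

static class ApmReadSketch
{
    // Reads up to 256 bytes from the start of a file using BeginRead/EndRead.
    public static string ReadStart(string path)
    {
        string text = null;
        Exception error = null;
        byte[] buffer = new byte[256];

        using (var done = new ManualResetEvent(false))
        using (var stream = new FileStream(path, FileMode.Open, FileAccess.Read,
                                           FileShare.Read, 4096, true))
        {
            // Begin: pass the callback plus a "state" object (here, the stream).
            stream.BeginRead(buffer, 0, buffer.Length, asyncResult =>
            {
                try
                {
                    // End: invoked inside the callback. It returns the result
                    // or rethrows whatever went wrong during the operation.
                    var source = (FileStream)asyncResult.AsyncState;
                    int bytesRead = source.EndRead(asyncResult);
                    text = Encoding.UTF8.GetString(buffer, 0, bytesRead);
                }
                catch (Exception ex)
                {
                    error = ex;          // the "operation failed" outcome
                }
                finally
                {
                    done.Set();          // wake the waiting thread
                }
            }, stream);

            // The calling thread could do other work here instead of waiting.
            done.WaitOne();
        }

        if (error != null) throw error;
        return text;
    }

    static void Main()
    {
        string path = Path.GetTempFileName();
        File.WriteAllText(path, "hello APM");
        Console.WriteLine(ReadStart(path));   // prints "hello APM"
        File.Delete(path);
    }
}
```

The sketch waits on an event only so that the console program does not exit early; a responsive application would carry on and let the callback deliver the result.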
APM is typically used in conjunction with the ThreadPool. I like to think of the ThreadPool as being composed of three things:
- A Queue you place your function into.
- A set of idle threads awaiting work.
- An object that assigns a function from the queue to an available thread.
In a running application, at any time, multiple threads may be attempting to change the same place in memory (shared state). Alongside APM and the ThreadPool, a developer must synchronize access to shared state. A developer typically employs various locks and mutexes to gain exclusive access to state.
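The thread pool and locking ideas above can be sketched together in a few lines. This is an illustrative example only, using ThreadPool.QueueUserWorkItem and the C# lock statement; the ThreadPoolLockSketch class and the CountWithLock method are hypothetical names, not part of the sample application.

```csharp
using System;
using System.Threading;

static class ThreadPoolLockSketch
{
    // Queues `workItems` functions on the thread pool; each adds 1 to a shared total.
    public static int CountWithLock(int workItems)
    {
        int total = 0;                   // shared state
        object gate = new object();      // lock object guarding the shared state

        using (var pending = new CountdownEvent(workItems))
        {
            for (int i = 0; i < workItems; i++)
            {
                // Place a function into the pool's queue; a pool thread picks it up.
                ThreadPool.QueueUserWorkItem(_ =>
                {
                    lock (gate)          // one thread at a time may update total
                    {
                        total++;
                    }
                    pending.Signal();
                });
            }

            pending.Wait();              // block until every work item has run
        }

        return total;
    }

    static void Main()
    {
        Console.WriteLine(CountWithLock(1000));   // prints 1000
    }
}
```

Without the lock, concurrent increments could be lost and the total could come out below 1000. This is the bookkeeping the article later hands over to CCR.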
Briefly summarizing, a typical APM solution contains the following components:
- One of the APM “callback-oriented” APIs
- Threading and some sort of queue-like data structure to control thread usage
- Synchronized access to state.
All of these aspects and more are elegantly captured in the CCR data structures.
CCR Overview
A complete introduction to all aspects of the CCR is beyond the scope of this article. So, instead, I’m focusing on the core classes and classes more closely associated with the APM solution I outlined above.
Main CCR classes appear in the graphic below:
The Arbiter and Port classes are the other main CCR classes.
A developer works through an Arbiter class to configure
how a delegate function is executed by the Dispatcher.
Through an Arbiter, for example, a developer can configure a
function to run exclusively thus addressing the shared state
aspect of Concurrency. Arbiter methods also bind operations
to a Dispatcher Queue.
Port is a sort of entry point in the CCR. Once a
developer has created a Dispatcher and Dispatcher Queues, and
enlisted an Arbiter to lay out how a delegate should be
executed, everything is initiated through a Port. Ports are
generic classes. The Port Post method receives an instance of
the Port’s type parameter, which, in turn, initiates the whole process.
Now I’m going to put the CCR to work on an APM solution
using the Ping class built into the .NET Framework.
One Ping Only
Pinging in bulk with the .NET Ping class nicely
illustrates the need for APM functions. Pinging an unused
IP address will eventually fail, but only after a timeout.
Pinging a series of IP addresses synchronously takes roughly
the sum of the durations of all the Pings.
Pinging them asynchronously takes roughly the duration of
the longest Ping.
Pinging asynchronously results in a big performance
difference if, for example, you’re Pinging a wide range of
IP addresses and most of the addresses are unused.
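The timing difference described above can be sketched without touching the network. In this illustrative example Task.Delay stands in for a Ping that takes a given number of milliseconds to answer or time out; the LatencySketch class and its methods are hypothetical names, and no real Ping is sent.

```csharp
using System;
using System.Diagnostics;
using System.Threading.Tasks;

static class LatencySketch
{
    // One simulated Ping after another: total time is roughly the sum of the delays.
    public static long RunSequentially(int[] delays)
    {
        var watch = Stopwatch.StartNew();
        foreach (int ms in delays)
        {
            Task.Delay(ms).Wait();       // wait for this "Ping" before starting the next
        }
        return watch.ElapsedMilliseconds;
    }

    // All simulated Pings in flight at once: total time is roughly the longest delay.
    public static long RunConcurrently(int[] delays)
    {
        var watch = Stopwatch.StartNew();
        Task[] pings = Array.ConvertAll(delays, ms => Task.Delay(ms));
        Task.WaitAll(pings);
        return watch.ElapsedMilliseconds;
    }

    static void Main()
    {
        int[] delays = { 100, 200, 300 };
        Console.WriteLine("sequential: " + RunSequentially(delays) + " ms");
        Console.WriteLine("concurrent: " + RunConcurrently(delays) + " ms");
    }
}
```

With timeouts measured in seconds and most addresses unused, the gap between the two numbers grows with every address added to the list.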
My Console application sample Pings some URLs and returns
the results.
Clearly, I didn’t need CCR to do APM Pinging. So, I
imposed two requirements on my sample that really illustrate
when CCR can be a better platform for APM. My special
requirements are as follows:
- I wanted exceptions to be given a higher priority than successful Pings.
- I wanted to avoid doing locking, but I still wanted to update state.
Now let’s look at some code!
Dispatcher and DispatcherQueue
The CCRMediator class encapsulates most of the sample
application. The class’s constructor creates the CCR
infrastructure.
public CCRMediator(PingState pingState)
{
    _pingState = pingState;

    _dispatcher = new Dispatcher(0, "CCRMediator Threads");
    _dq = new DispatcherQueue("CCRMediator DispatcherQueue", _dispatcher);
    _dqExceptions = new DispatcherQueue("CCRMediator DispatcherQueue Exceptions", _dispatcher);
}

public void Start()
{
    GatherPingRepliesModifyState();
}
As you can see, there is not much to setting up the core
CCR classes. Everything is given a name to aid in
debugging. Passing zero to the Dispatcher constructor instructs
the class to create one thread for each core of the
computer.
I mentioned earlier that I wanted to give exceptions a
higher priority. So I built a DispatcherQueue for
exceptions only rather than mixing Exceptions among the
other responses. Having a dedicated queue effectively moves
exceptions to the front of the line. Because the Dispatcher
services its queues in round-robin order, it runs as follows:
- Check the Exception queue and execute the topmost item (if something exists)
- Then move to the successful queue and execute the topmost item
- Then move back to the Exception queue, and so forth
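The steps above can be modeled with two ordinary queues. The following is a toy model in plain C#, not CCR's actual scheduler; the RoundRobinSketch class and the item labels are invented for illustration. It shows why an exception posted late still runs ahead of successes that were queued before it.

```csharp
using System;
using System.Collections.Generic;

static class RoundRobinSketch
{
    // Takes one item from each non-empty queue in turn and returns
    // the order in which the items would be executed.
    public static List<string> Drain(params Queue<string>[] queues)
    {
        var order = new List<string>();
        bool tookSomething = true;

        while (tookSomething)
        {
            tookSomething = false;
            foreach (Queue<string> queue in queues)
            {
                if (queue.Count > 0)
                {
                    order.Add(queue.Dequeue());
                    tookSomething = true;
                }
            }
        }

        return order;
    }

    static void Main()
    {
        var exceptions = new Queue<string>(new[] { "E1", "E2" });
        var successes = new Queue<string>(new[] { "S1", "S2", "S3" });

        // The exception queue is listed first, so it is checked first in each round.
        Console.WriteLine(string.Join(", ", Drain(exceptions, successes)));
        // prints: E1, S1, E2, S2, S3
    }
}
```

Had all five items shared one queue, E2 would have waited behind every success queued before it.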
Dispatchers and DispatcherQueues perform the execution.
Configuring and initiating the execution process is done
using Ports and Arbiters.
Ports and Arbiters
Most of the CCR configuration is housed in the GatherPingRepliesModifyState function in the CCRMediator class. The code appears below:
private void GatherPingRepliesModifyState()
{
    Port<PingCompletedEventArgs> portPing = new Port<PingCompletedEventArgs>();
    Port<PingExceptionInfo> portException = new Port<PingExceptionInfo>();

    // Get a callback function with the ports wired to it
    PingCompletedEventHandler cbfunct = AsyncCallbackFactory.Create(portPing, portException);

    // Successful replies are handled on the main queue
    Arbiter.Activate(_dq, Arbiter.Interleave(
        new TeardownReceiverGroup(),
        // Move Receive to here if code touches shared data to ensure thread safety
        new ExclusiveReceiverGroup(
            Arbiter.Receive(true, portPing, PingCompletedCB)
        ),
        new ConcurrentReceiverGroup()
    ));

    // Failures are handled on the dedicated exceptions queue
    Arbiter.Activate(_dqExceptions, Arbiter.Interleave(
        new TeardownReceiverGroup(),
        // Move Receive to here if code touches shared data to ensure thread safety
        new ExclusiveReceiverGroup(
            Arbiter.Receive(true, portException,
                delegate(PingExceptionInfo e)
                {
                    string resource = e.resource.ToString();
                    _pingState.UpdateState(resource, "FAILURE");
                    PingReplyWriter.WriteReply(_pingState, resource);
                }
            )
        ),
        new ConcurrentReceiverGroup()
    ));

    SendPings(cbfunct);
}
As I stated earlier, Ports are generic. Ports are also relatively simple, lightweight classes. Later in the article I’ll discuss the AsyncCallbackFactory class. For now, it’s just important to note that it returns a PingCompletedEventHandler delegate with the Exception and Success Ports wired to the appropriate response.
Often an executing function will modify some data in the application; earlier I referred to this as shared state. In the sample, I modify the PingState class, which stores the results of the Ping. Normally an application gets a lock on a data structure before modifying it. Instead I used an Interleave. In an Interleave, a handler registered in the ExclusiveReceiverGroup runs by itself: while it executes, no other handler in that Interleave runs, so the handler can update state without taking a lock. Keep in mind that the guarantee is scoped to a single Interleave; handlers registered in different Interleaves are not serialized against each other.
The sample also uses two techniques for associating an executing function with the Arbiter. First I use a private function in the CCRMediator class, PingCompletedCB, which accepts the PingCompletedEventArgs posted to the Port. Then I use an inline delegate function. A third option would be a lambda (=>) expression. It is also important to point out that, when the code executes, it may execute on another thread. Like the ThreadPool, CCR handles all the details seamlessly for you.
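The three ways of supplying a handler look like this in plain C#. In this sketch Action<string> stands in for the CCR handler delegate type, and the HandlerFormsSketch class and host name are made up for illustration.

```csharp
using System;
using System.Collections.Generic;

static class HandlerFormsSketch
{
    static readonly List<string> Log = new List<string>();

    // 1. A named method whose signature matches the delegate type.
    static void NamedHandler(string host)
    {
        Log.Add("named:" + host);
    }

    public static List<string> Run()
    {
        Log.Clear();

        Action<string> fromMethod = NamedHandler;

        // 2. An inline anonymous delegate.
        Action<string> fromDelegate = delegate(string host) { Log.Add("anonymous:" + host); };

        // 3. A lambda expression.
        Action<string> fromLambda = host => Log.Add("lambda:" + host);

        foreach (Action<string> handler in new[] { fromMethod, fromDelegate, fromLambda })
        {
            handler("example.com");
        }

        return new List<string>(Log);
    }

    static void Main()
    {
        foreach (string entry in Run())
        {
            Console.WriteLine(entry);
        }
        // prints:
        // named:example.com
        // anonymous:example.com
        // lambda:example.com
    }
}
```

All three forms produce the same kind of delegate instance, so the choice is mostly about readability and whether the handler needs to be reused elsewhere.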
AsyncCallbackFactory
I borrowed the idea behind this class from the
“Concurrent Affairs: Concurrency and Coordination Runtime”
article. You can access the article in the Sources
section at the end of this article. AsyncCallbackFactory
appears below.
sealed class AsyncCallbackFactory
{
    // Wires the Ports to the Callback function so when async completes
    // the results are posted to a port.
    public static PingCompletedEventHandler Create(
        Port<PingCompletedEventArgs> portResult,
        Port<PingExceptionInfo> portException)
    {
        AsyncCallbackMethodAdapter builder =
            new AsyncCallbackMethodAdapter(portResult, portException);
        return builder.CompletedCallBack;
    }

    // This class wires the ports to the callback method
    private sealed class AsyncCallbackMethodAdapter
    {
        private Port<PingCompletedEventArgs> _portResult;
        private Port<PingExceptionInfo> _portException;

        internal AsyncCallbackMethodAdapter(
            Port<PingCompletedEventArgs> portResult,
            Port<PingExceptionInfo> portException)
        {
            _portResult = portResult;
            _portException = portException;
        }

        // Called to process result of completion
        internal void CompletedCallBack(object sender, PingCompletedEventArgs e)
        {
            if (e.Error == null)
            {
                // Post success item to success Port
                _portResult.Post(e);
            }
            else
            {
                PingExceptionInfo info = new PingExceptionInfo();
                info.Ex = e.Error;
                info.resource = e.UserState.ToString();

                // Send the error object
                _portException.Post(info);
            }
        }
    }
}
As I stated earlier, the class returns a function matching the PingCompletedEventHandler delegate. Essentially, the Create function returns a reference to the CompletedCallBack function inside a private class called AsyncCallbackMethodAdapter, which is known only to the AsyncCallbackFactory class.
This may seem an odd way of implementing this, but consider
other APM operations you may be performing using other
classes in the .NET Framework. Most APM “End” functions
result in success or an Exception. It may be useful to
handle them all in a similar way and to have an overloaded
static Create function for each APM class’s
operation. If an assignment is repeated, why not perform the
process once and be done with it? Also, using an intermediate
class allows for some common processing before posting to
the Ports.
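As a rough sketch of that idea, here is what a Create function for another APM operation, Stream.BeginRead, might look like. MiniPort<T> is a hypothetical stand-in that only stores posted items so the example runs without the CCR assemblies; it is not the CCR Port<T> class, and StreamCallbackFactory is likewise an invented name.

```csharp
using System;
using System.Collections.Concurrent;
using System.IO;
using System.Threading;

// Hypothetical stand-in for a CCR Port<T>: it only stores posted items.
sealed class MiniPort<T>
{
    public readonly ConcurrentQueue<T> Items = new ConcurrentQueue<T>();

    public void Post(T item)
    {
        Items.Enqueue(item);
    }
}

static class StreamCallbackFactory
{
    // Returns an AsyncCallback for Stream.BeginRead that posts the byte count
    // to portResult on success, or the exception to portException on failure.
    // The stream must be passed as the "state" argument of BeginRead.
    public static AsyncCallback Create(MiniPort<int> portResult,
                                       MiniPort<Exception> portException)
    {
        return asyncResult =>
        {
            try
            {
                var stream = (Stream)asyncResult.AsyncState;
                portResult.Post(stream.EndRead(asyncResult));
            }
            catch (Exception ex)
            {
                portException.Post(ex);
            }
        };
    }
}

static class StreamCallbackFactoryDemo
{
    static void Main()
    {
        var results = new MiniPort<int>();
        var failures = new MiniPort<Exception>();
        var stream = new MemoryStream(new byte[] { 1, 2, 3 });

        stream.BeginRead(new byte[8], 0, 8,
                         StreamCallbackFactory.Create(results, failures), stream);

        // Wait (up to five seconds) for the callback to post to one of the ports.
        SpinWait.SpinUntil(() => results.Items.Count + failures.Items.Count > 0, 5000);

        int bytesRead;
        if (results.Items.TryDequeue(out bytesRead))
        {
            Console.WriteLine(bytesRead);   // prints 3
        }
    }
}
```

The try/catch around EndRead is the one place where success and failure are separated, which is the kind of common processing an intermediate class makes possible.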
Further Investigation
There is quite a bit more to CCR. Here are examples of
other CCR features not demonstrated in the example code.
Policies allow a DispatcherQueue to throttle
message processing. Policies are useful when only a handful
of the most recent messages make sense to process and older
messages should be ignored. Timers are another useful CCR
feature. Use timers when a message must arrive within a
given span of time and an alternative
action should run if the time expires first. Joins allow a
developer to chain response dependencies together. For
example, a Join allows a developer to configure
a response when a particular number of Ports are activated.
In a future article, I’ll demonstrate how I applied CCR to an
MSMQ solution.
Conclusion
Concurrency and Coordination Runtime (CCR) originated in
Robotics Studio, but can be leveraged outside of Robotics. In
fact, CCR is a great platform for the .NET Framework
Asynchronous Programming Model (APM). In this article I
demonstrated how CCR can work with the .NET Ping class’s APM
functions.
Sources
CCR and DSS Home Page
Concurrent Affairs: Concurrency and Coordination Runtime
PDC 2008 presentation: The Concurrency and Coordination Runtime and Decentralized Software Services Toolkit
CCR Programming – Jeffrey Richter and George Chrysanthakopoulos