Microsoft & .NETVisual C#Batched Execution Using the .NET Thread Pool

Batched Execution Using the .NET Thread Pool

Most articles about areas of functionality such as the System.Threading namespace describe the individual types within the namespace but don’t pay much attention to how the various types can be used together to form a real-world method or class. This article takes a different approach and fits the pieces together to form some production-ready code that you can use as-is in an application. (For an introduction to the functionality the threads and the thread pool in .NET offer, see Paul Kimmel’s Asynchronous Programming with Thread Pools.)

Most applications written today require concurrent execution of various tasks. For simple tasks that can be run in isolation, the .NET thread pool provides a simple programming model: Write a method whose signature matches the WaitCallback delegate, create a delegate instance, and pass the instance to the ThreadPool’s static QueueUserWorkItem method:

__gc class Simple{

   static void AsyncTask(Object* stateInfo){
   Console::WriteLine(S"Task completed on ThreadPool thread");
}

public:
   static void UseThreadPool(){
          ThreadPool::QueueUserWorkItem(new WaitCallback(0,
          Simple::AsyncTask));
   }
};

For tasks that depend on the completion of other tasks, simply queuing the tasks in the order that they should be executed is not enough. The ThreadPool does not guarantee that tasks will be executed in the order that they are queued or that subsequent tasks will begin execution only after previously queued tasks have completed. To accomplish in-order queued execution using the ThreadPool, tasks need to be queued after their dependent tasks have completed. The simplest way to accomplish this is to schedule the dependent tasks after the previous tasks have been completed. Using the System.Collections.Queue type makes this task easy:

__gc class Queued{
   Queue __gc* _taskQueue;

   void Task1(Object* stateInfo){
      Console::WriteLine(S"Task1");
      RunNextTask();
   }

   void Task2(Object* stateInfo){
      Console::WriteLine(S"Task2");
      RunNextTask();
   }

   void Task3(Object* stateInfo){
      Console::WriteLine(S"Task3");
      RunNextTask();
   }

   //schedule next task is available
   void RunNextTask(){
      if (_taskQueue->Count != 0){
      ThreadPool::QueueUserWorkItem(__try_cast<WaitCallback __gc*>
      (_taskQueue->Dequeue()));
      }
   }

   public:
   void UseThreadPool(){
      _taskQueue = new Queue();
      //queue
      _taskQueue->Enqueue(new WaitCallback(this, Task1));
      _taskQueue->Enqueue(new WaitCallback(this, Task2));
      _taskQueue->Enqueue(new WaitCallback(this, Task3));

      //run task 1
      RunNextTask();
   }
};

When a task is dependent on only one previous task, and in turn has a single task dependent on it, using the ThreadPool and Queue types (as demonstrated in the previous example) works well. However, when multiple tasks can run simultaneously, but all of these tasks must complete before the next group of tasks start, a more advanced synchronization mechanism is required. The naove approach is to simply maintain a boolean flag for each separate task, and poll each task flag until all are set. The problem with polling is that it wastes processor resources if the check for completion is not successful, and the time lapse between the point when all the tasks are complete and when the next polling takes place also slows overall execution speed.

Specialized threading types—AutoResetEvent and ManualResetEvent—exist to help in situations like this. These event types exist solely to allow communication between threads. The typical pattern of use is that one thread will wait on one or more events, which will become signaled by one or more other threads. Rather than polling for these events to become signaled, the WaitOne, WaitAny, and WaitAll methods of the WaitHandle type are used to block the waiting thread until the events become signaled. As the method names suggest, WaitOne is used to wait on a single event object, WaitAny is used to wait for any of a collection of event objects to become signaled, and WaitAll is used to wait for all event objects in a collection to become signaled.

You can utilize the batched thread execution model by associating each task in a batch with an event, queuing all the events for a batch, and then using the WaitHandle.WaitAll method to wait for all the tasks to complete. After the WaitAll method call returns, all events have been signaled, and the next batch can be queued.

Take particular care to avoid deadlocks. Deadlocks can occur in two main ways:

  • Using lock acquisition in a non-ordered and stagger manner
  • Inadvertently holding a lock because of an early exit from a method, often due to an exception

The first situation can best be illustrated by an example: If thread one acquires lock A at the same time that thread two acquires lock B, and then thread one attempts to acquire lock B while thread two attempts to acquire lock A, each thread will be locked waiting for the other. If neither thread uses a timeout in its attempted lock acquisition, the deadlock will continue until one of the threads is terminated, which typically won’t happen until the process terminates.

For the second case, in which the lock is inadvertently left unreleased, the simplest prevention technique is wrapping all the code that acquires and uses the lock in a __try-__finally block, with the release of the lock in the __finally block.

The following code sample is an example use of batched execution using the thread pool. In this case, tasks 1 and 2 can be run simultaneously. Once the WaitAll method call is complete, subsequent waves of tasks can be queued:

__gc class Batch{
   //tasks 1 and 2 to be batched together
   static void Task1(Object* stateInfo){
      __try{
         Console::WriteLine(S"Task1");
      }
      __finally{
         (__try_cast<ManualResetEvent __gc*>(stateInfo))->Set();
      }
   }

   static void Task2(Object* stateInfo){
      __try{
      Console::WriteLine(S"Task2");
   }
   __finally{
      (__try_cast<ManualResetEvent __gc*>(stateInfo))->Set();
   }
}

public:
//main entry point of class
static void UseThreadPool(){
   //holds handles for each task
   ArrayList __gc* handles = new ArrayList();

      //queue tasks 1 and 2
      ManualResetEvent __gc* event = new ManualResetEvent(false);
      handles->Add(event);
      ThreadPool::QueueUserWorkItem(new WaitCallback(0,
                                    Batch::Task1), event);

      event = new ManualResetEvent(false);
      handles->Add(event);
   ThreadPool::QueueUserWorkItem(new WaitCallback(0, Batch::Task2),
                                 event);

   //wait for both handles
   WaitHandle::WaitAll(__try_cast<WaitHandle __gc*[]>
   (handles->ToArray(__typeof(WaitHandle))));

   Console::WriteLine(S"Tasks 1 and 2 are finished");
   }
};

To highlight the general usefulness of this pattern, a helper method that implements the pattern is shown below. The method takes a delegate array of tasks that should be executed in a batch and have no dependencies on each other, and returns when all the tasks have completed, at which time other tasks can be scheduled.

//Used to pass state to the thread pool callback.
//Holds original delegate, its state, and the event.
__gc class StateHolder{
   public:
      StateHolder(WaitCallback* wc, Object* orginalState,
                  ManualResetEvent* event){
         OriginalCallback = wc;
         OriginalState = orginalState;
         Event = event;
   }

   Object* OriginalState;
   ManualResetEvent* Event;
   WaitCallback* OriginalCallback;;
};

__sealed __gc public class ThreadPoolHelper{
   //used to run task then set event
   static void TaskRunner(Object* stateInfo){
      StateHolder __gc* state =
         __try_cast<StateHolder __gc*>(stateInfo);
      __try{
         state->OriginalCallback->Invoke(state->OriginalState);
      }
   __finally{
   state->Event->Set();
   }
}

public:
   static bool QueueMultipleUserWorkItem(
      WaitCallback __gc* callbacks __gc[],
                         Object __gc* states __gc[]){
      //check params
      if (callbacks == 0)
      throw new ArgumentNullException(S"callbacks");

   if (states != 0 && callbacks->Length != states->Length)
   throw new ArgumentException(
      S"states must be null or the same length as callbacks",
      S"states");

   //queue each callback
   ArrayList __gc* handles = new ArrayList();
   for(int ix = 0; ix < callbacks->Length; ++ix){
      ManualResetEvent* event = new ManualResetEvent(false);
      handles->Add(event);
      StateHolder __gc* state = new StateHolder(callbacks[ix],
                                states != 0? states[ix]: 0, event);
      if (!ThreadPool::QueueUserWorkItem
      (new WaitCallback(0, ThreadPoolHelper::TaskRunner), state))
      return false;
   }

   //wait for all handles
   WaitHandle::WaitAll(__try_cast<WaitHandle __gc*[]>
   (handles->ToArray(__typeof(WaitHandle))));

   return true;
   }
};

//example use
__gc class HelperExamples{
   public:
   static void Task1(Object* stateInfo){
    Console::WriteLine(S"Task1");
}

static void Task2(Object* stateInfo){
   Console::WriteLine(S"Task2");
}

static void Task3(Object* stateInfo){
   Console::WriteLine(S"Task3");
}

static void Task4(Object* stateInfo){
   Console::WriteLine(S"Task4");
   }
};

int _tmain()
{
   Console::WriteLine(S"Batched thread pool use using helper");
   Console::WriteLine(S"Batching tasks 1 and 2");
   WaitCallback __gc* callbacks __gc[]  =
      new WaitCallback __gc* __gc[2];
   callbacks[0] = new WaitCallback(0, HelperExamples::Task1);
   callbacks[1] = new WaitCallback(0, HelperExamples::Task2);
   ThreadPoolHelper::QueueMultipleUserWorkItem(callbacks, 0);

   Console::WriteLine(S"Tasks 1 and 2 done. Batching 3 and 4");
   callbacks[0] = new WaitCallback(0, HelperExamples::Task3);
   callbacks[1] = new WaitCallback(0, HelperExamples::Task4);
   ThreadPoolHelper::QueueMultipleUserWorkItem(callbacks, 0);

   Console::WriteLine(S"All tasks done");

   Console::ReadLine();
   return 0;
}

If you need to execute a batch asynchronously, you can define a delegate with the same signature as the batch runner method and use this to complete an asynchronous call (which uses the ThreadPool under the hood).

Download the Code

To download the accompanying source code for the demo, click here.

About the Author

Nick Wienholt is an independent Windows and .NET consultant based in Sydney, Australia. He is the author of Maximizing .NET Performance from Apress, and specializes in system-level software architecture and development with a particular focus on performance, security, interoperability, and debugging. Nick can be reached at NickW@dotnetperformance.com.

Get the Free Newsletter!

Subscribe to Developer Insider for top news, trends & analysis

Latest Posts

Related Stories