September 20, 2014
Hot Topics:
RSS RSS feed Download our iPhone app

Java 5's BlockingQueue

  • November 21, 2006
  • By Jeff Langr
  • Send Email »
  • More Articles »

Writing well-behaved, multi-threaded applications is one of the more challenging aspects of Java coding. One of the best ways to write multi-threaded applications is to take advantage of known threading patterns.

Queues are common constructs in multithreaded applications. Clients add requests to a queue. A server retrieves requests from the queue. When no requests are available in the queue, the server waits for entries to be posted to it.

Prior to Java 5, you'd have to build such a queue yourself. It's not terribly difficult, but you'd need to worry about all of the synchronization points yourself. There are many ways to code a solution, and most of them are going to contain insidious concurrency defects. You can find one possible implementation on Developer.com itself.

Fortunately, you no longer need to build your own thread-safe queue. Java 5 provides a number of utility classes for multithreaded applications. You'll find these classes in the java.util.concurrent, java.utilconcurrent.atomic, and java.util.concurrent.locks packages. These classes are largely based on Doug Lea's work and book Concurrent Programming in Java. Lea's book is considered the bible for multithreaded Java programming.

The very first interface listed in java.util.concurrent is BlockingQueue, a java.util.Queue subclass. When clients attempt to retrieve elements but none are available, a BlockingQueue encapsulates code that waits until an element becomes available. The Queue class, also new to Java 5, is a new collection class that supports the classic concept of a FIFO (first-in, first out) data structure. Other Java 5 queue implementations include support for removal based on priority, and a collection that acts like a stack.

Listing 1 shows a simple server class based on use of a BlockingQueue. The server runs infinitely, accepting requests from a client. Clients can send requests asynchronously and rapidly—the server only adds a client request to the server's queue, then returns immediately to the client. A separate server thread loops indefinitely, asking the BlockingQueue to return when a new request becomes available. Once a request is available, it's removed from the queue. The server then handles the request.

Listing 1. The server.

import java.util.concurrent.*;

public class Server extends Thread {
   private BlockingQueue<Request> queue =
      new LinkedBlockingQueue<Request>();

   public void accept(Request request) {
      queue.add(request);
   }

   public void run() {
      while (true)
         try {
            execute(queue.take());
         }
         catch (InterruptedException e) {
         }
   }

   private void execute(final Request request) {
      new Thread(new Runnable() {
         public void run() {
            request.execute();
         }
      }).start();
   }
}

In this server implementation, I chose to execute requests coming off the queue by spawning them in an entirely new thread. But, it might make more sense to constrain the overall number of threads concurrently executing. It's not terribly efficient to execute large numbers of threads simultaneously. I'll explore the concept of thread pools, which can address this concern, in a future article.

Let's work through the code. I first create a BlockingQueue field named queue, assigning to it a LinkedBlockingQueue instance. In a LinkedBlockingQueue, elements get added to the tail of the queue. The element stored on the queue the longest is the head, and thus is the next element to be retrieved.

The accept method runs in the primary thread of Server, adding requests to the queue as they arrive. A request is simply a class that implements the Request interface (see Listing 2).

Listing 2. Request.

public interface Request {
   void execute();
}

The run method executes in a second server thread, looping infinitely. It invokes the take method on the queue, which will block until a request is available. Once a request is available, it's removed from the queue and returned from the call to take. At that point, I call the local Server method execute to process the request. As mentioned before, execute spawns and executes a new Thread object for each request.

Listing 3 shows sample client code that exercises the Server class.

Listing 3. Client.

import java.util.*;

public class Client {
   public static void main(String[] args) {
      new Client().go();
   }

   public void go() {
      final Server server = new Server();
      server.start();

      for (int i = 0; i < 10; i++) {
         final Request request = createRequest(i);
         server.accept(request);
      }
   }

   Request createRequest(final int index) {
      return new Request() {
         public void execute() {
            for (int i = 0; i <= 100; i += 10) {
               sleep((new Random().nextInt(5) + 1) * 1000);
               System.out.println(
                  String.format(
                     "request: %d completed: %d%%", index, i));
            }
            System.out.println(
               String.format("reqest %d completed", index));
         }

      };
   }

   private void sleep(int millis) {
      try {
         Thread.sleep(millis);
      }
      catch (InterruptedException e) {

      }
   }
}

You might want to constrain the queue itself to support only so many incoming requests. In listing 4, I create a Server with a capacity of two. As long as a queue is empty, the add method returns the boolean value true. But, once the queue contains two elements, attempts to add additional elements immediately throw an IllegalStateException. Instead of using add, then, I chose to use the offer method, also defined in the BlockingQueue interface. This method returns false immediately when the queue is already full.

Listing 4. A limited capacity queue.

public class Server extends Thread {
   static final int CAPACITY = 2;
   private BlockingQueue<Request> queue =
      new LinkedBlockingQueue<Request>(CAPACITY);

   public boolean accept(Request request) {
      return queue.offer(request);
   }
   ...

I've altered the accept method to return the result of the offer call. This makes the client aware of the problem. Client code can resubmit the request, or display a message to the user indicating that the server is busy. Another form of offer allows you to specify a timeout period, in which case the BlockingQueue will wait up to that period to retrieve an element when the queue is empty.

An alternative is to use the BlockingQueue method put (see Listing 5). Calls to this method will block until the queue has available capacity.

Listing 5. Blocking on add.

public class Server extends Thread {
   static final int CAPACITY = 2;
   private BlockingQueue<Request> queue =
      new LinkedBlockingQueue<Request>(CAPACITY);

   public void accept(Request request) {
      try {
         queue.put(request);
      }
      catch (InterruptedException e) {
         throw new RuntimeException("add to queue interrupted");
      }
   }
   ...

In the past, I've spent painstaking hours building multithreaded support into my own applications. Worse, I was rarely certain that my code was completely thread-safe. BlockingQueue is one very useful building block that can help you quickly and confidently build your own multithreaded applications.






Comment and Contribute

 


(Maximum characters: 1200). You have characters left.

 

 


Sitemap | Contact Us

Rocket Fuel