Working with the blocking queue


Java’s threading implementation is one of the most elegant out there. The quality of its design has introduced countless programmers — some of them learning programming for the very first time — to multi-threaded programming.

Java’s threading implementation is also very simple. Following the “less is more” design principle, the designers included only those features that were necessary to get the job done elegantly.

In this article, we introduce a basic threading construct called the blocking queue. The beauty of Java’s threading implementation is that this construct can be written easily in Java itself.

Queues

First, a few quick definitions. A queue is a container object that supports two basic methods: get() and put(). Objects are put into the queue with put, and they are retrieved at a later time with get. A queue is different from a stack in that it implements first-in-first-out (FIFO) semantics. That is, if you put several objects into a queue, the first one that is retrieved with get will be the first one that was put in with put.
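To make the difference between a queue and a stack concrete, here is a small sketch. It uses java.util.ArrayDeque from the standard library as a stand-in container; the class and method names (OrderingDemo, drainAsQueue, drainAsStack) are made up for this illustration and are not part of the article's code.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;

class OrderingDemo {
  // Put three strings in at the tail, then take them out from the
  // head: first in, first out.
  static List<String> drainAsQueue() {
    ArrayDeque<String> q = new ArrayDeque<>();
    q.addLast("first");
    q.addLast("second");
    q.addLast("third");
    List<String> out = new ArrayList<>();
    while (!q.isEmpty()) out.add(q.removeFirst());
    return out;
  }

  // Push the same three strings onto a stack and pop them off:
  // last in, first out.
  static List<String> drainAsStack() {
    ArrayDeque<String> s = new ArrayDeque<>();
    s.push("first");
    s.push("second");
    s.push("third");
    List<String> out = new ArrayList<>();
    while (!s.isEmpty()) out.add(s.pop());
    return out;
  }
}
```

The queue version hands the strings back in the order they went in; the stack version hands them back reversed.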

What is a queue for? Well, in a multithreaded program, we might have one or more threads responsible for processing a certain kind of object. Other threads “send” objects to these processing threads by placing them on the queue with put. The processing threads take the objects off the queue with get, one at a time, and process them in turn.

The chat server is the classic example used to demonstrate multithreading concepts in Java. In a chat server, we might have a small, finite pool of threads that is responsible for sending messages out to the chat clients. These threads are called “writer threads.” Meanwhile, another small, finite pool of threads is reading messages from chat clients. These are called “reader threads.”

The reader threads take incoming messages and place them on the main message queue. The writer threads take the messages off of the queue, one at a time, and send them out to the appropriate clients.

Using a queue effectively decouples the reading process from the writing process. This is useful because network connections vary widely in their efficiency: some connections are very fast and some are slow. Often, a connection might “go dead” for a long time. If a single thread were responsible for both reading a message and sending it out to other clients, then any slow outgoing connection might block that thread. While that thread was blocked, it wouldn’t be able to spend time reading new messages from clients.

Blocking queues

Continuing with our chat example, we ask the following question: “What does a writer thread do when the queue is empty?” At any given moment, there may be no messages to process. In this case, what happens when the writer thread calls the queue’s get method?

One solution is to have the queue return null. Semantically, this return value means “nothing in the queue right now.” Well, what does the writer thread do at this point? It could call the get method again, and then again and again, until the queue returned a message. This is called polling or busywaiting (depending on who you ask), and it can be a tremendous waste of CPU resources.

Another possibility would be to go to sleep for a short while, and then, after waking up, check again to see if the get method will return something. This solves the problem of wasting raw CPU power, but it still isn’t perfect. If the thread sleeps five seconds, an object might become available only one second into that sleep. If the thread sleeps only one second, an object might become available only one-tenth of a second into that sleep. In general, the thread might be wasting time sleeping when there is an object available, even if the sleep is very short. (If the sleep is extremely short, then you’re really just busywaiting again.)
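For comparison, the poll-and-sleep approach might look roughly like the sketch below. NullQueue and SleepPoller are hypothetical names invented for this illustration; the queue's get returns null when nothing is available, as described above. A sleep of zero turns the loop into plain busywaiting.

```java
import java.util.ArrayDeque;

// A non-blocking queue: get() returns null when nothing is available.
class NullQueue {
  private final ArrayDeque<Object> items = new ArrayDeque<>();

  synchronized void put(Object o) { items.addLast(o); }

  synchronized Object get() { return items.pollFirst(); }
}

class SleepPoller {
  // How many times we looked and found nothing.
  int emptyPolls = 0;

  // Keep asking the queue for an object, sleeping between attempts.
  Object take(NullQueue q, long sleepMillis) {
    while (true) {
      Object o = q.get();
      if (o != null) return o;
      emptyPolls++;
      try {
        // An object may arrive right after we doze off; we will not
        // notice it until the sleep is over.
        Thread.sleep(sleepMillis);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        return null;  // give up if interrupted
      }
    }
  }
}
```

Every empty poll is either wasted CPU time (short sleeps) or added latency (long sleeps), which is the trade-off the text describes.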

Java provides a solution to this problem. Instead of polling or sleeping, the writer thread can wait for an object to become available. A wait is like a sleep, in that the thread suspends its execution for a time. However, unlike sleep, which suspends execution for a fixed length of time, a wait suspends execution until another thread does a notify on the same object. The advantage of this is that the thread can be awakened the moment that an object is available.

This solves our waste problems: we’re not polling, and we’re not going to run the risk of being suspended for too long. Instead, the writer thread suspends its execution until the very moment that an object is available.
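As a rough illustration of wait and notify on their own, the sketch below has one thread wait for a single message while another delivers it. Mailbox and WaitNotifyDemo are hypothetical names used only here. Both wait and notifyAll must be called while holding the object's lock, which is why the methods are synchronized.

```java
class Mailbox {
  private Object message;  // null means "nothing delivered yet"

  // Suspends the caller until deliver() has supplied a message.
  synchronized Object receive() throws InterruptedException {
    // Re-check the condition each time we wake up.
    while (message == null) {
      wait();  // releases the lock while suspended
    }
    Object m = message;
    message = null;
    return m;
  }

  // Stores the message and wakes any thread waiting in receive().
  synchronized void deliver(Object m) {
    message = m;
    notifyAll();
  }
}

class WaitNotifyDemo {
  // Starts a thread that delivers "hello" a little later, then waits
  // for it. Returns whatever was received.
  static Object run() {
    Mailbox box = new Mailbox();
    new Thread(() -> {
      try { Thread.sleep(50); } catch (InterruptedException e) { return; }
      box.deliver("hello");
    }).start();
    try {
      return box.receive();  // wakes up as soon as deliver() runs
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return null;
    }
  }
}
```

The receiving thread uses no CPU while it waits and resumes as soon as the delivery happens, rather than at the end of some fixed sleep.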

Hiding the details

Using wait involves a good deal of programming logic. The thread calling get has to put itself into a wait state if there aren’t any objects available. Meanwhile, some other thread has to be around to wake the waiting thread as soon as an object becomes available.

The blocking queue hides all this detail by putting all of the wait and notify logic inside the queue object itself. That way, this logic doesn’t need to be inside the writer thread or any other object that might want to use the queue.

The idea is to do the wait inside the get method itself. Thus, the get method is responsible for putting the calling thread into a wait state if there aren’t any objects available in the queue. At some later point, an object will become available, and the get method returns this object.

This is elegant because the semantics of the get call are very simple: you call get, and later, you get an object back. This is really as simple as a method like this can be. The fact that the call to get might actually involve going into a suspended state for an arbitrary length of time doesn’t really affect the semantics; it only affects the time the call takes. One call to get might return an object immediately, while another call might wait an hour. But in both cases, all that the calling code needs to do is assume that an object will be returned.
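From the caller's side, the loop in a writer thread can then be very plain. The sketch below is an assumption-laden stand-in: it uses java.util.concurrent.LinkedBlockingQueue (available since Java 5), whose take method blocks in the same way as the get described here, rather than the queue class built in this article. WriterLoopDemo and the message strings are invented for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class WriterLoopDemo {
  // Runs one "writer" thread that takes n messages off the queue and
  // records them, while the calling thread supplies the messages.
  static List<String> run(int n) {
    BlockingQueue<String> queue = new LinkedBlockingQueue<>();
    List<String> sent = new ArrayList<>();

    Thread writer = new Thread(() -> {
      try {
        for (int i = 0; i < n; i++) {
          // No null checks, no sleeping: the call simply returns
          // once a message is available.
          String msg = queue.take();
          sent.add(msg);  // stand-in for sending to a client
        }
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });
    writer.start();

    try {
      for (int i = 0; i < n; i++) queue.put("message " + i);
      writer.join();  // after join, reading 'sent' is safe
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return sent;
  }
}
```

The writer's code contains nothing about empty queues at all, which is the point: the blocking is hidden inside the call.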

Who does the notify?

We’ve left something out, however. Previously, I mentioned that once a thread has gone into a wait state, it needs to be notified, or awakened, by some other thread. In the blocking queue, which thread is responsible for this task?

We don’t need to create an extra thread for this purpose. Instead, we simply put a notify inside the put method of the blocking queue. The effect is that any time some other thread puts an object into the queue, any threads waiting for an object will be awakened.

This takes care of the notify rather nicely: the waiting threads are only woken up when an object has just been placed on the queue. And we don’t need a special thread to do it.




Multiple get’ers

Things get a bit more complicated if we have multiple threads taking objects from our queue. However, the elegant semantics of the wait and notify methods take care of most of this for us. There’s no problem, in terms of the threading implementation, with having multiple threads in a wait state, waiting for the same event.

However, there is one potential issue. Suppose two threads are in a wait state, waiting to take an object off the queue. Another thread comes along and places an object on the queue. Which of the waiting threads gets the object?

The answer is: whichever thread grabs it first. When a notify (really, a notifyAll) happens, all the waiting threads wake up. However, because the get and put methods are synchronized, only one of these threads at a time can hold the queue’s lock and run. Which one goes first is implementation-dependent and, in general, entirely unpredictable. We can only be sure that one of the threads will get to run.

Okay, but what about the second thread? The first thread woke up, took the object and left the queue empty again. When the second thread, having just been woken up, gets a chance to run, the queue will probably still be empty.

The only answer is the straightforward one: check the queue, and if there isn’t an object on it, go back into a wait state. This might seem awkward, and, in reality, it does waste some CPU time, but very little. And there really isn’t any other way, given Java’s synchronization primitives. This technique is used in almost every situation involving multiple threads and wait/notify, so it’s best to get familiar with it.

And, again, this logic is all hidden inside the get call of the blocking queue. Once you have it working, you can forget about it.
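The check-again loop can be sketched in a few lines. The version below is a compact illustration written with generics for a current JDK, not the article's own listing; RecheckQueue and MultiGetterDemo are hypothetical names. The demo starts several getter threads, puts one item per thread, and collects what each thread received.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;

class RecheckQueue<T> {
  private final LinkedList<T> items = new LinkedList<>();

  synchronized void put(T item) {
    items.addLast(item);
    notifyAll();  // wake every waiting getter
  }

  synchronized T get() throws InterruptedException {
    // A woken thread may find that another getter already took the
    // object, so it checks again and goes back to waiting if needed.
    while (items.isEmpty()) {
      wait();
    }
    return items.removeFirst();
  }
}

class MultiGetterDemo {
  // Each of nThreads getters takes exactly one item; returns the
  // received items in sorted order.
  static List<String> run(int nThreads) {
    RecheckQueue<String> q = new RecheckQueue<>();
    List<String> received = Collections.synchronizedList(new ArrayList<>());
    List<Thread> threads = new ArrayList<>();
    for (int i = 0; i < nThreads; i++) {
      Thread t = new Thread(() -> {
        try {
          received.add(q.get());
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }
      });
      threads.add(t);
      t.start();
    }
    for (int i = 0; i < nThreads; i++) q.put("item " + i);
    try {
      for (Thread t : threads) t.join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    List<String> sorted = new ArrayList<>(received);
    Collections.sort(sorted);
    return sorted;
  }
}
```

Which thread ends up with which item varies from run to run, but every item is delivered to exactly one thread.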

The code

Speaking of getting the blocking queue code working, here’s a working implementation, along with a test program. The code in Queue.java implements all the logic of the blocking queue, while QT.java contains a simple test program.

The test program is a command-line program, and works as follows. You specify a number of threads on the command-line:


> java QT 5

The QT object creates five threads that all attempt to get objects from the blocking queue it contains. Each time an object is returned from the blocking queue, the receiving thread prints the object out and goes back to waiting for another one.

Meanwhile, the main thread waits for you to type some text and press <RETURN>. When you press <RETURN>, the text you typed is put into the blocking queue. Thus, you can put objects into the queue as fast as you can type them in. Meanwhile, the various threads are printing the objects out. Each thread announces its thread ID when it prints out an object, so you can see that different threads are receiving the objects.

(Note: This code is written for Java 1.0.2 for maximum portability. Later Java implementations might complain about deprecated methods when this code is compiled.)

Queue.java

import java.util.*;

public class Queue
{
  // Internal storage for the queue'd objects
  private Vector vec = new Vector();

  synchronized public void put( Object o ) {
    // Add the element
    vec.addElement( o );

    // There might be threads waiting for the new object --
    // give them a chance to get it
    notifyAll();
  }

  synchronized public Object get() {
    while (true) {
      if (vec.size()>0) {
        // There's an available object!
        Object o = vec.elementAt( 0 );

        // Remove it from our internal list, so someone else
        // doesn't get it.
        vec.removeElementAt( 0 );

        // Return the object
        return o;
      } else {
        // There aren't any objects available.  Do a wait(),
        // and when we wake up, check again to see if there
        // are any.
        try { wait(); } catch( InterruptedException ie ) {}
      }
    }
  }
}


QT.java

import java.io.*;

public class QT implements Runnable
{
  // The blocking queue we are testing.
  private Queue q = new Queue();

  // Create the specified number of object-getting threads.
  public QT( int nThreads ) {
    for (int i=0; i<nThreads; ++i)
      new Thread( this ).start();
  }

  // Each thread does this: get an object, and print it out.
  // Over and over.
  public void run() {
    while (true) {
      String s = (String)q.get();
      System.out.println( "Thread "+Thread.currentThread()+": "+s );
    }
  }

  // Place a text string into the queue.
  public void put( String s ) {
    q.put( s );
  }

  static public void main( String args[] ) throws Exception {
    // Check command-line parameters
    if (args.length<1) throw new Exception( "Usage: QT <numthreads>" );

    // Create the QT object and its threads
    QT qt = new QT( new Integer( args[0] ).intValue() );

    // Main thread: read string; put string in queue; repeat.
    DataInputStream din = new DataInputStream( System.in );
    String s = null;
    while ((s=din.readLine())!=null) {
      qt.put( s );
      Thread.yield();
    }
  }
}
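On a current JDK, the two deprecated calls in the test program (DataInputStream.readLine and new Integer(...)) could be swapped for BufferedReader.readLine and Integer.parseInt. The helper below is only a sketch of that substitution; InputPump and its method names are invented here and are not part of the original program.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.Reader;
import java.io.UncheckedIOException;
import java.util.function.Consumer;

class InputPump {
  // Parses the thread count from the command line without the
  // deprecated Integer constructor.
  static int parseThreadCount(String arg) {
    return Integer.parseInt(arg);
  }

  // Reads lines until end of input and hands each one to 'sink'.
  // Returns the number of lines read.
  static int pump(Reader source, Consumer<String> sink) {
    BufferedReader in = new BufferedReader(source);
    int count = 0;
    try {
      String line;
      while ((line = in.readLine()) != null) {
        sink.accept(line);
        count++;
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return count;
  }
}
```

With this in place, the main loop of QT would shrink to something like InputPump.pump(new InputStreamReader(System.in), qt::put).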


Problems

The blocking queue is simple and useful, but some problems can still arise, especially in complex or high-traffic systems.

Deadlock: Deadlock is a problem in any multithreaded program, and there’s no simple solution for it. When using a blocking queue, deadlock can happen if one thread is waiting for an object while another thread — the one that can supply that object — is waiting for an object that can only be supplied by the first thread. Such a situation can simply be described as bad design.

Waiting Forever: Even without deadlock, it’s possible for a thread to wait for an object that is never going to come. One remedy is to add a timeout to the queue code, so that a thread will wait only a certain amount of time before the get method returns null. This makes the semantics of get slightly more complicated (since a null return value is now possible), but if the application requires it, there may be no way around it.
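One way such a timeout might be added is sketched below, using the wait(long) overload and a deadline so that wake-ups without an available object do not extend the total wait. TimedQueue and TimedQueueDemo are hypothetical names, and the code uses generics, so it assumes a much newer JDK than the listing above.

```java
import java.util.LinkedList;

class TimedQueue<T> {
  private final LinkedList<T> items = new LinkedList<>();

  synchronized void put(T item) {
    items.addLast(item);
    notifyAll();
  }

  // Waits at most timeoutMillis for an object. Returns null if the
  // time runs out before one becomes available.
  synchronized T get(long timeoutMillis) throws InterruptedException {
    long deadline = System.currentTimeMillis() + timeoutMillis;
    while (items.isEmpty()) {
      long remaining = deadline - System.currentTimeMillis();
      if (remaining <= 0) return null;  // timed out
      wait(remaining);  // remaining > 0, so this never waits forever
    }
    return items.removeFirst();
  }
}

class TimedQueueDemo {
  // First get times out on an empty queue; second get finds an item.
  static String[] run() {
    TimedQueue<String> q = new TimedQueue<>();
    try {
      String first = q.get(50);
      q.put("hello");
      String second = q.get(50);
      return new String[] { first, second };
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return null;
    }
  }
}
```

Callers now have to handle a null result, which is the extra complication mentioned above.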

Summary

The blocking queue may not be right for all applications, but it is remarkably effective given the simplicity of its implementation. The blocking queue should be handy in the toolbox of any Java programmer who wants to write a multithreaded program.

Greg Travis is a freelance programmer living in New York City. His interest in computers can probably be traced back to the episode of “The Bionic Woman” where Jamie runs around trying to escape a building whose lights and doors are controlled by an evil artificial intelligence which mocks her through loudspeakers. He’s a devout believer in the idea that when a computer program works, it’s a complete coincidence. He can be reached at mito@panix.com.
