http://www.developer.com/

Back to article

Java 5's DelayQueue


January 18, 2007

The queue classes in the Java 5 package java.util.concurrency package provide solutions for common queuing needs. The DelayQueue class provides a blocking queue from which objects cannot be removed until they have been in the queue for a minimum period of time.

Elements in the DelayQueue must be of type java.util.concurrent.Delayed, an interface that requires two methods to be defined: getDelay, a method that returns how much time is left before the delay completes, and compareTo. The Delayed interface extends the java.lang.Comparable interface, so Delayed implementations must specify how they should be ordered with respect to other Delayed objects.

As an example, consider a fax server tied to a single phone line. The outgoing phone line can handle only one call at a time, and transmitting a fax takes many second or even minutes. The fax server cannot lose any incoming fax requests while the server is currently transmitting.

As a simple solution, the server can place all incoming fax requests in a queue, returning immediately to the client requesting the transmission. A separate thread on the server pulls entries off the queue and processes them in the order received. When a request is initially made, it's marked to indicate that it should be sent without delay.

When attempting to send a fax, sometimes the line is busy or the line drops during transmission. If a fax transmission attempt fails, the fax server must place the transmission request back into the queue. At this point, the server marks the request with a delay period (ten seconds, in the below implementation). This wait period allows for the remote connection to be reset or to become available. The wait period also allows for other waiting faxes to have a opportunity to attempt transmission.

The below code demonstrates use of the DelayQueue as the core of a fax server implementation. The building block classes of this application are Fax, Dialer, and Transmitter. In fact, Dialer and Transmitter are represented here as just interfaces--we're not not concerned with the communication details. These interfaces are shown in Listing 1, as well as the definition of a simple thread utility class used by other code in this example. Listing 2 shows the Fax class, a simple data class.

Listing 1. Dialer and Transmitter interfaces; ThreadUtil.

// Dialer.java
public interface Dialer {
   boolean connect(String number);
}

// Transmitter.java
public interface Transmitter {
   void send(Fax fax);
}

// ThreadUtil.java
package util;

public class ThreadUtil {
   public static void pause(int seconds) {
      try {
         Thread.sleep(seconds * 1000L);
      }
      catch (InterruptedException e) {
      }
   }
}

Listing 2. The Fax class.

public class Fax {
   private String to;
   private String from;
   private String text;

   public Fax(String to, String from, String text) {
      this.to = to;
      this.from = from;
      this.text = text;
   }

   public String to() {
      return to;
   }

   public String from() {
      return from;
   }

   public String text() {
      return text;
   }

   public String toString() {
      return String.format("[fax to: %s from: %s]", to, from);
   }
}

The meat of the application is in the FaxServer (Listing 3) and FaxTransmission (Listing 4) classes. A FaxTransmission holds onto a Fax object, and contains the logic to determine whether a Fax needs to wait. I'll provide more details on the FaxTransmission class shortly. The FaxServer encapsulates a DelayQueue that stores FaxTransmission objects.

Listing 3. The FaxServer class.

import java.util.concurrent.*;

public class FaxServer {
   private DelayQueue<FaxTransmission> queue =
      new DelayQueue<FaxTransmission>();
   private Dialer dialer;
   private Transmitter transmitter;

   public FaxServer(Dialer dialer, Transmitter transmitter) {
      this.dialer = dialer;
      this.transmitter = transmitter;
   }

   public void start() {
      new Thread(new Runnable() {
         public void run() {
            while (true) {
               try {
                  transmit(queue.take());
               }
               catch (InterruptedException e) {
               }
            }
         }
      }).start();
   }

   private void transmit(FaxTransmission transmission) {
      if (dialer.connect(transmission.getFax().to())) {
         System.out.printf("sending %s.", transmission);
         transmitter.send(transmission.getFax());
         System.out.println("completed");
      }
      else {
         System.out.printf(
            "busy, queuing %s for resend%n", transmission);
         transmission.setToResend();
         queue.add(transmission);
      }
   }

   public void send(Fax fax) {
      System.out.printf("queuing %s%n", fax);
      queue.add(new FaxTransmission(fax));
   }
}

A client requests a Fax to be sent by calling the send method on FaxServer. The FaxServer code wraps the Fax object in a FaxTransmission, which then gets enqueued on the DelayQueue. Control returns immediately to the client.

A separate thread on the server, defined in the start method, loops infinitely. The body of the loop calls the method take against the DelayQueue object. This call blocks until there is an appropriate element to remove from the queue (i.e. one that has waited the specified minimum amount of time):

	transmit(queue.take());

FaxServer code calls the method transmit for each object taken from the DelayQueue. Code in transmit attempts to dial the fax recipient, and if a connection is made, uses the Transmitter object to send the fax. If a connection cannot be made, the FaxTransmission object method setToResend is invoked (in which it must designate how long to wait before redialing). Code in transmit then re-adds the FaxTransmission object to the DelayQueue.

So, how does the queue know when a fax has been waiting long enough? That's the primary job of the FaxTransmission class (see Listing 4). A FaxTransmission object stores a Fax object plus the time at which the request was initially made (requestTime). Since FaxTransmission implements the Delayed interface, objects of type FaxTransmission objects can be placed in a DelayQueue. The method getDelay, declared in the Delayed interface, indicates to the DelayQueue how much longer an object must wait in order to meet the minimum delay period.

Initially, fax requests shouldn't have to wait--once taken from the queue, the FaxServer should attempt to send them. Thus FaxTransmission holds a boolean called initialTransmission to indicate whether or not a transmission has already been attempted. The first time around, then, getDelay returns a delay of 0.

Sun designed the getDelay function to be flexible, allowing for delay periods to be specified using a variety of time units--either microseconds, milliseconds, nanoseconds, or seconds. The getDelay method is passed a TimeUnit object that represents a duration. Once getDelay calculates how much time is left, it must return this amount in the same unit as the duration passed in. The convert method defined on TimeUnit accomplishes this goal, by taking as a second argument the unit from which the remaining time is to be converted:

      return timeUnit.convert(remainingMs, TimeUnit.MILLISECONDS);

In order to define the proper ordering of objects that are delayed in the queue, the FaxTransmission class also must define a compareTo method. According to Sun's javadoc, this method must provide "an ordering consistent with" the getDelay method. For FaxTransmission, a Delayed object comes before another delayed object (i.e. should be removed first from the queue) if its timeRemaining is less, or if the timeRemaining is equal and its original request time is earlier.

Listing 4. The FaxTransmission class.

import java.util.*;
import java.util.concurrent.*;

public class FaxTransmission implements Delayed {
   static final int REDIAL_PERIOD_IN_SECONDS = 10;
   private long endOfDelay;
   private boolean initialTransmission = true;
   private final Fax fax;
   private final Date requestTime;

   public FaxTransmission(Fax fax) {
      this.fax = fax;
      this.requestTime = new Date();
   }

   public long getDelay(TimeUnit timeUnit) {
      if (initialTransmission)
         return 0;
      return timeUnit.convert(endOfDelay - System.currentTimeMillis(),
                              TimeUnit.MILLISECONDS);
   }

   public int compareTo(Delayed delayed) {
      FaxTransmission request = (FaxTransmission)delayed;
      if (this.endOfDelay < request.endOfDelay)
         return -1;
      if (this.endOfDelay > request.endOfDelay)
         return 1;
      return this.requestTime.compareTo(request.requestTime);
   }

   public void setToResend() {
      initialTransmission = false;
      endOfDelay = System.currentTimeMillis() + REDIAL_PERIOD_IN_SECONDS
                   * 1000L;
   }

   public String toString() {
      return fax.toString();
   }

   public Fax getFax() {
      return fax;
   }
}

The final class, Demo (shown in Listing 5), provides a demonstration of the fax server. This driver code creates a stub implementation of the Dialer, one that connects randomly, roughly every five attempts. It also creates a stub Transmitter that pauses for about ten seconds while printing pseudo-progress information. The driver then sends four faxes, one immediately after the other. Note that the Demo application will run infinitely due to the infinite loop defined in FaxServer.

Listing 5. A demonstration driver.

import java.util.*;

import util.*;

public class Demo {
   public static void main(String[] args) {
      Fax fax1 = new Fax("1", "5555", "some message 1");
      Fax fax2 = new Fax("2", "6666", "some message 2");
      Fax fax3 = new Fax("3", "7777", "some message 3");
      Fax fax4 = new Fax("4", "8888", "some message 4");

      Dialer dialer = new Dialer() {
         private Random random = new Random();
         public boolean connect(String number) {
            return random.nextInt(5) == 0;
         }
      };

      Transmitter transmitter = new Transmitter()  {
         public void send(Fax fax) {
            for (int i = 0; i < 10; i++) {
               System.out.print(".");
               ThreadUtil.pause(1);
            }
         }
      };

      FaxServer server = new FaxServer(dialer, transmitter);
      server.start();
      server.send(fax1);
      ThreadUtil.pause(1);
      server.send(fax2);
      ThreadUtil.pause(1);
      server.send(fax3);
      ThreadUtil.pause(1);
      server.send(fax4);
   }
}

The power of the Java 5 queue implementations, including DelayQueue, is that they define constructs for common queueing needs in a thread-safe manner. It would be reasonably easy to build the logic for your own DelayQueue, but getting the thread safety correct is not so easy. The other benefit of using the Java 5 queues is that they help separate concerns (and thus help your system adhere to the Single Responsibility Principle): The semantics of how items have to wait on the queue is defined separately from the code that defines how the queue is used by the fax server.

About the Author

Jeff Langr is a veteran software developer with a score and more years of experience. He's authored two books and dozens of published articles on software development, including Agile Java: Crafting Code With Test-Driven Development (Prentice Hall) in 2005. You can find out more about Jeff at his site, http://langrsoft.com, or you can contact him directly at jeff@langrsoft.com.

Sitemap | Contact Us

Thanks for your registration, follow us on our social networks to keep up-to-date