
Java 5's DelayQueue

  • January 18, 2007
  • By Jeff Langr

The queue classes in the Java 5 java.util.concurrent package provide solutions for common queuing needs. The DelayQueue class provides a blocking queue from which objects cannot be removed until they have been in the queue for a minimum period of time.

Elements in the DelayQueue must be of type java.util.concurrent.Delayed, an interface that requires two methods to be defined: getDelay, a method that returns how much time is left before the delay completes, and compareTo. The Delayed interface extends the java.lang.Comparable interface, so Delayed implementations must specify how they should be ordered with respect to other Delayed objects.
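As a rough illustration of the two required methods, here is a minimal sketch of a Delayed implementation. The class name DelayedMessage and its fields are hypothetical and are not part of the fax server example; the sketch assumes the delay is measured from the moment the object is constructed.

```java
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

// Hypothetical example class, not part of the article's fax server.
class DelayedMessage implements Delayed {
   private final String text;
   private final long readyAtNanos;   // absolute time at which the delay expires

   DelayedMessage(String text, long delay, TimeUnit unit) {
      this.text = text;
      this.readyAtNanos = System.nanoTime() + unit.toNanos(delay);
   }

   String text() {
      return text;
   }

   // Remaining delay, converted to the unit the caller asks for.
   // A zero or negative value means the element may be removed from the queue.
   public long getDelay(TimeUnit unit) {
      return unit.convert(readyAtNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
   }

   // Order by remaining delay, so the element that expires first sorts first.
   public int compareTo(Delayed other) {
      long diff = getDelay(TimeUnit.NANOSECONDS)
         - other.getDelay(TimeUnit.NANOSECONDS);
      return diff < 0 ? -1 : (diff > 0 ? 1 : 0);
   }

   public static void main(String[] args) {
      DelayedMessage message = new DelayedMessage("hello", 2, TimeUnit.SECONDS);
      System.out.println(message.text() + " is ready in about "
         + message.getDelay(TimeUnit.MILLISECONDS) + " ms");
   }
}
```

Storing an absolute expiry time, rather than a countdown, means getDelay shrinks on its own as time passes without any extra bookkeeping.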

As an example, consider a fax server tied to a single phone line. The outgoing phone line can handle only one call at a time, and transmitting a fax takes many seconds or even minutes. The fax server cannot lose any incoming fax requests while it is transmitting.

As a simple solution, the server can place all incoming fax requests in a queue, returning immediately to the client requesting the transmission. A separate thread on the server pulls entries off the queue and processes them in the order received. When a request is initially made, it's marked to indicate that it should be sent without delay.

When the server attempts to send a fax, sometimes the line is busy or drops during transmission. If a fax transmission attempt fails, the fax server must place the transmission request back into the queue. At this point, the server marks the request with a delay period (ten seconds in the implementation below). This wait period allows the remote connection to be reset or to become available. It also gives other waiting faxes an opportunity to attempt transmission.
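The requeue-with-delay idea can be sketched independently of the fax classes. The RetryableJob class and its retryAfter method below are hypothetical names chosen for illustration; the article's own FaxTransmission class may be written differently. The sketch assumes a new job is eligible immediately and that a failed job is pushed back by a caller-supplied delay, with a sequence number as a tie-breaker so that jobs due at the same instant keep their arrival order.

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical class for illustration; not the article's FaxTransmission.
class RetryableJob implements Delayed {
   private static final AtomicLong COUNTER = new AtomicLong();

   private final String name;
   private final long sequence = COUNTER.getAndIncrement(); // arrival order
   private long readyAtNanos = System.nanoTime();           // eligible at once

   RetryableJob(String name) {
      this.name = name;
   }

   String name() {
      return name;
   }

   // Call only while the job is outside the queue: changing the delay of an
   // element that is still queued would invalidate the queue's ordering.
   void retryAfter(long delay, TimeUnit unit) {
      readyAtNanos = System.nanoTime() + unit.toNanos(delay);
   }

   public long getDelay(TimeUnit unit) {
      return unit.convert(readyAtNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
   }

   // Earliest expiry first; arrival order breaks ties.
   // Assumes the queue holds only RetryableJob elements.
   public int compareTo(Delayed other) {
      RetryableJob that = (RetryableJob) other;
      long diff = readyAtNanos - that.readyAtNanos;
      if (diff == 0) {
         diff = sequence - that.sequence;
      }
      return diff < 0 ? -1 : (diff > 0 ? 1 : 0);
   }

   public static void main(String[] args) {
      DelayQueue<RetryableJob> queue = new DelayQueue<RetryableJob>();
      RetryableJob job = new RetryableJob("fax-1");
      queue.add(job);
      RetryableJob taken = queue.poll();       // eligible immediately
      taken.retryAfter(10, TimeUnit.SECONDS);  // simulate a failed attempt
      queue.add(taken);
      System.out.println("available again right away? " + (queue.poll() != null));
   }
}
```

Because the delay is changed only after the job has been removed from the queue, the queue never sees an element whose ordering shifts underneath it.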

The code below demonstrates the use of DelayQueue as the core of a fax server implementation. The building-block classes of this application are Fax, Dialer, and Transmitter. In fact, Dialer and Transmitter are represented here as just interfaces; we're not concerned with the communication details. Listing 1 shows these interfaces, along with a simple thread utility class used by other code in this example. Listing 2 shows the Fax class, a simple data class.

Listing 1. Dialer and Transmitter interfaces; ThreadUtil.

// Dialer.java
public interface Dialer {
   boolean connect(String number);
}

// Transmitter.java
public interface Transmitter {
   void send(Fax fax);
}

// ThreadUtil.java
package util;

public class ThreadUtil {
   public static void pause(int seconds) {
      try {
         Thread.sleep(seconds * 1000L);
      }
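      catch (InterruptedException e) {
         // Restore the interrupt flag so callers can still detect the interruption.
         Thread.currentThread().interrupt();
      }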
   }
}

Listing 2. The Fax class.

public class Fax {
   private String to;
   private String from;
   private String text;

   public Fax(String to, String from, String text) {
      this.to = to;
      this.from = from;
      this.text = text;
   }

   public String to() {
      return to;
   }

   public String from() {
      return from;
   }

   public String text() {
      return text;
   }

   public String toString() {
      return String.format("[fax to: %s from: %s]", to, from);
   }
}

The meat of the application is in the FaxServer (Listing 3) and FaxTransmission (Listing 4) classes. A FaxTransmission holds onto a Fax object, and contains the logic to determine whether a Fax needs to wait. I'll provide more details on the FaxTransmission class shortly. The FaxServer encapsulates a DelayQueue that stores FaxTransmission objects.

Listing 3. The FaxServer class.

import java.util.concurrent.*;

public class FaxServer {
   private DelayQueue<FaxTransmission> queue =
      new DelayQueue<FaxTransmission>();
   private Dialer dialer;
   private Transmitter transmitter;

   public FaxServer(Dialer dialer, Transmitter transmitter) {
      this.dialer = dialer;
      this.transmitter = transmitter;
   }

   public void start() {
      new Thread(new Runnable() {
         public void run() {
            while (true) {
               try {
                  transmit(queue.take());
               }
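               catch (InterruptedException e) {
                  // Treat interruption as a request to stop the worker thread.
                  Thread.currentThread().interrupt();
                  return;
               }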
            }
         }
      }).start();
   }

   private void transmit(FaxTransmission transmission) {
      if (dialer.connect(transmission.getFax().to())) {
         System.out.printf("sending %s.", transmission);
         transmitter.send(transmission.getFax());
         System.out.println("completed");
      }
      else {
         System.out.printf(
            "busy, queuing %s for resend%n", transmission);
         transmission.setToResend();
         queue.add(transmission);
      }
   }

   public void send(Fax fax) {
      System.out.printf("queuing %s%n", fax);
      queue.add(new FaxTransmission(fax));
   }
}

A client requests a Fax to be sent by calling the send method on FaxServer. The FaxServer code wraps the Fax object in a FaxTransmission, which then gets enqueued on the DelayQueue. Control returns immediately to the client.

A separate thread on the server, defined in the start method, loops infinitely. The body of the loop calls the method take against the DelayQueue object. This call blocks until there is an appropriate element to remove from the queue (i.e. one that has waited the specified minimum amount of time):

	transmit(queue.take());
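To make the blocking behavior concrete, here is a small self-contained sketch that times a call to take and contrasts it with the non-blocking poll method, which returns null when no element's delay has expired. The TakeDemo and Ticket names are hypothetical and exist only for this illustration; the measured time is approximate and depends on the scheduler.

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

class TakeDemo {
   // Hypothetical element type: becomes available delayMillis after creation.
   static class Ticket implements Delayed {
      private final long readyAtNanos;

      Ticket(long delayMillis) {
         readyAtNanos = System.nanoTime()
            + TimeUnit.MILLISECONDS.toNanos(delayMillis);
      }

      public long getDelay(TimeUnit unit) {
         return unit.convert(readyAtNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
      }

      public int compareTo(Delayed other) {
         long diff = getDelay(TimeUnit.NANOSECONDS)
            - other.getDelay(TimeUnit.NANOSECONDS);
         return diff < 0 ? -1 : (diff > 0 ? 1 : 0);
      }
   }

   // Queues one ticket and reports how long take() blocked, in milliseconds.
   static long millisBlockedInTake(long delayMillis) {
      DelayQueue<Ticket> queue = new DelayQueue<Ticket>();
      queue.add(new Ticket(delayMillis));
      long start = System.nanoTime();
      try {
         queue.take();   // blocks until the ticket's delay has expired
      }
      catch (InterruptedException e) {
         Thread.currentThread().interrupt();
         throw new IllegalStateException("interrupted while waiting", e);
      }
      return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
   }

   // True if poll() returns null while the only element is still delayed.
   static boolean pollReturnsNullBeforeExpiry() {
      DelayQueue<Ticket> queue = new DelayQueue<Ticket>();
      queue.add(new Ticket(60000));
      return queue.poll() == null;
   }

   public static void main(String[] args) {
      System.out.println("poll() before expiry returned null: "
         + pollReturnsNullBeforeExpiry());
      System.out.println("take() blocked for about "
         + millisBlockedInTake(500) + " ms");
   }
}
```

A consumer that must never block, such as one that also has other work to do, could use poll in place of take and handle the null result itself.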




