http://www.developer.com/


Simple Thread Control With Java's CountDownLatch


November 27, 2007

The CountDownLatch is a simple mechanism that lets one or more threads wait until a fixed number of operations, typically running in other threads, have completed. It was introduced in Java 5 as part of the java.util.concurrent API, one of a number of constructs designed to make multithreaded code simpler and safer.

I test-drove the development of a LineCounter class as an example to be used in the context of a multithreaded server. The purpose of this class is to count the lines in a file. It's reasonably trivial code. The tests are shown in Listing 1; the source for LineCounter is in Listing 2. LineCounter supports threading by implementing the Runnable interface, but most of the tests have nothing to do with threading. Instead, they call the run method directly.

Listing 1: LineCounterTest.

import static org.junit.Assert.*;
import java.io.*;
import org.junit.*;

public class LineCounterTest {
   private BufferedReader reader;

   @Test
   public void returns0WhenEmpty() {
      write("");
      assertEquals(0, count());
   }

   @Test
   public void returns1WhenOneLine() {
      write("a");
      assertEquals(1, count());
   }

   @Test
   public void returns2WhenTwoLines() {
      write("a\na");
      assertEquals(2, count());
   }

   @Test
   public void returnsNForNLines() {
      write("a\nb\nc");
      assertEquals(3, count());
   }

   @Test
   public void countsMultipleEmptyLines() {
      write("a\nb\n\nc");
      assertEquals(4, count());
   }

   @Test
   public void handlesEndOfLineAsLastCharacter() {
      write("a\n");
      assertEquals(1, count());
   }

   @Test
   public void resetsCountOnException() {
      // a reader whose readLine always fails, to exercise the error path
      reader = new BufferedReader(new StringReader(" ")) {
         @Override
         public String readLine() throws IOException {
            throw new IOException();
         }
      };
      assertEquals(LineCounter.NOT_CALCULATED, count());
   }

   private void write(String contents) {
      reader = new BufferedReader(new StringReader(contents));
   }

   private int count() {
      LineCounter counter = new LineCounter(reader);
      counter.run();
      return counter.getCount();
   }

   @Test
   public void supportsFiles() throws IOException {
      File temp = File.createTempFile("LineCountTest", ".txt");
      BufferedWriter writer = null;
      try {
         writer = new BufferedWriter(new FileWriter(temp));
         writeLine(writer, "a");
         writeLine(writer, "b");
         writer.close();
         LineCounter counter = new LineCounter(temp);
         counter.run();
         assertEquals(2, counter.getCount());
      }
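      finally {
         if (writer != null) // the FileWriter constructor may have thrown
            writer.close();
         temp.delete(); // risky! the result is ignored, and LineCounter's reader is still open
      }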
   }

   @Test(expected = FileNotFoundException.class)
   public void failsConstructionOnFileNotFound()
      throws FileNotFoundException {
      File unlikelyFile = new File("zzzzznotexisting");
      assertFalse(unlikelyFile.exists());
      new LineCounter(unlikelyFile);
   }

   @Test
   public void supportsMultithreading()
      throws InterruptedException {
      write("a\nb\nc");
      LineCounter counter = new LineCounter(reader);
      Thread thread = new Thread(counter);
      thread.start();
      thread.join();
      assertEquals(3, counter.getCount());
   }

   private void writeLine(BufferedWriter writer, String text)
      throws IOException {
      writer.write(text);
      writer.newLine();
   }
}

Listing 2: LineCounter.

import java.io.*;

public class LineCounter implements Runnable {
   static final int NOT_CALCULATED = -1;
   private final BufferedReader reader;
   private int count = NOT_CALCULATED;
   private String filename;

   public LineCounter(BufferedReader reader) {
      this.reader = reader;
   }

   public LineCounter(File file) throws FileNotFoundException {
      this(new BufferedReader(new FileReader(file)));
      this.filename = file.getName();
   }

   public String getFilename() {
      return filename;
   }

   public void run() {
      count = 0;
      try {
         while (reader.readLine() != null)
            count++;
      } catch (IOException e) {
         count = NOT_CALCULATED;
      }
   }

   public int getCount() {
      return count;
   }
}

To demonstrate the CountDownLatch class, I implemented a simple server that processes a number of LineCounter requests in a threaded manner. The definition I chose for the server's interface is simple: It accepts a queue of requests, after which the server can be started.

The CountDownLatch is initialized with a number that represents how many times the latch must be counted down before it opens. In a typical use, an application creates a CountDownLatch initialized to a number x, and then creates x threads. The application then blocks by calling the await method on the CountDownLatch. When each of the spawned threads completes, it tells the latch to count down. When the latch count reaches zero, any code blocked on await resumes.
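The typical use described above can be sketched in a few lines. This is a minimal illustration, not part of the LineCounter example: the class name LatchSketch, the runWorkers method, and the AtomicInteger standing in for real work are all assumptions made for the sketch.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

public class LatchSketch {
   // Hypothetical helper: starts workerCount threads, waits for all of
   // them to finish, and returns how many completed their work.
   static int runWorkers(int workerCount) throws InterruptedException {
      final CountDownLatch latch = new CountDownLatch(workerCount);
      final AtomicInteger finished = new AtomicInteger();
      for (int i = 0; i < workerCount; i++) {
         new Thread(new Runnable() {
            public void run() {
               try {
                  finished.incrementAndGet(); // stand-in for real work
               } finally {
                  latch.countDown(); // one countdown per worker
               }
            }
         }).start();
      }
      latch.await(); // blocks until the count reaches zero
      return finished.get();
   }

   public static void main(String[] args) throws InterruptedException {
      System.out.println(runWorkers(3));
   }
}
```

Because every worker counts down only after it has done its work, the value read after await returns reflects all of the workers.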

The CountDownLatch can only count down, not up. As such, it's most applicable when the number of threads to create is known beforehand. A different design of the LineCountServer would process individual incoming requests as they were received; this would no longer represent a design for which CountDownLatch was applicable.
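The one-way nature of the latch is easy to see in isolation. The following sketch (the class and method names are illustrative only) counts a latch down more times than its initial count: the extra call has no effect, the count stays at zero, and await returns immediately from then on.

```java
import java.util.concurrent.CountDownLatch;

public class OneShotLatchSketch {
   // Hypothetical helper: counts down past zero and reports the final count.
   static long countAfterExtraCountDowns() throws InterruptedException {
      CountDownLatch latch = new CountDownLatch(2);
      latch.countDown();
      latch.countDown(); // the count is now zero; the latch is open
      latch.countDown(); // no effect: the count never goes below zero
      latch.await();     // returns immediately because the count is zero
      return latch.getCount();
   }

   public static void main(String[] args) throws InterruptedException {
      System.out.println(countAfterExtraCountDowns());
   }
}
```

There is no method for raising the count or resetting the latch, which is why the number of countdowns has to be known when the latch is constructed.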

The test, shown in Listing 3, demonstrates the protocol for client applications interacting with a LineCountServer.

Listing 3: LineCountServerTest.

import static org.junit.Assert.*;
import org.junit.*;
import java.io.*;
import java.util.*;

public class LineCountServerTest {
   @Test
   public void multipleRequests() {
      LineCountServer server = new LineCountServer();
      LineCounter counter1 = new LineCounter(new BufferedReader(
            new StringReader("a")));
      LineCounter counter2 = new LineCounter(new BufferedReader(
            new StringReader("a\nb")));
      Queue<LineCounter> queue = new LinkedList<LineCounter>();
      queue.add(counter1);
      queue.add(counter2);
      server.setRequests(queue);
      server.run();
      assertEquals(1, counter1.getCount());
      assertEquals(2, counter2.getCount());
   }
}

The LineCountServer code is shown in Listing 4. The waitOnLatch method encapsulates a loop that calls await again if the waiting thread is interrupted, so that the server's run method returns only after the latch has reached zero.

The run method in the server controls processing of the counting threads. First, the latch field is initialized to a CountDownLatch with a count that matches the queue size. The while loop iterates until the queue is empty; an element is removed and processed with each iteration of the loop. For each LineCounter object removed from the queue, a new processing thread is created and started. This allows the loop to quickly iterate through the queue and initiate a thread for each LineCounter.

Once the queue is emptied and all LineCounter execution threads have been spawned, the run method blocks by calling a method whose job is to wait on the CountDownLatch. As each spawned LineCounter thread completes, it triggers a countdown on the latch. Once the countdown reaches zero, the call to await unblocks.

Listing 4: LineCountServer.

import java.io.*;
import java.util.*;
import java.util.concurrent.*;

public class LineCountServer extends Thread {
   private Queue<LineCounter> queue =
      new LinkedList<LineCounter>();
   private CountDownLatch latch;

   public void setRequests(Queue<LineCounter> queue) {
      this.queue = queue;
   }

   public void run() {
      latch = new CountDownLatch(queue.size());
      while (!queue.isEmpty()) {
         final LineCounter counter = queue.remove();
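         new Thread(new Runnable() {
            public void run() {
               try {
                  execute(counter);
               } finally {
                  // count down even if execute throws, so the
                  // server's run method cannot block forever
                  latch.countDown();
               }
            }
         }).start();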
      }
      waitOnLatch();
   }

   private void waitOnLatch() {
      while (true) {
         try {
            latch.await();
            break;
         } catch (InterruptedException e) {
            // interrupted while waiting; retry until the latch reaches zero
         }
      }
   }

   protected void execute(LineCounter counter) {
      counter.run();
      synchronized (this) {
         System.out.println(counter.getFilename() + " " +
                            counter.getCount());
      }
   }
}
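The waitOnLatch method in Listing 4 retries after an InterruptedException but discards the interrupt itself, so callers of the server cannot tell that one occurred. One possible variation, sketched here under the assumption that the wait should still run to completion, remembers the interrupt and restores the thread's interrupted status once the latch has opened. The class and method names are hypothetical and not part of the listings.

```java
import java.util.concurrent.CountDownLatch;

public class UninterruptibleAwaitSketch {
   // Waits until the latch reaches zero, then restores any interrupt
   // that arrived while waiting.
   static void awaitUninterruptibly(CountDownLatch latch) {
      boolean interrupted = false;
      try {
         while (true) {
            try {
               latch.await();
               return;
            } catch (InterruptedException e) {
               interrupted = true; // remember it and keep waiting
            }
         }
      } finally {
         if (interrupted)
            Thread.currentThread().interrupt(); // let callers see the interrupt
      }
   }
}
```

Whether to keep waiting or to abandon the wait on interruption is a design decision; this sketch keeps the original behavior of waiting for every counter and only changes what happens to the interrupt afterward.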

The CountDownLatch is a simple class to understand and use, one that you should take advantage of in your multithreaded applications!

About the Author

Jeff Langr is a veteran software developer with over a quarter century of professional software development experience. He's authored two books and over 50 published articles on software development, including Agile Java: Crafting Code With Test-Driven Development (Prentice Hall) in 2005. You can find out more about Jeff at his site, http://langrsoft.com, or you can contact him via email at jeff at langrsoft dot com.
