FutureTalker V1.3
Listeners for java.util.concurrent

by Matthew Ford 19th December 2004 (revised 11th April 2005)
© Forward Computing and Control Pty. Ltd. NSW Australia
All rights reserved.

Introduction

Java V5.0 (or V1.5 as it is otherwise known) incorporates Doug Lea's concurrency package as java.util.concurrent. This package provides many of the utilities needed for writing multi-threaded programs in Java. One notable omission is a built-in means to listen for the completion of a task. In Java V1.4, my ThreadReturn package provides this functionality and I have found it invaluable in writing multi-tasking programs. This article describes how to get the same functionality in Java 1.5 by building on the hooks provided in the java.util.concurrent package.

The code described in this article can be downloaded as a zip file (or tar.gz file), together with its javadocs and package jar file.

First I will detail the requirements and then cover the implementation of each of them.

Requirements

The requirements are to be able to:-

  1. submit a task for execution (possibly at some later time) and have it return its result or any error that may occur.

  2. cancel the task.

  3. add listeners to the task which will be notified when it completes, is cancelled or terminates with an error.

Callable Interface

Requirement 1 is handled by the java.util.concurrent.Callable<V> interface, which defines a single method

V call() throws Exception

which computes a result, or throws an exception if unable to do so.

So define your task so that it implements Callable. As an example, consider a task returning an Integer:

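import java.util.Random;
import java.util.concurrent.Callable;

class Task implements Callable<Integer> {
  // assumed field: the original listing elides the declaration of random
  private final Random random = new Random();

  public Integer call() throws Exception {
    int rnd = random.nextInt(3000);  // delay in milliseconds, 0 to 2999
    if (rnd < 1000) {
      throw new Exception("Delay too small, " + rnd);
    }

    Thread.sleep(rnd);  // sleep() is static and always sleeps the current thread
    return Integer.valueOf(rnd);
  }
}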

This Callable can be wrapped in a FutureTask, which implements both Future and Runnable, and then passed to the execute() method of an Executor. The Future interface defines methods for cancelling the task, checking whether the task has completed and retrieving the result or error.
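As a minimal sketch of this wrapping, using only standard JDK classes: the Callable is wrapped in a java.util.concurrent.FutureTask, run by a trivial Executor, and its outcome is read back through the Future methods. The names FutureTaskSketch and runAndDescribe are hypothetical and exist only for this illustration.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.FutureTask;

class FutureTaskSketch {
  // Runs the given Callable through a FutureTask and describes how it ended.
  static String runAndDescribe(Callable<Integer> task) {
    FutureTask<Integer> future = new FutureTask<Integer>(task);
    // The simplest possible Executor: run the task on the calling thread.
    Executor executor = new Executor() {
      public void execute(Runnable r) {
        r.run();
      }
    };
    executor.execute(future);
    try {
      return "result " + future.get();  // normal completion
    } catch (ExecutionException e) {
      // an exception thrown by call() arrives wrapped in an ExecutionException
      return "error " + e.getCause().getMessage();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();  // restore the interrupt flag
      return "interrupted";
    }
  }
}
```

Note that an exception thrown by call() is not lost: get() reports it as the cause of an ExecutionException.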

Stopping / Cancelling a Task

As discussed in To Catch A Thread, Java has a stop() method in the Thread class, but this method is deprecated. See the Thread javadocs and Why are Thread.stop, Thread.suspend and Thread.resume Deprecated? for details.

In the Thread javadocs, Sun suggests two methods of stopping a thread: using a stop variable and calling interrupt(). The java.util.concurrent package uses interrupt() exclusively to try to stop/cancel tasks. As explained in To Catch A Thread, you need to use interrupt() to release the thread if it is waiting. This will throw an exception (usually an InterruptedException) in the waiting thread. If the thread is not waiting, the thread's interrupted state will be set and this can be used as the stop variable.

The static method, FutureTalker.ifInterruptedStop(), is provided in this package to check if the thread's interrupt flag has been set and to throw an InterruptedException if it has.

  public static void ifInterruptedStop() throws InterruptedException {
    Thread.yield(); // let another thread have some time perhaps to stop this one.
    if (Thread.currentThread().isInterrupted()) {
      throw new InterruptedException("Stopped by ifInterruptedStop()");
    }
  }

Adding the check for the interrupt flag to the above call() method gives:

  public Integer call() throws Exception {
    // do some work here
    // check for interrupts every now and then
    FutureTalker.ifInterruptedStop();

    int rnd = random.nextInt(3000);
    if (rnd < 1000) {
      throw new Exception("Delay too small, " + rnd);
    }

    FutureTalker.ifInterruptedStop(); // check again
    Thread.sleep(rnd);  // sleep() is static and always sleeps the current thread
    return Integer.valueOf(rnd);
  }

However, it is not sufficient just to throw an InterruptedException. You have to ensure this exception is propagated back up and out of the task's call() method. This may involve catching the InterruptedException, wrapping it in an unchecked exception such as a RuntimeException, and re-throwing it in order to pass it out of a method that has not been declared to throw an Exception.
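The following sketch illustrates one way this wrapping might look. The class WrappingTask and its methods are hypothetical. The unwrapping step in call() is an optional extra that is assumed here so the interrupt surfaces as an InterruptedException again; letting the RuntimeException propagate unchanged would also get it out of call().

```java
import java.util.concurrent.Callable;

class WrappingTask implements Callable<String> {
  // Stands in for a method whose signature cannot declare checked exceptions.
  private void pause() {
    try {
      Thread.sleep(10);
    } catch (InterruptedException ie) {
      // wrap, keeping the original exception as the cause
      throw new RuntimeException(ie);
    }
  }

  public String call() throws Exception {
    try {
      pause();
      return "done";
    } catch (RuntimeException rex) {
      // optional: unwrap so the interrupt leaves call() as an InterruptedException
      if (rex.getCause() instanceof InterruptedException) {
        throw (InterruptedException) rex.getCause();
      }
      throw rex;
    }
  }

  // Simulates a cancel by interrupting the current thread before calling the task.
  static String demo() {
    Thread.currentThread().interrupt();
    try {
      return new WrappingTask().call();
    } catch (InterruptedException ie) {
      return "interrupted";
    } catch (Exception e) {
      return "other failure";
    } finally {
      Thread.interrupted();  // make sure the flag is left clear for the caller
    }
  }
}
```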

You also have to ensure the task cleans up after itself, releasing resources, closing files etc. A finally clause is usually the place to do this cleanup. There you check for non-null resources and release them and set them to null. Be careful that any errors occurring in this clean up do not prevent other resources being released. In other words, be sure to catch and handle all errors that occur in cleaning up each individual resource. For example:-

FileInputStream in_1 = null;
FileInputStream in_2 = null;
try {
   ....
   in_1.close(); // throws exception if error.
   in_1 = null;
   in_2.close(); // throws exception if error.
   in_2 = null;
} finally {
   // clean up any un-released resources.
   if (in_1 != null) {
      try {
         in_1.close();
      } catch (Throwable t) {
         // ignore as already handling an error.
      }
      in_1 = null;
   }
   if (in_2 != null) {
      try {
         in_2.close();
      } catch (Throwable t) {
         // ignore as already handling an error.
      }
      in_2 = null;
   }
   // etc...
}
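The repeated null-check, close and catch sequence can be factored into a small helper. This is only a sketch: closeQuietly, CleanupSketch and the Tracked test resource are hypothetical names and are not part of the FutureTalker package.

```java
import java.io.Closeable;
import java.io.IOException;

class CleanupSketch {
  // Closes one resource, swallowing any failure so later cleanup still runs.
  static void closeQuietly(Closeable c) {
    if (c == null) {
      return;
    }
    try {
      c.close();
    } catch (Throwable t) {
      // ignore: we are already cleaning up, possibly after another error
    }
  }

  // A stand-in resource that records whether close() was called.
  static class Tracked implements Closeable {
    boolean closed = false;
    final boolean failOnClose;

    Tracked(boolean failOnClose) {
      this.failOnClose = failOnClose;
    }

    public void close() throws IOException {
      closed = true;
      if (failOnClose) {
        throw new IOException("close failed");
      }
    }
  }

  static boolean demo() {
    Tracked first = new Tracked(true);    // close() will throw
    Tracked second = new Tracked(false);
    try {
      // ... work with the resources would go here ...
    } finally {
      closeQuietly(first);   // the failure here is swallowed
      closeQuietly(second);  // so this resource is still released
    }
    return first.closed && second.closed;
  }
}
```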

Stopping / Cancelling a Task -- Summary

So to summarize, in order to be able to cancel your tasks you need to:-

a) ensure any methods called by your task throw some type of exception when the thread running them is interrupted.
b) add a check in long loops for the interrupt flag being set and throw an InterruptedException.
c) ensure your task cleans up after itself.
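Points b) and c) can be sketched together with standard classes only. In this illustration the task loops until it is interrupted, using a local copy of the interrupt check rather than the FutureTalker method, and records its cleanup in a finally clause. CancellableLoop, its fields and demo() are hypothetical names.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.FutureTask;

class CancellableLoop implements Callable<Long> {
  final CountDownLatch started = new CountDownLatch(1);
  volatile boolean cleanedUp = false;

  // Local copy of the interrupt check, so the sketch is self-contained.
  static void ifInterruptedStop() throws InterruptedException {
    if (Thread.currentThread().isInterrupted()) {
      throw new InterruptedException("Stopped by ifInterruptedStop()");
    }
  }

  public Long call() throws Exception {
    long count = 0;
    try {
      started.countDown();    // tell the caller the loop is running
      while (true) {
        count++;              // stands in for a unit of real work
        ifInterruptedStop();  // b) check the interrupt flag inside the loop
      }
    } finally {
      cleanedUp = true;       // c) release resources here
    }
  }

  // Starts the task on its own thread, cancels it, and reports whether it stopped cleanly.
  static boolean demo() {
    CancellableLoop task = new CancellableLoop();
    FutureTask<Long> future = new FutureTask<Long>(task);
    Thread worker = new Thread(future);
    worker.start();
    try {
      task.started.await();
      future.cancel(true);    // true = interrupt the thread running the task
      worker.join(5000);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    }
    return future.isCancelled() && task.cleanedUp && !worker.isAlive();
  }
}
```

Without the check inside the loop, cancel(true) would mark the Future as cancelled but the worker thread would keep running.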

IFutureListener<T>

To add listeners to be notified when the task terminates or is cancelled (requirement 3), we define an interface

public interface IFutureListener<T> {
        public void futureResult(T result, FutureTalker<T> talker);  // called on normal completion
        public void futureCancelled(CancellationException cex, FutureTalker<T> talker);  // called if cancelled.
        public void futureError(Throwable t, FutureTalker<T> talker); // called on error.
}

and define an extended version of FutureTask<V>, a FutureTalker<V> to which we can add IFutureListeners.

As well as adding methods to add and remove listeners, FutureTalker overrides the FutureTask.done() method to call the listeners

    /**
     * fire the listeners when task terminates or is cancelled
     */
    protected void done() {
      fireListeners();
    }

Each listener is only ever called once for this FutureTalker. Listeners are usually called on the thread executing the task. However if a listener is added after the task is finished/cancelled then it is called immediately on the calling thread.
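To make the mechanism concrete, here is a simplified sketch of how a FutureTask subclass can fire listeners from done(). It is not the FutureTalker source: ListenableTask, OutcomeListener and their methods are hypothetical names, and the listener methods omit the talker argument for brevity. The lambdas assume Java 8 or later.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

// Hypothetical listener interface, simplified from IFutureListener.
interface OutcomeListener<T> {
  void onResult(T result);
  void onCancelled(CancellationException cex);
  void onError(Throwable t);
}

// Hypothetical FutureTask subclass; an illustration, not the real FutureTalker.
class ListenableTask<V> extends FutureTask<V> {
  private final List<OutcomeListener<V>> listeners = new ArrayList<OutcomeListener<V>>();
  private boolean finished = false;  // guarded by the listeners lock

  ListenableTask(Callable<V> callable) {
    super(callable);
  }

  void addListener(OutcomeListener<V> listener) {
    synchronized (listeners) {
      if (!finished) {
        listeners.add(listener);  // will be called later from done()
        return;
      }
    }
    notifyListener(listener);     // task already over: call on the caller's thread
  }

  @Override
  protected void done() {         // called once by FutureTask on completion or cancel
    List<OutcomeListener<V>> toCall;
    synchronized (listeners) {
      finished = true;
      toCall = new ArrayList<OutcomeListener<V>>(listeners);
      listeners.clear();          // each listener is called only once
    }
    for (OutcomeListener<V> listener : toCall) {
      notifyListener(listener);
    }
  }

  private void notifyListener(OutcomeListener<V> listener) {
    V result;
    try {
      result = get();             // the task is over, so this does not wait
    } catch (CancellationException cex) {
      listener.onCancelled(cex);
      return;
    } catch (ExecutionException eex) {
      listener.onError(eex.getCause());
      return;
    } catch (InterruptedException iex) {
      Thread.currentThread().interrupt();
      listener.onError(iex);
      return;
    }
    listener.onResult(result);
  }

  // Records each notification as text so the behaviour can be inspected.
  static OutcomeListener<Integer> recorder(final String name, final List<String> log) {
    return new OutcomeListener<Integer>() {
      public void onResult(Integer r) { log.add(name + " result " + r); }
      public void onCancelled(CancellationException c) { log.add(name + " cancelled"); }
      public void onError(Throwable t) { log.add(name + " error " + t.getMessage()); }
    };
  }

  static List<String> demo() {
    List<String> log = new ArrayList<String>();
    ListenableTask<Integer> ok = new ListenableTask<Integer>(() -> 7);
    ok.addListener(recorder("early", log));
    ok.run();                               // fires "early" from done()
    ok.addListener(recorder("late", log));  // fires immediately on this thread

    ListenableTask<Integer> cancelled = new ListenableTask<Integer>(() -> 1);
    cancelled.addListener(recorder("c", log));
    cancelled.cancel(false);                // cancel also triggers done()

    ListenableTask<Integer> failing =
        new ListenableTask<Integer>(() -> { throw new Exception("boom"); });
    failing.addListener(recorder("f", log));
    failing.run();
    return log;
  }
}
```

The finished flag and the listener list are updated under one lock so that a listener added concurrently with completion is either fired by done() or fired immediately, but not both and not neither.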

To actually run the task you need to execute it.

Execute

To execute the task you typically get an instance of an Executor and call execute( ). For example, for a task returning an Integer,

 Callable<Integer> task = new Task();
 FutureTalker<Integer> talker = new FutureTalker<Integer>(task);
 talker.addListener(an_IFutureListener);
 Executor executor = an_Executor;  // get some executor
 executor.execute(talker);  // execute the task, perhaps later

The Executor can be as simple as

 class DirectExecutor implements Executor {
     public void execute(Runnable r) {
         r.run();
     }
 }

which runs the task immediately on the current thread, or more typically, tasks are executed in some thread other than the caller's thread as in

 class ThreadPerTaskExecutor implements Executor {
     public void execute(Runnable r) {
         new Thread(r).start();
     }
 }

The Executors class provides a number of static methods which return useful executors. One such is

public static ExecutorService newFixedThreadPool(int nThreads);

which returns a ThreadPoolExecutor with a fixed-size pool of threads on which to run tasks.
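A short sketch of running several tasks on such a pool, using plain FutureTask objects rather than FutureTalkers. PoolSketch and sumOfSquares are hypothetical names, and the lambda assumes Java 8 or later.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;

class PoolSketch {
  // Squares 1..4 on a pool of two threads and adds up the results.
  static int sumOfSquares() {
    ExecutorService pool = Executors.newFixedThreadPool(2);
    List<FutureTask<Integer>> futures = new ArrayList<FutureTask<Integer>>();
    try {
      for (int i = 1; i <= 4; i++) {
        final int n = i;
        FutureTask<Integer> future = new FutureTask<Integer>(() -> n * n);
        futures.add(future);
        pool.execute(future);  // queued until one of the two threads is free
      }
      int sum = 0;
      for (FutureTask<Integer> future : futures) {
        sum += future.get();   // waits for that task to finish
      }
      return sum;
    } catch (ExecutionException e) {
      return -1;               // a task threw an exception
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return -1;
    } finally {
      pool.shutdown();         // let the pool's threads exit once idle
    }
  }
}
```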

Rejected or ShutDown

Using an ExecutorService is convenient, but it brings two further problems: rejection and shutdown.

Rejection

For various reasons the ExecutorService may not accept your task for execution. For example, a ThreadPoolExecutor will call reject(task), which passes the task to its RejectedExecutionHandler, when it cannot accept it for any reason. By default this handler throws a RejectedExecutionException on the calling thread, so when using a ThreadPoolExecutor your code should look like

 Callable<Integer> task = new Task();
 FutureTalker<Integer> futureTalker = new FutureTalker<Integer>(task);
 futureTalker.addListener(an_IFutureListener);
 ExecutorService executorService = Executors.newFixedThreadPool(5);  // get some executor
 try {
   executorService.execute(futureTalker);  // execute the task, perhaps later
 } catch (RejectedExecutionException rex) {
   futureTalker.rejected(rex);  // failed to execute sets task to error and notifies listeners
 }

The FutureTalker.rejected() method sets this task's error to the RejectedExecutionException argument and then fires the listeners.
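The effect can be illustrated with a hypothetical stand-in, RejectableTask, which records the rejection through FutureTask's protected setException() method. This is an assumption about one way to achieve the described behaviour, not the FutureTalker implementation. A pool that has already been shut down is used to provoke the rejection; the lambda assumes Java 8 or later.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RejectedExecutionException;

// Hypothetical stand-in for the rejected() behaviour described above.
class RejectableTask<V> extends FutureTask<V> {
  RejectableTask(Callable<V> callable) {
    super(callable);
  }

  // Records the rejection as the task's error; setException() also triggers done().
  void rejected(RejectedExecutionException rex) {
    setException(rex);
  }

  static String demo() {
    ExecutorService pool = Executors.newFixedThreadPool(1);
    pool.shutdown();  // a pool that has been shut down rejects every new task
    RejectableTask<Integer> task = new RejectableTask<Integer>(() -> 1);
    try {
      pool.execute(task);
    } catch (RejectedExecutionException rex) {
      task.rejected(rex);  // without this, get() below would wait forever
    }
    try {
      return "result " + task.get();
    } catch (ExecutionException eex) {
      return "error " + eex.getCause().getClass().getSimpleName();
    } catch (InterruptedException iex) {
      Thread.currentThread().interrupt();
      return "interrupted";
    }
  }
}
```

The comment on task.rejected(rex) is the reason the catch block matters: a rejected task is never run, so nothing else would ever complete its Future.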

Shutdown

An ExecutorService can be shut down by calling its shutdownNow() method. Typically tasks currently executing will be interrupted and, provided your tasks respond to the interrupt, they will eventually fire the futureError() method of their listeners. Those tasks that have not commenced executing are returned in a List<Runnable>. To notify your listeners of this failure to execute, your code should look something like

 Callable<Integer> task = new Task();
 FutureTalker<Integer> futureTalker = new FutureTalker<Integer>(task);
 futureTalker.addListener(an_IFutureListener);  // add a listener
 ExecutorService executorService = Executors.newFixedThreadPool(5);  // get some executor
 // .. add some tasks here
 List<Runnable> waitingTasks = executorService.shutdownNow();
 RejectedExecutionException rex = new RejectedExecutionException("shutdown");
 for (Runnable r : waitingTasks) {
   if (r instanceof FutureTalker) {
     ((FutureTalker<?>) r).rejected(rex);  // mark as rejected.
   } else if (r instanceof Future) {
     ((Future<?>) r).cancel(true);  // mark as cancelled.
   }
 }

FutureTalker provides a convenience static method, FutureTalker.shutdownNow(), which takes a ThreadPoolExecutor as an argument, shuts it down and calls rejected() on any FutureTalkers returned and cancel() on any other Futures returned, before returning the list of un-executed tasks. Using this method the above code becomes

 Callable<Integer> task = new Task();
 FutureTalker<Integer> futureTalker = new FutureTalker<Integer>(task);
 futureTalker.addListener(an_IFutureListener);  // add a listener
 ExecutorService executorService = Executors.newFixedThreadPool(5);  // get some executor
 // .. add some tasks here
 // .. now shutdown and fireListeners of waiting tasks.
 List<Runnable> waitingTasks = FutureTalker.shutdownNow((ThreadPoolExecutor) executorService);
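The shutdown behaviour described in this section can be observed with plain FutureTask objects. In this sketch a single-thread pool runs one task that blocks until interrupted, while two more tasks wait in the queue; shutdownNow() interrupts the first and returns the other two. ShutdownSketch and demo() are hypothetical names, and the lambdas assume Java 8 or later.

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;

class ShutdownSketch {
  static String demo() {
    ExecutorService pool = Executors.newFixedThreadPool(1);
    final CountDownLatch started = new CountDownLatch(1);
    final CountDownLatch never = new CountDownLatch(1);
    FutureTask<String> running = new FutureTask<String>(() -> {
      started.countDown();
      never.await();  // blocks until shutdownNow() interrupts this thread
      return "finished";
    });
    FutureTask<String> waiting1 = new FutureTask<String>(() -> "w1");
    FutureTask<String> waiting2 = new FutureTask<String>(() -> "w2");
    pool.execute(running);
    pool.execute(waiting1);  // queued behind the running task
    pool.execute(waiting2);
    try {
      started.await();       // make sure the first task is really executing
      List<Runnable> notStarted = pool.shutdownNow();
      for (Runnable r : notStarted) {
        if (r instanceof Future) {
          ((Future<?>) r).cancel(true);  // mark the un-executed tasks as cancelled
        }
      }
      pool.awaitTermination(5, TimeUnit.SECONDS);
      String runningOutcome;
      try {
        running.get();
        runningOutcome = "finished";
      } catch (ExecutionException e) {
        runningOutcome = e.getCause().getClass().getSimpleName();
      }
      return notStarted.size() + " waiting, cancelled="
          + (waiting1.isCancelled() && waiting2.isCancelled())
          + ", running=" + runningOutcome;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return "interrupted";
    }
  }
}
```

Because the tasks were passed to execute() rather than submit(), the Runnables returned by shutdownNow() are the same FutureTask objects that were queued, which is what makes the instanceof checks work.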

Test Example

These two classes, FutureTalker and IFutureListener, together with their javadocs are in this zip file (or tar.gz file). The futureTalker.jar has a TalkerTest class as its main class. Running the jar using

java -jar futureTalker.jar

typically gives the following output (this may vary from machine to machine due to the threading)

Task 8 stopped due to error. java.lang.Exception  Message:Delay too small, 847
Task 6 was cancelled. java.util.concurrent.CancellationException  Message:null
Task 9 returned 1588
Task 5 stopped due to error. java.lang.Exception  Message:Delay too small, 254
Task 4 stopped due to error. java.lang.Exception  Message:Delay too small, 904
All tasks created. Shutting Down.
Task 10 stopped due to error. java.lang.InterruptedException  Message:sleep interrupted
Task 7 stopped due to error. java.lang.InterruptedException  Message:sleep interrupted
Task 3 stopped due to error. java.util.concurrent.RejectedExecutionException  Message:shutdown
Task 2 stopped due to error. java.util.concurrent.RejectedExecutionException  Message:shutdown
Task 1 stopped due to error. java.util.concurrent.RejectedExecutionException  Message:shutdown

See the TalkerTest.java code for details.

Conclusion

This article details the requirements for writing multi-threaded programs, and then describes how to satisfy these requirements using the java.util.concurrent package, provided with Java V5.0, and two utility classes provided here (zip or tar.gz). Using these tools, you can replicate the convenience of the ThreadReturn package in Java V5.0 and take advantage of the new thread pool support provided.


©Copyright 1996-2012 Forward Computing and Control Pty. Ltd. ACN 003 669 994