Home
| pfodApps/pfodDevices
| WebStringTemplates
| Java/J2EE
| Unix
| Torches
| Superannuation
|
| About
Us
|
FutureTalker V1.3
|
by Matthew Ford 19th December 2004 (revised 11th
April 2005)
© Forward Computing and Control Pty. Ltd. NSW
Australia
All rights reserved.
Java V5.0 (or V1.5 as it otherwise know) incorporates Doug Lee's Concurrent package as java.util.concurrent . This package provides many of the utilities needed for writing multi-threaded programs in Java. One notable omission is a built-in means to listen for the completion of a task. In Java V1.4, my ThreadReturn package provides this functionality and I have found it invaluable in writing multi-tasking programs. This article describes how to get the same functionality in Java 1.5 by building on the hooks provided in the java.util.concurrent package.
The code discribed in this article can be downloaded as a zip file (or tar.gz file), together with their javadocs and package jar file.
First I will detail the requirements and then cover the implementation of each of them.
The requirements are to be able to:-
submit a task for execution (possibly at some later time) and have it return its result or any error that may occur.
to be able to cancel the task.
add listeners to the task which will be notified when it completes or is cancelled or terminates with an error.
Requirement i) is handled by the java.util.concurrent.Callable<V> . This interface defines a single method.
V call() throws Exception
which computes a result, or throws an exception if unable to do so.
So define your task so it implements Callable. As an example, consider a task returning an Integer
class Task implements Callable<Integer> { ...... public Integer call() throws Exception { int rnd = random.nextInt(3000); if (rnd<1000) { throw new Exception("Delay too small, "+rnd); } Thread.currentThread().sleep(rnd); return new Integer(rnd); } }
This callable class can be wrapped in a Future class and then passed to the execute() method of an Executor. The Future interface defines methods for cancelling the task, checking if the task has completed and retrieving the result/error.
As discussed in To
Catch A Thread, Java has a stop()
method in the Thread
class, but this method is
depreciated
. See the Thread
javadocs and Why
are Thread.stop, Thread.suspend and Thread.resume Deprecated?
for details.
In the Thread
javadocs, Sun suggest two methods of stopping a thread:- using a stop
variable and calling interrupt()
. The
java.util.concurrent
package uses interrupt() exclusively to try and stop/cancel tasks. As
explained in To
Catch A Thread , you need to use interrupt() to release the
thread if it is waiting. This will throw an exception (usually an
InterruptedException).
If the thread is not waiting, the thread's interrupted
state will be set and this can be used as the stop variable.
The static method, FutureTalker.ifInterruptedStop(), is provided in this package to check if the thread's interrupt flag has been set and to throw an InterruptedException if it has.
public static void ifInterruptedStop() throws InterruptedException { Thread.yield(); // let another thread have some time perhaps to stop this one. if (Thread.currentThread().isInterrupted()) { throw new InterruptedException("Stopped by ifInterruptedStop()"); } }
Adding the check for the interrupt flag being set to the above call() method gives.
public Integer call() throws Exception { // do some work here // check for interrupts every now and then FutureTalker.ifInterruptedStop(); int rnd = random.nextInt(3000); if (rnd<1000) { throw new Exception("Delay too small, "+rnd); } FutureTalker.ifInterruptedStop(); // check again Thread.currentThread().sleep(rnd); return new Integer(rnd); }
However it is not sufficient to just throw an InterruptedException, you have to ensure this exception is propagated back up and out of the task's call() method. This may involve catching and wrapping the InterruptedException in an un-checked exception, such as a RuntimeException, and re-throwing it in order to pass it out of a method that has not been defined to throw an Exception.
You also have to ensure the task cleans
up after itself, releasing resources, closing files etc. A finally
clause is usually the place to do this cleanup. There you check for
non-null resources and release them and set them to null. Be careful
that any errors occurring in this clean up do not prevent other
resources being released. In other words, be sure to catch and handle
all errors that occur in cleaning up each individual resource. For
example:-
FileInputStream in_1 = null; FileInputStream in_2 = null; try { .... in_1.close(); // throws exception if error. in_1 = null; in_2.close(); // throws exception if error. in_2 = null; } finally { // clean up any un-released resources. if (in_1 != null) { try { in_1.close(); } catch (Throwable t) { // ignore as already handling an error. } in_1 = null; if (in_2 != null) { try { in_2.close(); } catch (Throwable t) { // ignore as already handling an error. } in_2 = null; // etc... }
So to summarize, in order to be able to cancel your tasks you need to:-
a) ensure any methods called by your
task throw some type of exception when the thread running them is
interrupted.
b) add a check in long loops for the interrupt flag
being set and throw an InterruptedException.
c) ensure your task
cleans up after itself.
To add listeners to be notified when the task terminates or is cancelled, requirement iii), we define an interface class
public interface IFutureListener<T> { public void futureResult(T result, FutureTalker<T> talker); // called on normal completion public void futureCancelled(CancellationException cex, FutureTalker<T> talker); // called if cancelled. public void futureError(Throwable t, FutureTalker<T> talker); // called on error. }
and define an extended version of FutureTask<V>, a FutureTalker<V> to which we can add IFutureListeners.
As well as adding methods to add and remove listeners, FutureTalker overrides the FutureTask.done() method to call the listeners
/** * fire the listeners when task terminates or is cancelled */ protected void done() { fireListeners(); }
Each listener is only ever called once for this FutureTalker. Listeners are usually called on the thread executing the task. However if a listener is added after the task is finished/cancelled then it is called immediately on the calling thread.
To actually run the task you need to execute it.
To execute the task you typically get an instance of an Executor and call execute( ). For example, for a task returning an Integer,
Callable<Integer> task = new Task(); FutureTalker<Integer> talker = new FutureTalker<Integer>(task); talker.addListener(an_IFutureListener); Executor executor = an_Executor; // get some executor executor.execute(talker); // execute the task, perhaps later
The Executor can be as simple as
class DirectExecutor implements Executor { public void execute(Runnable r) { r.run(); } }
which runs the task immediately on the current thread, or more typically, tasks are executed in some thread other than the caller's thread as in
class ThreadPerTaskExecutor implements Executor { public void execute(Runnable r) { new Thread(r).start(); } }
The Executors class provides a number of static methods which return useful executors. One such is
public static ExecutorService newFixedThreadPool(int nThreads);
which returns a ThreadPoolExecutor of a fixed pool of thread on which to run tasks.
Using an ExecutorService is convenient, but it brings other problems, that of rejection and shutdown.
For various reasons the ExecutorService may not accept your task for execution. For example, a ThreadPoolExecutor will call reject(task) when it cannot accept it for any reason. By default this method throws a RejectedExecutionException on the current thread, so when using a ThreadPoolExecutor your code should look like
Callable<Integer> task = new Task();
FutureTalker<Integer> futureTalker = new FutureTalker<Integer>(task);
futureTalker.addListener(an_IFutureListener);
ExecutorService executorService = newFixedThreadPool(5); // get some executor
try {
executor.execute(futureTalker); // execute the task, perhaps later
} catch (RejectedExecutionException rex) {
futureTalker.rejected(rex); // failed to execute sets task to error and notifies listeners
}
The FutureTalker.rejected() method sets this task's error to the RejectedExecutionException argument and then fires the listeners.
ExecutorServices can be shutdown by calling their shutdownNow() method. Typically tasks currently executing will be interrupted and provided your tasks respond to the interrupt, they will eventually fire the futureError() method of their listeners. Those tasks that have not commenced executing are returned in a List<Runnable>. To notify your listeners of this failure to execute, your code should look something like
Callable<Integer> task = new Task();
FutureTalker<Integer> futureTalker = new FutureTalker<Integer>(task);
futureTalker.addListener(an_IFutureListener); // add a listener
ExecutorService executorService = newFixedThreadPool(5); // get some executor
.. add some tasks here
List<Runnable> waitingTasks = executorService.shutdownNow();
RejectedExecutionException rex = new RejectedExecutionException("shutdown");
for( Runnable r : waitingTasks) {
if(r instanceof FutureTalker) {
((FutureTalker)r).rejected(rex); // mark as rejected.
} else if (r instanceof Future) {
((Future)r).cancel(true); // mark as cancelled.
}
}
FutureTalker provides a convenience static method FutureTalker.shutdownNow( ) which takes ThreadPoolExecutor as an argument, shuts it down and calls rejected() on any FutureTalkers returned and cancel() on any Future's returned, before returning the lists of un-executed tasks. Using this method the above code becomes
Callable<Integer> task = new Task();
FutureTalker<Integer> futureTalker = new FutureTalker<Integer>(task);
futureTalker.addListener(an_IFutureListener); // add a listener
ExecutorService executorService = newFixedThreadPool(5); // get some executor
.. add some tasks here
.. now shutdown and fireListeners of waiting tasks.
List<Runnable> waitingTasks = FutureTalker.shutdownNow((ThreadPoolExecutor)executorService);
These two classes, FutureTalker and IFutureListener, together with their javadocs are in this zip file (or tar.gz file). The futureTalker.jar has a TalkerTest class as its main class. Running the jar using
java -jar futureTalker.jar
typically gives the follow output (this may vary from machine to machine due to the threading)
Task 8 stopped due to error. java.lang.Exception Message:Delay too small, 847 Task 6 was cancelled. java.util.concurrent.CancellationException Message:null Task 9 returned 1588 Task 5 stopped due to error. java.lang.Exception Message:Delay too small, 254 Task 4 stopped due to error. java.lang.Exception Message:Delay too small, 904 All tasks created. Shutting Down. Task 10 stopped due to error. java.lang.InterruptedException Message:sleep interrupted Task 7 stopped due to error. java.lang.InterruptedException Message:sleep interrupted Task 3 stopped due to error. java.util.concurrent.RejectedExecutionException Message:shutdown Task 2 stopped due to error. java.util.concurrent.RejectedExecutionException Message:shutdown Task 1 stopped due to error. java.util.concurrent.RejectedExecutionException Message:shutdown
See the TalkerTest.java code for details.
This article details the requirements for writing multi-threaded programs, and then describes how to satisfy these requirements using the java.util.concurrent package, provided with Java V5.0, and two utilities classes provided here (zip or tar.gz). Using these tools, you can replicate the convenience of the ThreadReturn package in Java V5.0 and take advangate of the new thread pool support provided.
Contact Forward Computing and Control by
©Copyright 1996-2020 Forward Computing and Control Pty. Ltd.
ACN 003 669 994