Work sets are a great abstraction in concurrent programming, but yesterday I read A Java Fork-Join Calamity[a], A Java Parallel Calamity[b], and a question on StackOverflow about why the Fork/Join pool creates 700 threads[c].
Suddenly I was very much afraid that I had done something equally stupid. I almost had, but I think I've salvaged it.
1. The Problem
The problem can be summarized as follows: the thread calling join() blocks, thereby "wasting" one thread. This is not noticeable if you have many forks and few joins, but can have an effect otherwise. Let's look at the problem in the base case:
WorkSet<Object> workSet = new WorkSet<>();
// Submit one callable for execution
workSet.execute (new MyCallable<Object>());
// Join the work set. Execute any remaining
// Callables in the calling thread.
workSet.join ();
When we reach workSet.join(), the callable has (most likely) been accepted by the system thread pool. There is nothing for the calling thread to do but wait for it to complete processing. We have, in this case, used one thread-pool thread and one thread (the calling thread) to do work that should really have been done by one thread. Note that if there had been code between the call to execute and the join(), like background I/O, we would not have this problem - then the main thread would be doing useful work, and while the setup isn't the most elegant, the thread is not "wasted" the same way.
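To make the base case concrete, here is a minimal sketch that uses a plain java.util.concurrent ExecutorService as a stand-in for the system thread pool (the WorkSet class itself is not shown in this post). The class and method names are made up for illustration. It only shows that fork-then-join occupies a pool thread while the caller sits in get(), whereas running the callable directly needs just the caller.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical illustration; an ExecutorService stands in for the thread pool.
class BlockedJoinSketch {
    /** A task that reports the name of the thread that ran it. */
    static Callable<String> whoRanMe() {
        return () -> Thread.currentThread().getName();
    }

    /** Fork, then immediately join: the caller just blocks in get(). */
    static String forkThenJoin() throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            return pool.submit(whoRanMe()).get();
        } finally {
            pool.shutdown();
        }
    }

    /** The same work done by the calling thread, with no second thread involved. */
    static String runInCaller() throws Exception {
        return whoRanMe().call();
    }

    public static void main(String[] args) throws Exception {
        System.out.println("caller:        " + Thread.currentThread().getName());
        System.out.println("fork + join:   " + forkThenJoin());
        System.out.println("run in caller: " + runInCaller());
    }
}
```

Both variants produce a result, but only the first one ties up two threads to do it.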
In this case, we only "waste" one thread, but in the case where the Callable itself has a work set, we could end up "wasting" the thread executing that one, too, and so on. For example, let's say we have this:
public void recurse (
        final int fanout,
        final int depth)
        throws Exception {
    if (depth == 0) {
        // Leaf: simulate some work
        Thread.sleep (100);
    } else {
        // Inner node: fork `fanout` children, then join them
        WorkSet<Void> ws = new WorkSet<Void> ();
        for (int i = 0; i < fanout; ++i) {
            ws.execute (new Callable<Void> () {
                public Void call () throws Exception {
                    recurse (fanout, depth - 1);
                    return null;
                }
            });
        }
        ws.join ();
    }
}
This results in a tree of work sets, and every one of them wastes a thread. Ultimately the number of wasted threads is capped by the number of threads in the thread pool. Once that cap is reached, the Callables are not accepted for execution before we reach the join(), and are executed in the calling thread.
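To get a feel for the size of that tree, here is a small counting sketch. It assumes the shape of recurse above: every call with depth > 0 creates exactly one work set, and every call with depth == 0 is a leaf that only sleeps. The class and method names are made up for illustration.

```java
// Hypothetical helper for counting, not part of the WorkSet code.
class WorkSetTreeCount {
    /** Work sets created by recurse(fanout, depth): one per non-leaf call. */
    static long workSets(int fanout, int depth) {
        if (depth == 0) {
            return 0; // a leaf creates no work set
        }
        return 1 + fanout * workSets(fanout, depth - 1);
    }

    /** Leaf calls, i.e. the calls that do the actual "work" (the sleep). */
    static long leaves(int fanout, int depth) {
        long n = 1;
        for (int i = 0; i < depth; i++) {
            n *= fanout;
        }
        return n;
    }

    public static void main(String[] args) {
        // fanout 2, depth 3: 1 + 2 + 4 = 7 work sets, 2^3 = 8 leaves
        System.out.println("work sets: " + workSets(2, 3));
        System.out.println("leaves:    " + leaves(2, 3));
    }
}
```

So with fanout 2 and depth 3 there are 7 joins for 8 units of real work, and each of those joins can hold a thread that does nothing but wait, up to the size of the pool.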
Here, we will assume that the WorkSet is used as in the example: first a lot of Callables are put into it with execute, then join() is called on it immediately.
2. Further Investigation
Let's look at the next case up, where we submit two Callables instead of one:
WorkSet<Object> workSet = new WorkSet<>();
// Submit one callable for execution
workSet.execute (new MyCallable<Object>());
// Submit another callable for execution
workSet.execute (new MyCallable<Object>());
// Join the work set.
workSet.join ();
Looks the same, doesn't it? Two Callables are accepted by the thread pool, and the calling thread ends up wasted. The optimal case would be for the thread pool to accept the first Callable, and leave the second to the calling thread for execution in join(), at the point where the Future resulting from it is complete()d.
3. The Pattern
The problem is really this: we don't want the last submitted Callable to be executed by the thread pool. We want to save it for the calling thread to execute in join().
4. The Solution
Let the WorkSet keep track of the last submitted Callable. Call this the "tail callable". When a Callable is submitted, we first see if there already is a tail callable. If so, we throw that one into the thread pool. Then the new Callable becomes the current tail callable. When the caller join()s the work set, we first execute the tail callable, if any, then call complete() to help the thread pool complete the remaining Callables.
public void execute (Callable<T> callable) {
    if (tailCallable != null) {
        futures.add (threadPool.execute (tailCallable));
        tailCallable = null;
    }
    tailCallable = callable;
}

public void join () throws Exception {
    if (tailCallable != null) {
        // run the tailCallable, somehow
    }
    // complete the rest
    complete (futures);
}
Since this is only valid in the case where we don't run any code between the last submission to the work set and the join(), this behavior must be optional:
public WorkSet<T> tail () {
    this.useTailCallable = true;
    return this;
}

public void execute (Callable<T> callable) {
    if (tailCallable != null) {
        // The previous tail is no longer last; hand it to the pool
        futures.add (threadPool.execute (tailCallable));
        tailCallable = null;
    }
    if (useTailCallable) {
        // Save for the calling thread
        tailCallable = callable;
    } else {
        // Into the pool with you
        futures.add (threadPool.execute (callable));
    }
}
Now we can do this:
WorkSet<Object> workSet = new WorkSet<>().tail ();
// Submit one callable for execution
workSet.execute (new MyCallable<Object>());
// Submit another callable for execution
workSet.execute (new MyCallable<Object>());
// Join the work set.
workSet.join ();
...and know that we aren't wasting any threads.
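For anyone who wants to try the idea without the rest of my code, here is a minimal, self-contained sketch. It is not the real WorkSet: it assumes a plain ExecutorService as the thread pool, always uses the tail callable, and has join() return the results in submission order. All names in it are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical stand-in for the WorkSet in this post, not the real class.
class TailWorkSetSketch<T> {
    private final ExecutorService pool;
    private final List<Future<T>> futures = new ArrayList<>();
    private Callable<T> tailCallable; // last submitted, held back for join()

    TailWorkSetSketch(ExecutorService pool) {
        this.pool = pool;
    }

    void execute(Callable<T> callable) {
        if (tailCallable != null) {
            // A newer callable arrived, so the old tail goes to the pool.
            futures.add(pool.submit(tailCallable));
        }
        tailCallable = callable;
    }

    /** Runs the tail in the calling thread; returns results in submission order. */
    List<T> join() throws Exception {
        Callable<T> tail = tailCallable;
        tailCallable = null;
        T tailResult = (tail != null) ? tail.call() : null;
        List<T> results = new ArrayList<>();
        for (Future<T> f : futures) {
            results.add(f.get()); // waits only for work still in flight
        }
        futures.clear();
        if (tail != null) {
            results.add(tailResult);
        }
        return results;
    }

    /** Submits two callables that report the name of the thread running them. */
    static List<String> demo() throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            TailWorkSetSketch<String> ws = new TailWorkSetSketch<>(pool);
            ws.execute(() -> Thread.currentThread().getName());
            ws.execute(() -> Thread.currentThread().getName());
            return ws.join();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("caller: " + Thread.currentThread().getName());
        System.out.println("ran on: " + demo());
    }
}
```

The first callable runs on a pool thread and the second on the caller. One difference from the real thing: Future.get() only waits, while complete() also lets the calling thread help with the remaining Callables.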
5. Endnote
Note that this depends on having a work set abstraction to analyze the problem - otherwise there is no way to connect the join with the submission of work. java.util.concurrent has no such thing, which reinforces my view that java.util.concurrent isn't really up to serious use.