Tail-Callable Optimization for Work Sets
 
Support Ukraine

Tail-Callable Optimization for Work Sets

Work sets are a great abstraction in concurrent programming, but yesterday I read A Java Fork-Join Calamity[a], A Java Parallel Calamity[b], and a question on StackOverflow about why the Fork/Join pool creates 700 threads[c].

Suddenly I was very much afraid that I had done something equally stupid. I almost had, but I think I've salvaged it.

1. The Problem

The problem can be summarized as the thread calling join() blocks, thereby "wasting" one thread. This is not noticeable if you have many forks and few joins, but can have an effect otherwise. Let's look at the problem in the base case:

WorkSet<Object> workSet = new WorkSet<>();

// Submit one callable for execution
workSet.execute (new MyCallable<Object>());

// Join the work set. Execute any remaining
// Callables in the calling thread.
workSet.join ();

When we reach workSet.join(), the callable has (most likely) been accepted by the system thread pool. There is nothing for the calling thread to do but wait for it to complete processing. We have, in this case, used one thread-pool thread, and one thread (the calling thread) to do work that should really been done by one thread. Note that if there had been code between the call to execute and the join(), like background I/O, we would not have this problem - then the main thread would be doing useful work, and while the setup isn't the most elegant it's not "wasted" the same way.

In this case, we only "waste" one thread, but in the case where the Callable itself has a work set, we could end up "wasting" the thread executing that one, too, and so on. For example, let's say we have this:

public void recurse (
    final int fanout, 
    final int depth) 
    throws Exception {
    
    if (depth == 0) {
        Thread.sleep (100);
    } else {
        WorkSet<Void> ws = new WorkSet<Void> ();
        for (int i = 0; i < fanout; ++i) {
            ws.execute (new Callable<Void> () {
                    public Void call () throws Exception {
                        recurse (fanout, depth - 1);
                        return null;
                    }
                });
        }
        ws.join ();
    }
}

This results in a tree of work sets, and every one of them wastes a thread. Ultimately the number of wasted threads is capped by the number of threads in the thread pool. In that case, the Callables are not accepted for execution before we reach the join(), and are executed in the calling thread.

Here, we will assume that the WorkSet is used as in the example: First a lot of Callables are put into it with execute, then it is immediately join():ed.

2. Further Investigation

Let's look at the next case up, where we submit two Callables instead of one:

WorkSet<Object> workSet = new WorkSet<>();

// Submit one callable for execution
workSet.execute (new MyCallable<Object>());

// Submit another callable for execution
workSet.execute (new MyCallable<Object>());

// Join the work set.
workSet.join ();

Looks the same, doesn't it? Two Callables are accepted by the thread pool, and the calling thread ends up wasted. The optimal case would be for the thread pool to accept the first Callable, and leave the second to the calling thread for execution in join() when the Future resulting from it is complete():d.

3. The Pattern

The problem is this really: We don't want the last submitted Callable to be executed by the thread pool. We want to save it for the calling thread to execute in join().

4. The Solution

Let the WorkSet keep track of the last submitted Callable. Call this the "tail callable". When a Callable is submitted, we first see if there already is a tail callable. If so, we throw that one into the thread pool. Then the new Callable becomes the current tail callable. When the caller join():s the work set, we first execute the tail callable, if any, then call complete() to help the thread pool complete the remaining Callables.

public void execute (Callable<T> callable) {
    if (tailCallable != null) {
        futures.add (threadPool.execute (tailCallable));
        tailCallable = null;
    }
    tailCallable = callable;
}

public void join () throws Exception {
    if (tailCallable != null) {
        // run the tailCallable, somehow
    }
    
    // complete the rest
    complete (futures);
}

Since this is only valid in the case where we don't run any code between the last submission to the work set and the join(), this behavior must be optional:

public WorkSet tail () {
    this.useTailCallable = true;
    return this;
}

public void execute (Callable<T> callable) {
    if (tailCallable != null) {
        futures.add (threadPool.execute (tailCallable));
        tailCallable = null;
    }
    if (useTailCallable) {
        // Save for the calling thread
        tailCallable = callable;
    } else {
        // Into the pool with you
        futures.add (threadPool.execute (callable));
    }
}

Now we can do this:

WorkSet<Object> workSet = new WorkSet<>().tail ();

// Submit one callable for execution
workSet.execute (new MyCallable<Object>());

// Submit another callable for execution
workSet.execute (new MyCallable<Object>());

// Join the work set.
workSet.join ();

...and know that we aren't wasting any threads.

5. Endnote

Note that this depends on having a work set abstraction to analyze the problem - otherwise there is no way to connect the join with the submission of work. java.util.concurrent has no such thing, which reinforces my view that java.util.concurrent isn't really up to serious use.