Background I/O

A pattern that I've found very useful is to offload input and output onto background tasks.

The basic idea is to keep a work set for background tasks. When we send work to the background, we first join() the current background task, if any, before submitting the next work unit. The general pattern looks like this (I've left out exception handling and other boilerplate in the interest of clarity):

public class Worker {
    private WorkSet<Object> workSet;

    public void doSomething () {
        // Do the non-i/o part
        final Object objectToWrite = createObjectToWrite ();

        // Complete any previous writes
        completePrevious ();
        
        // Offload the actual write 
        // to another thread
        workSet = new WorkSet<Object> ();
        workSet.execute (new Callable<Object> () {
            public Object call () {
                writeObject (objectToWrite);
                return null;
            }
        });
    }
    
    protected void completePrevious () {
        if (workSet != null) {
            workSet.join ();
            workSet = null;
        }
    }
    
    public void close () {
        completePrevious ();
    }
}

I've found this very useful when the non-I/O task is long, as it maximizes the overlap between the background write and the foreground computation. One downside is that the background task needs access to the buffer that is about to be written, and must hold on to it until the write completes. In the example above, the object being written in the background and the new object being created will both exist in memory if we succeed in making the two processes overlap. This means that the maximum buffer size is halved compared to a synchronous solution, where only one buffer is in memory at any time and can be released at the end of each write.
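The WorkSet class itself is not shown above. A minimal sketch of what it might look like, assuming it simply wraps one task submitted to a single-threaded ExecutorService (the class name and method signatures mirror the usage above; everything else is an assumption):

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical one-shot work set: runs a single Callable on a
// background thread and lets the caller wait for its completion.
public class WorkSet<T> {
    private final ExecutorService executor =
        Executors.newSingleThreadExecutor();
    private Future<T> future;

    // Submit the task to the background thread.
    public void execute(Callable<T> task) {
        future = executor.submit(task);
    }

    // Block until the submitted task completes, then return its result.
    public T join() {
        try {
            return future.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException(e);
        } finally {
            executor.shutdown();
        }
    }
}
```

Since the pattern creates a fresh WorkSet for every unit of work, a one-shot wrapper like this is enough; a longer-lived design would reuse the executor instead of shutting it down in join().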

Another example is this buffered writer:

public class AsyncBufferedObjectWriter {
    // Illustrative limit; tune for the target medium
    private final int maxBufferSize = 1024;
    private Buffer buffer = new Buffer ();
    private WorkSet<Object> writer;
    
    public void write (Object o) {
        // Just add to buffer
        buffer.add (o);
        
        // Check if we should flush
        if (buffer.size () > maxBufferSize) {
            flush (true);
        }
    }
    
    public void flush () {
        // By default, do a synchronous
        // flush. This is consistent
        // with the contract for flush.
        flush (false);
    }
    
    protected void flush (boolean async) {
        // Complete previous flush, if any
        if (writer != null) {
            writer.join ();
        }
        writer = null;
        
        // Set up new buffer
        final Buffer bufferToFlush = buffer;
        buffer = new Buffer ();
        
        if (async) {
            // Set up new flush-work set
            writer = new WorkSet<Object> ();
            
            // Trigger flush of the 
            // buffer that was full
            writer.execute (new Callable<Object> () {
                public Object call () {
                    // Actually flush the buffer.
                    bufferToFlush.flush ();
                    return null;
                }
            });
        } else {
            bufferToFlush.flush ();
        }
    }
    
    public void close () {
        // Empty internal buffer
        flush (false);
    }
}

Here the buffer size issue becomes more apparent. In flush(true), we hold on to the current buffer and swap in a fresh one. While the old buffer is being flushed, clients can keep adding to the new one. If the rate at which clients add objects exceeds the rate at which we can flush them, we end up with a full this.buffer and a full bufferToFlush in memory simultaneously.

However, as long as we can still use a reasonably sized buffer, that is, one large enough that the per-write round-trip overhead to disk is negligible compared to the time spent actually getting the bits onto the storage medium, this is not much of a problem.
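To make the swap-and-flush behavior concrete, here is a small self-contained sketch of the same double-buffer idea. All names are illustrative, and an in-memory list stands in for the disk; it is not the Buffer/WorkSet code above:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Illustrative double-buffer writer: while the previous buffer is
// flushed on a background thread, new writes go into a fresh buffer,
// so up to two buffers can be live in memory at once.
public class DoubleBufferDemo {
    private final ExecutorService pool = Executors.newSingleThreadExecutor();
    private final int maxBufferSize = 2;                    // tiny, for the demo
    private final List<String> flushed = new ArrayList<>(); // stands in for disk
    private List<String> buffer = new ArrayList<>();
    private Future<?> pendingFlush;

    public void write(String s) throws Exception {
        buffer.add(s);
        if (buffer.size() >= maxBufferSize) {
            // Complete the previous background flush, if any.
            if (pendingFlush != null) pendingFlush.get();
            final List<String> toFlush = buffer;
            buffer = new ArrayList<>();
            // Both toFlush and the new buffer are now in memory.
            pendingFlush = pool.submit(() -> flushed.addAll(toFlush));
        }
    }

    public List<String> close() throws Exception {
        if (pendingFlush != null) pendingFlush.get();
        flushed.addAll(buffer);                // drain the remainder
        pool.shutdown();
        return flushed;
    }

    public static void main(String[] args) throws Exception {
        DoubleBufferDemo w = new DoubleBufferDemo();
        for (String s : new String[] {"a", "b", "c", "d", "e"}) w.write(s);
        System.out.println(w.close());  // prints [a, b, c, d, e]
    }
}
```

Because write() joins the previous flush before starting the next one, at most two buffers ever exist at the same time, which is exactly the halved-buffer tradeoff described above.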

2013-02-17, updated 2014-11-11
Alby