Thread Pools in Google App Engine
Google App Engine supports multithreading, but only in a restricted form. In particular, all threads other than the request thread must finish their work within the request. There are exceptions if you run on a back-end instance, but even on a front-end instance there are plenty of interesting things you can do in parallel, such as running multiple datastore queries concurrently.
1. Overview
App Engine provides front-end multithreading via the ThreadManager[a].createThreadForCurrentRequest[b] method. It takes a Runnable and, if a new thread could be created, returns a Thread that executes the runnable once it is started.
To keep using the abstractions we have built so far, such as work sets, we need to implement a thread pool on top of this method. Then we're pretty much good to go.
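As a minimal sketch, assuming we want the pool to be testable outside App Engine, the thread source can be hidden behind java.util.concurrent.ThreadFactory. createThreadForCurrentRequest has the same Runnable-to-Thread shape, so ThreadManager::createThreadForCurrentRequest can be passed wherever a ThreadFactory is expected, and Thread::new can stand in for it in a plain JVM. RequestThreadStarter and tryStart are hypothetical names, and treating an exception or a null return as "no thread available" is an assumption, not documented App Engine behavior.

```java
import java.util.concurrent.ThreadFactory;

/**
 * Hypothetical helper that starts work on a thread obtained from a
 * ThreadFactory. On App Engine the factory could be
 * ThreadManager::createThreadForCurrentRequest; locally, Thread::new.
 */
final class RequestThreadStarter {
    private final ThreadFactory factory;

    RequestThreadStarter(ThreadFactory factory) {
        this.factory = factory;
    }

    /**
     * Creates and starts a thread for the given work.
     * Returns the started thread, or null if none was available.
     */
    Thread tryStart(Runnable work) {
        Thread thread;
        try {
            thread = factory.newThread(work);
        } catch (RuntimeException e) {
            // Assumed failure mode: the factory refuses by throwing
            return null;
        }
        if (thread == null) {
            // Assumed failure mode: the factory refuses by returning null
            return null;
        }
        thread.start();
        return thread;
    }
}
```

Keeping the platform call behind an interface is what lets the pool code below be exercised in ordinary unit tests.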
2. Differences in Threading
There are two big differences between multithreading in App Engine and in normal Java: the threads are short-lived, and they are scoped to the current request.
Threads being short-lived means that worker threads don't sit in the thread pool waiting for work. Instead, they stay alive only as long as there is work to perform, and then exit. So rather than keeping a number of pool threads waiting, we need to start threads as needed during a request and let them exit when they run out of work.
Threads being scoped to the current request means that we can't have a thread pool shared between requests. Instead, each request must start up its own thread pool and can only submit work to the threads in it.
2.1. Short-lived Threads
Having short-lived threads calls for a slight change in the thread pool implementation. With long-lived threads, we would typically do something like this:
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.Callable;

/*
 * Start all threads in the constructor
 * or a separate start() method.
 */
private final Queue<Callable<?>> queue = new LinkedList<> ();

public void submit (Callable<?> work) {
    synchronized (queue) {
        queue.add (work);
        // Wake up one waiting worker
        queue.notify ();
    }
}

// Each pool thread runs this loop forever
public void work () throws Exception {
    while (true) {
        Callable<?> work = null;
        synchronized (queue) {
            // Wait for work to be available
            while (queue.isEmpty ()) {
                queue.wait ();
            }
            // Get work unit
            work = queue.poll ();
        }
        // Execute outside the lock
        work.call ();
    }
}
With short-lived threads, we instead start threads as needed and let each one exit as soon as the work queue is empty:
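import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;

private final Queue<Callable<?>> queue = new LinkedList<> ();
private final AtomicInteger activeThreads = new AtomicInteger (0);
private final int maxThreads = 50;

protected void startNewThread () {
    // Start a new thread that runs work ()
}

public void submit (Callable<?> work) {
    synchronized (queue) {
        queue.add (work);
        // Below the limit: start another worker. At the limit,
        // a running worker picks the work up, since none of
        // them exits while the queue is non-empty.
        if (activeThreads.get () < maxThreads) {
            activeThreads.incrementAndGet ();
            startNewThread ();
        }
    }
}

public void work () {
    while (true) {
        Callable<?> work = null;
        synchronized (queue) {
            // Get work unit
            work = queue.poll ();
            // Exit if the queue is empty. The check and the
            // decrement happen under the same lock as submit (),
            // so no submitted work is left without a worker.
            if (work == null) {
                activeThreads.decrementAndGet ();
                return;
            }
        }
        // Execute outside the lock; a failing task must not
        // end the worker without decrementing activeThreads.
        try {
            work.call ();
        } catch (Exception e) {
            // Record or log the failure here
        }
    }
}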
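Putting the pieces together, here is a self-contained sketch of the short-lived pool, assuming threads come from a java.util.concurrent.ThreadFactory (on App Engine, ThreadManager::createThreadForCurrentRequest has the matching Runnable-to-Thread shape; locally, Thread::new works). ShortLivedPool and activeThreadCount are hypothetical names. Because the counter is only touched while holding the queue lock, a plain int replaces the AtomicInteger.

```java
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.Callable;
import java.util.concurrent.ThreadFactory;

/** Sketch of a pool whose workers start on demand and exit when idle. */
final class ShortLivedPool {
    private final Queue<Callable<?>> queue = new LinkedList<>();
    private final ThreadFactory factory;
    private final int maxThreads;
    // Only read or written while holding the queue lock
    private int activeThreads = 0;

    ShortLivedPool(ThreadFactory factory, int maxThreads) {
        this.factory = factory;
        this.maxThreads = maxThreads;
    }

    void submit(Callable<?> task) {
        synchronized (queue) {
            queue.add(task);
            // Start another worker only while below the limit; otherwise a
            // running worker will pick the task up before it exits.
            if (activeThreads < maxThreads) {
                Thread worker = factory.newThread(this::work);
                activeThreads++;
                worker.start();
            }
        }
    }

    /** Number of workers currently alive. */
    int activeThreadCount() {
        synchronized (queue) {
            return activeThreads;
        }
    }

    private void work() {
        while (true) {
            Callable<?> next;
            synchronized (queue) {
                next = queue.poll();
                if (next == null) {
                    // Same lock as submit(): a task is either polled here
                    // or submitted after this decrement, which then
                    // starts a fresh worker.
                    activeThreads--;
                    return;
                }
            }
            try {
                next.call(); // run outside the lock
            } catch (Exception e) {
                // Keep the worker alive; a real pool would record the failure
            }
        }
    }
}
```

With maxThreads set to 4, submitting a hundred small tasks keeps at most four workers alive, and once the queue drains, activeThreadCount() falls back to zero.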
2.2. Request-Scoped Threads
The easiest way to handle the set-up and tear-down of the request context is to wrap the HttpServlet.service[c](HttpServletRequest, HttpServletResponse) method in calls that register the current thread as a request thread:
@Override
public void service (
        HttpServletRequest req,
        HttpServletResponse resp)
        throws ServletException, IOException {
    ThreadFactoryImpl.beginRequestThread ();
    try {
        super.service (req, resp);
    } finally {
        ThreadFactoryImpl.endRequestThread ();
    }
}
The ThreadFactoryImpl.beginRequestThread() method should then store the request thread in a ThreadLocal:
private static final ThreadLocal<Thread> requestThread =
    new ThreadLocal<> ();

public static void beginRequestThread () {
    requestThread.set (Thread.currentThread ());
}
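The snippet above covers only beginRequestThread(). Below is a self-contained sketch of the whole request-thread bookkeeping, assuming endRequestThread() simply removes the ThreadLocal value; the article doesn't show that method. Clearing it matters because servlet containers commonly reuse request threads from a pool, so a stale value could otherwise leak into the next request. RequestThreads and currentRequestThread() are hypothetical names.

```java
/** Hypothetical holder for the calling thread's request thread. */
final class RequestThreads {
    private static final ThreadLocal<Thread> requestThread =
        new ThreadLocal<>();

    private RequestThreads() {}

    /** Called at the start of service(): this thread is the request thread. */
    static void beginRequestThread() {
        requestThread.set(Thread.currentThread());
    }

    /** Assumed counterpart, called from service()'s finally block. */
    static void endRequestThread() {
        requestThread.remove();
    }

    /** The request thread for the calling thread, or null outside a request. */
    static Thread currentRequestThread() {
        return requestThread.get();
    }
}
```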
The thread pool should propagate the calling thread's associated request thread to any threads it starts:
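import com.google.appengine.api.ThreadManager;

private static final ThreadLocal<Thread> requestThread =
    new ThreadLocal<> ();

protected void startNewThread () {
    // Read the caller's request thread here, on the calling thread
    final Thread requestThreadForCallingThread =
        requestThread.get ();
    // On a front-end instance, the worker must come from the ThreadManager
    Thread workerThread = ThreadManager.createThreadForCurrentRequest (
        new Runnable () {
            @Override
            public void run () {
                // Hand the request thread over to the worker
                requestThread.set (requestThreadForCallingThread);
                // ... do work
            }
        });
    workerThread.start ();
}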
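A self-contained sketch of the same hand-off, assuming threads come from a java.util.concurrent.ThreadFactory so it can run outside App Engine (ThreadManager::createThreadForCurrentRequest fits that interface). The key point is that the request thread must be read on the calling thread before the worker starts, because an ordinary ThreadLocal value set on one thread is not visible from another. PropagatingStarter is a hypothetical name.

```java
import java.util.concurrent.ThreadFactory;

/** Hypothetical starter that carries the caller's request thread into workers. */
final class PropagatingStarter {
    // The request thread associated with the current thread, if any
    static final ThreadLocal<Thread> requestThread = new ThreadLocal<>();

    private PropagatingStarter() {}

    static Thread start(ThreadFactory factory, Runnable work) {
        // Read on the calling thread: the worker's own ThreadLocal starts empty
        final Thread callersRequestThread = requestThread.get();
        Thread worker = factory.newThread(() -> {
            requestThread.set(callersRequestThread);
            try {
                work.run();
            } finally {
                // Tidy up; harmless for a thread that is about to exit
                requestThread.remove();
            }
        });
        worker.start();
        return worker;
    }
}
```

An InheritableThreadLocal would copy the value automatically when the Thread object is constructed; capturing it explicitly keeps the hand-off visible and doesn't depend on how the platform constructs its threads.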
All threads can now get a reference to their associated request thread, which can be used as a key to create or retrieve their associated thread pool:
import java.util.Map;
import java.util.WeakHashMap;

private static final ThreadLocal<ThreadPool> threadPool =
    new ThreadLocal<> ();
private static final Map<Thread,ThreadPool> threadPools =
    new WeakHashMap<> ();

public static ThreadPool getThreadPool () {
    ThreadPool tp = threadPool.get ();
    if (tp != null) {
        return tp;
    }
    /*
     * Need to set up a thread pool for this thread
     *
     * This can all be done in a ThreadLocal.initialValue
     * but I'll write it out here because it is much
     * clearer.
     */
    // Figure out which thread
    // is our request thread
    Thread currentRequestThread = requestThread.get ();
    synchronized (threadPools) {
        tp = threadPools.get (currentRequestThread);
        if (tp != null) {
            // Ok, we have a thread pool for
            // this request thread. Save it.
            threadPool.set (tp);
            return tp;
        }
        // No thread pool for this request
        // thread. Set up a new one.
        tp = new ThreadPoolImpl ();
        // Put it in the big map of all
        // thread pools
        threadPools.put (currentRequestThread, tp);
        // Put it in the thread local for quick
        // access
        threadPool.set (tp);
        // Return it
        return tp;
    }
}
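To check the lookup logic in isolation, here is a self-contained sketch that mirrors getThreadPool() with a stub Pool class standing in for ThreadPoolImpl. PoolRegistry and Pool are hypothetical names. One caveat: the article doesn't show whether endRequestThread() also clears the cached ThreadLocal and the map entry; if request threads are reused across requests, it probably should, or the next request on that thread would inherit the old pool.

```java
import java.util.Map;
import java.util.WeakHashMap;

/** Hypothetical per-request pool registry keyed by request thread. */
final class PoolRegistry {
    /** Stand-in for ThreadPoolImpl. */
    static final class Pool {}

    static final ThreadLocal<Thread> requestThread = new ThreadLocal<>();
    private static final ThreadLocal<Pool> threadPool = new ThreadLocal<>();
    // Weak keys: an entry can be dropped once its request thread is unreachable
    private static final Map<Thread, Pool> threadPools = new WeakHashMap<>();

    private PoolRegistry() {}

    static Pool getThreadPool() {
        // Fast path: this thread already looked its pool up
        Pool tp = threadPool.get();
        if (tp != null) {
            return tp;
        }
        Thread owner = requestThread.get();
        synchronized (threadPools) {
            tp = threadPools.get(owner);
            if (tp == null) {
                // First thread of this request to ask: create the pool
                tp = new Pool();
                threadPools.put(owner, tp);
            }
        }
        threadPool.set(tp);
        return tp;
    }
}
```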
3. ThreadManager Performance
To test whether an application should hold on to a per-request thread or release it back to App Engine, I wrote a small benchmark that starts 160 threads, at most 40 at a time, and measures the time from submitting each Runnable to the ThreadManager until that Runnable starts executing.
protected static class TestRunnable
        implements Runnable {
    /**
     * Nanos when the runnable was submitted
     * to the ThreadManager.
     */
    public long submittedAt;
    /**
     * Nanos from submittedAt to start of
     * run() method.
     */
    public long timeToStart;
    /**
     * Flag to signal completion.
     */
    private boolean done = false;
    /**
     * Used to synchronize the benchmark
     */
    private final Semaphore semaphore;

    public TestRunnable (Semaphore semaphore) {
        this.semaphore = semaphore;
    }

    public synchronized void preSubmit () {
        submittedAt = System.nanoTime ();
    }

    public void run () {
        // Mark when we entered the run() method
        final long startedAt = System.nanoTime ();
        // Fill in the result and signal
        // completion
        synchronized (this) {
            timeToStart = startedAt - submittedAt;
            // Set the done flag and notify
            // waiters on exit
            done = true;
            notifyAll ();
            // Notify the benchmark that a
            // slot is available.
            semaphore.release ();
        }
    }

    public synchronized void await () throws Exception {
        while (!done) {
            wait ();
        }
    }
}
protected String threadManagerBenchmark () throws Exception {
    List<TestRunnable> runnables = new ArrayList<> ();
    // App Engine limit is 50 concurrent per-request threads
    // so set this to a little bit lower.
    Semaphore semaphore = new Semaphore (40, true);
    for (int i = 0; i < 160; ++i) {
        // Set up the runnable
        TestRunnable testRunnable = new TestRunnable (semaphore);
        runnables.add (testRunnable);
        // Wait for a slot to become available
        semaphore.acquire ();
        // Mark this as the time of submission
        testRunnable.preSubmit ();
        // Start it
        ThreadManager.createThreadForCurrentRequest (testRunnable).start ();
    }
    // Gather up the results
    StringBuilder sb = new StringBuilder ();
    for (TestRunnable testRunnable : runnables) {
        // Wait for the runnable to complete
        testRunnable.await ();
        sb.append ("," + testRunnable.timeToStart);
    }
    return sb.toString ();
}
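For comparison on an ordinary JVM, the same measurement can be written against a java.util.concurrent.ThreadFactory, so that ThreadManager::createThreadForCurrentRequest can be swapped for Thread::new. This is a sketch under that assumption, not the code that produced the numbers in the results; StartLatencyBenchmark is a hypothetical name.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadFactory;

/** Hypothetical start-latency benchmark parameterized by thread source. */
final class StartLatencyBenchmark {
    private StartLatencyBenchmark() {}

    /** Returns, per run, the nanoseconds from submission to the start of run(). */
    static long[] measure(ThreadFactory factory, int runs, int maxConcurrent)
            throws InterruptedException {
        final long[] timeToStart = new long[runs];
        final Semaphore slots = new Semaphore(maxConcurrent, true);
        final Thread[] threads = new Thread[runs];
        for (int i = 0; i < runs; i++) {
            final int index = i;
            // Wait for a free slot, as in the original benchmark
            slots.acquire();
            final long submittedAt = System.nanoTime();
            threads[i] = factory.newThread(() -> {
                timeToStart[index] = System.nanoTime() - submittedAt;
                slots.release();
            });
            threads[i].start();
        }
        for (Thread t : threads) {
            // join() also makes each worker's write to timeToStart visible here
            t.join();
        }
        return timeToStart;
    }
}
```

Instead of the done flag with wait() and notifyAll(), this version relies on Thread.join(): everything a thread does happens-before another thread returns from join() on it, so the latencies written by the workers are safely visible once the final loop completes.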
3.1. Results
Statistic | Time to start |
---|---|
Median | 0.55 ms |
Mean | 3.4 ms |
Standard deviation | 14 ms |
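The median start-up time is about half a millisecond; the mean and standard deviation are pulled up by a handful of outliers, the slowest around 83 ms. I'd say it's conclusive that an App Engine application should return the ThreadManager thread as soon as possible instead of keeping it alive waiting for work.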
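The summary statistics can be recomputed from the raw timings listed below. This sketch assumes the population standard deviation and, for an even sample count, the mean of the two middle values as the median; the article doesn't state which conventions it used. LatencyStats is a hypothetical name.

```java
import java.util.Arrays;

/** Hypothetical summary of start latencies given in nanoseconds. */
final class LatencyStats {
    final double medianMs;
    final double meanMs;
    final double stdDevMs;

    LatencyStats(long[] nanos) {
        if (nanos.length == 0) {
            throw new IllegalArgumentException("no samples");
        }
        long[] sorted = nanos.clone();
        Arrays.sort(sorted);
        int n = sorted.length;
        // Middle value, or mean of the two middle values for even n
        double median = (n % 2 == 1)
            ? sorted[n / 2]
            : (sorted[n / 2 - 1] + sorted[n / 2]) / 2.0;
        double sum = 0;
        for (long v : sorted) {
            sum += v;
        }
        double mean = sum / n;
        // Population variance: average squared deviation from the mean
        double squares = 0;
        for (long v : sorted) {
            squares += (v - mean) * (v - mean);
        }
        double stdDev = Math.sqrt(squares / n);
        // Report in milliseconds, like the table (1 ms = 1e6 ns)
        this.medianMs = median / 1e6;
        this.meanMs = mean / 1e6;
        this.stdDevMs = stdDev / 1e6;
    }
}
```

To recompute the table, pass it the 160 timings from the benchmark listing.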
3.2. Benchmark Times
For completeness, here are the raw benchmark times, in submission order.
Time to start (ns) |
---|
1625478 |
1052468 |
1032667 |
596672 |
669900 |
634185 |
618386 |
538826 |
486734 |
605344 |
509547 |
427936 |
454675 |
510443 |
512497 |
479394 |
688947 |
449713 |
448289 |
461846 |
415874 |
497965 |
447341 |
6538459 |
6183541 |
6294014 |
553444 |
550433 |
440069 |
454608 |
354756 |
505350 |
400744 |
575268 |
478115 |
22916950 |
746057 |
616596 |
796069 |
546892 |
527706 |
588648 |
449054 |
428609 |
752572 |
817447 |
800186 |
4384689 |
751349 |
878085 |
774418 |
616482 |
974612 |
638632 |
724069 |
500903 |
793643 |
1114472 |
1135742 |
712489 |
649605 |
621493 |
573516 |
499618 |
625427 |
506528 |
517759 |
482207 |
462711 |
491932 |
1803143 |
475701 |
537707 |
595929 |
486622 |
506366 |
453155 |
564956 |
524579 |
763073 |
456771 |
525695 |
73878005 |
835225 |
838766 |
594241 |
620090 |
806793 |
559102 |
483536 |
588210 |
505786 |
548833 |
632268 |
609740 |
572687 |
615935 |
521054 |
547028 |
540745 |
503766 |
450779 |
445388 |
478534 |
531844 |
465216 |
654004 |
500662 |
631713 |
522498 |
465792 |
491966 |
1424045 |
726502 |
557966 |
492410 |
497682 |
520651 |
471177 |
613008 |
82712268 |
82313958 |
81836573 |
82466849 |
948225 |
535108 |
599416 |
537563 |
463282 |
528528 |
578315 |
524043 |
536569 |
475059 |
2622857 |
535079 |
660813 |
728875 |
898359 |
799069 |
563135 |
420158 |
501612 |
477155 |
391734 |
417772 |
1469489 |
1403973 |
612382 |
550014 |
528515 |
535359 |
531551 |
450608 |
395743 |
428368 |
428362 |
352901 |
480293 |
443312 |
Links
[a] https://cloud.google.com/appengine/docs/java/javadoc/com/google/appengine/api/ThreadManager
[b] https://cloud.google.com/appengine/docs/java/javadoc/com/google/appengine/api/ThreadManager#createThreadForCurrentRequest-java.lang.Runnable-
[c] http://docs.oracle.com/javaee/6/api/javax/servlet/http/HttpServlet.html#service%28javax.servlet.http.HttpServletRequest,%20javax.servlet.http.HttpServletResponse%29