Nested parallelism, Java 8 parallel streams, Bug in the ForkJoinPool framework.

Java 8 Parallel Streams Deadlocking

The Java 8 parallel streams API supports parallel processing of stream operations and parallel forEach loops. However, the backend executor framework, the fork/join pool, appears to have a problem with nested parallel loops. If you use nested parallel streams, e.g.
IntStream.range(0,24).parallel().forEach(i -> {
  // do some heavy work here
  IntStream.range(0,100).parallel().forEach(j -> {
    // do some work here
  });
});

you may observe very poor performance at high CPU usage. For example, running the test program NestedParallelForEachBenchmark.java I get the following results:

Timing for test cases (parallelism set to 2):
inner loop parallel             = 41,18 +/- 2,99 (min: 33,14 , max: 45,59) [CPU Time: 106,00] (number of threads in F/J pool: 8)
inner loop sequential           = 27,61 +/- 0,50 (min: 26,62 , max: 28,61) [CPU Time: 76,91] (number of threads in F/J pool: 2)
inner loop parallel with bugfix = 26,99 +/- 0,81 (min: 25,75 , max: 28,77) [CPU Time: 77,73] (number of threads in F/J pool: 2)
(The test cases were run independently, with 20 warm-up runs and 20 test runs each, on an Intel i7 at 2.6 GHz.)
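A measurement of this kind can be sketched roughly as follows. This is not the NestedParallelForEachBenchmark.java program itself, only a minimal illustration: the class name, the work() function and the loop sizes are assumptions made for this sketch. The system property java.util.concurrent.ForkJoinPool.common.parallelism is the standard way to set the parallelism of the common pool, and it has to be set before the common pool is first used.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.atomic.DoubleAdder;
import java.util.stream.IntStream;

public class NestedLoopTimingSketch {

    // Collects the results of work() so that the computation has an observable effect.
    static final DoubleAdder SINK = new DoubleAdder();

    // Hypothetical stand-in for "some work": a CPU-bound loop.
    static void work(int n) {
        double x = 0.0;
        for (int k = 1; k <= n; k++) {
            x += Math.sin(k) / k;
        }
        SINK.add(x);
    }

    // Runs the nested loop once and returns the elapsed wall-clock time in milliseconds.
    static double timeNestedLoop(boolean innerParallel) {
        long start = System.nanoTime();
        IntStream.range(0, 24).parallel().forEach(i -> {
            work(10_000);
            IntStream inner = IntStream.range(0, 100);
            if (innerParallel) {
                inner = inner.parallel();
            }
            inner.forEach(j -> work(1_000));
        });
        return (System.nanoTime() - start) / 1e6;
    }

    public static void main(String[] args) {
        // Only effective if set before the common pool is first used.
        System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "2");

        for (boolean innerParallel : new boolean[] { true, false }) {
            for (int run = 0; run < 20; run++) {
                timeNestedLoop(innerParallel); // warm-up runs, results discarded
            }
            double sum = 0.0;
            for (int run = 0; run < 20; run++) {
                sum += timeNestedLoop(innerParallel);
            }
            System.out.printf("inner loop %s: mean %.2f ms (threads in F/J pool: %d)%n",
                    innerParallel ? "parallel" : "sequential",
                    sum / 20,
                    ForkJoinPool.commonPool().getPoolSize());
        }
    }
}
```

The actual numbers depend on the machine and the Java version, so no expected output is given here.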


In addition, using synchronization may lead to (unexpected) deadlocks. For example, the following code will result in a deadlock:
IntStream.range(0,24).parallel().forEach(i -> {
  // do some heavy work here
  synchronized(this) {
    IntStream.range(0,100).parallel().forEach(j -> {
      // do some work here
    });
  }
});

In this test case we generate 24 parallel tasks to perform some calculation. At one point we need to synchronize (assume we need to access some shared state). Only one thread at a time can enter the synchronized block. Inside the synchronized block we evaluate some function in parallel (why not, all the other threads are waiting anyway). That should be fine, since there is no nested locking. Some might suspect that, given that all tasks are submitted to a common thread pool, there are no more threads left to calculate the inner loop in parallel. However, even if there are no free workers, there is at least one active thread, namely the one that entered the synchronized block, and we expect parallel execution to fall back to in-line sequential execution if no additional threads are available.
The parallel streams use a common ForkJoinPool as a backend, and this behavior is implemented in ForkJoinTask: it distinguishes between a ForkJoinWorkerThread and the main thread, and all tasks which cannot be completed by a ForkJoinWorkerThread can run (sequentially) on the main thread.
Also note that the documentation of the backend ForkJoinPool states that

"The pool attempts to maintain enough active (or available) threads by dynamically adding, suspending, or resuming internal worker threads, even if some tasks are stalled waiting to join others. However, no such adjustments are guaranteed in the face of blocked IO or other unmanaged synchronization."

But a deadlock would only be expected if the number of available threads were exhausted (and we are far below that number). So while synchronization should be avoided, the documentation of the ForkJoinPool does not forbid it.
Even without the synchronized block the bug will lead to performance issues (see a corresponding post on Stack Overflow).

The Bug

The test whether a task is running on a main thread (the creator of that loop) or on a worker is performed by the following line (line 401 of ForkJoinTask in Java 8, 1.8u5):
((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
So it just tests whether the current thread is of type ForkJoinWorkerThread. But for a nested loop, the calling thread (the creator) can itself be of type ForkJoinWorkerThread, because it is a worker of the outer loop. In that case the inner loop's task is joined with another outer loop task (which is currently waiting for the synchronized lock), and this results in a deadlock.
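The ambiguity of this type test can be illustrated with a small sketch. It does not touch ForkJoinTask itself; it only evaluates the same instanceof expression once on a plain thread and once inside a task of a fork/join pool. The class and method names are assumptions made for this illustration.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinWorkerThread;

public class WorkerThreadCheckSketch {

    // The same kind of type test as the one quoted from ForkJoinTask.
    static boolean isForkJoinWorker() {
        return Thread.currentThread() instanceof ForkJoinWorkerThread;
    }

    // Evaluates the test on a plain thread (like a main thread that creates a loop).
    static boolean checkOnPlainThread() {
        boolean[] result = new boolean[1];
        Thread t = new Thread(() -> result[0] = isForkJoinWorker());
        t.start();
        try {
            t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return result[0];
    }

    // Evaluates the test inside a pool task (like the body of an outer parallel loop
    // that is about to create an inner loop).
    static boolean checkOnPoolWorker() {
        ForkJoinPool pool = new ForkJoinPool(2);
        try {
            Callable<Boolean> check = WorkerThreadCheckSketch::isForkJoinWorker;
            return pool.submit(check).get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("plain thread: " + checkOnPlainThread());
        System.out.println("pool worker:  " + checkOnPoolWorker());
    }
}
```

On the plain thread the test is false, inside the pool task it is true. A thread that creates an inner loop from within an outer loop's task is therefore treated as a worker, although it is the creator of the inner loop.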

Reproducing the bug

To reproduce the bug and check the claim:
  1. Run the program NestedParallelForEachAndSynchronization.java (see below) in a debugger. It will hang in a deadlock in the last test case (a simple nested loop with inner synchronization).
  2. In the debugger, suspend all threads and check where these threads are waiting.
  3. You will find that all threads wait for the synchronized lock inside the outer loop and that the lock is owned by one of the threads; let's call that thread ForkJoinPool.commonPool-worker-7.
  4. If you check ForkJoinPool.commonPool-worker-7, you see that it waits for the synchronized lock too, although it is already inside the inner loop. Now, let's check why a piece of code inside the synchronized block waits for the lock: you see that the wait() is issued by the awaitJoin() in line 402 of ForkJoinTask. This is wrong; instead, that task should have done an externalAwaitDone(). Explanation: that task is the main task of the inner loop, i.e., a task of the outer loop that created the inner loop, but (due to the bug) it considers itself a forked worker (of the inner loop) and creates a join, effectively joining with the outer loop's task. The problem is that if the inner loop is running on a forked worker of the outer loop, we cannot distinguish forked (inner loop's) workers from main threads, because the test in line 401 is always true.
  5. Double checking: if the explanation in 4 is correct, the problem should go away if we fix line 401 (and all other corresponding tests). To fix this, we change the type of the thread containing the inner loop from ForkJoinWorkerThread to Thread (by creating a wrapper). Indeed, this fixes the deadlock and greatly improves performance. The second test case in NestedParallelForEachAndSynchronization.java does this.
For a complete test demonstrating the deadlock and the workaround see: NestedParallelForEachAndSynchronization.java
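The wrapper idea from step 5 can be sketched as follows. This is a minimal illustration and not the code of NestedParallelForEachAndSynchronization.java: the helper wrapInThread, the lock object and the counter are assumptions made for this sketch. The inner loop is started from a newly created plain Thread, so its creator is not a ForkJoinWorkerThread, and the outer task simply waits for that thread to finish.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;

public class WrapInThreadSketch {

    // Hypothetical helper: runs the given code on a new plain Thread and waits for it.
    // Note that exceptions thrown by the runnable are not propagated to the caller.
    static void wrapInThread(Runnable runnable) {
        Thread t = new Thread(runnable, "wrapper");
        t.start();
        try {
            t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    // Nested parallel loop with inner synchronization, inner loop wrapped in a plain Thread.
    // Returns the number of inner loop iterations that were executed.
    static int nestedLoopWithWrapper() {
        Object lock = new Object();
        AtomicInteger counter = new AtomicInteger();
        IntStream.range(0, 24).parallel().forEach(i -> {
            // do some heavy work here
            synchronized (lock) {
                wrapInThread(() ->
                        IntStream.range(0, 100).parallel().forEach(j -> counter.incrementAndGet()));
            }
        });
        return counter.get();
    }

    public static void main(String[] args) {
        System.out.println("inner iterations executed: " + nestedLoopWithWrapper());
    }
}
```

The price of this workaround is one additional thread per execution of the inner loop, so it is a diagnostic aid rather than a recommended design.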

Performance Problem Induced by the Bug

The faulty join may lead to performance problems. For a complete test demonstrating the performance issue see NestedParallelForEachBenchmark.java or NestedParallelForEachTest.java.

Epilog

Nested parallelism may occur quite naturally. Consider
IntUnaryOperator func = (i) -> IntStream.range(0,100).parallel().map(j -> i*j).sum();
which defines a function i -> func(i) as the sum of i*j over j, calculated internally in parallel, and then evaluate
IntStream.range(0,100).parallel().map(func).sum();
(a discrete approximation of a two-dimensional integral). Of course, this code (and other such problems) can be reformulated into a non-nested version, but if you want to encapsulate the internal definition of func, the nesting is a consequence of abstraction.
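Put together as a complete program, the nested version and one possible non-nested reformulation look as follows. The class name and the flattened index k are assumptions made for this sketch. Both variants compute the same value, since the double sum of i*j factors into the square of the sum 0 + 1 + ... + 99 = 4950.

```java
import java.util.function.IntUnaryOperator;
import java.util.stream.IntStream;

public class NestedFunctionSketch {

    // Nested version: func hides the fact that it uses a parallel stream internally.
    static int nestedSum() {
        IntUnaryOperator func = i -> IntStream.range(0, 100).parallel().map(j -> i * j).sum();
        return IntStream.range(0, 100).parallel().map(func).sum();
    }

    // Non-nested version: a single parallel loop over the flattened index k = 100 * i + j.
    static int flatSum() {
        return IntStream.range(0, 100 * 100).parallel().map(k -> (k / 100) * (k % 100)).sum();
    }

    public static void main(String[] args) {
        System.out.println(nestedSum()); // 4950 * 4950 = 24502500
        System.out.println(flatSum());   // 24502500
    }
}
```

The flat version avoids the nesting, but it requires knowledge of how func is defined, which is exactly what the abstraction was meant to hide.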

I first encountered this bug via a deadlock caused by using a Semaphore inside the outer loop (e.g. acting like a blocking IO). I posted this to Stack Overflow (and, following a suggestion there, to the concurrency-interest mailing list). The discussion started by that post then focused on the question of whether using a Semaphore and/or nested parallel loops is good design practice.
So far my conclusion is:
  • Currently, nesting of parallel streams should be avoided, since it has performance issues.
  • Nesting of parallel streams with inner synchronization has to be avoided, because there is a real risk of (otherwise unexpected) deadlocks.
  • Currently, the use of dedicated Executor frameworks is the best way to implement nested parallelism. However, using dedicated Executors has the disadvantage that it is much harder to maintain a global level of parallelism.
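The last point can be sketched with two separate thread pools from java.util.concurrent, one for the outer and one for the inner loop. This is only one possible arrangement; the class name, the pool sizes and the summed expression i*j are assumptions made for this sketch. Because the inner tasks run on their own pool and never block, the outer tasks can safely wait for them. The drawback mentioned above is visible as well: the total number of threads is the sum of both pool sizes and has to be tuned by hand.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class DedicatedExecutorsSketch {

    // Sums i * j for i in [0, 24) and j in [0, 100) using one pool per nesting level.
    static long nestedSum(int outerThreads, int innerThreads) {
        ExecutorService outerPool = Executors.newFixedThreadPool(outerThreads);
        ExecutorService innerPool = Executors.newFixedThreadPool(innerThreads);
        try {
            List<Future<Long>> outerResults = new ArrayList<>();
            for (int i = 0; i < 24; i++) {
                final int ii = i;
                outerResults.add(outerPool.submit(() -> {
                    // Inner loop: submitted to the dedicated inner pool.
                    List<Future<Long>> innerResults = new ArrayList<>();
                    for (int j = 0; j < 100; j++) {
                        final int jj = j;
                        innerResults.add(innerPool.submit(() -> (long) ii * jj));
                    }
                    long sum = 0;
                    for (Future<Long> f : innerResults) {
                        sum += f.get(); // blocks an outer thread, never an inner one
                    }
                    return sum;
                }));
            }
            long total = 0;
            for (Future<Long> f : outerResults) {
                total += f.get();
            }
            return total;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            outerPool.shutdown();
            innerPool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(nestedSum(4, 4)); // 276 * 4950 = 1366200
    }
}
```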