Monday 16 December 2013

Java 7 - Fork/Join Framework example

Fork/Join, as the name suggests, is designed for work that can be broken into smaller pieces recursively. It is a new addition in JDK 1.7 to support parallelism: an implementation of the ExecutorService interface that helps you take advantage of multiple processors. The goal is to use all of the available processing power to enhance the performance of your application.

The Fork/Join framework is designed to make divide-and-conquer algorithms easy to parallelize. These algorithms are a natural fit for problems that can be divided into two or more sub-problems of the same type. They use recursion to break the problem down until the pieces become simple enough to solve directly. The solutions to the sub-problems are then combined to give a solution to the original problem.


At the center of the Fork/Join framework is the ForkJoinPool class, an extension of the AbstractExecutorService class. ForkJoinPool implements the core work-stealing algorithm and can execute ForkJoinTask instances. The approach is similar to MapReduce, which is also used to parallelize tasks. The difference is that Fork/Join tasks subdivide themselves into smaller tasks only when necessary (when the work is too large), whereas MapReduce algorithms divide all the work into portions as the first step of their execution.


Basic Algorithm:


if(the job is small enough)
{
   compute directly
}
else
{
   split the work in two pieces (fork)
   invoke the pieces and join the results (join)
}


A ForkJoinTask is an abstract base class for tasks that run within a ForkJoinPool. A ForkJoinTask is a thread-like entity that is much lighter weight than a normal thread. Huge numbers of tasks and subtasks may be hosted by a small number of actual threads in a ForkJoinPool, at the price of some usage limitations.

There are two specialized subclasses of ForkJoinTask:

1. RecursiveAction : Use this when the task does not need to return a result; for example, when the task works on positions of an array, it returns nothing because it operates on the array in place. The method you implement to do the work is compute():void, notice the void return.

2. RecursiveTask : Use this when your tasks return a result. For example, when computing the sum of the elements in an array, each task must return the number it computed so the partial results can be joined to obtain the overall solution. The method you implement to do the work is compute():V, where V is the return type; when calculating the sum of integer elements in an array, V may be java.lang.Integer.
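As a quick sketch of the RecursiveTask variant (a fuller example is left for a later post), summing an int array could look like this. The class name SumTask and the threshold of 10,000 are illustrative choices, not part of the original example:

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Illustrative sketch: a RecursiveTask that returns the sum of an int[] slice.
class SumTask extends RecursiveTask<Long> {
    private static final int THRESHOLD = 10_000; // illustrative cut-off

    private final int[] array;
    private final int low, high;

    SumTask(int[] array, int low, int high) {
        this.array = array;
        this.low = low;
        this.high = high;
    }

    @Override
    protected Long compute() {
        if (high - low <= THRESHOLD) {
            // Small enough: sum the slice directly
            long sum = 0;
            for (int i = low; i < high; i++) {
                sum += array[i];
            }
            return sum;
        }
        int mid = (low + high) >>> 1;
        SumTask left = new SumTask(array, low, mid);
        SumTask right = new SumTask(array, mid, high);
        left.fork();                     // schedule the left half asynchronously
        long rightSum = right.compute(); // work on the right half in this thread
        return left.join() + rightSum;   // combine the partial results
    }
}

public class SumDemo {
    public static void main(String[] args) {
        int[] data = new int[100_000];
        for (int i = 0; i < data.length; i++) {
            data[i] = 1;
        }
        long total = new ForkJoinPool().invoke(new SumTask(data, 0, data.length));
        System.out.println(total); // prints 100000
    }
}
```

Note the fork()/compute()/join() pattern in compute(): forking one half and computing the other in the current thread avoids leaving the current worker idle while it waits.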

In this post, I'll demonstrate an example of RecursiveAction. The task is to fill the elements of an array with random values. Here's the code:

package com.fork.join.action;

import static java.util.Arrays.asList;

import java.util.Random;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

public class ForkJoinRandomFillAction {
    Random random = new Random();

    public void loadArray(int[] array) {
        for (int i = 0; i < array.length; i++) {
            array[i] = random.nextInt(10000); // generates numbers from 0 to 9999
        }
    }

    public static void main(String[] args) {

        ForkJoinRandomFillAction filler = new ForkJoinRandomFillAction();

        int arrayLength = 200_000_000;
        int[] array = new int[arrayLength];

        // Number of times the sequential and parallel operations are performed
        final int iterations = 10;

        for (int i = 0; i < iterations; i++) {
            long start = System.currentTimeMillis();
            filler.loadArray(array);

            System.out.println("Sequential processing time: "
                    + (System.currentTimeMillis() - start) + " ms");
        }

        System.out.println("Number of processors available: "
                + Runtime.getRuntime().availableProcessors());

        // Default parallelism level is Runtime.getRuntime().availableProcessors()
        ForkJoinPool fjpool = new ForkJoinPool();

        for (int i = 0; i < iterations; i++) {
            // Create a task covering the complete array
            RecursiveAction task = new RandomFillAction(array, 0, array.length);
            long start = System.currentTimeMillis();
            fjpool.invoke(task);

            System.out.println("Parallel processing time: "
                    + (System.currentTimeMillis() - start) + " ms");
        }

        System.out.println("Number of steals: " + fjpool.getStealCount() + "\n");
    }
}

class RandomFillAction extends RecursiveAction {
    private static final long serialVersionUID = 1L;
    // Threshold size below which the task is computed directly
    private static final int SPLIT_SIZE = 2_000_000;

    final int low;
    final int high;
    private final int[] array;

    public RandomFillAction(int[] array, int low, int high) {
        this.low = low;
        this.high = high;
        this.array = array;
    }

    @Override
    protected void compute() {
        if (high - low > SPLIT_SIZE) {
            // The task is large, so divide it in half
            int mid = (low + high) >>> 1;
            invokeAll(asList(new RandomFillAction(array, low, mid),
                    new RandomFillAction(array, mid, high)));
        } else {
            // Small enough: fill this slice directly.
            // Each subtask uses its own Random to avoid contention.
            Random random = new Random();
            for (int i = low; i < high; i++) {
                array[i] = random.nextInt(10000);
            }
        }
    }
}

For sequential processing, the whole array is filled with random values in a single loop. For parallel processing,
we first create a ForkJoinPool, which will execute the ForkJoinTask. The default parallelism level of a ForkJoinPool is the number of processors available on the system; Runtime.getRuntime().availableProcessors() returns this value.
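If you want a level of parallelism other than the default, ForkJoinPool also has a constructor that takes an explicit int. A minimal sketch (class name PoolDemo is illustrative):

```java
import java.util.concurrent.ForkJoinPool;

public class PoolDemo {
    public static void main(String[] args) {
        // Default constructor: parallelism equals
        // Runtime.getRuntime().availableProcessors()
        ForkJoinPool defaultPool = new ForkJoinPool();
        System.out.println("Default parallelism: " + defaultPool.getParallelism());

        // Explicit parallelism level, e.g. 2 worker threads
        ForkJoinPool smallPool = new ForkJoinPool(2);
        System.out.println("Explicit parallelism: " + smallPool.getParallelism()); // prints 2
    }
}
```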

Note that RandomFillAction extends the RecursiveAction class, and hence it needs to override the compute() method, which contains the logic to either split the task or perform the computation directly.

A threshold value (a slice of the array, in this case) is used to decide whether the computation should be performed directly or divided into subtasks (ForkJoinTasks).
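Picking the threshold is a trade-off: too large and some cores sit idle, too small and task-management overhead dominates. One common rule of thumb, sketched here with an illustrative formula that is not from the original example, is to aim for several leaf tasks per worker thread while never splitting below a minimum chunk size:

```java
import java.util.concurrent.ForkJoinPool;

public class ThresholdDemo {
    // Illustrative heuristic: aim for roughly 8 leaf tasks per worker,
    // but never split below a minimum chunk of 1,000 elements.
    static int chooseThreshold(int arrayLength, int parallelism) {
        return Math.max(1_000, arrayLength / (parallelism * 8));
    }

    public static void main(String[] args) {
        int parallelism = new ForkJoinPool().getParallelism();
        int threshold = chooseThreshold(200_000_000, parallelism);
        System.out.println("Suggested split threshold: " + threshold);
    }
}
```

In practice the best value depends on how expensive the per-element work is, so it is worth measuring a few candidate thresholds as this post does with its timing loops.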


Observations:


Sequential processing time: 2316 ms
Sequential processing time: 2287 ms
Sequential processing time: 2287 ms
Sequential processing time: 2293 ms
Sequential processing time: 2287 ms
Sequential processing time: 2295 ms
Sequential processing time: 2292 ms
Sequential processing time: 2291 ms
Sequential processing time: 2291 ms
Sequential processing time: 2292 ms
Number of processor available: 4
Parallel processing time: 705 ms
Parallel processing time: 650 ms
Parallel processing time: 684 ms
Parallel processing time: 602 ms
Parallel processing time: 659 ms
Parallel processing time: 737 ms
Parallel processing time: 605 ms
Parallel processing time: 604 ms
Parallel processing time: 602 ms
Parallel processing time: 628 ms

Number of steals: 63

The parallel processing time is only about 30% of the sequential processing time.


CPU Utilization

Sequential processing - The total CPU usage remained low, staying close to 25% and never rising above 28%.

Parallel processing - CPU usage this time was much better: around 96% on average, hitting 100% at times and never falling below the 90% mark.

This clearly demonstrates the power of the Fork/Join framework and its ability to make better use of the available CPU processors.

In my next post, I will be demonstrating an example for the RecursiveTask class.

Thanks and happy coding!
