Monday, 16 December 2013

Fork / Join Framework : RecursiveTask Example

In my previous post, I demonstrated the RecursiveAction class of the Fork / Join framework. This post continues from there with an example of the RecursiveTask class.

Two methods inherited from ForkJoinTask are used:

1. fork() - Schedules a ForkJoinTask for asynchronous execution. This lets a running ForkJoinTask launch a new one.

2. join() - Returns the result of the computation once it is done.
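
As a minimal sketch of these two calls in isolation (RangeSumTask and its THRESHOLD value are hypothetical names chosen for illustration, not part of the example that follows), a task can fork two subtasks and then join them:

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Hypothetical example task (not from the post): sums the integers from..to, inclusive.
class RangeSumTask extends RecursiveTask<Long> {
    private static final long serialVersionUID = 1L;
    private static final int THRESHOLD = 1_000; // illustrative cut-off, not tuned
    private final int from;
    private final int to;

    RangeSumTask(int from, int to) {
        this.from = from;
        this.to = to;
    }

    @Override
    protected Long compute() {
        if (to - from < THRESHOLD) {
            // Small range: add the numbers directly
            long sum = 0L;
            for (int i = from; i <= to; i++) {
                sum += i;
            }
            return sum;
        }
        int mid = from + (to - from) / 2;
        RangeSumTask left = new RangeSumTask(from, mid);
        RangeSumTask right = new RangeSumTask(mid + 1, to);
        left.fork();  // schedule both halves for asynchronous execution
        right.fork();
        long rightSum = right.join(); // join in reverse order of forking
        long leftSum = left.join();
        return leftSum + rightSum;
    }

    public static void main(String[] args) {
        ForkJoinPool pool = new ForkJoinPool();
        long result = pool.invoke(new RangeSumTask(1, 100_000));
        System.out.println("Sum of 1..100000 = " + result);
    }
}
```

The subtasks are joined in the reverse of the order in which they were forked, which is the order the ForkJoinTask documentation suggests.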

The task here is to find the sum of all the elements in an array. Let's have a look at this: 


package com.fork.join.task;

import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

public class ForkJoinSumTask {

 Random random = new Random();

 public void fillArray(int[] array) {
  for (int i = 0; i < array.length; i++) {
   array[i] = random.nextInt(10000);
  }
 }

 public static void main(String[] args) {
  ForkJoinSumTask sum = new ForkJoinSumTask();
  int[] array = new int[200_000_000];
  sum.fillArray(array);

  long count;
  long start1;

  // Sequential process to get the sum of the elements in array
  for (int j = 0; j < 20; j++) {
   count = 0;
   start1 = System.currentTimeMillis();
   for (long i = 0; i < (long) array.length; i++) {
    count = (count + array[(int) i]);
   }

   System.out.println("Addition Result: " + count);
   System.out.println("Sequential processing time: "
     + (System.currentTimeMillis() - start1) + " ms");

  }
  System.out.println("Parallel processing time");
  System.out.println("Number of processors available: "
    + Runtime.getRuntime().availableProcessors());

  // The default parallelism level is also
  // Runtime.getRuntime().availableProcessors()
  ForkJoinPool fjpool = new ForkJoinPool(Runtime.getRuntime().availableProcessors());
  long start2;

  for (int i = 0; i < 20; i++) {
   RecursiveSumTask task = new RecursiveSumTask(array, 0, array.length);
   start2 = System.currentTimeMillis();
   System.out.println("Addition Result: " + fjpool.invoke(task));
   System.out.println("Parallel processing time: "
     + (System.currentTimeMillis() - start2) + " ms");
  }

  System.out.println("Number of steals: " + fjpool.getStealCount() + "\n");
 }
}

class RecursiveSumTask extends RecursiveTask<Long> {
 private static final long serialVersionUID = 1L;
 final int low;
 final int high;
 private int[] array;
 final int splitSize = 100_000_000; // Threshold size above which the task is split

 RecursiveSumTask(int[] array, int from, int to) {
  this.low = from;
  this.high = to;
  this.array = array;
 }

 @Override
 protected Long compute() {
  long count = 0L;
  List<RecursiveTask<Long>> forks = new ArrayList<>();

  if (high - low > splitSize) {
   // task is huge so divide in half
   int mid = low + (high - low) / 2; // avoids int overflow for very large ranges

   // Divided the given task into task1 and task2
   RecursiveSumTask task1 = new RecursiveSumTask(array, low, mid);
   forks.add(task1);
   task1.fork();

   RecursiveSumTask task2 = new RecursiveSumTask(array, mid, high);
   forks.add(task2);
   task2.fork();

  } else {
   // Calculating sum of the given array range
   for (int i = low; i < high; i++) {
    count = count + array[i];
   }
  }

  // Waiting for the result
  for (RecursiveTask<Long> task : forks) {
   count = count + task.join();
  }

  return count;
 }
}

Note that RecursiveSumTask extends RecursiveTask in this case. Its compute() method returns a value (a Long here), unlike RecursiveAction, whose compute() returns nothing.
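
The difference between the two base classes can be sketched side by side. Both classes below (DoubleInPlaceAction and MaxTask) and the cut-off of 4 elements are hypothetical, made up only to show the two compute() signatures:

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.RecursiveTask;

// Hypothetical RecursiveAction: doubles every element in place, so compute() returns nothing.
class DoubleInPlaceAction extends RecursiveAction {
    private static final long serialVersionUID = 1L;
    private final int[] data;
    private final int low;
    private final int high;

    DoubleInPlaceAction(int[] data, int low, int high) {
        this.data = data;
        this.low = low;
        this.high = high;
    }

    @Override
    protected void compute() {
        if (high - low <= 4) { // illustrative cut-off
            for (int i = low; i < high; i++) {
                data[i] *= 2;
            }
            return;
        }
        int mid = low + (high - low) / 2;
        // invokeAll forks both halves and waits for them to finish
        invokeAll(new DoubleInPlaceAction(data, low, mid),
                  new DoubleInPlaceAction(data, mid, high));
    }
}

// Hypothetical RecursiveTask: compute() returns the maximum of the range.
class MaxTask extends RecursiveTask<Integer> {
    private static final long serialVersionUID = 1L;
    private final int[] data;
    private final int low;
    private final int high;

    MaxTask(int[] data, int low, int high) {
        this.data = data;
        this.low = low;
        this.high = high;
    }

    @Override
    protected Integer compute() {
        if (high - low <= 4) { // illustrative cut-off
            int max = Integer.MIN_VALUE;
            for (int i = low; i < high; i++) {
                max = Math.max(max, data[i]);
            }
            return max;
        }
        int mid = low + (high - low) / 2;
        MaxTask left = new MaxTask(data, low, mid);
        MaxTask right = new MaxTask(data, mid, high);
        invokeAll(left, right);
        // Both subtasks are complete here, so join() just hands back their results
        return Math.max(left.join(), right.join());
    }

    public static void main(String[] args) {
        int[] data = {3, 9, 1, 7, 5, 8, 2, 6, 4, 10};
        ForkJoinPool pool = new ForkJoinPool();
        pool.invoke(new DoubleInPlaceAction(data, 0, data.length));
        System.out.println("Max after doubling: " + pool.invoke(new MaxTask(data, 0, data.length)));
    }
}
```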

During sequential processing, a single loop scans the whole array and adds up its elements.

For parallel execution, a ForkJoinPool is created again. Runtime.getRuntime().availableProcessors() returns the number of processors available to the JVM, and this number is used as the pool's parallelism level. fork() is called on each subtask, and the partial results are added up using join(), which returns the computation result of a task.
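
A common variation on this pattern forks only one half and computes the other half in the current thread, so the worker that splits a task is not left idle. The sketch below is one possible way to write it, not the version measured in this post; ForkOneSumTask and its THRESHOLD value are hypothetical:

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Sketch of an alternative compute(): fork one half, compute the other in the current thread.
class ForkOneSumTask extends RecursiveTask<Long> {
    private static final long serialVersionUID = 1L;
    private static final int THRESHOLD = 10_000; // hypothetical value, chosen only for illustration
    private final int[] array;
    private final int low;
    private final int high;

    ForkOneSumTask(int[] array, int low, int high) {
        this.array = array;
        this.low = low;
        this.high = high;
    }

    @Override
    protected Long compute() {
        if (high - low <= THRESHOLD) {
            // Range is small enough: sum it directly
            long sum = 0L;
            for (int i = low; i < high; i++) {
                sum += array[i];
            }
            return sum;
        }
        int mid = low + (high - low) / 2;
        ForkOneSumTask left = new ForkOneSumTask(array, low, mid);
        ForkOneSumTask right = new ForkOneSumTask(array, mid, high);
        left.fork();                     // left half may be picked up by another worker
        long rightSum = right.compute(); // right half runs in the current thread
        return left.join() + rightSum;   // wait for the left half and combine
    }

    public static void main(String[] args) {
        int[] array = new int[1_000_000];
        for (int i = 0; i < array.length; i++) {
            array[i] = i % 10;
        }
        ForkJoinPool pool = new ForkJoinPool(); // parallelism defaults to availableProcessors()
        System.out.println("Sum: " + pool.invoke(new ForkOneSumTask(array, 0, array.length)));
    }
}
```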

A threshold (a range size, in this case) decides whether the computation is performed directly or divided into subtasks (ForkJoinTasks). With the threshold of 100,000,000 used here, the 200,000,000-element array is split exactly once, into two subtasks.
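
To get a feel for how the threshold shapes the work, the sketch below counts the leaf tasks produced by the same rule as compute() above (halve the range while it is larger than the threshold). SplitCounter and countLeaves are hypothetical helpers, and the second threshold is an arbitrary value used only for comparison:

```java
// Hypothetical helper: counts the leaf tasks created by the rule
// "if the range is larger than the threshold, split it in half".
class SplitCounter {
    static long countLeaves(long size, long threshold) {
        if (threshold < 1) {
            throw new IllegalArgumentException("threshold must be at least 1");
        }
        if (size <= threshold) {
            return 1; // small enough: computed directly, no further split
        }
        long left = size / 2; // same halving as mid = (low + high) / 2
        return countLeaves(left, threshold) + countLeaves(size - left, threshold);
    }

    public static void main(String[] args) {
        long size = 200_000_000L; // array length used in the post
        // Threshold used in the post
        System.out.println("Leaves at 100,000,000: " + countLeaves(size, 100_000_000L));
        // Arbitrary smaller threshold, for comparison only
        System.out.println("Leaves at 1,000,000: " + countLeaves(size, 1_000_000L));
    }
}
```

A smaller threshold gives the pool more, smaller tasks to distribute and steal, at the cost of more task-creation overhead.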


Observations: 


Addition Result: 999844939360
Sequential processing time: 122 ms
Addition Result: 999844939360
Sequential processing time: 126 ms
Addition Result: 999844939360
Sequential processing time: 120 ms
Addition Result: 999844939360
Sequential processing time: 120 ms
Addition Result: 999844939360
Sequential processing time: 120 ms
Addition Result: 999844939360
Sequential processing time: 121 ms
Addition Result: 999844939360
Sequential processing time: 120 ms
Addition Result: 999844939360
Sequential processing time: 119 ms
Addition Result: 999844939360
Sequential processing time: 120 ms
Addition Result: 999844939360
Sequential processing time: 121 ms

Parallel processing 
Number of processors available: 4

Addition Result: 999844939360
Parallel processing time: 42 ms
Addition Result: 999844939360
Parallel processing time: 49 ms
Addition Result: 999844939360
Parallel processing time: 40 ms
Addition Result: 999844939360
Parallel processing time: 38 ms
Addition Result: 999844939360
Parallel processing time: 38 ms
Addition Result: 999844939360
Parallel processing time: 38 ms
Addition Result: 999844939360
Parallel processing time: 39 ms
Addition Result: 999844939360
Parallel processing time: 38 ms
Addition Result: 999844939360
Parallel processing time: 38 ms
Addition Result: 999844939360
Parallel processing time: 38 ms
Number of steals: 30


For parallel processing, the processing time is nearly one third of the time required for sequential processing.
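
The ratio can be checked from the timings listed above. The sketch below averages the ten sequential and ten parallel runs shown in the output; the class and method names are hypothetical:

```java
// Hypothetical helper that averages the timings listed in the output above.
class SpeedupFromTimings {
    // Milliseconds, copied from the sequential and parallel runs in the output
    static final long[] SEQUENTIAL_MS = {122, 126, 120, 120, 120, 121, 120, 119, 120, 121};
    static final long[] PARALLEL_MS = {42, 49, 40, 38, 38, 38, 39, 38, 38, 38};

    static double average(long[] values) {
        long total = 0;
        for (long v : values) {
            total += v;
        }
        return (double) total / values.length;
    }

    public static void main(String[] args) {
        double sequential = average(SEQUENTIAL_MS);
        double parallel = average(PARALLEL_MS);
        System.out.printf("Average sequential time: %.1f ms%n", sequential);
        System.out.printf("Average parallel time: %.1f ms%n", parallel);
        System.out.printf("Parallel / sequential: %.2f%n", parallel / sequential);
        System.out.printf("Speedup: %.2fx%n", sequential / parallel);
    }
}
```

The averages come out at about 120.9 ms and 39.8 ms, a ratio of roughly 0.33, or a speedup of about 3x.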

CPU Utilization: 

CPU usage was found to be around 25 % during sequential execution.

For parallel processing, CPU usage averaged around 58 %.

There is a clear increase in CPU usage for parallel processing. The computation in this case was simple (summing the array elements); the benefit is likely to be larger when the tasks involved are more complex.

The Fork / Join framework is more useful when the sequential operation is complex and time consuming. An appropriate threshold in such situations can result in much better CPU utilization. For short tasks, the Fork / Join framework is not recommended, because the overhead of creating and scheduling tasks can outweigh the gain.

You can also refer to my previous post on the Fork / Join framework for more information.

Thanks and Happy Coding !
