In my previous post, I had demonstrated the use of RecursiveAction class for the Fork / Join framework. Continuing that here in this post, an example for the RecursiveTask class has been explained.
Two methods inherited from ForkJoinTask have been used:
1. fork() - It allows a ForkJoinTask to be planned for asynchronous execution. This allows a new ForkJoinTask to be launched from an existing one.
2. join() - It returns the result of the computation when it is done.
The task here is to find the sum of all the elements in an array. Let's have a look at this:
Two methods inherited from ForkJoinTask have been used:
1. fork() - It allows a ForkJoinTask to be planned for asynchronous execution. This allows a new ForkJoinTask to be launched from an existing one.
2. join() - It returns the result of the computation when it is done.
The task here is to find the sum of all the elements in an array. Let's have a look at this:
package com.fork.join.task; import java.util.ArrayList; import java.util.List; import java.util.Random; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.RecursiveTask; public class ForkJoinSumTask { Random random = new Random(); public void fillArray(int[] array) { for (int i = 0; i < array.length; i++) { array[i] = array[i] = random.nextInt(10000); } } public static void main(String[] args) { ForkJoinSumTask sum = new ForkJoinSumTask(); int[] array = new int[20_00_00_000]; sum.fillArray(array); long count; long start1; // Sequential process to get the sum of the elements in array for (int j = 0; j < 20; j++) { count = 0; start1 = System.currentTimeMillis(); for (long i = 0; i < (long) array.length; i++) { count = (count + array[(int) i]); } System.out.println("Addition Result: " + count); System.out.println("Sequential processing time: " + (System.currentTimeMillis() - start1) + " ms"); } System.out.println("Parallel processing time"); System.out.println("Number of processors available: " + Runtime.getRuntime().availableProcessors()); ForkJoinPool fjpool = new ForkJoinPool(Runtime.getRuntime().availableProcessors()); // Default parallelism level // = // Runtime.getRuntime().availableProcessors() long start2; for (int i = 0; i < 20; i++) { RecursiveSumTask task = new RecursiveSumTask(array, 0, array.length); start2 = System.currentTimeMillis(); System.out.println("Addition Result: " + fjpool.invoke(task)); System.out.println("Parallel processing time: " + (System.currentTimeMillis() - start2) + " ms"); } System.out .println("Number of steals: " + fjpool.getStealCount() + "\n"); } } class RecursiveSumTask extends RecursiveTask{ private static final long serialVersionUID = 1L; final int low; final int high; private int[] array; final int splitSize = 1000_00_000; // Some threshold size to spit the task RecursiveSumTask(int[] array, int from, int to) { this.low = from; this.high = to; this.array = array; } @Override protected Long compute() { long count = 0L; List > forks = new ArrayList<>(); if (high - low > splitSize) { // task is huge so divide in half int mid = (low + high) / 2; // Divided the given task into task1 and task2 RecursiveSumTask task1 = new RecursiveSumTask(array, low, mid); forks.add(task1); task1.fork(); RecursiveSumTask task2 = new RecursiveSumTask(array, mid, high); forks.add(task2); task2.fork(); } else { // Calculating sum of the given array range for (int i = (int) low; i < high; i++) { count = count + array[i]; } } // Waiting for the result for (RecursiveTask task : forks) { count = count + task.join(); } return count; } }
Some threshold value (array size in this case) is used to decide whether the computation is to be performed directly or is to be divided into sub tasks (ForkJoinTasks).
Observations:
Addition Result: 999844939360
Sequential processing time: 122 ms
Addition Result: 999844939360
Sequential processing time: 126 ms
Addition Result: 999844939360
Sequential processing time: 120 ms
Addition Result: 999844939360
Sequential processing time: 120 ms
Addition Result: 999844939360
Sequential processing time: 120 ms
Addition Result: 999844939360
Sequential processing time: 121 ms
Addition Result: 999844939360
Sequential processing time: 120 ms
Addition Result: 999844939360
Sequential processing time: 119 ms
Addition Result: 999844939360
Sequential processing time: 120 ms
Addition Result: 999844939360
Sequential processing time: 121 ms
Parallel processing
Number of processors available: 4
Addition Result: 999844939360
Parallel processing time: 42 ms
Addition Result: 999844939360
Parallel processing time: 49 ms
Addition Result: 999844939360
Parallel processing time: 40 ms
Addition Result: 999844939360
Parallel processing time: 38 ms
Addition Result: 999844939360
Parallel processing time: 38 ms
Addition Result: 999844939360
Parallel processing time: 38 ms
Addition Result: 999844939360
Parallel processing time: 39 ms
Addition Result: 999844939360
Parallel processing time: 38 ms
Addition Result: 999844939360
Parallel processing time: 38 ms
Addition Result: 999844939360
Parallel processing time: 38 ms
Number of steals: 30
For parallel processing, the processing time is nearly 1/3 rd of the time required for sequential processing.
CPU Utilization:
CPU usage was found to be around 25 % during sequential execution.
For parallel processing , CPU usage was on an average around 58 %
There is a clear increase in the CPU usage for parallel processing. The computation involved in this case was pretty simple (sum of the array elements). Better results can be experienced if the tasks involved are more complex.
The Fork / Join framework is more useful in cases where sequential operations are complex and time consuming. An appropriate threshold in such situations can result in much better CPU utilization. For short tasks, Fork/Join framework is not recommended.
You can also refer my previous blog on Fork/Join Framework for more information
Thanks and Happy Coding !
No comments:
Post a Comment