Fork/Join thread pool in the nodes

From JPPF Documentation

By default, JPPF nodes use a “standard” thread pool to execute tasks. This add-on replaces it with a fork/join thread pool. This enables JPPF tasks to spawn ForkJoinTask instances (or instances of any of its subclasses) locally, in the node, and have them processed as expected by a ForkJoinPool.

To use this add-on, you will need to deploy the jar file "ThreadManagerForkJoin.jar" to either the JPPF server's or node's classpath. If deployed in the server's classpath, it will be available to all nodes.

The next step is to configure each node for use of the fork/join thread pool. This is achieved by adding the following property to the node's configuration:

 jppf.thread.manager.class = org.jppf.server.node.fj.ThreadManagerForkJoin

Here is a usage example, which computes the number of occurrences of each word in a set of documents:

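 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ForkJoinTask;
 import org.jppf.server.protocol.JPPFTask;
 
 public class WordCountTask extends JPPFTask {
   // a list of documents to process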
   private final List<String> documents;
 
   public WordCountTask(final List<String> documents) {
     this.documents = documents;
   }
 
   @Override
   public void run() {
     List<Map<String, Integer>> results = new ArrayList<>();
     // compute word counts in each document
     if (ForkJoinTask.inForkJoinPool()) {
       List<ForkJoinTask<Map<String, Integer>>> tasks = new ArrayList<>();
       // fork one new task per document
       for (String doc: documents) tasks.add(new MyForkJoinTask(doc).fork());
       // wait until all forked tasks have completed (i.e. join)
       for (ForkJoinTask<Map<String, Integer>> task: tasks) results.add(task.join());
     } else {
       // if not in FJ pool, process documents sequentially
       for (String doc: documents) results.add(new MyForkJoinTask(doc).compute());
     }
     // merge the results of all documents
     Map<String, Integer> globalResult = new HashMap<>();
     for (Map<String, Integer> map: results) {
       for (Map.Entry<String, Integer> entry: map.entrySet()) {
         Integer n = globalResult.get(entry.getKey());
         if (n == null) globalResult.put(entry.getKey(), entry.getValue());
         else globalResult.put(entry.getKey(), n + entry.getValue());
       }
     }
     // set the merged word counts as this task's result
     this.setResult(globalResult);
   }
 }

We can see here that the execution strategy depends on the result of calling ForkJoinTask.inForkJoinPool(): if we determine that a fork/join pool is available, then a new task is forked for each document, and thus executed asynchronously. The execution is then synchronized by joining each forked task. Otherwise, the documents are processed sequentially.
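
The check that drives this choice can be tried outside of JPPF with a plain ForkJoinPool. The following is a minimal sketch, not part of the add-on: the class `InPoolCheck` and its methods are hypothetical names introduced for illustration. It assumes only the standard `java.util.concurrent` API and mirrors the if/else test used in `WordCountTask.run()`.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;

// Hypothetical demo class, not part of JPPF.
class InPoolCheck {
  // Names the strategy the calling thread would pick,
  // mirroring the if/else test in WordCountTask.run().
  static String strategy() {
    return ForkJoinTask.inForkJoinPool() ? "fork/join" : "sequential";
  }

  // Runs strategy() on a worker thread of a private ForkJoinPool.
  static String strategyInPool() {
    ForkJoinPool pool = new ForkJoinPool(2);
    try {
      Callable<String> check = InPoolCheck::strategy;
      // adapt() wraps the Callable in a ForkJoinTask, invoke() waits for its result
      return pool.invoke(ForkJoinTask.adapt(check));
    } finally {
      pool.shutdown();
    }
  }

  public static void main(String[] args) {
    System.out.println("caller thread: " + strategy());
    System.out.println("pool thread:   " + strategyInPool());
  }
}
```

When launched from a regular main thread, the first call reports "sequential" and the second reports "fork/join". This matches what a JPPF task would observe in a node without and with the fork/join thread manager, assuming the thread manager runs tasks on fork/join worker threads as described above.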

In this example, our fork/join task is defined as follows:

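 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.RecursiveTask;
 import java.util.regex.Pattern;
 
 public class MyForkJoinTask extends RecursiveTask<Map<String, Integer>> {
   // delimiter used to split a document: one or more non-word characters (whitespace included)
   private static final Pattern pattern = Pattern.compile("\\W+");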
   private final String document;
 
   public MyForkJoinTask(final String document) {
     this.document = document;
   }
 
   // return a mapping of each word to its number of occurrences
   @Override
   public Map<String, Integer> compute() {
     Map<String, Integer> result = new HashMap<>();
     // split the document into individual words
     String[] words = pattern.split(document);
     // count the number of occurrences of each word in the document
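     for (String word: words) {
       // skip empty strings produced by split(), e.g. when the document starts with a delimiter
       if (word.isEmpty()) continue;
       Integer n = result.get(word);
       result.put(word, (n == null) ? 1 : n+1);
     }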
     return result;
   }
 }
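
To experiment with the fork-per-document pattern without a JPPF node, the same logic can be run in a local ForkJoinPool. The sketch below is an illustration under stated assumptions, not the add-on's code: `LocalWordCount`, `CountTask` and `AllDocsTask` are hypothetical names, and it uses `Map.merge()`, which requires Java 8 or later, to shorten the counting and merging steps.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Hypothetical local harness, not part of JPPF.
class LocalWordCount {
  // Counts the words of a single document.
  static class CountTask extends RecursiveTask<Map<String, Integer>> {
    private final String document;

    CountTask(String document) { this.document = document; }

    @Override
    protected Map<String, Integer> compute() {
      Map<String, Integer> counts = new HashMap<>();
      for (String word : document.split("\\W+")) {
        // split() may yield a leading empty string, which is not a word
        if (!word.isEmpty()) counts.merge(word, 1, Integer::sum);
      }
      return counts;
    }
  }

  // Forks one CountTask per document, then joins them and merges the counts.
  static class AllDocsTask extends RecursiveTask<Map<String, Integer>> {
    private final List<String> documents;

    AllDocsTask(List<String> documents) { this.documents = documents; }

    @Override
    protected Map<String, Integer> compute() {
      List<CountTask> forked = new ArrayList<>();
      for (String doc : documents) {
        CountTask task = new CountTask(doc);
        task.fork();
        forked.add(task);
      }
      Map<String, Integer> total = new HashMap<>();
      for (CountTask task : forked) {
        for (Map.Entry<String, Integer> e : task.join().entrySet()) {
          total.merge(e.getKey(), e.getValue(), Integer::sum);
        }
      }
      return total;
    }
  }

  // Runs the whole computation in a private pool and returns the merged counts.
  static Map<String, Integer> count(List<String> documents) {
    ForkJoinPool pool = new ForkJoinPool();
    try {
      return pool.invoke(new AllDocsTask(documents));
    } finally {
      pool.shutdown();
    }
  }

  public static void main(String[] args) {
    System.out.println(count(Arrays.asList("to be or not to be", "be quick")));
  }
}
```

In a JPPF node configured with the fork/join thread manager, the role of `AllDocsTask` is played by `WordCountTask.run()`, so no explicit pool needs to be created by the task.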

Related sample: the fork/join thread pool add-on of the JPPF distribution provides a more sophisticated example, taking full advantage of the fork/join features in Java 7. This example is packaged along with the downloadable “JPPF-x.y.z-jdk7-addons.zip” file.
