JPPF Executor Services

From JPPF Documentation version 2.x



Basic usage

JPPF 2.2 introduced a new API that serves as an ExecutorService facade to the JPPF client API. This API consists of a single class, JPPFExecutorService, which implements the interface java.util.concurrent.ExecutorService. A JPPFExecutorService is obtained via its constructor, to which a JPPFClient must be passed:

  JPPFClient jppfClient = new JPPFClient();
  ExecutorService executor = new JPPFExecutorService(jppfClient);

The behavior of the resulting executor will depend largely on the configuration of the JPPFClient and on which ExecutorService method you invoke to submit tasks. In effect, each time you invoke an invokeAll(...), invokeAny(...), submit(...) or execute(...) method of the executor, a new JPPFJob will be created and sent for execution on the grid. This means that, if the executor method you invoke only takes a single task, then a job with only one task will be sent to the JPPF server.

Here is an example use:

 JPPFClient jppfClient = new JPPFClient();
 ExecutorService executor = new JPPFExecutorService(jppfClient);
 
 try
 {
   // submit a single task
   Runnable myTask = new MyRunnable(0);
   Future<?> future = executor.submit(myTask);
   // wait for the results
   future.get();
   // process the results
   ...
 
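   // submit a list of tasks: invokeAll() only accepts Callable tasks
   List<Callable<Integer>> myTaskList = new ArrayList<Callable<Integer>>();
   for (int i=0; i<10; i++) myTaskList.add(new MyCallable(i));
   List<Future<Integer>> futureList = executor.invokeAll(myTaskList);
   // wait for the results
   for (Future<Integer> f: futureList) f.get();
   // process the results for the list of tasks
   ...
 }
 finally
 {
   // clean up after use
   executor.shutdown();
   jppfClient.close();
 }
 
 // !!! it is important that this task is Serializable !!!
 public static class MyRunnable implements Runnable, Serializable
 {
   private int id = 0;
 
   public MyRunnable(int id)
   {
     this.id = id;
   }
 
   public void run()
   {
     System.out.println("Running task id " + id);
   }
 }
 
 // Callable counterpart of MyRunnable (illustrative), also Serializable
 public static class MyCallable implements Callable<Integer>, Serializable
 {
   private int id = 0;
 
   public MyCallable(int id)
   {
     this.id = id;
   }
 
   public Integer call()
   {
     System.out.println("Running task id " + id);
     return id;
   }
 }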
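Because tasks are sent to remote nodes, a task that is not Serializable will typically fail only once it is submitted. The sketch below shows one way to check a task locally beforehand. It assumes standard Java serialization is in use; SerializationCheck, GoodTask and BadTask are hypothetical names used for illustration and are not part of the JPPF API.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical helper, not part of the JPPF API.
final class SerializationCheck {
  // A task shaped like MyRunnable: both Runnable and Serializable.
  static class GoodTask implements Runnable, Serializable {
    private static final long serialVersionUID = 1L;
    final int id;
    GoodTask(int id) { this.id = id; }
    public void run() { System.out.println("Running task id " + id); }
  }

  // A task that does not implement Serializable.
  static class BadTask implements Runnable {
    public void run() { }
  }

  // Returns true if the object survives a Java serialization round trip.
  static boolean isSerializable(Object task) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
        out.writeObject(task);
      }
      try (ObjectInputStream in =
          new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
        in.readObject();
      }
      return true;
    } catch (IOException | ClassNotFoundException e) {
      // NotSerializableException is a subclass of IOException
      return false;
    }
  }

  public static void main(String[] args) {
    System.out.println(isSerializable(new GoodTask(1)));
    System.out.println(isSerializable(new BadTask()));
  }
}
```

A check like this fits naturally in a unit test for each task class, so that a missing Serializable marker or a non-serializable field is caught before any job reaches the grid.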

Batch modes

The executor's behavior can be modified by using one of the batch modes of the JPPFExecutorService. By batch mode, we mean the ability to group tasks into batches, in several different ways. This enables tasks to be sent together, even if they are submitted individually, and allows them to benefit from the parallel features inherent to JPPF. This will also dramatically improve the throughput of individual tasks sent via an executor service.


Using a batch size: specifying a batch size via the method JPPFExecutorService.setBatchSize(int limit) causes the executor to send tasks only once at least that number of tasks has been submitted. When using this mode, be careful about how many tasks you send via the executor: if you send fewer than the batch limit, these tasks will remain pending and unexecuted. Sometimes the executor will send more than the specified number of tasks in the same batch: this happens when one of the JPPFExecutorService.invokeXXX() methods is called with n tasks, such that current batch size + n > limit. In that case, all the tasks included in the invokeXXX() call are sent together.

Here is an example:

 JPPFExecutorService executor = new JPPFExecutorService(jppfClient);
 // the executor will send jobs with at least 5 tasks each
 executor.setBatchSize(5);
 List<Future<?>> futures = new ArrayList<Future<?>>();
 // we submit 10 = 2 * 5 tasks, this will cause the client to send 2 jobs
 for (int i=0; i<10; i++) futures.add(executor.submit(new MyRunnable(i)));
 for (Future<?> f: futures) f.get();
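The pending-task pitfall described above can be illustrated with a toy model of the batch size rule. This is only a sketch of the documented behavior for individually submitted tasks, not JPPF's actual implementation. BatchSizeModel and its methods are hypothetical names, and the invokeXXX() case is deliberately left out.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of the batch size rule; hypothetical, not JPPF code.
final class BatchSizeModel {
  private final int limit;
  private final List<Integer> pending = new ArrayList<>();
  private final List<List<Integer>> sentJobs = new ArrayList<>();

  BatchSizeModel(int limit) { this.limit = limit; }

  // Models submit(): queue the task, then send a job once the limit is reached.
  void submit(int taskId) {
    pending.add(taskId);
    if (pending.size() >= limit) {
      sentJobs.add(new ArrayList<>(pending));
      pending.clear();
    }
  }

  int jobsSent() { return sentJobs.size(); }

  int pendingCount() { return pending.size(); }

  public static void main(String[] args) {
    BatchSizeModel model = new BatchSizeModel(5);
    // 12 tasks with a limit of 5: two full batches, two tasks left over
    for (int i = 0; i < 12; i++) model.submit(i);
    System.out.println("jobs sent: " + model.jobsSent());
    System.out.println("tasks still pending: " + model.pendingCount());
  }
}
```

In this model the last two tasks are never sent. With the real executor, that situation corresponds to a Future.get() call that would not return unless more tasks are submitted or a batch timeout is also configured.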


Using a batch timeout: this is done via the method JPPFExecutorService.setBatchTimeout(long timeout) and causes the executor to send the tasks at regular intervals, specified as the timeout. The timeout value is expressed in milliseconds. Once the timeout has expired, the counter is reset to zero. If no task has been submitted between two timeout expirations, then nothing happens.

Example:

 JPPFExecutorService executor = new JPPFExecutorService(jppfClient);
 // the executor will send a job every second (if any task is submitted)
 executor.setBatchTimeout(1000L);
 List<Future<?>> futures = new ArrayList<Future<?>>();
 // we submit 5 tasks
 for (int i=0; i<5; i++) futures.add(executor.submit(new MyRunnable(i)));
 // we wait 1.5 seconds, during that time a job with 5 tasks will be submitted
 Thread.sleep(1500L);
 // we submit 6 more tasks, they will be sent in a different job
 for (int i=5; i<11; i++) futures.add(executor.submit(new MyRunnable(i)));
 // here we get the results for tasks sent in 2 different jobs!
 for (Future<?> f: futures) f.get();


Using both batch size and timeout: it is possible to use a combination of batch size and timeout. In this case, a job will be sent whenever the batch limit is reached or the timeout expires, whichever happens first. In either case, the timeout counter is reset each time a job is sent. Using a timeout is also an efficient way to deal with the possible blocking behavior of the batch size mode: just use a timeout that is sufficiently large for your needs.

Example:

 JPPFExecutorService executor = new JPPFExecutorService(jppfClient);
 executor.setBatchTimeout(1000L);
 executor.setBatchSize(5);
 List<Future<?>> futures = new ArrayList<Future<?>>();
 // we submit 3 tasks
 for (int i=0; i<3; i++) futures.add(executor.submit(new MyRunnable(i)));
 // we wait 1.5 seconds, during that time a job with 3 tasks will be submitted,
 // even though the batch size is set to 5
 Thread.sleep(1500L);
 for (Future<?> f: futures) f.get();
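The "whichever happens first" rule can be sketched with a small model driven by an explicit clock, which keeps the example deterministic. This is an illustration of the documented behavior under simplifying assumptions, not JPPF's implementation. BatchSizeTimeoutModel, submit(taskId, now) and tick(now) are hypothetical names, and times are plain millisecond values supplied by the caller.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of combined batch size and timeout; hypothetical, not JPPF code.
final class BatchSizeTimeoutModel {
  private final int limit;
  private final long timeoutMillis;
  private final List<Integer> pending = new ArrayList<>();
  private final List<Integer> sentJobSizes = new ArrayList<>();
  private long lastReset = 0L;

  BatchSizeTimeoutModel(int limit, long timeoutMillis) {
    this.limit = limit;
    this.timeoutMillis = timeoutMillis;
  }

  // Models submit() at time 'now': send a job as soon as the limit is reached.
  void submit(int taskId, long now) {
    pending.add(taskId);
    if (pending.size() >= limit) send(now);
  }

  // Models a timer check at time 'now': on expiry, send whatever is pending.
  void tick(long now) {
    if (now - lastReset < timeoutMillis) return;
    if (pending.isEmpty()) lastReset = now; // nothing to send, just restart
    else send(now);
  }

  // Records a job and resets the timeout counter.
  private void send(long now) {
    sentJobSizes.add(pending.size());
    pending.clear();
    lastReset = now;
  }

  List<Integer> sentJobSizes() { return sentJobSizes; }

  public static void main(String[] args) {
    BatchSizeTimeoutModel model = new BatchSizeTimeoutModel(5, 1000L);
    for (int i = 0; i < 3; i++) model.submit(i, 0L);    // below the limit
    model.tick(1000L);                                  // timeout: job of 3
    for (int i = 3; i < 8; i++) model.submit(i, 1200L); // limit: job of 5
    model.tick(2200L);                                  // expiry, nothing pending
    System.out.println("job sizes: " + model.sentJobSizes());
  }
}
```

The first job is triggered by the timeout and the second by the batch size, while the final timer expiry sends nothing because no task is pending.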