JPPF Executor Services

From JPPF Documentation version 3.x

Basic usage

JPPF 2.2 introduced a new API that serves as an ExecutorService facade to the JPPF client API. This API consists of a single class, JPPFExecutorService, which implements the interface java.util.concurrent.ExecutorService. A JPPFExecutorService is obtained via its constructor, to which a JPPFClient must be passed:

  JPPFClient jppfClient = new JPPFClient();
  ExecutorService executor = new JPPFExecutorService(jppfClient);

The behavior of the resulting executor will depend largely on the configuration of the JPPFClient and on which ExecutorService method you invoke to submit tasks. In effect, each time you invoke an invokeAll(...), invokeAny(...), submit(...) or execute(...) method of the executor, a new JPPFJob will be created and sent for execution on the grid. This means that, if the executor method you invoke only takes a single task, then a job with only one task will be sent to the JPPF server.

Here is an example use:

 JPPFClient jppfClient = new JPPFClient();
 ExecutorService executor = new JPPFExecutorService(jppfClient);
 
 try {
   // submit a single task
   Runnable myTask = new MyRunnable(0);
   Future<?> future = executor.submit(myTask);
   // wait for the results
   future.get();
   // process the results
   ...
 
   // submit a list of tasks
   List<Runnable> myTaskList = new ArrayList<Runnable>();
   for (int i=0; i<10; i++) myTaskList.add(new MyRunnable(i));
   List<Future<?>> futureList = executor.invokeAll(myTaskList);
   // wait for the results
   for (Future<?> future: futureList) future.get();
   // process the results for the list of tasks
   ...
 } finally {
   // clean up after use
   executor.shutdown();
   jppfClient.close();
 }
 
 // !!! it is important that this task is Serializable !!!
 public static class MyRunnable implements Runnable, Serializable {
   private int id = 0;
 
   public MyRunnable(int id) {
     this.id = id;
   }
 
   public void run() {
     System.out.println("Running task id " + id);
   }
 }
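
In the JDK, ExecutorService.invokeAll(...) and invokeAny(...) are declared for collections of Callable, so tasks that return a value are usually written as Callable implementations. The following is a minimal sketch of that pattern, not code taken from JPPF. It runs on a local JDK thread pool so that it is self-contained; the names SquareTask, InvokeAllSketch and runAll are made up for this illustration, and the assumption is that a JPPFExecutorService created as shown above could be passed in place of the local pool.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class InvokeAllSketch {
  // Submits n tasks in a single invokeAll() call and returns their results in submission order
  static List<Integer> runAll(ExecutorService executor, int n) {
    List<SquareTask> tasks = new ArrayList<SquareTask>();
    for (int i = 0; i < n; i++) tasks.add(new SquareTask(i));
    List<Integer> results = new ArrayList<Integer>();
    try {
      // invokeAll() returns once all the tasks have completed
      for (Future<Integer> future : executor.invokeAll(tasks)) results.add(future.get());
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    } catch (ExecutionException e) {
      throw new IllegalStateException(e);
    }
    return results;
  }

  public static void main(String[] args) {
    // assumption: local stand-in for "new JPPFExecutorService(jppfClient)"
    ExecutorService executor = Executors.newFixedThreadPool(2);
    try {
      System.out.println(runAll(executor, 5)); // prints [0, 1, 4, 9, 16]
    } finally {
      executor.shutdown();
    }
  }
}

// Hypothetical task; it is Serializable, as required for tasks sent to the grid
class SquareTask implements Callable<Integer>, Serializable {
  private static final long serialVersionUID = 1L;
  private final int id;

  SquareTask(int id) {
    this.id = id;
  }

  @Override
  public Integer call() {
    return id * id;
  }
}
```

Because all the tasks are passed in one invokeAll(...) call, they would be grouped in a single job according to the description above, rather than one job per task.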

Batch modes

The executor's behavior can be modified by using one of the batch modes of the JPPFExecutorService. By batch mode, we mean the ability to group tasks into batches, in several different ways. This enables tasks to be sent together in the same job, even if they are submitted individually, and allows them to benefit from the parallel features inherent to JPPF. It can also considerably improve the throughput of tasks submitted individually via an executor service.


Using a batch size: specifying a batch size via the method JPPFExecutorService.setBatchSize(int limit) causes the executor to send tasks only once at least that number of tasks has been submitted. When using this mode, be careful about how many tasks you submit via the executor: if you submit fewer than the batch limit, these tasks will remain pending and will not be executed. Sometimes the executor will send more than the specified number of tasks in the same batch: this happens when one of the JPPFExecutorService.invokeXXX() methods is called with n tasks, such that current batch size + n > limit. In that case, all the tasks included in the invokeXXX() call are sent together.

Here is an example:

 JPPFExecutorService executor = new JPPFExecutorService(jppfClient);
 // the executor will send jobs with at least 5 tasks each
 executor.setBatchSize(5);
 List<Future<?>> futures = new ArrayList<Future<?>>();
 // we submit 10 = 2 * 5 tasks, this will cause the client to send 2 jobs
 for (int i=0; i<10; i++) futures.add(executor.submit(new MyTask(i)));
 for (Future<?> f: futures) f.get();
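
Since tasks in an incomplete batch remain pending, an unbounded Future.get() on one of them would block. One possible safeguard is the timed variant Future.get(long, TimeUnit), which is part of the standard Future interface. The sketch below only uses JDK classes: the class BoundedWaitSketch and the method getOrFallback are hypothetical names, and a CompletableFuture that is never completed stands in for a future whose task is stuck in a batch that never fills up.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class BoundedWaitSketch {
  // Waits at most timeoutMillis for the result and returns the fallback if it is not available in time
  static String getOrFallback(Future<String> future, long timeoutMillis, String fallback) {
    try {
      return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
    } catch (TimeoutException e) {
      // the task is still pending, for instance because its batch is incomplete
      return fallback;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return fallback;
    } catch (ExecutionException e) {
      throw new IllegalStateException(e.getCause());
    }
  }

  public static void main(String[] args) {
    // assumption: stand-in for a future whose task is never sent to the grid
    Future<String> stuck = new CompletableFuture<String>();
    System.out.println(getOrFallback(stuck, 100L, "still pending"));
    // stand-in for a future whose task has already completed
    Future<String> done = CompletableFuture.completedFuture("result");
    System.out.println(getOrFallback(done, 100L, "still pending"));
  }
}
```

A timed wait only detects the situation; the tasks themselves stay pending until the batch limit is reached.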


Using a batch timeout: this is done via the method JPPFExecutorService.setBatchTimeout(long timeout) and causes the executor to send the tasks at regular intervals, specified as the timeout. The timeout value is expressed in milliseconds. Once the timeout has expired, the counter is reset to zero. If no task has been submitted between two timeout expirations, then nothing happens.

Example:

 JPPFExecutorService executor = new JPPFExecutorService(jppfClient);
 // the executor will send a job every second (if any task is submitted)
 executor.setBatchTimeout(1000L);
 List<Future<?>> futures = new ArrayList<Future<?>>();
 // we submit 5 tasks
 for (int i=0; i<5; i++) futures.add(executor.submit(new MyTask(i)));
 // we wait 1.5 seconds; during that time a job with 5 tasks will be sent
 Thread.sleep(1500L);
 // we submit 6 more tasks, they will be sent in a different job
 for (int i=5; i<11; i++) futures.add(executor.submit(new MyTask(i)));
 // here we get the results for tasks sent in 2 different jobs!
 for (Future<?> f: futures) f.get();


Using both batch size and timeout: it is possible to combine a batch size and a timeout. In this case, a job is sent whenever the batch limit is reached or the timeout expires, whichever happens first. In either case, the timeout counter is reset each time a job is sent. Using a timeout is also an effective way to deal with the possible blocking behavior of the batch size mode: just use a timeout that is sufficiently large for your needs.

Example:

 JPPFExecutorService executor = new JPPFExecutorService(jppfClient);
 executor.setBatchTimeout(1000L);
 executor.setBatchSize(5);
 List<Future<?>> futures = new ArrayList<Future<?>>();
 // we submit 3 tasks
 for (int i=0; i<3; i++) futures.add(executor.submit(new MyTask(i)));
 // we wait 1.5 seconds; during that time a job with 3 tasks will be sent,
 // even though the batch size is set to 5
 Thread.sleep(1500L);
 for (Future<?> f: futures) f.get();
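
To make the three batch modes easier to compare, here is a simplified, single-threaded model of the rules described above. It is only a sketch and not JPPF's implementation: the class BatchingSketch, its methods and the simulated clock are all assumptions made for this illustration, and the special handling of invokeXXX() calls is not modeled. Tasks are represented by plain strings and a "job" is just the list of tasks sent together.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of the batching rules; NOT the actual JPPF implementation
class BatchingSketch {
  private final int batchSize;     // <= 0 means no batch size limit
  private final long batchTimeout; // in milliseconds, <= 0 means no timeout
  private final List<String> pending = new ArrayList<String>();
  private final List<List<String>> jobs = new ArrayList<List<String>>();
  private long timerStart = 0L;    // simulated time at which the timeout counter was last reset

  BatchingSketch(int batchSize, long batchTimeout) {
    this.batchSize = batchSize;
    this.batchTimeout = batchTimeout;
  }

  // Advances the simulated clock to 'now' and applies every timeout that expired on the way
  void advanceTo(long now) {
    while (batchTimeout > 0 && now - timerStart >= batchTimeout) {
      timerStart += batchTimeout; // the counter restarts at each expiration
      flush();                    // nothing happens if no task is pending
    }
  }

  // Submits one task at simulated time 'now'
  void submit(String task, long now) {
    advanceTo(now);
    pending.add(task);
    if (batchSize > 0 && pending.size() >= batchSize) {
      flush();
      timerStart = now; // the counter is reset each time a job is sent
    }
  }

  // Sends all pending tasks as one job, if there are any
  private void flush() {
    if (pending.isEmpty()) return;
    jobs.add(new ArrayList<String>(pending));
    pending.clear();
  }

  // Runs a scenario: one task per entry of submitTimes, then the clock advances to 'end'.
  // Returns the number of tasks in each job that was sent.
  static List<Integer> jobSizes(int batchSize, long batchTimeout, long[] submitTimes, long end) {
    BatchingSketch sketch = new BatchingSketch(batchSize, batchTimeout);
    for (int i = 0; i < submitTimes.length; i++) sketch.submit("task " + i, submitTimes[i]);
    sketch.advanceTo(end);
    List<Integer> sizes = new ArrayList<Integer>();
    for (List<String> job : sketch.jobs) sizes.add(job.size());
    return sizes;
  }

  public static void main(String[] args) {
    // batch size only: 10 tasks, limit of 5
    System.out.println(jobSizes(5, 0L, new long[10], 0L));   // prints [5, 5]
    // timeout only: 5 tasks at t=0, 6 more at t=1500
    long[] times = {0, 0, 0, 0, 0, 1500, 1500, 1500, 1500, 1500, 1500};
    System.out.println(jobSizes(0, 1000L, times, 2000L));    // prints [5, 6]
    // both: 3 tasks, limit of 5, timeout of 1 second
    System.out.println(jobSizes(5, 1000L, new long[3], 1500L)); // prints [3]
    // batch size only with too few tasks: nothing is ever sent
    System.out.println(jobSizes(5, 0L, new long[3], 60000L));   // prints []
  }
}
```

The last scenario shows the blocking behavior of the batch size mode when fewer tasks than the limit are submitted, and the third one shows how adding a timeout avoids it.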

Configuring jobs and tasks

The JPPFExecutorService has a limitation: if you use only the ExecutorService interface which it implements, there is no way to use JPPF-specific features such as the job SLA, metadata or persistence, or the task timeout, onTimeout() and onCancel().

To overcome this limitation without breaking the semantics of ExecutorService, JPPFExecutorService provides a way to specify the configuration of the jobs and tasks that will be submitted subsequently.

This can be done via the ExecutorServiceConfiguration interface, which can be accessed from a JPPFExecutorService instance via the following accessor methods:

 // Get the configuration for this executor service
 public ExecutorServiceConfiguration getConfiguration();
 
 // Reset the configuration for this executor service to a blank state
 public ExecutorServiceConfiguration resetConfiguration();

ExecutorServiceConfiguration provides the following API:

 // Get the configuration to use for the jobs submitted by the executor service
 JobConfiguration getJobConfiguration();
 
 // Get the configuration to use for the tasks submitted by the executor service
 TaskConfiguration getTaskConfiguration();

Job configuration

The JobConfiguration interface is defined as follows:

 public interface JobConfiguration {
   // Get the service level agreement between the jobs and the server
   JobSLA getSLA();
 
   // Get the service level agreement between the jobs and the client
   JobClientSLA getClientSLA();
 
   // Get the user-defined metadata associated with the jobs
   JobMetadata getMetadata();
 
   // Get/set the persistence manager which enables saving and
   // restoring the state of the jobs
   <T> JobPersistence<T> getPersistenceManager();
   <T> void setPersistenceManager(final JobPersistence<T> persistenceManager);
 
   // Get/set the job's data provider
   DataProvider getDataProvider();
   void setDataProvider(DataProvider dataProvider);
 
   // Add or remove a listener to/from the list of job listeners
   void addJobListener(JobListener listener);
   void removeJobListener(JobListener listener);
 }

As we can see, this provides a way to set the properties normally available to JPPFJob instances, even though the jobs submitted by a JPPFExecutorService are not visible. Any change to the JobConfiguration applies to the next job submitted by the executor and to all subsequent jobs.

Here is an example usage:

 JPPFExecutorService executor = ...;
 // get the executor configuration
 ExecutorServiceConfiguration config = executor.getConfiguration();
 // get the job configuration
 JobConfiguration jobConfig = config.getJobConfiguration();
 // set all jobs to expire after 5 seconds
 jobConfig.getSLA().setJobExpirationSchedule(new JPPFSchedule(5000L));

Task configuration

The TaskConfiguration interface can be used to set JPPF-specific properties onto executor service tasks that do not extend JPPFTask. It is defined as follows:

 public interface TaskConfiguration {
   // Get the delegate for the onCancel() method
   JPPFTaskCallback getOnCancelCallback();
 
   // Set the delegate for the onCancel() method
   void setOnCancelCallback(final JPPFTaskCallback cancelCallback);
 
   // Get the delegate for the onTimeout() method
   JPPFTaskCallback getOnTimeoutCallback();
 
   // Set the delegate for the onTimeout() method
   void setOnTimeoutCallback(final JPPFTaskCallback timeoutCallback);
 
   // Get the task timeout schedule
   JPPFSchedule getTimeoutSchedule();
 
   // Set the task timeout schedule
   void setTimeoutSchedule(final JPPFSchedule timeoutSchedule);
 }

This API introduces the concept of a callback delegate, which is used in lieu of the “standard” JPPFTask callback methods, JPPFTask.onCancel() and JPPFTask.onTimeout(). This is done by providing a subclass of JPPFTaskCallback, which is defined as follows:

 public abstract class JPPFTaskCallback implements Runnable, Serializable {
   // Get the task this callback is associated with
   public final JPPFTask getTask();
 }

Here is a task configuration usage example:

 JPPFExecutorService executor = ...;
 // get the executor configuration
 ExecutorServiceConfiguration config = executor.getConfiguration();
 // get the task configuration
 TaskConfiguration taskConfig = config.getTaskConfiguration();
 // set the task to timeout after 5 seconds
 taskConfig.setTimeoutSchedule(new JPPFSchedule(5000L));
 // set the onTimeout() callback
 taskConfig.setOnTimeoutCallback(new MyTaskCallback());
 
 // A callback that sets a timeout message as the task result
 static class MyTaskCallback extends JPPFTaskCallback {
   @Override
   public void run() {
     getTask().setResult("this task has timed out");
   }
 }

JPPFCompletionService

The JDK package java.util.concurrent provides the interface CompletionService, which represents “a service that decouples the production of new asynchronous tasks from the consumption of the results of completed tasks”. The JDK also provides a concrete implementation with the class ExecutorCompletionService. Unfortunately, this class does not work with a JPPFExecutorService, as it was not designed with distributed execution in mind.

As a convenience, the JPPF API provides a specific implementation of CompletionService with the class JPPFCompletionService, which respects the contract and semantics defined by the CompletionService interface and which can be used as follows:

 JPPFExecutorService executor = ...;
 JPPFCompletionService<String> completionService =
   new JPPFCompletionService<String>(executor);
 MyCallable<String> task = new MyCallable<String>();
 Future<String> future = completionService.submit(task);
 
 // ... later on ...
 // block until a result is available
 future = completionService.take();
 String result = future.get();
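
A completion service is typically used with several tasks, whose results are consumed in the order in which they complete. The sketch below illustrates only the CompletionService contract, using JDK classes exclusively so that it is self-contained: it relies on ExecutorCompletionService over a local thread pool, which is fine locally but, as stated above, is not suitable for a JPPFExecutorService. The names CompletionSketch, EchoTask and collect are hypothetical, and the assumption is that a JPPFCompletionService constructed as in the example above could be passed to collect() instead.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class CompletionSketch {
  // Submits n tasks, then consumes exactly n results, in completion order
  static List<String> collect(CompletionService<String> completionService, int n) {
    for (int i = 0; i < n; i++) completionService.submit(new EchoTask(i));
    List<String> results = new ArrayList<String>();
    try {
      // take() blocks until the next completed task is available
      for (int i = 0; i < n; i++) results.add(completionService.take().get());
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    } catch (ExecutionException e) {
      throw new IllegalStateException(e);
    }
    return results;
  }

  public static void main(String[] args) {
    // assumption: local stand-ins for JPPFExecutorService and JPPFCompletionService
    ExecutorService pool = Executors.newFixedThreadPool(2);
    try {
      CompletionService<String> completionService = new ExecutorCompletionService<String>(pool);
      // the order of the results depends on which task completes first
      for (String result : collect(completionService, 3)) System.out.println(result);
    } finally {
      pool.shutdown();
    }
  }
}

// Hypothetical task; it is Serializable, as required for tasks sent to the grid
class EchoTask implements Callable<String>, Serializable {
  private static final long serialVersionUID = 1L;
  private final int id;

  EchoTask(int id) {
    this.id = id;
  }

  @Override
  public String call() {
    return "result " + id;
  }
}
```

Counting the submitted tasks, as collect() does, is a simple way to know how many times take() can be called without blocking indefinitely.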