JPPF, java, parallel computing, distributed computing, grid computing, parallel, distributed, cluster, grid, cloud, open source, android, .net
JPPF, java, parallel computing, distributed computing, grid computing, parallel, distributed, cluster, grid, cloud, open source, android, .net
JPPF

The open source
grid computing
solution

 Home   About   Features   Download   Documentation   On Github   Forums 

Dealing with jobs

From JPPF 3.3 Documentation

Jump to: navigation, search

Contents

Main Page > Development guide > Dealing with jobs


A job is a grouping of tasks with a common set of characteristics and a common SLA. These characteristics include:

  • common data shared between tasks (data provider)
  • A common Service Level Agreement (SLA) comprising:
    • the job priority
    • the maximum number of nodes a job can be executed on
    • an optional execution policy describing which nodes it can run on
    • a suspended indicator, that enables submitting a job in suspended state, waiting for an external command to resume or start its execution
    • an execution start date and time
    • an expiration (timeout) date and time
    • an indicator specifying what the server should do when the application is dicsonnected
  • a blocking/non-blocking indicator, specifying whether the job execution is synchronous or asynchronous from the application's point of view
  • a listener to receive notifications of completed tasks when running in non-blocking mode
  • the ability to receive notifications when the job execution starts and completes
  • a persistence manager, to store the job state during execution, and recover its latest saved state on demand, in particular after an application crash

In the JPPF API, a job is represented by the class JPPFJob. In addition to accessors and mutators for the attributes we have seen above, JPPFJob provides methods to add tasks and a set of constructors that the make creation of jobs easier.

1 Job name and identifier

Each job has a unique identifier (UUID), that allows JPPF to manage and monitor the job while distinguishing it from other jobs. If this identifier is not explicitely specified via a dedicated constructor, JPPF will create one as a string of 32 hexadecimal characters. It is very important that all jobs across an entire JPPF grid have a unique distinct uuid, otherwise there is no guarantee that a job will be executed properly.

Additionally, a job can have a name which doesn't need to be unique, and which is used by the JPPF administration console for display purposes only. You may also use it in your application for logging and tracing. If not set by the user, the name will be by default equal to the uuid.

The class JPPFJob provides the following APIs for the job name and uuid:

 public class JPPFJob implements Serializable, JPPFDistributedJob {
   // create a blocking job with the specified uuid
   public JPPFJob(final String jobUuid)
 
   // get this job's UUID
   public String getUuid()
 
   // get the user-defined display name for this job
   public String getName()
 
   // set the user-defined display name for this job
   public void setName(final String name)
 }

2 Creating a job

To create a job, the JPPFJob class offers a number of constructors, that can be split in 2 groups:

Constructors for blocking jobs

 // creates a blocking job with no data provider and default SLA values
 public JPPFJob()
 
 // creates a blocking job with the specified data provider and default SLA values
 public JPPFJob(DataProvider dataProvider)
 
 // creates a blocking job with the specified data provider and SLA
 public JPPFJob(DataProvider dataProvider, JPPFJobSLA jobSLA)

Constructors for non-blocking jobs

 // creates a blocking job with the specified execution results listener,
 // no data provider and default SLA values
 public JPPFJob(TaskResultListener resultsListener)
 
 // creates a blocking job with the specified execution results listener,
 // data provider and default SLA values
 public JPPFJob(DataProvider dataProvider, TaskResultListener resultsListener)
 
 // creates a blocking job with the specified execution results listener,
 // data provider and SLA
 public JPPFJob(DataProvider dataProvider, JPPFJobSLA jobSLA,
                TaskResultListener resultsListener)

Basically, the distinction for a non-blocking job is made via the presence of a TaskResultListener.

Finally, there is a more generic constructor that embraces everything the other constructors do:

 // creates a job with the specified data provider, SLA, blocking indicator
 // and execution results listener
 public JPPFJob(DataProvider dataProvider, JPPFJobSLA jobSLA, boolean blocking,
                TaskResultListener resultsListener)

No matter which constructor is used, the job id is automatically generated as a pseudo-random string of 32 hexadecimal characters. It can then be obtained or changed with the job's getId() and setId(String) methods. This mechanism ensures that a job always has an id, and that developers always have the possibility to change it to a more readable one.

3 Adding tasks to a job

As we have seen in section 3.1 about the various forms of tasks that we can use in JPPF, JPPFJob provides two methods to add tasks to a job.

Addding a JPPFTask, annotated, Runnable or Callable task

 public JPPFTask addTask(Object taskObject, Object...args) throws JPPFException

The taskObject parameter can be one of the following:

  • an instance of JPPFTask
  • an instance of a class with a non-static public method annotated with @JPPFRunnable
  • a Class object representing a class that has a public static method or a constructor annotated with @JPPFRunnable
  • an instance of a a Runnable class
  • an instance of a Callable class

The args parameter is optional and is only used to pass the arguments of a method or constructor annotated with @JPPFRunnable. It is ignored for all other forms of tasks.

The return value is an instance of (a subclass of) JPPFTask, regardless the type of task that is added. In the case of an annotated, Runnable or Callable task, the original task object, wrapped by this JPPFTask, can be retrieved using the method JPPFTask.getTaskObject(), as in the following example:

 JPPFTask task = job.addTask(new MyRunnableTask());
 MyRunnableTask runnableTask = (MyRunnableTask) task.getTaskObject();

As JPPF is using reflection to properly wrap the task, an eventual exception may be thrown. It will then be wrapped into a JPPFException.

Adding a POJO task

 public JPPFTask addTask(String method, Object taskObject, Object...args)
   throws JPPFException

The method parameter is the name of the method or of the constructor to execute as the entry point of the task. In the case of a constructor, it must be the same as the name of the class.

The taskObject parameter can be one of the following:

  • an instance of the POJO class if the entry point is a non-static method
  • a Class object representing a POJO class that has a public static method or a constructor as entry point

The args parameter is optional and is used to pass the arguments of a method or constructor defined as the task's entry point.

As for the other form of this method, the return value is a JPPFTask, and the original task object can be retrieved using the method JPPFTask.getTaskObject(), as in the following example:

 JPPFTask task = job.addTask("myMethod", new MyPOJO(), 3, "string");
 MyPOJO pojo = (MyPOJO) task.getTaskObject();
 // we can also set a timeout on the wrapper
 task.setTimeoutSchedule(new JPPFSchedule(5000L));

As JPPF is using reflection to properly wrap the task, an eventual exception may be thrown. It will then be wrapped into a JPPFException.

4 Non-blocking jobs

Jobs can be submitted asynchronously from the application's perspective. This means that an asynchronous (or non-blocking) job will not block the application thread from which it is submitted. It also implies that we must have the means to obtain the execution results at a later time. To this effect, it is possible to register a listener with the job, to receive notifications when tasks have been completed and their results were returned to the application.

These listeners are represented by the interface TaskResultListener. It is defined as follows:

 public interface TaskResultListener extends EventListener {
   void resultsReceived(TaskResultEvent event);
 }

We will thus be listening for events of type TaskResultEvent. Since JPPF 2.5, the public API for TaskResultEvent has has two notification methods, as illustrated below:

 public class TaskResultEvent extends EventObject {
   // Get the list of tasks whose results have been received from the server
   public List<JPPFTask> getTaskList();
 
   // Get the throwable eventually raised while receiving the results
   public Throwable getThrowable()
 }

We can see that each notification is wrapping either a list of JPPFTask instances, or a Throwable that may be raised while receiving the results, both being mutually exclusive, meaning that there is always one of the two methods that returns null.

Note that there is no guarantee the tasks are in the same order as when they were originally submitted. In fact the list is generally only a subset of the tasks that were submitted, and multiple notifications may be necessary to collect all the results. To restore the original ordering, JPPFTask.getPosition() should be used. Each task position is automatically calculated by JPPF at the time the job is submitted, as shown in the example below.

When the event's Throwable is non-null, this indicates that there was an issue with the connection to the driver. Using the built-in mechanism in the JPPF client, the client will (attempt to) reconnect and resubmit the job. This implies that you need to reset the state of the TaskResultListener, as if it were created anew. This is illustrated in the following example:

 public class MyResultListener implements TaskResultListener {
   // Initial count of tasks in the job
   private int initialCount = 0;
 
   // Count of results not yet received
   private int pendingCount = 0;
 
   // Sorted map containing the resulting tasks, ordered by ascending position
   private Map<Integer, JPPFTask> resultMap = new TreeMap<Integer, JPPFTask>();
 
   // Initialize this collector with a specified number of tasks
   public MyResultListener(int count) {
     initialCount = count;
     pendingCount = count;
   }
 
   // Notification that the results of a number of tasks have been received
   public void resultsReceived(TaskResultEvent event) {
     if (event.getThrowable() != null) {
       // Insert the tasks in the map, in ascending order based on their position
       for (JPPFTask task: event.getTaskList()) resultMap.put(task.getPosition(), task);
       // Update the number of pending tasks accordingly
       pendingCount -= event.getTaskList().size();
     } else {
       // Reset the state of this listener as if for a new job
       resultMap.clear();
       pendingCount = initialCount;
     }
   }
 
   // Get the list of results
   public List<JPPFTask> getResults() {
     List<JPPFTask> results = new ArrayList<JPPFTask>();
     // collect all results received so far in ascending position order
     for (Integer n: resultMap.keySet()) results.add(resultMap.get(n));
     return results;
   }
 
   public boolean isJobComplete() {
     return pendingCount <= 0;
   }
 }

We can then use our result listener as follows:

 // create a job
 JPPFJob myJob  = new JPPFJob();
 // add 10 tasks
 for (int i=0; i<10; i++) myJob.add(new MyTask(i));
 // create a task result listener
 MyResultListener myResultListener = new MyResultListener(10);
 // register the listener with the job
 myJob.setResultListener(myResultListener);
 // set the job as non-blocking
 myJob.setBlocking(false);
 
 // submit the job
 jppfClient.submit(job);
 
 while (!myResultListener.isJobComplete()) {
   // ... do something while the job is executing ...
 }
 
 // once the job is complete, process the results
 List<JPPFTask> results = myResultListener.getResults();

JPPF uses an existing implementation of TaskResultListener: the class JPPFResultCollector. It can be used directly, and its implementation is very similar to that of the code sample above,except that it provides a way to synchronize with the job execution through its waitForResults() methods, so you don't have to write your own synchronization code. Additionally, JPPFResultCollector is also always used for blocking jobs.


Main Page > Development guide > Dealing with jobs

JPPF Copyright © 2005-2020 JPPF.org Powered by MediaWiki