Dealing with jobs
From JPPF 3.3 Documentation
|
Main Page > Development guide > Dealing with jobs |
A job is a grouping of tasks with a common set of characteristics and a common SLA. These characteristics include:
- common data shared between tasks (data provider)
- A common Service Level Agreement (SLA) comprising:
- the job priority
- the maximum number of nodes a job can be executed on
- an optional execution policy describing which nodes it can run on
- a suspended indicator, that enables submitting a job in suspended state, waiting for an external command to resume or start its execution
- an execution start date and time
- an expiration (timeout) date and time
- an indicator specifying what the server should do when the application is dicsonnected
- a blocking/non-blocking indicator, specifying whether the job execution is synchronous or asynchronous from the application's point of view
- a listener to receive notifications of completed tasks when running in non-blocking mode
- the ability to receive notifications when the job execution starts and completes
- a persistence manager, to store the job state during execution, and recover its latest saved state on demand, in particular after an application crash
In the JPPF API, a job is represented by the class JPPFJob. In addition to accessors and mutators for the attributes we have seen above, JPPFJob provides methods to add tasks and a set of constructors that the make creation of jobs easier.
1 Job name and identifier
Each job has a unique identifier (UUID), that allows JPPF to manage and monitor the job while distinguishing it from other jobs. If this identifier is not explicitely specified via a dedicated constructor, JPPF will create one as a string of 32 hexadecimal characters. It is very important that all jobs across an entire JPPF grid have a unique distinct uuid, otherwise there is no guarantee that a job will be executed properly.
Additionally, a job can have a name which doesn't need to be unique, and which is used by the JPPF administration console for display purposes only. You may also use it in your application for logging and tracing. If not set by the user, the name will be by default equal to the uuid.
The class JPPFJob provides the following APIs for the job name and uuid:
public class JPPFJob implements Serializable, JPPFDistributedJob { // create a blocking job with the specified uuid public JPPFJob(final String jobUuid) // get this job's UUID public String getUuid() // get the user-defined display name for this job public String getName() // set the user-defined display name for this job public void setName(final String name) }
2 Creating a job
To create a job, the JPPFJob class offers a number of constructors, that can be split in 2 groups:
Constructors for blocking jobs
// creates a blocking job with no data provider and default SLA values public JPPFJob() // creates a blocking job with the specified data provider and default SLA values public JPPFJob(DataProvider dataProvider) // creates a blocking job with the specified data provider and SLA public JPPFJob(DataProvider dataProvider, JPPFJobSLA jobSLA)
Constructors for non-blocking jobs
// creates a blocking job with the specified execution results listener, // no data provider and default SLA values public JPPFJob(TaskResultListener resultsListener) // creates a blocking job with the specified execution results listener, // data provider and default SLA values public JPPFJob(DataProvider dataProvider, TaskResultListener resultsListener) // creates a blocking job with the specified execution results listener, // data provider and SLA public JPPFJob(DataProvider dataProvider, JPPFJobSLA jobSLA, TaskResultListener resultsListener)
Basically, the distinction for a non-blocking job is made via the presence of a TaskResultListener.
Finally, there is a more generic constructor that embraces everything the other constructors do:
// creates a job with the specified data provider, SLA, blocking indicator // and execution results listener public JPPFJob(DataProvider dataProvider, JPPFJobSLA jobSLA, boolean blocking, TaskResultListener resultsListener)
No matter which constructor is used, the job id is automatically generated as a pseudo-random string of 32 hexadecimal characters. It can then be obtained or changed with the job's getId() and setId(String) methods. This mechanism ensures that a job always has an id, and that developers always have the possibility to change it to a more readable one.
3 Adding tasks to a job
As we have seen in section 3.1 about the various forms of tasks that we can use in JPPF, JPPFJob provides two methods to add tasks to a job.
Addding a JPPFTask, annotated, Runnable or Callable task
public JPPFTask addTask(Object taskObject, Object...args) throws JPPFException
The taskObject parameter can be one of the following:
- an instance of JPPFTask
- an instance of a class with a non-static public method annotated with @JPPFRunnable
- a Class object representing a class that has a public static method or a constructor annotated with @JPPFRunnable
- an instance of a a Runnable class
- an instance of a Callable class
The args parameter is optional and is only used to pass the arguments of a method or constructor annotated with @JPPFRunnable. It is ignored for all other forms of tasks.
The return value is an instance of (a subclass of) JPPFTask, regardless the type of task that is added. In the case of an annotated, Runnable or Callable task, the original task object, wrapped by this JPPFTask, can be retrieved using the method JPPFTask.getTaskObject(), as in the following example:
JPPFTask task = job.addTask(new MyRunnableTask()); MyRunnableTask runnableTask = (MyRunnableTask) task.getTaskObject();
As JPPF is using reflection to properly wrap the task, an eventual exception may be thrown. It will then be wrapped into a JPPFException.
Adding a POJO task
public JPPFTask addTask(String method, Object taskObject, Object...args) throws JPPFException
The method parameter is the name of the method or of the constructor to execute as the entry point of the task. In the case of a constructor, it must be the same as the name of the class.
The taskObject parameter can be one of the following:
- an instance of the POJO class if the entry point is a non-static method
- a Class object representing a POJO class that has a public static method or a constructor as entry point
The args parameter is optional and is used to pass the arguments of a method or constructor defined as the task's entry point.
As for the other form of this method, the return value is a JPPFTask, and the original task object can be retrieved using the method JPPFTask.getTaskObject(), as in the following example:
JPPFTask task = job.addTask("myMethod", new MyPOJO(), 3, "string"); MyPOJO pojo = (MyPOJO) task.getTaskObject(); // we can also set a timeout on the wrapper task.setTimeoutSchedule(new JPPFSchedule(5000L));
As JPPF is using reflection to properly wrap the task, an eventual exception may be thrown. It will then be wrapped into a JPPFException.
4 Non-blocking jobs
Jobs can be submitted asynchronously from the application's perspective. This means that an asynchronous (or non-blocking) job will not block the application thread from which it is submitted. It also implies that we must have the means to obtain the execution results at a later time. To this effect, it is possible to register a listener with the job, to receive notifications when tasks have been completed and their results were returned to the application.
These listeners are represented by the interface TaskResultListener. It is defined as follows:
public interface TaskResultListener extends EventListener { void resultsReceived(TaskResultEvent event); }
We will thus be listening for events of type TaskResultEvent. Since JPPF 2.5, the public API for TaskResultEvent has has two notification methods, as illustrated below:
public class TaskResultEvent extends EventObject { // Get the list of tasks whose results have been received from the server public List<JPPFTask> getTaskList(); // Get the throwable eventually raised while receiving the results public Throwable getThrowable() }
We can see that each notification is wrapping either a list of JPPFTask instances, or a Throwable that may be raised while receiving the results, both being mutually exclusive, meaning that there is always one of the two methods that returns null.
Note that there is no guarantee the tasks are in the same order as when they were originally submitted. In fact the list is generally only a subset of the tasks that were submitted, and multiple notifications may be necessary to collect all the results. To restore the original ordering, JPPFTask.getPosition() should be used. Each task position is automatically calculated by JPPF at the time the job is submitted, as shown in the example below.
When the event's Throwable is non-null, this indicates that there was an issue with the connection to the driver. Using the built-in mechanism in the JPPF client, the client will (attempt to) reconnect and resubmit the job. This implies that you need to reset the state of the TaskResultListener, as if it were created anew. This is illustrated in the following example:
public class MyResultListener implements TaskResultListener { // Initial count of tasks in the job private int initialCount = 0; // Count of results not yet received private int pendingCount = 0; // Sorted map containing the resulting tasks, ordered by ascending position private Map<Integer, JPPFTask> resultMap = new TreeMap<Integer, JPPFTask>(); // Initialize this collector with a specified number of tasks public MyResultListener(int count) { initialCount = count; pendingCount = count; } // Notification that the results of a number of tasks have been received public void resultsReceived(TaskResultEvent event) { if (event.getThrowable() != null) { // Insert the tasks in the map, in ascending order based on their position for (JPPFTask task: event.getTaskList()) resultMap.put(task.getPosition(), task); // Update the number of pending tasks accordingly pendingCount -= event.getTaskList().size(); } else { // Reset the state of this listener as if for a new job resultMap.clear(); pendingCount = initialCount; } } // Get the list of results public List<JPPFTask> getResults() { List<JPPFTask> results = new ArrayList<JPPFTask>(); // collect all results received so far in ascending position order for (Integer n: resultMap.keySet()) results.add(resultMap.get(n)); return results; } public boolean isJobComplete() { return pendingCount <= 0; } }
We can then use our result listener as follows:
// create a job JPPFJob myJob = new JPPFJob(); // add 10 tasks for (int i=0; i<10; i++) myJob.add(new MyTask(i)); // create a task result listener MyResultListener myResultListener = new MyResultListener(10); // register the listener with the job myJob.setResultListener(myResultListener); // set the job as non-blocking myJob.setBlocking(false); // submit the job jppfClient.submit(job); while (!myResultListener.isJobComplete()) { // ... do something while the job is executing ... } // once the job is complete, process the results List<JPPFTask> results = myResultListener.getResults();
JPPF uses an existing implementation of TaskResultListener: the class JPPFResultCollector. It can be used directly, and its implementation is very similar to that of the code sample above,except that it provides a way to synchronize with the job execution through its waitForResults() methods, so you don't have to write your own synchronization code. Additionally, JPPFResultCollector is also always used for blocking jobs.
Main Page > Development guide > Dealing with jobs |