Jobs runtime behavior, recovery and failover
From JPPF 6.1 Documentation
Main Page > Development guide > Jobs behavior, recovery and failover
1 Failover and job re-submission
When the connection with the JPPF server is broken, the client application can no longer receive results for the jobs it has submitted that are still executing. When this happens, the default behavior of the JPPF client is to resubmit the job, so that it will either be sent to another available server, or wait in the client's queue until the connection is re-established.
There can be side effects to this behavior, which should be carefully accounted for when designing your tasks. Indeed, the fact that a task result was not received by the client does not necessarily mean that the task was not executed on a node. This implies that a task may be executed more than once on the grid, without the client having any way of knowing it. In particular, if the task performs persistent operations, such as updating a database or writing to a file system, this may lead to unexpected results whenever the task is executed again.
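One common way to guard against duplicate execution is to make the task's persistent operation idempotent. The sketch below illustrates this with a hypothetical task that records a marker file keyed on the task id before performing its side effect; the marker path, the helper method and the assumption that task ids are unique are illustrative choices, not part of the JPPF API.

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

import org.jppf.node.protocol.AbstractTask;

// Hypothetical task with an idempotency guard: before performing its
// persistent side effect, it checks for a marker keyed on the task id,
// so a re-executed dispatch does not repeat the operation.
public class IdempotentTask extends AbstractTask<String> {
  @Override
  public void run() {
    try {
      // marker file named after the task id (assumed unique per task)
      Path marker = Paths.get(System.getProperty("java.io.tmpdir"), "task-" + getId() + ".done");
      if (Files.exists(marker)) {
        setResult("already executed, skipped");
        return;
      }
      performPersistentOperation();
      Files.createFile(marker);
      setResult("executed");
    } catch (Exception e) {
      setThrowable(e);
    }
  }

  // placeholder for the real side effect, e.g. a database update
  private void performPersistentOperation() {
  }
}
```

A database-backed variant could use a unique-key insert as the marker instead of a file, which also works when nodes do not share a file system.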
2 Job persistence and recovery
The entire state of a job can be persisted by associating a persistence manager to the job. A persistence manager is an implementation of the JobPersistence interface, defined as follows:
package org.jppf.client.persistence;

public interface JobPersistence<K> {
  // Compute the key for the specified job. All calls to this method
  // with the same job instance should always return the same result
  K computeKey(JPPFJob job);
  // Get the keys of all jobs in the persistence store
  Collection<K> allKeys() throws JobPersistenceException;
  // Load a job from the persistence store given its key
  JPPFJob loadJob(K key) throws JobPersistenceException;
  // Store the specified tasks of the specified job with the specified key.
  // The list of tasks may be used to only store the delta for better performance
  void storeJob(K key, JPPFJob job, List<Task<?>> tasks) throws JobPersistenceException;
  // Delete the job with the specified key from the persistence store
  void deleteJob(K key) throws JobPersistenceException;
  // Close this store and release any used resources
  void close();
}
As we can see, the persistence manager relies on keys that allow it to uniquely identify jobs in the persistent store. The type of store is implementation-dependent and can be any storage device or facility: a file system, a database, a cloud storage facility, a distributed cache, etc.
The JPPFJob class provides the following getter and setter for the persistence manager:
public class JPPFJob implements Serializable, JPPFDistributedJob {
  // Get the persistence manager
  public <T> JobPersistence<T> getPersistenceManager()
  // Set the persistence manager
  public <T> void setPersistenceManager(final JobPersistence<T> persistenceManager)
}
JPPF provides a built-in, ready-to-use implementation of JobPersistence: the class DefaultFilePersistenceManager. This implementation stores the jobs on the file system. Each job, with its attributes and tasks, is saved in a single file, using Java serialization. The key associated with each job is the job's uuid (see JPPFJob.getUuid() method). It can be instantiated using one of the following constructors:
public class DefaultFilePersistenceManager implements JobPersistence<String> {
  // Initialize with the specified root path, using default file prefix and extension
  public DefaultFilePersistenceManager(File root)
  // Initialize with the specified root path, file prefix and extension
  public DefaultFilePersistenceManager(File root, String prefix, String ext)
  // Initialize with the specified root path, using default file prefix and extension
  public DefaultFilePersistenceManager(String root)
  // Initialize with the specified root path, file prefix and extension
  public DefaultFilePersistenceManager(String root, String prefix, String ext)
}
Note that DefaultFilePersistenceManager will use the serialization scheme configured for the client.
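To illustrate the API described above, here is a minimal sketch that attaches the built-in file-based persistence manager to a job, then shows how a restarted application could enumerate and reload the persisted jobs. The root directory name, job name and the omitted task-submission code are assumptions for the example.

```java
import org.jppf.client.JPPFJob;
import org.jppf.client.persistence.DefaultFilePersistenceManager;
import org.jppf.client.persistence.JobPersistence;

public class PersistenceExample {
  public static void main(String[] args) throws Exception {
    // store jobs under the "jppf_jobs" directory (name chosen for this example)
    JobPersistence<String> pm = new DefaultFilePersistenceManager("jppf_jobs");

    JPPFJob job = new JPPFJob();
    job.setName("my persistent job");
    // associate the persistence manager with the job
    job.setPersistenceManager(pm);
    // ... add tasks and submit the job as usual ...

    // after an application restart: recover all jobs found in the store;
    // the key for each job is its uuid
    for (String key : pm.allKeys()) {
      JPPFJob recovered = pm.loadJob(key);
      System.out.println("recovered job '" + recovered.getName() + "' with uuid " + key);
    }
    pm.close();
  }
}
```

Once a recovered job has completed, deleteJob(key) can be used to remove it from the store so it is not reloaded again.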
Finally, this persistence manager is shown in action in the Job Recovery related sample.
3 Job lifecycle notifications: JobListener
It is possible to receive notifications when a job is started (i.e. sent to the server), when its execution is complete (results have been received for all tasks), when a subset of its tasks is dispatched for execution, and when a subset of its tasks returns from execution. This is done by registering instances of the JobListener interface with the job, defined as follows:
// Listener interface for receiving job started and job ended event notifications
public interface JobListener extends EventListener {
  // Called when a job is sent to the server, or its execution starts locally
  void jobStarted(JobEvent event);
  // Called when the execution of a job is complete
  void jobEnded(JobEvent event);
  // Called when a job, or a subset of its tasks, is sent to the server,
  // or to the local executor
  void jobDispatched(JobEvent event);
  // Called when the execution of a subset of a job is complete
  void jobReturned(JobEvent event);
}
Please note that jobDispatched() and jobReturned() may be called in parallel by multiple threads, in the case where the JPPF client has multiple connections in its configuration. This happens if the client uses multiple connections to the same server, connections to multiple servers, or a mix of connections to remote servers and a local executor. You will need to synchronize any operation that is not thread-safe within these methods.
In a normal execution cycle, jobStarted() and jobEnded() will be called only once for each job, whereas jobDispatched() and jobReturned() may be called multiple times, depending on the number of available connections, the load-balancing configuration on the client side, and the job's client-side SLA.
Additionally, the built-in job failover mechanism may cause the jobStarted() and jobEnded() callbacks to be invoked multiple times, for instance in the case where the connection to the server is lost, causing the job to be re-submitted.
The notifications are sent as instances of JobEvent, which is defined as:
// Event emitted by a job when its execution starts or completes
public class JobEvent extends EventObject {
  // Get the job source of this event
  public JPPFJob getJob()
  // Get the tasks that were dispatched or returned
  public List<Task<?>> getJobTasks()
  // Whether the current job dispatch is sent to a remote driver
  public boolean isRemoteExecution()
  // Get the connection used to send the job dispatch to a remote driver
  public JPPFClientConnection getConnection()
}
Note that the getJobTasks() method is only useful for jobDispatched() and jobReturned() notifications. In all other cases, it will return null. The same applies to the methods isRemoteExecution() and getConnection().
Furthermore, getConnection() will also return null if isRemoteExecution() returns false, that is, if the job dispatch is executed in the client-local executor.
To add or remove listeners, use the related methods in JPPFJob:
public class JPPFJob implements Serializable, JPPFDistributedJob {
  // Add a listener
  public void addJobListener(JobListener listener)
  // Remove a listener
  public void removeJobListener(JobListener listener)
}
A possible use of these listeners is to “intercept” a job before it is sent to the server, and adjust some of its attributes, such as the SLA specifications, which may vary depending on the time at which the job is started or on an application-dependent context. It can also be used to collect the results of non-blocking jobs in a fully asynchronous way.
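As an illustration of this "interception" pattern, the sketch below adjusts a job's priority from within jobStarted(), before the job reaches the server. The off-hours policy and the priority values are assumptions made for the example, not recommendations.

```java
import java.time.LocalTime;

import org.jppf.client.event.JobEvent;
import org.jppf.client.event.JobListenerAdapter;

// Hypothetical listener that adjusts the job's SLA just before it is sent:
// here the priority is raised outside business hours (an example policy).
public class SlaAdjustingListener extends JobListenerAdapter {
  @Override
  public void jobStarted(JobEvent event) {
    int hour = LocalTime.now().getHour();
    // priority values are application-defined; 10 vs 0 is arbitrary here
    int priority = (hour < 8 || hour >= 18) ? 10 : 0;
    event.getJob().getSLA().setPriority(priority);
  }
}
```

The listener would be registered before submission with job.addJobListener(new SlaAdjustingListener()), so that the adjustment applies on the initial submission as well as on any failover-driven re-submission.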
If you do not need to implement all the methods of JobListener, your implementation may instead extend the class JobListenerAdapter, which provides an empty implementation of each method of the interface.
Multi-threaded usage note: if you intend to use the same JobListener instance from multiple threads, for instance with multiple concurrent non-blocking jobs, you will need to explicitly synchronize the code of the listener.
Here is a simple example of a thread-safe JobListener implementation:
// counts the total submitted and executed tasks for all jobs
public class MyJobListener extends JobListenerAdapter {
  private int totalSubmittedTasks = 0;
  private int totalExecutedTasks = 0;

  @Override
  public synchronized void jobStarted(JobEvent event) {
    JPPFJob job = event.getJob();
    // add the number of tasks in the job
    totalSubmittedTasks += job.getJobTasks().size();
    System.out.println("job started: submitted = " + totalSubmittedTasks
      + ", executed = " + totalExecutedTasks);
  }

  @Override
  public synchronized void jobReturned(JobEvent event) {
    List<Task<?>> tasks = event.getJobTasks();
    // add the number of task results received
    totalExecutedTasks += tasks.size();
    System.out.println("job returned: submitted = " + totalSubmittedTasks
      + ", executed = " + totalExecutedTasks);
  }
}