
Jobs runtime behavior, recovery and failover

From JPPF 3.3 Documentation


Main Page > Development guide > Jobs runtime behavior, recovery and failover


1 Handling of execution results

As we have seen in the previous section, the results of the tasks' execution are handed over to an instance of TaskResultListener. It is important to understand that, while JPPF jobs are submitted to the grid as a whole, the results may be received in multiple chunks, each chunk made of one or more tasks holding their results.

We can thus say the TaskResultListener is an asynchronous receiver, whose job is to:

  • handle and store the execution results of the tasks
  • ensure the results are in the same order as the tasks initially submitted
  • handle errors occurring while receiving the results from the server
  • update the state of the job's execution
  • optionally handle the persistence of the job's state for later recovery


To store the execution results, the JPPFJob class holds an instance of JobResults, which is accessible via the getResults() method. JobResults provides the following API:

 public class JobResults {
   // Get the current number of received results
   public synchronized int size()
 
   // Determine whether the job received a result
   // for the task at the specified position
   public synchronized boolean hasResult(final int position)
 
   // Add the specified results to the job
   public synchronized void putResults(final List<JPPFTask> tasks)
 
   // Get all the tasks received as results for the job
   public synchronized Collection<JPPFTask> getAll()
 }

Since JPPFResultCollector holds a reference to the job, it will be able to update the execution results each time it receives a resultsReceived(TaskResultEvent) notification.
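To make the contract concrete, here is a simplified, stand-alone model of what JobResults does: received tasks are indexed by their position in the job, so results can be returned in submission order even when chunks arrive out of order. The class below is illustrative only, not part of the JPPF API, and uses plain strings in place of tasks.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Illustrative stand-in for JobResults: results are indexed by task position,
// so getAll() returns them in submission order regardless of arrival order.
public class SimpleJobResults {
  // TreeMap keeps entries sorted by position
  private final TreeMap<Integer, String> resultsByPosition = new TreeMap<>();

  // Current number of received results
  public synchronized int size() { return resultsByPosition.size(); }

  // Whether a result was received for the task at the given position
  public synchronized boolean hasResult(int position) {
    return resultsByPosition.containsKey(position);
  }

  // Record the result for the task at the given position
  public synchronized void putResult(int position, String result) {
    resultsByPosition.put(position, result);
  }

  // All results received so far, in task submission order
  public synchronized List<String> getAll() {
    return new ArrayList<>(resultsByPosition.values());
  }
}
```

A real implementation holds JPPFTask instances instead of strings, but the ordering and bookkeeping logic is the same.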

2 Failover and job re-submission

When the connection with the JPPF server is broken, the client application can no longer receive results for the jobs it has submitted that are still executing. When this happens, the default behavior of the JPPF client is to resubmit the job, so that it is either sent to another available server or waits in the client's queue until the connection is re-established.

If the job is using a TaskResultListener instance, such as JPPFResultCollector, which properly updates the job results via the associated JobResults object, then only the tasks for which no result was received will be sent again to the server. This fault-tolerance mechanism minimizes the impact of losing the connection to the server.

This behavior can have side effects, which should be carefully accounted for when designing your tasks. Indeed, the fact that a task's result was not received by the client does not necessarily mean that the task was not executed on a node. A task may therefore be executed more than once on the grid, without the client having any way of knowing it. In particular, if the task performs persistent operations, such as updating a database or writing to a file system, executing it again may lead to unexpected results.
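Because a task may run more than once after a failover, side-effecting work should be made idempotent. One common approach, sketched below with hypothetical names (this helper is not part of JPPF), is to guard the side effect with a durable marker that records whether the work for a given task id was already done:

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical helper: run a side-effecting action at most once per task id,
// using a marker file as a durable "already done" record.
public class IdempotentGuard {
  private final Path markerDir;

  public IdempotentGuard(Path markerDir) {
    this.markerDir = markerDir;
    try {
      Files.createDirectories(markerDir);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  // Returns true if the action ran, false if it was skipped as a duplicate
  public boolean runOnce(String taskId, Runnable action) {
    Path marker = markerDir.resolve(taskId + ".done");
    try {
      if (Files.exists(marker)) return false; // already executed during a previous attempt
      action.run();
      Files.createFile(marker); // durable record that this task's work is done
      return true;
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```

Note that a crash between the action and the marker write can still cause a duplicate; for database updates, a unique constraint or transactional "insert-if-absent" gives a stronger guarantee.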

3 Job persistence and recovery

The entire state of a job can be persisted by associating a persistence manager to the job. A persistence manager is an implementation of the JobPersistence interface, defined as follows:

 package org.jppf.client.persistence;
 
 public interface JobPersistence<K> {
   // Compute the key for the specified job. All calls to this method
   // with the same job instance should always return the same result
   K computeKey(JPPFJob job);
 
   // Get the keys of all jobs in the persistence store
   Collection<K> allKeys() throws JobPersistenceException;
 
   // Load a job from the persistence store given its key
   JPPFJob loadJob(K key) throws JobPersistenceException;
 
   // Store the specified tasks of the specified job with the specified key
   // The list of tasks may be used to only store the delta for better performance
   void storeJob(K key, JPPFJob job, List<JPPFTask> tasks)
     throws JobPersistenceException;
 
   // Delete the job with the specified key from the persistence store
   void deleteJob(K key) throws JobPersistenceException;
 
   // Close this store and release any used resources
   void close();
 }

As we can see, the persistence manager relies on keys that allow it to uniquely identify jobs in the persistent store. The type of store is implementation-dependent and can be any storage device or facility: a file system, a database, a cloud storage facility, a distributed cache, etc.
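As an illustration of this contract, here is a minimal in-memory store modeled on the JobPersistence interface. It is a stand-alone sketch: the job is simplified to a generic value type, the checked JobPersistenceException is omitted, and a real implementation would of course use durable storage.

```java
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Stand-alone sketch of the JobPersistence contract, with the job
// simplified to a generic value type J and the key to a generic type K.
public class InMemoryJobStore<K, J> {
  private final Map<K, J> store = new ConcurrentHashMap<>();

  // Keys of all jobs currently in the store
  public Collection<K> allKeys() { return store.keySet(); }

  // Load a job given its key, or null if absent
  public J loadJob(K key) { return store.get(key); }

  // Store (or overwrite) the job under the given key
  public void storeJob(K key, J job) { store.put(key, job); }

  // Delete the job with the given key from the store
  public void deleteJob(K key) { store.remove(key); }
}
```

The essential property is that computeKey() must be stable: storing and later loading with the same job must resolve to the same key, which is why JPPF's default implementation uses the job uuid.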

The JPPFJob class provides the following getter and setter for the persistence manager:

 public class JPPFJob implements Serializable, JPPFDistributedJob {
   // Get the persistence manager
   public <T> JobPersistence<T> getPersistenceManager()
 
   // Set the persistence manager
   public <T> void setPersistenceManager(final JobPersistence<T> persistenceManager)
 }

JPPF provides a built-in, ready-to-use implementation of JobPersistence: the class DefaultFilePersistenceManager. This implementation stores the jobs on the file system. Each job, with its attributes and tasks, is saved in a single file, using Java serialization. The key associated with each job is the job's uuid (see JPPFJob.getUuid() method). It can be instantiated using one of the following constructors:

 public class DefaultFilePersistenceManager implements JobPersistence<String> {
   // Initialize with the specified root path, using default file prefix and extension
   public DefaultFilePersistenceManager(File root)
 
   // Initialize with the specified root path, file prefix and extension
   public DefaultFilePersistenceManager(File root, String prefix, String ext)
 
   // Initialize with the specified root path, using default file prefix and extension
   public DefaultFilePersistenceManager(String root)
 
   // Initialize with the specified root path, file prefix and extension
   public DefaultFilePersistenceManager(String root, String prefix, String ext)
 }

Note that DefaultFilePersistenceManager will use the serialization scheme configured for the client.

Finally, this persistence manager is shown in action in the Job Recovery related sample.
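The scheme DefaultFilePersistenceManager implements, one serialized file per job, named after the job's key, can be sketched in stand-alone code. The class below is illustrative only (it stores plain Serializable values, not JPPFJob instances, and invents its own file extension):

```java
import java.io.*;
import java.nio.file.*;
import java.util.*;

// Illustrative file-per-job store: each value is Java-serialized
// to "<root>/<key>.ser". A sketch of the scheme, not the JPPF class.
public class FileStore {
  private final Path root;

  public FileStore(Path root) {
    this.root = root;
    try {
      Files.createDirectories(root);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  // Serialize the value to a single file named after the key
  public void store(String key, Serializable value) {
    try (ObjectOutputStream out = new ObjectOutputStream(
        Files.newOutputStream(root.resolve(key + ".ser")))) {
      out.writeObject(value);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  // Load and deserialize the value stored under the key
  public Object load(String key) {
    try (ObjectInputStream in = new ObjectInputStream(
        Files.newInputStream(root.resolve(key + ".ser")))) {
      return in.readObject();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    } catch (ClassNotFoundException e) {
      throw new IllegalStateException(e);
    }
  }

  // Keys of all stored values: file names minus the ".ser" extension
  public List<String> allKeys() {
    List<String> keys = new ArrayList<>();
    try (DirectoryStream<Path> ds = Files.newDirectoryStream(root, "*.ser")) {
      for (Path p : ds) {
        String name = p.getFileName().toString();
        keys.add(name.substring(0, name.length() - 4));
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return keys;
  }
}
```

Recovery then amounts to iterating allKeys() at startup and reloading each stored job, which is essentially what the Job Recovery sample does with the real API.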

4 A complete TaskResultListener example

To put together everything we have seen in this chapter, here is a TaskResultListener that handles receiving the results, updating the job state, and persisting the job state via a persistence manager. It is, in fact, a simplified version of the code of the class JPPFResultCollector, which can also be found in the JPPF SVN repository.

 public class MyResultListener implements TaskResultListener {
   protected int count;
   protected int pendingCount = 0;
   protected final JPPFJob job;
 
   // Initialize this collector with the specified job
   public MyResultListener(JPPFJob job) {
     this.job = job;
     count = job.getTasks().size() - job.getResults().size();
     pendingCount = count;
   }
 
   // Called to notify that the results of a number of tasks
   // have been received from the server
   public synchronized void resultsReceived(TaskResultEvent event) {
     if (event.getThrowable() == null) {
       List<JPPFTask> tasks = event.getTaskList();
       // update the job's results
       job.getResults().putResults(tasks);
       pendingCount -= tasks.size();
       // notify the threads waiting in waitForResults()
       notifyAll();
       // store the results if a persistence manager is present
       if (job.getPersistenceManager() != null) {
         JobPersistence pm = job.getPersistenceManager();
         try {
           pm.storeJob(pm.computeKey(job), job, tasks);
         } catch (JobPersistenceException e) {
           e.printStackTrace();
         }
       }
     } else {
       // reset this object's state to prepare for job resubmission
       count = job.getTasks().size() - job.getResults().size();
       pendingCount = count;
     }
   }
 
   // Wait until all results of a request have been collected
   public synchronized List<JPPFTask> waitForResults() {
     while (pendingCount > 0) {
       try {
         wait();
       } catch(InterruptedException e) {
         e.printStackTrace();
       }
     }
     return getResults();
   }
 
   // Get the list of final results
   public List<JPPFTask> getResults() {
     return new ArrayList<JPPFTask>(job.getResults().getAll());
   }
 }

5 Job start and completion notifications

It is possible to receive notifications of when a job is started (i.e. sent to the server) and when its execution is complete (results have been received for all tasks), by adding instances of JobListener, defined as follows:

 // Listener interface for receiving job started and job ended event notifications
 public interface JobListener extends EventListener {
   // Called when a job is sent to the server, or its execution starts locally
   void jobStarted(JobEvent event);
 
   // Called when the execution of a job is complete
   void jobEnded(JobEvent event);
 
   // Called when a job, or a subset of its tasks, is sent to the server,
   // or to the local executor
   void jobDispatched(JobEvent event);
 
   // Called when the execution of a subset of a job is complete
   void jobReturned(JobEvent event);
 }

Please note that jobDispatched() and jobReturned() may be called in parallel by multiple threads when the JPPF client has multiple connections in its configuration. This happens if the client uses multiple connections to the same server, connections to multiple servers, or a mix of connections to remote servers and a local executor. Within these methods, you will need to synchronize any operation that is not thread-safe.
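For example, any counters a listener keeps across jobDispatched() and jobReturned() calls must be safe to update from several threads at once. The sketch below (illustrative names, not part of the JPPF API) uses atomic counters, which remain correct even when multiple connections invoke the callbacks concurrently:

```java
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative thread-safe state for a job listener: callbacks may fire
// from several connection threads, so use atomic counters, not plain ints.
public class DispatchCounter {
  private final AtomicInteger dispatched = new AtomicInteger();
  private final AtomicInteger returned = new AtomicInteger();

  // Would be called from jobDispatched(JobEvent), with the dispatched task count
  public void onJobDispatched(int taskCount) { dispatched.addAndGet(taskCount); }

  // Would be called from jobReturned(JobEvent), with the returned task count
  public void onJobReturned(int taskCount) { returned.addAndGet(taskCount); }

  // Number of tasks dispatched but not yet returned
  public int pending() { return dispatched.get() - returned.get(); }
}
```

With plain int fields, two connections incrementing at the same time could lose an update; the atomic operations make each increment indivisible.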

In a normal execution cycle, jobStarted() and jobEnded() will be called only once for each job, whereas jobDispatched() and jobReturned() may be called multiple times, depending on the number of available connections, the load-balancing configuration on the client side, and the job's client-side SLA.

Additionally, the built-in job failover mechanism may cause the jobStarted() and jobEnded() callbacks to be invoked multiple times, for instance in the case where the connection to the server is lost, causing the job to be re-submitted.

Note: it is recommended to only change the job SLA or metadata during the jobStarted() notification. Making changes in the other notifications will lead to unpredictable results and may cause the job to fail.

The notifications are sent as instances of JobEvent, which is defined as:

 // Event emitted by a job when its execution starts or completes
 public class JobEvent extends EventObject {
   // Get the job source of this event
   public JPPFJob getJob()
 
   // Get the tasks that were dispatched or returned
   public List<JPPFTask> getTasks()
 }

Note that the getTasks() method is only useful for jobDispatched() and jobReturned() notifications. In all other cases, it will return null.

To add or remove listeners, use the related methods in JPPFJob:

 public class JPPFJob implements Serializable, JPPFDistributedJob {
   // Add a listener
   public void addJobListener(JobListener listener)
 
   // Remove a listener
   public void removeJobListener(JobListener listener)
 }

A possible use of these listeners is to “intercept” a job before it is sent to the server, and adjust some of its attributes, such as the SLA specifications, which may vary depending on the time at which the job is started or on an application-dependent context.



JPPF Copyright © 2005-2020 JPPF.org