
Jobs persistence in the driver

From JPPF 6.3 Documentation


Latest revision as of 07:08, 30 June 2019



As of JPPF 6.0, drivers can persist the jobs they receive, along with their results, in a permanent store. This adds major new capabilities to the drivers, in particular:

  • automatic recovery of the jobs in case of driver crashes and resubmission to completion after driver restart, without any external intervention
  • the ability to submit jobs from a JPPF client, then check their completion and retrieve their results from a separate client
  • the ability to retrieve jobs on demand from a persistence store and resubmit them to completion


To achieve this, JPPF provides the following components:

  • a pluggable persistence facility in the driver, with ready to use, built-in implementations
  • extensions to the job SLA specifying if and how the persistence of each job should be handled
  • a client-side facility to manage, retrieve and monitor persisted jobs


These items are detailed in the next sections.

The persistence facility relies on an implementation of the JobPersistence interface, whose role is to store, load, delete and query the job elements that are persisted. A job element can be one of 4 types:

  • a job header element, which includes the job's uuid, name, SLA, and metadata, but also the information required for job routing and scheduling
  • a data provider element, if any is present
  • an element for each task, before its execution
  • an element for each executed task returned by the nodes, also known as task execution result

Note: the types of job elements are represented by the PersistenceObjectType enum.

Each job element is stored as binary data which represents a serialized object graph. Where and how it is stored depends solely on the JobPersistence implementation. It can be on a file system, a relational database, cloud storage facility, etc.
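
To make this concrete, here is a minimal, self-contained sketch (plain JDK serialization, no JPPF classes) of how a job element could be turned into the binary data that a JobPersistence implementation stores and later restores:

```java
import java.io.*;

public class SerializationSketch {

  // serialize a Serializable object graph to a byte array, as a
  // persistence implementation would before writing it to its store
  public static byte[] toBytes(Serializable obj) throws IOException {
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
      oos.writeObject(obj);
    }
    return bos.toByteArray();
  }

  // deserialize a byte array back into the original object graph
  public static Object fromBytes(byte[] bytes) throws IOException, ClassNotFoundException {
    try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
      return ois.readObject();
    }
  }
}
```

Note that deserialization requires the classes of the object graph to be available, which is why class loading considerations matter for persisted jobs (see the dedicated section below).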

1 Job persistence specification

The persistence of a job is specified via a PersistenceSpec object, defined as follows:

public class PersistenceSpec implements Serializable {
  // Determine whether the job is persisted in the driver. Defaults to false
  public boolean isPersistent()
  // Specify whether the job is persisted in the driver
  public PersistenceSpec setPersistent(boolean persistent)

  // Determine whether the driver should automatically execute the persisted job
  // after a restart. Defaults to false
  public boolean isAutoExecuteOnRestart()
  // Specify whether the driver should automatically execute the job after a restart
  public PersistenceSpec setAutoExecuteOnRestart(boolean autoExecuteOnRestart)

  // Determine whether the persisted job should be deleted from the store upon
  // completion. Defaults to true
  public boolean isDeleteOnCompletion()
  // Determine whether the job should be deleted from the store upon completion
  public PersistenceSpec setDeleteOnCompletion(boolean deleteOnCompletion)
}

As we can see, instances of this class manage three boolean flags which specify whether the driver will persist the job and what it will do with a persisted job when it completes or when a driver restart occurs.

The "persistent" flag determines whether the job is persisted at all. It defaults to false, meaning that jobs are not persisted unless explicitly configured to be. When set to true, the driver will persist the job elements it receives from the client, and later on the execution results received from the nodes. A job that is not configured as persistent is processed without any persistence overhead. If this flag is set to false, none of the other flags has any effect.

The "delete on completion" flag determines whether the job should be removed from the store when it completes. This addresses situations where a client remains connected to the driver and awaits the job results for further processing, while you still want the driver to be able to recover the job in case of a crash followed by a restart. Since the client receives the results, they no longer need to be kept in the permanent store. This flag is set to true by default.

The "auto execute on restart" flag tells a driver that, upon restart, it should automatically resubmit the job's unexecuted tasks until the job completes. At startup, the driver will retrieve all the jobs with this flag set to true that have not yet completed, and resubmit them. This flag is set to false by default.

Note: when the "auto execute on restart" flag is true, the nodes to which unexecuted tasks are dispatched still need access to all the required classes for these tasks. The easiest way to achieve this is to add the corresponding jar files and class folders to the driver's classpath and let the nodes' distributed class loader find them there.

As an example, here is how we would configure a persistent job that should be automatically executed upon driver restart and deleted from the store upon completion:

JPPFJob job = new JPPFJob();
job.getSLA().getPersistenceSpec()
  .setPersistent(true)
  .setAutoExecuteOnRestart(true)
  .setDeleteOnCompletion(true);

Note: if your intended usage scenario is to submit a job, close the client application, then query the job results later on, you MUST set the job's cancelUponClientDisconnect flag to false. This will prevent the job from being cancelled when disconnecting the client.

2 Managing persisted jobs

In order to manage and query the jobs in a persistence store, JPPF provides a client-side facility, based on JMX management APIs, which connects to a remote driver and accesses its persistence store. This facility is implemented in the JPPFDriverJobPersistence class, defined as follows:

public class JPPFDriverJobPersistence {
  // Initialize this persisted job manager with the specified driver JMX connection
  public JPPFDriverJobPersistence(JMXDriverConnectionWrapper jmx)

  // List the persisted jobs that match the provided job selector
  public List<String> listJobs(JobSelector selector) throws Exception

  // Delete the persisted job with the specified uuid.
  // This method is equivalent to deleteJobs(new JobUuidSelector(uuid))
  public boolean deleteJob(String uuid) throws Exception

  // Delete the persisted jobs that match the provided job selector
  public List<String> deleteJobs(JobSelector selector) throws Exception

  // Retrieve and rebuild the persisted job with the specified uuid
  // This method is equivalent to retrieveJob(uuid, false)
  public JPPFJob retrieveJob(String uuid) throws Exception

  // Get the description of the job with the specified uuid.
  // This method retrieves the job's uuid, name, number of tasks, SLA and metadata
  public JPPFDistributedJob getJobDescription(String uuid) throws Exception

  // Determines whether the job has completed and all execution results are available
  public boolean isJobComplete(String uuid) throws Exception
}

Note that the constructor for this class takes a JMXDriverConnectionWrapper, so that it can establish a JMX connection to the remote driver. A JMXDriverConnectionWrapper can be obtained in 2 ways:

- by creating it directly, for example:

JMXDriverConnectionWrapper jmx =
  new JMXDriverConnectionWrapper("my.driver.com", 11198, false);
jmx.connectAndWait(3000L);
JPPFDriverJobPersistence jobPersistence = new JPPFDriverJobPersistence(jmx);

- or by getting it from a JPPFClient, for example:

JPPFClient client = new JPPFClient();
JMXDriverConnectionWrapper jmx =
  client.awaitWorkingConnectionPool().awaitWorkingJMXConnection();
JPPFDriverJobPersistence jobPersistence = new JPPFDriverJobPersistence(jmx);

2.1 Listing the persisted jobs

This operation can be achieved with the listJobs() method of JPPFDriverJobPersistence. This method takes a job selector and returns a list of the uuids of the persisted jobs that match the selector. If no persisted job matches, then the returned list is empty. The uuids in the list can then be reused in other methods of JPPFDriverJobPersistence.

Example usage:

JPPFClient client = new JPPFClient();
// create the job persistence manager instance
JPPFDriverJobPersistence jobPersistence = new JPPFDriverJobPersistence(
  client.awaitWorkingConnectionPool().awaitWorkingJMXConnection());

// list the uuids of all currently persisted jobs
List<String> uuids = jobPersistence.listJobs(JobSelector.ALL_JOBS);
for (String uuid: uuids) {
  // do something with each job uuid
}

2.2 Retrieving a job

To retrieve a job from a persistence store, you use the retrieveJob() method of JPPFDriverJobPersistence. This method takes a job uuid and returns a JPPFJob that can be used as if it had been created directly on the client side. For instance, you can resubmit it if it still has unexecuted tasks, or process its results if it has completed.

Here is an example usage:

JPPFClient client = new JPPFClient();
JPPFDriverJobPersistence jobPersistence = ...;

// list the uuids of all currently persisted jobs
List<String> uuids = jobPersistence.listJobs(JobSelector.ALL_JOBS);

// retrieve all the persisted jobs and resubmit those that haven't completed yet
LinkedList<JPPFJob> jobs = new LinkedList<>();
for (String uuid: uuids) {
  JPPFJob job = jobPersistence.retrieveJob(uuid);
  // delete the job from the store, it will be stored again if resubmitted
  jobPersistence.deleteJob(uuid);
  // if the job has unexecuted tasks, resubmit it
  if (job.unexecutedTaskCount() > 0) {
    jobs.addLast(job);
    client.submitAsync(job);
  } else {
    jobs.addFirst(job);
  }
}

// process the results of all jobs
for (JPPFJob job: jobs) {
  // if the job has already completed, this method returns immediately
  List<Task<?>> results = job.awaitResults();
  // ... process the results ...
}

2.3 Deleting one or more jobs

To delete one or more jobs from the persistence store, you can use either:

  • the deleteJob(String) method to delete a single job at a time. This method takes a job uuid and returns true if a job with this uuid was found and effectively deleted, false otherwise.
  • or the deleteJobs(JobSelector) method. This method takes a JobSelector and returns a list of the uuids of the jobs that were found and effectively deleted.


The following example deletes all persisted jobs using deleteJob(String) in a loop:

JPPFDriverJobPersistence jobPersistence = ...;
// list all currently persisted jobs
List<String> uuids = jobPersistence.listJobs(JobSelector.ALL_JOBS);
// delete all the persisted jobs
for (String uuid: uuids) {
  // delete the job from the store and check success
  if (jobPersistence.deleteJob(uuid)) {
    System.out.println("successfully deleted job with uuid = " + uuid);
  }
}

This example achieves exactly the same result using deleteJobs(JobSelector) without a loop:

JPPFDriverJobPersistence jobPersistence = ...;
// delete all persisted jobs
List<String> deletedUuids = jobPersistence.deleteJobs(JobSelector.ALL_JOBS);
System.out.println("successfully deleted jobs with uuids = " + deletedUuids);

2.4 Getting information on the jobs

JPPFDriverJobPersistence provides 2 methods to obtain information on a persisted job:

  • isJobComplete() determines whether a job has completed its execution. It takes a job uuid as input and returns true if the job has completed, false otherwise
  • getJobDescription() provides detailed information on the job name, number of tasks, SLA and metadata. It takes a job uuid and returns an instance of JPPFDistributedJob which encapsulates the details.

The following example illustrates a possible usage of these methods:

JPPFDriverJobPersistence jobPersistence = ...;

// retrieve all the jobs submitted by "john.doe"
String script = "'john.doe'.equals(jppfJob.getMetadata().getParameter('submitter'));";
JobSelector selector = new ScriptedJobSelector("javascript", script);
List<String> uuids = jobPersistence.listJobs(selector);
for (String uuid: uuids) {
  if (jobPersistence.isJobComplete(uuid)) {
    // ... process completed job ...
  } else {
    JPPFDistributedJob jobDesc = jobPersistence.getJobDescription(uuid);
    // ... do something with the job description ...
  }
}

2.5 Configuring jobs persistence in the driver

A job persistence facility is essentially a pluggable implementation of the JobPersistence interface. A driver supports a single persistence implementation at a time, configured through the "jppf.job.persistence" configuration property:

jppf.job.persistence = <implementation class name> param1 ... paramN

where:

  • "implementation class name" is the fully qualified class name of the JobPersistence implementation
  • "param1 ... paramN" are optional string parameters used by the persistence implementation

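As an illustration only (this is not JPPF's actual parsing code), the space-delimited value could be split into an implementation class name and its parameters like this:

```java
import java.util.*;

public class PersistenceConfigSketch {

  // split a "jppf.job.persistence" style value into the implementation
  // class name (first token) and its optional string parameters (the rest);
  // a hypothetical sketch of how the driver might interpret the property
  public static Map.Entry<String, List<String>> parse(String value) {
    String[] tokens = value.trim().split("\\s+");
    List<String> params = new ArrayList<>(Arrays.asList(tokens).subList(1, tokens.length));
    return new AbstractMap.SimpleEntry<>(tokens[0], params);
  }
}
```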

For example, to configure the default file persistence with a root directory named "persistence" in the driver's working directory, we would configure the following:

jppf.job.persistence = org.jppf.job.persistence.impl.DefaultFilePersistence persistence

If no job persistence is configured, the driver falls back to the default file persistence with a root directory named "persistence", exactly as in the example above.

3 Built-in persistence implementations

3.1 Default file persistence

The default file persistence is a file-based persistent store for jobs. The corresponding implementation class is DefaultFilePersistence.

The store's structure is made of a root directory, under which there is one directory per job, named after the job's uuid. Each job directory contains:

  • a file named "header.data" for the job header
  • a file named "data_provider.data" for the job's data provider, if any
  • a file named "task-i.data" for each task i of the job, where i represents the position of the task in the job
  • a file "result-i.data" for each task result i received from a node, where i represents the position of the task in the job


For example, if we define the file persistence with a root directory named "persistence" in the driver's working directory:

jppf.job.persistence = org.jppf.job.persistence.impl.DefaultFilePersistence persistence

Let's say we submitted a job with two tasks which ran to completion, with a uuid of "my_job_uuid". The persistence store structure would then look like this:

JPPF-6.0-driver
|_persistence
  |_my_job_uuid
    |_header.data
    |_data_provider.data
    |_task-0.data
    |_task-1.data
    |_result-0.data
    |_result-1.data

Important note: when a job element (header, data provider, task or result) is stored, its data is first put into a temporary file with the ".tmp" extension. When the data is fully stored in the file, and only then, the file is renamed with a ".data" extension. This avoids ending up with incomplete or corrupted files that would prevent JPPF from restoring the job.
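This write-then-rename safeguard can be sketched with plain java.nio; the file names follow the store layout described above, but the code itself is illustrative, not JPPF's implementation:

```java
import java.io.IOException;
import java.nio.file.*;

public class SafeStoreSketch {

  // write the element's bytes to "<name>.tmp", then rename it to "<name>.data"
  // only once the write has completed, so that readers never observe a
  // partially written ".data" file
  public static Path store(Path dir, String name, byte[] content) throws IOException {
    Files.createDirectories(dir);
    Path tmp = dir.resolve(name + ".tmp");
    Path data = dir.resolve(name + ".data");
    Files.write(tmp, content);
    try {
      // atomic rename where the file system supports it
      Files.move(tmp, data, StandardCopyOption.ATOMIC_MOVE);
    } catch (AtomicMoveNotSupportedException e) {
      // fall back to a plain move on file systems without atomic rename
      Files.move(tmp, data, StandardCopyOption.REPLACE_EXISTING);
    }
    return data;
  }
}
```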

The built-in file persistence is configured as:

jppf.job.persistence = org.jppf.job.persistence.impl.DefaultFilePersistence <root_dir>

where root_dir can be either an absolute file path, or a path relative to the driver's working directory. If it is omitted, it defaults to "persistence".

Tip: you may also use system property substitutions in your configuration to specify common paths. For instance, to point the file persistence to a root directory named "jppf_jobs" in the current user's home directory, we could write:

jppf.job.persistence = org.jppf.job.persistence.impl.DefaultFilePersistence ${sys.user.home}/jppf_jobs

3.2 Default database persistence

The default database persistence is a job persistence implementation which stores jobs in a single database table. Its corresponding implementation class is DefaultDatabasePersistence, and it is configured as follows:

jppf.job.persistence = org.jppf.job.persistence.impl.DefaultDatabasePersistence \ 
      <table_name> <datasource_name>

where:

  • "table_name" is the name of the table in which jobs are stored
  • "datasource_name" is the name of a datasource configured via the database services facility


If both table_name and datasource_name are omitted, they default to "JOB_PERSISTENCE" and "job_persistence", respectively. If only datasource_name is omitted, it defaults to "job_persistence".

The following is an example configuration with a table named "JPPF_JOBS" and a datasource named "jobDS":

# persistence definition
jppf.job.persistence = org.jppf.job.persistence.impl.DefaultDatabasePersistence \
    JPPF_JOBS jobDS

# datasource definition
jppf.datasource.jobs.name = jobDS
jppf.datasource.jobs.driverClassName = com.mysql.jdbc.Driver
jppf.datasource.jobs.jdbcUrl = jdbc:mysql://localhost:3306/testjppf
jppf.datasource.jobs.username = testjppf
jppf.datasource.jobs.password = testjppf
jppf.datasource.jobs.minimumIdle = 5
jppf.datasource.jobs.maximumPoolSize = 10
jppf.datasource.jobs.connectionTimeout = 30000
jppf.datasource.jobs.idleTimeout = 600000

The table structure is as follows:

CREATE TABLE <table_name> (
   UUID varchar(250) NOT NULL,
   TYPE varchar(20) NOT NULL,
   POSITION int NOT NULL,
   CONTENT blob NOT NULL,
   PRIMARY KEY (UUID, TYPE, POSITION)
 );

Where:

  • the UUID column represents the job uuid
  • the TYPE column represents the type of object, taken from the PersistenceObjectType enum
  • the POSITION column represents the object's position in the job, if TYPE is 'task' or 'task_result', otherwise -1
  • the CONTENT column represents the serialized job element

Very important: the table definition should be adjusted depending on the database you are using. For instance, in MySQL the BLOB type has a size limit of 64 KB, thus storing job elements larger than this size will always fail. In that case, the MEDIUMBLOB or LONGBLOB type should be used instead.

If the table does not exist, JPPF will attempt to create it. If this fails for any reason, for instance if the database user does not have sufficient privileges, then persistence will be disabled.

It is possible to specify the path to the file that contains the DDL statement(s) to create the table, like so:

# path to the file or resource containing the DDL statements to create the table
jppf.job.persistence.ddl.location = <ddl_path>

Here, <ddl_path> is the path to either a file in the file system or a resource in the class path. The file system is always looked up first; if no file is found there, JPPF then looks up the driver's classpath. The default value for this property is "org/jppf/job/persistence/impl/job_persistence.sql", which points to a classpath resource located in the jppf-common-x.y.z.jar file.
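
The file-system-first lookup described above can be sketched as follows (illustrative code, not JPPF's implementation):

```java
import java.io.InputStream;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

public class DdlLocatorSketch {

  // look up a location on the file system first, then fall back to the
  // classpath; returns null when it is found in neither
  public static InputStream open(String location) throws IOException {
    Path path = Paths.get(location);
    if (Files.exists(path)) return Files.newInputStream(path);
    return DdlLocatorSketch.class.getClassLoader().getResourceAsStream(location);
  }
}
```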

3.3 Asynchronous persistence (write-behind)

Asynchronous persistence is an asynchronous wrapper for any other job persistence implementation. It delegates the persistence operations to this other implementation, and executes the delegated operations asynchronously via a pool of threads. The corresponding implementation class is AsynchronousPersistence.

The methods of JobPersistence that do not return a result (void return type) are non-blocking and return immediately. All other methods will block until the delegated operation is executed and its result is available. In particular, all operations that store job data are executed some time in the future, which makes this implementation an effective "write-behind" facility.
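
The write-behind pattern can be sketched independently of JPPF, with a minimal stand-in Store interface (not the actual JobPersistence API): void operations are queued on a thread pool, while operations returning a result block on the delegate:

```java
import java.util.concurrent.*;

public class WriteBehindSketch {

  // stand-in for a persistence delegate; NOT the JPPF JobPersistence interface
  public interface Store {
    void store(String key, byte[] data);
    byte[] load(String key);
  }

  public static class AsyncStore implements Store {
    private final Store delegate;
    private final ExecutorService pool;

    public AsyncStore(Store delegate, int threads) {
      this.delegate = delegate;
      this.pool = Executors.newFixedThreadPool(threads);
    }

    // void operations are submitted to the pool and return immediately
    @Override public void store(String key, byte[] data) {
      pool.submit(() -> delegate.store(key, data));
    }

    // operations that return a result block until the delegate completes
    @Override public byte[] load(String key) {
      try {
        return pool.submit(() -> delegate.load(key)).get();
      } catch (InterruptedException | ExecutionException e) {
        throw new RuntimeException(e);
      }
    }

    public void shutdown() throws InterruptedException {
      pool.shutdown();
      pool.awaitTermination(5, TimeUnit.SECONDS);
    }
  }
}
```

With a single-threaded pool, operations also execute in submission order, so a load submitted after a store observes its effect.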

Asynchronous persistence can be configured as follows:

# shorten the configuration value for clarity
wrapper = org.jppf.job.persistence.impl.AsynchronousPersistence
# asynchronous persistence with a specified thread pool size
jppf.job.persistence = ${wrapper} <pool_size> <actual_persistence> <param1> ... <paramN>

Where:

  • "<pool_size>" is the size of the pool of threads used to execute the delegated operations. It can be omitted, in which case it defaults to 1 (single-threaded).
  • "<actual_persistence> <param1> ... <paramN>" is the configuration of the delegate persistence implementation


Here is an example configuration for an asynchronous database persistence:

pkg = org.jppf.job.persistence.impl
# asynchronous database persistence with pool of 4 threads,
# a table named 'JPPF_JOBS' and datasource named 'JobDS'
jppf.job.persistence = ${pkg}.AsynchronousPersistence 4 ${pkg}.DefaultDatabasePersistence JPPF_JOBS JobDS

Performance implications: the main goal of the asynchronous persistence is to minimize the impact of persistence on performance, at the risk of a greater data loss in case of a driver crash. In scenarios where jobs are submitted and executed faster than they are persisted, they will tend to accumulate in the thread pool's queue, with a risk of an out of memory condition if the excessive load persists for too long.

To mitigate this possible issue, the asynchronous persistence monitors the heap usage. When heap usage reaches a given threshold, it stops asynchronous operations and delegates directly to the underlying persistence implementation instead, until the heap usage drops back under the threshold.

The heap usage threshold is the ratio used_heap / max_heap expressed as a percentage. It has a default value of 70% and can be set with the following configuration property:

jppf.job.persistence.memory.threshold = 60.0
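
The threshold check itself boils down to comparing a heap usage ratio to the configured percentage; here is a minimal sketch of that computation (illustrative, not JPPF's actual code):

```java
public class HeapThresholdSketch {

  // ratio of used heap to max heap, expressed as a percentage
  public static double heapUsagePercent() {
    Runtime rt = Runtime.getRuntime();
    long used = rt.totalMemory() - rt.freeMemory();
    return 100.0 * used / rt.maxMemory();
  }

  // true when asynchronous persistence should fall back to synchronous mode
  public static boolean shouldFallBackToSync(double thresholdPercent) {
    return heapUsagePercent() >= thresholdPercent;
  }
}
```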

3.4 Cacheable persistence

The cacheable persistence is a caching wrapper for any other job persistence implementation, whose corresponding implementation class is CacheablePersistence.

The cached artifacts are those handled by the load() and store() methods, that is, job headers, data providers, tasks and task results. The cache is an LRU cache of soft references to the artifacts. It guarantees that all its entries will be garbage-collected before an out of memory error is raised. Additionally the cache has a capacity that can be specified in the configuration and which defaults to 1024.
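
Such a cache can be sketched with an access-ordered LinkedHashMap for the LRU policy and SoftReference values for GC-friendliness (an illustrative sketch, not the actual CacheablePersistence code):

```java
import java.lang.ref.SoftReference;
import java.util.LinkedHashMap;
import java.util.Map;

public class SoftLruCacheSketch<K, V> {
  private final int capacity;
  private final LinkedHashMap<K, SoftReference<V>> map;

  public SoftLruCacheSketch(int capacity) {
    this.capacity = capacity;
    // access-order LinkedHashMap evicts the least recently used entry
    this.map = new LinkedHashMap<K, SoftReference<V>>(16, 0.75f, true) {
      @Override protected boolean removeEldestEntry(Map.Entry<K, SoftReference<V>> eldest) {
        return size() > SoftLruCacheSketch.this.capacity;
      }
    };
  }

  public synchronized void put(K key, V value) {
    map.put(key, new SoftReference<>(value));
  }

  // returns null if absent, or if the soft reference was cleared by the GC
  public synchronized V get(K key) {
    SoftReference<V> ref = map.get(key);
    return ref == null ? null : ref.get();
  }
}
```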

This cacheable persistence is configured as follows:

# shorten the configuration value for clarity
wrapper = org.jppf.job.persistence.impl.CacheablePersistence
# cacheable persistence with a specified capacity
jppf.job.persistence = ${wrapper} <capacity> <actual_persistence> <param1> ... <paramN>

Where:

  • "capacity" is the capacity of the cache, that is, the maximum number of entries it can hold at any time. If omitted, it defaults to 1024.
  • "<actual_persistence> <param1> ... <paramN>" is the configuration of the delegate persistence implementation


Here is a concrete example wrapping a default database persistence:

# shortcut for the package name
pkg = org.jppf.job.persistence.impl
# cacheable database persistence with a capacity of 10000,
# a table named 'JPPF_JOBS' and datasource named 'JobDS'
jppf.job.persistence = ${pkg}.CacheablePersistence 10000 ${pkg}.DefaultDatabasePersistence JPPF_JOBS JobDS

Note: since the job persistence facility, by its nature and design, performs mostly "write" operations, you will generally see little or no benefit to using the cacheable persistence wrapper. You may see significant performance gains essentially in situations where the persisted jobs are accessed multiple times by the client-side management facility.

Tip: it is possible to combine cacheable persistence with asynchronous persistence to wrap any concrete persistence implementation. This is done by simply concatenating the class names and related arguments in the configuration, e.g.:

# shortcuts for package name and persistence implementations
pkg = org.jppf.job.persistence.impl
cacheable = ${pkg}.CacheablePersistence 1024
async = ${pkg}.AsynchronousPersistence 8
db = ${pkg}.DefaultDatabasePersistence JPPF_JOBS jobDS

# combine them to configure a cacheable asynchronous database persistence
jppf.job.persistence = ${cacheable} ${async} ${db}

4 Custom persistence implementations

Reference: custom implementations are fully detailed in a dedicated section of the customization chapter.

5 Class loading and classpath considerations

In a scenario where you want a job to be automatically resubmitted by the persistence facility, after a driver restart and without any client connected, the nodes to which the tasks of the job are dispatched will need to be able to deserialize these tasks and then execute them. For this, they will need to load all the classes required by these tasks, otherwise the execution will fail. Normally, these classes would be downloaded from the client via the driver; however, that is not possible here, since there is no client connected.

To ensure that these classes can be loaded, they must be in a place accessible from either the nodes or the driver. Our recommendation is to put the corresponding jar files and class folders in the driver's classpath, to ensure all the nodes can access them.

Reference: for details on how class loading works in JPPF, please read the class loading documentation.

Similarly, when restoring jobs on the client side with the JPPFDriverJobPersistence facility, you must ensure that all required classes are available in the client classpath, to allow job elements to be deserialized properly.

6 Jobs persistence in multi-server topologies

In topologies that include multiple drivers, whether they communicate with each other and/or the JPPF client is connected to one or more of them at once, there are scenarios that require special handling and care in order to guarantee the integrity of the jobs in the persistence store.

6.1 Only the first peer persists the job

Consider a topology with 2 drivers communicating with each other. When a JPPF client submits a job to driver 1, and driver 1 delegates all or part of the job to driver 2, then only driver 1 will persist the job. This simplifies the handling of persistence, avoids redundant persistence operations and generally increases performance.

6.2 Submitting a job via multiple driver connections

The client-side load balancer, combined with the job's client SLA maxChannels attribute, allows JPPF clients to submit jobs via multiple connections in parallel. We distinguish 2 cases with regard to jobs persistence:

  • All connections point to the same driver. In this case no special handling is needed, because the same driver also means the same persistence store.
  • The connections point to 2 or more separate drivers. In this scenario, two conditions must be met:
    • all the drivers must point to the same persistence store
    • the persistence store must provide some sort of transactionality or locking mechanism to protect against integrity constraint violations.

In effect, some elements of the job might be stored multiple times in parallel by multiple drivers and the store must be protected against possible corruption. Relational databases will generally provide the transactional capabilities to achieve this. For instance the built-in database persistence does, but the built-in file persistence does not.




JPPF Copyright © 2005-2020 JPPF.org Powered by MediaWiki