Implementing a custom job persistence
From JPPF 6.2 Documentation
For full details on how job persistence works in the driver, see the dedicated documentation chapter, Jobs persistence in the driver.
1 Implementation
The jobs persistence facility relies on one or more implementations of the JobPersistence interface, defined as follows:
public interface JobPersistence {
  // Store the specified job elements. All elements are part of the same job
  void store(Collection<PersistenceInfo> infos) throws JobPersistenceException;

  // Load the specified job elements. All elements are part of the same job
  List<InputStream> load(Collection<PersistenceInfo> infos) throws JobPersistenceException;

  // Get the UUIDs of all persisted jobs
  List<String> getPersistedJobUuids() throws JobPersistenceException;

  // Get the positions of all the tasks in the specified job
  int[] getTaskPositions(String jobUuid) throws JobPersistenceException;

  // Get the positions of all the task results in the specified job
  int[] getTaskResultPositions(String jobUuid) throws JobPersistenceException;

  // Delete the persisted job with the specified UUID
  void deleteJob(String jobUuid) throws JobPersistenceException;

  // Determine whether a job is persisted, that is, present in the persistence store
  boolean isJobPersisted(String jobUuid) throws JobPersistenceException;
}
Note that all the methods in this interface define callbacks that are called by the JPPF driver during the life cycle of the jobs it processes.
If the persistence implementation does not require any parameters from the configuration, it can simply use a default no-arg constructor, whether implicit or explicit. Otherwise, string parameters can be specified in the configuration and passed on to the implementation via either:
- a public constructor that takes a String... vararg parameter
- a public void setParameter(String...) method
If both are defined, then JPPF will always use the constructor.
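The precedence rule above can be illustrated with a small reflection sketch. This is not JPPF's actual loading code: the class ParameterInjectionSketch, its instantiate() helper and the two nested persistence classes are hypothetical names introduced for illustration only, and they do not implement JobPersistence so that the example stays self-contained.

```java
import java.lang.reflect.Constructor;
import java.lang.reflect.Method;

// Illustration only: this is NOT JPPF's actual loading code.
class ParameterInjectionSketch {

  // Hypothetical implementation exposing both injection points.
  public static class BothPersistence {
    public String configuredBy = "nothing";
    public String[] params = new String[0];

    public BothPersistence() { }

    public BothPersistence(String... params) {
      this.configuredBy = "constructor";
      this.params = params;
    }

    public void setParameter(String... params) {
      this.configuredBy = "setter";
      this.params = params;
    }
  }

  // Hypothetical implementation exposing only the setter.
  public static class SetterOnlyPersistence {
    public String configuredBy = "nothing";
    public String[] params = new String[0];

    public void setParameter(String... params) {
      this.configuredBy = "setter";
      this.params = params;
    }
  }

  // Prefer the String... constructor; otherwise fall back to the
  // no-arg constructor followed by setParameter(String...), if present.
  static Object instantiate(Class<?> type, String... params) {
    try {
      try {
        Constructor<?> ctor = type.getConstructor(String[].class);
        return ctor.newInstance((Object) params);
      } catch (NoSuchMethodException noVarargConstructor) {
        Object instance = type.getConstructor().newInstance();
        try {
          Method setter = type.getMethod("setParameter", String[].class);
          setter.invoke(instance, (Object) params);
        } catch (NoSuchMethodException noSetter) {
          // No injection point at all: the parameters are ignored.
        }
        return instance;
      }
    } catch (ReflectiveOperationException e) {
      throw new IllegalStateException("cannot instantiate " + type.getName(), e);
    }
  }

  public static void main(String[] args) {
    BothPersistence both =
      (BothPersistence) instantiate(BothPersistence.class, "testDS", "4");
    System.out.println(both.configuredBy); // prints "constructor"

    SetterOnlyPersistence setterOnly =
      (SetterOnlyPersistence) instantiate(SetterOnlyPersistence.class, "testDS", "4");
    System.out.println(setterOnly.configuredBy); // prints "setter"
  }
}
```

In practice, defining only one of the two entry points avoids any ambiguity about which one receives the parameters.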
The store() and load() methods both take a collection of objects of type PersistenceInfo, defined as follows:
public interface PersistenceInfo extends Serializable {
  // Get the job uuid
  String getJobUuid();

  // Get the related job information
  JPPFDistributedJob getJob();

  // Get the type of persisted object
  PersistenceObjectType getType();

  // Get the position of the task or task result in the job
  int getPosition();

  // Get an input stream for the persisted object
  InputStream getInputStream() throws Exception;

  // Get the size in bytes of the persisted object
  int getSize();
}
Each PersistenceInfo instance represents a job element, which can be either a job header, a data provider, a task or a task result. The type of job element can be obtained via the getType() method, which returns a PersistenceObjectType enum element.
Additionally, the fact that store() and load() take a collection of PersistenceInfo objects means that multiple job elements can be persisted in the same transaction, in the case of a transactional persistence implementation.
You will also notice that the job element's data is only available via an input stream: the persisted data represents a serialized object graph which can be temporarily stored anywhere. Here, the serialization format is completely transparent to the job persistence implementation.
Note: the load() method returns a list of InputStream objects. Each InputStream corresponds to a PersistenceInfo object passed in the input collection. The list's ordering must be the same as the ordering defined by the input collection's iterator.
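The store() and load() contract described above, including the ordering requirement, can be sketched with an in-memory map. This is a minimal sketch under stated assumptions, not a real implementation: Element and ElementType are simplified local stand-ins for PersistenceInfo and PersistenceObjectType (the enum constant names are assumed), the key format is arbitrary, unchecked exceptions replace JobPersistenceException, and InputStream.readAllBytes() requires Java 9 or later.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Illustration only: local stand-in types, no dependency on JPPF.
class InMemoryStoreSketch {

  // Stand-in for PersistenceObjectType (constant names are assumed).
  enum ElementType { JOB_HEADER, DATA_PROVIDER, TASK, TASK_RESULT }

  // Stand-in for PersistenceInfo, reduced to what the sketch needs.
  static final class Element {
    final String jobUuid;
    final ElementType type;
    final int position;
    final byte[] data; // opaque serialized bytes; unused by load()

    Element(String jobUuid, ElementType type, int position, byte[] data) {
      this.jobUuid = jobUuid;
      this.type = type;
      this.position = position;
      this.data = data;
    }

    InputStream getInputStream() {
      return new ByteArrayInputStream(data);
    }
  }

  private final Map<String, byte[]> store = new ConcurrentHashMap<>();

  // Arbitrary key format chosen for this sketch.
  private static String key(Element e) {
    return e.jobUuid + '/' + e.type + '/' + e.position;
  }

  // Drain and close a stream; the content is treated as opaque bytes.
  static byte[] readFully(InputStream in) {
    try (InputStream stream = in) {
      return stream.readAllBytes();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  // Persist every element of the collection in a single call.
  void store(Collection<Element> infos) {
    for (Element info : infos) {
      store.put(key(info), readFully(info.getInputStream()));
    }
  }

  // Return one stream per requested element, in the order of the
  // input collection's iterator.
  List<InputStream> load(Collection<Element> infos) {
    List<InputStream> result = new ArrayList<>(infos.size());
    for (Element info : infos) {
      byte[] bytes = store.get(key(info));
      if (bytes == null) {
        throw new IllegalStateException("not persisted: " + key(info));
      }
      result.add(new ByteArrayInputStream(bytes));
    }
    return result;
  }
}
```

Building the result list while iterating over the input collection is the simplest way to guarantee the required ordering. How a missing element should be reported is a choice made for this sketch only.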
2 Configuration
Configuring a job persistence implementation is done via the "jppf.job.persistence" configuration property, by passing it the fully qualified name of the implementation class, along with its optional parameters:
jppf.job.persistence = <full_class_name> <param1> ... <paramN>
For instance, if we define the following implementation:
package test.persistence;
...
public class MyPersistence implements JobPersistence {
  public MyPersistence(String... params) {
    String dataSourceName = (params.length > 0) ? params[0] : "myDS";
    int numberOfThreads = (params.length > 1) ? Integer.valueOf(params[1]) : 1;
    // ... initialize ...
  }

  ... implementation of JobPersistence ...
}
then we could configure it as follows:
jppf.job.persistence = test.persistence.MyPersistence testDS 4
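To make the mapping from the property value to the constructor arguments concrete, here is a minimal sketch. It assumes the value is split on whitespace, which the syntax above suggests but does not state, and it says nothing about how JPPF handles quoting or escaping. PersistenceConfigSketch and its methods are hypothetical names, not part of JPPF.

```java
import java.util.Arrays;

// Illustration only: assumes whitespace-separated tokens.
class PersistenceConfigSketch {

  private static String[] tokens(String propertyValue) {
    return propertyValue.trim().split("\\s+");
  }

  // First token: fully qualified name of the implementation class.
  static String className(String propertyValue) {
    return tokens(propertyValue)[0];
  }

  // Remaining tokens: the optional string parameters, possibly none.
  static String[] parameters(String propertyValue) {
    String[] all = tokens(propertyValue);
    return Arrays.copyOfRange(all, 1, all.length);
  }

  public static void main(String[] args) {
    String value = "test.persistence.MyPersistence testDS 4";
    String[] params = parameters(value);

    // Same defaulting logic as the MyPersistence constructor.
    String dataSourceName = (params.length > 0) ? params[0] : "myDS";
    int numberOfThreads = (params.length > 1) ? Integer.parseInt(params[1]) : 1;

    System.out.println(className(value));
    System.out.println(dataSourceName + " " + numberOfThreads);
  }
}
```

With this configuration, the constructor would receive "testDS" and "4", so the data source name is testDS and the number of threads is 4. With no parameters after the class name, the defaults "myDS" and 1 apply.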
3 Examples
Implementing a custom job persistence is a non-trivial task. Rather than just listing the code of a lengthy real-life implementation, or a simplified implementation that would be unrealistic, we prefer to provide links to the code of the predefined built-in implementations:
- all built-in implementations are in the org.jppf.job.persistence.impl package
- default database persistence: class DefaultDatabasePersistence (javadoc)
- default file persistence: class DefaultFilePersistence (javadoc)
- asynchronous persistence wrapper: class AsynchronousPersistence (javadoc)
- cacheable persistence wrapper: class CacheablePersistence (javadoc)
Note: the source code of these classes is also available from within the Javadoc.