JPPF, java, parallel computing, distributed computing, grid computing, parallel, distributed, cluster, grid, cloud, open source, android, .net
JPPF, java, parallel computing, distributed computing, grid computing, parallel, distributed, cluster, grid, cloud, open source, android, .net
JPPF

The open source
grid computing
solution

 Home   About   Features   Download   Documentation   On Github   Forums 

Implementing a custom job persistence

From JPPF 6.2 Documentation

Jump to: navigation, search

Contents

Main Page > Customizing JPPF > Custom job persistence


For full details on how the jobs persistence in the driver works, we invite you to read the dedicated documentation chapter in Jobs persistence in the driver.

1 Implementation

The jobs persistence facility relies on one or more impementations of the JobPersistence interface, defined as follows:

public interface JobPersistence {
  // Store the specified job elements. All elements are part of the same job
  void store(Collection<PersistenceInfo> infos) throws JobPersistenceException;

  // Load the specified job elements. All elements are part of the same job
  List<InputStream> load(Collection<PersistenceInfo> infos)
   throws JobPersistenceException;

  // Get the UUIDs of all persisted job
  List<String> getPersistedJobUuids() throws JobPersistenceException;

  // Get the positions of all the tasks in the specified job
  int[] getTaskPositions(String jobUuid) throws JobPersistenceException;

  // Get the positions of all the task results in the specified job
  int[] getTaskResultPositions(String jobUuid) throws JobPersistenceException;

  // Delete the persisted job with the specified UUID
  void deleteJob(String jobUuid) throws JobPersistenceException;

  // Determine whether a job is persisted, that is, present in the persistence store
  boolean isJobPersisted(String jobUuid) throws JobPersistenceException;
}

Note that all the methods in this interface define callbacks that are called by the JPPF driver during the life cycle of the jobs it processes.

If the persistence implementation does not require any parameter to be passed on from the configuration, then it can simply use a default no-arg constructor, implicit or not. Otherwise, string parameters can be specified in the configuration and can be passed on to the implementation via either:

  • a public constructor that takes a String... vararg parameter
  • a public void setParameter(String...) method

If both are defined, then JPPF will always use the constructor.

The store() and load() methods both take a collection of objects of type PersistenceInfo, defined as follows:

public interface PersistenceInfo extends Serializable {
  // Get the job uuid
  String getJobUuid();

  // Get the related job information
  JPPFDistributedJob getJob();

  // Get the type of persisted object
  PersistenceObjectType getType();

  // Get the position of the task or task result in the job
  int getPosition();

  // Get an input stream for the persisted object
  InputStream getInputStream() throws Exception

  // Get the size in bytes of the persisted object
  int getSize();
}

Each instance of this class represents a job element, which can be either a job header, a data provider, a task or a task result. The type of job element can be obtained via the getType() method, which returns a PersistenceObjectType enum element.

Additionally, the fact that store() and load() take a collection of PersistenceInfo objects means that multiple job elements can be persisted in the same transaction, in the case of a transactional persistence implementation.

You will also notice that the job element's data is only available via an input stream: the persisted data represents a serialized object graph which can be temporarily stored anywhere. Here, the serialization format is completely transparent to the job persistence implementation.

Note: the load() method returns a list of InputStream objects. Each InputStream corresponds to a PersistenceInfo object passed in the input collection. The list's ordering must be the same as the ordering defined by the input collection's iterator.

2 Configuration

Configuring a job persistence imeplementation is done via the "jppf.job.persistence" configuration property, by passing it the fully qualified name of the implementation class, along with its optional parameters:

jppf.job.persistence = <full_class_name> <param1> ... <paramN>

for instance, if we define the following implementation:

package test.persistence;
...
public class MyPersistence implements JobPersistence {
  public MyPersistence(String...params) {
    String dataSourceName = (params.length > 0) ? params[0] : "myDS";
    int numberOfThreads = (params.length > 1) ? Integer.valueOf(params[1]) : 1;
    // ... initialize ...
  }

  ... implementation of JobPersistence ...
}

then we could configure it as follows:

jppf.job.persistence = test.persistence.MyPersistence testDS 4

3 Examples

Implementing a custom job persistence is a non-trivial task. Rather than just listing the code of a lengthy real-life implementation, or a simplified implementation that would be unrealistic, we prefer to provide links to the code of the predefined built-in implementations:


Note: the source code of these classes is also available from within the Javadoc.


Main Page > Customizing JPPF > Custom job persistence



JPPF Copyright © 2005-2020 JPPF.org Powered by MediaWiki