Job Service Level Agreement

From JPPF 5.2 Documentation

A job service level agreement (SLA) defines the terms and conditions under which a job will be processed. A job carries two distinct SLAs: one defines a contract between the job and the JPPF server, the other defines a separate contract between the job and the JPPF client.

Server and client SLAs have common attributes, which specify:

  • the characteristics of the nodes it can run on (server side), or of the channels it can be sent through (client side): the job execution policy
  • the time at which a job is scheduled to start
  • an expiration date for the job

The attributes specific to the server side SLA are:

  • the priority of a job
  • whether it is submitted in suspended state
  • the maximum number of nodes it can run on
  • whether the job is a standard or broadcast job
  • whether the server should immediately cancel the job, if the client that submitted it is disconnected

The attributes specific to the client side SLA are:

  • the maximum number of channels it can be sent through

A job SLA is represented by the interface JobSLA for the server-side SLA, and by the interface JobClientSLA for the client-side SLA. Both can be accessed from a job using the related getters and setters:

public class JPPFJob implements Serializable, JPPFDistributedJob {
  // The job's server-side SLA
  public JobSLA getSLA()
  public void setSLA(final JobSLA jobSLA)

  // The job's client-side SLA
  public JobClientSLA getClientSLA()
  public void setClientSLA(final JobClientSLA jobClientSLA)
}

Example usage:

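JPPFJob myJob = new JPPFJob();
// access the server-side and client-side SLAs of the job
JobSLA sla = myJob.getSLA();
JobClientSLA clientSla = myJob.getClientSLA();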

Also note that both interfaces extend the common interface JobCommonSLA. We will go into the details of these interfaces in the following sections.

1 Attributes common to server and client side SLAs

As seen previously, the common attributes for server and client side SLAs are defined by the JobCommonSLA interface:

public interface JobCommonSLA extends Serializable {
  // The execution policy
  ExecutionPolicy getExecutionPolicy();
  void setExecutionPolicy(ExecutionPolicy executionPolicy);

  // The job start schedule
  JPPFSchedule getJobSchedule();
  void setJobSchedule(JPPFSchedule jobSchedule);

  // The job expiration schedule
  JPPFSchedule getJobExpirationSchedule();
  void setJobExpirationSchedule(JPPFSchedule jobExpirationSchedule);
}

1.1 Execution policy

An execution policy is an object that determines whether a particular set of JPPF tasks can be executed on a JPPF node (for the server-side SLA) or if it can be sent via a communication channel (for the client-side). It does so by applying the set of rules (or tests) it is made of, against a set of properties associated with the node or channel.

For a fully detailed description of how to create and use execution policies, please read the Execution policies section of this development guide.

Example usage:

// define a non-trivial server-side execution policy:
// execute on nodes that have at least 2 threads and whose IPv4 address
// is in the 192.168.1.nnn subnet
ExecutionPolicy serverPolicy = new AtLeast("processing.threads", 2).and(
  new Contains("ipv4.addresses", true, "192.168.1."));
// define a client-side execution policy:
// submit to the client local executor or to drivers whose IPv4 address
// is in the 192.168.1.nnn subnet
ExecutionPolicy clientPolicy = new Equal("", true).or(
  new Contains("ipv4.addresses", true, "192.168.1."));
JPPFJob job = new JPPFJob();
// set the server-side policy
job.getSLA().setExecutionPolicy(serverPolicy);
// set the client-side policy
job.getClientSLA().setExecutionPolicy(clientPolicy);
// print an XML representation of the server-side policy
System.out.println("server policy is:\n" + job.getSLA().getExecutionPolicy());
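To illustrate how a policy applies a set of rules against a set of properties, here is a minimal standalone sketch that mirrors the server-side rule above using plain java.util.function.Predicate objects. The PolicySketch class and its helper methods are hypothetical and are not part of the JPPF API; it also assumes that properties are held as strings in a map.

```java
import java.util.Map;
import java.util.function.Predicate;

final class PolicySketch {
  // rule: the numeric property 'key' exists and is >= min
  static Predicate<Map<String, String>> atLeast(String key, double min) {
    return props -> {
      String value = props.get(key);
      if (value == null) return false;
      try {
        return Double.parseDouble(value) >= min;
      } catch (NumberFormatException e) {
        return false; // a non-numeric value never matches
      }
    };
  }

  // rule: the string property 'key' exists and contains 'fragment'
  static Predicate<Map<String, String>> contains(String key, String fragment) {
    return props -> {
      String value = props.get(key);
      return value != null && value.contains(fragment);
    };
  }

  // at least 2 threads AND an IPv4 address in the 192.168.1.nnn subnet
  static Predicate<Map<String, String>> serverRule() {
    return atLeast("processing.threads", 2).and(contains("ipv4.addresses", "192.168.1."));
  }
}
```

A node is eligible when serverRule().test(nodeProperties) returns true. The real execution policies offer many more rule types, described in the Execution policies section.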

1.2 Job start and expiration scheduling

It is possible to schedule a job for a later start, and also to set a job for expiration at a specified date/time. The job SLA allows this by providing the following methods:

// job start schedule
public JPPFSchedule getJobSchedule()
public void setJobSchedule(JPPFSchedule schedule)

// job expiration schedule
public JPPFSchedule getJobExpirationSchedule()
public void setJobExpirationSchedule(JPPFSchedule schedule)

As we can see, this is all about getting and setting an instance of JPPFSchedule. A schedule is normally defined through one of its constructors:

As a fixed length of time

public JPPFSchedule(long duration)

The semantics is that the job will start duration milliseconds after the job is received by the server. Here is an example:

JPPFJob myJob = new JPPFJob();
// set the job to start 5 seconds after being received
JPPFSchedule mySchedule = new JPPFSchedule(5000L);
myJob.getSLA().setJobSchedule(mySchedule);

As a specific date/time

public JPPFSchedule(String date, String dateFormat)

Here the date format is specified as a pattern for a SimpleDateFormat instance.

Here is an example use of this constructor:

JPPFJob myJob = new JPPFJob();
String dateFormat = "MM/dd/yyyy hh:mm a z";
// set the job to expire on September 30, 2010 at 12:08 PM in the CEST time zone
JPPFSchedule schedule = new JPPFSchedule("09/30/2010 12:08 PM CEST", dateFormat);
myJob.getSLA().setJobExpirationSchedule(schedule);
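Since the date format is a SimpleDateFormat pattern, it can be useful to check a date string against its pattern before building a schedule from it. The following standalone sketch does this with the JDK only. The SchedulePatternSketch class is hypothetical, and the use of Locale.US and of a "GMT+02:00" offset instead of a named time zone are assumptions made to keep the example independent of the local environment.

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Locale;

final class SchedulePatternSketch {
  // parse 'date' with a SimpleDateFormat 'pattern' and return the epoch time in milliseconds
  static long toEpochMillis(String date, String pattern) {
    SimpleDateFormat format = new SimpleDateFormat(pattern, Locale.US);
    format.setLenient(false); // reject dates that do not strictly match the pattern
    try {
      return format.parse(date).getTime();
    } catch (ParseException e) {
      throw new IllegalArgumentException("date '" + date + "' does not match '" + pattern + "'", e);
    }
  }

  public static void main(String[] args) {
    long millis = toEpochMillis("09/30/2010 12:08 PM GMT+02:00", "MM/dd/yyyy hh:mm a z");
    System.out.println("expiration time in epoch millis: " + millis);
  }
}
```

A date that does not match the pattern results in an IllegalArgumentException, which makes formatting mistakes visible before the job is submitted.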

2 Server side SLA attributes

A server-side SLA is described by the JobSLA interface, defined as:

public interface JobSLA extends JobCommonSLA {
  // Job priority
  int getPriority();
  void setPriority(int priority);

  // Maximum number of nodes the job can run on
  int getMaxNodes();
  void setMaxNodes(int maxNodes);

  // maximum number of groups of master/slaves nodes the job can run on at any given time
  int getMaxNodeProvisioningGroups();
  void setMaxNodeProvisioningGroups(int maxNodeProvisioningGroups);

  // Whether the job is initially suspended
  boolean isSuspended();
  void setSuspended(boolean suspended);

  // Whether the job is a broadcast job
  boolean isBroadcastJob();
  void setBroadcastJob(boolean broadcastJob);

  // Determine whether the job should be canceled by the server
  // if the client gets disconnected
  boolean isCancelUponClientDisconnect();
  void setCancelUponClientDisconnect(boolean cancelUponClientDisconnect);

  // expiration schedule for any subset of the job dispatched to a node
  JPPFSchedule getDispatchExpirationSchedule();
  void setDispatchExpirationSchedule(JPPFSchedule schedule);

  // number of times a dispatched task can expire before it is finally cancelled
  int getMaxDispatchExpirations();
  void setMaxDispatchExpirations(int max);

  // class path associated with the job
  ClassPath getClassPath();
  void setClassPath(ClassPath classpath);
}

2.1 Job priority

The priority of a job determines the order in which the job will be executed by the server. It can be any integer value, such that if jobA.getPriority() > jobB.getPriority() then jobA will be executed before jobB. There are situations where both jobs may be executed at the same time, for instance if there remain any available nodes for jobB after jobA has been dispatched. Two jobs with the same priority will have an equal share (as much as is possible) of the available grid nodes.

The priority attribute is also manageable, which means that it can be dynamically updated, while the job is still executing, using the JPPF administration console or the related management APIs. The default priority is zero.

Example usage:

// create a job with a non-default priority (10 is an arbitrary example value)
JPPFJob job1 = new JPPFJob();
job1.getSLA().setPriority(10);
// create a second job with a slightly higher priority
JPPFJob job2 = new JPPFJob();
job2.getSLA().setPriority(job1.getSLA().getPriority() + 1);
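The ordering rule "higher priority first" can be sketched with a standard PriorityQueue. This is only an illustration of the ordering: the PriorityOrderSketch and QueuedJob classes are hypothetical, and the sketch does not model how the server shares nodes between jobs of equal priority or runs several jobs at the same time.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

final class PriorityOrderSketch {
  // hypothetical stand-in for a queued job: a name and an SLA priority
  static final class QueuedJob {
    final String name;
    final int priority;

    QueuedJob(String name, int priority) {
      this.name = name;
      this.priority = priority;
    }
  }

  // return the job names in the order they would leave a highest-priority-first queue
  static List<String> dispatchOrder(List<QueuedJob> jobs) {
    Comparator<QueuedJob> highestFirst =
      Comparator.comparingInt((QueuedJob job) -> job.priority).reversed();
    PriorityQueue<QueuedJob> queue = new PriorityQueue<>(highestFirst);
    queue.addAll(jobs);
    List<String> order = new ArrayList<>();
    while (!queue.isEmpty()) order.add(queue.poll().name);
    return order;
  }
}
```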

2.2 Maximum number of nodes

The maximum number of nodes attribute determines how many grid nodes a job can run on, at any given time. This is an upper bound: it does not guarantee that this number of nodes will always be used, only that no more than this number of nodes will be assigned to the job. This attribute is also non-distinctive, in that it does not specify which nodes the job will run on. The default value of this attribute is Integer.MAX_VALUE, i.e. 2^31-1.

The resulting assignment of nodes to the job is influenced by other attributes, especially the job priority and the execution policy, if any.

The maximum number of nodes is a manageable attribute, which means it can be dynamically updated, while the job is still executing, using the JPPF administration console or the related management APIs.

Example usage:

JPPFJob job = new JPPFJob();
// this job will execute on a maximum of 10 nodes
job.getSLA().setMaxNodes(10);

2.3 Maximum number of node provisioning groups

A node provisioning group designates a set of nodes made of one master node and its provisioned slave nodes, if it has any. The SLA allows restricting a job execution to a maximum number of master node groups. This SLA attribute is useful whenever you want to take advantage of the fact that, by definition, a master and its slave nodes all run on the same machine, for instance to exploit data locality properties. This attribute's default value is Integer.MAX_VALUE (2^31-1).

Note that this attribute does not specifically restrict the total number of nodes the job can run on, since each master node can have any number of slaves. For this, you also need to set the maximum number of nodes attribute. Additionally, this attribute has no effect on the selection of nodes that are neither master nor slave, such as offline nodes.

Example usage:

JPPFJob job = new JPPFJob();
// only execute on a single group of master/slaves at a time
job.getSLA().setMaxNodeProvisioningGroups(1);
// further restrict to only the slave nodes in the provisioning group
job.getSLA().setExecutionPolicy(new Equal("jppf.node.provisioning.slave", true));

2.4 Initial suspended state

A job can be initially suspended. In this case, it will remain in the server's queue until it is explicitly resumed or canceled, or until it expires (if a timeout was set), whichever happens first. A job can be resumed and suspended again any number of times via the JPPF administration console or the related management APIs.

Example usage:

JPPFJob job = new JPPFJob();
// this job will be submitted to the server and will remain suspended until
// it is resumed or cancelled via the admin console or management APIs
job.getSLA().setSuspended(true);

2.5 Broadcast jobs

A broadcast job is a specific type of job, for which each task will be executed on all the nodes currently present in the grid. This opens new possibilities for grid applications, such as performing maintenance operations on the nodes or drastically reducing the size of a job that performs identical tasks on each node.

With regard to the job SLA, a job is set in broadcast mode via a boolean indicator, for which the interface JobSLA provides the following accessors:

public boolean isBroadcastJob()
public void setBroadcastJob(boolean broadcastJob)

To set a job in broadcast mode:

JPPFJob myJob = new JPPFJob();
myJob.getSLA().setBroadcastJob(true);

With respect to the dynamic aspect of a JPPF grid, the following behavior is enforced:

  • a broadcast job is executed on all the nodes connected to the driver, at the time the job is received by the JPPF driver. This includes nodes that are executing another job at that time
  • if a node dies or disconnects while the job is executing on it, the job is canceled for this node
  • if a new node connects while the job is executing, the broadcast job will not execute on it
  • a broadcast job does not return any results, i.e. it returns the tasks in the same state as they were submitted

Additionally, if local execution of jobs is enabled for the JPPF client, a broadcast job will not be executed locally. In other words, a broadcast job is only executed on remote nodes.
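The rules about node membership can be pictured as a snapshot of the connected nodes taken when the job is received. The sketch below models only that bookkeeping; the BroadcastSketch class and its method names are hypothetical and do not reflect how the JPPF driver is implemented.

```java
import java.util.LinkedHashSet;
import java.util.Set;

final class BroadcastSketch {
  private final Set<String> connected = new LinkedHashSet<>();
  private Set<String> targets = new LinkedHashSet<>();

  void nodeConnected(String nodeId) {
    // a node that connects after the job was received is not added to the targets
    connected.add(nodeId);
  }

  void nodeDisconnected(String nodeId) {
    connected.remove(nodeId);
    // the job is canceled for a node that disconnects
    targets.remove(nodeId);
  }

  void broadcastJobReceived() {
    // the set of target nodes is fixed at the time the job is received
    targets = new LinkedHashSet<>(connected);
  }

  Set<String> targets() {
    return new LinkedHashSet<>(targets);
  }
}
```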

2.6 Canceling a job upon client disconnection

By default, if the JPPF client is disconnected from the server while a job is executing, the server will automatically attempt to cancel the job's execution on all nodes it was dispatched to, and remove the job from the server queue. You may disable this behavior on a per-job basis, for example if you want to let the job execute until completion but do not need the execution results.

This property is set once for each job, and cannot be changed once the job has been submitted to the server, i.e. it is not dynamically manageable.

Example usage:

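JPPFJob myJob = new JPPFJob();
// let the job run to completion even if the client gets disconnected
myJob.getSLA().setCancelUponClientDisconnect(false);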

2.7 Expiration of job dispatches

Definition: a job dispatch is the whole or part of a job that is dispatched by the server to a node.

The server-side job SLA enables specifying whether a job dispatch will expire, along with the behavior upon expiration. This is done with a combination of two attributes: a dispatch expiration schedule, which specifies when the dispatch will expire, and a maximum number of expirations after which the tasks in the dispatch will be cancelled instead of resubmitted. By default, a job dispatch will not expire and the number of expirations is set to zero (tasks are cancelled upon the first expiration, if any).

One possible use for this mechanism is to prevent resource-intensive tasks from tying up slow nodes, without having to cancel the whole job or set timeouts on individual tasks.

Example usage:

JPPFJob job = new JPPFJob();
// job dispatches will expire if they execute for more than 5 seconds
job.getSLA().setDispatchExpirationSchedule(new JPPFSchedule(5000L));
// dispatched tasks will be resubmitted at most 2 times before they are cancelled
job.getSLA().setMaxDispatchExpirations(2);
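The interplay between the two attributes can be summarized as a small decision rule. The sketch below is one reading of the description above, not the server's actual code: the DispatchExpirationSketch class, the Outcome enum and the expirationCount parameter are hypothetical names.

```java
final class DispatchExpirationSketch {
  enum Outcome { RESUBMIT, CANCEL }

  // expirationCount: how many times the tasks have expired so far, including this expiration
  // maxDispatchExpirations: the SLA attribute, 0 by default
  static Outcome onExpiration(int expirationCount, int maxDispatchExpirations) {
    return expirationCount <= maxDispatchExpirations ? Outcome.RESUBMIT : Outcome.CANCEL;
  }
}
```

With the default value of 0, the first expiration already yields CANCEL. With a value of 2, the first two expirations yield RESUBMIT and the third yields CANCEL.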

2.8 Setting a class path onto the job

The classpath attribute of the job SLA allows sending library files along with the job and its tasks. Out of the box, this attribute is only used by offline nodes, to work around the fact that offline nodes do not have remote class loading capabilities. The class path attribute, by default empty but not null, is accessed with the following methods:

public interface JobSLA extends JobCommonSLA {
  // get / set the class path associated with the job
  ClassPath getClassPath();
  void setClassPath(ClassPath classpath);
}

We can see that a class path is represented by the ClassPath interface, defined as follows:

public interface ClassPath extends Serializable, Iterable<ClassPathElement> {
  // add an element to this classpath
  ClassPath add(ClassPathElement element);
  ClassPath add(String name, Location<?> location);
  ClassPath add(String name, Location<?> localLocation, Location<?> remoteLocation);

  // remove an element from this classpath
  ClassPath remove(ClassPathElement element);
  ClassPath remove(String name);

  // get an element with the specified name
  ClassPathElement element(String name);

  // get all the elements in this classpath
  Collection<ClassPathElement> allElements();

  // empty this classpath (remove all elements)
  ClassPath clear();

  // is this classpath empty?
  boolean isEmpty();

  // should the node force a reset of the class loader before executing the tasks?
  boolean isForceClassLoaderReset();
  void setForceClassLoaderReset(boolean forceReset);
}

Note that one of the add(...) methods uses a ClassPathElement as parameter, while the others use a name with one or two Location objects (see the Location API section). These methods are equivalent. For the last two, JPPF will internally create instances of a default implementation of ClassPathElement (class ClassPathElementImpl). Avoiding the explicit creation of ClassPathElement instances is preferred, since it makes the code less cumbersome and independent of any specific implementation.

Also note that ClassPath implements Iterable<ClassPathElement>, so that it can be used in for loops:

for (ClassPathElement elt: myJob.getSLA().getClassPath()) ...;

The ClassPathElement interface is defined as follows:

public interface ClassPathElement extends Serializable {
  // get the name of this classpath element
  String getName();
  // get the local (to the client) location of this element
  Location<?> getLocalLocation();
  // get the remote (local to the node) location of this element, if any
  Location<?> getRemoteLocation();
  // perform a validation of this classpath element
  boolean validate();
}

JPPF provides a default implementation ClassPathElementImpl which does not perform any validation, that is, its validate() method always returns true.

Finally, here is an example of how this can all be put together:

JPPFJob myJob = new JPPFJob();
ClassPath classpath = myJob.getSLA().getClassPath();
// wrap a jar file into a FileLocation object
Location jarLocation = new FileLocation("libs/MyLib.jar");
// copy the jar file into memory
Location location = jarLocation.copyTo(new MemoryLocation(jarLocation.size()));
// or another way to do this:
location = new MemoryLocation(jarLocation.toByteArray());
// add it as classpath element
classpath.add("myLib", location);
// the following is functionally equivalent:
classpath.add(new ClassPathElementImpl("myLib", location));
// tell the node to reset the tasks classloader with this new class path
classpath.setForceClassLoaderReset(true);
2.9 Maximum number of tasks resubmits

As we have seen in the "resubmitting a task" section, tasks have the ability to schedule themselves for resubmission by the server. The job server-side SLA allows you to set the maximum number of times this can occur, with the following accessors:

public interface JobSLA extends JobCommonSLA {
  // get the maximum number of times a task can resubmit itself
  // via AbstractTask.setResubmit(boolean)
  int getMaxTaskResubmits();

  // set the maximum number of times a task can resubmit itself
  void setMaxTaskResubmits(int maxResubmits);

  // Determine whether the max resubmits limit for tasks is also applied
  // when tasks are resubmitted due to a node error
  boolean isApplyMaxResubmitsUponNodeError();

  // Specify whether the max resubmits limit for tasks should also be applied
  // when tasks are resubmitted due to a node error
  void setApplyMaxResubmitsUponNodeError(boolean applyMaxResubmitsUponNodeError);
}

The default value for the maxTaskResubmits attribute is 1, which means that by default a task can resubmit itself at most once. Additionally, this attribute can be overridden by setting the maxResubmits attribute of individual tasks.

The applyMaxResubmitsUponNodeError flag is set to false by default. This means that, when the tasks are resubmitted due to a node connection error, the resubmit does not count toward the limit. To change this behavior, setApplyMaxResubmitsUponNodeError(true) must be called explicitly.

Example usage:

public class MyTask extends AbstractTask<String> {
  @Override public void run() {
    // unconditional resubmit could lead to an infinite loop
    setResubmit(true);
    // the result will only be kept after the max number of resubmits is reached
    setResult("success");
  }
}

JPPFJob job = new JPPFJob();
job.add(new MyTask());
// tasks can be resubmitted 4 times, meaning they can execute up to 5 times total
job.getSLA().setMaxTaskResubmits(4);
// resubmits due to node errors are also counted
job.getSLA().setApplyMaxResubmitsUponNodeError(true);
// ... submit the job and get the results ...
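The relationship between the resubmit limit and the total number of executions can be checked with a small simulation. This is a standalone sketch under the assumption that the task requests a resubmit after every run and that no node error occurs; the ResubmitSketch class is hypothetical.

```java
final class ResubmitSketch {
  // count how many times a task runs when it asks to be resubmitted after every execution
  static int executions(int maxTaskResubmits) {
    int executions = 0;
    int resubmits = 0;
    boolean runAgain = true;
    while (runAgain) {
      executions++;                      // the task runs and requests a resubmit
      if (resubmits < maxTaskResubmits) {
        resubmits++;                     // request granted: the task is queued again
      } else {
        runAgain = false;                // limit reached: the result is kept
      }
    }
    return executions;
  }

  public static void main(String[] args) {
    System.out.println("executions with a limit of 4: " + executions(4));
  }
}
```

With the default limit of 1, the same task runs twice in total.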

2.10 Disabling remote class loading during job execution

Jobs can specify whether remote class loader lookups are enabled during their execution in a remote node. When remote class loading is disabled, lookups are only performed in the local classpath of each class loader in the class loader hierarchy, and no remote resource requests are sent to the server or client. This is done with the following accessors:

public interface JobSLA extends JobCommonSLA {
  // Determine whether remote class loading is enabled for the job. Default to true
  boolean isRemoteClassLoadingEnabled();

  // Specify whether remote class loading is enabled for the job
  void setRemoteClassLoadingEnabled(boolean enabled);
}

Note 1: when remote class loading is disabled, the classes that the JPPF node normally loads from the server cannot be loaded remotely either. It is thus required to have these classes in the node's local classpath, which is usually done by adding the "jppf-server.jar" and "jppf-common.jar" files to the node's classpath.

Note 2: if a class is not found while remote class loading is disabled, it will remain not found, even if the next job specifies that remote class loading is enabled. This is due to the fact that the JPPF class loaders maintain a cache of classes not found to avoid unnecessary remote lookups. To avoid this behavior, the task class loader should be reset before the next job is executed.
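The effect described in Note 2 can be reproduced with a toy model of a "not found" cache. The sketch below is not the JPPF class loader: the NotFoundCacheSketch class is hypothetical, and the set of remotely available class names stands in for what the server or client could provide.

```java
import java.util.HashSet;
import java.util.Set;

final class NotFoundCacheSketch {
  private final Set<String> notFound = new HashSet<>();
  private final Set<String> remoteClasses;

  NotFoundCacheSketch(Set<String> remoteClasses) {
    this.remoteClasses = remoteClasses;
  }

  // return true if the class can be loaded, false otherwise
  boolean canLoad(String className, boolean remoteLoadingEnabled) {
    // a cached miss short-circuits the lookup, whatever the job asks for
    if (notFound.contains(className)) return false;
    boolean found = remoteLoadingEnabled && remoteClasses.contains(className);
    if (!found) notFound.add(className);
    return found;
  }

  // resetting the class loader discards the cache of classes not found
  void reset() {
    notFound.clear();
  }
}
```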

2.11 Grid policy

Jobs can also specify an execution policy that will be evaluated against the server and the totality of its nodes, instead of just against individual nodes as for the SLA's execution policy attribute we saw earlier in this documentation.

This grid policy is defined as a normal execution policy with two differences:

  • it is evaluated against the properties of the server
  • it may include any number of server global policies that count the nodes matching a given node policy

This policy is accessible with the following setter and getter of the SLA:

public interface JobSLA extends JobCommonSLA {
  // Get the global grid execution policy
  ExecutionPolicy getGridExecutionPolicy();

  // Set the global grid execution policy
  void setGridExecutionPolicy(ExecutionPolicy policy);
}

For example, to express and set the policy "execute the job when the server has at least 2 GB of available heap memory and at least 3 nodes with more than 4 processing threads each", we would code something like this:

long GB = 1024L*1024L*1024L; // 1 GB, as a long: 2*GB does not fit in an int
JPPFJob job = new JPPFJob();
// evaluated against each node's properties
ExecutionPolicy nodePolicy = new MoreThan("jppf.processing.threads", 4);
// evaluated against the server's properties
// "more than 2" matching nodes means at least 3
ExecutionPolicy gridPolicy = new AtLeast("availableMemory", 2*GB)
  .and(new NodesMatching(Operator.MORE_THAN, 2, nodePolicy));
// set the grid policy onto the SLA
job.getSLA().setGridExecutionPolicy(gridPolicy);
2.12 Specifying the desired node configuration

It is possible for a job to specify the configuration of the nodes it needs to run on, and to force eligible nodes to update their configuration accordingly and restart for the configuration changes to take effect. The specified configuration includes all existing JPPF properties, in particular "" and "jppf.jvm.options", which allow specifying the JVM and its options for running the node after restart. It also includes any custom, application-defined property that can be expressed in a configuration file.

This is done with the following JobSLA methods:

public interface JobSLA extends JobCommonSLA {
  // Get the configuration of the node(s) this job should be executed on
  JPPFNodeConfigSpec getDesiredNodeConfiguration();

  // Set the configuration of the node(s) this job should be executed on
  void setDesiredNodeConfiguration(JPPFNodeConfigSpec nodeConfigurationSpec);
}

The desired node configuration is specified as a JPPFNodeConfigSpec object, defined as follows:

public class JPPFNodeConfigSpec implements Serializable {
  // Initialize this object with a desired configuration and a restart flag set to true
  public JPPFNodeConfigSpec(TypedProperties desiredConfiguration)
    throws IllegalArgumentException

  // Initialize this object with a desired configuration and restart flag
  public JPPFNodeConfigSpec(TypedProperties desiredConfiguration, boolean forceRestart)
    throws IllegalArgumentException

  // Get the desired JPPF configuration of each node
  public TypedProperties getConfiguration()

  // Determine whether to force the restart of a node after reconfiguring it
  public boolean isForceRestart()
}

The configuration attribute specifies the properties that will be overridden or added to the node configuration. In terms of node selection, the JPPF server will prioritize the nodes whose configuration most closely matches the desired one, by computing a similarity score which relies on the distances between the string values of the desired and actual properties. Only the properties specified in the configuration attribute are compared.
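To give an idea of what a score based on string distances could look like, here is a standalone sketch. It assumes the Levenshtein edit distance, summed over the desired properties; this documentation does not state which distance the server actually uses, so the ConfigSimilaritySketch class is purely illustrative.

```java
import java.util.Map;

final class ConfigSimilaritySketch {
  // classic Levenshtein edit distance between two strings, using two rows
  static int levenshtein(String a, String b) {
    int[] prev = new int[b.length() + 1];
    int[] curr = new int[b.length() + 1];
    for (int j = 0; j <= b.length(); j++) prev[j] = j;
    for (int i = 1; i <= a.length(); i++) {
      curr[0] = i;
      for (int j = 1; j <= b.length(); j++) {
        int cost = a.charAt(i - 1) == b.charAt(j - 1) ? 0 : 1;
        curr[j] = Math.min(Math.min(curr[j - 1] + 1, prev[j] + 1), prev[j - 1] + cost);
      }
      int[] tmp = prev;
      prev = curr;
      curr = tmp;
    }
    return prev[b.length()];
  }

  // sum of distances over the desired properties only; 0 means an exact match
  static int distance(Map<String, String> desired, Map<String, String> actual) {
    int total = 0;
    for (Map.Entry<String, String> entry : desired.entrySet()) {
      // a property missing from the node is compared against an empty string
      String actualValue = actual.getOrDefault(entry.getKey(), "");
      total += levenshtein(entry.getValue(), actualValue);
    }
    return total;
  }
}
```

In this sketch, the node with the smallest total distance would be preferred, and properties that the node defines but the job does not specify are ignored.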

The forceRestart flag determines whether a node should be restarted when it matches exactly the desired configuration. If set to true, the nodes will always be restarted. Otherwise, nodes that exactly match the desired configuration will not be restarted.

It is important to note that this SLA attribute is evaluated in combination with the other attributes of the job SLA. In particular, it should not be confused with the execution policy, which is used to first filter eligible nodes, whereas the desired node configuration is applied to eligible nodes and triggers a configuration change and restart in those nodes.

There are restrictions as to the kind of nodes that can be affected by this SLA attribute: since a configuration change and restart of the node is triggered, this can only be done with manageable nodes, which excludes offline nodes and Android nodes. Furthermore, it does not apply to server-local nodes, since the node restart would also cause the server to be restarted.

Lastly, it is strongly advised to use this SLA attribute in combination with the maximum number of nodes and a job expiration: since the reconfiguration and restart is very disruptive for the nodes, it has a non-trivial impact on performance, so you might want to limit the number of nodes that are restarted. Also, between the request for the node reconfiguration and the time the node becomes available after restart, the server reserves the node for the specific job involved. Setting an expiration timeout on the job ensures that the node can be reused for other jobs, should anything go wrong. In effect, the server will remove all reservations for this job whenever it is cancelled or expires.

Example usage:

JPPFJob job = new JPPFJob();
// define the desired node configuration properties
TypedProperties props = new TypedProperties()
  .set(JPPFProperties.JVM_OPTIONS, "-server -Xmx1g")
  .setInt("property.1", 123456)
  .setString("property.2", "abcdef");
// create the node config spec with restart only when the properties don't match
JPPFNodeConfigSpec desiredConfig = new JPPFNodeConfigSpec(props, false);
// set the corresponding SLA attribute
job.getSLA().setDesiredNodeConfiguration(desiredConfig);
// limit to 2 nodes max
job.getSLA().setMaxNodes(2);
// ensure the job expires after 10 minutes max
job.getSLA().setJobExpirationSchedule(new JPPFSchedule(10L*60L*1000L));

3 Client side SLA attributes

A client-side SLA is described by the interface JobClientSLA, defined as:

public interface JobClientSLA extends JobCommonSLA {
  // The maximum number of channels the job can be sent through,
  // including the local executor if any is configured
  int getMaxChannels();
  void setMaxChannels(int maxChannels);
}

Note: since JPPF clients do not have a management interface, none of the client-side SLA attributes are manageable.

3.1 Maximum number of execution channels

The maximum number of channels attribute determines how many server connections a job can be sent through, at any given time. This is an upper bound limit, and does not guarantee that this number of channels will always be used. This attribute is also non-specific, since it does not specify which channels will be used.

Using more than one channel for a job enables faster I/O between the client and the server, since the job can be split in multiple chunks and sent to the server via multiple channels in parallel.

Note 1: when the JPPF client is configured with a single server connection, this attribute has no effect.

Note 2: when local execution is enabled in the JPPF client, the local executor counts as one (additional) channel.

Note 3: the resulting assignment of channels to the job is influenced by other attributes, especially the execution policy.

Example usage:

JPPFJob job = new JPPFJob();
// use 2 channels to send the job and receive the results
job.getClientSLA().setMaxChannels(2);
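As a rough picture of how a job could be split into chunks for parallel sending, the following standalone sketch divides a list of tasks into at most maxChannels nearly equal parts. This is an assumption made for illustration: in practice the distribution of tasks over the channels is decided by the client, and the ChannelSplitSketch class is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

final class ChannelSplitSketch {
  // split 'tasks' into at most 'maxChannels' contiguous chunks of nearly equal size
  static <T> List<List<T>> split(List<T> tasks, int maxChannels) {
    int chunks = Math.max(1, Math.min(maxChannels, tasks.size()));
    int base = tasks.size() / chunks;
    int extra = tasks.size() % chunks; // the first 'extra' chunks get one more task
    List<List<T>> result = new ArrayList<>();
    int start = 0;
    for (int i = 0; i < chunks; i++) {
      int size = base + (i < extra ? 1 : 0);
      result.add(new ArrayList<>(tasks.subList(start, start + size)));
      start += size;
    }
    return result;
  }
}
```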
