Task objects

From JPPF Documentation version 3.x


In JPPF terms, a task is the smallest unit of execution that can be handled by the framework. We will say that it is an atomic execution unit. A JPPF application creates tasks, groups them into a job, and submits the job for execution on the grid.

Task

Task is the base interface for any task that is run by JPPF. We will see in the next sections that other forms of tasks, which do not implement Task, are still wrapped by the framework in an implementation of Task.

Task is defined as follows:

public interface Task<T> extends Runnable, Serializable {
  ...
}


We have outlined three important keywords that characterize Task:

  • interface: Task cannot be used directly; it must be implemented or extended to construct a real task
  • Runnable: when writing a JPPF task, the run() method of java.lang.Runnable must be implemented. This is the part of a task that will be executed on a remote node.
  • Serializable: tasks are sent to servers and nodes over a network. JPPF uses the Java serialization mechanism to transform task objects into a form appropriate for networking.
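To make the Serializable requirement more concrete, here is a minimal sketch that round-trips a task-like object through standard Java serialization, using only JDK classes. The names SerializationDemo, EchoTask and roundTrip are hypothetical and are not part of the JPPF API; the actual transport used by JPPF is more involved than this.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

class SerializationDemo {
  // Hypothetical stand-in for a task: Runnable and Serializable, like Task
  static class EchoTask implements Runnable, Serializable {
    private static final long serialVersionUID = 1L;
    private final String input;
    private String result;

    EchoTask(String input) { this.input = input; }

    @Override
    public void run() { result = "echo: " + input; }

    String getResult() { return result; }
  }

  // Serialize an object to bytes and read a copy back, as a network transfer would
  @SuppressWarnings("unchecked")
  static <T extends Serializable> T roundTrip(T object) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
        out.writeObject(object);
      }
      try (ObjectInputStream in =
          new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
        return (T) in.readObject();
      }
    } catch (IOException | ClassNotFoundException e) {
      throw new IllegalStateException("serialization failed", e);
    }
  }

  public static void main(String[] args) {
    EchoTask submitted = new EchoTask("hello");
    EchoTask onNode = roundTrip(submitted);  // copy that a remote JVM would receive
    onNode.run();                            // only the copy is executed
    EchoTask returned = roundTrip(onNode);   // copy sent back with its result
    System.out.println("returned: " + returned.getResult());
    System.out.println("original: " + submitted.getResult());
  }
}
```

In this sketch the object that was executed is a deserialized copy, so the result is only visible on the copy that comes back, and every field that must travel with the task has to be serializable itself.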

The JPPF API provides a convenient abstract implementation of Task, named AbstractTask, which implements all the methods of Task except run(). To write a real task in your application, you simply extend AbstractTask to implement your own type:

public class MyTask extends AbstractTask<Object> {
  public void run() {
    // ... your code here ...
  }
}

We will now review the functionalities that are inherited from AbstractTask.

If you are familiar with the JPPF 3.x APIs, please note that the legacy class JPPFTask is now redefined as:

public class JPPFTask extends AbstractTask<Object> {
}

Execution results handling

Task<T> provides 2 convenience methods to store and retrieve the results of the execution:

  • public void setResult(T result) : stores the execution result; the argument must be serializable
  • public T getResult() : retrieves the execution result

Here is an example using these methods:

public class MyTask extends AbstractTask<String> {
  public void run() {
    // ... some code here ...
    setResult("This is the result");
  }
}

and later in your application, you would use:

 String result = myTask.getResult();

Using getResult() and setResult() is not mandatory. As we mentioned earlier, these methods are provided as conveniences with meaningful semantics attached to them. There are many other ways to store and retrieve execution results, which can be used to the exclusion of others, or in any combination. These include, but are not limited to:

  • using custom attributes in the task class and their accessors
  • storing and getting data to/from a database
  • using a file system
  • using third-party applications or libraries
  • etc ...

Exception handling

Exception handling is a very important part of processing a task. In effect, exceptions may arise from many places: in the application code, in the JVM, in third-party APIs, etc... To handle this, JPPF provides both a mechanism to process uncaught exceptions and methods to store and retrieve exceptions that arise while executing a task.


Task provides 2 methods to explicitly handle exceptions:

  • public void setThrowable(Throwable t) : store a throwable for later retrieval
  • public Throwable getThrowable() : retrieve a throwable that was thrown during execution

Here is an example of explicit exception handling:

public class MyTask extends AbstractTask<Object> {
  public void run() {
    try {
      // ... some code here ...
    }  catch(Exception e) {
      setThrowable(e);
    }
  }
}

Later on, you can retrieve the exception as follows:

 Throwable throwable = myTask.getThrowable();

JPPF also automatically handles uncaught throwables. Uncaught throwables are never propagated beyond the scope of a task, as this would cause unpredictable behavior in the node that executes the task. Instead, they are stored within the task using the setThrowable() method. This way, it is always possible for the application to know what happened. The following code shows how JPPF handles uncaught throwables:

Task<Object> task = ...;
try  {
  task.run();
} catch(Throwable t) {
  task.setThrowable(t);
}

Then in the application, you can retrieve the throwable as follows:

Task<Object> task = ...;
if (task.getThrowable() != null) {
  Throwable t = task.getThrowable();
  t.printStackTrace();
}

Task life cycle

JPPF provides some options to control a task's life cycle once it has been submitted. These include the following functionalities:

  • task cancellation: this cannot be invoked directly on a task, but is rather invoked as part of cancelling a whole job. If a task is cancelled before its execution starts, then it will never start.
  • task timeout: the timeout countdown starts with the task's execution. If a timeout expires before the task starts executing, then the task will not time out.

In all cases, if a task has already completed its execution, it cannot be cancelled or timed out anymore.

Apart from timeout settings, controlling the life cycle of a task is normally done externally, using the JPPF remote management facilities. We will see those later, in a dedicated chapter of this user manual.

It is also possible to perform specific processing when a task life cycle event occurs. For this, Task provides a callback method for each type of event:

  • public void onCancel(): invoked when the task is cancelled
  • public void onTimeout(): invoked when the task times out

By default, these methods do not do anything. You can, however, override them to implement any application-specific processing, such as releasing resources used by the task, updating the state of the task, etc.


Here is a code sample that illustrates this:

public class MyTask extends AbstractTask<Object> {
  public MyTask(String taskId) {
    // set the task id
    this.setId(taskId);
  }

  @Override
  public void run() {
    // task processing here ...
  }

  @Override
  public void onCancel() {
    // process task cancel event ...
  }

  @Override
  public void onTimeout() {
    // process task timeout event ...
  }
}

A task timeout can be set by using a JPPFSchedule object, which is an immutable object that provides two constructors:

// schedule after a specified duration in milliseconds
public JPPFSchedule(final long duration)
// schedule at a specified fixed date/time
public JPPFSchedule(final String date, final String format)

Using a JPPFSchedule, we can thus set and obtain a task timeout using the corresponding accessors:

public interface Task<T> {
  // get the timeout schedule
  public JPPFSchedule getTimeoutSchedule();
  // set a new timeout schedule
  public void setTimeoutSchedule(final JPPFSchedule timeoutSchedule);
}


For example:

// set the task to expire after 5 seconds
myTask.setTimeoutSchedule(new JPPFSchedule(5000L));
// set the task to expire on 9/30/2012 at 12:08 pm
myTask.setTimeoutSchedule(
  new JPPFSchedule("09/30/2012 12:08 PM", "MM/dd/yyyy hh:mm a"));
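The date and format arguments of the second constructor look like a java.text.SimpleDateFormat pattern and a matching date string; this is an assumption, as it is not stated on this page. Under that assumption, the following standalone sketch, which uses only JDK classes and a hypothetical helper named parse, shows how such a pair resolves to a point in time:

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;
import java.util.GregorianCalendar;
import java.util.Locale;

class ScheduleFormatDemo {
  // Parse a date string with a SimpleDateFormat pattern.
  // Locale.US makes the "a" field match the "AM" / "PM" markers.
  static Date parse(String date, String format) {
    try {
      return new SimpleDateFormat(format, Locale.US).parse(date);
    } catch (ParseException e) {
      throw new IllegalArgumentException("date does not match format: " + date, e);
    }
  }

  public static void main(String[] args) {
    Date expiry = parse("09/30/2012 12:08 PM", "MM/dd/yyyy hh:mm a");
    Calendar calendar = new GregorianCalendar();
    calendar.setTime(expiry);
    // "hh" is the 12-hour clock field, so "12:08 PM" is 8 minutes past noon
    System.out.println("hour of day = " + calendar.get(Calendar.HOUR_OF_DAY));
    System.out.println("minute      = " + calendar.get(Calendar.MINUTE));
  }
}
```

A date string that does not match the pattern is rejected at parse time in this sketch, which is a good reason to validate such strings before building a schedule from them.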

Exception handling - node processing

It is possible that an error occurs while the node is processing a task, before or after its execution. These error conditions include any instance of Throwable, i.e. any Exception or Error occurring during serialization or deserialization of the tasks, or while sending or receiving data to or from the server.

When such an error occurs, the Throwable that was raised for each task in error is propagated back to the client which submitted the job, and set upon the initial instance of the task in the job. It can then be obtained, upon receiving the execution results, with a call to Task.getThrowable().

Sending notifications from a task

Task provides an API which allows tasks to send notifications during their execution:

public interface Task<T> extends Runnable, Serializable {
  // Causes the task to send a notification to all listeners
  void fireNotification(Object userObject, boolean sendViaJmx);
}

The first parameter userObject can be any object provided by the user code. The second parameter sendViaJmx specifies whether this notification should also be sent via the node's JPPFNodeTaskMonitorMBean, instead of only to locally registered listeners. If it is true, it is recommended that userObject be Serializable. We will see in further chapters of this documentation how to register local and JMX-based listeners. Let's see here how these listeners can handle the notifications.

Here is an example of a JMX listener registered with one or more JPPFNodeTaskMonitorMBean instances:

public class MyTaskJmxListener implements NotificationListener {
  @Override
  public synchronized void handleNotification(
    Notification notification, Object handback) {
    // cast to the JPPF notification type
    TaskExecutionNotification notif = (TaskExecutionNotification) notification;
    // get and print the user object
    Object userObject = notif.getUserData();
    System.out.println("received notification with user object = " + userObject);
    // notifications generated by JPPF node always have a  TaskExecutionInfo
    TaskExecutionInfo info = notif.getTaskInformation();
    System.out.println("this notification was sent by "
      + (info == null ? "the user" : "the JPPF node"));
  }
}

A local TaskExecutionListener would look like this:

public class MyTaskLocalListener implements TaskExecutionListener {
  // task completion event sent by the node
  @Override
  public void taskExecuted(TaskExecutionEvent event) {
    TaskExecutionInfo info = event.getTaskInformation();
    System.out.println("received notification with task info = " + info);
  }

  // task notification event sent by user code
  @Override
  public void taskNotification(TaskExecutionEvent event) {
    Object userObject = event.getUserObject();
    System.out.println("received notification with user object = " + userObject);
  }
}

Consider the following task:

public class MyTask extends AbstractTask<String> {
  @Override
  public void run() {
    fireNotification("notification 1", false);
    fireNotification("notification 2", true);
  }
}

During the execution of this task, a MyTaskJmxListener instance would only receive “notification 2”, whereas a MyTaskLocalListener instance would receive both “notification 1” and “notification 2”.
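The routing rule described here can be reproduced in isolation with the JDK's own JMX classes. The sketch below is not JPPF code: Dispatcher is a hypothetical stand-in for the node, local listeners are modeled as plain Consumer callbacks, and the notification type string is arbitrary.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import javax.management.Notification;
import javax.management.NotificationBroadcasterSupport;

class NotificationRoutingDemo {
  // Hypothetical dispatcher: local listeners always get the user object,
  // JMX listeners only get it when sendViaJmx is true
  static class Dispatcher extends NotificationBroadcasterSupport {
    private final List<Consumer<Object>> localListeners = new ArrayList<>();
    private long sequence = 0L;

    void addLocalListener(Consumer<Object> listener) {
      localListeners.add(listener);
    }

    void fireNotification(Object userObject, boolean sendViaJmx) {
      for (Consumer<Object> listener : localListeners) {
        listener.accept(userObject);
      }
      if (sendViaJmx) {
        Notification notification = new Notification("task.notification", this, ++sequence);
        notification.setUserData(userObject);
        sendNotification(notification); // delivered to registered JMX listeners
      }
    }
  }

  public static void main(String[] args) {
    Dispatcher dispatcher = new Dispatcher();
    dispatcher.addLocalListener(o -> System.out.println("local listener: " + o));
    dispatcher.addNotificationListener(
      (notification, handback) ->
        System.out.println("JMX listener:   " + notification.getUserData()),
      null, null);
    dispatcher.fireNotification("notification 1", false);
    dispatcher.fireNotification("notification 2", true);
  }
}
```

With the default NotificationBroadcasterSupport constructor, JMX listeners are invoked in the calling thread. A remote JMX listener would additionally need the user object to be serializable, which matches the recommendation given above for sendViaJmx = true.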

Resubmitting a task

The class AbstractTask also provides an API which allows a task to request that it be resubmitted by the server, instead of being sent back to the client as an execution result. This can prove very useful for instance when a task must absolutely complete successfully, but an error occurs during its execution. The API for this is defined as follows:

public abstract class AbstractTask<T> implements Task<T> {
  // Determine whether this task will be resubmitted by the server
  public boolean isResubmit()
  // Specify whether this task should be resubmitted by the server
  public void setResubmit(final boolean resubmit)

  // Get the maximum number of times a task can be resubmitted
  public int getMaxResubmits()
  // Set the maximum number of times a task can be resubmitted
  public void setMaxResubmits(int maxResubmits)
}

Note that the resubmit and maxResubmits attributes are transient, which means that when the task is executed in a remote node, they will be reset to their initial values of false and -1, respectively.
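The practical consequence is that values assigned to these attributes on the client side do not travel with the task. The sketch below shows the effect with plain Java serialization. ResubmitState is a hypothetical class, not the AbstractTask source, and the readObject() method is only one possible way to restore a non-zero initial value such as -1, since standard deserialization leaves transient fields at false and 0.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

class TransientDemo {
  // Hypothetical holder for two transient attributes
  static class ResubmitState implements Serializable {
    private static final long serialVersionUID = 1L;
    private transient boolean resubmit = false;
    private transient int maxResubmits = -1;

    boolean isResubmit() { return resubmit; }
    void setResubmit(boolean resubmit) { this.resubmit = resubmit; }
    int getMaxResubmits() { return maxResubmits; }
    void setMaxResubmits(int maxResubmits) { this.maxResubmits = maxResubmits; }

    // Field initializers are not re-run on deserialization, so the
    // non-zero initial value is restored explicitly here
    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
      in.defaultReadObject();
      maxResubmits = -1;
    }
  }

  // Serialize then deserialize, as happens when an object is sent to another JVM
  static ResubmitState roundTrip(ResubmitState state) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
        out.writeObject(state);
      }
      try (ObjectInputStream in =
          new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
        return (ResubmitState) in.readObject();
      }
    } catch (IOException | ClassNotFoundException e) {
      throw new IllegalStateException("serialization failed", e);
    }
  }

  public static void main(String[] args) {
    ResubmitState state = new ResubmitState();
    state.setResubmit(true);
    state.setMaxResubmits(5);
    ResubmitState copy = roundTrip(state);
    System.out.println(copy.isResubmit() + " " + copy.getMaxResubmits());
  }
}
```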

The maximum number of times a task can be resubmitted may be specified in two ways:

  • in the job SLA via the maxTaskResubmits attribute
  • with the task's setMaxResubmits() method; in this case any value >= 0 will override the job SLA's value


Finally, please note that task resubmission only works for tasks sent to a remote node, and will not work in the client's local executor.

JPPF-annotated tasks

Another way to write a JPPF task is to take an existing class and annotate one of its public methods or constructors using @JPPFRunnable.

Here is an example:

public class MyClass implements Serializable {
  @JPPFRunnable
  public String myMethod(int intArg, String stringArg) {
    String s = "int arg = " + intArg + ", string arg = \"" + stringArg + "\"";
    System.out.println(s);
    return s;
  }
}

We can see that we are simply using a POJO class, for which we annotated the myMethod() method with @JPPFRunnable. At runtime, the arguments of the method will be passed when the task is added to a job, as illustrated in the following example:

JPPFJob job = new JPPFJob();
Task<?> task = job.add(new MyClass(), 3, "string arg");

Here we simply add our annotated class as a task, setting the two arguments of the annotated method in the same call. Note also that a Task object is returned. It is generated by a mechanism that wraps the annotated class into a Task, which allows it to use most of the functionalities that come with it.
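To give an idea of what such a wrapping mechanism involves, here is a simplified sketch based on standard Java reflection. It is not the JPPF implementation: the @EntryPoint annotation is a hypothetical stand-in for @JPPFRunnable, and the Wrapper class is invented for this illustration and only handles instance methods.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;

class AnnotatedWrapperDemo {
  // Hypothetical marker annotation, standing in for @JPPFRunnable
  @Retention(RetentionPolicy.RUNTIME)
  @Target(ElementType.METHOD)
  @interface EntryPoint {}

  static class MyClass {
    @EntryPoint
    public String myMethod(int intArg, String stringArg) {
      return "int arg = " + intArg + ", string arg = \"" + stringArg + "\"";
    }
  }

  // Hypothetical wrapper: keeps the target object and the arguments, then
  // invokes the annotated method and stores its return value as the result
  static class Wrapper implements Runnable {
    private final Object target;
    private final Object[] args;
    private Object result;

    Wrapper(Object target, Object... args) {
      this.target = target;
      this.args = args;
    }

    @Override
    public void run() {
      for (Method method : target.getClass().getMethods()) {
        if (method.isAnnotationPresent(EntryPoint.class)) {
          try {
            result = method.invoke(target, args);
          } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("could not invoke " + method.getName(), e);
          }
          return;
        }
      }
      throw new IllegalStateException("no @EntryPoint method found");
    }

    Object getResult() { return result; }
  }

  public static void main(String[] args) {
    Wrapper wrapper = new Wrapper(new MyClass(), 3, "string arg");
    wrapper.run();
    System.out.println(wrapper.getResult());
  }
}
```

The arguments are captured when the wrapper is created and only used when run() is called, which mirrors the way the method arguments are supplied at the time the task is added to the job.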


JPPF-annotated tasks present the following properties and constraints:

  • if the annotated element is an instance (non-static) method, the annotated class must be serializable
  • if the class is already an instance of Task, the annotation will be ignored
  • if an annotated method has a return type (i.e. non void), the return value will be set as the task result
  • it is possible to annotate a public method or constructor
  • an annotated method can be static or non-static
  • if the annotated element is a constructor or a static method, the first argument of JPPFJob.add() must be a Class object representing the class that declares it.
  • an annotated method or constructor can have any signature, with no limit on the number of arguments
  • through the task-wrapping mechanism, a JPPF-annotated class benefits from the Task facilities described in the previous section 3.1.1, except for the callback methods onCancel() and onTimeout().


Here is an example using an annotated constructor:

public class MyClass implements Serializable {
  @JPPFRunnable
  public MyClass(int intArg, String stringArg) {
    String s = "int arg = " + intArg + ", string arg = \"" + stringArg + "\"";
    System.out.println(s);
  }
}
 
JPPFJob job = new JPPFJob();
Task<?> task = job.add(MyClass.class, 3, "string arg");

Another example using an annotated static method:

public class MyClass implements Serializable {
  @JPPFRunnable
  public static String myStaticMethod(int intArg, String stringArg) {
    String s = "int arg = " + intArg + ", string arg = \"" + stringArg + "\"";
    System.out.println(s);
    return s;
  }
}
 
JPPFJob job = new JPPFJob();
Task<?> task = job.add(MyClass.class, 3, "string arg");

Note how, in the last 2 examples, we use MyClass.class as the first argument in JPPFJob.add().

Runnable tasks

Classes that implement java.lang.Runnable can be used as JPPF tasks without any modification. The run() method will then be executed as the task's entry point. Here is an example:

public class MyRunnableClass implements Runnable, Serializable {
  public void run() {
    System.out.println("Hello from a Runnable task");
  }
}
 
JPPFJob job = new JPPFJob();
Task<?> task = job.add(new MyRunnableClass());

The following rules apply to Runnable tasks:

  • the class must be serializable
  • if the class is already an instance of Task, or annotated with @JPPFRunnable, it will be processed as such
  • through the task-wrapping mechanism, a Runnable task benefits from the Task facilities described in the previous section 3.1.1, except for the callback methods onCancel() and onTimeout().

Callable tasks

In the same way as Runnable tasks, classes implementing java.util.concurrent.Callable<V> can be directly used as tasks. In this case, the call() method will be used as the task's execution entry point. Here's an example:

public class MyCallableClass implements Callable<String>, Serializable {
  public String call() throws Exception {
    String s = "Hello from a Callable task";
    System.out.println(s);
    return s;
  }
}
 
JPPFJob job = new JPPFJob();
Task<?> task = job.add(new MyCallableClass());

The following rules apply to Callable tasks:

  • the Callable class must be serializable
  • if the class is already an instance of Task, is annotated with @JPPFRunnable, or implements Runnable, it will be processed as such and the call() method will be ignored
  • the return value of the call() method will be set as the task result
  • through the task-wrapping mechanism, a callable class benefits from the Task facilities described in the previous section 3.1.1, except for the callback methods onCancel() and onTimeout().
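As an illustration of these rules, the following sketch adapts a Callable to a Runnable that stores the return value as a result and captures anything thrown by call(). CallableTask is a hypothetical class written for this example with JDK classes only; it is not the wrapper that JPPF uses internally.

```java
import java.util.concurrent.Callable;

class CallableWrapperDemo {
  // Hypothetical adapter from Callable to a task-like Runnable
  static class CallableTask<V> implements Runnable {
    private final Callable<V> callable;
    private V result;
    private Throwable throwable;

    CallableTask(Callable<V> callable) {
      this.callable = callable;
    }

    @Override
    public void run() {
      try {
        result = callable.call(); // the return value becomes the result
      } catch (Throwable t) {
        throwable = t;            // nothing propagates out of the task
      }
    }

    V getResult() { return result; }
    Throwable getThrowable() { return throwable; }
  }

  public static void main(String[] args) {
    CallableTask<String> ok = new CallableTask<String>(() -> "Hello from a Callable task");
    ok.run();
    System.out.println("result: " + ok.getResult());

    CallableTask<String> failing = new CallableTask<String>(() -> {
      throw new IllegalStateException("call() failed");
    });
    failing.run();
    System.out.println("throwable: " + failing.getThrowable());
  }
}
```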

POJO tasks

The least intrusive way of defining a task is to simply use an existing POJO class without any modification. This allows you to use existing classes directly, even if you don't have the source code. A POJO task offers the same possibilities as a JPPF-annotated task (see section 3.1.2), except that we need to specify explicitly which method or constructor to use when adding the task to a job. To this effect, we use a different form of the method JPPFJob.add(), which takes a method or constructor name as its first argument.

Here is a code example illustrating these possibilities:

public class MyClass implements Serializable {
  public MyClass() {
  }

  public MyClass(int intArg, String stringArg) {
    String s = "int arg = " + intArg + ", string arg = \"" + stringArg + "\"";
    System.out.println(s);
  }

  public String myMethod(int intArg, String stringArg) {
    String s = "int arg = " + intArg + ", string arg = \"" + stringArg + "\"";
    System.out.println(s);
    return s;
  }
 
  public static String myStaticMethod(int intArg, String stringArg) {
    String s = "int arg = " + intArg + ", string arg = \"" + stringArg + "\"";
    System.out.println(s);
    return s;
  }
}
 
JPPFJob job = new JPPFJob();

// add a task using the constructor as entry point
Task<?> task1 = job.add("MyClass", MyClass.class, 3, "string arg");

// add a task using an instance method as entry point
Task<?> task2 = job.add("myMethod", new MyClass(), 3, "string arg");
 
// add a task using a static method as entry point
Task<?> task3 = job.add("myStaticMethod", MyClass.class, 3, "string arg");

POJO tasks present the following properties and constraints:

  • if the entry point is an instance (non-static) method, the class must be serializable
  • if a method has a return type (i.e. non void), the return value will be set as the task result
  • it is possible to use a public method or constructor as entry point
  • a method entry point can be static or non-static
  • a POJO task is added to a job by calling a JPPFJob.add() method whose first argument is the method or constructor name
  • if the entry point is a constructor or a static method, the second argument of JPPFJob.add() must be a Class object representing the class that declares it
  • a method or constructor used as entry point can have any signature, with no limit on the number of arguments
  • through the task-wrapping mechanism, a POJO task benefits from the Task facilities described in the previous section 3.1.1, except for the callback methods onCancel() and onTimeout().

Running non-Java tasks: CommandLineTask

JPPF has a pre-defined task type that allows you to run an external process from a task. This process can be any executable program (including java), shell script or command. The JPPF API also provides a set of simple classes to access data, whether in-process or outside, local or remote.

The class that will allow you to run a process is CommandLineTask. Like AbstractTask, it is an abstract class that you must extend and whose run() method you must override.

This class provides methods to:

Set up the external process name, path, arguments and environment:

// list of commands passed to the shell
List<String> getCommandList()
void setCommandList(List<String> commandList)
void setCommandList(String... commands)

// set of environment variables
Map<String,String> getEnv()
void setEnv(Map<String, String> env)

// directory in which the command is executed
String getStartDir()
void setStartDir(String startDir)

You can also use the built-in constructors to do this at task initialization time:

CommandLineTask(Map<String, String> env, String startDir, String... commands)
CommandLineTask(String... commands)

Launch the process:

The process is launched by calling the following method from the run() method of the task:

int launchProcess()

This method will block until the process has completed or is destroyed. The process exit code can also be obtained via the following method:

// get the process exit code
int getExitCode()

Set up the capture of the process output:

You can specify and determine whether the process output (either standard or error console output) is or should be captured, and obtain the captured output:

boolean isCaptureOutput()

void setCaptureOutput(boolean captureOutput)

// corresponds to what is sent to System.err / stderr
String getErrorOutput()

// corresponds to what is sent to System.out / stdout
String getStandardOutput()

Here is a sample command line task that lists the content of a directory in the node's file system:

import org.jppf.server.protocol.*;

// This task lists the files in a directory of the node's host
public class ListDirectoryTask extends CommandLineTask {
  // Execute the script
  public void run() {
    try {
      // get the name of the node's operating system
      String os = System.getProperty("os.name").toLowerCase();
      // the type of OS determines which command to execute
      if (os.indexOf("linux") >= 0) {
        setCommandList("ls", "-a", "/usr/local");
      }
      else if (os.indexOf("windows") >= 0) {
        setCommandList("cmd", "/C", "dir", "C:\\Windows");
      }
      // enable the capture of the console output
      setCaptureOutput(true);
      // execute the script/command
      launchProcess();
      // get the resulting console output and set it as a result
      String output = getStandardOutput();
      setResult(output);
    } catch(Exception e) {
      setThrowable(e);
    }
  }
}
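For readers who want to see what launching a process and capturing its output involves at the JDK level, here is a minimal standalone sketch based on java.lang.ProcessBuilder. It is not the CommandLineTask implementation: the class ProcessLaunchDemo, its Outcome holder and the launch() helper are hypothetical, and, unlike CommandLineTask, the sketch merges the error output into the standard output.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.List;

class ProcessLaunchDemo {
  // Hypothetical holder for the exit code and the captured console output
  static final class Outcome {
    final int exitCode;
    final String output;

    Outcome(int exitCode, String output) {
      this.exitCode = exitCode;
      this.output = output;
    }
  }

  // Start the command, capture stdout and stderr together, wait for completion
  static Outcome launch(List<String> command) {
    try {
      ProcessBuilder builder = new ProcessBuilder(command);
      builder.redirectErrorStream(true); // merge stderr into stdout
      Process process = builder.start();
      ByteArrayOutputStream captured = new ByteArrayOutputStream();
      try (InputStream in = process.getInputStream()) {
        byte[] buffer = new byte[4096];
        int n;
        while ((n = in.read(buffer)) != -1) {
          captured.write(buffer, 0, n);
        }
      }
      int exitCode = process.waitFor(); // blocks until the process has completed
      return new Outcome(exitCode, new String(captured.toByteArray(), StandardCharsets.UTF_8));
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("interrupted while waiting for the process", e);
    }
  }

  public static void main(String[] args) {
    // Run the JVM's own launcher so that the example does not depend on the OS
    String javaBin = Paths.get(System.getProperty("java.home"), "bin", "java").toString();
    Outcome outcome = launch(Arrays.asList(javaBin, "-version"));
    System.out.println("exit code = " + outcome.exitCode);
    System.out.println(outcome.output);
  }
}
```

Reading the output before calling waitFor() avoids a blocked child process when it writes more than the pipe buffer can hold.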

Executing dynamic scripts: ScriptedTask

The class ScriptedTask allows you to execute scripts written in any dynamic language available via the javax.script APIs. It is defined as follows:

public class ScriptedTask<T> extends AbstractTask<T> {
  // Initialize this task with the specified language, script provided as a string
  // and set of variable bindings
  public ScriptedTask(String language, String script, String reusableId,
    Map<String, Object> bindings) throws IllegalArgumentException

  // Initialize this task with the specified language, script provided as a reader
  // and set of variable bindings
  public ScriptedTask(String language, Reader scriptReader, String reusableId,
    Map<String, Object> bindings) throws IllegalArgumentException, IOException

  // Initialize this task with the specified language, script provided as a file
  // and set of variable bindings
  public ScriptedTask(String language, File scriptFile, String reusableId,
    Map<String, Object> bindings) throws IllegalArgumentException, IOException

  // Get the JSR 223 script language to use
  public String getLanguage()

  // Get the script to execute from this task
  public String getScript()

  // Get the unique identifier for the script
  public String getReusableId()

  // Get the user-defined variable bindings
  public Map<String, Object> getBindings()

  // Add the specified variable to the user-defined bindings
  public void addBinding(String name, Object value)

  // Remove the specified variable from the user-defined bindings
  public Object removeBinding(String name)
}

Since ScriptedTask is a subclass of AbstractTask, it has all the features that come with it, including life cycle management, error handling, etc. Throwables raised by the script engine receive special processing: some engines raise throwables which are not serializable, which may prevent JPPF from capturing them and returning them to the client application. To work around this, JPPF will instantiate a new exception with the same message and stack trace as the original exception. Thus some information may be lost, and you may need to handle these exceptions from within the scripts to retrieve this information.

The reusableId parameter, provided in the constructors, indicates that, if the script engine has that capability, compiled scripts will be stored and reused, to avoid compiling the same scripts repeatedly. In a multithreaded context, as is the case in a JPPF node, multiple compilations may still occur for the same script id, since it is not possible to guarantee the thread-safety of a script engine, and compiled scripts are always associated with a single script engine instance. Thus, a script may be compiled multiple times, but normally no more than there are processing threads in the node.

Java objects can be passed as variables to the script via the bindings, either in one of the constructors or using the addBinding() and removeBinding() methods. Additionally, a ScriptedTask always adds a reference to itself with the name "jppfTask", or the equivalent in the chosen script language, for instance $jppfTask in PHP.

The value returned by the script, if any, will be set as the task result, unless it has already been set to a non-null value, by calling jppfTask.setResult(...) from within the script.

For example, in the following JavaScript script:

function myFunc() {
  jppfTask.setResult('Hello 1');
  return 'Hello 2';
}
myFunc();

The result of the evaluation of the script is the string “Hello 2”. However, the task result will be “Hello 1”, since it was set before the end of the script. If you comment out the first statement of the function (the jppfTask.setResult() statement), then this time the task result will be the same as the script result, “Hello 2”.

The Location API

This API allows developers to easily write data to, or read data from various sources: JVM heap, file system or URL.

It is based on the interface Location, which provides the following methods:

public interface Location<T> {
  // Copy the content at this location to another location
  void copyTo(Location location);
  // Obtain an input stream to read from this location
  InputStream getInputStream();
  // Obtain an output stream to write to this location
  OutputStream getOutputStream();
  // Get this location's path
  T getPath();
  // Get the size of the data this location points to
  long size();
  // Get the content at this location as an array of bytes
  byte[] toByteArray() throws Exception;
}

Currently, JPPF provides 3 implementations of this interface:

  • FileLocation represents a path in the file system
  • URLLocation can be used to get data to and from a URL, including HTTP and FTP URLs
  • MemoryLocation represents a block of data in memory that can be copied from or sent to another location
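The idea behind copying from a file-based location to a memory-based one can be sketched with plain JDK streams. The code below does not use the Location classes: LocationCopyDemo and its helpers writeTemp() and fileToMemory() are hypothetical names, and the temporary file only exists to make the example self-contained.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

class LocationCopyDemo {
  // Create a temporary file with the given text content
  static Path writeTemp(String content) {
    try {
      Path file = Files.createTempFile("location-demo", ".txt");
      file.toFile().deleteOnExit();
      Files.write(file, content.getBytes(StandardCharsets.UTF_8));
      return file;
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  // Copy the content of a file into a byte array held in memory,
  // by reading from an input stream and writing to an output stream
  static byte[] fileToMemory(Path file) {
    try (InputStream in = Files.newInputStream(file);
         ByteArrayOutputStream out = new ByteArrayOutputStream()) {
      byte[] buffer = new byte[8192];
      int n;
      while ((n = in.read(buffer)) != -1) {
        out.write(buffer, 0, n);
      }
      return out.toByteArray();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    Path file = writeTemp("bin\netc\nlib\n");
    byte[] inMemory = fileToMemory(file);
    System.out.print(new String(inMemory, StandardCharsets.UTF_8));
  }
}
```

The pairing of getInputStream() on the source with getOutputStream() on the destination in the Location interface suggests the same read-then-write loop, although the actual copyTo() implementations are not shown on this page.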

To illustrate the use of this API, let's transform our previous ListDirectoryTask in a way such that the output of the command is redirected to a file, instead of the console. We then read the content of this file and set it as the task's result:

import org.jppf.server.protocol.*;
 
// This task lists the files in a directory of the node's host
public class ListDirectoryTask extends CommandLineTask {
  // Execute the script
  public void run() {
    try {
      String os = System.getProperty("os.name").toLowerCase();
      if (os.indexOf("linux") >= 0)
        // equivalent to shell command "ls -a /usr/local > output.txt";
        // the ">" redirection must be interpreted by a shell
        setCommandList("/bin/sh", "-c", "ls -a /usr/local > output.txt");
      else if (os.indexOf("windows") >= 0)
        // equivalent to shell command "dir C:\Windows > output.txt"
        setCommandList("cmd", "/C", "dir", "C:\\Windows", ">", "output.txt");
      // disable the capture of the console output
      setCaptureOutput(false);
      // execute the script or command
      launchProcess();
      // copy the resulting file in memory
      FileLocation fileLoc = new FileLocation("output.txt");
      MemoryLocation memoryLoc = new MemoryLocation((int) fileLoc.size());
      fileLoc.copyTo(memoryLoc);
      // set the file content as a result
      setResult(new String(memoryLoc.toByteArray()));
    }  catch(Exception e) {
      setThrowable(e);
    }
  }
}