JPPF

The open source
grid computing
solution

Task objects

From JPPF 3.3 Documentation


In JPPF terms, a task is the smallest unit of execution that can be handled by the framework. We will say that it is an atomic execution unit. A JPPF application creates tasks, groups them into a job, and submits the job for execution on the grid.

1 JPPFTask

JPPFTask is the base super class for any task that is run by JPPF. We will see in the next sections that other forms of tasks, that do not inherit from JPPFTask, are still wrapped by the framework in a subclass of JPPFTask.

JPPFTask is defined as follows:

 public abstract class JPPFTask implements Task<Object> {
   ...
 }

We can see that this class implements the Task<T> interface, defined as follows:

 public interface Task<T> extends Runnable, Serializable {
   ...
 }

We have outlined three important keywords that characterize JPPFTask:

  • abstract: JPPFTask cannot be used directly; it must be extended to construct a real task
  • Runnable, via Task<Object>: when writing a JPPF task, the run() method of java.lang.Runnable must be implemented. This is the part of a task that will be executed on a remote node.
  • Serializable, via Task<Object>: tasks are sent to servers and nodes over a network. JPPF uses the Java serialization mechanism to transform task objects into a form appropriate for networking.
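
To make the Serializable requirement concrete, here is a minimal JDK-only sketch of a serialization round trip. SquareTask and SerializationRoundTrip are hypothetical names that only stand in for a task and for the network transport; no JPPF class is used. The point it illustrates is that the receiving side works on a copy of the object, so the state changed by run() lives in the copy, not in the instance that was serialized.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Hypothetical stand-in for a task: Runnable and Serializable, like Task<T>.
class SquareTask implements Runnable, Serializable {
  private static final long serialVersionUID = 1L;
  private final int input;
  private int result;

  SquareTask(int input) { this.input = input; }

  @Override
  public void run() { result = input * input; }

  int getResult() { return result; }
}

final class SerializationRoundTrip {
  // Serialize an object to bytes, as would happen before it is sent over a network.
  static byte[] toBytes(Serializable object) {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(object);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return bytes.toByteArray();
  }

  // Rebuild a copy of the object from bytes, as would happen on the receiving side.
  static Object fromBytes(byte[] data) {
    try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
      return in.readObject();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    } catch (ClassNotFoundException e) {
      throw new IllegalStateException(e);
    }
  }

  public static void main(String[] args) {
    SquareTask original = new SquareTask(7);
    SquareTask copy = (SquareTask) fromBytes(toBytes(original));
    copy.run(); // only the deserialized copy is executed
    System.out.println("copy result = " + copy.getResult());         // prints "copy result = 49"
    System.out.println("original result = " + original.getResult()); // prints "original result = 0"
  }
}
```

Any field that is not itself serializable would make writeObject() fail with a NotSerializableException, which is why every attribute of a task needs to be serializable or declared transient.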

To write a real task in your application, you simply extend JPPFTask to implement your own type:

 public class MyTask extends JPPFTask {
   public void run() {
     // ... your code here ...
   }
 }

We will now review the functionalities that are inherited from JPPFTask.

1.1 Execution results handling

JPPFTask provides 2 convenience methods to store and retrieve the results of the execution:

  • public void setResult(Object result) : stores the execution result; the argument must be serializable
  • public Object getResult() : retrieves the execution result

Here is an example using these methods:

 public class MyTask extends JPPFTask {
   public void run() {
     // ... some code here ...
     setResult("This is the result");
   }
 }

and later in your application, you would use:

 String result = (String) myTask.getResult();

Using getResult() and setResult() is not mandatory. As mentioned earlier, these methods are provided as a convenience, with clear semantics attached to them. There are many other ways to store and retrieve execution results, which can be used to the exclusion of others, or in any combination. These include, but are not limited to:

  • using custom attributes in the task class and their accessors
  • storing and getting data to/from a database
  • using a file system
  • using third-party applications or libraries
  • etc.

1.2 Exception handling

Exception handling is a very important part of processing a task. Exceptions may arise from many places: in the application code, in the JVM, in third-party APIs, etc. To handle this, JPPF provides both a mechanism to process uncaught exceptions and methods to store and retrieve exceptions that arise while executing a task.


JPPFTask provides 2 methods to explicitly handle exceptions:

  • public void setException(Exception e) : store an exception for later retrieval
  • public Exception getException() : retrieve an exception that was thrown during execution

Here is an example of explicit exception handling:

 public class MyTask extends JPPFTask {
   public void run() {
     try {
       // ... some code here ...
     }  catch(Exception e) {
       setException(e);
     }
   }
 }

Later on, you can retrieve the exception as follows:

 Exception exception = myTask.getException();

JPPF also automatically handles uncaught exceptions. In fact, we should say that it handles uncaught throwables. Uncaught throwables are never propagated beyond the scope of a task, as this would cause an unpredictable behavior of the node that executes the task. Instead, they are stored within the task using the setException() method. This way, it is always possible for the application to know what happened.


We may wonder what happens when the throwable is not an instance of Exception, as the method setException() only takes an Exception object as argument. In this case, the throwable is wrapped into an instance of JPPFException and can be retrieved by calling its getCause() method. The following code shows how JPPF handles uncaught exceptions and throwables:

 JPPFTask task = ...;
 try  {
   task.run();
 } catch(Throwable t) {
   if (t instanceof Exception) {
     task.setException((Exception) t);
   } else {
     task.setException(new JPPFException(t));
   }
 }

Then in the application, you can retrieve the throwable as follows:

 JPPFTask task = ...;
 if (task.getException() != null) {
   Throwable cause = null;
   Exception e = task.getException();
   if (e instanceof JPPFException) {
     cause = e.getCause();
   } else {
     cause = e;
   }
 }

1.3 Task life cycle

JPPF provides some options to control a task's life cycle once it has been submitted. These include the following functionalities:

  • task cancellation: this cannot be invoked directly on a task, but is rather invoked as part of cancelling a whole job. If a task is cancelled before its execution starts, then it will never start.
  • task timeout: the timeout countdown starts with the task's execution. If a timeout expires before the task starts executing, then the task will not time out.

In all cases, if a task has already completed its execution, it cannot be cancelled or timed out anymore.
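
The timeout semantics above can be approximated with plain java.util.concurrent classes. The following is only a JDK-based analogy, not JPPF's implementation: SleepingTask, TimeoutSimulation and their methods are hypothetical names, and the countdown here starts at submission, which on an idle single worker thread is effectively the start of execution. It shows that a task that completes in time is never timed out, and that a timeout interrupts the task and then triggers a callback.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical stand-in for a task with a timeout callback.
class SleepingTask implements Runnable {
  private final long sleepMillis;
  private volatile boolean timeoutCallbackInvoked;

  SleepingTask(long sleepMillis) { this.sleepMillis = sleepMillis; }

  @Override
  public void run() {
    try {
      Thread.sleep(sleepMillis);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // stop quietly when interrupted
    }
  }

  // Plays the role of a timeout callback: release resources, update state, etc.
  void onTimeout() { timeoutCallbackInvoked = true; }

  boolean isTimeoutCallbackInvoked() { return timeoutCallbackInvoked; }
}

final class TimeoutSimulation {
  // Runs the task on a worker thread and returns "completed" or "timed out".
  static String execute(SleepingTask task, long timeoutMillis) {
    ExecutorService worker = Executors.newSingleThreadExecutor();
    try {
      Future<?> future = worker.submit(task);
      try {
        future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        return "completed"; // a completed task can no longer time out
      } catch (TimeoutException e) {
        future.cancel(true); // interrupt the running task
        task.onTimeout();
        return "timed out";
      } catch (InterruptedException | ExecutionException e) {
        throw new IllegalStateException(e);
      }
    } finally {
      worker.shutdownNow();
    }
  }

  public static void main(String[] args) {
    System.out.println(execute(new SleepingTask(10), 5_000));   // prints "completed"
    System.out.println(execute(new SleepingTask(60_000), 100)); // prints "timed out"
  }
}
```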

Apart from timeout settings, controlling the life cycle of a task is normally done externally, using the JPPF remote management facilities. We will see those later, in a dedicated chapter of this user manual.

It is also possible to perform a specific processing when a task life cycle event occurs. For this, JPPFTask provides a callback method for each type of event:

  • public void onCancel() : invoked when the task is cancelled
  • public void onTimeout() : invoked when the task times out

By default, these methods do not do anything. You can, however, override them to implement any application-specific processing, such as releasing resources used by the task, updating the state of the task, etc.


Here is a code sample that illustrates this:

 public class MyTask extends JPPFTask {
   public MyTask(String taskId) {
     // set the task id
     this.setId(taskId);
   }
 
   @Override
   public void run() {
     // task processing here ...
   }
 
   @Override
   public void onCancel() {
     // process task cancel event ...
   }
 
   @Override
   public void onTimeout() {
     // process task timeout event ...
   }
 }

A task timeout can be set by using a JPPFSchedule object, which is an immutable object that provides two constructors:

 // schedule after a specified duration in milliseconds
 public JPPFSchedule(final long duration)
 // schedule at a specified fixed date/time
 public JPPFSchedule(final String date, final String format)

Using a JPPFSchedule, we can thus set and obtain a task timeout using the corresponding accessors:

 public abstract class JPPFTask implements Task<Object> {
   // get the timeout schedule
   public JPPFSchedule getTimeoutSchedule();
   // set a new timeout schedule
   public void setTimeoutSchedule(final JPPFSchedule timeoutSchedule);
 }


For example:

 // set the task to expire after 5 seconds
 myTask.setTimeoutSchedule(new JPPFSchedule(5000L));
 // set the task to expire on 9/30/2012 at 12:08 pm
 myTask.setTimeoutSchedule(
   new JPPFSchedule("09/30/2012 12:08 PM", "MM/dd/yyyy hh:mm a"));
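
The date and format strings in the second call look like java.text.SimpleDateFormat input. Assuming that is how the format argument is interpreted (the page does not state it explicitly), the pair can be checked with the JDK alone before it is passed to a JPPFSchedule. ScheduleDateFormatCheck and parseUtc() are hypothetical helper names, and the UTC time zone is fixed only to make the result reproducible.

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;
import java.util.Locale;
import java.util.TimeZone;

final class ScheduleDateFormatCheck {
  // Parse a date string with the given pattern, interpreting it in UTC.
  static Calendar parseUtc(String date, String pattern) {
    SimpleDateFormat format = new SimpleDateFormat(pattern, Locale.US);
    format.setTimeZone(TimeZone.getTimeZone("UTC"));
    try {
      Date parsed = format.parse(date);
      Calendar calendar = Calendar.getInstance(TimeZone.getTimeZone("UTC"), Locale.US);
      calendar.setTime(parsed);
      return calendar;
    } catch (ParseException e) {
      throw new IllegalArgumentException("'" + date + "' does not match '" + pattern + "'", e);
    }
  }

  public static void main(String[] args) {
    Calendar c = parseUtc("09/30/2012 12:08 PM", "MM/dd/yyyy hh:mm a");
    // "hh" is the 12-hour clock and "a" the AM/PM marker, so 12:08 PM is 12:08 on the 24-hour clock
    System.out.println(String.format("%02d:%02d", c.get(Calendar.HOUR_OF_DAY), c.get(Calendar.MINUTE)));
  }
}
```

A string that does not match the pattern, such as a date written with dashes, makes parseUtc() throw an IllegalArgumentException instead of silently producing a wrong expiration date.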

2 Exception handling - node processing

It is possible that an error occurs while the node is processing a job or a task, before or after its execution. These error conditions include any instance of Throwable, i.e. any Exception or Error occurring during serialization or deserialization of the tasks, or while sending or receiving data to or from the server.

Depending on when they occur, these errors can be handled in two different ways:

  1. When the node is receiving tasks from the server: if any error is generated, for any task, then all the tasks will be sent back to the client application as they were initially, with only their exception attribute (see JPPFTask.getException()) set to the captured error.
  2. After execution of the tasks: for each task that generates an error that prevents it from being sent back to the server, JPPF will substitute an instance of JPPFExceptionResult, which is a JPPF task used solely to describe the error and the object on which it occurred.

3 Executing code in the client from a task

The JPPFTask API provides two methods that will allow a task to send code for execution on the client, and to determine whether the task is executing within a node or within a client with local execution enabled:

 public abstract class JPPFTask implements Task<Object> {
   // is the task executing in a node or in the client?
   public boolean isInNode()
 
   // send a callable for execution on the client side
   public <V> V compute(final JPPFCallable<V> callable)
 }

The method compute() takes a JPPFCallable as input, which is a Serializable extension of the Callable interface and will be executed on the client side. The return value is the result of calling JPPFCallable.call() on the client side.

This API is a more flexible version of the ClientDataProvider facility, which will be deprecated in the future.

Example usage:

 public class MyTask extends JPPFTask {
   @Override
   public void run() {
     String callableResult;
     // if this task is executing in a JPPF node
     if (isInNode()) callableResult = compute(new MyNodeCallable());
     // otherwise if it is executing locally in the JPPF client
     else callableResult = compute(new MyClientCallable());
     // set the callable result as this task's result
     setResult(callableResult);
   }
 
   public static class MyNodeCallable implements JPPFCallable<String> {
     @Override
     public String call() throws Exception {
       return "executed in the NODE";
     }
   }
 
   public static class MyClientCallable implements JPPFCallable<String> {
     @Override
     public String call() throws Exception {
       return "executed in the CLIENT";
     }
   }
 }

4 JPPF-annotated tasks

Another way to write a JPPF task is to take an existing class and annotate one of its public methods or constructors using @JPPFRunnable.

Here is an example:

 public class MyClass implements Serializable {
   @JPPFRunnable
   public String myMethod(int intArg, String stringArg) {
     String s = "int arg = " + intArg + ", string arg = \"" + stringArg + "\"";
     System.out.println(s);
     return s;
   }
 }

We can see that we are simply using a POJO class, for which we annotated the myMethod() method with @JPPFRunnable. At runtime, the arguments of the method will be passed when the task is added to a job, as illustrated in the following example:

 JPPFJob job = new JPPFJob();
 JPPFTask task = job.addTask(new MyClass(), 3, "string arg");

Here we simply add our annotated class as a task, setting the two arguments of the annotated method in the same call. Note also that a JPPFTask object is returned. It is generated by a mechanism that wraps the annotated class into a JPPFTask, which allows it to use most of the functionalities that come with it.
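
To give an idea of what such a wrapping mechanism could look like, here is a JDK-only sketch based on reflection. It is an illustration under stated assumptions, not JPPF's actual code: @EntryPoint is a hypothetical stand-in for @JPPFRunnable, AnnotatedWrapper stands in for the generated JPPFTask, and only the instance-method case is covered.

```java
import java.io.Serializable;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;

// Hypothetical stand-in for @JPPFRunnable; it must be retained at runtime to be visible to reflection.
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.METHOD, ElementType.CONSTRUCTOR})
@interface EntryPoint {}

// A plain serializable class with one annotated public method.
class Greeter implements Serializable {
  private static final long serialVersionUID = 1L;

  @EntryPoint
  public String greet(int count, String name) {
    return "count = " + count + ", name = " + name;
  }
}

// Hypothetical wrapper: holds the target and its arguments, and stores the return value as its result.
final class AnnotatedWrapper implements Runnable {
  private final Object target;
  private final Object[] args;
  private Object result;
  private Exception exception;

  AnnotatedWrapper(Object target, Object... args) {
    this.target = target;
    this.args = args;
  }

  @Override
  public void run() {
    try {
      // look for the first public method carrying the annotation and invoke it
      for (Method method : target.getClass().getMethods()) {
        if (method.isAnnotationPresent(EntryPoint.class)) {
          result = method.invoke(target, args);
          return;
        }
      }
      throw new IllegalStateException("no annotated method found");
    } catch (Exception e) {
      exception = e; // errors are stored rather than propagated
    }
  }

  Object getResult() { return result; }

  Exception getException() { return exception; }

  public static void main(String[] args) {
    AnnotatedWrapper task = new AnnotatedWrapper(new Greeter(), 3, "string arg");
    task.run();
    System.out.println(task.getResult()); // prints "count = 3, name = string arg"
  }
}
```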


JPPF-annotated tasks present the following properties and constraints:

  • if the annotated element is an instance (non-static) method, the annotated class must be serializable
  • if the class is already an instance of JPPFTask, the annotation will be ignored
  • if an annotated method has a return type (i.e. non void), the return value will be set as the task result
  • it is possible to annotate a public method or constructor
  • an annotated method can be static or non-static
  • if the annotated element is a constructor or a static method, the first argument of JPPFJob.addTask() must be a Class object representing the class that declares it.
  • an annotated method or constructor can have any signature, with no limit on the number of arguments
  • through the task-wrapping mechanism, a JPPF-annotated class benefits from the JPPFTask facilities described in section 1 (JPPFTask), except for the callback methods onCancel(), onRestart() and onTimeout().


Here is an example using an annotated constructor:

 public class MyClass implements Serializable {
   @JPPFRunnable
   public MyClass(int intArg, String stringArg) {
     String s = "int arg = " + intArg + ", string arg = \"" + stringArg + "\"";
     System.out.println(s);
   }
 }
 
 JPPFJob job = new JPPFJob();
 JPPFTask task = job.addTask(MyClass.class, 3, "string arg");

Another example using an annotated static method:

 public class MyClass implements Serializable {
   @JPPFRunnable
   public static String myStaticMethod(int intArg, String stringArg) {
     String s = "int arg = " + intArg + ", string arg = \"" + stringArg + "\"";
     System.out.println(s);
     return s;
   }
 }
 
 JPPFJob job = new JPPFJob();
 JPPFTask task = job.addTask(MyClass.class, 3, "string arg");

Note how, in the last 2 examples, we use MyClass.class as the first argument in JPPFJob.addTask().

5 Runnable tasks

Classes that implement java.lang.Runnable can be used as JPPF tasks without any modification. The run() method will then be executed as the task's entry point. Here is an example:

 public class MyRunnableClass implements Runnable, Serializable {
   public void run() {
     System.out.println("Hello from a Runnable task");
   }
 }
 
 JPPFJob job = new JPPFJob();
 JPPFTask task = job.addTask(new MyRunnableClass());

The following rules apply to Runnable tasks:

  • the class must be serializable
  • if the class is already an instance of JPPFTask, or annotated with @JPPFRunnable, it will be processed as such
  • through the task-wrapping mechanism, a Runnable task benefits from the JPPFTask facilities described in section 1 (JPPFTask), except for the callback methods onCancel(), onRestart() and onTimeout().

6 Callable tasks

In the same way as Runnable tasks, classes implementing java.util.concurrent.Callable<V> can be directly used as tasks. In this case, the call() method will be used as the task's execution entry point. Here's an example:

 public class MyCallableClass implements Callable<String>, Serializable {
   public String call() throws Exception {
     String s = "Hello from a Callable task";
     System.out.println(s);
     return s;
   }
 }
 
 JPPFJob job = new JPPFJob();
 JPPFTask task = job.addTask(new MyCallableClass());

The following rules apply to Callable tasks:

  • the Callable class must be serializable
  • if the class is already an instance of JPPFTask, is annotated with @JPPFRunnable, or implements Runnable, it will be processed as such and the call() method will be ignored
  • the return value of the call() method will be set as the task result
  • through the task-wrapping mechanism, a callable class benefits from the JPPFTask facilities described in section 1 (JPPFTask), except for the callback methods onCancel(), onRestart() and onTimeout().
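
The rules above can be pictured with a small JDK-only adapter. This is a sketch of the idea, assuming nothing about JPPF's real wrapper class: CallableWrapper, HelloCallable and FailingCallable are hypothetical names. The adapter rejects a non-serializable Callable, stores the return value of call() as the result, and stores any exception instead of propagating it.

```java
import java.io.Serializable;
import java.util.concurrent.Callable;

// Hypothetical wrapper, standing in for the JPPFTask subclass generated by the framework.
final class CallableWrapper<V> implements Runnable, Serializable {
  private static final long serialVersionUID = 1L;
  private final Callable<V> callable;
  private V result;
  private Exception exception;

  CallableWrapper(Callable<V> callable) {
    // mirrors the rule that a Callable task must be serializable
    if (!(callable instanceof Serializable)) {
      throw new IllegalArgumentException("the Callable must be serializable");
    }
    this.callable = callable;
  }

  @Override
  public void run() {
    try {
      result = callable.call(); // the return value becomes the task result
    } catch (Exception e) {
      exception = e;            // failures are stored for later retrieval
    }
  }

  V getResult() { return result; }

  Exception getException() { return exception; }
}

class HelloCallable implements Callable<String>, Serializable {
  private static final long serialVersionUID = 1L;

  @Override
  public String call() { return "Hello from a Callable task"; }
}

class FailingCallable implements Callable<String>, Serializable {
  private static final long serialVersionUID = 1L;

  @Override
  public String call() throws Exception { throw new IllegalStateException("boom"); }
}

final class CallableWrapperDemo {
  public static void main(String[] args) {
    CallableWrapper<String> task = new CallableWrapper<>(new HelloCallable());
    task.run();
    System.out.println(task.getResult()); // prints "Hello from a Callable task"
  }
}
```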

7 POJO tasks

The least intrusive way of defining a task is by simply using an existing POJO class without any modification. This will allow you to use existing classes directly even if you don't have the source code. A POJO task offers the same possibilities as a JPPF-annotated task (see section 4, JPPF-annotated tasks), except that we need to specify explicitly which method or constructor to use when adding the task to a job. To this effect, we use a different form of the method JPPFJob.addTask(), which takes a method or constructor name as its first argument.

Here is a code example illustrating these possibilities:

 public class MyClass implements Serializable {
   public MyClass() {
   }
 
   public MyClass(int intArg, String stringArg) {
     String s = "int arg = " + intArg + ", string arg = \"" + stringArg + "\"";
     System.out.println(s);
   }
 
   public String myMethod(int intArg, String stringArg) {
     String s = "int arg = " + intArg + ", string arg = \"" + stringArg + "\"";
     System.out.println(s);
     return s;
   }
 
   public static String myStaticMethod(int intArg, String stringArg) {
     String s = "int arg = " + intArg + ", string arg = \"" + stringArg + "\"";
     System.out.println(s);
     return s;
   }
 }
 
 JPPFJob job = new JPPFJob();
 
 // add a task using the constructor as entry point
 JPPFTask task1 = job.addTask("MyClass", MyClass.class, 3, "string arg");
 
 // add a task using an instance method as entry point
 JPPFTask task2 = job.addTask("myMethod", new MyClass(), 3, "string arg");
 
 // add a task using a static method as entry point
 JPPFTask task3 = job.addTask("myStaticMethod", MyClass.class, 3, "string arg");

POJO tasks present the following properties and constraints:

  • if the entry point is an instance (non-static) method, the class must be serializable
  • if a method has a return type (i.e. non void), the return value will be set as the task result
  • it is possible to use a public method or constructor as entry point
  • a method entry point can be static or non-static
  • a POJO task is added to a job by calling a JPPFJob.addTask() method whose first argument is the method or constructor name
  • if the entry point is a constructor or a static method, the second argument of JPPFJob.addTask() must be a Class object representing the class that declares it
  • a method or constructor used as entry point can have any signature, with no limit on the number of arguments
  • through the task-wrapping mechanism, a POJO task benefits from the JPPFTask facilities described in section 1 (JPPFTask), except for the callback methods onCancel(), onRestart() and onTimeout().
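
The dispatch between constructor, static method and instance method can be sketched with plain reflection. The code below is an assumption-laden illustration, not JPPF's implementation: Pojo and PojoInvoker are hypothetical names, and candidates are matched by name and argument count only, which is simpler than a full signature match.

```java
import java.lang.reflect.Constructor;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;

// A plain class with no framework dependency, similar to MyClass above.
class Pojo {
  private final String description;

  public Pojo() { this.description = "default"; }

  public Pojo(int intArg, String stringArg) {
    this.description = "constructed: " + intArg + ", " + stringArg;
  }

  public String describe() { return description; }

  public String myMethod(int intArg, String stringArg) {
    return "instance: " + intArg + ", " + stringArg;
  }

  public static String myStaticMethod(int intArg, String stringArg) {
    return "static: " + intArg + ", " + stringArg;
  }
}

final class PojoInvoker {
  // 'target' is a Class object for constructors and static methods, an instance otherwise.
  static Object invoke(String name, Object target, Object... args) {
    try {
      if (target instanceof Class) {
        Class<?> type = (Class<?>) target;
        if (name.equals(type.getSimpleName())) {
          // the entry point name is the class name: use a public constructor
          for (Constructor<?> constructor : type.getConstructors()) {
            if (constructor.getParameterCount() == args.length) return constructor.newInstance(args);
          }
        }
        for (Method method : type.getMethods()) {
          if (Modifier.isStatic(method.getModifiers()) && matches(method, name, args)) {
            return method.invoke(null, args);
          }
        }
      } else {
        for (Method method : target.getClass().getMethods()) {
          if (!Modifier.isStatic(method.getModifiers()) && matches(method, name, args)) {
            return method.invoke(target, args);
          }
        }
      }
    } catch (ReflectiveOperationException e) {
      throw new IllegalStateException("could not invoke " + name, e);
    }
    throw new IllegalArgumentException("no public entry point named " + name);
  }

  private static boolean matches(Method method, String name, Object[] args) {
    return method.getName().equals(name) && method.getParameterCount() == args.length;
  }

  public static void main(String[] args) {
    System.out.println(((Pojo) invoke("Pojo", Pojo.class, 3, "string arg")).describe());
    System.out.println(invoke("myMethod", new Pojo(), 3, "string arg"));
    System.out.println(invoke("myStaticMethod", Pojo.class, 3, "string arg"));
  }
}
```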

8 Running non-Java tasks: CommandLineTask

JPPF has a pre-defined task type that allows you to run an external process from a task. This process can be any executable program (including java), shell script or command. The JPPF API also provides a set of simple classes to access data, whether in-process or outside, local or remote.

The class that will allow you to run a process is CommandLineTask. Like JPPFTask, it is an abstract class that you must extend and whose run() method you must override.

This class provides methods to:

Setup the external process name, path, arguments and environment:

 // list of commands passed to the shell
 List<String> getCommandList()
 void setCommandList(List<String> commandList)
 void setCommandList(String... commands)
 
 // set of environment variables
 Map<String,String> getEnv()
 void setEnv(Map<String, String> env)
 
 // directory in which the command is executed
 String getStartDir()
 void setStartDir(String startDir)

You can also use the built-in constructors to do this at task initialization time:

 CommandLineTask(Map<String, String> env, String startDir, String... commands)
 CommandLineTask(String... commands)

Launch the process:

The process is launched by calling the following method from the run() method of the task:

 int launchProcess()

This method will block until the process has completed or is destroyed. The process exit code can also be obtained via the following method:

 // get the process exit code
 int getExitCode()

Setup the capture of the process output:

You can specify whether the process output (both standard and error console output) should be captured, check the current setting, and obtain the captured output:

 boolean isCaptureOutput()
 
 void setCaptureOutput(boolean captureOutput)
 
 // corresponds to what is sent to System.err / stderr
 String getErrorOutput()
 
 // corresponds to what is sent to System.out / stdout
 String getStandardOutput()
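
For readers who want to see what launching a process and capturing its two output streams involves, here is a JDK-only sketch built on java.lang.ProcessBuilder. It is not the CommandLineTask implementation: ProcessCapture and its members are hypothetical names, and the output is redirected to temporary files, which is one simple way to avoid blocking on full pipe buffers.

```java
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.Charset;
import java.nio.file.Files;
import java.util.Arrays;
import java.util.List;

// Hypothetical holder for the outcome of an external process.
final class ProcessCapture {
  final int exitCode;
  final String standardOutput; // what the process wrote to stdout
  final String errorOutput;    // what the process wrote to stderr

  private ProcessCapture(int exitCode, String standardOutput, String errorOutput) {
    this.exitCode = exitCode;
    this.standardOutput = standardOutput;
    this.errorOutput = errorOutput;
  }

  // Launches the command, blocks until it ends, and returns its exit code and captured output.
  static ProcessCapture launch(List<String> command) {
    try {
      File out = File.createTempFile("stdout", ".txt");
      File err = File.createTempFile("stderr", ".txt");
      try {
        Process process = new ProcessBuilder(command)
            .redirectOutput(out)
            .redirectError(err)
            .start();
        int code = process.waitFor(); // blocks until the process has completed
        return new ProcessCapture(code, read(out), read(err));
      } finally {
        out.delete();
        err.delete();
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
  }

  private static String read(File file) throws IOException {
    return new String(Files.readAllBytes(file.toPath()), Charset.defaultCharset());
  }

  // Path of the java launcher of the running JVM, used here as a portable command.
  static String javaBinary() {
    return new File(new File(System.getProperty("java.home"), "bin"), "java").getPath();
  }

  public static void main(String[] args) {
    ProcessCapture capture = launch(Arrays.asList(javaBinary(), "-version"));
    System.out.println("exit code: " + capture.exitCode);
    System.out.println("stderr: " + capture.errorOutput); // "java -version" writes to stderr
  }
}
```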

Here is a sample command line task that lists the content of a directory in the node's file system:

 import org.jppf.server.protocol.*;
 
 // This task lists the files in a directory of the node's host
 public class ListDirectoryTask extends CommandLineTask {
   // Execute the script
   public void run() {
     try {
       // get the name of the node's operating system
       String os = System.getProperty("os.name").toLowerCase();
       // the type of OS determines which command to execute
       if (os.indexOf("linux") >= 0) {
         setCommandList("ls", "-a", "/usr/local");
       }
       else if (os.indexOf("windows") >= 0) {
         setCommandList("cmd", "/C", "dir", "C:\\Windows");
       }
       // enable the capture of the console output
       setCaptureOutput(true);
       // execute the script/command
       launchProcess();
       // get the resulting console output and set it as a result
       String output = getStandardOutput();
       setResult(output);
     } catch(Exception e) {
       setException(e);
     }
   }
 }

9 The Location API

This API allows developers to easily write data to, or read data from various sources: JVM heap, file system or URL.

It is based on the interface Location, which provides the following methods:

 public interface Location<T> {
   // Copy the content at this location to another location
   void copyTo(Location location);
   // Obtain an input stream to read from this location
   InputStream getInputStream();
   // Obtain an output stream to write to this location
   OutputStream getOutputStream();
   // Get this location's path
   T getPath();
   // Get the size of the data this location points to
   long size();
   // Get the content at this location as an array of bytes
   byte[] toByteArray() throws Exception;
 }

Currently, JPPF provides 3 implementations of this interface:

  • FileLocation represents a path in the file system
  • URLLocation can be used to get data to and from a URL, including HTTP and FTP URLs
  • MemoryLocation represents a block of data in memory that can be copied from or sent to another location
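
Conceptually, copying one location to another amounts to piping the source's input stream into the destination's output stream. The JDK-only sketch below illustrates that idea for the file-to-memory case. It is an assumption about the general approach rather than a description of JPPF's code, and StreamCopy with its methods is a hypothetical name.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

final class StreamCopy {
  // Copy everything from 'in' to 'out' and return the number of bytes copied.
  static long copy(InputStream in, OutputStream out) throws IOException {
    byte[] buffer = new byte[8192];
    long total = 0;
    int read;
    while ((read = in.read(buffer)) != -1) {
      out.write(buffer, 0, read);
      total += read;
    }
    return total;
  }

  // Read a whole file into memory through the stream copy above.
  static byte[] fileToMemory(Path file) throws IOException {
    try (InputStream in = Files.newInputStream(file);
         ByteArrayOutputStream out = new ByteArrayOutputStream()) {
      copy(in, out);
      return out.toByteArray();
    }
  }

  // Write the text to a temporary file, read it back into memory, and return it.
  static String roundTrip(String text) {
    try {
      Path file = Files.createTempFile("location-demo", ".txt");
      try {
        Files.write(file, text.getBytes(StandardCharsets.UTF_8));
        return new String(fileToMemory(file), StandardCharsets.UTF_8);
      } finally {
        Files.delete(file);
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    System.out.println(roundTrip("content of output.txt")); // prints "content of output.txt"
  }
}
```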

To illustrate the use of this API, let's transform our previous ListDirectoryTask in a way such that the output of the command is redirected to a file, instead of the console. We then read the content of this file and set it as the task's result:

 import org.jppf.server.protocol.*;
 
 // This task lists the files in a directory of the node's host
 public class ListDirectoryTask extends CommandLineTask {
   // Execute the script
   public void run() {
     try {
       String os = System.getProperty("os.name").toLowerCase();
       if (os.indexOf("linux") >= 0)
         // the ">" redirection is interpreted by a shell, so the command is passed to "sh -c"
         // equivalent to shell command "ls -a /usr/local > output.txt"
         setCommandList("/bin/sh", "-c", "ls -a /usr/local > output.txt");
       else if (os.indexOf("windows") >= 0)
         // equivalent to shell command "dir C:\Windows > output.txt"
         setCommandList("cmd", "/C", "dir", "C:\\Windows", ">", "output.txt");
       // disable the capture of the console output
       setCaptureOutput(false);
       // execute the script or command
       launchProcess();
       // copy the resulting file in memory
       FileLocation fileLoc = new FileLocation("output.txt");
       MemoryLocation memoryLoc = new MemoryLocation((int) fileLoc.size());
       fileLoc.copyTo(memoryLoc);
       // set the file content as a result
       setResult(new String(memoryLoc.toByteArray()));
     }  catch(Exception e) {
       setException(e);
     }
   }
 }

JPPF Copyright © 2005-2020 JPPF.org Powered by MediaWiki