Task objects
From JPPF 6.3 Documentation
In JPPF terms, a task is the smallest unit of execution that can be handled by the framework. We will say that it is an atomic execution unit. A JPPF application creates tasks, groups them into a job, and submits the job for execution on the grid.
1 Task
Task is the base interface for any task that is run by JPPF. We will see in the next sections that other forms of tasks, which do not implement Task, are still wrapped by the framework in an implementation of Task.
Task is defined as follows:
public interface Task<T> extends Runnable, Serializable { ... }
We have outlined three important keywords that characterize Task:
- interface: Task cannot be used directly, it must be implemented / extended to construct a real task
- Runnable: when writing a JPPF task, the run() method of java.lang.Runnable must be implemented. This is the part of a task that will be executed on a remote node.
- Serializable: tasks are sent to servers and nodes over a network. JPPF uses the Java serialization mechanism to transform task objects into a form appropriate for networking
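The Serializable requirement can be illustrated with plain JDK serialization. The following is only a sketch: GreetingTask and SerializationRoundTrip are hypothetical names that are not part of the JPPF API, and the byte array stands in for the network transport between client, server and node.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Hypothetical stand-in for a task: a Runnable that is also Serializable.
class GreetingTask implements Runnable, Serializable {
  private static final long serialVersionUID = 1L;
  private final String name;
  private String result;

  GreetingTask(String name) { this.name = name; }

  @Override
  public void run() { result = "Hello, " + name; }

  String getResult() { return result; }
}

final class SerializationRoundTrip {
  // Transform an object into bytes, as happens before it is sent over a network.
  static byte[] toBytes(Serializable object) {
    try (ByteArrayOutputStream bytes = new ByteArrayOutputStream();
         ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(object);
      out.flush();
      return bytes.toByteArray();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  // Rebuild an object from bytes, as happens on the receiving side.
  static <T> T fromBytes(byte[] data, Class<T> type) {
    try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
      return type.cast(in.readObject());
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    } catch (ClassNotFoundException e) {
      throw new IllegalStateException(e);
    }
  }

  public static void main(String[] args) {
    GreetingTask received = fromBytes(toBytes(new GreetingTask("grid")), GreetingTask.class);
    received.run(); // executes on the deserialized copy, not on the original instance
    System.out.println(received.getResult());
  }
}
```

Note that the copy that runs is a distinct object from the one that was serialized, which is why a task's state has to travel back with it to be visible to the application.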
The JPPF API provides a convenient abstract implementation of Task, named AbstractTask, which implements all the methods of Task except run(). To write a real task in your application, you simply extend AbstractTask to implement your own type:
public class MyTask extends AbstractTask<Object> { public void run() { // ... your code here ... } }
We will now review the functionalities that are inherited from Task.
1.1 Execution results handling
Task<T> provides 2 convenience methods to store and retrieve the results of the execution:
- public void setResult(T result) : stores the execution result; the argument must be serializable
- public T getResult() : retrieves the execution result
Here is an example using these methods:
public class MyTask extends AbstractTask<String> { public void run() { // ... some code here ... setResult("This is the result"); } }
and later in your application, you would use:
String result = myTask.getResult();
Using getResult() and setResult() is not mandatory. As we mentioned earlier, these methods are provided as conveniences with meaningful semantics attached to them. There are many other ways to store and retrieve execution results, which can be used to the exclusion of others, or in any combination. These include, but are not limited to:
- using custom attributes in the task class and their accessors
- storing and getting data to/from a database
- using a file system
- using third-party applications or libraries
- etc ...
1.2 Exception handling
Exception handling is a very important part of processing a task. In effect, exceptions may arise from many places: in the application code, in the JVM, in third-party APIs, etc... To handle this, JPPF provides both a mechanism to process uncaught exceptions and methods to store and retrieve exceptions that arise while executing a task.
Task provides 2 methods to explicitly handle exceptions:
- public void setThrowable(Throwable t) : store a throwable for later retrieval
- public Throwable getThrowable() : retrieve a throwable that was thrown during execution
Here is an example of explicit exception handling:
public class MyTask extends AbstractTask<Object> { public void run() { try { // ... some code here ... } catch(Exception e) { setThrowable(e); } } }
Later on, you can retrieve the exception as follows:
Throwable throwable = myTask.getThrowable();
JPPF also automatically handles uncaught throwables. Uncaught throwables are never propagated beyond the scope of a task, as this would cause unpredictable behavior in the node that executes the task. Instead, they are stored within the task using the setThrowable() method. This way, it is always possible for the application to know what happened. The following code shows how JPPF handles uncaught throwables:
Task<Object> task = ...; try { task.run(); } catch(Throwable t) { task.setThrowable(t); }
Then in the application, you can retrieve the throwable as follows:
Task<Object> task = ...; if (task.getThrowable() != null) { Throwable t = task.getThrowable(); t.printStackTrace(); }
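The capture of uncaught throwables can be tried out without a grid. The sketch below uses SketchTask, a hypothetical minimal stand-in for AbstractTask that is not part of the JPPF API, and an execute() method that reproduces the try/catch wrapping shown above.

```java
import java.io.Serializable;

// Hypothetical minimal task base class, standing in for AbstractTask.
abstract class SketchTask<T> implements Runnable, Serializable {
  private T result;
  private Throwable throwable;

  public T getResult() { return result; }
  public void setResult(T result) { this.result = result; }
  public Throwable getThrowable() { return throwable; }
  public void setThrowable(Throwable throwable) { this.throwable = throwable; }
}

final class UncaughtThrowableDemo {
  // Same wrapping as described in the text: nothing escapes the task's scope.
  static void execute(SketchTask<?> task) {
    try {
      task.run();
    } catch (Throwable t) {
      task.setThrowable(t);
    }
  }

  // A task that stores a partial result, then fails with an unchecked exception.
  static SketchTask<String> failingTask() {
    return new SketchTask<String>() {
      @Override
      public void run() {
        setResult("partial");
        throw new IllegalStateException("boom");
      }
    };
  }

  public static void main(String[] args) {
    SketchTask<String> task = failingTask();
    execute(task);
    System.out.println("result = " + task.getResult() + ", throwable = " + task.getThrowable());
  }
}
```

In this sketch, a result stored before the failure remains available next to the throwable, so an application can check getThrowable() first and then decide whether the result is usable.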
1.3 Task life cycle
JPPF provides some options to control a task's life cycle once it has been submitted. These include the following functionalities:
- task cancellation: this cannot be invoked directly on a task, but is rather invoked as part of cancelling a whole job. If a task is cancelled before its execution starts, then it will never start.
- task timeout: the timeout countdown starts with the task's execution. If a timeout expires before the task starts executing, then the task will not time out.
In all cases, if a task has already completed its execution, it cannot be cancelled or timed out anymore.
Apart from timeout settings, controlling the life cycle of a task is normally done externally, using the JPPF remote management facilities. We will see those later, in a dedicated chapter of this user manual.
It is also possible to perform specific processing when a task life cycle event occurs. For this, Task provides a callback method for each type of event:
- public void onCancel(): invoked when the task is cancelled
- public void onTimeout(): invoked when the task times out
For both methods, an attempt to cancel the task will be performed, by calling Thread.interrupt() on the thread that executes it, then the onCancel() or onTimeout() method will be invoked. The implication is that the callback invocation takes place after the task's run() method returns, whether it was immediately interrupted (if the thread was doing an interruptible operation) or not.
By default, these methods do not do anything. You can, however, override them to implement any application-specific processing, such as releasing resources used by the task, updating the state of the task, etc.
Here is a code sample that illustrates this:
public class MyTask extends AbstractTask<Object> { @Override public void run() { // task processing here ... } @Override public void onCancel() { // process task cancel event ... } @Override public void onTimeout() { // process task timeout event ... } }
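The ordering described above (interrupt first, callback only after run() has returned) can be reproduced with plain threads. This is a sketch under stated assumptions: RecordingTask and cancelWhileRunning() are hypothetical names, and the calling thread plays the role of the node that drives the cancellation.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

final class CancelCallbackOrder {
  // Hypothetical task-like object that records the order of life cycle events.
  static final class RecordingTask implements Runnable {
    final List<String> events = Collections.synchronizedList(new ArrayList<>());

    @Override
    public void run() {
      try {
        Thread.sleep(60_000L); // an interruptible operation
      } catch (InterruptedException e) {
        events.add("run interrupted");
      }
      events.add("run returned");
    }

    void onCancel() { events.add("onCancel"); }
  }

  // Interrupt the executing thread, wait for run() to return, then invoke the callback.
  static List<String> cancelWhileRunning() {
    RecordingTask task = new RecordingTask();
    Thread worker = new Thread(task);
    worker.start();
    worker.interrupt();
    try {
      worker.join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    task.onCancel();
    return new ArrayList<>(task.events);
  }

  public static void main(String[] args) {
    System.out.println(cancelWhileRunning());
  }
}
```

Because the callback runs after run() has returned, it is a natural place for cleanup that must not race with the task's own processing.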
A task timeout can be set by using a JPPFSchedule object, which is an immutable object that provides two constructors:
// schedule after a specified duration in milliseconds public JPPFSchedule(final long duration) // schedule at a specified fixed date/time public JPPFSchedule(final String date, final String format)
Using a JPPFSchedule, we can thus set and obtain a task timeout using the corresponding accessors:
public interface Task<T> { // get the timeout schedule public JPPFSchedule getTimeoutSchedule(); // set a new timeout schedule public void setTimeoutSchedule(final JPPFSchedule timeoutSchedule); }
For example:
// set the task to expire after 5 seconds
myTask.setTimeoutSchedule(new JPPFSchedule(5000L));
// set the task to expire on 9/30/2012 at 12:08 pm
myTask.setTimeoutSchedule(
  new JPPFSchedule("09/30/2012 12:08 PM", "MM/dd/yyyy hh:mm a"));
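To check a date and format pair before using it in a schedule, the pattern can be tried with the JDK alone. The sketch below assumes that the format string follows the java.text.SimpleDateFormat pattern syntax, which the example pattern above is consistent with; SchedulePatternDemo and its methods are hypothetical helpers, not JPPF classes.

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Locale;

final class SchedulePatternDemo {
  // Parse a fixed date/time with an explicit pattern (assumed SimpleDateFormat syntax).
  static Date parse(String date, String format) {
    try {
      // Locale.US so that the "a" marker matches "AM" / "PM" regardless of the default locale
      return new SimpleDateFormat(format, Locale.US).parse(date);
    } catch (ParseException e) {
      throw new IllegalArgumentException("date does not match the format: " + date, e);
    }
  }

  // Remaining time before the expiry date, never negative.
  static long millisUntil(Date expiry, Date now) {
    return Math.max(0L, expiry.getTime() - now.getTime());
  }

  public static void main(String[] args) {
    Date expiry = parse("09/30/2012 12:08 PM", "MM/dd/yyyy hh:mm a");
    System.out.println("expires at " + expiry);
    System.out.println("remaining ms: " + millisUntil(expiry, new Date()));
  }
}
```

A date that does not match the pattern fails fast here, which is easier to diagnose on the client than a timeout that never behaves as expected.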
2 Exception handling - node processing
It is possible that an error occurs while the node is processing a task, before or after its execution. These error conditions include any instance of Throwable, i.e. any Exception or Error occurring during serialization or deserialization of the tasks, or while sending or receiving data to or from the server.
When such an error occurs, the Throwable that was raised for each task in error is propagated back to the client which submitted the job, and set upon the initial instance of the task in the job. It can then be obtained, upon receiving the execution results, with a call to Task.getThrowable().
3 Getting information on the node executing the task
A Task can obtain information on the node using the following methods:
public interface Task<T> extends Runnable, Serializable { // is the task executing in a node or in the client? public boolean isInNode() // get the node executing this task, if any public Node getNode() }
The isInNode() method determines whether the task is executing within a node or within a client with local execution enabled.
The getNode() method returns an instance of the Node interface, defined as follows:
public interface Node extends Runnable {
  // Get this node's UUID
  String getUuid();
  // Get the system information for this node
  JPPFSystemInformation getSystemInformation();
  // Determine whether this node is local to another component
  boolean isLocal();
  // Determine whether this node is running in offline mode
  boolean isOffline();
  // Determine whether this node is a 'master' node for the provisioning features
  boolean isMasterNode();
  // Determine whether this node is a 'slave' node for the provisioning features
  boolean isSlaveNode();
  // Determine whether this node can execute .Net tasks
  boolean isDotnetCapable();
  // Determine whether this node is an Android node
  boolean isAndroid();
  // Get the JMX connector server associated with the node
  JMXServer getJmxServer() throws Exception;
  // Reset the current task class loader if any is present
  ClassLoader resetTaskClassLoader(Object...params);
}
Note that Task.getNode() will return null if the task is executing within a client local executor.
Here is an example usage:
public class MyTask extends AbstractTask<String> { @Override public void run() { if (isInNode()) { setResult("executing in remote node with uuid = " + getNode().getUuid()); } else { setResult("executing in client-local executor"); } } }
4 Accessing the job
A task can access the job it is a part of, using its getJob() method, which returns a JPPFDistributedJob instance. This enables any executing task to access information on the job it belongs to: job uuid, name, metadata, service level agreement and number of tasks. This method can be used as in this example:
public class MyTask extends AbstractTask<String> { @Override public void run() { JPPFDistributedJob job = this.getJob(); String jobKey = job.getMetadata().getParameter("JOB_KEY"); setResult(jobKey + "-" + this.getId()); } }
5 Executing code in the client from a task
The Task API provides a method that will allow a task to send code for execution in the client:
public interface Task<T> extends Runnable, Serializable { // send a callable for execution on the client side public <V> V compute(final JPPFCallable<V> callable) }
The method compute() takes a JPPFCallable as input, which is a Serializable extension of the Callable interface and will be executed on the client side. The return value is the result of calling JPPFCallable.call() on the client side.
Example usage:
public class MyTask extends AbstractTask<String> {
  @Override
  public void run() {
    // choose the callable depending on whether this task is executing in a JPPF node
    String callResult = compute(isInNode() ? new NodeCallable() : new ClientCallable());
    // set the callable result as this task's result
    setResult(callResult);
  }

  public static class NodeCallable implements JPPFCallable<String> {
    @Override
    public String call() throws Exception {
      return "executed in the NODE";
    }
  }

  public static class ClientCallable implements JPPFCallable<String> {
    @Override
    public String call() throws Exception {
      return "executed in the CLIENT";
    }
  }
}
6 Sending notifications from a task
Task provides an API which allows tasks to send notifications during their execution:
public interface Task<T> extends Runnable, Serializable { // Causes the task to send a notification to all listeners Task<T> fireNotification(Object userObject, boolean sendViaJmx); }
The first parameter userObject can be any object provided by the user code. The second parameter sendViaJmx specifies whether this notification should also be sent via the node's JPPFNodeTaskMonitorMBean, instead of only to locally registered listeners. If it is true, it is recommended that userObject be Serializable. We will see in further chapters of this documentation how to register local and JMX-based listeners. Let's see here how these listeners can handle the notifications.
Here is an example of JMX listener registered with one or more JPPFNodeTaskMonitorMBean instances:
public class MyTaskJmxListener implements NotificationListener { @Override public synchronized void handleNotification(Notification notif, Object handback) { // cast to the JPPF notification type, then get and print the user object Object userObject = ((TaskExecutionNotification) notif).getUserData(); System.out.println("received notification with user object = " + userObject); // determine who sent this notification boolean userNotif = ((TaskExecutionNotification) notif).isUserNotification(); System.out.println("this notification was sent by the " + (userNotif ? "user" : "JPPF node")); } }
A local TaskExecutionListener would be like this:
public class MyTaskLocalListener implements TaskExecutionListener {
  // task completion event sent by the node
  @Override
  public void taskExecuted(TaskExecutionEvent event) {
    TaskExecutionInfo info = event.getTaskInformation();
    System.out.println("received notification with task info = " + info);
  }

  // task notification event sent by user code
  @Override
  public void taskNotification(TaskExecutionEvent event) {
    Object userObject = event.getUserObject();
    System.out.println("received notification with user object = " + userObject);
  }
}
Consider the following task:
public class MyTask extends AbstractTask<String> { @Override public void run() { fireNotification("notification 1", false); fireNotification("notification 2", true); } }
During the execution of this task, a MyTaskJmxListener instance would only receive “notification 2”, whereas a MyTaskLocalListener instance would receive both “notification 1” and “notification 2”.
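The routing rule behind this behavior can be modelled in a few lines. The sketch below is not the JPPF implementation: NotificationDispatchSketch is a hypothetical class in which plain Consumer callbacks stand in for local TaskExecutionListener instances and for JMX listeners.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

final class NotificationDispatchSketch {
  // Stand-ins for locally registered listeners and for JMX-based listeners.
  private final List<Consumer<Object>> localListeners = new ArrayList<>();
  private final List<Consumer<Object>> jmxListeners = new ArrayList<>();

  void addLocalListener(Consumer<Object> listener) { localListeners.add(listener); }
  void addJmxListener(Consumer<Object> listener) { jmxListeners.add(listener); }

  // Local listeners always receive the user object; JMX listeners only when requested.
  void fireNotification(Object userObject, boolean sendViaJmx) {
    for (Consumer<Object> listener : localListeners) listener.accept(userObject);
    if (sendViaJmx) {
      for (Consumer<Object> listener : jmxListeners) listener.accept(userObject);
    }
  }

  public static void main(String[] args) {
    NotificationDispatchSketch dispatcher = new NotificationDispatchSketch();
    dispatcher.addLocalListener(o -> System.out.println("local listener got: " + o));
    dispatcher.addJmxListener(o -> System.out.println("JMX listener got: " + o));
    dispatcher.fireNotification("notification 1", false);
    dispatcher.fireNotification("notification 2", true);
  }
}
```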
7 Resubmitting a task
The class AbstractTask also provides an API which allows a task to request that it be resubmitted by the server, instead of being sent back to the client as an execution result. This can prove very useful for instance when a task must absolutely complete successfully, but an error occurs during its execution. The API for this is defined as follows:
public abstract class AbstractTask<T> implements Task<T> { // Determine whether this task will be resubmitted by the server public boolean isResubmit() // Specify whether this task should be resubmitted by the server public void setResubmit(final boolean resubmit) // Get the maximum number of times a task can be resubmitted int getMaxResubmits(); // Set the maximum number of times a task can be resubmitted void setMaxResubmits(int maxResubmits); }
Note that the resubmit and maxResubmits attributes are transient, which means that when the task is executed in a remote node, they will be reset to their initial value of false and -1, respectively.
The maximum number of times a task can be resubmitted may be specified in two ways:
- in the job SLA via the maxTaskResubmits attribute
- with the task's setMaxResubmits() method; in this case any value >= 0 will override the job SLA's value
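The precedence between the two settings can be written down as a small function. This is a sketch of the rule as stated above, not JPPF code: ResubmitLimitSketch and its methods are hypothetical, and the canResubmit() check in particular is an assumption about how a limit would typically be applied.

```java
final class ResubmitLimitSketch {
  // A task-level value >= 0 overrides the job SLA's value; a negative value means "not set".
  static int effectiveMaxResubmits(int taskMaxResubmits, int slaMaxTaskResubmits) {
    return taskMaxResubmits >= 0 ? taskMaxResubmits : slaMaxTaskResubmits;
  }

  // Assumption: a resubmission is granted only while the count is below the effective limit.
  static boolean canResubmit(boolean resubmitRequested, int resubmitCount, int effectiveMax) {
    return resubmitRequested && resubmitCount < effectiveMax;
  }

  public static void main(String[] args) {
    System.out.println(effectiveMaxResubmits(-1, 3)); // task value not set: SLA value applies
    System.out.println(effectiveMaxResubmits(0, 3));  // task value set to 0: no resubmission
  }
}
```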
Finally, please note that task resubmission only works for tasks sent to a remote node, and will not work in the client's local executor.
8 JPPF-annotated tasks
Another way to write a JPPF task is to take an existing class and annotate one of its public methods or constructors using @JPPFRunnable.
Here is an example:
public class MyClass implements Serializable { @JPPFRunnable public String myMethod(int intArg, String stringArg) { String s = "int arg = " + intArg + ", string arg = \"" + stringArg + "\""; System.out.println(s); return s; } }
We can see that we are simply using a POJO class, for which we annotated the myMethod() method with @JPPFRunnable. At runtime, the arguments of the method will be passed when the task is added to a job, as illustrated in the following example:
JPPFJob job = new JPPFJob(); Task<?> task = job.add(new MyClass(), 3, "string arg");
Here we simply add our annotated class as a task, setting the two arguments of the annotated method in the same call. Note also that a Task object is returned. It is generated by a mechanism that wraps the annotated class into a Task, which allows it to use most of the functionalities that come with it.
JPPF-annotated tasks present the following properties and constraints:
- if the annotated element is an instance (non-static) method, the annotated class must be serializable
- if the class is already an instance of Task, the annotation will be ignored
- if an annotated method has a return type (i.e. non void), the return value will be set as the task result
- it is possible to annotate a public method or constructor
- an annotated method can be static or non-static
- if the annotated element is a constructor or a static method, the first argument of JPPFJob.add() must be a Class object representing the class that declares it.
- an annotated method or constructor can have any signature, with no limit on the number of arguments
- through the task-wrapping mechanism, a JPPF-annotated class benefits from the Task facilities described in section 1 (Task), except for the callback methods onCancel() and onTimeout().
Here is an example using an annotated constructor:
public class MyClass implements Serializable {
  @JPPFRunnable
  public MyClass(int intArg, String stringArg) {
    String s = "int arg = " + intArg + ", string arg = \"" + stringArg + "\"";
    System.out.println(s);
  }
}

JPPFJob job = new JPPFJob();
Task<?> task = job.add(MyClass.class, 3, "string arg");
Another example using an annotated static method:
public class MyClass implements Serializable { @JPPFRunnable public static String myStaticMethod(int intArg, String stringArg) { String s = "int arg = " + intArg + ", string arg = \"" + stringArg + "\""; System.out.println(s); return s; } } JPPFJob job = new JPPFJob(); Task<?> task = job.add(MyClass.class, 3, "string arg");
Note how, in the last 2 examples, we use MyClass.class as the first argument in JPPFJob.add().
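To give an idea of what a wrapping mechanism of this kind has to do, the sketch below locates an annotated method by reflection and invokes it with the supplied arguments. It is not the JPPF implementation: the EntryPoint annotation is a hypothetical stand-in for @JPPFRunnable, the class names are invented, and annotated constructors are left out for brevity.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;

final class AnnotatedEntryPointSketch {
  // Hypothetical stand-in for @JPPFRunnable; must be retained at runtime to be found.
  @Retention(RetentionPolicy.RUNTIME)
  @Target({ElementType.METHOD, ElementType.CONSTRUCTOR})
  @interface EntryPoint {}

  // Hypothetical annotated POJO.
  public static class Greeter {
    @EntryPoint
    public String greet(int count, String name) {
      return name + " x" + count;
    }
  }

  // Find the first public annotated method and invoke it; the return value plays the task result.
  static Object invokeAnnotated(Object taskObject, Object... args) {
    boolean isClass = taskObject instanceof Class;
    Class<?> type = isClass ? (Class<?>) taskObject : taskObject.getClass();
    for (Method method : type.getMethods()) {
      if (!method.isAnnotationPresent(EntryPoint.class)) continue;
      boolean isStatic = Modifier.isStatic(method.getModifiers());
      // an instance method needs an instance: a Class object is only valid for static methods
      if (isClass && !isStatic) continue;
      try {
        return method.invoke(isStatic ? null : taskObject, args);
      } catch (ReflectiveOperationException e) {
        throw new IllegalStateException(e);
      }
    }
    throw new IllegalArgumentException("no usable annotated method in " + type.getName());
  }

  public static void main(String[] args) {
    System.out.println(invokeAnnotated(new Greeter(), 3, "node"));
  }
}
```

The sketch also shows why a Class object is required for static entry points: without an instance, only static members can be invoked.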
9 Runnable tasks
Classes that implement either Runnable or JPPFRunnableTask can be used as JPPF tasks without any modification. The run() method will then be executed as the task's entry point. Here is an example:
JPPFJob job = new JPPFJob();
// adding an anonymous runnable task
Task<?> task1 = job.add(new JPPFRunnableTask() {
  @Override
  public void run() {
    System.out.println("Hello!");
  }
});
// adding a runnable task as a lambda expression
Task<?> task2 = job.add(() -> System.out.println("Hello again!"));
The following rules apply to Runnable tasks:
- the class must be serializable (this is automatically the case with JPPFRunnableTask)
- if the class is already an instance of Task, or annotated with @JPPFRunnable, it will be processed as such
- through the task-wrapping mechanism, a Runnable task benefits from the Task facilities, except for the callback methods onCancel() and onTimeout().
10 Callable tasks
In the same way as Runnable tasks, classes implementing Callable<V> or JPPFCallable<V> can be directly used as tasks. In this case, the call() method will be used as the task's execution entry point, and its return value will be the task's result. Here's an example:
JPPFJob job = new JPPFJob(); // adding an anonymous callable task Task<?> task1 = job.add(new JPPFCallable<String>() { @Override public String call() throws Exception { return "Hello!"; } }); // adding a callable task as a lambda expression Task<?> task2 = job.add(() -> "Hello again!");
The following rules apply to Callable tasks:
- the Callable class must be serializable (this is automatically the case with JPPFCallable)
- if the class is already an instance of Task, annotated with @JPPFRunnable or implements Runnable, it will be processed as such and the call() method will be ignored
- the return value of the call() method will be set as the task result
- through the task-wrapping mechanism, a callable class benefits from the Task facilities, except for the callback methods onCancel() and onTimeout().
11 POJO tasks
The least intrusive way of defining a task is to use an existing POJO class without any modification. This allows you to use existing classes directly, even if you don't have the source code. A POJO task offers the same possibilities as a JPPF-annotated task (see section 8, JPPF-annotated tasks), except that we need to specify explicitly which method or constructor to use when adding the task to a job. To this effect, we use a different form of the method JPPFJob.add(), which takes a method or constructor name as its first argument.
Here is a code example illustrating these possibilities:
public class MyClass implements Serializable { public MyClass() { } public MyClass(int intArg, String stringArg) { String s = "int arg = " + intArg + ", string arg = \"" + stringArg + "\""; System.out.println(s); } public String myMethod(int intArg, String stringArg) { String s = "int arg = " + intArg + ", string arg = \"" + stringArg + "\""; System.out.println(s); return s; } public static String myStaticMethod(int intArg, String stringArg) { String s = "int arg = " + intArg + ", string arg = \"" + stringArg + "\""; System.out.println(s); return s; } } JPPFJob job = new JPPFJob(); // add a task using the constructor as entry point Task<?> task1 = job.add("MyClass", MyClass.class, 3, "string arg"); // add a task using an instance method as entry point Task<?> task2 = job.add("myMethod", new MyClass(), 3, "string arg"); // add a task using a static method as entry point Task<?> task3 = job.add("myStaticMethod", MyClass.class, 3, "string arg");
POJO tasks present the following properties and constraints:
- if the entry point is an instance (non-static) method, the class must be serializable
- if a method has a return type (i.e. non void), the return value will be set as the task result
- it is possible to use a public method or constructor as entry point
- a method entry point can be static or non-static
- A POJO task is added to a job by calling a JPPFJob.add() method whose first argument is the method or constructor name.
- if the entry point is a constructor or a static method, the second argument of JPPFJob.add() must be a Class object representing the class that declares it.
- a method or constructor used as entry point can have any signature, with no limit on the number of arguments
- through the task-wrapping mechanism, a POJO task benefits from the Task facilities described in section 1 (Task), except for the callback methods onCancel() and onTimeout().
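The rules above for resolving an entry point by name can be sketched with reflection. This is an illustration only, not the JPPF implementation: PojoEntryPointSketch and Sample are hypothetical, and the lookup matches on the name and number of arguments, which is a simplification of real overload resolution.

```java
import java.lang.reflect.Constructor;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;

final class PojoEntryPointSketch {
  // Hypothetical POJO with a constructor, an instance method and a static method.
  public static class Sample {
    public final String label;
    public Sample(String label) { this.label = label; }
    public String describe(int n) { return label + ":" + n; }
    public static String twice(String s) { return s + s; }
  }

  // Resolve the entry point from its name, then invoke it with the given arguments.
  static Object invoke(String name, Object taskObject, Object... args) {
    boolean isClass = taskObject instanceof Class;
    Class<?> type = isClass ? (Class<?>) taskObject : taskObject.getClass();
    try {
      // a constructor entry point requires a Class object and is named after the class
      if (isClass && name.equals(type.getSimpleName())) {
        for (Constructor<?> constructor : type.getConstructors()) {
          if (constructor.getParameterCount() == args.length) return constructor.newInstance(args);
        }
      }
      for (Method method : type.getMethods()) {
        if (!method.getName().equals(name) || method.getParameterCount() != args.length) continue;
        boolean isStatic = Modifier.isStatic(method.getModifiers());
        if (isClass && !isStatic) continue; // no instance available for an instance method
        return method.invoke(isStatic ? null : taskObject, args);
      }
    } catch (ReflectiveOperationException e) {
      throw new IllegalStateException(e);
    }
    throw new IllegalArgumentException("no entry point named " + name + " in " + type.getName());
  }

  public static void main(String[] args) {
    System.out.println(invoke("describe", new Sample("a"), 2));
    System.out.println(invoke("twice", Sample.class, "ab"));
  }
}
```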
12 Interruptibility
Sometimes, it is not desirable that the thread executing a task be interrupted upon a timeout or cancellation request. To control this behavior, a task should override its isInterruptible() method, as in this example:
public class MyTask extends AbstractTask<String> { @Override public void run() { // ... } @Override public boolean isInterruptible() { // this task can't be interrupted return false; } }
Note that, by default, a task which does not override its isInterruptible() method is interruptible.
Tasks that do not extend AbstractTask, such as Callable, Runnable and POJO tasks, or tasks annotated with @JPPFRunnable, will need to implement the Interruptibility interface to override the interruptible flag, as in this example:
public class MyCallable implements Callable<String>, Serializable, Interruptibility { @Override public String call() throws Exception { return "task result"; } @Override public boolean isInterruptible() { return false; // NOT interruptible } }
13 Cancellation handler
Tasks that need a callback invoked immediately upon cancellation, whether the thread interruption succeeded or not, can implement the CancellationHandler interface, defined as follows:
public interface CancellationHandler { // Invoked immediately when a task is cancelled void doCancelAction() throws Exception; }
This can be used on its own, or in combination with the onCancel() method as in this example:
public class MyTask extends AbstractTask<String> implements CancellationHandler { private long start; @Override public void run() { start = System.nanoTime(); // ... task processing ... } @Override public void doCancelAction() throws Exception { long elapsed = (System.nanoTime() - start) / 1_000_000L; System.out.println("doCancelAction() called after " + elapsed + " ms"); } @Override public void onCancel() { long elapsed = (System.nanoTime() - start) / 1_000_000L; System.out.println("onCancel() called after " + elapsed + " ms"); } }
The CancellationHandler interface applies to all types of tasks: tasks that extend AbstractTask, POJO tasks, Callable and Runnable tasks, along with @JPPFRunnable-annotated tasks.
14 Running non-Java tasks: CommandLineTask
JPPF has a pre-defined task type that allows you to run an external process from a task. This process can be any executable program (including java), shell script or command. The JPPF API also provides a set of simple classes to access data, whether in-process or outside, local or remote.
The class that will allow you to run a process is CommandLineTask. Like AbstractTask, it is an abstract class that you must extend and whose run() method you must override.
This class provides methods to:
Set up the external process name, path, arguments and environment:
public abstract class CommandLineTask<T> extends AbstractTask<T> { // list of commands passed to the shell public List<String> getCommandList() public CommandLineTask<T> setCommandList(List<String> commandList) public CommandLineTask<T> setCommandList(String... commands) // set of environment variables public Map<String,String> getEnv() public CommandLineTask<T> setEnv(Map<String, String> env) // directory in which the command is executed public String getStartDir() public CommandLineTask<T> setStartDir(String startDir) }
You can also use the built-in constructors to do this at task initialization time:
public abstract class CommandLineTask<T> extends AbstractTask<T> { public CommandLineTask(Map<String, String> env, String startDir, String... commands) public CommandLineTask(String... commands) }
Launch the process:
The process is launched by calling the following method from the run() method of the task:
public int launchProcess()
This method will block until the process has completed or is destroyed. The process exit code can also be obtained via the following method:
// get the process exit code public int getExitCode()
Set up the capture of the process output:
You can specify whether the process output (standard or error console output) should be captured, determine whether capture is enabled, and obtain the captured output:
public abstract class CommandLineTask<T> extends AbstractTask<T> {
  public boolean isCaptureOutput()
  public CommandLineTask<T> setCaptureOutput(boolean captureOutput)
  // corresponds to what is sent to System.out / stdout
  public String getStandardOutput()
  // corresponds to what is sent to System.err / stderr
  public String getErrorOutput()
}
Here is a sample command line task that lists the content of a directory in the node's file system:
import org.jppf.node.protocol.*;

// This task lists the files in a directory of the node's host
public class ListDirectoryTask extends CommandLineTask<String> {
  // Execute the script
  @Override
  public void run() {
    try {
      // get the name of the node's operating system
      String os = System.getProperty("os.name").toLowerCase();
      // the type of OS determines which command to execute
      if (os.indexOf("linux") >= 0) {
        setCommandList("ls", "-a", "/usr/local");
      } else if (os.indexOf("windows") >= 0) {
        setCommandList("cmd", "/C", "dir", "C:\\Windows");
      }
      // enable the capture of the console output
      setCaptureOutput(true);
      // execute the script/command
      launchProcess();
      // get the resulting console output and set it as a result
      String output = getStandardOutput();
      setResult(output);
    } catch(Exception e) {
      // store the error so the client can retrieve it with getThrowable()
      setThrowable(e);
    }
  }
}
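For readers who want to experiment with the launch, block and capture sequence outside of a grid, the same steps can be approximated with the JDK's ProcessBuilder. This is a sketch only and says nothing about how CommandLineTask is implemented: ProcessCaptureSketch and Outcome are hypothetical names, and stderr is merged into stdout here for simplicity, whereas CommandLineTask exposes the two streams separately.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.List;

final class ProcessCaptureSketch {
  // Exit code and captured console output of a finished process.
  static final class Outcome {
    final int exitCode;
    final String output;
    Outcome(int exitCode, String output) { this.exitCode = exitCode; this.output = output; }
  }

  // Launch the command, capture its console output and block until it completes.
  static Outcome launch(List<String> command) {
    try {
      Process process = new ProcessBuilder(command).redirectErrorStream(true).start();
      String output = new String(process.getInputStream().readAllBytes(), StandardCharsets.UTF_8);
      int exitCode = process.waitFor();
      return new Outcome(exitCode, output);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
  }

  // A command available wherever this code runs: the current JVM's own launcher.
  static List<String> javaVersionCommand() {
    String java = Paths.get(System.getProperty("java.home"), "bin", "java").toString();
    return Arrays.asList(java, "-version");
  }

  public static void main(String[] args) {
    Outcome outcome = launch(javaVersionCommand());
    System.out.println("exit code = " + outcome.exitCode);
    System.out.println(outcome.output);
  }
}
```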
15 Executing dynamic scripts: ScriptedTask
The class ScriptedTask allows you to execute scripts written in any dynamic language available via the javax.script APIs. It is defined as follows:
public class ScriptedTask<T> extends AbstractTask<T> { // Initialize this task with the specified language, script provided as a string // and set of variable bindings public ScriptedTask(String language, String script, String reusableId, Map<String, Object> bindings) throws IllegalArgumentException // Initialize this task with the specified language, script provided as a reader // and set of variable bindings public ScriptedTask(String language, Reader scriptReader, String reusableId, Map<String, Object> bindings) throws IllegalArgumentException, IOException // Initialize this task with the specified language, script provided as a file // and set of variable bindings public ScriptedTask(String language, File scriptFile, String reusableId, Map<String, Object> bindings) throws IllegalArgumentException, IOException // Get the JSR 223 script language to use public String getLanguage() // Get the script to execute from this task public String getScript() // Get the unique identifier for the script public String getReusableId() // Get the user-defined variable bindings public Map<String, Object> getBindings() // Add the specified variable to the user-defined bindings public ScriptedTask<T> addBinding(String name, Object value) // Remove the specified variable from the user-defined bindings public Object removeBinding(String name) }
Since ScriptedTask is a subclass of AbstractTask, it has all the features that come with it, including life cycle management, error handling, etc. Throwables raised by the script engine receive special processing: some engines raise throwables which are not serializable, which may prevent JPPF from capturing them and returning them to the client application. To work around this, JPPF will instantiate a new exception with the same message and stack trace as the original exception. Thus some information may be lost, and you may need to handle these exceptions from within the scripts to retrieve this information.
The reusableId parameter, provided in the constructors, indicates that, if the script engine has that capability, compiled scripts will be stored and reused, to avoid compiling the same scripts repeatedly. In a multithreaded context, as is the case in a JPPF node, multiple compilations may still occur for the same script id, since it is not possible to guarantee the thread-safety of a script engine, and compiled scripts are always associated with a single script engine instance. Thus, a script may be compiled multiple times, but normally no more than there are processing threads in the node.
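The "at most one compilation per processing thread" behavior can be modelled with a per-thread cache keyed by the reusable id. The sketch below is a simplified model, not the JPPF implementation: PerThreadScriptCache is hypothetical, and the "compilation" merely builds a Java function and counts how often it was requested.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

final class PerThreadScriptCache {
  private final AtomicInteger compilations = new AtomicInteger();
  // One cache per thread, mirroring compiled scripts being tied to a single engine instance.
  private final ThreadLocal<Map<String, Function<Object, Object>>> cache =
    ThreadLocal.withInitial(HashMap::new);

  // Hypothetical "compilation": builds a function and records that it happened.
  private Function<Object, Object> compile(String script) {
    compilations.incrementAndGet();
    return input -> script + "(" + input + ")";
  }

  // Reuse the compiled form for this thread if the id is known, compile otherwise.
  Object evaluate(String reusableId, String script, Object input) {
    return cache.get().computeIfAbsent(reusableId, id -> compile(script)).apply(input);
  }

  int compilationCount() { return compilations.get(); }

  // Run the same script several times on several threads and count the compilations.
  static int compilationsFor(int threads, int evaluationsPerThread) {
    PerThreadScriptCache scripts = new PerThreadScriptCache();
    List<Thread> workers = new ArrayList<>();
    for (int i = 0; i < threads; i++) {
      Thread worker = new Thread(() -> {
        for (int j = 0; j < evaluationsPerThread; j++) scripts.evaluate("myScript", "f", j);
      });
      workers.add(worker);
      worker.start();
    }
    for (Thread worker : workers) {
      try {
        worker.join();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new IllegalStateException(e);
      }
    }
    return scripts.compilationCount();
  }

  public static void main(String[] args) {
    System.out.println("compilations: " + compilationsFor(4, 10));
  }
}
```

In this model the number of compilations equals the number of threads that evaluated the script, regardless of how many times each thread ran it.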
Java objects can be passed as variables to the script via the bindings, either in one of the constructors or using the addBinding() and removeBinding() methods. Additionally, a ScriptedTask always adds a reference to itself with the name "jppfTask", or the equivalent in the chosen script language, for instance $jppfTask in PHP.
The value returned by the script, if any, will be set as the task result, unless it has already been set to a non-null value, by calling jppfTask.setResult(...) from within the script.
For example, in the following JavaScript script:
function myFunc() { jppfTask.setResult('Hello 1'); return 'Hello 2'; } myFunc();
The result of the evaluation of the script is the string “Hello 2”. However, the task result will be “Hello 1”, since it was set before the end of the script. If you comment out the first statement of the function (the jppfTask.setResult() statement), then this time the task result will be the same as the script result, “Hello 2”.
16 Dependencies between tasks
16.1 Specifying task dependencies
It is possible to specify that a task has a dependency on one or more other tasks. The dependency relationship guarantees that the dependencies of a task will always be executed before the task itself. To enable dependencies, the task must implement the TaskNode interface, defined as follows:
public interface TaskNode<T> extends Task<T> {
  // Add a set of dependencies to this task
  TaskNode<T> dependsOn(Collection<TaskNode<?>> tasks) throws JPPFDependencyCycleException;
  // Add a set of dependencies to this task
  TaskNode<T> dependsOn(TaskNode<?>...tasks) throws JPPFDependencyCycleException;
  // Get the dependencies of this task, if any
  Collection<TaskNode<?>> getDependencies();
  // Get the tasks that depend on this task, if any
  Collection<TaskNode<?>> getDependants();
  // Determine whether this task has at least one dependency
  boolean hasDependency();
  // Determine whether this task has at least one other task that depends on it
  boolean hasDependant();
}
As we can see, this interface extends the Task interface and therefore represents a standard JPPF task, with extended capabilities. In practice, an implementation of a task with dependencies should extend the corresponding abstract class AbstractTaskNode.
Dependencies are added using the dependsOn(Collection<TaskNode<?>>) and dependsOn(TaskNode<?>...) methods. The code below creates a diamond dependency graph, in which t0 depends on t1 and t2, which in turn both depend on t3:
public class MyTask extends AbstractTaskNode<String> {
  @Override
  public void run() {
    setResult("Hello!");
  }
}

MyTask t0 = new MyTask(), t1 = new MyTask(), t2 = new MyTask(), t3 = new MyTask();
t0.dependsOn(t1.dependsOn(t3), t2.dependsOn(t3));
Cycles are checked for and detected when dependencies are added, that is, each time a dependsOn(...) method is invoked. When a cycle is detected in the dependency graph, a JPPFDependencyCycleException is raised, with a message describing how the cycle occurred.
Preventing cycles offers a guarantee that the tasks form a directed acyclic graph (DAG). Such a graph can be traversed in a topological order, ensuring that no task is executed before its dependencies have completed. For example, the following code creates a dependency cycle:
MyTask t1 = new MyTask("T1"), t2 = new MyTask("T2"), t3 = new MyTask("T3"); t2.dependsOn(t3); t3.dependsOn(t1); t1.dependsOn(t2, t3);
It will result in an error similar to this:
org.jppf.node.protocol.graph.JPPFDependencyCycleException: MyTask@7225790e (T1) ==> MyTask@35fb3008 (T2) ==> MyTask@3cbbc1e0 (T3) ==> MyTask@7225790e (T1)
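One common way to detect a cycle at the time a dependency is added is to check whether the new dependency can already reach the task that is about to depend on it. The sketch below illustrates that technique with a depth-first search; it is not the JPPF implementation, and DependencyGraphSketch, Node and CycleException are hypothetical names.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

final class DependencyGraphSketch {
  // Hypothetical stand-in for JPPFDependencyCycleException.
  static final class CycleException extends RuntimeException {
    CycleException(String message) { super(message); }
  }

  // Hypothetical task node that only tracks its direct dependencies.
  static final class Node {
    final String name;
    final List<Node> dependencies = new ArrayList<>();

    Node(String name) { this.name = name; }

    // Add dependencies, rejecting any edge that would close a cycle.
    Node dependsOn(Node... nodes) {
      for (Node dependency : nodes) {
        if (dependency.reaches(this)) {
          throw new CycleException(name + " ==> " + dependency.name + " would create a cycle");
        }
        dependencies.add(dependency);
      }
      return this;
    }

    // Depth-first search along dependency edges, starting from this node.
    boolean reaches(Node target) {
      Deque<Node> stack = new ArrayDeque<>();
      Set<Node> visited = new HashSet<>();
      stack.push(this);
      while (!stack.isEmpty()) {
        Node current = stack.pop();
        if (current == target) return true;
        if (visited.add(current)) current.dependencies.forEach(stack::push);
      }
      return false;
    }
  }

  public static void main(String[] args) {
    Node t1 = new Node("T1"), t2 = new Node("T2"), t3 = new Node("T3");
    t2.dependsOn(t3);
    t3.dependsOn(t1);
    try {
      t1.dependsOn(t2, t3);
    } catch (CycleException e) {
      System.out.println("rejected: " + e.getMessage());
    }
  }
}
```

Rejecting the offending edge at insertion time keeps the graph acyclic at every step, so no separate validation pass is needed before the job is submitted.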
16.2 Using the results of direct dependencies
When a task is executed in a node, it does not see the totality of the graph of tasks for the job it belongs to. Instead, it will only see its direct dependencies, in the state they are in after completion. This allows the tasks to access and use the results of their dependencies, while minimizing the impact on performance.
The dependencies of a task can be obtained through a call to the getDependencies() method in TaskNode, as in this example:
public class MyTask extends AbstractTaskNode<String> { @Override public void run() { if (hasDependency()) { // collect the results of all direct dependencies List<String> depsResults = getDependencies().stream() .map(dep -> (String) dep.getResult()) .collect(Collectors.toList()); setResult("this is me and my dependencies: " + depsResults); } else { setResult("this is just me"); } } }