Programming
Software Requirements
- Any platform with Java 2, Standard Edition (J2SE) 5.0 or later.
- A JPPF distribution (client development requires the jppf-client.jar and jppf-common.jar files).
Client Implementation
The client application has to split the computation into pieces, each of which is executed in a single server node, and pass them to the JPPF framework. These atomic units of work executed in a JPPF node are called tasks. It is up to the client application to decide which part of the complete algorithm makes a reasonable task: small tasks may cause a high transport overhead, while large tasks may cause poor parallelism in execution.
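A common way to choose the granularity is to cut the input range into a chosen number of contiguous chunks, one per task. The sketch below is plain Java, not JPPF API: TaskSplitter and split() are hypothetical names, and taskCount is the knob that trades transport overhead (many small tasks) against parallelism (few large tasks).

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: splits the index range [0, total) into contiguous chunks, one per task. */
class TaskSplitter {
    /**
     * Returns taskCount half-open ranges {start, end} covering [0, total).
     * The first (total % taskCount) chunks get one extra element, so sizes differ by at most 1.
     */
    static List<int[]> split(int total, int taskCount) {
        if (total < 0 || taskCount <= 0) {
            throw new IllegalArgumentException("total must be >= 0 and taskCount > 0");
        }
        List<int[]> ranges = new ArrayList<>();
        int base = total / taskCount;
        int remainder = total % taskCount;
        int start = 0;
        for (int i = 0; i < taskCount; i++) {
            int size = base + (i < remainder ? 1 : 0);
            ranges.add(new int[] {start, start + size});
            start += size;
        }
        return ranges;
    }
}
```

For example, split(10, 3) yields the ranges [0, 4), [4, 7) and [7, 10); each range would become the input of one task. If taskCount exceeds total, the trailing chunks are empty.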
Task Implementation
A task that can be submitted to the JPPF framework for execution must be a subclass of JPPFTask. It must at least implement the run() method, which contains the code that is executed in a JPPF node.
import org.jppf.server.protocol.JPPFTask;
public class HelloTask extends JPPFTask
{
  public void run()
  {
    setResult("Hello World");
  }
}
The run() method must store the result of the calculation using the setResult() method.
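Because run() is ordinary Java code, its logic can be exercised locally before it is sent to a node. The sketch below assumes a hypothetical LocalTask base class that only imitates the setResult()/getResult() and setException()/getException() calls used on this page; it is not JPPFTask, and in a real application SumTask would extend JPPFTask instead. The input range is kept in serializable fields, since those fields travel with the task to the node.

```java
import java.io.Serializable;

/** Hypothetical stand-in for JPPFTask, used only to test run() logic without a JPPF grid. */
abstract class LocalTask implements Runnable, Serializable {
    private Object result;
    private Exception exception;

    protected void setResult(Object result) { this.result = result; }
    public Object getResult() { return result; }
    protected void setException(Exception exception) { this.exception = exception; }
    public Exception getException() { return exception; }
}

/** Sums the integers in [start, end); the fields are serialized along with the task. */
class SumTask extends LocalTask {
    private final int start;
    private final int end;

    SumTask(int start, int end) {
        this.start = start;
        this.end = end;
    }

    @Override
    public void run() {
        try {
            if (start > end) {
                throw new IllegalArgumentException("start > end");
            }
            long sum = 0;
            for (int i = start; i < end; i++) {
                sum += i;
            }
            setResult(sum); // stored as a Long
        } catch (Exception e) {
            setException(e); // report the failure instead of a result
        }
    }
}
```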
Client Initialisation
A JPPF client must initialize a connection to a JPPF server. This is done by creating an instance of JPPFClient. If no configuration file is specified through a system property, the client connects to the server using default parameters.
import org.jppf.client.JPPFClient;
public static void main(String[] args)
{
  JPPFClient client = new JPPFClient();
  // ... use the client
}
Task Submission
Tasks are submitted to the JPPFClient instance as a List of JPPFTask instances. The list is passed to the submit() method, whose second parameter is an optional data provider (null when none is used), and submit() returns a List of JPPFTask instances containing the results.
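import java.util.ArrayList;
import java.util.List;
import org.jppf.client.JPPFClient;
import org.jppf.server.protocol.JPPFTask;
public static void main(String[] args)
{
  JPPFClient client = new JPPFClient();
  List<JPPFTask> tasks = new ArrayList<JPPFTask>();
  tasks.add(new HelloTask());
  try
  {
    // execute the tasks; no data provider is used, hence the null argument
    List<JPPFTask> results = client.submit(tasks, null);
    // ... process the results
  }
  catch (Exception e)
  {
    e.printStackTrace();
  }
}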
Task Results
The submit() method returns the results of a task list submission. Each returned JPPFTask instance holds either the result of that specific task or, if its execution failed, the exception that was raised. A client should always check for an exception before examining the result.
List<JPPFTask> results = client.submit(tasks, null);
JPPFTask task = results.get(0);
if (null == task.getException())
{
  Object result = task.getResult();
  // use the result ...
}
else
{
  Exception ex = task.getException();
  // handle the exception ...
}
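When a submission returns many tasks, the same check can be applied to each of them, collecting results and failures separately. The sketch below uses a hypothetical TaskOutcome interface exposing only the getResult() and getException() methods shown above, so that it compiles without JPPF; with JPPF, the elements would be the returned JPPFTask instances.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical view of a returned task: only the two accessors used in the text. */
interface TaskOutcome {
    Object getResult();
    Exception getException();
}

/** Separates successful results from failures, checking the exception first. */
class OutcomeSorter {
    final List<Object> results = new ArrayList<>();
    final List<Exception> failures = new ArrayList<>();

    OutcomeSorter(List<? extends TaskOutcome> outcomes) {
        for (TaskOutcome outcome : outcomes) {
            Exception e = outcome.getException();
            if (e == null) {
                results.add(outcome.getResult());
            } else {
                failures.add(e);
            }
        }
    }
}
```

A caller can then log or rethrow the collected failures before using the results.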
Advanced Features
Sharing data among tasks
The DataProvider interface enables tasks to share data, so that the same data is not transmitted multiple times. To use a data provider, instantiate it, add data through its setValue(Object, Object) method, then submit it along with the tasks. The tasks can then read the shared data with getDataProvider().getValue(key).
Matrix a = new Matrix(300);
DataProvider dataProvider = new MemoryMapDataProvider();
dataProvider.setValue("matrix.a", a);
jppfClient.submit(taskList, dataProvider);
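The saving can be estimated with plain Java serialization. The sketch below is not JPPF code: it compares the serialized size of ten hypothetical tasks that each embed their own 100 x 100 matrix (a double[][] standing in for the Matrix class) with ten tasks that only carry a key such as "matrix.a". Each task is serialized in its own stream to model every task being transmitted independently; within a single ObjectOutputStream, Java serialization writes a shared object once and uses back-references afterwards.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

class SharedDataCost {
    /** Hypothetical task that carries its own copy of the matrix. */
    static class EmbeddedTask implements Serializable {
        final double[][] matrix;
        EmbeddedTask(double[][] matrix) { this.matrix = matrix; }
    }

    /** Hypothetical task that only carries the key of the shared matrix. */
    static class KeyedTask implements Serializable {
        final String matrixKey;
        KeyedTask(String matrixKey) { this.matrixKey = matrixKey; }
    }

    /** Number of bytes produced by serializing obj in its own stream. */
    static int serializedSize(Object obj) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.size();
    }

    /** Returns {embeddedBytes, keyedBytes} for taskCount tasks using an n x n matrix. */
    static long[] compare(int taskCount, int n) {
        long embedded = 0;
        long keyed = 0;
        for (int i = 0; i < taskCount; i++) {
            embedded += serializedSize(new EmbeddedTask(new double[n][n]));
            keyed += serializedSize(new KeyedTask("matrix.a"));
        }
        return new long[] {embedded, keyed};
    }
}
```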
Alternate forms of tasks submission
In addition to the two-argument method submit(List<JPPFTask>, DataProvider), JPPFClient provides the following methods to submit tasks:
- List<JPPFTask> submit(List<JPPFTask>, DataProvider, ExecutionPolicy): synchronous submission with an execution policy.
- void submitNonBlocking(List<JPPFTask>, DataProvider, TaskResultListener): asynchronous submission with a TaskResultListener that handles the notifications of execution results.
- void submitNonBlocking(List<JPPFTask>, DataProvider, TaskResultListener, ExecutionPolicy): asynchronous submission with a TaskResultListener and an execution policy.
Accessing remote data from a task
Another implementation of DataProvider, URLDataProvider, enables reading and writing data located through a URL. Its getValue(Object) method takes a URL as a parameter and returns a corresponding input stream. Likewise, its setValue(Object, Object) method takes an input stream as its second parameter, allowing the task to write arbitrary data to a remote location.
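import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.URL;
import org.jppf.server.protocol.JPPFTask;
import org.jppf.utils.FileUtils;
public class FileDownloadTestTask extends JPPFTask
{
  public void run()
  {
    try
    {
      // download the XSD file from an HTTP location
      URL url = new URL("http://www.jppf.org/Options.xsd");
      InputStream is = (InputStream) getDataProvider().getValue(url);
      String s = FileUtils.readTextFile(new BufferedReader(new InputStreamReader(is)));
      setResult(s);
      // upload the same file onto a remote FTP server
      url = new URL("ftp://ftp.myserver.org/Options.xsd");
      getDataProvider().setValue(url, new ByteArrayInputStream(s.getBytes()));
    }
    catch(Exception e)
    {
      // report the failure to the client instead of a result
      setException(e);
    }
  }
}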
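What the task receives from getValue(url) is an InputStream. The JDK-only sketch below shows one way to read such a stream into a String with an explicit character set and guaranteed closing. It calls URL.openStream() directly, so it illustrates the stream handling only, not how URLDataProvider itself is implemented; UrlText is a hypothetical name.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.net.URL;
import java.nio.charset.StandardCharsets;

class UrlText {
    /** Reads everything from the stream as UTF-8 text; the caller owns (and closes) the stream. */
    static String readAll(InputStream in) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        byte[] chunk = new byte[8192];
        int n;
        while ((n = in.read(chunk)) != -1) {
            buffer.write(chunk, 0, n);
        }
        return new String(buffer.toByteArray(), StandardCharsets.UTF_8);
    }

    /** Opens the URL, reads it fully and always closes the stream. */
    static String fetch(URL url) {
        try (InputStream in = url.openStream()) {
            return readAll(in);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```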
Server Connections Pooling
JPPF clients provide very flexible options for load balancing and failover. These options are set up in the client configuration file, as described in the JPPF configuration documentation.
Combined with the asynchronous submission of multiple sets of tasks, this provides a powerful way to load-balance the execution of tasks and to boost the overall throughput.
The asynchronous submission of a set of tasks is performed as follows:
public class MyConnectionPool
{
  public static void main(String...args)
  {
    try
    {
      // Initialize the client and connections pool
      JPPFClient client = new JPPFClient();
      // Compute the tasks to submit for execution
      List<JPPFTask> list1 = ....; // first set of tasks
      List<JPPFTask> list2 = ....; // second set of tasks
      // Initialize the collectors for the results of each submission
      JPPFResultCollector collector1 = new JPPFResultCollector(list1.size());
      JPPFResultCollector collector2 = new JPPFResultCollector(list2.size());
      // submit the tasks in an asynchronous, non blocking way
      client.submitNonBlocking(list1, null, collector1);
      client.submitNonBlocking(list2, null, collector2);
      // ... execute some code while the tasks are executing ...
      // ...
      // get the results back, waiting until the client has received them
      List<JPPFTask> result1 = collector1.waitForResults();
      List<JPPFTask> result2 = collector2.waitForResults();
      // ... process the results ...
    }
    catch(Exception e)
    {
      e.printStackTrace();
    }
  }
}
Here the two key points are:
- the use of the JPPFResultCollector class, whose role is to receive, collect and store the results of a submission;
- the use of JPPFClient.submitNonBlocking(taskList, dataProvider, resultCollector) to perform an asynchronous submission.
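The role of a result collector can be pictured with the JDK's own concurrency utilities. The sketch below is an analogy, not JPPF code: ResultCollector and submitNonBlocking() are hypothetical names, a local ExecutorService plays the role of the grid, and a CountDownLatch makes waitForResults() block until every result of its own submission has arrived. Results are stored by position, so they come back in submission order; a failed task is simply recorded as null here.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;

class AsyncSubmissionSketch {
    /** Hypothetical collector: stores results by position and blocks until all have arrived. */
    static final class ResultCollector<T> {
        private final List<T> results;
        private final CountDownLatch pending;

        ResultCollector(int count) {
            results = new ArrayList<>(Collections.<T>nCopies(count, null));
            pending = new CountDownLatch(count);
        }

        /** Called from worker threads as each result arrives. */
        synchronized void resultReceived(int position, T value) {
            results.set(position, value);
            pending.countDown();
        }

        /** Blocks until every result of this submission has been received. */
        List<T> waitForResults() {
            try {
                pending.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for results", e);
            }
            synchronized (this) {
                return new ArrayList<>(results);
            }
        }
    }

    /** Submits all tasks without blocking and returns the collector for this submission. */
    static <T> ResultCollector<T> submitNonBlocking(ExecutorService executor, List<Callable<T>> tasks) {
        ResultCollector<T> collector = new ResultCollector<>(tasks.size());
        for (int i = 0; i < tasks.size(); i++) {
            final int position = i;
            final Callable<T> task = tasks.get(i);
            executor.execute(() -> {
                T value = null; // stays null if the task fails
                try {
                    value = task.call();
                } catch (Exception e) {
                    // ignored in this sketch; a real collector would keep the exception
                }
                collector.resultReceived(position, value);
            });
        }
        return collector;
    }
}
```

As in the JPPF example, two submissions can be in flight at once, and the caller only blocks when it asks a collector for its results.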
Using non-serializable classes in a task
A JPPF task requires serializable classes for the types of its instance variables, and for any object that is part of its object graph.
Sometimes, however, the source code of these classes is not accessible (for instance in a 3rd-party library), so they cannot be made serializable, which makes them difficult to use with JPPF.
This section presents an overview of two ways to get around this limitation.
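Whether an object graph is serializable can be checked locally, before submission, by writing it to an in-memory ObjectOutputStream. The sketch below uses only the JDK; SerializationCheck and findProblem() are hypothetical names. It returns the message of the NotSerializableException, which by default is the name of the first offending class encountered.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;

class SerializationCheck {
    /**
     * Tries to serialize the object graph rooted at obj.
     * Returns null if it succeeds, otherwise the exception message naming the offending class.
     */
    static String findProblem(Object obj) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(obj);
            return null;
        } catch (NotSerializableException e) {
            return e.getMessage();
        } catch (IOException e) {
            throw new IllegalStateException("unexpected I/O error", e);
        }
    }
}
```

Calling findProblem(task) on a task instance before adding it to the submission list surfaces such problems on the client rather than at submission time.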
Task-localized serialization
One way to work around this is to use the XStream library directly in the task code. XStream is a powerful XML serialization framework that can serialize to XML classes that do not implement the Serializable interface.
The principle is to perform the serialization to XML on the client side, when the task is instantiated, then perform the deserialization from XML on the node side, when the task is executed.
The following example task uses XStream to serialize a simple object graph. It uses XStream version 1.2.2 and only requires the xstream-1.2.2.jar file in the classpath of the JPPF client.
import com.thoughtworks.xstream.XStream;
import com.thoughtworks.xstream.io.xml.DomDriver;
import org.jppf.server.protocol.JPPFTask;
public class XstreamTask extends JPPFTask
{
  /**
   * Person object to serialize with xstream. Note that it must be declared as transient.
   */
  private transient Person person = null;
  /**
   * Xml representation of the Person object to deserialize with xstream.
   */
  private String personXml = null;
  /**
   * Initialize this task with the specified person.
   * @param person a <code>Person</code> instance.
   */
  public XstreamTask(Person person)
  {
    this.person = person;
    // serialize person to XML
    XStream xstream = new XStream(new DomDriver());
    personXml = xstream.toXML(person);
  }
  /**
   * Run this task.
   * @see java.lang.Runnable#run()
   */
  public void run()
  {
    // deserialize person from XML
    XStream xstream = new XStream(new DomDriver());
    person = (Person) xstream.fromXML(personXml);
    String s = person.toString();
    System.out.println("deserialized this person: " + s);
    setResult(s);
  }
}
The full code for this example is available in the following files:
- Person.java: the Person class, not Serializable
- PhoneNumber.java: the PhoneNumber class, not Serializable
- XstreamTask.java: the JPPF task code
- XstreamRunner.java: the JPPF client code
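The same task-localized technique can be tried with the JDK alone by swapping XStream for java.beans.XMLEncoder and XMLDecoder. These also write objects to XML without requiring Serializable but, unlike XStream, are designed for JavaBeans (a public class with a public no-argument constructor and getter/setter pairs). In the sketch below, Contact and BeanXmlTask are hypothetical names, and BeanXmlTask implements Runnable and Serializable directly instead of extending JPPFTask so that it compiles without JPPF.

```java
import java.beans.XMLDecoder;
import java.beans.XMLEncoder;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;

class BeanXmlTask implements Runnable, Serializable {
    /** A JavaBean that is deliberately not Serializable. */
    public static class Contact {
        private String name;
        public Contact() { }
        public String getName() { return name; }
        public void setName(String name) { this.name = name; }
    }

    /** Skipped by Java serialization: rebuilt from XML on the node side. */
    private transient Contact contact;
    /** XML form of the contact, serialized with the task as a plain String. */
    private final String contactXml;
    private String result;

    BeanXmlTask(Contact contact) {
        this.contact = contact;
        // client side: encode the bean to XML when the task is created
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (XMLEncoder encoder = new XMLEncoder(bytes)) {
            encoder.writeObject(contact);
        }
        contactXml = new String(bytes.toByteArray(), StandardCharsets.UTF_8);
    }

    @Override
    public void run() {
        // node side: decode the bean from XML when the task executes
        byte[] xml = contactXml.getBytes(StandardCharsets.UTF_8);
        try (XMLDecoder decoder = new XMLDecoder(new ByteArrayInputStream(xml))) {
            contact = (Contact) decoder.readObject();
        }
        result = "deserialized contact: " + contact.getName();
    }

    String getResult() { return result; }
}
```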
Specifying alternate object streams
JPPF transports and serializes objects by means of object streams, i.e. instances of ObjectInputStream and ObjectOutputStream, or of subclasses of these classes.
It is possible to specify alternate object stream classes for a JPPF grid, enabling the use of non-serializable classes without any extra coding by the JPPF task developer.
JPPF provides two ways to achieve this:
1. Specifying the object stream implementation classes
This is done in the JPPF configuration file, by adding these 2 properties:
# configure the object input stream implementation
jppf.object.input.stream.class = my.package.MyObjectInputStream
# configure the object output stream implementation
jppf.object.output.stream.class = my.package.MyObjectOutputStream
Note that the object input stream class must have a constructor that takes an InputStream parameter, and the object output stream class must have a constructor that takes an OutputStream parameter.
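As an illustration of what such classes can do, the JDK-only sketch below pairs an ObjectOutputStream subclass that uses the standard replaceObject() hook to swap a non-serializable Temperature object for a serializable proxy, with an ObjectInputStream subclass that uses resolveObject() to swap it back. Temperature, TemperatureProxy and the enclosing class are hypothetical; the only detail taken from the text is the pair of (OutputStream) and (InputStream) constructors.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.OutputStream;
import java.io.Serializable;

class SubstitutingStreams {
    /** Third-party style class that does not implement Serializable. */
    static final class Temperature {
        final double celsius;
        Temperature(double celsius) { this.celsius = celsius; }
    }

    /** Serializable stand-in written in place of a Temperature. */
    static final class TemperatureProxy implements Serializable {
        final double celsius;
        TemperatureProxy(double celsius) { this.celsius = celsius; }
    }

    /** Output stream with the required (OutputStream) constructor. */
    public static class MyObjectOutputStream extends ObjectOutputStream {
        public MyObjectOutputStream(OutputStream out) throws IOException {
            super(out);
            enableReplaceObject(true); // the stream now calls replaceObject() for each object
        }

        @Override
        protected Object replaceObject(Object obj) {
            return (obj instanceof Temperature) ? new TemperatureProxy(((Temperature) obj).celsius) : obj;
        }
    }

    /** Input stream with the required (InputStream) constructor. */
    public static class MyObjectInputStream extends ObjectInputStream {
        public MyObjectInputStream(InputStream in) throws IOException {
            super(in);
            enableResolveObject(true); // the stream now calls resolveObject() for each object
        }

        @Override
        protected Object resolveObject(Object obj) {
            return (obj instanceof TemperatureProxy) ? new Temperature(((TemperatureProxy) obj).celsius) : obj;
        }
    }
}
```

Because both sides must agree on the substitution, this also shows why alternate streams have to be used by every JPPF component.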
2. Implementing an object stream builder
An object stream builder is an object that instantiates input and output object streams.
It is defined as an implementation of the JPPFObjectStreamBuilder interface:
/**
 * Interface for all builders instantiating alternate object input streams and output streams.
 */
public interface JPPFObjectStreamBuilder
{
  /**
   * Obtain an input stream used for deserializing objects.
   * @param in input stream to read from.
   * @return an ObjectInputStream
   * @throws Exception if an error is raised while creating the stream.
   */
  ObjectInputStream newObjectInputStream(InputStream in) throws Exception;
  /**
   * Obtain an Output stream used for serializing objects.
   * @param out output stream to write to.
   * @return an ObjectOutputStream
   * @throws Exception if an error is raised while creating the stream.
   */
  ObjectOutputStream newObjectOutputStream(OutputStream out) throws Exception;
}
You then configure JPPF to use this object stream builder by specifying the following property in the JPPF configuration file:
# configure the object stream builder implementation
jppf.object.stream.builder = my.package.MyObjectStreamBuilder
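The constructor requirement of the first approach fits naturally with a builder that creates streams from configured class names. The sketch below is a hypothetical builder, not JPPF's implementation: it declares its own copy of the two builder methods as a local interface (named StreamBuilder so that it compiles without JPPF) and creates the named classes reflectively through their (InputStream) and (OutputStream) constructors. The class names passed to it stand in for values such as those of the jppf.object.input.stream.class and jppf.object.output.stream.class properties.

```java
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.OutputStream;

/** Local copy of the builder contract, so this sketch compiles without JPPF. */
interface StreamBuilder {
    ObjectInputStream newObjectInputStream(InputStream in) throws Exception;
    ObjectOutputStream newObjectOutputStream(OutputStream out) throws Exception;
}

/** Hypothetical builder that instantiates object stream classes given by name. */
class ReflectiveStreamBuilder implements StreamBuilder {
    private final Class<? extends ObjectInputStream> inputClass;
    private final Class<? extends ObjectOutputStream> outputClass;

    ReflectiveStreamBuilder(String inputClassName, String outputClassName) throws ClassNotFoundException {
        // asSubclass() fails fast if a named class is not an object stream
        inputClass = Class.forName(inputClassName).asSubclass(ObjectInputStream.class);
        outputClass = Class.forName(outputClassName).asSubclass(ObjectOutputStream.class);
    }

    @Override
    public ObjectInputStream newObjectInputStream(InputStream in) throws Exception {
        // requires a public constructor taking a single InputStream
        return inputClass.getConstructor(InputStream.class).newInstance(in);
    }

    @Override
    public ObjectOutputStream newObjectOutputStream(OutputStream out) throws Exception {
        // requires a public constructor taking a single OutputStream
        return outputClass.getConstructor(OutputStream.class).newInstance(out);
    }
}
```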
Notes:
a. When no alternate object stream is specified, JPPF uses the standard java.io.ObjectInputStream and java.io.ObjectOutputStream of the JDK.
b. When alternate object streams are specified, they must be used by all JPPF clients, servers and nodes, otherwise JPPF will not work. The implementation classes must also be present in the classpath of all JPPF components.
c. JPPF has a built-in object stream builder, XstreamObjectStreamBuilder, that uses XStream to provide XML serialization. To use it, specify "jppf.object.stream.builder = org.jppf.serialization.XstreamObjectStreamBuilder" in the JPPF configuration files. You will also need the XStream 1.3 (or later) jar file and the xpp3 jar file, available in the XStream distribution.
Caveats & Pitfalls
Some common problems are described in the following sections.
Remote Execution
The tasks are executed remotely in a server node. The instances of the tasks are serialized and transferred to the node server for execution. This might cause some unexpected behaviour, in particular:
- Static fields are not transferred: they are initialised in the node server with the initial values defined in the class. Static fields should therefore not be used in tasks, unless they are final or set to the desired value in the run() method.
- Transient fields are not transferred to the node server: they arrive with their default value (null for object references, 0 or false for primitive types). Fields that are required in the run() method should not be transient, while all other fields may be declared transient to reduce the amount of transferred data.
- Resources like network connections and files cannot be used in tasks. Network connections have to be re-opened in the run() method, and files must be read into instance fields before the task is submitted.
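The effect on transient fields can be observed without a grid, since the node works on a deserialized copy of the task. The JDK-only sketch below serializes and deserializes a hypothetical task-like object, as happens between client and node: the transient fields come back with their default values (field initializers are not re-run for a Serializable class), while the non-transient field survives.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

class TransientDemo {
    /** Hypothetical task-like object with transient and non-transient fields. */
    static class DemoTask implements Serializable {
        String input = "needed in run()";       // transferred
        transient String cache = "local only";  // arrives as null
        transient int counter = 42;             // arrives as 0
    }

    /** Serializes and deserializes obj, as happens when a task travels to a node. */
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T roundTrip(T obj) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(obj);
            }
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("round trip failed", e);
        }
    }
}
```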
Security Restrictions
The node server that executes the tasks typically runs with a restricted access policy. The policy must be extended to grant any additional access the tasks require. The policy file delivered with JPPF is named jppf.policy and contains only the permissions required by the JPPF node server itself.
