Although this topic is covered in Chapter 6, "Management and monitoring", it took me some time to
understand how to connect to a JPPF server and retrieve information about all connected nodes and the jobs.
Therefore I thought it would be a good idea to share my code snippet as a reference. I need this as an alternative
to the JPPF console ... to allow the user to select the nodes a job is allowed to be executed on and
to display some progress in my custom GUI.
The last issue I had to solve: I got null when I tried to retrieve the system information of a node. => Is there a specific property
of the node that I have to set so that this information can be retrieved?
Edit: The API says
Some or all of these properties may be missing if a security manager is installed that does not grant access to the related Runtime APIs.
http://www.jppf.org/api-2.0/org/jppf/management/JPPFSystemInformation.html#getRuntime%28%29
This forum entry gives a solution that works without modifying the file jppf.policy:
http://www.jppf.org/forums/index.php?topic=4920.0
Below is my updated code that works for me.
* I run the code with java7 on Windows7.
* I started a single server and a single node on a single computer using the *.bat files that come with JPPF and did not modify the configurations.
package ClusterInfo;
import java.util.Collection;
import java.util.Map;
import org.jppf.job.JobInformation;
import org.jppf.management.JMXDriverConnectionWrapper;
import org.jppf.management.JMXNodeConnectionWrapper;
import org.jppf.management.JPPFManagementInfo;
import org.jppf.management.JPPFSystemInformation;
import org.jppf.management.NodeSelector;
import org.jppf.management.forwarding.JPPFNodeForwardingMBean;
import org.jppf.utils.TypedProperties;
import org.jppf.utils.stats.JPPFStatistics;
/**
 * Shows how to connect to a JPPF server (driver) via JMX and retrieve some
 * information about the connected nodes and the jobs.
 * <p>
 * Every JMX connection opened by this example (to the driver and to each
 * node) is closed again before the program ends.
 */
public class ReadJppfClusterInfo {

    //#region ATTRIBUTES

    /**
     * the serverHost on which the JPPF server/driver is running
     */
    private final static String serverHost = "localhost";

    /**
     * the serverPort the JPPF server/driver listens to; this is shown in the second line of the server console
     */
    private final static int serverPort = 11198;

    /**
     * set to true if you want to use secured connections
     * <p>
     * NOTE(review): this must match the server configuration. The port above is
     * presumably the plain (non-SSL) management port of an unmodified driver
     * configuration, so the flag is false here; if you enable SSL you most likely
     * also have to change the port - confirm against your driver configuration.
     */
    private final static boolean useSecureConnection = false;

    /**
     * maximum time in milliseconds to wait for a JMX connection to be established
     */
    private final static long connectionTimeoutMillis = 5000L;

    //#end region

    //#region METHODS

    /**
     * Connects to the JPPF server, prints information about its nodes and jobs
     * and closes the connection again.
     *
     * @param args command line arguments (not used)
     * @throws IllegalStateException if the connection to the server could not be established in time
     * @throws Exception if one of the management requests fails
     */
    public static void main(String[] args) throws Exception {

        //get JMXDriverConnectionWrapper and connect to it
        //it implements JPPFDriverAdminMBean, which can be used to retrieve server statistics
        //see http://www.jppf.org/api/index.html?org/jppf/management/JPPFDriverAdminMBean.html
        JMXDriverConnectionWrapper server = new JMXDriverConnectionWrapper(serverHost, serverPort, useSecureConnection);
        try {
            server.connectAndWait(connectionTimeoutMillis);
            if (!server.isConnected()) {
                throw new IllegalStateException("Could not connect to JPPF server on " + serverHost + ":" + serverPort);
            }

            //optional: get server statistics
            //  JPPFStatistics serverStatistics = server.statistics();
            //  log(serverStatistics.toString());

            //optional: get number of nodes that would match a specified execution policy
            //  int numberOfNodesForPolicy = server.matchingNodes(policy);

            //get number of nodes that are connected to the server
            int numberOfNodes = server.nbNodes();
            log("Number of nodes connected to the JPPF server: " + numberOfNodes);

            //get the number of idle nodes
            int numberOfIdleNodes = server.nbIdleNodes();
            log("Number of idle nodes connected to the JPPF server: " + numberOfIdleNodes);

            //get a collection of node information
            Collection<JPPFManagementInfo> nodeInfos = server.nodesInformation();

            //create a forwarder
            JPPFNodeForwardingMBean forwarder = server.getProxy(JPPFNodeForwardingMBean.MBEAN_NAME, JPPFNodeForwardingMBean.class);

            //use the forwarder to collect system information from the nodes
            //(this is kind of a work around to be able to access information that is only
            //known by the nodes by default)
            Map<String, Object> systemInformationMap = forwarder.systemInformation(NodeSelector.ALL_NODES);

            //do something with individual nodes
            processIndividualNodes(nodeInfos, systemInformationMap);

            //get all job ids
            String[] jobIds = server.getAllJobIds();

            //do something with individual jobs
            processIndivisualJobs(server, jobIds);
        } finally {
            //always release the JMX connection; a failure to close must not hide
            //an exception thrown by the code above, so it is only reported
            try {
                server.close();
            } catch (Exception e) {
                log("Could not close connection to JPPF server on " + serverHost + ":" + serverPort + ": " + e);
            }
        }
    }

    /**
     * Loops through the individual nodes and retrieves some information.
     * For each node a JMX connection is opened and closed again.
     *
     * @param nodeInfos a collection of JPPFManagementInfo that contains information about the nodes;
     *        if it is null, no node is processed
     * @param systemInformationMap maps from uuid of nodes to systemInformation of nodes
     */
    private static void processIndividualNodes(Collection<JPPFManagementInfo> nodeInfos, Map<String, Object> systemInformationMap) {
        log("-- Node information: ----------------");
        if (nodeInfos == null) {
            return;
        }

        for (JPPFManagementInfo nodeInfo : nodeInfos) {

            String uuid = nodeInfo.getUuid();
            String host = nodeInfo.getHost();
            int port = nodeInfo.getPort();

            //get system info from map, using the node uuid
            JPPFSystemInformation systemInformation = getSystemInformation(systemInformationMap, uuid);
            //the runtime properties may be missing, e.g. if a security manager denies access to them
            TypedProperties runtimeInfo = (systemInformation != null) ? systemInformation.getRuntime() : null;
            if (runtimeInfo != null) {
                //availableProcessors : number of processors available to the JVM
                //freeMemory : estimated free JVM heap memory, in bytes
                //totalMemory : estimated total JVM heap memory, in bytes
                //maxMemory : maximum JVM heap memory, in bytes, equivalent to the value defined through the -Xmx JVM flag
                int nodeAvailableProcessors = runtimeInfo.getInt("availableProcessors");
                //memory sizes are byte counts and can exceed Integer.MAX_VALUE, so read them as long
                long nodeFreeMemoryInByte = runtimeInfo.getLong("freeMemory");

                String infoMessage = "Node '" + uuid + "' on '" + host + ":" + port + "' has\n" +
                        "" + nodeAvailableProcessors + " processors available and "
                        + nodeFreeMemoryInByte + " Byte of free memory.";
                log(infoMessage);
            } else {
                log("Could not retrive system info of node " + uuid + " on " + host + ":" + port );
            }

            //get JMXNodeConnectionWrapper and connect to it
            //it implements JPPFNodeAdminMBean,
            //see http://www.jppf.org/api-2.0/org/jppf/management/JMXNodeConnectionWrapper.html
            JMXNodeConnectionWrapper node = new JMXNodeConnectionWrapper(host, port, useSecureConnection);
            try {
                node.connectAndWait(connectionTimeoutMillis);
                if (node.isConnected()) {
                    //do something on the node if you want:
                    //node.cancelJob(jobUuid, requeue)
                } else {
                    log("Could not connect to node " + uuid);
                }
            } finally {
                //release the connection of this node before moving on to the next one
                try {
                    node.close();
                } catch (Exception e) {
                    log("Could not close connection to node " + uuid + ": " + e);
                }
            }
        }
    }

    /**
     * Gets the JPPFSystemInformation from the given map for the node with the given uuid.
     * If the map contains a throwable instead of a JPPFSystemInformation, the throwable is
     * shown and null is returned.
     *
     * @param systemInformationMap maps from uuid of nodes to either a JPPFSystemInformation or a Throwable; may be null
     * @param uuid the uuid of the node to look up
     * @return the system information of the node, or null if the map is null, has no entry for the uuid,
     *         or holds something other than a JPPFSystemInformation for it
     */
    private static JPPFSystemInformation getSystemInformation(Map<String, Object> systemInformationMap, String uuid) {
        if (systemInformationMap == null) {
            return null;
        }

        Object value = systemInformationMap.get(uuid);
        if (value instanceof JPPFSystemInformation) {
            return (JPPFSystemInformation) value;
        }

        if (value instanceof Throwable) {
            //the request failed for this node: show the reason
            ((Throwable) value).printStackTrace();
        } else if (value != null) {
            //neither a result nor an error: report it instead of failing with a ClassCastException
            log("Unexpected system information value of type " + value.getClass().getName() + " for node " + uuid);
        }
        return null;
    }

    /**
     * Loops through the individual jobs and retrieves some information.
     * Jobs for which the server no longer returns any information (for example
     * because they ended after the ids were retrieved) are reported and skipped.
     *
     * @param server the connected driver wrapper used to query the job information
     * @param jobIds the ids of the jobs to inspect; null is treated like an empty array
     * @throws Exception if a request to the server fails
     */
    private static void processIndivisualJobs(JMXDriverConnectionWrapper server, String[] jobIds) throws Exception {
        log("-- Job information: -----------------");

        String[] ids = (jobIds != null) ? jobIds : new String[0];
        int totalNumberOfJobs = ids.length;
        int numberOfPendingJobs = 0;
        int numberOfSuspendedJobs = 0;

        for (String jobId : ids) {

            //get job information
            JobInformation jobInformation = server.getJobInformation(jobId);
            if (jobInformation == null) {
                log("No information available for job " + jobId);
                continue;
            }

            String name = jobInformation.getJobName();
            int numberOfTasks = jobInformation.getTaskCount();
            int priority = jobInformation.getPriority();

            //count pending jobs
            if (jobInformation.isPending()) {
                numberOfPendingJobs++;
            }

            //count suspended jobs
            if (jobInformation.isSuspended()) {
                numberOfSuspendedJobs++;
            }

            String infoString = "The job '" + name + "' consists of " + numberOfTasks + " tasks and has the priority " + priority;
            log(infoString);
        }

        log("-------------------------------------");
        String summaryString = "The total number of jobs is " + totalNumberOfJobs + ".\n" +
                "Currently " + numberOfPendingJobs + " jobs are pending and " + numberOfSuspendedJobs + " jobs are suspended.";
        log(summaryString);
        log("-- End of job information -----------");
    }

    /**
     * Logs an info message on the console
     *
     * @param message the text to print to standard output
     */
    static void log(String message) {
        System.out.println(message);
    }

    //#end region
}