Hello,
Sorry for the late answer.
Yes, you can definitely restart nodes or start new slaves from the application runner, using the server management and monitoring APIs.
To access these APIs, you first need to obtain a JMX connection to the server from the JPPF client:
try (JPPFClient client = new JPPFClient()) {
  // get a JMX connection to the server from the JPPF client
  JMXDriverConnectionWrapper jmx = client.awaitWorkingConnectionPool().awaitWorkingJMXConnection();
  ...
}
Once we have a JMX connection, we want to receive notifications of job life cycle events. This way we know when a job returns from a node where it was dispatched, so we can start a new slave (if the node is a master). We will also know when a job is complete, in which case we restart all nodes.
We define a separate class for our notification listener:
public class JMXHandler implements NotificationListener {
  private final JMXDriverConnectionWrapper jmx;
  private boolean registered = false;

  public JMXHandler(JMXDriverConnectionWrapper jmx) {
    this.jmx = jmx;
  }

  public void register() {
    try {
      // get a proxy to the job management MBean
      DriverJobManagementMBean jobManager = jmx.getJobManager();
      // register this notification listener
      jobManager.addNotificationListener(this, null, null);
      synchronized(this) {
        registered = true;
      }
    } catch (Exception e) {
      e.printStackTrace();
    }
  }

  public void unregister() {
    try {
      DriverJobManagementMBean jobManager = jmx.getJobManager();
      // unregister this notification listener
      jobManager.removeNotificationListener(this, null, null);
    } catch (Exception e) {
      e.printStackTrace();
    } finally {
      synchronized(this) {
        registered = false;
      }
    }
  }

  @Override
  public void handleNotification(Notification notification, Object handback) {
    JobNotification jobNotif = (JobNotification) notification;
    JobEventType eventType = jobNotif.getEventType();
    if (eventType == JobEventType.JOB_RETURNED) {
      // start a slave of the node where the job was dispatched
      JPPFManagementInfo nodeInfo = jobNotif.getNodeInfo();
      if (nodeInfo.isMasterNode()) {
        startOrStopSlaveNodes(nodeInfo.getUuid(), 1);
      }
    } else if (eventType == JobEventType.JOB_ENDED) {
      // handle completed jobs
      System.out.printf("job '%s' ended%n", jobNotif.getJobInformation().getJobName());
      restartAllNodes();
    }
  }
  ...
}
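If you are not familiar with JMX notifications, here is a minimal sketch of the same register / unregister / handleNotification cycle that runs without a JPPF server. It only uses the JDK's javax.management classes. The names NotificationDemo and RecordingListener are made up for this illustration, and the notification types are plain strings standing in for the JPPF job events, so please take it as an approximation of what the driver's job management MBean does, not as JPPF code:

```java
import java.util.ArrayList;
import java.util.List;
import javax.management.ListenerNotFoundException;
import javax.management.Notification;
import javax.management.NotificationBroadcasterSupport;
import javax.management.NotificationListener;

// Hypothetical stand-alone demo, not part of JPPF
class NotificationDemo {
  /** Listener that records the type of every notification it receives. */
  static class RecordingListener implements NotificationListener {
    final List<String> seen = new ArrayList<>();

    @Override
    public synchronized void handleNotification(Notification notification, Object handback) {
      seen.add(notification.getType());
    }
  }

  /** Sends three notifications; the listener is unregistered before the third one. */
  static List<String> runDemo() {
    // with the default constructor, listeners are invoked in the thread that calls sendNotification()
    NotificationBroadcasterSupport broadcaster = new NotificationBroadcasterSupport();
    RecordingListener listener = new RecordingListener();
    // same (listener, filter, handback) triple as in JMXHandler.register()
    broadcaster.addNotificationListener(listener, null, null);
    broadcaster.sendNotification(new Notification("JOB_RETURNED", "demo", 1L));
    broadcaster.sendNotification(new Notification("JOB_ENDED", "demo", 2L));
    try {
      // same triple as in JMXHandler.unregister()
      broadcaster.removeNotificationListener(listener, null, null);
    } catch (ListenerNotFoundException e) {
      throw new IllegalStateException(e);
    }
    // no longer delivered, since the listener was removed
    broadcaster.sendNotification(new Notification("JOB_ENDED", "demo", 3L));
    return listener.seen;
  }

  public static void main(String[] args) {
    System.out.println(runDemo()); // prints [JOB_RETURNED, JOB_ENDED]
  }
}
```

The point to take away is that the listener only sees events emitted between the add and remove calls, which is why the handler should be registered before the job is submitted.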
In JMXHandler, the register() and unregister() methods are pretty straightforward, and the interesting things happen in the handleNotification() method. When we receive a JOB_RETURNED notification for a master node, we want to start a new slave node, hence the call to the startOrStopSlaveNodes() method. This method is a little more generic than we need, because it can also start or stop more than one slave at a time. It could look like this:
// use nbSlaves = 1 to start a new slave, nbSlaves = -1 to stop a slave
private void startOrStopSlaveNodes(String masterNodeUuid, int nbSlaves) {
  try {
    // node forwarder forwards requests to selected nodes
    JPPFNodeForwardingMBean forwarder = jmx.getNodeForwarder();
    NodeSelector selector = new UuidSelector(masterNodeUuid);
    // get the current number of slaves
    Map<String, Object> result = forwarder.getNbSlaves(selector);
    Object o = result.get(masterNodeUuid);
    if (o instanceof Integer) {
      int n = (Integer) o;
      // provision or un-provision slaves based on nbSlaves
      result = forwarder.provisionSlaveNodes(selector, n + nbSlaves);
      // check the result for each node
      for (Map.Entry<String, Object> entry: result.entrySet()) {
        if (entry.getValue() instanceof Exception) {
          System.out.printf("provisioning slaves for node uuid '%s' raised throwable: %s%n", entry.getKey(), ExceptionUtils.getStackTrace((Exception) entry.getValue()));
        }
      }
    }
  } catch (Exception e) {
    e.printStackTrace();
  }
}
Here, you will notice that we do not establish a direct JMX connection with each node, using a JMXNodeConnectionWrapper as in my first post. Instead, we use the server's node forwarding mechanism, along with a node selector. This is the recommended approach, because it also works in network topologies where the nodes are not directly reachable from the client.
Using the same forwarding mechanism, we could write the method that restarts all nodes like this:
// beware that killing a master node will also kill all its slaves
private void restartAllNodes() {
  try {
    // node forwarder forwards requests to selected nodes
    JPPFNodeForwardingMBean forwarder = jmx.getNodeForwarder();
    // restart all nodes
    Map<String, Object> result = forwarder.restart(NodeSelector.ALL_NODES);
    // check the result for each node
    for (Map.Entry<String, Object> entry: result.entrySet()) {
      if (entry.getValue() instanceof Exception) {
        throw (Exception) entry.getValue();
      }
    }
  } catch (Exception e) {
    e.printStackTrace();
  }
}
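Note that restartAllNodes() as written stops at the first failed node, so failures on the remaining nodes are not reported. If you want to see all of them, one option is to collect the failures from the result map first. Below is a rough sketch of such a helper. It only assumes what the code above already relies on, namely that the forwarder returns a map of node uuid to either a result or an exception. The class name ForwardingResults, the method name failures() and the node uuids in main() are made up for the example:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, not part of the JPPF API
class ForwardingResults {
  /** Returns only the entries whose value is a Throwable, keyed by node uuid. */
  static Map<String, Throwable> failures(Map<String, Object> results) {
    Map<String, Throwable> failed = new LinkedHashMap<>();
    for (Map.Entry<String, Object> entry : results.entrySet()) {
      // instanceof is false for null, so nodes with a null result are skipped
      if (entry.getValue() instanceof Throwable) {
        failed.put(entry.getKey(), (Throwable) entry.getValue());
      }
    }
    return failed;
  }

  public static void main(String[] args) {
    // made-up results standing in for what the node forwarder would return
    Map<String, Object> results = new LinkedHashMap<>();
    results.put("node-1", 2);
    results.put("node-2", new IllegalStateException("node unreachable"));
    results.put("node-3", null);
    failures(results).forEach((uuid, t) -> System.out.printf("node '%s' failed: %s%n", uuid, t));
  }
}
```

In restartAllNodes() or startOrStopSlaveNodes(), you would then pass the map returned by the forwarder to failures() and log each entry, instead of throwing on the first one.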
Additionally, if you'd prefer to apply this code to any job submitted by any client from any machine, you can easily reuse the JMXHandler class and plug it into the server by using a server startup class. The main difference is that here you get a local connection to the JMX server, since the code is running in the same JVM:
// Here, the JMX connection is local and the JMX handler registration is done from a driver startup class.
public class ServerTest implements JPPFDriverStartupSPI {
  @Override
  public void run() {
    JMXDriverConnectionWrapper jmx = new JMXDriverConnectionWrapper();
    jmx.connect();
    new JMXHandler(jmx).register();
  }
}
For your convenience, I have attached the full sources illustrating these techniques to this post.
Sincerely,
-Laurent