Node management
From JPPF 6.1 Documentation
Out of the box, each JPPF node provides several MBeans that can be accessed remotely through a JMXMP remote connector with the JMX URL “service:jmx:jmxmp://host:port”. Here, host is the host name or IP address of the machine where the node is running (the value of “jppf.management.host” in the node's configuration file), and port is the value of the property “jppf.management.port” in the same file.
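As a minimal sketch of how the URL above is assembled, the following uses only the standard javax.management.remote.JMXServiceURL class. The helper class NodeJmxUrl is hypothetical and not part of the JPPF API, and the port 12001 is just an example value. Building the URL object does not open a connection; the JDK itself only bundles an RMI connector, so actually connecting assumes a JMXMP connector provider is on the classpath.

```java
import java.net.MalformedURLException;
import javax.management.remote.JMXServiceURL;

// Hypothetical helper, not part of the JPPF API.
class NodeJmxUrl {
  // Builds the JMXMP service URL of a node from its management host and port.
  static JMXServiceURL of(String host, int port) {
    try {
      return new JMXServiceURL("service:jmx:jmxmp://" + host + ":" + port);
    } catch (MalformedURLException e) {
      throw new IllegalArgumentException("invalid host or port: " + host + ":" + port, e);
    }
  }

  public static void main(String[] args) {
    // example values; use jppf.management.host and jppf.management.port of the node
    JMXServiceURL url = of("localhost", 12001);
    System.out.println(url + " -> protocol=" + url.getProtocol() + ", port=" + url.getPort());
  }
}
```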
1 Node-level management and monitoring MBean
MBean name: “org.jppf:name=admin,type=node”
This is also the value of the constant JPPFNodeAdminMBean.MBEAN_NAME.
This MBean's role is to perform management and monitoring at the node level; however, as we will see, it also has (for historical reasons) some task-level management functions. It exposes the JPPFNodeAdminMBean interface, which provides the functionality described below.
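The MBean name shown above follows the standard JMX object name syntax, domain:key=value,key=value. A small sketch with the standard javax.management.ObjectName class shows how it decomposes. The wrapper class NodeMBeanNames is hypothetical and only serves the illustration.

```java
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;

// Hypothetical helper, not part of the JPPF API.
class NodeMBeanNames {
  // Parses an MBean name, converting the checked exception to an unchecked one.
  static ObjectName parse(String name) {
    try {
      return new ObjectName(name);
    } catch (MalformedObjectNameException e) {
      throw new IllegalArgumentException("malformed MBean name: " + name, e);
    }
  }

  public static void main(String[] args) {
    ObjectName admin = parse("org.jppf:name=admin,type=node");
    System.out.println("domain = " + admin.getDomain());
    System.out.println("name   = " + admin.getKeyProperty("name"));
    System.out.println("type   = " + admin.getKeyProperty("type"));
  }
}
```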
1.1 Getting a snapshot of the node's state
This is done by invoking the following method on the MBean:
public interface JPPFNodeAdminMBean extends JPPFAdminMBean {
  // Get the latest state information from the node
  public JPPFNodeState state() throws Exception;
}
This method returns a JPPFNodeState object, which provides the following information on the node:
public class JPPFNodeState implements Serializable {
  // the status of the connection with the server
  public String getConnectionStatus()
  // the current task execution status
  public String getExecutionStatus()
  // the cpu time consumed by the node's execution threads
  // this includes the tasks cpu time and some JPPF processing overhead
  public long getCpuTime()
  // the total number of tasks executed
  public int getNbTasksExecuted()
  // the current size of the pool of threads used for tasks execution
  public int getThreadPoolSize()
  // the current priority assigned to the execution threads
  public int getThreadPriority()
}
1.2 Updating the execution thread pool properties
public interface JPPFNodeAdminMBean extends JPPFAdminMBean {
  // Set the size of the node's execution thread pool
  public void updateThreadPoolSize(Integer size) throws Exception;
  // Update the priority of all execution threads
  public void updateThreadsPriority(Integer newPriority) throws Exception;
}
1.3 Shutting down and restarting the node
public interface JPPFNodeAdminMBean extends JPPFAdminMBean {
  // Restart the node immediately
  public void restart() throws Exception;
  // Restart the node immediately, or once it is idle if interruptIfRunning is false
  public void restart(Boolean interruptIfRunning) throws Exception;
  // Shutdown the node immediately
  public void shutdown() throws Exception;
  // Shutdown the node immediately, or once it is idle if interruptIfRunning is false
  public void shutdown(Boolean interruptIfRunning) throws Exception;
  // Determine whether a deferred shutdown or restart was requested and not yet performed
  public NodePendingAction pendingAction();
  // Cancel a previous deferred shutdown or restart request, if any
  public void cancelPendingAction();
}
These methods should be used with caution. Note that once shutdown() has been invoked, it is no longer possible to restart the node remotely.
Calling restart() or shutdown() is equivalent to calling restart(true) or shutdown(true), respectively. When any of these methods is invoked without the interruptIfRunning flag, or when the flag's value is true, the tasks that were being executed, if any, are automatically resubmitted to the server queue.
When the interruptIfRunning parameter is false, the node will wait until no more tasks are being executed before restarting or shutting down. Thus, interruptIfRunning = false indicates a deferred operation request.
It is possible to query whether a deferred action has been requested with the pendingAction() method, which returns a NodePendingAction enum element, defined as follows:
public enum NodePendingAction {
  // There is no pending action
  NONE,
  // A deferred shutdown was requested
  SHUTDOWN,
  // A deferred restart was requested
  RESTART;
}
Finally, a deferred action can be cancelled with cancelPendingAction(), provided the action hasn't yet started.
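The deferred semantics described in this section can be summarized with a toy model. This is only a sketch of the behavior as the text describes it: the class DeferredActionModel and all its members are hypothetical, it is not JPPF code, and it assumes that a deferred request made while the node is idle is carried out right away.

```java
// Toy model of the deferred restart/shutdown semantics; all names are hypothetical.
class DeferredActionModel {
  enum Pending { NONE, SHUTDOWN, RESTART }

  private Pending pending = Pending.NONE;
  private boolean busy;                    // true while tasks are being executed
  private String lastPerformed = "none";   // last action actually carried out

  // Simulates the node starting or finishing task execution.
  void setBusy(boolean busy) {
    this.busy = busy;
    // a deferred request is carried out as soon as the node becomes idle
    if (!busy && pending != Pending.NONE) perform(pending);
  }

  // Models restart(flag) or shutdown(flag), depending on the requested action.
  void request(Pending action, boolean interruptIfRunning) {
    if (action == Pending.NONE) return;
    if (interruptIfRunning || !busy) perform(action);  // immediate
    else pending = action;                             // deferred until idle
  }

  Pending pendingAction() { return pending; }

  // Only has an effect while the deferred action has not started yet.
  void cancelPendingAction() { pending = Pending.NONE; }

  String lastPerformed() { return lastPerformed; }

  private void perform(Pending action) {
    lastPerformed = action.name();
    pending = Pending.NONE;
  }
}
```

For instance, requesting a RESTART with interruptIfRunning = false while the model is busy leaves pendingAction() at RESTART until either cancelPendingAction() is called or setBusy(false) triggers the action.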
1.4 Updating the executed tasks counter
public interface JPPFNodeAdminMBean extends JPPFAdminMBean {
  // Reset the node's executed tasks counter to zero
  public void resetTaskCounter() throws Exception;
  // Reset the node's executed tasks counter to the specified value
  public void setTaskCounter(Integer n) throws Exception;
}
Please note that resetTaskCounter() is equivalent to setTaskCounter(0).
1.5 Getting information about the node's host
public interface JPPFAdminMBean extends Serializable {
  // Get detailed information about the node's JVM properties, environment variables,
  // memory usage, available processors and available storage space
  JPPFSystemInformation systemInformation() throws Exception;
}
This method returns an object of type JPPFSystemInformation, which is a snapshot of the environment of the JPPF node, the JVM and the host they run on. The properties defined in this object are also those used by execution policies, as we have seen in section 3.4.1 of this manual.
JPPFSystemInformation provides information about 6 different aspects of the environment:
public class JPPFSystemInformation implements Serializable {
  // get the system properties
  public TypedProperties getSystem()
  // get runtime information about JVM memory and available processors
  public TypedProperties getRuntime()
  // get the host environment variables
  public TypedProperties getEnv()
  // get IPV4 and IPV6 addresses assigned to the host
  public TypedProperties getNetwork()
  // get the JPPF configuration properties
  public TypedProperties getJppf()
  // get information on available disk storage
  public TypedProperties getStorage()
}
We encourage the reader to consult the Javadoc of the above methods for details on each set of information, and on how the information is formatted and named.
Each of the methods in JPPFSystemInformation returns a TypedProperties object. TypedProperties is a subclass of the standard java.util.Properties that provides convenience methods to read property values as primitive types other than String.
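To illustrate the idea of typed accessors on top of java.util.Properties, here is a small stand-in. The class TypedProps, its methods and the property keys are hypothetical and are not JPPF's TypedProperties, whose actual API should be taken from its Javadoc.

```java
import java.util.Properties;

// Hypothetical stand-in showing the principle; not JPPF's TypedProperties.
class TypedProps extends Properties {
  private static final long serialVersionUID = 1L;

  // Reads a property as an int, falling back to a default if missing or malformed.
  int getInt(String key, int defaultValue) {
    String value = getProperty(key);
    if (value == null) return defaultValue;
    try {
      return Integer.parseInt(value.trim());
    } catch (NumberFormatException e) {
      return defaultValue;
    }
  }

  // Reads a property as a boolean, falling back to a default if missing.
  boolean getBoolean(String key, boolean defaultValue) {
    String value = getProperty(key);
    return (value == null) ? defaultValue : Boolean.parseBoolean(value.trim());
  }

  public static void main(String[] args) {
    TypedProps props = new TypedProps();
    props.setProperty("demo.processors", "8");  // made-up key, for illustration only
    System.out.println(props.getInt("demo.processors", 1));
    System.out.println(props.getBoolean("demo.missing", true));
  }
}
```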
1.6 Canceling a job
public interface JPPFNodeAdminMBean extends JPPFAdminMBean {
  // Cancel the job with the specified uuid. The requeue parameter determines
  // whether the job should be requeued on the server side or not
  public void cancelJob(String jobUuid, Boolean requeue) throws Exception;
}
This MBean method is used to cancel a job currently running in the node. The job is identified by its uuid. The requeue parameter is used to notify the server that the canceled job should be requeued on the server and executed again, possibly on another node. If requeue is false, the job is simply terminated and any remaining task will not be executed.
This method should normally only be used by the JPPF server, in the case where a user requested that the server terminate a job. In effect, a job can contain several tasks, with each task potentially executed concurrently on a separate node. When the server receives a job termination request, it will handle the termination of “sub-jobs” (i.e. subsets of the tasks in the job) by notifying each corresponding node.
1.7 Updating the node's configuration properties
public interface JPPFNodeAdminMBean extends JPPFAdminMBean {
  // Update the configuration properties of the node and optionally restart the node.
  void updateConfiguration(Map<String, String> config, Boolean restart) throws Exception;
  void updateConfiguration(Map<String, String> config, Boolean restart, Boolean interruptIfRunning) throws Exception;
}
These methods send a set of configuration properties to the node, that will override those defined in the node's configuration file. The restart parameter will allow the node to take the changes into account, especially in the case where the server connection or discovery properties have been changed, for instance to force the node to connect to another server, or when you need to restart the node with a different JVM by specifying the jppf.java.path property.
Note that, when the restart parameter is true, the updateConfiguration() method will call restart() or restart(interruptIfRunning) after storing the configuration updates, depending on which method overload is invoked initially.
Finally, the interruptIfRunning parameter specifies whether the node should be restarted immediately, or wait until it no longer has any task to execute.
2 Task-level monitoring
MBean name : “org.jppf:name=task.monitor,type=node”.
This is also the value of the constant JPPFNodeTaskMonitorMBean.MBEAN_NAME
This MBean monitors the task activity within a node. It exposes the interface JPPFNodeTaskMonitorMBean and also emits JMX notifications of type TaskExecutionNotification.
2.1 Snapshot of the tasks activity
The interface JPPFNodeTaskMonitorMBean provides access to aggregated statistics on the tasks executed within a node:
public interface JPPFNodeTaskMonitorMBean extends NotificationEmitter {
  // The total number of tasks executed by the node
  Integer getTotalTasksExecuted();
  // The total number of tasks that ended in error
  Integer getTotalTasksInError();
  // The total number of tasks that executed successfully
  Integer getTotalTasksSucessfull();
  // The total cpu time used by the tasks in milliseconds
  Long getTotalTaskCpuTime();
  // The total elapsed time used by the tasks in milliseconds
  Long getTotalTaskElapsedTime();
}
2.2 Notification of tasks execution
Each time a task completes its execution in a node, the task monitor MBean will emit a JMX notification of type TaskExecutionNotification defined as follows:
public class TaskExecutionNotification extends Notification {
  // Get the object encapsulating information about the task
  public TaskInformation getTaskInformation();
  // Whether this is a user-defined notification sent from a task
  public boolean isUserNotification();
}
This notification essentially encapsulates an object of type TaskInformation, which provides the following information about each executed task:
public class TaskInformation implements Serializable {
  // Get the task id
  public String getId()
  // Get the uuid of the job this task belongs to
  public String getJobId()
  // Get the cpu time used by the task
  public long getCpuTime()
  // Get the wall clock time used by the task
  public long getElapsedTime()
  // Determines whether the task had an exception
  public boolean hasError()
  // Get the timestamp for the task completion
  // Caution: this value is related to the node's system time,
  // not to the time of the notification receiver
  public long getTimestamp()
  // Get the position of the task in the job to which it belongs
  public int getJobPosition()
}
TaskExecutionNotification also inherits the method getUserData(), which returns the object specified by the user code when calling Task.fireNotification(Object, boolean) with the second parameter set to true.
Additionally, the method isUserNotification() allows you to unambiguously distinguish between user-defined notifications, sent via Task.fireNotification(Object, boolean), and task completion notifications automatically sent by the JPPF nodes.
3 Node maintenance
MBean name : "org.jppf:name=node.maintenance,type=node".
This is also the value of the constant JPPFNodeMaintenanceMBean.MBEAN_NAME
This MBean provides operations for the maintenance of a node. It exposes the interface JPPFNodeMaintenanceMBean defined as follows:
public interface JPPFNodeMaintenanceMBean extends Serializable {
  // object name for this MBean
  String MBEAN_NAME = "org.jppf:name=node.maintenance,type=node";
  // request a reset of the resource caches of all the JPPF class loaders
  // maintained by the node
  void requestResourceCacheReset() throws Exception;
}
Please note that the method requestResourceCacheReset() does not perform the reset immediately. Instead, it sets an internal flag, and the reset will take place when it is safe to do so, as part of the node's life cycle. The outcome of the reset operation is that the temporary files created by the JPPF class loaders will be deleted, freeing space in the temporary files folder.
4 Node provisioning
Any JPPF node has the ability to start new nodes on the same physical or virtual machine, and to stop and monitor these nodes afterwards. This provides a node provisioning facility, which makes it possible to dynamically grow or shrink a JPPF grid based on the workload requirements.
This provisioning ability establishes a master/slave relationship between a standard node (master) and the nodes that it starts (slaves). Please note that a slave node cannot in turn be used as a master. Apart from this restriction, slave nodes can be managed and monitored like any other node.
4.1 Node provisioning MBean
Node provisioning is implemented with a dedicated MBean, which exposes the JPPFNodeProvisioningMBean interface:
MBean name: "org.jppf:name=provisioning,type=node"
This is also the value of the constant JPPFNodeProvisioningMBean.MBEAN_NAME.
JPPFNodeProvisioningMBean is defined as follows:
public interface JPPFNodeProvisioningMBean {
  // The object name of this MBean
  String MBEAN_NAME = "org.jppf:name=provisioning,type=node";
  // Constant for notifications that a slave node has started
  String SLAVE_STARTED_NOTIFICATION_TYPE = "slave_started";
  // Constant for notifications that a slave node has stopped
  String SLAVE_STOPPED_NOTIFICATION_TYPE = "slave_stopped";
  // Get the number of slave nodes started by this MBean
  int getNbSlaves();
  // Start or stop the required number of slaves to reach the specified number,
  // with an interrupt flag set to true
  void provisionSlaveNodes(int nbNodes);
  // Same action, explicitly specifying the interrupt flag
  void provisionSlaveNodes(int nbNodes, boolean interruptIfRunning);
  // Start or stop the required number of slaves to reach the specified number,
  // using the specified configuration overrides and an interrupt flag set to true
  void provisionSlaveNodes(int nbNodes, TypedProperties configOverrides);
  // Same action, explicitly specifying the interrupt flag
  void provisionSlaveNodes(int nbNodes, boolean interruptIfRunning, TypedProperties configOverrides);
}
The method provisionSlaveNodes(int) will start or stop a number of slave nodes, according to how many slaves are already started. For instance, if 4 slaves are already running and provisionSlaveNodes(2) is invoked, then 2 slaves will be stopped. Conversely, if no slave is running, then 2 slave nodes will be started.
The method provisionSlaveNodes(int, TypedProperties) behaves differently, unless the second argument is null. Since the argument of type TypedProperties specifies overrides of the slaves configuration, this means that all already running slaves must be restarted to take these configuration changes into account. Thus, this method will first stop all running slaves, then start the specified number of slaves, after applying the configuration overrides.
When the interruptIfRunning flag is set to false, it will cause slave nodes to stop only once they are idle, that is they will not stop immediately if they are executing tasks at the time the provisioning request is made. This flag is true by default, in the provisionSlaveNodes() methods that do not specify it.
Therefore, provisionSlaveNodes(n), provisionSlaveNodes(n, null), provisionSlaveNodes(n, true) and provisionSlaveNodes(n, true, null) all have the same effect.
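The start/stop arithmetic described above can be written down as a small model. This is only a sketch of the documented behavior, under the assumption that non-null overrides always cause all running slaves to be stopped first. The class ProvisioningPlan is hypothetical and not part of JPPF.

```java
// Hypothetical model of the documented provisioning behavior; not JPPF code.
class ProvisioningPlan {
  final int toStop;   // number of running slaves that get stopped
  final int toStart;  // number of new slaves that get started

  private ProvisioningPlan(int toStop, int toStart) {
    this.toStop = toStop;
    this.toStart = toStart;
  }

  // running: slaves currently started; requested: target number of slaves;
  // hasOverrides: whether non-null configuration overrides were supplied.
  static ProvisioningPlan compute(int running, int requested, boolean hasOverrides) {
    if (running < 0 || requested < 0) throw new IllegalArgumentException("negative count");
    // with overrides, all running slaves are stopped, then the target number is started
    if (hasOverrides) return new ProvisioningPlan(running, requested);
    // without overrides, only the difference is started or stopped
    int delta = requested - running;
    return new ProvisioningPlan(Math.max(0, -delta), Math.max(0, delta));
  }

  public static void main(String[] args) {
    ProvisioningPlan plan = compute(4, 2, false);
    System.out.println("stop " + plan.toStop + ", start " + plan.toStart);
  }
}
```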
The following example shows how to start new slave nodes with a single processing thread each:
// connect to the node's JMX server
JMXNodeConnectionWrapper jmxNode = new JMXNodeConnectionWrapper(host, port, false);
// create a provisioning proxy instance
String mbeanName = JPPFNodeProvisioningMBean.MBEAN_NAME;
JPPFNodeProvisioningMBean provisioning = jmxNode.getProxy(mbeanName, JPPFNodeProvisioningMBean.class);
// set the configuration with a single processing thread
TypedProperties overrides = new TypedProperties().setInt("jppf.processing.threads", 1);
// start 2 slaves with the config overrides
provisioning.provisionSlaveNodes(2, overrides);
// check the number of slave nodes
int nbSlaves = provisioning.getNbSlaves();
// or, using jmxNode directly, get it as a JMX attribute
nbSlaves = (Integer) jmxNode.getAttribute(mbeanName, "NbSlaves");
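The attribute name "NbSlaves" used in the example follows the standard JMX convention: a getter getXxx() on the MBean interface is exposed as the attribute "Xxx". The following sketch demonstrates the same convention without a JPPF node, using the JVM's own platform MBean server, where OperatingSystemMXBean.getAvailableProcessors() is exposed as the attribute "AvailableProcessors". The class AttributeNaming is hypothetical.

```java
import java.lang.management.ManagementFactory;
import javax.management.MBeanServer;
import javax.management.ObjectName;

// Hypothetical demo class; it only uses the JDK's platform MBeans.
class AttributeNaming {
  // Reads an attribute of a platform MBean, wrapping checked JMX exceptions.
  static Object read(String objectName, String attribute) {
    try {
      MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
      return mbs.getAttribute(new ObjectName(objectName), attribute);
    } catch (Exception e) {
      throw new IllegalStateException("cannot read " + attribute + " of " + objectName, e);
    }
  }

  public static void main(String[] args) {
    // the getter getAvailableProcessors() maps to the attribute "AvailableProcessors"
    int viaAttribute = (Integer) read("java.lang:type=OperatingSystem", "AvailableProcessors");
    int viaGetter = ManagementFactory.getOperatingSystemMXBean().getAvailableProcessors();
    System.out.println("attribute: " + viaAttribute + ", getter: " + viaGetter);
  }
}
```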
4.2 Provisioning notifications
The node provisioning MBean also emits notifications when a slave node is started or stopped. These notifications are standard JMX Notification instances, with the following characteristics:
- the notification type, obtained with Notification.getType(), is one of these constants in JPPFNodeProvisioningMBean: SLAVE_STARTED_NOTIFICATION_TYPE or SLAVE_STOPPED_NOTIFICATION_TYPE
- the notification's user data, obtained with Notification.getUserData(), is a JPPFProvisioningInfo object, defined as:
public class JPPFProvisioningInfo implements Serializable {
  // Get the uuid of the master node that launched the slave
  public String getMasterUuid()
  // Get the id of the slave, relevant only to the master
  public int getSlaveId()
  // Get the slave node process exit code
  public int getExitCode()
  // Get the command line used to start the slave node process
  public List<String> getLaunchCommand()
}
Note that getExitCode() always returns -1 for slave started notifications.
In the following example, we define a NotificationListener which receives provisioning notifications and prints out the relevant information for each slave node that is started or stopped:
public class MyProvisioningListener implements NotificationListener {
  @Override
  public void handleNotification(Notification notif, Object handback) {
    switch(notif.getType()) {
      // case when a slave node is started
      case JPPFNodeProvisioningMBean.SLAVE_STARTED_NOTIFICATION_TYPE:
        System.out.println("slave node started: " + notif.getUserData());
        break;
      // case when a slave node is stopped
      case JPPFNodeProvisioningMBean.SLAVE_STOPPED_NOTIFICATION_TYPE:
        System.out.println("slave node stopped: " + notif.getUserData());
        break;
    }
  }
}
The notification listener can then be used as follows:
try (JMXNodeConnectionWrapper nodeJmx = new JMXNodeConnectionWrapper("localhost", 12001)) {
  if (nodeJmx.connectAndWait(5000L)) {
    JPPFNodeProvisioningMBean provisioning = nodeJmx.getJPPFNodeProvisioningProxy();
    // register a provisioning notification listener
    provisioning.addNotificationListener(new MyProvisioningListener(), null, null);
    // start 2 slave nodes and wait for the output of the notification listener
    provisioning.provisionSlaveNodes(2);
  }
}
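The listener above is registered with a null filter and therefore receives both notification types. If only one type is of interest, the standard JMX class NotificationFilterSupport can be passed as the second argument of addNotificationListener(). The sketch below demonstrates the filtering locally: a plain NotificationBroadcasterSupport stands in for the provisioning MBean, the class StoppedOnlyFilterDemo is hypothetical, and the type strings are the values of the two constants shown earlier.

```java
import java.util.ArrayList;
import java.util.List;
import javax.management.Notification;
import javax.management.NotificationBroadcasterSupport;
import javax.management.NotificationFilterSupport;
import javax.management.NotificationListener;

// Hypothetical demo; a local broadcaster stands in for the provisioning MBean.
class StoppedOnlyFilterDemo {
  // Returns the types of the notifications that passed the filter.
  static List<String> run() {
    List<String> received = new ArrayList<>();
    NotificationListener listener = (notif, handback) -> received.add(notif.getType());

    // only let "slave_stopped" notifications through
    NotificationFilterSupport filter = new NotificationFilterSupport();
    filter.enableType("slave_stopped");

    // with no executor configured, notifications are delivered synchronously
    NotificationBroadcasterSupport emitter = new NotificationBroadcasterSupport();
    emitter.addNotificationListener(listener, filter, null);
    emitter.sendNotification(new Notification("slave_started", "demo", 1L));
    emitter.sendNotification(new Notification("slave_stopped", "demo", 2L));
    return received;
  }

  public static void main(String[] args) {
    System.out.println(run());
  }
}
```

NotificationFilterSupport is serializable, which matters when a filter is sent to a remote MBean server.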
5 Accessing and using the node MBeans
JPPF provides an API that simplifies access to the JMX-based management features of a node, by abstracting most of the complexities of JMX programming. This API is represented by the class JMXNodeConnectionWrapper, which provides a simplified way of connecting to the node's MBean server, along with a set of convenience methods to easily access the MBeans' exposed methods and attributes.
5.1 Connecting to an MBean server
Connecting to a node's MBean server is done in two steps:
a. Create an instance of JMXNodeConnectionWrapper
To connect to a local (same JVM, no network connection involved) MBean server, use the no-arg constructor:
JMXNodeConnectionWrapper wrapper = new JMXNodeConnectionWrapper();
To connect to a remote MBean server, use the constructor specifying the management host, port and secure flag:
JMXNodeConnectionWrapper wrapper = new JMXNodeConnectionWrapper(host, port, secure);
Here host and port represent the node's configuration properties "jppf.management.host" and "jppf.management.port", and secure is a boolean flag indicating whether network transport is secured via SSL/TLS.
b. Initiate the connection to the MBean server and wait until it is established
There are two ways to do this:
Synchronously:
// connect and wait for the connection to be established
// choose a reasonable value for the timeout, or 0 for no timeout
wrapper.connectAndWait(timeout);
Asynchronously:
// initiate the connection; this method returns immediately
wrapper.connect();
// ... do something else ...
// check if we are connected
if (wrapper.isConnected()) ...;
else ...;
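In the asynchronous case, the connection state is typically checked more than once. One possible way to do this is a small polling helper with a deadline, sketched below. ConnectionPoller is a hypothetical helper, not part of JPPF. With a JPPF wrapper it could be called as ConnectionPoller.await(wrapper::isConnected, 5000L, 50L), assuming isConnected() takes no argument and returns a boolean, as in the snippet above.

```java
import java.util.function.BooleanSupplier;

// Hypothetical helper, not part of the JPPF API.
class ConnectionPoller {
  // Polls the condition until it holds or the timeout elapses.
  // Returns true if the condition became true in time, false otherwise.
  static boolean await(BooleanSupplier connected, long timeoutMillis, long pollMillis) {
    long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
    while (!connected.getAsBoolean()) {
      if (System.nanoTime() - deadline >= 0L) return false;
      try {
        Thread.sleep(pollMillis);
      } catch (InterruptedException e) {
        // restore the interrupt flag and give up
        Thread.currentThread().interrupt();
        return false;
      }
    }
    return true;
  }

  public static void main(String[] args) {
    long start = System.currentTimeMillis();
    // simulated connection that becomes available after about 200 ms
    boolean ok = await(() -> System.currentTimeMillis() - start >= 200L, 2000L, 20L);
    System.out.println("connected: " + ok);
  }
}
```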
5.2 Direct use of the JMX wrapper
JMXNodeConnectionWrapper directly implements the interface JPPFNodeAdminMBean. This means that all the methods of this interface can be used directly from the JMX wrapper. For example:
JMXNodeConnectionWrapper wrapper = new JMXNodeConnectionWrapper(host, port, secure);
wrapper.connectAndWait(timeout);
// get the number of tasks executed since the last reset
int nbTasks = wrapper.state().getNbTasksExecuted();
// stop the node
wrapper.shutdown();
5.3 Use of the JMX wrapper's invoke() method
JMXConnectionWrapper.invoke() is a generic method that allows invoking any exposed method of an MBean.
Here is an example:
JMXNodeConnectionWrapper wrapper = new JMXNodeConnectionWrapper(host, port, secure);
wrapper.connectAndWait(timeout);
// equivalent to JPPFNodeState state = wrapper.state();
JPPFNodeState state = (JPPFNodeState) wrapper.invoke(
  JPPFNodeAdminMBean.MBEAN_NAME, "state", (Object[]) null, (String[]) null);
int nbTasks = state.getNbTasksExecuted();
// get the total CPU time used
long cpuTime = (Long) wrapper.invoke(JPPFNodeTaskMonitorMBean.MBEAN_NAME,
  "getTotalTaskCpuTime", (Object[]) null, (String[]) null);
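The example above only calls methods without arguments, hence the null parameter and signature arrays. For a method that takes arguments, generic JMX invocation expects the argument values plus an array of parameter type names. The sketch below shows this with the plain JMX API against the JVM's platform MBean server, since it needs no running node: the operation getThreadInfo(long) of the standard Threading MBean is invoked with one long parameter. The class GenericInvoke is hypothetical.

```java
import java.lang.management.ManagementFactory;
import javax.management.ObjectName;
import javax.management.openmbean.CompositeData;

// Hypothetical demo class; it only uses the JDK's platform MBeans.
class GenericInvoke {
  // Invokes an MBean operation by name, wrapping checked JMX exceptions.
  static Object invoke(String objectName, String operation, Object[] params, String[] signature) {
    try {
      return ManagementFactory.getPlatformMBeanServer()
        .invoke(new ObjectName(objectName), operation, params, signature);
    } catch (Exception e) {
      throw new IllegalStateException("invocation of " + operation + " failed", e);
    }
  }

  public static void main(String[] args) {
    long id = Thread.currentThread().getId();
    // one argument of type long: the signature array holds the type name "long"
    Object result = invoke("java.lang:type=Threading", "getThreadInfo",
      new Object[] { id }, new String[] { "long" });
    // platform MXBeans return complex values as open types such as CompositeData
    CompositeData info = (CompositeData) result;
    System.out.println("thread name: " + info.get("threadName"));
  }
}
```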
5.4 Use of an MBean proxy
A proxy is a dynamically created object that implements an interface specified at runtime.
The standard JMX API provides a way to create a proxy to a remote or local MBean. This is done as follows:
JMXNodeConnectionWrapper wrapper = new JMXNodeConnectionWrapper(host, port, secure);
wrapper.connectAndWait(timeout);
// create the proxy instance
JPPFNodeTaskMonitorMBean proxy = wrapper.getProxy(
  JPPFNodeTaskMonitorMBean.MBEAN_NAME, JPPFNodeTaskMonitorMBean.class);
// get the total CPU time used
long cpuTime = proxy.getTotalTaskCpuTime();
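For comparison, this is roughly what proxy creation looks like with the plain JMX API alone, here against the JVM's platform MBean server so that it runs without a JPPF node. The class ProxyDemo is hypothetical. How JPPF's getProxy() is implemented internally is not covered here.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.RuntimeMXBean;
import javax.management.JMX;
import javax.management.MBeanServer;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;

// Hypothetical demo class; it only uses the JDK's platform MBeans.
class ProxyDemo {
  // Creates a typed proxy to the JVM's Runtime MXBean through the MBean server.
  static RuntimeMXBean runtimeProxy() {
    try {
      MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
      ObjectName name = new ObjectName(ManagementFactory.RUNTIME_MXBEAN_NAME);
      return JMX.newMXBeanProxy(mbs, name, RuntimeMXBean.class);
    } catch (MalformedObjectNameException e) {
      throw new IllegalStateException(e);
    }
  }

  public static void main(String[] args) {
    RuntimeMXBean runtime = runtimeProxy();
    // each call on the proxy is translated into a JMX attribute read or operation call
    System.out.println("JVM: " + runtime.getVmName() + ", uptime (ms): " + runtime.getUptime());
  }
}
```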
5.5 Subscribing to MBean notifications
We have seen that the task monitoring MBean represented by the JPPFNodeTaskMonitorMBean interface is able to emit notifications of type TaskExecutionNotification. There are 2 ways to subscribe to these notifications:
a. Using a proxy to the MBean
JMXNodeConnectionWrapper wrapper = new JMXNodeConnectionWrapper(host, port, secure);
wrapper.connectAndWait(timeout);
JPPFNodeTaskMonitorMBean proxy = wrapper.getProxy(
  JPPFNodeTaskMonitorMBean.MBEAN_NAME, JPPFNodeTaskMonitorMBean.class);
// subscribe to all notifications from the MBean
proxy.addNotificationListener(myNotificationListener, null, null);
b. Using the MBeanServerConnection API
JMXNodeConnectionWrapper wrapper = new JMXNodeConnectionWrapper(host, port, secure);
wrapper.connectAndWait(timeout);
MBeanServerConnection mbsc = wrapper.getMbeanConnection();
ObjectName objectName = new ObjectName(JPPFNodeTaskMonitorMBean.MBEAN_NAME);
// subscribe to all notifications from the MBean
mbsc.addNotificationListener(objectName, myNotificationListener, null, null);
Here is an example notification listener implementing the NotificationListener interface:
// this class counts the number of tasks executed, along with
// the total cpu time and wall clock time used by the node
public class MyNotificationListener implements NotificationListener {
  AtomicInteger taskCount = new AtomicInteger(0);
  AtomicLong cpuTime = new AtomicLong(0L);
  AtomicLong elapsedTime = new AtomicLong(0L);

  // Handle an MBean notification
  public void handleNotification(Notification notification, Object handback) {
    TaskExecutionNotification jppfNotif = (TaskExecutionNotification) notification;
    TaskInformation info = jppfNotif.getTaskInformation();
    int n = taskCount.incrementAndGet();
    long cpu = cpuTime.addAndGet(info.getCpuTime());
    long elapsed = elapsedTime.addAndGet(info.getElapsedTime());
    // display the statistics for every 50 tasks executed
    if (n % 50 == 0) {
      System.out.println("nb tasks = " + n + ", cpu time = " + cpu +
        " ms, elapsed time = " + elapsed + " ms");
    }
  }
}

NotificationListener myNotificationListener = new MyNotificationListener();
6 Remote logging
It is possible to receive logging messages from a node as JMX notifications. Specific implementations are available for Log4j and JDK logging.
To configure Log4j for emitting JMX notifications, edit the log4j configuration files of the node and add the following:
### direct messages to the JMX Logger ###
log4j.appender.JMX = org.jppf.logging.log4j.JmxAppender
log4j.appender.JMX.layout = org.apache.log4j.PatternLayout
log4j.appender.JMX.layout.ConversionPattern = %d [%-5p][%c.%M(%L)]: %m\n
### set log levels - for more verbose logging change 'info' to 'debug' ###
log4j.rootLogger = INFO, JPPF, JMX
To configure the JDK logging to send JMX notifications, edit the JDK logging configuration file of the node and add the following:
# list of handlers
handlers = java.util.logging.FileHandler, org.jppf.logging.jdk.JmxHandler
# Write log messages as JMX notifications
org.jppf.logging.jdk.JmxHandler.level = FINEST
org.jppf.logging.jdk.JmxHandler.formatter = org.jppf.logging.jdk.JPPFLogFormatter
To receive the logging notifications from a remote application, you can use the following code:
// get a JMX connection to the node MBean server
JMXNodeConnectionWrapper jmxNode = new JMXNodeConnectionWrapper(host, port, secure);
jmxNode.connectAndWait(5000L);
// get a proxy to the MBean
JmxLogger nodeProxy = jmxNode.getProxy(JmxLogger.DEFAULT_MBEAN_NAME, JmxLogger.class);
// use a handback object so we know where the log messages come from
String source = "node " + jmxNode.getHost() + ":" + jmxNode.getPort();
// subscribe to all notifications from the MBean
NotificationListener listener = new MyLoggingHandler();
nodeProxy.addNotificationListener(listener, null, source);

// Logging notification listener that prints remote log messages
// to the console
public class MyLoggingHandler implements NotificationListener {
  // handle the logging notifications
  public void handleNotification(Notification notification, final Object handback) {
    String message = notification.getMessage();
    String toDisplay = handback.toString() + ": " + message;
    System.out.println(toDisplay);
  }
}
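To make the mechanism more concrete, the sketch below shows how log records can be turned into JMX notifications using only JDK classes. It is a hypothetical stand-in written for illustration and makes no claim about how JPPF's JmxHandler or JmxAppender are implemented. The class NotifyingLogHandler, the notification type "log" and the handback string are all made up for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Handler;
import java.util.logging.Level;
import java.util.logging.LogRecord;
import java.util.logging.Logger;
import javax.management.Notification;
import javax.management.NotificationBroadcasterSupport;

// Hypothetical stand-in: a JDK logging handler that emits JMX notifications.
class NotifyingLogHandler extends Handler {
  private final NotificationBroadcasterSupport emitter;
  private final AtomicLong sequence = new AtomicLong();

  NotifyingLogHandler(NotificationBroadcasterSupport emitter) {
    this.emitter = emitter;
  }

  @Override
  public void publish(LogRecord record) {
    if (!isLoggable(record)) return;
    String text = "[" + record.getLevel() + "] " + record.getMessage();
    // the log text travels as the notification's message
    emitter.sendNotification(
      new Notification("log", "demo-logger", sequence.incrementAndGet(), text));
  }

  @Override public void flush() { }
  @Override public void close() { }

  // Wires a logger, the handler and a listener together and logs one message.
  static List<String> demo() {
    List<String> received = new ArrayList<>();
    NotificationBroadcasterSupport emitter = new NotificationBroadcasterSupport();
    // the handback identifies the source, as in the listener example above
    emitter.addNotificationListener(
      (notif, handback) -> received.add(handback + ": " + notif.getMessage()),
      null, "node localhost:12001");
    Logger logger = Logger.getAnonymousLogger();
    logger.setUseParentHandlers(false);
    logger.setLevel(Level.INFO);
    logger.addHandler(new NotifyingLogHandler(emitter));
    logger.info("task started");
    return received;
  }

  public static void main(String[] args) {
    demo().forEach(System.out::println);
  }
}
```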