JPPF, java, parallel computing, distributed computing, grid computing, parallel, distributed, cluster, grid, cloud, open source, android, .net
JPPF

The open source
grid computing
solution

 Home   About   Features   Download   Documentation   On Github   Forums 
June 04, 2023, 07:57:00 PM *
Welcome,
Please login or register.

Login with username, password and session length
Advanced search  
News: New users, please read this message. Thank you!
  Home Help Search Login Register  
Pages: [1]   Go Down

Author Topic: Modifying Node  (Read 1492 times)

Jim

  • Guest
Modifying Node
« on: November 17, 2015, 10:50:49 PM »

Hello, I was wondering if it was possible to modify how nodes behave. Is there any way I can safely change how a node acts? For example, say I wanted a master node to create a slave node every time it finishes a task. Can such a thing be done with JPPF?

Logged

lolo

  • Administrator
  • JPPF Council Member
  • *****
  • Posts: 2272
    • JPPF Web site
Re: Modifying Node
« Reply #1 on: November 18, 2015, 08:32:28 AM »

Hi,

To provision a node programmatically, you need to use the node management APIs, in particular those for node provisioning. For instance, from within a node, you could use code like this:

Code: [Select]
public class NodeUtil {
  public static void startSlave() {
    try {
      // no-args constructor uses a local connection to the mbean server
      JMXNodeConnectionWrapper jmx = new JMXNodeConnectionWrapper();
      jmx.connect();
      // get a proxy to the provisioning mbean
      JPPFNodeProvisioningMBean provisioner =
        jmx.getProxy(JPPFNodeProvisioningMBean.MBEAN_NAME, JPPFNodeProvisioningMBean.class);
      // get the current number of slaves
      int n = provisioner.getNbSlaves();
      // add one
      provisioner.provisionSlaveNodes(n + 1);
    } catch (Exception e) {
      e.printStackTrace();
    }
  }
}

Then you could call this method from within your task, upon its termination:

Code: [Select]
public class MyTask extends AbstractTask<String> {
  @Override
  public void run() {
    try {
      // ... do the work here ...
    } finally {
      // start a slave node
      NodeUtil.startSlave();
    }
  }
}

Sincerely,
-Laurent
« Last Edit: November 27, 2015, 07:21:13 AM by lolo »
Logged

Jim

  • Guest
Re: Modifying Node
« Reply #2 on: November 23, 2015, 03:07:02 AM »

Hello, Thank you for clarifying on node managements! Is the same thing possible for the driver? For example, if I wanted to create another slave node or restart all nodes every time I finish a job instead of a task, would I also use the same coding style in my application runner class? Thanks!
Logged

lolo

  • Administrator
  • JPPF Council Member
  • *****
  • Posts: 2272
    • JPPF Web site
Re: Modifying Node
« Reply #3 on: November 27, 2015, 08:13:49 AM »

Hello,

Sorry for the late answer.
Yes you can definitely restart nodes or start new slaves from the application runner, using the server management and monitoring APIs.

To access these APIs, you first first need to obtain a JMX connection to the server, from the JPPF client:

Code: [Select]
try (JPPFClient client = new JPPFClient()) {
  // get a JMX connection to the server from the JPPF client
  JMXDriverConnectionWrapper jmx = client.awaitWorkingConnectionPool().awaitWorkingJMXConnection();
  ...
}

Once we have a jmx connection, we want to receive notifications of job life cycle events so we know when a job returns from a node where it was dispatched, so we can start a new slave (if the node is a master). We will also know when a job is complete, in which case we will restart all nodes.

We define a separate class for our notification listener:

Code: [Select]
public class JMXHandler implements NotificationListener {
  private final JMXDriverConnectionWrapper jmx;
  private boolean registered = false;

  public JMXHandler(JMXDriverConnectionWrapper jmx) {
    this.jmx = jmx;
  }

  public void register() {
    try {
      // get a proxy to the job management MBean
      DriverJobManagementMBean jobManager = jmx.getJobManager();
      // register this notification listener
      jobManager.addNotificationListener(this, null, null);
      synchronized(this) {
        registered = true;
      }
    } catch (Exception e) {
      e.printStackTrace();
    }
  }

  public void unregister() {
    try {
      DriverJobManagementMBean jobManager = jmx.getJobManager();
      // unregister this notification listener
      jobManager.removeNotificationListener(this, null, null);
    } catch (Exception e) {
      e.printStackTrace();
    } finally {
      synchronized(this) {
        registered = false;
      }
    }
  }

  @Override
  public void handleNotification(Notification notification, Object handback) {
    JobNotification jobNotif = (JobNotification) notification;
    JobEventType eventType = jobNotif.getEventType();
    if (eventType == JobEventType.JOB_RETURNED) {
      // start a slave of the node where the job was dispatched
      JPPFManagementInfo nodeInfo = jobNotif.getNodeInfo();
      if (nodeInfo.isMasterNode()) {
        startOrStopSlaveNodes(nodeInfo.getUuid(), 1);
      }
    } else if (eventType == JobEventType.JOB_ENDED) {
      // handle completed jobs
      System.out.printf("job '%s' ended%n", jobNotif.getJobInformation().getJobName());
      restartAllNodes();
    }
  }

  ...
}

The register() and unregister() methods are pretty straightforward and the interesting things happen in the handleNotification() method. When we receive a JOB_RETURNED notification for a master node, we want to start a new slave node, hence the call to the startOrStopSlaveNodes() method. This method is a little more generic than we need, because it can also start or stop more than one slave if needed. It would be like this:

Code: [Select]
// use nbSlave = 1 to start a new slave, nbSlave = -1 to stop a slave
private void startOrStopSlaveNodes(String masterNodeUuid, int nbSlaves) {
  try {
    // node forwarder forwards requests to selected nodes
    JPPFNodeForwardingMBean forwarder = jmx.getNodeForwarder();
    NodeSelector selector = new UuidSelector(masterNodeUuid);
    // get the current number of slaves
    Map<String, Object> result = forwarder.getNbSlaves(selector);
    Object o = result.get(masterNodeUuid);
    if (o instanceof Integer) {
      int n = (Integer) o;
      // provision or un-provision slaves based on nbSlaves
      result = forwarder.provisionSlaveNodes(selector, n + nbSlaves);
      // check the result for each node
      for (Map.Entry<String, Object> entry: result.entrySet()) {
        if (entry.getValue() instanceof Exception) {
          System.out.printf("restarting node uuid '%s' raised throwable: %s%n", entry.getKey(), ExceptionUtils.getStackTrace((Exception) entry.getValue()));
        }
      }
    }
  } catch (Exception e) {
    e.printStackTrace();
  }
}

Here, you will notice that we do not establish a direct JMX connection with each node, using a JMXNodeConnectionWrapper as in my first post. Instead we use the server's node forwarding mechanism, along with a node selector. This is the recommended approach, because it addresses the issue of network topologies where the nodes are not directly reachable from the client.

Using the same forwarding mechanism, we could write the method that restarts all nodes like this:

Code: [Select]
// beware that killing a master node will also kill all its slaves
private void restartAllNodes() {
  try {
    // node forwarder fowards requests to selected nodes
    JPPFNodeForwardingMBean forwarder = jmx.getNodeForwarder();
    // restart all nodes
    Map<String, Object> result = forwarder.restart(NodeSelector.ALL_NODES);
    // check the result for each node
    for (Map.Entry<String, Object> entry: result.entrySet()) {
      if (entry.getValue() instanceof Exception) {
        throw (Exception) entry.getValue();
      }
    }
  } catch (Exception e) {
    e.printStackTrace();
  }
}

Additionally, if you'd prefer to use this code for any job submitted by any client from any machine, you can easily reuse the JMXHandler class and plug it into a server by using a server startup class. The main difference is that here you get a local connection to the JMX server, since the code is running in the same JVM:

Code: [Select]
// Here, the JMX connection is local and the JMX handler registration is done from a driver startup class. 
public class ServerTest implements JPPFDriverStartupSPI {
  @Override
  public void run() {
    JMXDriverConnectionWrapper jmx = new JMXDriverConnectionWrapper();
    jmx.connect();
    new JMXHandler(jmx).register();
  }
}

For your convenience, I have attached the full sources illustrating these techniques to this post.

Sincerely,
-Laurent
Logged
Pages: [1]   Go Up
 
JPPF Powered by SMF 2.0 RC5 | SMF © 2006–2011, Simple Machines LLC Get JPPF at SourceForge.net. Fast, secure and Free Open Source software downloads