public class BroadcastTask extends AbstractTask<Object> { @Override public void run() { // the execution result sent as a notification user object Map<String, Object> result = new HashMap<>(); try { // do whatever the task is supposed to do here // ... // it could be a more complex data structure instead, such as a Map result.put("success", true); } catch (Exception e) { result.put("success", false); // add the exception so the notification listener knows what happened result.put("exception", e); } // first param is a user object, here our result Map fireNotification(result, true); }}
public class BroadcastRunner { public static void main(final String[] args) { try (JPPFClient client = new JPPFClient()) { JMXDriverConnectionWrapper jmx = client.awaitWorkingConnectionPool() .awaitJMXConnections(Operator.AT_LEAST, 1, true).get(0); final Map<String, Boolean> responseMap = new ConcurrentHashMap<>(); // register a notification listener on all nodes NotificationListener listener = new NotificationListener() { @Override public void handleNotification(final Notification notification, final Object handback) { JPPFNodeForwardingNotification notif = (JPPFNodeForwardingNotification) notification; TaskExecutionNotification taskNotif = (TaskExecutionNotification) notif.getNotification(); if (taskNotif.isUserNotification()) { @SuppressWarnings("unchecked") Map<String, Object> result = (Map<String, Object>) taskNotif.getUserData(); boolean success = (Boolean) result.get("success"); responseMap.put(notif.getNodeUuid(), success); if (!success) { Exception e = (Exception) result.get("exception"); System.out.printf("Exception occurred in node '%s' : %s%n", notif.getNodeUuid(), ExceptionUtils.getStackTrace(e)); } } } }; String listeneerID = jmx.registerForwardingNotificationListener(NodeSelector.ALL_NODES, JPPFNodeTaskMonitorMBean.MBEAN_NAME, listener, null, null); JPPFJob job = new JPPFJob(); job.setName("my broadcast job"); job.getSLA().setBroadcastJob(true); job.add(new BroadcastTask()).setId("broadcast task id"); client.submitJob(job); // count of nodes is our exit condition int nbNodes = jmx.nbNodes(); // some notifications may arrive in the client after the job is done while (responseMap.size() < nbNodes) Thread.sleep(10L); // handle the results int successCount = 0, errorCount = 0; for (Map.Entry<String, Boolean> entry: responseMap.entrySet()) { boolean success = entry.getValue(); System.out.printf("Execution on node '%s' is a %s%n", entry.getKey(), (success ? "success" : "failure")); if (success) successCount++; else errorCount++; } System.out.printf("got tresponses from all nodes: success count = %d, error count = %d%n", successCount, errorCount); } catch (Exception e) { e.printStackTrace(); } }}