/** * Copyright 2013, Somete Group, LLC. All rights reserved. $LastChangedDate$ * $LastChangedBy$ $Revision$ */ package com.sometegroup.jppf.client.assessors; import java.util.Collection; import java.util.HashMap; import java.util.Map; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import javax.management.Notification; import javax.management.NotificationListener; import org.apache.log4j.Logger; import org.jppf.client.JPPFClient; import org.jppf.client.JPPFClientConnection; import org.jppf.job.JobEventType; import org.jppf.job.JobNotification; import org.jppf.management.JMXDriverConnectionWrapper; import org.jppf.management.JPPFDriverAdminMBean; import org.jppf.management.JPPFManagementInfo; import org.jppf.server.job.management.DriverJobManagementMBean; import org.jppf.utils.stats.JPPFStatistics; import org.jppf.utils.stats.JPPFStatisticsHelper; import com.gkn.util.LoggerUtil; import com.sometegroup.jclouds.ServerDeletor; public class ServerMonitor implements Runnable { private static final Logger LOG = LoggerUtil.newLogger(ServerMonitor.class); private final JPPFClient client; private JMXDriverConnectionWrapper jmx = null; public ServerMonitor(JPPFClient client) { this.client = client; } @Override public void run() { // wait until a standard connection to the server is established while (!client.hasAvailableConnection()) try { Thread.sleep(10L); } catch (InterruptedException e) { e.printStackTrace(); } JPPFClientConnection connection = client.getClientConnection(); // wait until a JMX connection is created while ((jmx = connection.getJmxConnection()) == null) try { Thread.sleep(10L); } catch (InterruptedException e) { e.printStackTrace(); } // wait until the JMX connection is established while (!jmx.isConnected()) try { Thread.sleep(10L); } catch (InterruptedException e) { e.printStackTrace(); } // Register listeners NotificationListener myJobNotificationListener = new MyJobNotificationListener(); try { DriverJobManagementMBean proxy = jmx.getProxy( DriverJobManagementMBean.MBEAN_NAME, DriverJobManagementMBean.class); // subscribe to all notifications from the MBean proxy.addNotificationListener(myJobNotificationListener, null, null); } catch (Exception e) { e.printStackTrace(); } LOG.info("Job Notification Listener registered."); try { JPPFStatistics stats = (JPPFStatistics) jmx.invoke( JPPFDriverAdminMBean.MBEAN_NAME, "statistics"); stats.reset(); LOG.info("Driver statistics reset."); } catch (Exception e) { e.printStackTrace(); } // Signal AssessorsClient to start jobs AssessorsClient.getStartSignal().countDown(); } // this class prints a message each time a job is added to the server's queue public class MyJobNotificationListener implements NotificationListener { private Map startTime = new HashMap(); // Handle an MBean notification public void handleNotification(Notification notification, Object handback) { JobNotification jobNotif = (JobNotification) notification; String jobId = jobNotif.getJobInformation().getJobName(); String jobUuid = jobNotif.getJobInformation().getJobUuid(); JobEventType eventType = jobNotif.getEventType(); switch (eventType) { case JOB_ENDED: if (LOG.isInfoEnabled()) { LOG.info(String.format("Job %s ended. Elapsed time %.1f minutes.", jobId, (jobNotif.getTimeStamp() - startTime.get(jobUuid)) / 60000f)); } break; case JOB_DISPATCHED: if (!startTime.containsKey(jobUuid)) { startTime.put(jobUuid, jobNotif.getTimeStamp()); if (LOG.isInfoEnabled()) { LOG.info(String.format("Job %s started", jobId)); } } if (LOG.isInfoEnabled()) { LOG.info(String.format( "Job %s dispatched %d tasks to %s. Queue now has %.0f tasks.", jobId, jobNotif.getJobInformation().getTaskCount(), jobNotif .getNodeInfo().getHost(), queueSize())); } break; case JOB_RETURNED: double queueSize = queueSize(); if (LOG.isInfoEnabled()) { LOG.info(String.format( "Job %s returned %d tasks from %s. Queue now has %.0f tasks.", jobId, jobNotif.getJobInformation().getTaskCount(), jobNotif .getNodeInfo().getHost(), queueSize)); } if (AssessorsClient.isCloud() && queueSize <= 0d) { try { @SuppressWarnings("unchecked") Collection idleNodes = (Collection) jmx .invoke(JPPFDriverAdminMBean.MBEAN_NAME, "idleNodesInformation"); for (JPPFManagementInfo node : idleNodes) { if (LOG.isInfoEnabled()) { LOG.info("Shutting down idle node: " + node.getHost()); } shutdownNode(node); } } catch (Exception e) { e.printStackTrace(); } } break; case JOB_QUEUED: if (LOG.isInfoEnabled()) { LOG.info(String.format("Job %s queued. Queue now has %.0f tasks.", jobId, queueSize())); } break; case JOB_UPDATED: default: } } private double queueSize() { try { JPPFStatistics stats = (JPPFStatistics) jmx.invoke( JPPFDriverAdminMBean.MBEAN_NAME, "statistics"); return stats.getSnapshot(JPPFStatisticsHelper.TASK_QUEUE_COUNT) .getLatest(); } catch (Exception e) { e.printStackTrace(); } return Double.NaN; } private void shutdownNode(JPPFManagementInfo node) { // Shut down this cloud server in a new thread ExecutorService executor = Executors.newFixedThreadPool(2); executor.execute(new ServerDeletor(node)); executor.shutdown(); } }; }