CustomLoadBalancerRunner.java
/*
 * JPPF.
 * Copyright (C) 2005-2014 JPPF Team.
 * http://www.jppf.org
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.jppf.example.loadbalancer.client;

import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.List;

import org.jppf.client.*;
import org.jppf.example.loadbalancer.common.MyCustomPolicy;
import org.jppf.node.policy.AtLeast;
import org.jppf.node.policy.ExecutionPolicy;
import org.jppf.node.protocol.Task;
import org.jppf.server.protocol.JPPFJobMetadata;

/**
 * This is a fully commented job runner for the Custom Load Balancer sample.
 @author Laurent Cohen
 */
public class CustomLoadBalancerRunner
{
  /**
   * The JPPF client, handles all communications with the server.
   * It is recommended to only use one JPPF client per JVM, so it
   * should generally be created and used as a singleton.
   */
  private static JPPFClient jppfClient =  null;
  /**
   * Value representing one kilobyte.
   */
  private static int KB = 1024;
  /**
   * Value representing one megabyte.
   */
  private static int MB = 1024 * KB;

  /**
   * The entry point for this application runner to be run from a Java command line.
   @param args by default, we do not use the command line arguments,
   * however nothing prevents us from using them if need be.
   */
  public static void main(final String...args)
  {
    try
    {
      // create the JPPFClient. This constructor call causes JPPF to read the configuration file
      // and connect with one or multiple JPPF drivers.
      jppfClient = new JPPFClient();

      // create a runner instance.
      CustomLoadBalancerRunner runner = new CustomLoadBalancerRunner();

      // Create a "heavy" job
      // The is the maximum memory footprint of a task in the job
      int taskFootprint = 20 * MB;
      // We want 40 MB available for each heavy task running concurrently.
      // This is not easily doable with a standard execution policy, so we create a custom one.
      // We use double the task footprint because it will take approximately twice the memory footprint
      // when each task is serialized or deserialized in the node (serialized data + the object itself).
      ExecutionPolicy heavyPolicy = new MyCustomPolicy("" (2*taskFootprint));
      // Tasks in the job will have 20 MB data size, will last at most 5 seconds,
      // and the maximum execution time for a set of tasks will be no more than 60 seconds.
      // With 4 tasks and the node's heap set to 64 MB, the load-balancer will be forced to split the job in 2 at least.
      JPPFJob heavyJob = runner.createJob("Heavy Job"4, taskFootprint, 5L*1000L60L*1000L, heavyPolicy);
      // This job has long-running tasks, so we can submit it and create the second job while it is executing.
      jppfClient.submitJob(heavyJob);

      // Create a "light" job
      // We want at least 2 light tasks executing concurrently in a node, to mitigate the network overhead.
      ExecutionPolicy lightPolicy = new AtLeast("jppf.processing.threads"2);
      // Tasks in the job will have 10 KB data size, will last at most 80 milliseconds,
      // and the maximum execution time for a set of tasks will be no more than 3 seconds.
      // Here the allowed time will be the limiting factor for the number of tasks that can be sent to a node,
      // so the if the node has 4 threads, the job should be split in 2: one set of 150 tasks, then one set of 50 (approximately).
      // (total time = 200 * 80 / 4)
      JPPFJob lightJob = runner.createJob("Light Job"20010*KB, 80L3L*1000L, lightPolicy);
      // Submit the light job.
      jppfClient.submitJob(lightJob);

      // now we obtain and process the results - this will cause a lot of logging to the console!
      runner.processJobResults(heavyJob);
      runner.processJobResults(lightJob);
    }
    catch(Exception e)
    {
      e.printStackTrace();
    }
    finally
    {
      if (jppfClient != nulljppfClient.close();
    }
  }

  /**
   * Create a JPPF job that can be submitted for execution.
   @param jobName the name (or id) assigned to the job.
   @param nbTasks the number of tasks to add to the job.
   @param size the data size of each task, in bytes.
   @param duration the duration of each tasks, in milliseconds.
   @param allowedTime the maximum execution time for a set of tasks on a node.
   @param policy execution policy assigned to the job.
   @return an instance of the {@link JPPFJob} class.
   @throws Exception if an error occurs while creating the job or adding tasks.
   */
  public JPPFJob createJob(final String jobName, final int nbTasks, final int size, final long duration,
      final long allowedTime, final ExecutionPolicy policythrows Exception
  {
    // Create a JPPF job.
    JPPFJob job = new JPPFJob();

    // Give this job a readable unique id that we can use to monitor and manage it.
    job.setName(jobName);

    // Specify the job metadata.
    JPPFJobMetadata metadata = (JPPFJobMetadatajob.getMetadata();
    metadata.setParameter("task.memory""" + size);
    metadata.setParameter("task.time""" + duration);
    metadata.setParameter("allowed.time""" + allowedTime);
    metadata.setParameter("id", jobName);

    // Add the tasks to the job.
    for (int i=1; i<=nbTasks; i++)
    {
      // create a task with the specified data size and duration
      Task<Object> task = new CustomLoadBalancerTask(size, duration);
      // task id contains the job name for easier identification
      task.setId(jobName + " - task " + i);
      job.add(task);
    }

    // Assign an execution policy to the job.
    job.getSLA().setExecutionPolicy(policy);

    // Set the job in non-blocking (asynchronous) mode.
    job.setBlocking(false);

    return job;
  }

  /**
   * Collect and process the execution results of a job.
   @param job the JPPF job to process.
   @throws Exception if an error occurs while processing the results.
   */
  public void processJobResults(final JPPFJob jobthrows Exception
  {
    // Print a banner to visually separate the results for each job.
    System.out.println("\n********** Results for job : " + job.getName() " **********\n");

    // We are now ready to get the results of the job execution.
    // We use JPPFJob.awaitResults() for this. This method returns immediately with the
    // results if the job has completed, otherwise it waits until the job execution is complete.
    List<Task<?>> results = job.awaitResults();

    // Process the results
    for (Task<?> task: results)
    {
      // If the task execution resulted in an exception, display the stack trace.
      if (task.getThrowable() != null)
      {
        // process the exception here ...
        StringWriter sw = new StringWriter();
        task.getThrowable().printStackTrace(new PrintWriter(sw));
        System.out.println("Exception occurred while executing task " + task.getId() ":");
        System.out.println(sw.toString());
      }
      else
      {
        // Display the task result.
        System.out.println("Result for task " + task.getId() " : " + task.getResult());
      }
    }
  }
}