CustomLoadBalancer.java

/*
 * JPPF.
 * Copyright (C) 2005-2016 JPPF Team.
 * http://www.jppf.org
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.jppf.example.loadbalancer.server;
import org.jppf.load.balancer.*;
import org.jppf.management.JPPFSystemInformation;
import org.jppf.node.protocol.*;
import org.jppf.utils.TypedProperties;
import org.jppf.utils.configuration.JPPFProperties;
import org.slf4j.*;
/**
 * This implementation of a load-balancing algorithm illustrates the use of
 * the {@link NodeAwareness} and {@link JobAwareness} APIs, allowing the algorithm to work
 * based on known information about the nodes and the jobs.
 * <p>In this implementation, we assume each job provides the following metadata:
 * <ul>
 * <li>"task.memory": the maximum memory footprint of each task in bytes</li>
 * <li>"task.time": the maximum duration of a task in milliseconds</li>
 * <li>"allowed.time": the maximum allowed time for execution of a single set of tasks on a node, in milliseconds</li>
 * <li>"id": the id of the current job being executed, used for debugging and logging</li>
 * </ul>
 * @author Laurent Cohen
 */
public class CustomLoadBalancer extends AbstractBundler implements NodeAwareness, JobAwarenessEx, ContextAwareness {
  /**
   * Logger for this class.
   */
  private static Logger log = LoggerFactory.getLogger(CustomLoadBalancer.class);
  /**
   * Holds information about the node's environment and configuration.
   */
  private JPPFSystemInformation nodeConfiguration = null;
  /**
   * Holds information about the current job being dispatched.
   */
  private JPPFDistributedJob jobInformation = null;
  /**
   * The current number of tasks to send to the node.
   */
  private int bundleSize = 1;
  /**
   * Holds information about the execution context.
   */
  private JPPFContext jppfContext = null;
  /**
   * Creates a new instance with the specified parameters profile.
   * @param profile the parameters of the load-balancing algorithm.
   */
  public CustomLoadBalancer(final LoadBalancingProfile profile) {
    super(profile);
    if (log.isDebugEnabled()) log.debug("creating CustomLoadBalancer #" + this.bundlerNumber);
  }
  /**
   * Make a copy of this bundler.
   * Which parts are actually copied depends on the implementation.
   * @return a new <code>Bundler</code> instance.
   */
  @Override
  public Bundler copy() {
    return new CustomLoadBalancer(null);
  }
  /**
   * Get the current number of tasks to send to the node.
   * @return the bundle size as an int value.
   */
  @Override
  public int getBundleSize() {
    return bundleSize;
  }
  /**
   * Get the corresponding node's system information.
   * @return a {@link JPPFSystemInformation} instance.
   */
  @Override
  public JPPFSystemInformation getNodeConfiguration() {
    return nodeConfiguration;
  }
  /**
   * Set the corresponding node's system information.
   * @param nodeConfiguration a {@link JPPFSystemInformation} instance.
   */
  @Override
  public void setNodeConfiguration(final JPPFSystemInformation nodeConfiguration) {
    this.nodeConfiguration = nodeConfiguration;
    if (log.isDebugEnabled()) log.debug("setting node configuration on bundler #" + bundlerNumber + ": " + nodeConfiguration);
  }
  /**
   * Get the max bundle size that can be used for this bundler.
   * @return the bundle size as an int.
   */
  @Override
  protected int maxSize() {
    return jppfContext == null ? 300 : jppfContext.getMaxBundleSize();
  }
  /**
   * Get the current job's inforamtion.
   * @return a {@link JPPFDistributedJob} instance.
   */
  @Override
  public JPPFDistributedJob getJob() {
    return jobInformation;
  }
  /**
   * Set the current job's information.
   * @param jobInformation a {@link JPPFDistributedJob} instance.
   */
  @Override
  public void setJob(final JPPFDistributedJob jobInformation) {
    this.jobInformation = jobInformation;
    // compute the number of tasks to send to the node,
    // based on the new job metadata
    computeBundleSize();
  }
  /**
   * Compute the number of tasks to send to the node. This is the actual algorithm implementation.
   */
  private void computeBundleSize() {
    if (log.isDebugEnabled()) log.debug("computing bundle size for bundler #" + this.bundlerNumber);
    // Get the job metadata in an easy to use format
    TypedProperties props = new TypedProperties(getJob().getMetadata().getAll());
    // the maximum memory footprint of each task in bytes
    long taskMemory = props.getLong("task.memory", 10 * 1024);
    // fetch the length of a task in milliseconds
    long taskTime = props.getLong("task.time", 10);
    // fetch the maximum allowed time for execution of a single set of tasks on a node
    long allowedTime = props.getLong("allowed.time", -1);
    // if allowed time is not defined we assume no time limit
    if (allowedTime <= 0) allowedTime = Long.MAX_VALUE;
    // get the number of processing threads in the node
    int nbThreads = getNodeConfiguration().getJppf().get(JPPFProperties.PROCESSING_THREADS);
    // max node heap size of the node in bytes
    long nodeMemory = getNodeConfiguration().getRuntime().getLong("maxMemory");
    // we assume 20 MB of the node's memory is taken by JPPF code and add-ons
    nodeMemory -= 20 * 1024 * 1024;
    if (nodeMemory < 0) nodeMemory = 0;
    // max number of tasks that can fit in the node's heap
    // we count 2*taskMemory because it will take approximately twice the memory footprint
    // when each task is serialized or deserialized in the node (serialized data + the object itself)
    int maxTasks = (int) (nodeMemory / (2 * taskMemory));
    // the maximum time needed to execute maxTasks tasks on nbThreads parallel threads
    long maxTime = taskTime * maxTasks / nbThreads;
    // if maxTime is not a multiple of nbThreads, make the adjustment
    if ((maxTasks % nbThreads) != 0) maxTime += taskTime;
    // if max time is longer than the allowed time, reduce the number of tasks by the appropriate amount
    if (maxTime > allowedTime) {
      maxTasks = (int) ((maxTasks * allowedTime) / maxTime);
    }
    // finally, store the computation result, ensuring that 1 <= size <= maxSize
    bundleSize = Math.max(1, Math.min(maxTasks, maxSize()));
    // for debugging and logging purposes
    String id = props.getString("id", "unknown id");
    // log the new bundle size
    if (log.isDebugEnabled()) log.debug("bundler #" + this.bundlerNumber + " computed new bundle size = " + bundleSize + " for job id = " + id);
  }
  /**
   * Release the resources used by this bundler.
   */
  @Override
  public void dispose() {
    if (log.isDebugEnabled()) log.debug("disposing bundler #" + this.bundlerNumber);
    this.jobInformation = null;
    this.nodeConfiguration = null;
  }
  @Override
  public JPPFContext getJPPFContext() {
    return jppfContext;
  }
  @Override
  public void setJPPFContext(final JPPFContext context) {
    this.jppfContext = context;
  }
}