
Load balancing API

From JPPF 5.2 Documentation


1 How it works

The basic flow of a load-balancer is shown in the following figure:

[Figure: LoadBalancingFlow.gif]

As we can see, from a high-level perspective it is made of a continuous feedback loop:

  • the load-balancer determines which tasks of a job to send to a node, according to its last computed bundle size
  • upon receiving the task results from the node, it also receives information on the execution performance: the number of tasks, the total round-trip time, and optionally the total accumulated elapsed time for the execution of the tasks, along with the network transport overhead
  • information on the job being split can optionally be injected, allowing the load-balancer to recompute the bundle size based on the job's state and properties.
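
The loop described above can be sketched with a small simulation. This is only an illustration under stated assumptions: SimpleBalancer, GrowingBalancer and FeedbackLoopDemo are hypothetical stand-ins and not JPPF classes, and the round-trip time is simulated with an arbitrary cost model (10 ms of fixed overhead plus 5 ms per task).

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical stand-in for a load-balancer: only the two calls used by the loop.
interface SimpleBalancer {
  int getBundleSize();
  void feedback(int nbTasks, double totalTime);
}

// Hypothetical balancer that grows the bundle while the mean time per task improves.
class GrowingBalancer implements SimpleBalancer {
  private int bundleSize = 1;
  private double bestMeanTime = Double.MAX_VALUE;

  @Override public int getBundleSize() { return bundleSize; }

  @Override public void feedback(int nbTasks, double totalTime) {
    double mean = totalTime / nbTasks;
    if (mean < bestMeanTime) {
      bestMeanTime = mean;
      bundleSize++;
    }
  }
}

class FeedbackLoopDemo {
  // Dispatches all the tasks of a job in bundles and returns the number of dispatches.
  static int dispatchAll(Deque<Integer> tasks, SimpleBalancer balancer) {
    int dispatches = 0;
    while (!tasks.isEmpty()) {
      // 1. ask the balancer how many tasks to send
      int n = Math.min(balancer.getBundleSize(), tasks.size());
      for (int i = 0; i < n; i++) tasks.poll();
      // 2. simulated round trip: 10 ms fixed overhead plus 5 ms per task (assumed cost model)
      double totalTime = 10.0 + 5.0 * n;
      // 3. feed the performance data back so that the next bundle size can be recomputed
      balancer.feedback(n, totalTime);
      dispatches++;
    }
    return dispatches;
  }

  public static void main(String[] args) {
    Deque<Integer> tasks = new ArrayDeque<>();
    for (int i = 0; i < 10; i++) tasks.add(i);
    System.out.println("dispatches: " + dispatchAll(tasks, new GrowingBalancer()));
  }
}
```

With this cost model, larger bundles amortize the fixed overhead, so the hypothetical balancer keeps increasing its bundle size after each feedback.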

2 Bundler

In JPPF, all load-balancing algorithms implement the Bundler interface, defined as follows:

public interface Bundler<T extends LoadBalancingProfile> {
  // Get the last computed bundle size
  int getBundleSize();
  // Provide feedback from a channel after execution of a set of tasks
  void feedback(final int nbTasks, final double totalTime);
  // Get the timestamp at which this bundler was created
  long getTimestamp();
  // Perform context-independent initializations
  void setup();
  // Release the resources used by this bundler
  void dispose();
  // Get the parameters profile used by this load-balancer
  T getProfile();
}

For the feedback(int, double) method, the feedback data consists of the number of tasks that were executed, along with their total execution time in milliseconds. The execution time includes the network round trip between node and server or between server and client, along with the serialization and deserialization time, and any other overhead time from JPPF-specific processing.

The getTimestamp() method provides the bundler's creation timestamp. It is used to dynamically update the load-balancing settings, as can be done with JMX API calls or from the administration console. The server or client compares this timestamp with the last modification date of the settings, and instantiates a new bundler based on these settings whenever the settings are newer.

The setup() and dispose() methods are lifecycle methods, called right after a bundler is created and just before it is dismissed, respectively. Their purpose is to allow the bundler to configure or load any resources it will use during its life span, and to clean these resources up when it is terminated.

Note: it is important to remember that JPPF creates a Bundler instance for each channel it load-balances against. The server will associate a distinct bundler with each node connection. Similarly, the client will create a distinct bundler for each server connection, plus another for its local executor.

For all practical purposes, it is simpler and easier to extend the AbstractBundler class, rather than to implement Bundler directly. AbstractBundler is defined as follows:

public abstract class AbstractBundler<T extends LoadBalancingProfile>
  implements Bundler<T> {

  // Creates a new instance with the specified parameters profile
  public AbstractBundler(T profile)
  // Get the max bundle size that can be used for this bundler
  public int maxSize()
  // This implementation does nothing and should be overridden in subclasses
  // that compute the bundle size based on the feedback from the nodes
  @Override public void feedback(int bundleSize, double totalTime)
  // Get the timestamp at which this bundler was created
  @Override public long getTimestamp()
  // Perform context-independent initializations
  @Override public void setup()
  // Release the resources used by this bundler
  @Override public void dispose()
  // Get the parameters of the algorithm
  @Override public T getProfile()

}

In addition to providing default implementations for a number of methods in Bundler, it also adds a maxSize() method, which provides a hint as to the maximum bundle size the bundler should return. This addresses the problem that, if too many tasks are sent to a single channel, there will be no fair distribution of the tasks and the overall grid performance will suffer from it. As mentioned, this is just a hint and there is no obligation to use it.
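
As an illustration of how the hint might be honored, here is a minimal sketch. MaxSizeAwareBundler is a hypothetical stand-in that reproduces only the maxSize() and getBundleSize() methods. How the real AbstractBundler determines its hint is not shown here, so the sketch simply receives it as a constructor argument.

```java
// Hypothetical stand-in for the part of AbstractBundler used in this sketch.
abstract class MaxSizeAwareBundler {
  private final int maxSize;

  MaxSizeAwareBundler(int maxSize) { this.maxSize = maxSize; }

  // Hint for the largest bundle size that should be returned (assumed to be given at construction)
  public int maxSize() { return maxSize; }

  public abstract int getBundleSize();
}

// Returns the requested size, kept within the range [1, maxSize()].
class ClampedBundler extends MaxSizeAwareBundler {
  private final int requestedSize;

  ClampedBundler(int requestedSize, int maxSize) {
    super(maxSize);
    this.requestedSize = requestedSize;
  }

  @Override public int getBundleSize() {
    return Math.max(1, Math.min(requestedSize, maxSize()));
  }
}
```

For example, new ClampedBundler(50, 20).getBundleSize() returns 20, while a requested size of 5 is returned unchanged.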

3 BundlerEx

The BundlerEx interface is an extension of Bundler that provides an additional feedback() method with two more parameters:

public interface BundlerEx<T extends LoadBalancingProfile> extends Bundler<T> {

  // Feedback the bundler with the performance result of a task bundle
  void feedback(int nbTasks, double totalTime, double accumulatedElapsed,
                double overheadTime);

}

As for Bundler.feedback(int, double), the nbTasks and totalTime parameters represent the number of tasks in the bundle and the total round-trip time of the bundle, respectively. The purpose of the other two parameters is to provide greater accuracy when computing the performance of the task bundle's execution:

accumulatedElapsed is the sum of the elapsed execution times of all the tasks in the bundle. It differs from the total execution time in that it considers the execution as if it occurred on a single thread.

overheadTime measures the sum of the network transport time and the JPPF overhead time, which also includes serialization and deserialization. The total time spent actually executing the tasks can then be calculated as:

executionTime = totalTime - overheadTime

To illustrate these parameters, let's consider a node that has 2 threads. For clarity's sake, let's imagine the tasks all have the same duration d. We distinguish several scenarios:

1) We send 1 task to the node: both the total execution time and the accumulated elapsed time will be equal to d:

executionTime = d
accumulatedElapsed = d

2) We send 2 tasks to the node: since the node has 2 threads, the 2 tasks will execute concurrently, and we will have:

executionTime = d
accumulatedElapsed = 2*d

3) We send 3 tasks to the node: the first two tasks will execute concurrently, while the third task will wait until a thread becomes available, i.e. until one of the first two tasks completes. We will then have:

executionTime = 2*d
accumulatedElapsed = 3*d

If the algorithm is aware of the number of threads in the node, then its computations can easily discount the disturbance introduced by threads that remain idle for lack of tasks to execute.
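
The three scenarios can be reproduced with a few lines of arithmetic. The ThreadScenario class below is a hypothetical helper, not part of JPPF. It assumes, as in the text, that all tasks have the same duration d and that the tasks are executed in successive waves of at most nbThreads tasks.

```java
class ThreadScenario {
  // Wall-clock execution time of nbTasks tasks of duration d on nbThreads threads
  static double executionTime(int nbTasks, int nbThreads, double d) {
    int waves = (nbTasks + nbThreads - 1) / nbThreads; // ceiling of nbTasks / nbThreads
    return waves * d;
  }

  // Sum of the individual task durations, as if executed on a single thread
  static double accumulatedElapsed(int nbTasks, double d) {
    return nbTasks * d;
  }

  public static void main(String[] args) {
    double d = 10.0;
    for (int n = 1; n <= 3; n++) {
      System.out.println(n + " task(s): executionTime = " + executionTime(n, 2, d)
        + ", accumulatedElapsed = " + accumulatedElapsed(n, d));
    }
  }
}
```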

Note: at runtime, when JPPF detects that a bundler implements BundlerEx, it will call the BundlerEx.feedback() method instead of the Bundler.feedback() method.

The abstract class AbstractAdaptiveBundler implements BundlerEx, and for all practical purposes it will be easier to extend it than to implement BundlerEx directly:

public abstract class AbstractAdaptiveBundler<T extends LoadBalancingProfile>
  extends AbstractBundler<T> implements BundlerEx<T>, NodeAwareness, JobAwarenessEx {
  // the last computed bundle size
  protected int bundleSize;

  // Creates a new instance with the specified parameters profile
  public AbstractAdaptiveBundler(T profile)

  // receive feedback from a node
  @Override public void feedback(int size, double totalTime,
                                 double accumulatedElapsed, double overheadTime)

  // get the last computed bundle size
  @Override public int getBundleSize()

  @Override public JPPFSystemInformation getNodeConfiguration()

  @Override public void setNodeConfiguration(JPPFSystemInformation nodeConfiguration)

  @Override public JPPFDistributedJob getJob()

  @Override public void setJob(JPPFDistributedJob job)

  @Override public void dispose()

  @Override public int maxSize()
}

AbstractAdaptiveBundler has a default implementation of BundlerEx.feedback() which computes a synthetic totalTime where the idle threads overhead is removed, then delegates to Bundler.feedback() with this synthetic value.
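
The exact formula used by AbstractAdaptiveBundler is not given here. The sketch below shows one plausible way to build such a synthetic value, assuming that the accumulated elapsed time is spread evenly over all the threads of the node and that the overhead is added back. SyntheticTimeSketch and its fields are hypothetical names, and the formula is an assumption rather than the JPPF implementation.

```java
// Hypothetical sketch of the delegation pattern, not the JPPF implementation.
class SyntheticTimeSketch {
  private final int nbThreads;
  int lastNbTasks;      // last values received by the two-argument feedback()
  double lastTotalTime;

  SyntheticTimeSketch(int nbThreads) {
    this.nbThreads = Math.max(1, nbThreads);
  }

  // Equivalent of Bundler.feedback(): here it only records what it receives
  void feedback(int nbTasks, double totalTime) {
    lastNbTasks = nbTasks;
    lastTotalTime = totalTime;
  }

  // Equivalent of BundlerEx.feedback(): computes a synthetic total time, then delegates
  void feedback(int nbTasks, double totalTime, double accumulatedElapsed, double overheadTime) {
    // assumed formula: execution time as if no thread had remained idle, plus the overhead
    double synthetic = overheadTime + accumulatedElapsed / nbThreads;
    feedback(nbTasks, synthetic);
  }
}
```

Taking scenario 3 above with d = 10 ms and 4 ms of overhead, the measured total time is 24 ms, whereas this synthetic value is 4 + 30 / 2 = 19 ms.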

4 Parameters profile

A load-balancing algorithm may use zero or more parameters, which can be specified in the configuration of a JPPF driver or client. These parameters are encapsulated in an implementation of the LoadBalancingProfile interface, defined as follows:

public interface LoadBalancingProfile extends Serializable {

  // Make a copy of this profile
  @Deprecated
  LoadBalancingProfile copy();

}

As we can see, the only method in this interface is deprecated and is no longer used as of JPPF v5.2. This makes LoadBalancingProfile a marker interface for all intents and purposes. As a convenience, an abstract implementation is provided, the AbstractLoadBalancingProfile class, whose copy() method simply returns null.

The link between a bundler and its parameters profile is provided by the Bundler.getProfile() method. It is also generally convenient to have the LoadBalancingProfile passed in the bundler's constructor, as is the case for AbstractBundler and AbstractAdaptiveBundler.

5 Bundler provider

JPPF relies on the Service Provider Interface (SPI) mechanism to discover the defined load-balancing algorithms. To this effect, an implementation of the JPPFBundlerProvider interface must be provided for each algorithm:

public interface JPPFBundlerProvider<T extends LoadBalancingProfile> {

  // Get the name of the algorithm defined by this provider
  String getAlgorithmName();

  // Create a bundler instance using the specified parameters profile
  Bundler<T> createBundler(T profile);

  // Create a bundler profile containing the parameters of the algorithm
  T createProfile(TypedProperties configuration);

}

Notes:

a) the method getAlgorithmName() must return a name that is unique across all algorithms, otherwise only the last discovered algorithm will be kept.

b) the parameters profile provided in the createBundler() method is created by invoking the method createProfile(TypedProperties).

c) the TypedProperties object provided in the createProfile() method contains only the configuration properties for a given parameters profile, where the names of the properties are stripped of their JPPF-specific prefix. For instance, if the configuration contains:

jppf.load.balancing.profile = myProfile1
jppf.load.balancing.profile.myProfile1.param1 = value1
jppf.load.balancing.profile.myProfile2.param2 = value2

then the configuration provided to createProfile() will contain a single property definition without prefix:

param1 = value1

d) When an algorithm doesn't use any parameter, then you can implement an associated bundler provider using LoadBalancingProfile as the generic profile type, and returning null in the createProfile() method:

public class MyProvider implements JPPFBundlerProvider<LoadBalancingProfile> {
  @Override
  public String getAlgorithmName() { return "myAlgorithm"; }

  @Override
  public Bundler<LoadBalancingProfile> createBundler(LoadBalancingProfile profile) {
    return new MyAlgorithm(); // profile is not used
  }

  @Override
  public LoadBalancingProfile createProfile(TypedProperties configuration) {
    return null; // configuration is not used
  }
}
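
The prefix stripping described in note c) can be illustrated with plain java.util.Properties. This is a sketch of the idea only: ProfileConfigExtractor is a hypothetical helper, and JPPF performs this step internally with its own TypedProperties class.

```java
import java.util.Properties;

class ProfileConfigExtractor {
  static final String PROFILE_KEY = "jppf.load.balancing.profile";

  // Keeps only the properties of the selected profile, with their prefix removed.
  static Properties extractProfile(Properties config) {
    Properties result = new Properties();
    String profileName = config.getProperty(PROFILE_KEY);
    if (profileName == null) return result;
    String prefix = PROFILE_KEY + "." + profileName + ".";
    for (String name : config.stringPropertyNames()) {
      if (name.startsWith(prefix)) {
        result.setProperty(name.substring(prefix.length()), config.getProperty(name));
      }
    }
    return result;
  }

  public static void main(String[] args) {
    Properties config = new Properties();
    config.setProperty("jppf.load.balancing.profile", "myProfile1");
    config.setProperty("jppf.load.balancing.profile.myProfile1.param1", "value1");
    config.setProperty("jppf.load.balancing.profile.myProfile2.param2", "value2");
    System.out.println(extractProfile(config));
  }
}
```

Applied to the configuration of note c), only param1 is retained, since param2 belongs to a profile that is not selected.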

Finally, to enable the discovery of the algorithm, it is required to create, in the META-INF/services folder, a file named:

org.jppf.load.balancer.spi.JPPFBundlerProvider

In this file, enter the fully qualified name of the bundler provider implementation, for each existing algorithm, on a separate line, as in this example:

# my custom load-balancer algorithm
com.example.MyProvider
# the JPPF "manual" algorithm
org.jppf.load.balancer.spi.FixedSizeBundlerProvider
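
For readers unfamiliar with the SPI mechanism, the following sketch shows how providers listed in such a service file can be looked up with the standard java.util.ServiceLoader class, and why a duplicate algorithm name causes only the last discovered provider to be kept. NamedProvider and ProviderRegistry are hypothetical stand-ins. The actual discovery code in JPPF may differ.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.ServiceLoader;

// Hypothetical stand-in for JPPFBundlerProvider, reduced to the algorithm name.
interface NamedProvider {
  String getAlgorithmName();
}

class ProviderRegistry {
  // Indexes the providers by algorithm name; a later provider replaces an earlier one with the same name.
  static Map<String, NamedProvider> index(Iterable<? extends NamedProvider> providers) {
    Map<String, NamedProvider> byName = new LinkedHashMap<>();
    for (NamedProvider provider : providers) {
      byName.put(provider.getAlgorithmName(), provider);
    }
    return byName;
  }

  public static void main(String[] args) {
    // ServiceLoader reads META-INF/services/<fully qualified interface name> from the classpath;
    // without such a file, no provider is found.
    Map<String, NamedProvider> found = index(ServiceLoader.load(NamedProvider.class));
    System.out.println("discovered algorithms: " + found.keySet());
  }
}
```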

6 Channel awareness

A load-balancer that wishes to receive information about its associated channel, to base its computations on, should implement the ChannelAwareness interface:

public interface ChannelAwareness {
  // Get the corresponding node's system information
  JPPFSystemInformation getChannelConfiguration();

  // Set the corresponding node's system information
  void setChannelConfiguration(JPPFSystemInformation channelConfiguration);
}

As we can see, this interface allows JPPF to set an attribute of type JPPFSystemInformation onto the load-balancer. The properties contained in this attribute are described in full detail in the Execution policy properties section.

The setChannelConfiguration() method is a bundler life cycle callback, invoked internally by the JPPF client or server when the channel initially establishes a connection (handshake), or when one or more properties in its configuration, including the number of threads, are changed dynamically.
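
A minimal sketch of how a bundler might react to this callback follows. ChannelInfo is a hypothetical stand-in for JPPFSystemInformation that exposes only a thread count, and ChannelAware mirrors the shape of ChannelAwareness. The policy of sending one task per thread is an assumption chosen for illustration.

```java
// Hypothetical stand-in for JPPFSystemInformation, reduced to the number of threads.
class ChannelInfo {
  private final int nbThreads;
  ChannelInfo(int nbThreads) { this.nbThreads = nbThreads; }
  int getNbThreads() { return nbThreads; }
}

// Hypothetical stand-in with the same shape as ChannelAwareness.
interface ChannelAware {
  ChannelInfo getChannelConfiguration();
  void setChannelConfiguration(ChannelInfo channelConfiguration);
}

// Sends one task per processing thread of the channel (assumed policy).
class ThreadCountBundler implements ChannelAware {
  private ChannelInfo channelConfiguration;
  private int bundleSize = 1;

  @Override public ChannelInfo getChannelConfiguration() { return channelConfiguration; }

  // Called at handshake time and again whenever the channel's configuration changes
  @Override public void setChannelConfiguration(ChannelInfo channelConfiguration) {
    this.channelConfiguration = channelConfiguration;
    this.bundleSize = (channelConfiguration == null) ? 1 : Math.max(1, channelConfiguration.getNbThreads());
  }

  public int getBundleSize() { return bundleSize; }
}
```

Because the callback is invoked again on dynamic changes, the bundle size follows the thread count without any further action from the algorithm.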

7 Job awareness

A bundler can also receive information on the job being distributed over the server or node channels by implementing the JobAwarenessEx interface:

public interface JobAwarenessEx {
  // Get the current job for which load-balancing is being performed
  JPPFDistributedJob getJob();

  // Set the current job for which load-balancing is being performed
  void setJob(JPPFDistributedJob job);
}

This interface allows the client or server to set an attribute of type JPPFDistributedJob onto the bundler.

The setJob() method is a bundler life cycle callback, invoked internally by the JPPF client or server, whenever a set of tasks from a job is dispatched to a channel.

As for ChannelAwareness, a bundler that implements JobAwarenessEx will be automatically recognized and its setJob() method invoked at the appropriate times.

AbstractAdaptiveBundler also implements JobAwarenessEx and its implementation of setJob() sets a job attribute of type JPPFDistributedJob.
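
As a sketch of what job awareness makes possible, the bundler below splits each job evenly across an assumed number of channels. JobInfo is a hypothetical stand-in for JPPFDistributedJob that exposes only a task count, and the nbChannels parameter and the even-split policy are assumptions made for illustration.

```java
// Hypothetical stand-in for JPPFDistributedJob, reduced to the number of tasks.
class JobInfo {
  private final int taskCount;
  JobInfo(int taskCount) { this.taskCount = taskCount; }
  int getTaskCount() { return taskCount; }
}

// Splits each job evenly across a fixed, assumed number of channels.
class EvenSplitBundler {
  private final int nbChannels;
  private JobInfo job;
  private int bundleSize = 1;

  EvenSplitBundler(int nbChannels) {
    this.nbChannels = Math.max(1, nbChannels);
  }

  public JobInfo getJob() { return job; }

  // Called whenever a set of tasks from a job is about to be dispatched
  public void setJob(JobInfo job) {
    this.job = job;
    this.bundleSize = (job == null) ? 1 : Math.max(1, job.getTaskCount() / nbChannels);
  }

  public int getBundleSize() { return bundleSize; }
}
```

With 4 channels, a job of 100 tasks yields bundles of 25 tasks, while a job of 2 tasks falls back to the minimum bundle size of 1.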

8 Simple code example: the "manual" algorithm

To try and translate these concepts into something more concrete, we will now walk through the implementation of the simplest of the JPPF built-in algorithms: the "manual" algorithm. The "manual" algorithm is a static, global and deterministic algorithm which always returns the same fixed bundle size for all the channels. It uses a single parameter named "size" and is configured as follows in a JPPF configuration file:

# name of the load-balancing algorithm
jppf.load.balancing.algorithm = manual
# name of the set of parameters (profile) for the algorithm
jppf.load.balancing.profile = manual_profile
# "manual_profile" profile
jppf.load.balancing.profile.manual_profile.size = 20

Remember, during the discovery and creation of the load-balancers, this configuration will be stripped down to only the essential information needed:

size = 20

Based on this, we implement the profile class FixedSizeProfile as follows:

public class FixedSizeProfile extends AbstractLoadBalancingProfile {
  private final int size;

  // Initialize this profile with values read from the specified configuration
  public FixedSizeProfile(final TypedProperties config) {
    int n = config.getInt("size", 1);
    this.size = (n < 1) ? 1 : n; 
  }

  // Get the bundle size
  public int getSize() {
    return size;
  }
}


As we can see, this profile simply extracts the "size" parameter from the configuration at construction time and exposes it to other classes with a getter.

Given its simplicity, the algorithm needs neither job awareness nor channel awareness, therefore it can be implemented by extending AbstractBundler:

public class FixedSizeBundler extends AbstractBundler<FixedSizeProfile> {
  // Initialize this bundler
  public FixedSizeBundler(FixedSizeProfile profile) {
    super(profile);
  }

  // Returns the bundle size statically assigned in the configuration
  @Override
  public int getBundleSize() {
    return profile.getSize();
  }
}

Note here that we leave the implementation of the feedback(int, double) method to the superclass, which does nothing (empty implementation).
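
By contrast, an adaptive algorithm would override feedback() to recompute its bundle size. The following sketch is not one of the JPPF built-in algorithms: TargetTimeBundler, its targetTime and maxSize parameters, and its policy of aiming for a fixed round-trip time per bundle are all assumptions made for illustration. It is written as a standalone class so that it compiles without the JPPF libraries.

```java
// Hypothetical adaptive algorithm: sizes bundles so that a round trip takes about targetTime ms.
class TargetTimeBundler {
  private final double targetTime; // desired round-trip time per bundle, in milliseconds
  private final int maxSize;       // upper bound for the bundle size
  private int bundleSize = 1;
  private long totalTasks;         // number of tasks seen so far
  private double totalElapsed;     // sum of all reported round-trip times

  TargetTimeBundler(double targetTime, int maxSize) {
    this.targetTime = targetTime;
    this.maxSize = Math.max(1, maxSize);
  }

  public int getBundleSize() { return bundleSize; }

  // Updates the mean time per task and derives the next bundle size from it
  public void feedback(int nbTasks, double totalTime) {
    if (nbTasks <= 0 || totalTime <= 0d) return; // ignore unusable samples
    totalTasks += nbTasks;
    totalElapsed += totalTime;
    double meanPerTask = totalElapsed / totalTasks;
    int size = (int) Math.round(targetTime / meanPerTask);
    bundleSize = Math.max(1, Math.min(size, maxSize));
  }
}
```

In a real implementation, the same logic would go into a subclass of AbstractBundler or AbstractAdaptiveBundler, with targetTime and maxSize read from a dedicated profile class.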

The associated JPPFBundlerProvider, the FixedSizeBundlerProvider class, is then implemented like this:

public class FixedSizeBundlerProvider implements JPPFBundlerProvider<FixedSizeProfile> {
  // Create a bundler instance using the specified parameters profile
  @Override
  public Bundler<FixedSizeProfile> createBundler(FixedSizeProfile profile) {
    return new FixedSizeBundler(profile);
  }

  // Create a bundler profile containing the parameters of the algorithm
  @Override
  public FixedSizeProfile createProfile(TypedProperties configuration) {
    return new FixedSizeProfile(configuration);
  }

  // Get the name of the algorithm defined by this provider
  @Override
  public String getAlgorithmName() {
    return "manual";
  }
}

Finally, for JPPF to discover the algorithm, we add the bundler provider's fully qualified class name to the service file META-INF/services/org.jppf.load.balancer.spi.JPPFBundlerProvider:

# the "manual" algorithm
org.jppf.load.balancer.spi.FixedSizeBundlerProvider

JPPF Copyright © 2005-2017 JPPF.org