Submitting multiple jobs concurrently

From JPPF 6.1 Documentation

In this section, we present a number of ways to design an application so that it can execute multiple jobs concurrently, using a single JPPFClient instance. These can be seen as common, reusable patterns covering the most frequent use cases where multiple jobs need to be submitted and processed in parallel.

The patterns presented here all assume that job submissions are performed through a single instance of JPPFClient. This is the recommended way to work with JPPF, since it takes full advantage of the client's built-in features:

  • thread safety
  • ability to connect to multiple remote drivers, to the same driver multiple times, or any combination of these
  • load-balancing between available connections
  • ability to submit a job over multiple connections for increased performance
  • fine-grained filtering of eligible connections for each job, via the job's client-side execution policy
  • connection failover strategies defined via the connection pools priorities
Important note: starting from JPPF 6.1, it is no longer required to have multiple connections to the JPPF server to enable concurrent job processing. Each connection can now handle an unlimited number of jobs concurrently.
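
To illustrate the last two items in the list above, here is a brief sketch, not part of the original text, showing how a job's client-side SLA might be tuned. It assumes the JobClientSLA.setMaxChannels() method and the org.jppf.node.policy.Equal policy class, with "jppf.ssl.enabled" used as an example driver property:

public void configureClientSideJobSLA() {
  JPPFJob job = new JPPFJob();
  // ... add tasks ...
  // allow the client to dispatch this job over up to 2 driver connections in parallel
  job.getClientSLA().setMaxChannels(2);
  // client-side execution policy: only use connections to drivers where SSL is disabled
  job.getClientSLA().setExecutionPolicy(new Equal("jppf.ssl.enabled", false));
}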

1 Job submissions from multiple threads

This pattern shows how multiple threads can submit jobs concurrently through the same JPPFClient instance. Here we use blocking submissions: since each job is submitted from its own thread, we can afford to block that thread until the job completes:

public void multipleThreadsBlockingJobs() {
  // a pool of threads that will submit the jobs and retrieve their results
  ExecutorService executor = Executors.newFixedThreadPool(4);
  try (JPPFClient client = new JPPFClient()) {
    // handles for later retrieval of the job submissions results
    List<Future<List<Task<?>>>> futures = new ArrayList<>();
    for (int i=0; i<4; i++) {
      JPPFJob job = new JPPFJob();
      // ... set attributes and add tasks ...
      // submit the job as a Callable in a separate thread
      futures.add(executor.submit(() -> client.submit(job)));
    }
    futures.forEach(future -> {
      try {
        // wait until each job has completed and retrieve its results
        List<Task<?>> results = future.get();
        // ... process the job results ...
        processResults(results);
      } catch (Exception e) {
        e.printStackTrace();
      }
    });
  }
  executor.shutdown();
}
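
The processResults() method used throughout these examples is application-specific and left undefined in this section. A minimal sketch, assuming the standard Task getResult() and getThrowable() accessors, could look like this:

private void processResults(List<Task<?>> results) {
  for (Task<?> task : results) {
    if (task.getThrowable() != null) {
      // the task terminated with an exception
      System.err.println("task " + task.getId() + " failed: " + task.getThrowable());
    } else {
      // the task completed normally
      System.out.println("task " + task.getId() + " result: " + task.getResult());
    }
  }
}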

2 Multiple non-blocking jobs from a single thread

Here, we take advantage of the asynchronous nature of non-blocking jobs to write a much less cumbersome version of the previous pattern:

public void singleThreadNonBlockingJobs() {
  try (JPPFClient client = new JPPFClient()) {
    // holds the submitted jobs for later retrieval of their results
    List<JPPFJob> jobs = new ArrayList<>();
    // submit the jobs without blocking the current thread
    for (int i=0; i<4; i++) {
      JPPFJob job = new JPPFJob();
      // ... set other attributes and add tasks ...
      jobs.add(job);
      client.submitAsync(job); // non-blocking operation
    }
    // get and process the jobs results
    jobs.forEach(job -> {
      // synchronize on each job's completion: this is a blocking operation
      List<Task<?>> results = job.awaitResults();
      processResults(results); // process the job results
    });
  } catch(Exception e) {
    e.printStackTrace();
  }
}

3 Fully asynchronous processing

Here, we use a JobListener to retrieve and process the results of the jobs via job life cycle notifications. The only synchronization occurs in the main method, which waits for the completion of all the jobs:

public void asynchronousNonBlockingJobs() {
  try (JPPFClient client = new JPPFClient()) {
    int nbJobs = 4;
    // synchronization helper that tells us when all jobs have completed
    final CountDownLatch countDown = new CountDownLatch(nbJobs);
    for (int i=0; i<nbJobs; i++) {
      JPPFJob job = new JPPFJob();
      // results will be processed asynchronously in the job listener's jobEnded() notifications
      job.addJobListener(new JobListenerAdapter() {
        @Override
        public void jobEnded(JobEvent event) {
          List<Task<?>> results = event.getJob().getAllResults();
          processResults(results); // process the job results
          // decrease the jobs count down; when it reaches 0, countDown.await() will exit 
          countDown.countDown();
        }
      });
      // ... set other attributes, add tasks, submit the job ...
      client.submitAsync(job);
    }
    // wait until all jobs are complete, i.e. until the count down reaches 0
    countDown.await();
  } catch(Exception e) {
    e.printStackTrace();
  }
}

4 Job streaming

Job streaming occurs when an application continuously creates and executes jobs, based on a potentially infinite source of data. The main problem to overcome in this use case is that jobs may be created much faster than they are executed, potentially filling up the memory until an OutOfMemoryError occurs. A possible solution is to build a job provider with a limiting factor, which determines the maximum number of jobs that can be running at any given time.

Additionally, a Java Iterator fits the streaming pattern particularly well, thus our job provider will implement both the Iterator and Iterable interfaces:

public class JobProvider extends JobListenerAdapter
   implements Iterable<JPPFJob>, Iterator<JPPFJob> {
  private int concurrencyLimit;  // limit to the maximum number of concurrent jobs
  private int currentNbJobs = 0; // current count of concurrent jobs

  public JobProvider(int concurrencyLimit) {
    this.concurrencyLimit = concurrencyLimit;
  }

  // implementation of Iterator<JPPFJob>
  @Override public synchronized boolean hasNext() {
    boolean hasMoreJobs = false;
    // ... compute hasMoreJobs, e.g. check if there is any more data to read
    return hasMoreJobs;
  }

  @Override public synchronized JPPFJob next() {
    // wait until the number of running jobs is less than the concurrency limit
    while (currentNbJobs >= concurrencyLimit) {
      try {
        wait();
      } catch (Exception e) {
        e.printStackTrace();
      }
    }
    return buildJob();
  }

  @Override public void remove() {
    throw new UnsupportedOperationException("remove() is not supported");
  }

  private synchronized JPPFJob buildJob() {
    JPPFJob job = new JPPFJob();
    // ... build the tasks by reading data from a file, a database, etc...
    // ... add the tasks to the job ...
    // add a listener to update the concurrent jobs count when the job ends
    job.addJobListener(this);
    // increase the count of concurrently running jobs
    currentNbJobs++;
    return job;
  }

  // implementation of JobListener
  @Override public synchronized void jobEnded(JobEvent event) {
    processResults(event.getJob().getAllResults()); // process the job results
    // decrease the count of concurrently running jobs
    currentNbJobs--;
    // wake up the threads waiting in next()
    notifyAll();
  }

  // implementation of Iterable<JPPFJob>
  @Override public Iterator<JPPFJob> iterator() { return this; }

  private void processResults(List<Task<?>> results) { /* ... */ }
}

Note the use of a JobListener to ensure the current count of jobs is properly updated, so that the provider can create new jobs from its data source. It is also used to process the job results asynchronously.

Now that we have a job provider, we can use it to submit the jobs it creates to a JPPF grid:

public void jobStreaming() {
  try (JPPFClient client = new JPPFClient()) {
    // create the job provider with a limiting concurrency factor
    JobProvider jobProvider = new JobProvider(4);
    // build and submit the provided jobs until no more are available
    jobProvider.forEach(client::submitAsync);
  } catch(Exception e) {
    e.printStackTrace();
  }
}

4.1 The AbstractJPPFJobStream helper class

Given the potential complexity of the job streaming pattern, we found it useful to provide a helper class which reduces the work required of developers by implementing all the wiring and internal state transitions, so that they can focus solely on the specifics of the jobs they want to submit. The abstract class AbstractJPPFJobStream serves this purpose. It is defined as follows:

public abstract class AbstractJPPFJobStream extends JobListenerAdapter
  implements Iterable<JPPFJob>, Iterator<JPPFJob>, AutoCloseable {

  // Initialize this job provider with a concurrency limit
  public AbstractJPPFJobStream(final int concurrencyLimit)

  // Determine whether there is at least one more job in the stream
  // This method must be overridden in subclasses
  public abstract boolean hasNext()
  // Get the next job in the stream
  public synchronized JPPFJob next() throws NoSuchElementException
  // Create the next job in the stream, along with its tasks
  // This method must be overridden in subclasses and is called from next()
  protected abstract JPPFJob createNextJob()
  // This operation is not supported
  public void remove() throws UnsupportedOperationException

  // Update the state of this job stream and process the results of a job asynchronously
  public void jobEnded(final JobEvent event)
  // Callback invoked from jobEnded() when a job is complete
  // This method must be overridden in subclasses
  protected abstract void processResults(JPPFJob job)

  // implementation of Iterable<JPPFJob>
  public Iterator<JPPFJob> iterator()

  // Close this stream and release the underlying resources it uses
  // This method must be overridden in subclasses
  public abstract void close() throws Exception

  // Determine whether any job is still being executed
  public synchronized boolean hasPendingJob()
  // Get the number of executed jobs
  public synchronized int getJobCount()
  // Get the number of executed tasks
  public synchronized int getTaskCount()
}

This class is designed to be subclassed and, to this end, the four abstract methods that must be overridden in any subclass are outlined above. We can see how this simplifies the work of any implementation. Let's re-implement the previous example by subclassing AbstractJPPFJobStream:

public class JobProvider extends AbstractJPPFJobStream {
  public JobProvider(int concurrencyLimit) {
    super(concurrencyLimit);
  }

  @Override
  public synchronized boolean hasNext() {
    boolean hasMoreJobs = false;
    // ... compute hasMoreJobs, e.g. check if there is any more data to read
    return hasMoreJobs;
  }

  @Override
  protected JPPFJob createNextJob() {
    JPPFJob job = new JPPFJob();
    // ... build the tasks by reading data from a file, a database, etc...
    // ... add the tasks to the job and return it ...
    return job;
  }

  @Override
  protected void processResults(JPPFJob job) { /* ... process the job results ... */ }

  @Override
  public void close() throws Exception {
    // close a file, database connection, etc...
  }
}
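
Since AbstractJPPFJobStream also implements AutoCloseable, the resulting provider can be driven from a try-with-resources block. The following is a minimal usage sketch, analogous to the jobStreaming() method shown earlier; the jobStreamingWithHelper() method name and the polling loop on hasPendingJob() are illustrative, not part of the original example:

public void jobStreamingWithHelper() {
  try (JPPFClient client = new JPPFClient();
       JobProvider jobProvider = new JobProvider(4)) {
    // build and submit the provided jobs until no more are available
    for (JPPFJob job : jobProvider) {
      client.submitAsync(job);
    }
    // wait until all submitted jobs have completed
    while (jobProvider.hasPendingJob()) {
      Thread.sleep(10L);
    }
  } catch (Exception e) {
    e.printStackTrace();
  }
}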

5 Dedicated sample

For a fully working and documented example of the patterns seen in the previous sections, you are invited to explore the dedicated Concurrent Jobs demo.
