Submitting multiple jobs concurrently
From JPPF 5.2 Documentation
In this section, we present a number of ways to design an application so that it can execute multiple jobs concurrently, using a single JPPFClient instance. These can be seen as common reusable patterns, intended to cover the most frequent use cases where submission and processing of multiple jobs in parallel is needed.
1 Base requirement: multiple connections
For a JPPF client to be able to process multiple jobs in parallel, it is mandatory that the client holds multiple connections, whether to a single server, multiple servers or any combination of these. The number of connections determines how many jobs can be sent concurrently to a server. If only one connection is available, then only one job at a time will actually be processed by the JPPF grid. Other jobs submitted by the same client will remain in the client queue until the first job has completed.
Multiple connections can be obtained either statically from the JPPF client configuration, or dynamically using the connection pool APIs. In the rest of this section, we will simply assume this is done appropriately.
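To make the effect of the connection count concrete, here is a minimal JDK-only model. It uses no JPPF class at all: a Semaphore stands in for the client's connections, and each simulated "job" simply sleeps for a short time. The class name ConnectionLimitModel and its method are hypothetical and only serve the illustration.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical JDK-only model: not part of the JPPF API.
class ConnectionLimitModel {
  // Runs nbJobs simulated jobs over nbConnections simulated connections and
  // returns the highest number of jobs observed executing at the same time.
  static int peakConcurrency(int nbConnections, int nbJobs) {
    final Semaphore connections = new Semaphore(nbConnections); // one permit per connection
    final AtomicInteger running = new AtomicInteger();
    final AtomicInteger peak = new AtomicInteger();
    ExecutorService submitters = Executors.newFixedThreadPool(nbJobs);
    for (int i = 0; i < nbJobs; i++) {
      submitters.execute(() -> {
        try {
          // the job stays "queued" until a connection becomes available
          connections.acquire();
          try {
            int now = running.incrementAndGet();
            peak.accumulateAndGet(now, Math::max);
            Thread.sleep(50); // simulated execution on the grid
          } finally {
            running.decrementAndGet();
            connections.release(); // the connection can serve the next job
          }
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }
      });
    }
    submitters.shutdown();
    try {
      submitters.awaitTermination(10, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return peak.get();
  }

  public static void main(String[] args) {
    System.out.println("1 connection, 4 jobs: peak = " + peakConcurrency(1, 4));
    System.out.println("4 connections, 4 jobs: peak = " + peakConcurrency(4, 4));
  }
}
```

With a single permit the jobs can only run one after the other, whatever the number of submitting threads. With several permits the peak can rise up to, but never above, the number of permits.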
2 Job submissions from multiple threads
This pattern explores how concurrent jobs can be submitted through the same JPPFClient instance from multiple threads. In this pattern, we are using blocking jobs, since each job is submitted in its own thread, thus we can afford blocking that thread until the job completes:
public void multipleThreadsBlockingJobs() {
  // a pool of threads that will submit the jobs and retrieve their results
  ExecutorService executor = Executors.newFixedThreadPool(4);
  try (JPPFClient client = new JPPFClient()) {
    // handles for later retrieval of the job submissions results
    List<Future<List<Task<?>>>> futures = new ArrayList<>();
    for (int i=0; i<4; i++) {
      JPPFJob job = new JPPFJob();
      // ... set attributes and add tasks ...
      // submit the job in a separate thread
      futures.add(executor.submit(new JobSubmitter(client, job)));
    }
    for (Future<List<Task<?>>> future: futures) {
      try {
        // wait until each job has completed and retrieve its results
        List<Task<?>> results = future.get();
        // ... process the job results ...
        processResults(results);
      } catch (Exception e) {
        e.printStackTrace();
      }
    }
  }
  executor.shutdown();
}
The class JobSubmitter is defined as follows:
public class JobSubmitter implements Callable<List<Task<?>>> {
  private final JPPFClient client;
  private final JPPFJob job;

  public JobSubmitter(JPPFClient client, JPPFJob job) {
    this.client = client;
    this.job = job;
  }

  @Override
  public List<Task<?>> call() throws Exception {
    // just submit the job
    return client.submitJob(job);
  }
}
3 Multiple non-blocking jobs from a single thread
Here, we take advantage of the asynchronous nature of non-blocking jobs to write a much less cumbersome version of the previous pattern:
public void singleThreadNonBlockingJobs() {
  try (final JPPFClient client = new JPPFClient()) {
    // holds the submitted jobs for later retrieval of their results
    List<JPPFJob> jobs = new ArrayList<>();
    // submit the jobs without blocking the current thread
    for (int i=0; i<4; i++) {
      JPPFJob job = new JPPFJob();
      job.setBlocking(false);
      // ... set other attributes and add tasks ...
      jobs.add(job);
      client.submitJob(job); // non-blocking operation
    }
    // get and process the jobs results
    for (JPPFJob job: jobs) {
      // synchronize on each job's completion: this is a blocking operation
      List<Task<?>> results = job.awaitResults();
      processResults(results); // process the job results
    }
  } catch(Exception e) {
    e.printStackTrace();
  }
}
4 Fully asynchronous processing
Here, we use a JobListener to retrieve and process the results of the jobs. The only synchronization occurs in the main method, which waits for all the jobs to complete:
public void asynchronousNonBlockingJobs() {
  try (final JPPFClient client = new JPPFClient()) {
    int nbJobs = 4;
    // synchronization helper that tells us when all jobs have completed
    final CountDownLatch countDown = new CountDownLatch(nbJobs);
    for (int i=0; i<nbJobs; i++) {
      JPPFJob job = new JPPFJob();
      job.setBlocking(false);
      // results will be processed asynchronously within
      // the job listener's jobEnded() notifications
      job.addJobListener(new JobListenerAdapter() {
        @Override
        public void jobEnded(JobEvent event) {
          List<Task<?>> results = event.getJob().getAllResults();
          processResults(results); // process the job results
          // decrease the jobs count down
          // when the count reaches 0, countDown.await() will exit immediately
          countDown.countDown();
        }
      });
      // ... set other attributes, add tasks, submit the job ...
      client.submitJob(job);
    }
    // wait until all jobs are complete, i.e. until the count down reaches 0
    countDown.await();
  } catch(Exception e) {
    e.printStackTrace();
  }
}
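The latch-based synchronization used in this pattern can be tried out without a grid. The following JDK-only sketch uses no JPPF class: CompletableFuture callbacks stand in for the jobEnded() notifications, and a string stands in for the job results. The class LatchCompletionModel and its method are hypothetical names chosen for the illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical JDK-only sketch: not part of the JPPF API.
class LatchCompletionModel {
  // Starts nbJobs simulated jobs and returns the number of jobs whose results
  // had been processed when the latch opened (or when the timeout expired).
  static int runAll(int nbJobs) {
    final CountDownLatch countDown = new CountDownLatch(nbJobs);
    final AtomicInteger processed = new AtomicInteger();
    for (int i = 0; i < nbJobs; i++) {
      final int jobId = i;
      CompletableFuture
        .supplyAsync(() -> "result of job " + jobId) // simulated job execution
        .thenAccept(result -> {                      // simulated jobEnded() callback
          try {
            processed.incrementAndGet();             // "process" the job result
          } finally {
            countDown.countDown();                   // count down even if processing fails
          }
        });
    }
    try {
      // wait until the count reaches 0, with a timeout as a safety net
      countDown.await(10, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return processed.get();
  }

  public static void main(String[] args) {
    System.out.println(runAll(4) + " jobs processed");
  }
}
```

Counting down in a finally block is a defensive choice of this sketch: if the results processing throws an exception, the latch still reaches zero and the waiting thread is not blocked forever.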
5 Job streaming
Job streaming occurs when an application is continuously creating and executing jobs, based on a potentially infinite source of data. The main problem to overcome in this use case is when jobs are created much faster than they are executed, thus potentially filling the memory until an OutOfMemoryError occurs. A possible solution to this is to build a job provider with a limiting factor, which determines the maximum number of jobs that can be running at any given time.
Additionally, an Iterator is a Java data structure that fits the streaming pattern particularly well, thus our job provider will implement both the Iterable and Iterator interfaces:
public class JobProvider extends JobListenerAdapter
  implements Iterable<JPPFJob>, Iterator<JPPFJob> {
  private int concurrencyLimit;  // limit to the maximum number of concurrent jobs
  private int currentNbJobs = 0; // current count of concurrent jobs

  public JobProvider(int concurrencyLimit) {
    this.concurrencyLimit = concurrencyLimit;
  }

  // implementation of Iterator<JPPFJob>
  @Override
  public synchronized boolean hasNext() {
    boolean hasMoreJobs = false;
    // ... compute hasMoreJobs, e.g. check if there is any more data to read
    return hasMoreJobs;
  }

  @Override
  public synchronized JPPFJob next() {
    // wait until the number of running jobs is less than the concurrency limit
    while (currentNbJobs >= concurrencyLimit) {
      try {
        wait();
      } catch (Exception e) {
        e.printStackTrace();
      }
    }
    return buildJob();
  }

  @Override
  public void remove() {
    throw new UnsupportedOperationException("remove() is not supported");
  }

  private synchronized JPPFJob buildJob() {
    JPPFJob job = new JPPFJob();
    // ... build the tasks by reading data from a file, a database, etc...
    // ... add the tasks to the job ...
    job.setBlocking(false);
    // add a listener to update the concurrent jobs count when the job ends
    job.addJobListener(this);
    // increase the count of concurrently running jobs
    currentNbJobs++;
    return job;
  }

  // implementation of JobListener
  @Override
  synchronized public void jobEnded(JobEvent event) {
    processResults(event.getJob().getAllResults()); // process the job results
    // decrease the count of concurrently running jobs
    currentNbJobs--;
    // wake up the threads waiting in next()
    notifyAll();
  }

  // implementation of Iterable<JPPFJob>
  @Override
  public Iterator<JPPFJob> iterator() {
    return this;
  }

  private void processResults(List<Task<?>> results) {
    // ...
  }
}
Note the use of a JobListener to ensure the current count of jobs is properly updated, so that the provider can create new jobs from its data source. It is also used to process the job results asynchronously.
Now that we have a job provider, we can use it to submit the jobs it creates to a JPPF grid:
public void jobStreaming() {
  try (JPPFClient client = new JPPFClient()) {
    // create the job provider with a limiting concurrency factor
    JobProvider jobProvider = new JobProvider(4);
    // build and submit the provided jobs until no more is available
    for (JPPFJob job: jobProvider) {
      client.submitJob(job);
    }
  } catch(Exception e) {
    e.printStackTrace();
  }
}
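The limiting factor at the heart of this pattern does not have to rely on wait() and notifyAll(). As an alternative technique, the following JDK-only sketch expresses the same throttling with a java.util.concurrent.Semaphore. It uses no JPPF class: integers stand in for jobs, a thread pool stands in for the grid, and the class ThrottledStream with its itemEnded() method is hypothetical.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical JDK-only sketch: not part of the JPPF API.
class ThrottledStream implements Iterable<Integer>, Iterator<Integer> {
  private final Semaphore permits; // one permit per item allowed in flight
  private final int total;         // size of the simulated data source
  private int produced = 0;        // only accessed by the iterating thread

  ThrottledStream(int concurrencyLimit, int total) {
    this.permits = new Semaphore(concurrencyLimit);
    this.total = total;
  }

  @Override
  public boolean hasNext() {
    return produced < total;
  }

  @Override
  public Integer next() {
    if (!hasNext()) throw new NoSuchElementException();
    permits.acquireUninterruptibly(); // blocks while the limit is reached
    return produced++;
  }

  // plays the role of jobEnded(): frees a slot for the next item
  void itemEnded() {
    permits.release();
  }

  @Override
  public Iterator<Integer> iterator() {
    return this;
  }

  // Streams 'total' items with the given limit and returns the highest
  // number of items observed in flight at the same time.
  static int runAndGetPeak(int limit, int total) {
    final ThrottledStream stream = new ThrottledStream(limit, total);
    final AtomicInteger inFlight = new AtomicInteger();
    final AtomicInteger peak = new AtomicInteger();
    ExecutorService grid = Executors.newCachedThreadPool(); // simulated grid
    for (Integer item : stream) {
      peak.accumulateAndGet(inFlight.incrementAndGet(), Math::max);
      grid.execute(() -> {
        try {
          Thread.sleep(20); // simulated execution of the item
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }
        inFlight.decrementAndGet();
        stream.itemEnded();
      });
    }
    grid.shutdown();
    try {
      grid.awaitTermination(10, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return peak.get();
  }

  public static void main(String[] args) {
    System.out.println("peak in flight: " + runAndGetPeak(2, 10));
  }
}
```

Because a permit is acquired before an item is handed out and released only after the item has ended, the number of items in flight can never exceed the limit. The trade-off compared with the synchronized version is that the counter is hidden inside the semaphore, so it is less convenient to expose statistics such as the current number of running jobs.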
5.1 The AbstractJPPFJobStream helper class
Given the potential complexity of the job streaming pattern, we found it useful to provide a helper class which alleviates the work of a developer by implementing all the wiring and internal state transitions, such that the developers can solely focus on the specifics of the jobs they want to submit. The abstract class AbstractJPPFJobStream serves this purpose. It is defined as follows:
public abstract class AbstractJPPFJobStream extends JobListenerAdapter
  implements Iterable<JPPFJob>, Iterator<JPPFJob>, AutoCloseable {
  // Initialize this job provider with a concurrency limit
  public AbstractJPPFJobStream(final int concurrencyLimit)

  // Determine whether there is at least one more job in the stream
  // This method must be overridden in subclasses
  public abstract boolean hasNext()

  // Get the next job in the stream
  public synchronized JPPFJob next() throws NoSuchElementException

  // Create the next job in the stream, along with its tasks
  // This method must be overridden in subclasses and is called from next()
  protected abstract JPPFJob createNextJob()

  // This operation is not supported
  public void remove() throws UnsupportedOperationException

  // Update the state of this job stream and process the results of a job asynchronously
  public void jobEnded(final JobEvent event)

  // Callback invoked from jobEnded() when a job is complete
  // This method must be overridden in subclasses
  protected abstract void processResults(JPPFJob job)

  // implementation of Iterable<JPPFJob>
  public Iterator<JPPFJob> iterator()

  // Close this stream and release the underlying resources it uses
  // This method must be overridden in subclasses
  public abstract void close() throws Exception

  // Determine whether any job is still being executed
  public synchronized boolean hasPendingJob()

  // Get the number of executed jobs
  public synchronized int getJobCount()

  // Get the number of executed tasks
  public synchronized int getTaskCount()
}
This class is designed to be subclassed and, to this effect, we have outlined the four abstract methods that must be overridden in any subclass: hasNext(), createNextJob(), processResults() and close(). We can see how this will simplify the work of any implementation. Let's re-implement the previous example by subclassing AbstractJPPFJobStream:
public class JobProvider extends AbstractJPPFJobStream {
  public JobProvider(int concurrencyLimit) {
    super(concurrencyLimit);
  }

  @Override
  public synchronized boolean hasNext() {
    boolean hasMoreJobs = false;
    // ... compute hasMoreJobs, e.g. check if there is any more data to read
    return hasMoreJobs;
  }

  @Override
  protected JPPFJob createNextJob() {
    JPPFJob job = new JPPFJob();
    // ... build the tasks by reading data from a file, a database, etc...
    // ... add the tasks to the job and return it ...
    return job;
  }

  // the parameter type matches the abstract method declared in AbstractJPPFJobStream
  @Override
  protected void processResults(JPPFJob job) {
    // ... process the job results, e.g. those returned by job.getAllResults() ...
  }

  @Override
  public void close() throws Exception {
    // close a file, database connection, etc...
  }
}
6 Dedicated sample
For a fully working and documented example of the patterns seen in the previous sections, you are invited to explore the dedicated Concurrent Jobs demo.