
Author Topic: Executing jobs in parallel  (Read 2245 times)

michaelm

  • Guest
Executing jobs in parallel
« on: April 25, 2014, 04:50:46 PM »

Hi,

Parallelization of jobs is explained in the doc and in several threads of this forum, but after many attempts to set up the jppf.properties file appropriately, I still cannot get jobs to execute in parallel.

Here is my configuration (derived from the application template).

I created a class extending CommandTaskManager (the task just waits a few seconds by running "ping 1.1.1.1 -n 1 -w 5000").
I put one task in each of my jobs.
Then I create 10 jobs and submit them as non-blocking jobs:

Code: [Select]
public static void main(final String... args) {
  try (JPPFClient jppfClient = new JPPFClient()) {
    TemplateApplicationRunner runner = new TemplateApplicationRunner();
    for (int i = 0; i < 10; i++) {
      // create a single-task job and submit it as non-blocking
      JPPFJob job = runner.createJob(i);
      runner.executeNonBlockingJob(jppfClient, job);
    }
  } catch (Exception e) {
    e.printStackTrace();
  }
}

Individual jobs are executed correctly.
I started 1 server and several nodes on the same machine.
I changed the configuration file conf/jppf.properties with
Code: [Select]
# discovery enabled
jppf.discovery.enabled = true
# pool of 5 connections
jppf.pool.size = 5
or with
Code: [Select]
# discovery disabled
jppf.discovery.enabled = false
# list of driver names
jppf.drivers = myDriver
# connection to the driver
myDriver.jppf.server.host = some_host
myDriver.jppf.server.port = some_port
# pool of 5 connections
myDriver.jppf.pool.size = 5

but whatever "jppf.pool.size" is, all my tasks are always executed sequentially.
And in the GUI (topology view), I never see more than one node "executing" at a time.

Any idea what I missed?

Thanks
Michaël


lolo

  • Administrator
  • JPPF Council Member
  • *****
  • Posts: 2257
    • JPPF Web site
Re: Executing jobs in parallel
« Reply #1 on: April 25, 2014, 07:07:24 PM »

Hello Michaël,

Your configuration file is correct. I believe it is the code of the tutorial that is, in fact, misleading: it shows that a job can be executed asynchronously, but does not show how multiple jobs can be executed concurrently. Because of this, your code is effectively executing the jobs sequentially.
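
To illustrate, the template's executeNonBlockingJob() does something along these lines (this is only a sketch of the idea, not the exact tutorial code): it submits the job and then immediately waits for its results, so your loop only submits the next job once the previous one has completed.

Code:
// sketch of the tutorial's pattern (assumption, for illustration only)
public void executeNonBlockingJob(final JPPFClient jppfClient, final JPPFJob job) throws Exception {
  job.setBlocking(false);
  jppfClient.submitJob(job); // submit asynchronously ...
  JPPFResultCollector collector = (JPPFResultCollector) job.getResultListener();
  List<Task<?>> results = collector.awaitResults(); // ... but block right here for the results
  processExecutionResults(results); // so the next job is only submitted once this one completes
}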

A proper execution pattern for concurrent jobs would be like this:

Code: [Select]
public static void main(String...args) {
  try (JPPFClient jppfClient = new JPPFClient()) {
    TemplateApplicationRunner runner = new TemplateApplicationRunner();
    List<JPPFJob> jobs = new ArrayList<>();
    // submit concurrent jobs
    for (int i = 0; i < 10; i++) {
      JPPFJob job = runner.createJob(i);
      job.setBlocking(false);
      jppfClient.submitJob(job);
      jobs.add(job);
    }
    for (JPPFJob job: jobs) {
      JPPFResultCollector collector = (JPPFResultCollector) job.getResultListener();
      List<Task<?>> results = collector.awaitResults();
      runner.processExecutionResults(results);
    }
  } catch (Exception e) {
    e.printStackTrace();
  }
}

With this, you should observe that the jobs are indeed executed concurrently: you should see multiple "executing" nodes in the topology view, and also multiple jobs in the "Job data" view.

I registered a JPPF bug for this: JPPF-256 - The tutorial does not properly demonstrate the use of concurrent jobs.
I will update the tutorial accordingly, and I apologize for the confusion this is causing.

Sincerely,
-Laurent

michaelm

  • Guest
Re: Executing jobs in parallel
« Reply #2 on: April 25, 2014, 09:17:59 PM »

Hi,

Works like a charm. Thanks a lot for the quick response,
and keep up the good work!

Michaël

dbwiddis

  • JPPF Council Member
  • *****
  • Posts: 106
Re: Executing jobs in parallel
« Reply #3 on: May 08, 2014, 08:39:09 AM »

From a tutorial perspective you may wish to elaborate on this model slightly: instead of just awaiting the results sequentially with awaitResults(), you could implement a polling loop that checks the status of the asynchronous jobs. For example, in my implementation I use an SLA to execute certain jobs only on certain servers, and I want to take actions on those servers after each job completes, even while other jobs are still executing. Here's a snippet:
Code: [Select]
// collectorList holds (job name, "results processed" flag, result collector) triples
for (final JPPFJob job : jobList) {
  collectorList.add(new Triplet<String, Boolean, JPPFResultCollector>(
      job.getName(), false, submitNonBlockingJob(job)));
}

boolean waitingForJobs;

do {
  // Stall for a reasonable polling interval before checking the jobs again
  Thread.sleep(20000);
  waitingForJobs = false;
  for (int i = 0; i < collectorList.size(); i++) {
    // Only look at jobs whose results haven't been processed yet
    if (!collectorList.get(i).getB()) {
      if (collectorList.get(i).getC().getStatus() == SubmissionStatus.COMPLETE) {
        // The job is done: we are now ready to get the results of its execution
        processExecutionResults(collectorList.get(i).getC().getAllResults());
        // Flag that we've reported results for this job
        collectorList.get(i).setB(true);

        // Do stuff with the corresponding job/servers

      } else {
        // Not done yet: keep polling
        waitingForJobs = true;
      }
    }
  }
} while (waitingForJobs);

lolo

  • Administrator
  • JPPF Council Member
  • *****
  • Posts: 2257
    • JPPF Web site
Re: Executing jobs in parallel
« Reply #4 on: May 08, 2014, 11:58:41 AM »

Thank you very much for this inspiring idea. In fact, I think we could extend it to demonstrate a fully asynchronous processing model, based on non-blocking jobs and job listeners. Something like this:

Code: [Select]
public void doAsync(final JPPFClient jppfClient, final List<JPPFJob> jobList) throws Exception {
  final CountDownLatch countDown = new CountDownLatch(jobList.size());
  for (JPPFJob job: jobList) {
    job.setBlocking(false); // just to be sure
    job.addJobListener(new JobListenerAdapter() {
      @Override
      public void jobEnded(final JobEvent event) {
        JPPFResultCollector collector = (JPPFResultCollector) event.getJob().getResultListener();
        processExecutionResults(collector.getAllResults());
        countDown.countDown();
      }
    });
    jppfClient.submitJob(job);
  }
  countDown.await(); // wait until all jobs are finished
}

Of course, this is basically the simplest form of asynchronous processing I could come up with. It doesn't really dig into more complex use cases such as job streaming, where the number of jobs is not known ahead of time and where we must be careful to limit the number of jobs held in memory to avoid out-of-memory conditions.
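
For instance, one very rough way to sketch such a streaming pattern would be to cap the number of in-flight jobs with a java.util.concurrent.Semaphore (the cap of 4 and the createJob() factory below are just placeholders for illustration):

Code:
public void streamJobs(final JPPFClient jppfClient, final int totalJobs) throws Exception {
  final int maxConcurrentJobs = 4; // arbitrary cap for this sketch
  final Semaphore permits = new Semaphore(maxConcurrentJobs);
  for (int i = 0; i < totalJobs; i++) {
    permits.acquire(); // block until a slot is free, so at most 4 jobs are in memory
    JPPFJob job = createJob(i); // placeholder job factory
    job.setBlocking(false);
    job.addJobListener(new JobListenerAdapter() {
      @Override
      public void jobEnded(final JobEvent event) {
        // process this job's results here, then free the slot for the next job
        permits.release();
      }
    });
    jppfClient.submitJob(job);
  }
  permits.acquire(maxConcurrentJobs); // wait for the last in-flight jobs to complete
}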

What do you guys think?

-Laurent

dbwiddis

  • JPPF Council Member
  • *****
  • Posts: 106
Re: Executing jobs in parallel
« Reply #5 on: May 09, 2014, 08:47:35 PM »

Much more elegant than my hack!  I like it, and it's doubly good as a demonstration of listeners.

Probably need to add a few comments before and after the await() to show where to put "do other stuff while waiting for jobs" and "do stuff after jobs are all complete".
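
Something along these lines (just a sketch of where those comments would go):

Code:
// submit all the jobs as above, then:
// ... do other stuff while waiting for jobs ...
countDown.await(); // returns once every jobEnded() notification has fired
// ... do stuff after jobs are all complete ...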

dbwiddis

  • JPPF Council Member
  • *****
  • Posts: 106
Re: Executing jobs in parallel
« Reply #6 on: May 10, 2014, 01:35:56 AM »

So I just implemented this in my code.  Very nice approach.

I have to do quite a bit of work before each job starts (using jclouds to start up and configure cloud servers, etc.), so I combined your approach with a Runnable, and left the job blocking so I could benchmark the time when it finished.

Using a Runnable and a thread pool also helps limit the number of jobs being processed at once, which addresses your memory concerns as well.
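
Roughly what I mean, using a java.util.concurrent fixed thread pool (this is just an illustrative sketch, not my actual code; the pool size and timeout are arbitrary):

Code:
public void runWithPool(final JPPFClient jppfClient, final List<JPPFJob> jobList) throws Exception {
  ExecutorService pool = Executors.newFixedThreadPool(4); // caps how many jobs run concurrently
  for (final JPPFJob job : jobList) {
    pool.submit(new Runnable() {
      @Override
      public void run() {
        try {
          // start up and configure the cloud servers for this job here (jclouds, etc.)
          long start = System.currentTimeMillis();
          // the job is left blocking, so submitJob() only returns when the job completes
          List<Task<?>> results = jppfClient.submitJob(job);
          System.out.println(job.getName() + " completed in " + (System.currentTimeMillis() - start) + " ms");
          processExecutionResults(results);
        } catch (Exception e) {
          e.printStackTrace();
        }
      }
    });
  }
  pool.shutdown(); // no new work; already submitted jobs keep running
  pool.awaitTermination(1, TimeUnit.HOURS); // wait (up to an arbitrary timeout) for everything to finish
}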

dbwiddis

  • JPPF Council Member
  • *****
  • Posts: 106
Re: Executing jobs in parallel
« Reply #7 on: May 10, 2014, 06:44:43 AM »

Quote from: lolo on May 08, 2014, 11:58:41 AM
Laurent,

I've spent the better part of an afternoon and evening trying to figure out what was going on.  Null Pointer Exceptions in a subclass are tricky because things just die on you! ;)

Anyway, in your code snippet above, collector.getAllResults() returns null.  So when this is passed along to the processResults method, the for loop fails.  Silently, of course, leading to much frustration and wine consumption. :)

I've been able to get all my task results piecemeal with the jobReturned event's getJobTasks() method, but that doesn't work in jobEnded... obviously the results are supposed to be passed to the getResultListener(), but I don't think that happens before the event is fired.

Reading the javadoc for getAllResults, it says: "a list of results as tasks, or null if not all tasks have been executed." I suspect that the jobEnded event is triggered before the results are set.

I'm digging through the code trying to confirm, but barely know where to start looking. I figured I'd toss this info out there for you in case you know better where to look...
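
For reference, here's roughly the piecemeal workaround I'm using in the meantime (just a sketch, reusing the countDown latch from your example):

Code:
final List<Task<?>> collected = Collections.synchronizedList(new ArrayList<Task<?>>());
job.addJobListener(new JobListenerAdapter() {
  @Override
  public void jobReturned(final JobEvent event) {
    collected.addAll(event.getJobTasks()); // accumulate each batch of returned tasks
  }
  @Override
  public void jobEnded(final JobEvent event) {
    processExecutionResults(collected); // every batch has been collected by now
    countDown.countDown();
  }
});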

dbwiddis

  • JPPF Council Member
  • *****
  • Posts: 106
Re: Executing jobs in parallel
« Reply #8 on: May 10, 2014, 07:29:18 AM »

Debug info: at the time collector.getAllResults() is called, the collector's jobResults (a TreeMap) has 3 elements, but the "results" field is still null. JPPFResultCollector.java has a buildResults() method that is supposed to move one to the other, and it has apparently not been called yet by the time the code reaches this point.

lolo

  • Administrator
  • JPPF Council Member
  • *****
  • Posts: 2257
    • JPPF Web site
Re: Executing jobs in parallel
« Reply #9 on: May 10, 2014, 10:44:37 AM »

Hello,

JPPFResultCollector.resultsReceived() is always called before jobEnded() but, as you found out, the results field is not assigned at that time. The buildResults() method is in fact invoked when the job status changes to COMPLETE, which happens later on. This is definitely a JPPF bug, and I will register it shortly. In the meantime, instead of using JPPFResultCollector.getAllResults() you can safely use the following in the jobEnded() notification:

Code: [Select]
public void jobEnded(final JobEvent event) {
  List<Task<?>> results = new ArrayList<>(event.getJob().getResults().getAllResults());
  processExecutionResults(results);
  countDown.countDown();
}

I actually prefer this one, as it doesn't involve the result collector. I think it was a mistake to expose the job's result listener, because now the internal client code relies a lot on JPPFResultCollector. This was done before the job listeners were introduced, so now there is little point in having both. This is one of the changes I'm planning for version 5.0.

-Laurent

dbwiddis

  • JPPF Council Member
  • *****
  • Posts: 106
Re: Executing jobs in parallel
« Reply #10 on: May 10, 2014, 06:42:06 PM »

Quote from: lolo on May 10, 2014, 10:44:37 AM
Thanks... works great and I prefer it too, as I already had a JPPFJob available to me so that's one less thing to pass around.

Question: I'm trying to keep track of the number of uncompleted tasks remaining in a job when the jobReturned event fires (or, alternatively, the cumulative number of returned tasks so far). I've got a workaround using a CountDownLatch, but shouldn't that information be available on the event or the job somewhere?

lolo

  • Administrator
  • JPPF Council Member
  • *****
  • Posts: 2257
    • JPPF Web site
Re: Executing jobs in parallel
« Reply #11 on: May 10, 2014, 07:06:22 PM »

You can easily compute the number of uncompleted tasks using the JobResults class:

Code: [Select]
@Override
public void jobReturned(final JobEvent event) {
  JPPFJob job = event.getJob();
  JobResults results = job.getResults();
  int nbUncompletedTasks = job.getJobTasks().size() - results.size();
  ...
}

-Laurent