JPPF - The open source grid computing solution

Author Topic: Processing of large data with ~10 nodes  (Read 4639 times)

GeorgeL

  • JPPF Padawan
  • *
  • Posts: 8
Processing of large data with ~10 nodes
« on: January 15, 2013, 10:38:07 AM »

Hello and thanks for the great framework!

I would like to use JPPF to process a large image set: I have more than 10M images and would like to extract metadata from them.
I developed the extraction executable, which runs as an external app wrapped into JPPF via the CommandLineTaskEx interface (done).

In my test configuration I have 6 nodes and 1 driver (different hardware for each).
My application uses non-blocking tasks (RunExecImageTask) and retrieves image URLs from a MySQL database.
Each job has 1000 image extraction tasks; briefly, the algorithm is like this:
Code: [Select]
do {
  List<JPPFResultCollector> resultsList = new ArrayList<JPPFResultCollector>();
  JPPFJob job = new JPPFJob();
  // read 1000 image urls from the database
  java.sql.ResultSet rs = stmt.executeQuery();
  while (rs.next()) {
    String url = rs.getString("url");        // column names are placeholders
    long objectID = rs.getLong("object_id");
    job.addTask(new RunExecImageTask(url, objectID));
  }
  if (!job.getTasks().isEmpty()) {
    resultsList.add(submitNonBlockingJob(job));
  }

  // once the jobs are complete, process the results
  for (JPPFResultCollector collector : resultsList) {
    // this is a blocking call
    List<JPPFTask> results = collector.waitForResults(pager * 10000);
    if (results == null) continue;
    for (JPPFTask task : results) {
      if (task.getException() != null) {
        totalExceptions++;
      } else if (task.getResult() != null) {
        String r = (String) task.getResult();
        totalReceivedResults++;
      }
    }
  }
} while (!done);

// print statistics
....

I'm using the 'Autotuned' algorithm for load balancing (values taken from the JPPF wiki):
maxDeviation = 0.2
maxGuessToStable = 50
sizeRatioDeviation = 1.5
minSamplesToAnalyse = 100
size = 5
decreaseRatio = 0.2
minSamplesToCheckConvergence = 50
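For reference, this is roughly how those values map into the driver's configuration file (property names as in JPPF 3.x; please check the docs for your version, as the exact profile syntax may differ):

```properties
# load-balancing algorithm and the name of its parameter profile
jppf.load.balancing.algorithm = autotuned
jppf.load.balancing.strategy = autotuned_profile

# parameters of the "autotuned_profile" profile
strategy.autotuned_profile.size = 5
strategy.autotuned_profile.minSamplesToAnalyse = 100
strategy.autotuned_profile.minSamplesToCheckConvergence = 50
strategy.autotuned_profile.maxDeviation = 0.2
strategy.autotuned_profile.maxGuessToStable = 50
strategy.autotuned_profile.sizeRatioDeviation = 1.5
strategy.autotuned_profile.decreaseRatio = 0.2
```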


I have a feeling that my solution is not optimal:
my application sends batches of 1000 tasks, the nodes process them and then stay idle until the next batch...

[1] Is there a way to have less idle time?

[2] I could write the results back to the database from the code executed on the node (with JDBC), but then what would the main application expect back in order to mark the job as done and move on?
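To illustrate what I mean in [2], here is a rough, self-contained sketch of the pattern (all names are invented, and the JDBC write is reduced to a comment): each task would persist its full output node-side and return only a tiny status marker, which the client counts to decide the batch is done.

```java
import java.util.concurrent.Callable;

public class MarkerDemo {
    // stands in for a task whose heavy output is persisted on the node
    static Callable<String> imageTask(final long objectId) {
        return () -> {
            String metadata = "exif-for-" + objectId; // pretend extraction
            // ... a JDBC INSERT of `metadata` would happen here, on the node ...
            return "OK:" + objectId;                  // tiny result sent back
        };
    }

    public static void main(String[] args) throws Exception {
        int ok = 0;
        for (long id = 1; id <= 3; id++) {
            if (imageTask(id).call().startsWith("OK")) ok++;
        }
        // the client only needs the markers to know the job is complete
        System.out.println("completed markers: " + ok);
    }
}
```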

thanks in advance,
George
« Last Edit: January 15, 2013, 05:04:33 PM by GeorgeL »
Logged

lolo

  • Administrator
  • JPPF Council Member
  • *****
  • Posts: 2272
    • JPPF Web site
Re: Processing of large data with ~10 nodes
« Reply #1 on: January 16, 2013, 05:57:11 AM »

Hello George,

One possible bottleneck is at the client level. This will happen if you submit new jobs from the same client while one or more jobs are already executing on the nodes.
Each connection from the client to the server can only process one job at a time. Thus, if you submit multiple jobs concurrently, one job at a time will be sent to the server, and the other jobs will wait in a queue on the client side. To resolve this, you will need to set up a pool of connections in your client configuration, where the number of connections determines how many jobs can be sent to the server concurrently. You can do it as follows:

when server discovery is enabled (jppf.discovery.enabled = true):
Code: [Select]
jppf.pool.size = 10
when server discovery is disabled (jppf.discovery.enabled = false):
Code: [Select]
jppf.drivers = my_driver
my_driver.jppf.server.host = my_host
my_driver.jppf.server.port = my_port
my_driver.jppf.pool.size = 10
Here we have set a pool size of 10 in both cases.

Also, please keep in mind that the values displayed in the UI console are not real-time values, but rather snapshots of the node states taken at regular intervals (every second), so a node's execution status may not accurately reflect whether it is idle or executing tasks.

You may also want to try a different algorithm, as it can take many jobs for the "autotuned" algorithm to converge to near-optimal efficiency. Given the size of your jobs, I believe the "proportional" algorithm might provide more balanced results.

Can you please try these suggestions and let us know if this works for you?

Thanks,
-Laurent
Logged

GeorgeL

  • JPPF Padawan
  • *
  • Posts: 8
Re: Processing of large data with ~10 nodes
« Reply #2 on: January 16, 2013, 03:41:30 PM »

Thanks for the reply.
I set the pool size to 50 and changed the algorithm to 'proportional'; each job now has 500 tasks.
Logged

GeorgeL

  • JPPF Padawan
  • *
  • Posts: 8
Re: Processing of large data with ~10 nodes
« Reply #3 on: January 16, 2013, 03:45:56 PM »

I also ran the application twice so that the driver gets 2 jobs simultaneously, and that keeps my machines busy.
Logged

GeorgeL

  • JPPF Padawan
  • *
  • Posts: 8
Re: Processing of large data with ~10 nodes
« Reply #4 on: January 16, 2013, 03:48:47 PM »

I get weird error messages while posting messages in the forums (ERROR 406)
Logged

GeorgeL

  • JPPF Padawan
  • *
  • Posts: 8
Re: Processing of large data with ~10 nodes
« Reply #5 on: January 24, 2013, 11:22:31 AM »

Any suggestions about the Java heap space errors I get after running for 5-6 hours?

Code: [Select]
00:42:57,302 [ERROR] BaseJPPFClientConnection:232 - Java heap space
java.lang.OutOfMemoryError: Java heap space
at java.lang.Class.getDeclaredMethod(Class.java:1954)
at java.io.ObjectStreamClass.getInheritableMethod(ObjectStreamClass.java:1378)
at java.io.ObjectStreamClass.access$2200(ObjectStreamClass.java:69)
at java.io.ObjectStreamClass$2.run(ObjectStreamClass.java:491)
at java.io.ObjectStreamClass$2.run(ObjectStreamClass.java:455)
at java.security.AccessController.doPrivileged(Native Method)
at java.io.ObjectStreamClass.<init>(ObjectStreamClass.java:455)
at java.io.ObjectStreamClass.lookup(ObjectStreamClass.java:352)
at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:589)
at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1601)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1514)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1750)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1347)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1964)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1888)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1347)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:369)
at org.jppf.utils.ObjectSerializerImpl.deserialize(ObjectSerializerImpl.java:166)
at org.jppf.utils.ObjectSerializerImpl.deserialize(ObjectSerializerImpl.java:152)
at org.jppf.io.IOHelper.unwrappedData(IOHelper.java:173)
at org.jppf.io.IOHelper.unwrappedData(IOHelper.java:151)
at org.jppf.client.BaseJPPFClientConnection.receiveBundleAndResults(BaseJPPFClientConnection.java:203)
at org.jppf.client.BaseJPPFClientConnection.receiveResults(BaseJPPFClientConnection.java:245)
at org.jppf.client.BaseJPPFClientConnection.receiveResults(BaseJPPFClientConnection.java:262)
at org.jppf.client.balancer.ChannelWrapperRemote$RemoteRunnable.run(ChannelWrapperRemote.java:248)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:334)
at java.util.concurrent.FutureTask.run(FutureTask.java:166)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
at java.lang.Thread.run(Thread.java:722)
00:49:53,276 [WARN ] ChannelWrapperRemote:263 - java.lang.OutOfMemoryError: Java heap space
00:50:11,825 [WARN ] ChannelWrapperRemote:263 - java.lang.OutOfMemoryError: Java heap space
00:50:15,662 [WARN ] ChannelWrapperRemote:263 - java.lang.OutOfMemoryError: Java heap space
00:50:19,531 [WARN ] ChannelWrapperRemote:263 - java.lang.OutOfMemoryError: Java heap space
00:50:23,447 [WARN ] ChannelWrapperRemote:263 - java.lang.OutOfMemoryError: Java heap space
00:50:27,378 [WARN ] ChannelWrapperRemote:263 - java.lang.OutOfMemoryError: Java heap space

and

Code: [Select]
22:39:19,191 [ERROR] BaseJPPFClientConnection:232 - Java heap space
java.lang.OutOfMemoryError: Java heap space
        at java.util.Arrays.copyOfRange(Arrays.java:2694)
        at java.lang.String.<init>(String.java:203)
        at java.lang.StringBuilder.toString(StringBuilder.java:405)
        at java.io.ObjectInputStream$BlockDataInputStream.readUTFBody(ObjectInputStream.java:3041)
        at java.io.ObjectInputStream$BlockDataInputStream.readUTF(ObjectInputStream.java:2837)
        at java.io.ObjectInputStream.readString(ObjectInputStream.java:1617)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1338)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1964)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1888)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1347)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:369)
        at org.jppf.utils.ObjectSerializerImpl.deserialize(ObjectSerializerImpl.java:166)
        at org.jppf.utils.ObjectSerializerImpl.deserialize(ObjectSerializerImpl.java:152)
        at org.jppf.io.IOHelper.unwrappedData(IOHelper.java:173)
        at org.jppf.io.IOHelper.unwrappedData(IOHelper.java:151)
        at org.jppf.client.BaseJPPFClientConnection.receiveBundleAndResults(BaseJPPFClientConnection.java:203)
        at org.jppf.client.BaseJPPFClientConnection.receiveResults(BaseJPPFClientConnection.java:245)
        at org.jppf.client.BaseJPPFClientConnection.receiveResults(BaseJPPFClientConnection.java:262)
        at org.jppf.client.balancer.ChannelWrapperRemote$RemoteRunnable.run(ChannelWrapperRemote.java:248)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
        at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:334)
        at java.util.concurrent.FutureTask.run(FutureTask.java:166)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
        at java.lang.Thread.run(Thread.java:722)
22:39:23,902 [WARN ] ChannelWrapperRemote:263 - java.lang.OutOfMemoryError: Java heap space

I run my client with -Xms768m -Xmx4096m (from the command line, not the IDE).

thanks!

p.s. forum posting errors fixed
« Last Edit: January 24, 2013, 11:35:25 AM by GeorgeL »
Logged

lolo

  • Administrator
  • JPPF Council Member
  • *****
  • Posts: 2272
    • JPPF Web site
Re: Processing of large data with ~10 nodes
« Reply #6 on: January 24, 2013, 08:32:35 PM »

Hello George,

It is very likely that you have a memory leak in your application.
The easiest way to determine what is causing it is to generate a heap dump: either start your JVM with the "-XX:+HeapDumpOnOutOfMemoryError" flag so a dump is generated when the OOME occurs, or use a tool such as VisualVM to generate one manually.
Also, it is a good idea to start the JVM with the smallest heap size that still allows your application to run; this will make the heap dump much smaller as well.
Once you have a heap dump (a .hprof file), I recommend a tool such as Eclipse MAT to analyze it. MAT comes as a standalone RCP application, so you don't need an Eclipse IDE to use it.
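For example, the JVM options could look like this (the jar name and dump path are placeholders for your own):

```shell
java -Xms64m -Xmx512m \
     -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/dumps \
     -jar my-jppf-client.jar
```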

If you can publish the heap dump somewhere (after zipping it please), and provide a link to download it, I will take a look at it.

Additionally, could you provide more details about the job(s) you are executing on the grid: in particular, how many jobs you can execute before the OOME occurs, how many tasks each job has, and a rough estimate of the tasks' memory footprint (after execution)?

Thanks,
-Laurent

Logged

GeorgeL

  • JPPF Padawan
  • *
  • Posts: 8
Re: Processing of large data with ~10 nodes
« Reply #7 on: January 25, 2013, 11:18:30 AM »

Thanks for the great tips Laurent,

I knew about VisualVM but did not know about MAT, a great tool.
According to MAT, the problem seems to be in the MySQL JDBC driver implementation.
I followed your suggestion about the heap size, lowered it to 128m, and my application didn't crash.
I'm currently monitoring it with HeapDumpOnOutOfMemoryError and will post the results.

thanks a lot
George

EDIT: I just added try/finally blocks to ALWAYS close ResultSets and PreparedStatements, as explained here:
http://stackoverflow.com/a/1730197/634094
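A self-contained illustration of why this matters (not my actual JDBC code): each Tracked object records whether it was closed, and a try-with-resources block, equivalent to nested try/finally, closes both resources even when the body throws, which is exactly what leaking ResultSets fail to do.

```java
public class CloseDemo {
    // stand-in for a JDBC resource such as a ResultSet or PreparedStatement
    static class Tracked implements AutoCloseable {
        final String name;
        boolean closed;
        Tracked(String name) { this.name = name; }
        @Override public void close() { closed = true; }
    }

    public static void main(String[] args) {
        Tracked stmt = new Tracked("PreparedStatement");
        Tracked rs = new Tracked("ResultSet");
        try (Tracked s = stmt; Tracked r = rs) {
            // simulate a query that fails mid-way
            throw new RuntimeException("query failed");
        } catch (RuntimeException e) {
            // both resources were already closed before we got here
        }
        System.out.println("stmt closed: " + stmt.closed);
        System.out.println("rs closed: " + rs.closed);
    }
}
```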
« Last Edit: January 28, 2013, 08:11:26 AM by GeorgeL »
Logged

GeorgeL

  • JPPF Padawan
  • *
  • Posts: 8
Re: Processing of large data with ~10 nodes
« Reply #8 on: January 28, 2013, 08:12:47 AM »

running OK for 3 days, no memory exceptions
500 tasks per job
-Xms64m -Xmx128m
Logged
Powered by SMF 2.0 RC5 | SMF © 2006-2011, Simple Machines LLC