Getting Started

  1. Please download and install Java Runtime Environment (JRE) or JDK version 1.6.0 or newer for your specific platform and operating system.

  2. Then download Mpaxs and follow the installation instructions.

Installation Instructions

Download the zip-distribution of Mpaxs and add the jar files from its lib/ folder to the classpath of your project.

Demonstration

The zip-distribution of Mpaxs includes a self-contained demo that may be run from the command line:

  > java -jar mpaxs.jar

This will print the available options:

  usage: net.sf.mpaxs.spi.computeHost.StartUp [-m <mjobs>] [-n <nhosts>] [-r <runmode>]
   -m <mjobs>     Number of jobs to run in parallel
   -n <nhosts>    Number of hosts for parallel processing
   -r <runmode>   The mode in which to operate: one of <ALL,LOCAL,DISTRIBUTED>

To run the Mpaxs demo locally within the running virtual machine with 100 jobs and 1 thread, type:

  > java -jar mpaxs.jar -m 100 -n 1 -r LOCAL

Note:
Mpaxs will try to launch new compute hosts using the DRMAA API by default. If no DRMAA-compatible scheduling system, such as Globus, SGE/OGE, Torque, or PBS, is installed on your system, Mpaxs will automatically fall back to launching the compute hosts on your local system in separate processes.
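The fallback described in the note amounts to a simple decision between two launch backends. The following is a minimal sketch of that decision, not Mpaxs code: the class ExecutionModeSelector and its Backend enum are hypothetical, and the only check it performs is whether the DRMAA Java binding class org.ggf.drmaa.SessionFactory is on the classpath. A real check would also have to open a DRMAA session to confirm that a scheduler is actually reachable, which is omitted here.

```java
/**
 * Hypothetical sketch (not Mpaxs code): prefer DRMAA when its Java binding is
 * available, otherwise fall back to launching compute hosts as local processes.
 */
class ExecutionModeSelector {

    enum Backend { DRMAA, LOCAL_PROCESSES }

    // The DRMAA Java binding provides org.ggf.drmaa.SessionFactory; if that class
    // cannot be loaded, no DRMAA-compatible scheduler can be used from Java.
    static boolean drmaaBindingPresent(ClassLoader loader) {
        try {
            Class.forName("org.ggf.drmaa.SessionFactory", false, loader);
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }

    static Backend select(ClassLoader loader) {
        return drmaaBindingPresent(loader) ? Backend.DRMAA : Backend.LOCAL_PROCESSES;
    }

    public static void main(String[] args) {
        Backend backend = select(ExecutionModeSelector.class.getClassLoader());
        System.out.println("Launching compute hosts via: " + backend);
    }
}
```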

If you have a cluster scheduling system installed, you can try to run the Mpaxs demo in distributed mode with a larger maximum number of hosts and jobs:

  > java -jar mpaxs.jar -m 100000 -n 3 -r DISTRIBUTED

Note:
Compute hosts are launched on demand by Mpaxs. Thus, if the active compute hosts can easily cater for the submitted jobs, Mpaxs will not launch any new hosts. If a compute host is idle for a predefined time, it will initiate an orderly shutdown of itself and deregister from the master server.
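The on-demand behaviour can be pictured as two small rules: launch hosts only when the running ones cannot absorb the pending jobs (and never beyond the configured maximum), and shut a host down once it has been idle long enough. The sketch below models those rules under stated assumptions; HostScalingPolicy, its jobsPerHost capacity and the millisecond idle timeout are hypothetical and do not describe how Mpaxs computes its decisions internally.

```java
/**
 * Hypothetical on-demand host policy: new hosts are launched only when the
 * running hosts cannot absorb the pending jobs, capped at maxHosts; an idle
 * host shuts itself down after idleTimeoutMillis.
 */
class HostScalingPolicy {
    private final int maxHosts;          // upper bound on compute hosts
    private final int jobsPerHost;       // assumed number of concurrent jobs per host
    private final long idleTimeoutMillis;

    HostScalingPolicy(int maxHosts, int jobsPerHost, long idleTimeoutMillis) {
        this.maxHosts = maxHosts;
        this.jobsPerHost = jobsPerHost;
        this.idleTimeoutMillis = idleTimeoutMillis;
    }

    /** Number of additional hosts to launch for the current load. */
    int hostsToLaunch(int activeHosts, int pendingJobs) {
        int capacity = activeHosts * jobsPerHost;
        if (pendingJobs <= capacity) {
            return 0; // running hosts can cater for the jobs
        }
        // ceiling division: hosts needed for the jobs that do not fit yet
        int needed = (pendingJobs - capacity + jobsPerHost - 1) / jobsPerHost;
        return Math.max(0, Math.min(needed, maxHosts - activeHosts));
    }

    /** True once a host has been idle for at least the timeout. */
    boolean shouldShutDown(long idleSinceMillis, long nowMillis) {
        return nowMillis - idleSinceMillis >= idleTimeoutMillis;
    }

    public static void main(String[] args) {
        HostScalingPolicy policy = new HostScalingPolicy(3, 4, 60000L);
        System.out.println(policy.hostsToLaunch(1, 3));   // one host suffices
        System.out.println(policy.hostsToLaunch(1, 10));  // six extra jobs need two hosts
        System.out.println(policy.hostsToLaunch(1, 100)); // capped at maxHosts - activeHosts
    }
}
```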

Launching a local execution with non-blocking wait

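  // java.* imports; CompletionServiceFactory and ICompletionService
  // must be imported from the Mpaxs jar as well
  import java.io.IOException;
  import java.util.List;
  import java.util.concurrent.Callable;
  import java.util.concurrent.TimeUnit;

  int maxJobs = 100;   // number of jobs to submit (example value)
  int maxThreads = 1;  // number of local worker threads (example value)

  CompletionServiceFactory<Long> csf = new CompletionServiceFactory<Long>();
  csf.setTimeOut(1);
  csf.setTimeUnit(TimeUnit.SECONDS);
  csf.setMaxThreads(maxThreads);
  csf.setBlockingWait(false);

  final ICompletionService<Long> cs = csf.newLocalCompletionService();
  for (int i = 0; i < maxJobs; i++) {
      // !!! this callable is NOT serializable !!!
      cs.submit(new Callable<Long>() {
          @Override
          public Long call() throws Exception {
              // busy work: sum over the whole int range
              long sum = 0;
              for (int j = Integer.MIN_VALUE; j < Integer.MAX_VALUE; j++) {
                  sum += j;
              }
              // simulate a random I/O failure in roughly 10% of the jobs
              if (Math.random() > 0.9) {
                  throw new IOException("Failed on io due to simulated random error!");
              }
              return Long.valueOf(sum);
          }
      });
  }
  // collect the results of all submitted jobs
  List<Long> results = cs.call();
  System.out.println("Local Results (Local Host execution): " + results);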
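For readers who want to see what a non-blocking wait looks like without Mpaxs, the sketch below uses only the JDK's ExecutorCompletionService. It is an analogue, not the Mpaxs implementation: the class NonBlockingCollect is hypothetical, and it simply polls for the next finished job with a timeout instead of blocking indefinitely, counting failed jobs as finished without adding a result.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

/**
 * Plain-JDK analogue (hypothetical, not Mpaxs code) of a local completion
 * service with a non-blocking wait: results are gathered with poll(timeout).
 */
class NonBlockingCollect {

    static List<Long> runAll(List<Callable<Long>> jobs, int maxThreads,
                             long timeout, TimeUnit unit) {
        ExecutorService pool = Executors.newFixedThreadPool(maxThreads);
        List<Long> results = new ArrayList<Long>();
        try {
            CompletionService<Long> cs = new ExecutorCompletionService<Long>(pool);
            for (Callable<Long> job : jobs) {
                cs.submit(job);
            }
            int remaining = jobs.size();
            while (remaining > 0) {
                // wait at most 'timeout' for the next finished job
                Future<Long> f = cs.poll(timeout, unit);
                if (f == null) {
                    continue; // nothing finished yet; other work could be done here
                }
                remaining--;
                try {
                    results.add(f.get()); // f is done, so get() returns immediately
                } catch (ExecutionException e) {
                    System.err.println("Job failed: " + e.getCause());
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the flag, return partial results
        } finally {
            pool.shutdown();
        }
        return results;
    }
}
```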

If you want automatic resubmission of failed jobs, wrap the completion service before submitting jobs to it:

  // try to rerun failed tasks up to three times to catch
  // randomly occurring exceptions
  final ICompletionService<Long> rcs = csf.asResubmissionService(cs, 3);
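The idea behind resubmission can be illustrated with a plain Callable wrapper. This is a hypothetical sketch, not the implementation of asResubmissionService: the class Retrying reruns its delegate up to maxRetries additional times and rethrows the last exception if every attempt fails.

```java
import java.util.concurrent.Callable;

/**
 * Hypothetical retry wrapper: a failed call is rerun up to maxRetries more
 * times; if all attempts fail, the last exception is propagated.
 */
class Retrying<T> implements Callable<T> {
    private final Callable<T> delegate;
    private final int maxRetries;

    Retrying(Callable<T> delegate, int maxRetries) {
        if (maxRetries < 0) {
            throw new IllegalArgumentException("maxRetries must be >= 0");
        }
        this.delegate = delegate;
        this.maxRetries = maxRetries;
    }

    @Override
    public T call() throws Exception {
        Exception last = null;
        // one initial attempt plus up to maxRetries reruns
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            try {
                return delegate.call();
            } catch (Exception e) {
                last = e; // remember the failure and try again
            }
        }
        throw last;
    }
}
```

With maxRetries set to 3, a job that fails twice and then succeeds returns its result on the third attempt.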

Launching a remote execution

You need to have a DRMAA-compatible grid engine installed locally. There are a number of different implementations available. For Ubuntu Linux, you may want to install the Open Grid Scheduler packages following this tutorial. Do not forget to install the DRMAA Java API bindings.

Users who want to use the grid engine should edit their own .profile, or an administrator can add a script file under /etc/profile.d/, that exports SGE_ROOT=/var/lib/gridengine (on Ubuntu) and SGE_CELL=CELL, where CELL is replaced with the cell name you entered during package installation.
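Before starting a master server in DRMAA mode, it can help to check that these variables are actually visible to the JVM. The following is a hypothetical pre-flight check, not part of Mpaxs; the class SgeEnvironmentCheck simply reports which of SGE_ROOT and SGE_CELL are missing or empty.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical pre-flight check: report grid engine variables that are not
 * exported (or are empty) in the given environment.
 */
class SgeEnvironmentCheck {

    static List<String> missingVariables(Map<String, String> env) {
        List<String> missing = new ArrayList<String>();
        for (String name : new String[] {"SGE_ROOT", "SGE_CELL"}) {
            String value = env.get(name);
            if (value == null || value.trim().isEmpty()) {
                missing.add(name);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        List<String> missing = missingVariables(System.getenv());
        if (missing.isEmpty()) {
            System.out.println("SGE_ROOT=" + System.getenv("SGE_ROOT")
                    + ", SGE_CELL=" + System.getenv("SGE_CELL"));
        } else {
            System.err.println("Please export " + missing + ", e.g. in your .profile");
        }
    }
}
```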

Note:
If you have problems starting up either the master or the exec instances, have a look at your /etc/hosts file. It seems that the implementation only works if the first entry maps your host's IP (which may be 127.0.0.1 for local-only operation) to your hostname (NOT localhost). If such an entry is missing, try inserting a new entry at the top of the file with your current IP and hostname.
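A small diagnostic can tell whether a hosts file satisfies the condition from the note. The sketch below is hypothetical and not shipped with Mpaxs: HostsFileCheck looks only at the first non-comment entry and reports whether it lists the given hostname. Note that InetAddress.getHostName() may return a fully qualified name, so you may need to compare against the short name instead.

```java
import java.net.InetAddress;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.List;

/**
 * Hypothetical diagnostic: does the first non-comment /etc/hosts entry map an
 * IP address to the given host name?
 */
class HostsFileCheck {

    static boolean firstEntryNamesHost(List<String> lines, String hostName) {
        for (String raw : lines) {
            String line = raw;
            int hash = line.indexOf('#');
            if (hash >= 0) {
                line = line.substring(0, hash); // strip comments
            }
            line = line.trim();
            if (line.isEmpty()) {
                continue;
            }
            String[] fields = line.split("\\s+");
            // fields[0] is the IP address; the remaining fields are names and aliases
            for (int i = 1; i < fields.length; i++) {
                if (fields[i].equalsIgnoreCase(hostName)) {
                    return true;
                }
            }
            return false; // only the first entry counts
        }
        return false;
    }

    public static void main(String[] args) throws Exception {
        List<String> lines = Files.readAllLines(Paths.get("/etc/hosts"), StandardCharsets.UTF_8);
        String hostName = InetAddress.getLocalHost().getHostName();
        System.out.println("First /etc/hosts entry names " + hostName + ": "
                + firstEntryNamesHost(lines, hostName));
    }
}
```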

We have successfully used Mpaxs with Oracle Grid Engine on Solaris and with Open Grid Scheduler on Linux, with more than 50 hosts and more than 200,000 jobs.

Caveat:
Please be aware that you may need to keep an eye on the slot capacity of your grid engine installation. If you request more compute hosts than slots available in your system, some compute hosts may start up only after the main application has finished. After these compute hosts have been scheduled to run by the grid engine, they will take some time to notice that the master server is unreachable before they shut down. Thus, you may want to shut the remaining jobs down by running qdel mpaxs-chost* or simply wait until the compute hosts' connection attempts time out.

  // java.* imports; ComputeServerFactory, Impaxs, CompletionServiceFactory,
  // ICompletionService, ConfigurationKeys, ExecutionType and
  // PropertiesConfiguration must be imported as well
  import java.io.File;
  import java.io.IOException;
  import java.util.List;

  // Define the location of the compute host jar; this could be your own extended version
  File computeHostJarLocation = new File(System.getProperty("user.dir"), "mpaxs.jar");
  if (!computeHostJarLocation.isFile()) {
      throw new IOException("Could not locate mpaxs.jar in " + System.getProperty("user.dir"));
  }
  final PropertiesConfiguration cfg = new PropertiesConfiguration();
  // set execution type
  cfg.setProperty(ConfigurationKeys.KEY_EXECUTION_MODE, ExecutionType.DRMAA);
  // set location of compute host jar
  cfg.setProperty(ConfigurationKeys.KEY_PATH_TO_COMPUTEHOST_JAR, computeHostJarLocation);
  // exit to console when master server shuts down
  cfg.setProperty(ConfigurationKeys.KEY_MASTER_SERVER_EXIT_ON_SHUTDOWN, true);
  // limit the number of used compute hosts
  cfg.setProperty(ConfigurationKeys.KEY_MAX_NUMBER_OF_CHOSTS, 3);
  // native specs for the DRMAA API
  cfg.setProperty(ConfigurationKeys.KEY_NATIVE_SPEC, "");

  Impaxs impxs = ComputeServerFactory.getComputeServer();
  impxs.startMasterServer(cfg);
  CompletionServiceFactory<Double> csf = new CompletionServiceFactory<Double>();
  final ICompletionService<Double> mcs = csf.newDistributedCompletionService();
  int maxJobs = 200;
  for (int i = 0; i < maxJobs; i++) {
      // TestCallable is within the mpaxs-test module, net.sf.mpaxs.test
      mcs.submit(new TestCallable());
  }
  List<Double> results = mcs.call();
  System.out.println("Distributed execution: " + results);
  impxs.stopMasterServer();
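The distributed example relies on TestCallable from the mpaxs-test module. Since the local example explicitly flags its anonymous callable as not serializable, jobs shipped to remote compute hosts presumably need to be Serializable. The sketch below is a hypothetical stand-in for your own job class, not TestCallable itself: SumOfSquaresCallable is a named, Serializable class that carries only serializable state, and its roundTrip helper mimics what a transport does by writing and reading the object with Java serialization.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.concurrent.Callable;

/**
 * Hypothetical job for distributed execution: a named, Serializable callable
 * with only serializable fields (unlike an anonymous inner class).
 */
class SumOfSquaresCallable implements Callable<Double>, Serializable {
    private static final long serialVersionUID = 1L;
    private final int n;

    SumOfSquaresCallable(int n) {
        this.n = n;
    }

    @Override
    public Double call() {
        double sum = 0;
        for (int i = 1; i <= n; i++) {
            sum += (double) i * i;
        }
        return sum;
    }

    /** Writes and reads an object with Java serialization, as a transport would. */
    @SuppressWarnings("unchecked")
    static <T> T roundTrip(T obj) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        ObjectOutputStream out = new ObjectOutputStream(bytes);
        out.writeObject(obj);
        out.close();
        ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()));
        try {
            return (T) in.readObject();
        } finally {
            in.close();
        }
    }
}
```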

Low level access for distributed execution

If you want better control over individual jobs, have a look at the API of net.sf.mpaxs.api.Impaxs. You can create your own job instances and submit them to the compute hosts. Additionally, the API supports registering event listeners for a job, which are called whenever the job's status changes during its lifecycle.

Note:
This example assumes that you have created and configured an Impaxs instance, as in the previous example.

  // create a job from TestScheduledRunnable returning Boolean.TRUE on success
  Job<Boolean> job = new Job<Boolean>(new TestScheduledRunnable(), Boolean.TRUE);
  // increase the priority so that the job can bypass other waiting jobs
  job.setPriority(job.getPriority() + 1);
  // API submission: this job will be wrapped as a ScheduledJob
  // (initial delay 1, period 5, time unit seconds)
  impxs.submitScheduledJob(job, 1, 5, TimeUnit.SECONDS);
  // alternative: direct creation of a ScheduledJob
  // impxs.submitJob(new ScheduledJob(job, 1, 5, TimeUnit.SECONDS));

The parameters for the scheduled job are the same as for Java's ScheduledExecutorService: an initial delay, a period, and a time unit. The example above will run a scheduled job with a priority slightly higher than the default priority of 0. Scheduled jobs run off the same priority blocking queue as all one-shot jobs.

Note that the requested initial delay and period are only hints to the execution system as to when to enqueue a job. There is no guarantee that the job will be executed within the requested interval if the scheduler or system is under heavy load. Jobs that need to be executed close to the requested start time or interval should use a higher priority to receive precedence over normal jobs. However, if the maximum number of compute hosts has been reached and no free host is immediately available, the job will have to wait for the next free host.
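The effect of raising a job's priority on a shared priority queue can be shown with the JDK's PriorityBlockingQueue. The sketch below is a hypothetical model, not the Mpaxs scheduler: QueuedJob orders jobs by descending priority and, among equal priorities, by arrival order, so a job with priority 1 overtakes waiting jobs with the default priority 0.

```java
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

/**
 * Hypothetical model of a priority job queue: higher priority first,
 * first come, first served among equal priorities.
 */
class PriorityJobQueue {

    static final class QueuedJob implements Comparable<QueuedJob> {
        private static final AtomicLong SEQ = new AtomicLong();
        final String name;
        final int priority;
        final long seq = SEQ.getAndIncrement(); // arrival order, used as tie-breaker

        QueuedJob(String name, int priority) {
            this.name = name;
            this.priority = priority;
        }

        @Override
        public int compareTo(QueuedJob other) {
            if (priority != other.priority) {
                return priority > other.priority ? -1 : 1; // higher priority first
            }
            return seq < other.seq ? -1 : (seq == other.seq ? 0 : 1); // then arrival order
        }
    }

    public static void main(String[] args) {
        PriorityBlockingQueue<QueuedJob> queue = new PriorityBlockingQueue<QueuedJob>();
        queue.add(new QueuedJob("one-shot A", 0));
        queue.add(new QueuedJob("one-shot B", 0));
        queue.add(new QueuedJob("scheduled", 1));
        while (!queue.isEmpty()) {
            System.out.println(queue.poll().name);
        }
    }
}
```

Running main prints "scheduled", then "one-shot A", then "one-shot B". As the paragraph above points out, priority only affects the order in the queue; it cannot create a free compute host.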

If you are interested in receiving lifecycle events of a job, register a net.sf.mpaxs.api.event.IJobEventListener:

  // add a listener for all jobs
  impxs.addJobEventListener(myListener);
  // add a listener for a specific job using the job UUID
  impxs.addJobEventListener(myListener, jobId);
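The two registration variants (all jobs versus one job UUID) suggest a dispatch scheme like the one below. This is a hypothetical sketch and not the Mpaxs API: JobEventBus, its JobListener interface and the string-valued state are illustrative names only. Global listeners see every event, while per-job listeners see only events for the UUID they were registered with.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CopyOnWriteArrayList;

/**
 * Hypothetical listener registry (not the Mpaxs API): listeners for all jobs
 * and listeners for a single job, identified by its UUID.
 */
class JobEventBus {

    interface JobListener {
        void onEvent(UUID jobId, String state);
    }

    private final List<JobListener> global = new CopyOnWriteArrayList<JobListener>();
    private final Map<UUID, List<JobListener>> perJob = new HashMap<UUID, List<JobListener>>();

    /** Registers a listener for events of all jobs. */
    void addListener(JobListener listener) {
        global.add(listener);
    }

    /** Registers a listener for events of one specific job. */
    synchronized void addListener(JobListener listener, UUID jobId) {
        List<JobListener> listeners = perJob.get(jobId);
        if (listeners == null) {
            listeners = new CopyOnWriteArrayList<JobListener>();
            perJob.put(jobId, listeners);
        }
        listeners.add(listener);
    }

    /** Notifies global listeners first, then the listeners of this job. */
    void fire(UUID jobId, String state) {
        for (JobListener l : global) {
            l.onEvent(jobId, state);
        }
        List<JobListener> listeners;
        synchronized (this) {
            listeners = perJob.get(jobId);
        }
        if (listeners != null) {
            for (JobListener l : listeners) {
                l.onEvent(jobId, state);
            }
        }
    }
}
```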

The lifecycle of a job in mpaxs takes on one of the following values:

  • UNKNOWN: initial state after job is created
  • WAITING: state after job has been enqueued for execution
  • RUNNING: state after job has been transmitted to execution host
  • DONE: state after job has finished execution successfully
  • ERROR: state after job has finished with an exception
  • CANCELED: state after job has been cancelled
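The list above can be read as a small state machine. The enum below is a hypothetical model for one-shot jobs: the state names come from the list, but the allowed transitions are an assumption inferred from the descriptions, not taken from Mpaxs. In particular, scheduled or resubmitted jobs may re-enter WAITING in ways this sketch does not model.

```java
import java.util.EnumSet;
import java.util.Set;

/**
 * Hypothetical lifecycle model for one-shot jobs; the transitions are an
 * assumption inferred from the state descriptions, not taken from Mpaxs.
 */
enum JobState {
    UNKNOWN, WAITING, RUNNING, DONE, ERROR, CANCELED;

    /** States that may directly follow this one. */
    Set<JobState> successors() {
        switch (this) {
            case UNKNOWN: return EnumSet.of(WAITING, CANCELED);
            case WAITING: return EnumSet.of(RUNNING, CANCELED);
            case RUNNING: return EnumSet.of(DONE, ERROR, CANCELED);
            default:      return EnumSet.noneOf(JobState.class); // DONE, ERROR, CANCELED are terminal
        }
    }

    boolean isTerminal() {
        return successors().isEmpty();
    }

    boolean canMoveTo(JobState next) {
        return successors().contains(next);
    }
}
```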