meteor-job

NOTE: This Package remains experimental until v0.1.0 is released, and while the API methods described here are maturing, they may still change.

Intro

Meteor Job is a pure JavaScript implementation of the Job and JobQueue classes that form the foundation of the jobCollection Atmosphere package for Meteor. This package is used internally by jobCollection on Meteor, but you should also use it for any job workers you would like to run outside of the Meteor environment as ordinary node.js programs.

Here's a very basic example that ignores authentication and connection error handling:

var DDP = require('ddp');
var Job = require('meteor-job');

// In this case a local Meteor instance, could be anywhere...
var ddp = new DDP({
  host: "127.0.0.1",
  port: 3000,
  use_ejson: true
});

// Job uses DDP Method calls to communicate with the Meteor jobCollection.
// Within Meteor, it can make those calls directly, but outside of Meteor
// you need to hook it up with a working DDP connection it can use.
Job.setDDP(ddp);

// Once we have a valid connection, we're in business
ddp.connect(function (err) {
  if (err) throw err;

  // Worker function for jobs of type 'somejob'
  var somejobWorker = function (job, cb) {
    job.log("Some message");
    // Work on job...
    job.progress(50, 100);  // Half done!
    // Work some more...
    var jobError = (Math.random() > 0.9); // Could fail!
    if (jobError) {
      job.fail("Some error happened...");
    } else {
      job.done();
    }
    cb(null); // Don't forget!
  };

  // Get jobs of type 'somejob' available in the
  // 'jobPile' jobCollection for somejobWorker
  var workers = Job.processJobs('jobPile', 'somejob', somejobWorker);
});

Installation

npm install meteor-job

Unit tests may be run from within the node_modules/meteor-job directory by:

npm test
# or
make test

Usage

Getting connected

First you need to establish a DDP connection with the Meteor server hosting the jobCollection you wish to work on. You will probably need to authenticate as well unless the Meteor server is wide open for unauthenticated DDP Method calls, which it really shouldn't be. I have written another npm package ddp-login which makes secure authentication with Meteor from node.js a snap.

var DDP = require('ddp');
var DDPlogin = require('ddp-login');
var Job = require('meteor-job');

// See DDP package docs for options here...
var ddp = new DDP({
  host: "127.0.0.1",
  port: 3000,
  use_ejson: true
});

Job.setDDP(ddp);

ddp.connect(function (err) {
  if (err) throw err;

  // The call below will look for an existing authentication token in
  // process.env.METEOR_TOKEN. If it finds one and it is still valid,
  // authentication will be transparent. If not, the user will be prompted
  // for the e-mail and password to an account on the connected Meteor
  // server. This is the default case... ddp-login has other options
  // documented at https://www.npmjs.org/package/ddp-login
  DDPlogin(ddp, function (err, token) {
    if (err) throw err;

    // From here we can get to work, as long as the DDP connection is good.
    // See the DDP package for details on DDP auto_reconnect, and handling
    // socket events.

    // Do stuff!!!

  });
});

Job workers

Okay, so you've got an authenticated DDP connection, and you'd like to get to work, now what?

// 'jobQueue' is the name of the jobCollection on the server
// 'jobType' is the name of the kind of job you'd like to work on
Job.getWork('jobQueue', 'jobType', function (err, job) {
  if (job) {
     // You got a job!!!  Better work on it!
     // At this point the jobCollection has changed the job status to
     // 'running' so you are now responsible to eventually call either
     // job.done() or job.fail()
  }
});

However, Job.getWork() is kind of low-level. It only makes one request for a job. What you probably really want is to get some work whenever it becomes available and you aren't too busy:

var workers = Job.processJobs('jobQueue', 'jobType', { concurrency: 4 },
  function (job, cb) {
    // This will only be called if a job is obtained from Job.getWork()
    // Up to four of these worker functions can be outstanding at
    // a time based on the concurrency option...

    // Be sure to invoke the callback when this job has been
    // completed or failed.
    cb();

  }
);

Once you have a job, you can work on it, log messages, indicate progress and either succeed or fail.

// This code is assumed to be running in a Job.processJobs() callback
var count = 0;

// In this example, assume that each job may contain
// multiple emails to be sent within the job's data.
var total = job.data.emailsToSend.length;
var retryLater = [];

// Most job methods have optional callbacks if you really want to be sure...
job.log("Attempting to send " + total + " emails",
  function(err, result) {
    // err would be a DDP or server error
    // If no error, the result will indicate what happened in jobCollection
  }
);

job.progress(count, total);

if (networkDown()) {
  // You can add a string message to a failing job
  job.fail("Network is down!!!");
  cb();
} else {
  job.data.emailsToSend.forEach(function (email) {
    sendEmail(email.address, email.subject, email.message,
      function(err) {
        count++;
        job.progress(count, total);
        if (err) {
          job.log("Sending to " + email.address + " failed",
                  {level: 'warning'});
          retryLater.push(email);
        }
        if (count === total) {
          // You can attach a result object to a successful job
          job.done({ retry: retryLater });
          cb();
        }
      }
    );
  });
}

The error handling mechanism in the above code seems pretty clunky... How do those failed messages get retried? This approach will probably be easier to manage:

var workers = Job.processJobs('jobQueue', 'jobType', { cargo: 20 },
  function (jobs, cb) {
    // jobs is an array of jobs, between 1 and 20 long,
    // triggered by the option cargo being > 1
    var count = 0;

    jobs.forEach(function (job) {
      var email = job.data.email;  // Only one email per job
      sendEmail(email.address, email.subject, email.message, function(err) {
        count++;
        if (err) {
          job.log("Sending failed with error: " + err, {level: 'warning'});
          job.fail("" + err);
        } else {
          job.done();
        }
        if (count === jobs.length) {
          cb();  // Tells the processJobs we're done
        }
      });
    });
  }
);

With the above logic, each email can succeed or fail individually, and retrying later can be directly handled by the jobCollection itself.
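The counting pattern used above (invoke cb only once every job in the batch has been resolved) can be factored into a small helper. The following is a minimal sketch: `afterAll` is a hypothetical name and is not part of meteor-job, and the stub jobs stand in for real job objects so the snippet runs without a DDP connection.

```javascript
// Hypothetical helper (not part of meteor-job): returns a function that
// invokes `cb` exactly once, after it has itself been called `n` times.
function afterAll(n, cb) {
  var remaining = n;
  var fired = false;
  if (n === 0) { fired = true; cb(); }   // Empty batch: finish immediately
  return function () {
    if (fired) return;                   // Ignore any extra calls
    remaining--;
    if (remaining === 0) {
      fired = true;
      cb();
    }
  };
}

// Stand-in jobs so the sketch runs on its own.
var calls = 0;
var stubJobs = [{ id: 1 }, { id: 2 }, { id: 3 }];
var oneDone = afterAll(stubJobs.length, function () { calls++; });

stubJobs.forEach(function (job) {
  // In a real worker, job.done() or job.fail() would be called here.
  oneDone();
});
```

In a real worker, `cb` would be the callback passed in by Job.processJobs(), which expects to be called once per batch.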

The jobQueue object returned by Job.processJobs() has methods that can be used to determine its status and control its behavior. See the jobQueue API reference for more detail.

Job creators

If you'd like to create an entirely new job and submit it to a jobCollection, here's how:

var job = new Job('jobQueue', 'jobType', { work: "to", be: "done" });

// Set some options on the new job before submitting it. These option setting
// methods do not take callbacks because they only affect the local job object.
// See also: job.repeat(), job.after(), job.depends()

job.priority('normal')     // These methods return job and so are chainable.
   .retry({retries: 5,         // Retry up to five times
           wait: 15*60*1000})  //waiting 15 minutes per attempt
   .delay(15000);          // Don't run until 15 seconds have passed

// Save the job to be added to the Meteor jobCollection via DDP
job.save(function (err, result) {
  if (!err && result) {
    console.log("New job saved with Id: " + result);
  }
});

Note: It's likely that you'll want to think carefully about whether node.js programs should be allowed to create and manage jobs. Meteor jobCollection provides an extremely flexible mechanism to allow or deny specific actions that are attempted outside of trusted server code. As such, the code above (specifically the job.save()) may be rejected by the Meteor server depending on how it is configured. The same caveat applies to all of the job management methods described below.

Job managers

Management of the jobCollection itself is accomplished using a mixture of Job class methods and methods on individual job objects:

// Get a job object by Id
Job.getJob('jobQueue', id, function (err, job) {
  // Note, this is NOT the same as Job.getWork()
  // This call returns a job object, but does not change
  // the status to 'running'.
  // So you can't work on this job.
});

// If your job object's information gets stale, you can refresh it
job.refresh(function (err, result) {
  // job is refreshed
});

// Make a job object from a job document (which you
// can obtain by subscribing to a jobCollection)
job = Job.makeJob('jobQueue', jobDoc);  // No callback!

// Note that jobCollections are reactive, just like any
// other Meteor collection. So if you are subscribed,
// the job documents in the collection will auto-update.
// Then you can use Job.makeJob to turn a job doc into a
// job object whenever necessary without another DDP round trip

// Once you have a job object you can change many of its
// settings (but only while it's paused)
job.pause(function (err, result) {   // Prohibit the job from running
  job.priority('low');   // Change its priority
  job.save();            // Update its priority in the jobCollection
                         // This also automatically triggers a job.resume()
                         // which is how you'd otherwise get it running again.
});

// You can also cancel jobs that are running or are waiting to run.
job.cancel();

// You can restart a cancelled or failed job
job.restart();

// Or re-run a job that has already completed successfully
job.rerun();

// And you can remove a job, so long as it's cancelled,
// completed or failed. If it's running or in any other state,
// you'll need to cancel it before you can remove it.
job.remove();

// For bulk operations acting on more than one job at a time,
// there are also Class methods that take arrays of job Ids.
// For example, cancelling a whole batch of jobs at once:
Job.cancelJobs('jobQueue', Ids, function(err, result) {
  // Operation complete. result is true if any jobs were
  // cancelled (assuming no error)
});

API

class Job

Job has a bunch of Class methods and properties to help with creating and managing Jobs and getting work for them.

Job.setDDP(ddp)

This class method binds Job to a specific instance of DDPClient. See node-ddp-client for more details. Currently it's only possible to use a single DDP connection at a time.

var ddp = new DDP({
  host: "127.0.0.1",
  port: 3000,
  use_ejson: true
});

Job.setDDP(ddp);

Job.getWork(root, type, [options], [callback])

Get one or more jobs from the job Collection, setting status to 'running'.

options:

  • maxJobs -- Maximum number of jobs to get. Default: 1. If maxJobs > 1 the result will be an array of job objects; otherwise it is a single job object, or undefined if no jobs were available.

callback(error, result) -- Optional only on Meteor Server with Fibers. Result will be an array or single value depending on options.maxJobs.

if (Meteor.isServer) {
  job = Job.getWork(  // Job will be undefined or contain a Job object
    'jobQueue',  // name of job Collection
    'jobType',   // type of job to request
    {
      maxJobs: 1 // Default, only get one job, returned as a single object
    }
  );
} else {
  Job.getWork(
    'jobQueue',                  // root name of job Collection
    [ 'jobType1', 'jobType2' ],  // can request multiple types in array
    {
      maxJobs: 5 // If maxJobs > 1, result is an array of jobs
    },
    function (err, jobs) {
      // jobs contains between 0 and maxJobs jobs, depending on availability
      if (err || !jobs || jobs.length === 0) return;  // Nothing to work on
      // The type of each job is available as job.type
      if (jobs[0].type === 'jobType1') {
        // Work on jobType1...
      } else if (jobs[0].type === 'jobType2') {
        // Work on jobType2...
      } else {
        // Sadness
      }
    }
  );
}
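Because the shape of the result depends on options.maxJobs (a single job, an array, or undefined), calling code may find it convenient to normalize it first. The helper below is a hypothetical sketch, not part of meteor-job, and uses plain objects in place of real job objects.

```javascript
// Hypothetical helper: coerce a Job.getWork() result into an array,
// regardless of the value of maxJobs that was used.
function asJobArray(result) {
  if (result === undefined || result === null) return [];  // No jobs available
  return Array.isArray(result) ? result : [result];
}

// Plain objects standing in for job objects:
var none = asJobArray(undefined);
var one = asJobArray({ type: 'jobType1' });
var many = asJobArray([{ type: 'jobType1' }, { type: 'jobType2' }]);
```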

Job.processJobs(root, type, [options], worker)

See documentation below for JobQueue

Job.makeJob(root, jobDoc)

Make a Job object from a job Collection document.

// doc is obtained from a job Collection subscription
job = Job.makeJob('jobQueue', doc);

Job.getJob(root, id, [options], [callback])

Creates a job object by id from the server job Collection, returns undefined if no such job exists.

options:

  • getLog -- If true, get the current log of the job. Default is false to save bandwidth since logs can be large.

callback(error, result) -- Optional only on Meteor Server with Fibers. result is a job object or undefined

if (Meteor.isServer) {
  job = Job.getJob(  // Job will be undefined or contain a Job object
    'jobQueue',  // name of job Collection
    id,          // job id of type EJSON.ObjectID()
    {
      getLog: false  // Default, don't include the log information
    }
  );
  // job may be undefined
} else {
  Job.getJob(
    'jobQueue',    // root name of job Collection
    id,            // job id of type EJSON.ObjectID()
    {
      getLog: true  // include the log information
    },
    function (err, job) {
      if (job) {
        // Here's your job
      }
    }
  );
}

Job.getJobs(root, ids, [options], [callback])

Like Job.getJob except it takes an array of ids and is much more efficient than calling Job.getJob() in a loop because it gets Jobs from the server in batches.
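To illustrate what fetching "in batches" means, here is a minimal sketch of splitting a list of ids into fixed-size groups. The function name and the batch size of 2 are assumptions chosen for illustration; the batch size meteor-job actually uses is not stated here.

```javascript
// Hypothetical helper: split an array of ids into groups of at most
// `batchSize` elements, preserving order.
function inBatches(ids, batchSize) {
  var batches = [];
  for (var i = 0; i < ids.length; i += batchSize) {
    batches.push(ids.slice(i, i + batchSize));
  }
  return batches;
}

// Five ids in batches of two means three round trips instead of five.
var batches = inBatches(['a', 'b', 'c', 'd', 'e'], 2);
```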

Job.pauseJobs(root, ids, [options], [callback])

Like job.pause() except it pauses a list of jobs by id.

Job.resumeJobs(root, ids, [options], [callback])

Like job.resume() except it resumes a list of jobs by id.

Job.cancelJobs(root, ids, [options], [callback])

Like job.cancel() except it cancels a list of jobs by id.

Job.restartJobs(root, ids, [options], [callback])

Like job.restart() except it restarts a list of jobs by id.

Job.removeJobs(root, ids, [options], [callback])

Like job.remove() except it removes a list of jobs by id.

Job.startJobs(root, [options], [callback])

This feature is still immature. Starts the server job Collection.

options: No options currently

callback(error, result) -- Result is true if successful.

Job.startJobs('jobQueue');  // Callback is optional

Job.stopJobs(root, [options], [callback])

This feature is still immature. Stops the server job Collection.

options:

  • timeout: In ms, how long until the server forcibly fails all still running jobs. Default: 60*1000 (1 minute)

callback(error, result) -- Result is true if successful.

Job.stopJobs(
  'jobQueue',
  {
    timeout: 60000
  }
);  // Callback is optional

Job.forever

Constant value used to indicate that something should repeat forever.

job = new Job('jobQueue', 'jobType', { work: "to", be: "done" })
   .retry({ retries: Job.forever })    // Default for .retry()
   .repeat({ repeats: Job.forever });  // Default for .repeat()

Job.jobPriorities

Valid non-numeric job priorities.

Job.jobPriorities = { low: 10, normal: 0, medium: -5,
                      high: -10, critical: -15 };

Job.jobRetryBackoffMethods

Valid retry backoff methods.

Job.jobRetryBackoffMethods = [ 'constant', 'exponential' ];

Job.jobStatuses

Possible states for the status of a job in the job collection.

Job.jobStatuses = [ 'waiting', 'paused', 'ready', 'running',
                    'failed', 'cancelled', 'completed' ];

Job.jobLogLevels

Valid log levels. If these look familiar, it's because they correspond to some of the Bootstrap context and alert classes.

Job.jobLogLevels = [ 'info', 'success', 'warning', 'danger' ];

Job.jobStatusCancellable

Job status states that can be cancelled.

Job.jobStatusCancellable = [ 'running', 'ready', 'waiting', 'paused' ];

Job.jobStatusPausable

Job status states that can be paused.

Job.jobStatusPausable = [ 'ready', 'waiting' ];

Job.jobStatusRemovable

Job status states that can be removed.

Job.jobStatusRemovable = [ 'cancelled', 'completed', 'failed' ];

Job.jobStatusRestartable

Job status states that can be restarted.

Job.jobStatusRestartable = [ 'cancelled', 'failed' ];
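One way to use these status lists is as a local guard before attempting a management call. The sketch below copies the four arrays listed above into a plain object so that it runs without meteor-job; the `allows` helper is hypothetical and not part of the package.

```javascript
// Status lists copied from the reference above.
var statusLists = {
  cancel:  [ 'running', 'ready', 'waiting', 'paused' ],
  pause:   [ 'ready', 'waiting' ],
  remove:  [ 'cancelled', 'completed', 'failed' ],
  restart: [ 'cancelled', 'failed' ]
};

// Hypothetical helper: true if `action` is permitted for a job
// whose current status is `status`.
function allows(action, status) {
  var list = statusLists[action];
  return !!list && list.indexOf(status) !== -1;
}

var canCancelRunning = allows('cancel', 'running');  // listed as cancellable
var canPauseRunning = allows('pause', 'running');    // not listed as pausable
```

A check like this only saves a round trip; the server remains the authority on whether an operation succeeds.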

Job.ddpMethods

Array of the names of all DDP methods used by Job

Job.ddpMethods = [
    'startJobs', 'stopJobs', 'jobRemove', 'jobPause', 'jobResume',
    'jobCancel', 'jobRestart', 'jobSave', 'jobRerun', 'getWork',
    'getJob', 'jobLog', 'jobProgress', 'jobDone', 'jobFail' ];

Job.ddpPermissionLevels

Array of the predefined DDP method permission levels

Job.ddpPermissionLevels = [ 'admin', 'manager', 'creator', 'worker' ];

Job.ddpMethodPermissions

Object mapping each DDP method name to the permission levels that allow it to be called.

Job.ddpMethodPermissions = {
    'startJobs': ['startJobs', 'admin'],
    'stopJobs': ['stopJobs', 'admin'],
    'jobRemove': ['jobRemove', 'admin', 'manager'],
    'jobPause': ['jobPause', 'admin', 'manager'],
    'jobResume': ['jobResume', 'admin', 'manager'],
    'jobCancel': ['jobCancel', 'admin', 'manager'],
    'jobRestart': ['jobRestart', 'admin', 'manager'],
    'jobSave': ['jobSave', 'admin', 'creator'],
    'jobRerun': ['jobRerun', 'admin', 'creator'],
    'getWork': ['getWork', 'admin', 'worker'],
    'getJob': ['getJob', 'admin', 'worker'],
    'jobLog': [ 'jobLog', 'admin', 'worker'],
    'jobProgress': ['jobProgress', 'admin', 'worker'],
    'jobDone': ['jobDone', 'admin', 'worker'],
    'jobFail': ['jobFail', 'admin', 'worker']
};
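As an illustration of how this table can be read, the sketch below looks up which methods a given permission level covers. The table is copied from above so the snippet runs on its own; `levelAllows` and `methodsFor` are hypothetical helpers, and how the server actually enforces these permissions is not described here.

```javascript
// Table copied from the reference above.
var ddpMethodPermissions = {
  'startJobs': ['startJobs', 'admin'],
  'stopJobs': ['stopJobs', 'admin'],
  'jobRemove': ['jobRemove', 'admin', 'manager'],
  'jobPause': ['jobPause', 'admin', 'manager'],
  'jobResume': ['jobResume', 'admin', 'manager'],
  'jobCancel': ['jobCancel', 'admin', 'manager'],
  'jobRestart': ['jobRestart', 'admin', 'manager'],
  'jobSave': ['jobSave', 'admin', 'creator'],
  'jobRerun': ['jobRerun', 'admin', 'creator'],
  'getWork': ['getWork', 'admin', 'worker'],
  'getJob': ['getJob', 'admin', 'worker'],
  'jobLog': ['jobLog', 'admin', 'worker'],
  'jobProgress': ['jobProgress', 'admin', 'worker'],
  'jobDone': ['jobDone', 'admin', 'worker'],
  'jobFail': ['jobFail', 'admin', 'worker']
};

// Hypothetical helper: true if `level` appears in the list for `method`.
function levelAllows(level, method) {
  var allowed = ddpMethodPermissions[method] || [];
  return allowed.indexOf(level) !== -1;
}

// Hypothetical helper: every method name whose list includes `level`.
function methodsFor(level) {
  return Object.keys(ddpMethodPermissions).filter(function (method) {
    return levelAllows(level, method);
  });
}

var workerMethods = methodsFor('worker');
```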

Instances of Job

j = Job(root, type, data)

Create a new Job object. Data should be reasonably small; if a worker requires a lot of data (e.g. video, image or sound files), it should be included by reference (e.g. with a URL pointing to the data, and another to where the result should be saved).

job = new Job(  // new is optional
  'jobQueue',   // job collection name
  'jobType',    // type of the job
  { /* ... */ } // Data for the worker, any valid EJSON object
);

j.depends([dependencies])

Adds jobs that this job depends upon (antecedents). This job will not run until these jobs have successfully completed. Defaults to an empty array (no dependencies). Returns job, so it is chainable. Added jobs must have already had .save() run on them, so they will have the _id attribute that is used to form the dependency. Calling j.depends() with a falsy value will clear any existing dependencies for this job.

 // job1 and job2 are Job objects, and they both
 // must successfully complete before job will run
job.depends([job1, job2]);
job.depends();  // Clear any dependencies previously added on this job

j.priority([priority])

Sets the priority of this job. Can be integer numeric or one of Job.jobPriorities. Defaults to 'normal' priority, which is priority 0. Returns job, so it is chainable.

job.priority('high');  // Maps to -10
job.priority(-10);     // Same as above
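The mapping from a priority argument to a number can be sketched as follows. This is an assumption about the behavior described above, not the package's actual code: `toPriorityNumber` is a hypothetical helper, and the error handling for invalid input is invented for illustration. Note that in the table of named priorities, more urgent names map to lower numbers.

```javascript
// Named priorities copied from Job.jobPriorities above.
var jobPriorities = { low: 10, normal: 0, medium: -5,
                      high: -10, critical: -15 };

// Hypothetical helper: resolve a priority argument to an integer.
function toPriorityNumber(priority) {
  if (priority === undefined) return jobPriorities.normal;  // Default
  if (typeof priority === 'string') {
    if (!Object.prototype.hasOwnProperty.call(jobPriorities, priority)) {
      throw new Error('Invalid priority name: ' + priority);
    }
    return jobPriorities[priority];
  }
  if (typeof priority === 'number' && priority % 1 === 0) return priority;
  throw new Error('Priority must be an integer or a valid priority name');
}

var fromName = toPriorityNumber('high');
var fromNumber = toPriorityNumber(-10);
```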

j.retry([options])

Set how failing jobs are rescheduled and retried by the job Collection. Returns job, so it is chainable.

options:

  • retries -- Number of times to retry a failing job. Default: Job.forever
  • wait -- Initial value for how long to wait between attempts, in ms. Default: 300000 (5 minutes)
  • backoff -- Method to use in determining how to calculate wait value for each retry:
    • 'constant': Always delay retrying by wait ms. Default value.
    • 'exponential': Delay by twice as long for each subsequent retry, e.g. wait, 2*wait, 4*wait ...

[options] may also be a non-negative integer, which is interpreted as { retries: [options] }

Note that the above stated defaults are those when .retry() is explicitly called. When a new job is created, the default number of retries is 0.

job.retry({
  retries: 5,   // Retry 5 times,
  wait: 20000,  // waiting 20 seconds between attempts
  backoff: 'constant'  // wait constant amount of time between each retry
});
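The two backoff methods can be illustrated with a small calculation. This is a sketch of the schedule described above (wait, 2*wait, 4*wait ...), not the job Collection's own code; `retryWait` and its zero-based `attempt` argument are hypothetical.

```javascript
// Hypothetical helper: how long to wait before a given retry.
// `attempt` is 0 for the first retry, 1 for the second, and so on.
function retryWait(wait, backoff, attempt) {
  if (backoff === 'exponential') {
    return wait * Math.pow(2, attempt);  // wait, 2*wait, 4*wait ...
  }
  return wait;                           // 'constant': always the same delay
}

// Delays in ms for the first three retries with wait = 20000
var constantDelays = [0, 1, 2].map(function (n) {
  return retryWait(20000, 'constant', n);
});
var exponentialDelays = [0, 1, 2].map(function (n) {
  return retryWait(20000, 'exponential', n);
});
```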

j.repeat([options])

Set how many times this job will be automatically re-run by the job Collection. Each time it is re-run, a new job is created in the job collection. This is equivalent to running job.rerun(). Only 'completed' jobs are repeated. Failing jobs that exhaust their retries will not repeat. By default, if an infinitely repeating job is added to the job Collection, any existing repeating jobs of the same type that are cancellable will be cancelled. See the cancelRepeats option of job.save() for more info. Returns job, so it is chainable.

options:

  • repeats -- Number of times to rerun the job. Default: Job.forever
  • wait -- How long to wait between re-runs, in ms. Default: 300000 (5 minutes)

[options] may also be a non-negative integer, which is interpreted as { repeats: [options] }

Note that the above stated defaults are those when .repeat() is explicitly called. When a new job is created, the default number of repeats is 0.

job.repeat({
  repeats: 5,   // Rerun this job 5 times,
  wait: 20000   // wait 20 seconds between each re-run.
});

j.delay([milliseconds])

How long to wait until this job can be run, counting from when it is initially saved to the job Collection. Returns job, so it is chainable.

job.delay(0);   // Do not wait. This is the default.

j.after([time])

time is a date object. This sets the time after which a job may be run. It is not guaranteed to run "at" this time because there may be no workers available when it is reached. Returns job, so it is chainable.

// Run the job anytime after right now. This is the default.
job.after(new Date());

j.log(message, [options], [callback])

Add an entry to this job's log. May be called before a new job is saved. message must be a string.

options:

  • level: One of Job.jobLogLevels: 'info', 'success', 'warning', or 'danger'. Default is 'info'.
  • echo: Echo this log entry to the console. 'danger' and 'warning' level messages are echoed using console.error() and console.warn() respectively. Others are echoed using console.log(). If echo is true all messages will be echoed. If echo is one of the Job.jobLogLevels levels, only messages of that level or higher will be echoed.

callback(error, result) -- Result is true if logging was successful. When running as Meteor.isServer with fibers, for a saved object the callback may be omitted and the return value is the result. If called on an unsaved object, the result is job and can be chained.

job.log(
  "This is a message",
  {
    level: 'warning',
    echo: true   // Default is false
  },
  function (err, result) {
    if (result) {
      // The log method worked!
    }
  }
);

var verbosityLevel = 'warning';
job.log("Don't echo this", { level: 'info', echo: verbosityLevel } );
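The echo rules can be sketched as a pair of small functions. This is an illustration under one assumption: that "higher" levels follow the order of the Job.jobLogLevels array ('info' lowest, 'danger' highest). Both helper names are hypothetical and not part of meteor-job.

```javascript
// Levels copied from Job.jobLogLevels; the ordering is assumed to
// run from least to most severe.
var jobLogLevels = [ 'info', 'success', 'warning', 'danger' ];

// Hypothetical helper: should a message at `level` be echoed,
// given the value of the `echo` option?
function shouldEcho(echo, level) {
  if (echo === true) return true;           // Echo everything
  if (!echo) return false;                  // Default: echo nothing
  var threshold = jobLogLevels.indexOf(echo);
  if (threshold === -1) return false;       // Unknown threshold name
  return jobLogLevels.indexOf(level) >= threshold;
}

// Hypothetical helper: which console method the echo would use.
function consoleMethodFor(level) {
  if (level === 'danger') return 'error';
  if (level === 'warning') return 'warn';
  return 'log';
}

var echoInfo = shouldEcho('warning', 'info');      // Below the threshold
var echoDanger = shouldEcho('warning', 'danger');  // Above the threshold
```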

j.progress(completed, total, [options], [cb])

Update the progress of a running job. May be called before a new job is saved. completed must be a number >= 0 and total must be a number > 0 with total >= completed.

options:

  • echo: Echo this progress update to the console using console.log().

callback(error, result) -- Result is true if progress update was successful. When running as Meteor.isServer with fibers, for a saved object the callback may be omitted and the return value is the result. If called on an unsaved object, the result is job and can be chained.

job.progress(
  50,
  100,    // Half done!
  {
    echo: true   // Default is false
  },
  function (err, result) {
    if (result) {
      // The progress method worked!
    }
  }
);
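The constraints on completed and total can be checked locally before calling job.progress(). The sketch below simply encodes the rules stated above; `validProgress` and `percentDone` are hypothetical helpers, not part of meteor-job.

```javascript
// Hypothetical helper: true if the arguments satisfy the documented rules
// (completed >= 0, total > 0, total >= completed, both numbers).
function validProgress(completed, total) {
  return typeof completed === 'number' && typeof total === 'number' &&
         completed >= 0 && total > 0 && total >= completed;
}

// Hypothetical helper: progress expressed as a percentage.
function percentDone(completed, total) {
  return 100 * completed / total;
}

var halfOk = validProgress(50, 100);
var halfPercent = percentDone(50, 100);
```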

j.save([options], [callback])

Submits this job to the job Collection. Only valid if this is a new job, or if the job is currently paused in the job Collection. If the job is already saved and paused, then most properties of the job may change (but not all, e.g. the jobType may not be changed.)

options:

  • cancelRepeats: If true and this job is an infinitely repeating job, will cancel any existing jobs of the same job type. This is useful for background maintenance jobs that may get added on each server restart (potentially with new parameters). Default is false.

callback(error, result) -- Result is true if save was successful. When running as Meteor.isServer with fibers, the callback may be omitted and the return value is the result.

job.save(
  {
    // Cancel any jobs of the same type,
    // if this job repeats forever.
    // Default: false.
    cancelRepeats: true
  }
);

j.refresh([options], [callback])

Refreshes the current job object state with the state on the remote job Collection. Note that if you subscribe to the job Collection, the job documents will stay in sync with the server automatically via Meteor reactivity.

options:

  • getLog -- If true, also refresh the job's log data (which may be large). Default: false

callback(error, result) -- Result is true if refresh was successful. When running as Meteor.isServer with fibers, the callback may be omitted and the return value is the result.

job.refresh(function (err, result) {
  if (result) {
    // Refreshed
  }
});

j.done(result, [options], [callback])

Change the state of a running job to 'completed'. result is any EJSON object. If this job is configured to repeat, a new job will automatically be cloned to rerun in the future. Result will be saved as an object. If passed result is not an object, it will be wrapped in one.

options: -- None currently.

callback(error, result) -- Result is true if completion was successful. When running as Meteor.isServer with fibers, the callback may be omitted and the return value is the result.

job.done(function (err, result) {
  if (result) {
    // Status updated
  }
});

// Pass a non-object result
job.done("Done!");
// This will be saved as:
// { "value": "Done!" }
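The wrapping rule can be sketched as below. This is an illustration of the behavior described above rather than the package's own code: `wrapResult` is a hypothetical name, and treating arrays as objects (leaving them unwrapped) is an assumption.

```javascript
// Hypothetical helper: objects pass through unchanged, anything else
// is wrapped as { value: ... }.
function wrapResult(result) {
  if (result !== null && typeof result === 'object') {
    return result;            // Already an object (assumed to include arrays)
  }
  return { value: result };   // Strings, numbers, booleans, etc.
}

var wrappedString = wrapResult("Done!");
var passedThrough = wrapResult({ retry: [] });
```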

j.fail(message, [options], [callback])

Cause this job to fail. Its next state depends on how the job's job.retry() settings are configured. It will either become 'failed' or go to 'waiting' for the next retry. message is a string.

options:

  • fatal -- If true, no additional retries will be attempted and this job will go to a 'failed' state. Default: false

callback(error, result) -- Result is true if failure was successful (heh). When running as Meteor.isServer with fibers, the callback may be omitted and the return value is the result.

job.fail(
  'This job has failed again!',
  {
    fatal: false  // Default case
  },
  function (err, result) {
    if (result) {
      // Status updated
    }
  }
);

j.pause([options], [callback])

Change the state of a job to 'paused'. Only 'ready' and 'waiting' jobs may be paused. This specifically does nothing to affect running jobs. To stop a running job, you must use job.cancel().

options: -- None currently.

callback(error, result) -- Result is true if pausing was successful. When running as Meteor.isServer with fibers, the callback may be omitted and the return value is the result.

job.pause(function (err, result) {
  if (result) {
    // Status updated
  }
});

j.resume([options], [callback])

Change the state of a job from 'paused' to 'waiting'.

options: -- None currently.

callback(error, result) -- Result is true if resuming was successful. When running as Meteor.isServer with fibers, the callback may be omitted and the return value is the result.

job.resume(function (err, result) {
  if (result) {
    // Status updated
  }
});

j.cancel([options], [callback])

Change the state of a job to 'cancelled'. Any job that isn't 'completed', 'failed' or already 'cancelled' may be cancelled. Cancelled jobs retain any remaining retries and/or repeats if they are later restarted.

options:

  • antecedents -- Also cancel all cancellable jobs that this job depends on. Default: false
  • dependents -- Also cancel all cancellable jobs that depend on this job. Default: true

callback(error, result) -- Result is true if cancellation was successful. When running as Meteor.isServer with fibers, the callback may be omitted and the return value is the result.

job.cancel(
  {
    antecedents: false,
    dependents: true    // Also cancel all jobs that will never run without this one.
  },
  function (err, result) {
    if (result) {
      // Status updated
    }
  }
);

j.restart([options], [callback])

Change the state of a 'failed' or 'cancelled' job to 'waiting' to be retried. A restarted job will retain any repeat count state it had when it failed or was cancelled.

options:

  • retries -- Number of additional retries to attempt before failing with job.retry(). Default: 0. These retries add to any remaining retries already on the job (such as if it was cancelled).
  • antecedents -- Also restart all 'cancelled' or 'failed' jobs that this job depends on. Default: true
  • dependents -- Also restart all 'cancelled' or 'failed' jobs that depend on this job. Default: false

callback(error, result) -- Result is true if restart was successful. When running as Meteor.isServer with fibers, the callback may be omitted and the return value is the result.

job.restart(
  {
    antecedents: true,  // Also restart all jobs that must
                        // complete before this job can run.
    dependents: false,
    retries: 0          // Only try one more time. This is the default.
  },
  function (err, result) {
    if (result) {
      // Status updated
    }
  }
);

j.rerun([options], [callback])

Clone a completed job and run it again.

options:

  • repeats -- Number of times to repeat the job, as with job.repeat().
  • wait -- Time to wait between reruns. Default is the existing job.repeat({ wait: ms }) setting for the job.

callback(error, result) -- Result is true if rerun was successful. When running as Meteor.isServer with fibers, the callback may be omitted and the return value is the result.

job.rerun(
  {
    repeats: 0,         // Only repeat this once. This is the default.
    wait: 60000         // Wait a minute between repeats.
                        // Default is value from job being rerun.
  },
  function (err, result) {
    if (result) {
      // Status updated
    }
  }
);

j.remove([options], [callback])

Permanently remove this job from the job collection. The job must be 'completed', 'failed', or 'cancelled' to be removed.

options: -- None currently.

callback(error, result) -- Result is true if removal was successful. When running as Meteor.isServer with fibers, the callback may be omitted and the return value is the result.

job.remove(function (err, result) {
  if (result) {
    // Job removed from server.
  }
});

j.type

Contains the type of a job. Useful for when getWork or processJobs are configured to accept multiple job types. This may not be changed after a job is created.

j.data

Always an object, contains the job data needed by the worker to complete a job of a given type. This may not be changed after a job is created.

class JobQueue

JobQueue is similar in spirit to the async.js queue and cargo except that it gets its work from the Meteor jobCollection via calls to Job.getWork()

q = Job.processJobs(root, type, [options], worker)

Create a JobQueue to automatically get work from the job Collection, and asynchronously call the worker function.

options:

  • concurrency -- Maximum number of async calls to worker that can be outstanding at a time. Default: 1
  • cargo -- Maximum number of job objects to provide to each worker. Default: 1. If cargo > 1 the first parameter to worker will be an array of job objects rather than a single job object.
  • pollInterval -- How often to ask the remote job Collection for more work, in ms. Default: 5000 (5 seconds)
  • prefetch -- How many extra jobs to request beyond the capacity of all workers (concurrency * cargo) to compensate for latency getting more work.

worker(result, callback)

  • result -- either a single job object or an array of job objects depending on options.cargo.
  • callback -- must be eventually called exactly once when job.done() or job.fail() has been called on all jobs in result.

queue = Job.processJobs(
  'jobQueue',   // name of job Collection
  'jobType',    // type of job to request, can also be an array of job types
  {
    concurrency: 4,
    cargo: 1,
    pollInterval: 5000,
    prefetch: 1
  },
  function (job, callback) {
    // Only called when there is a valid job
    job.done();
    callback();
  }
);

// The job queue has methods... See JobQueue documentation for details.
queue.pause();
queue.resume();
queue.shutdown();
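The interaction between concurrency, cargo and prefetch can be made concrete with a little arithmetic. The sketch below computes the largest number of jobs a queue might hold locally, following the description above (worker capacity of concurrency * cargo, plus prefetch extras). `maxJobsHeld` is a hypothetical helper, and the default of 0 for prefetch is an assumption since no default is stated.

```javascript
// Hypothetical helper: upper bound on jobs held locally by one queue.
function maxJobsHeld(options) {
  var concurrency = options.concurrency || 1;  // Documented default: 1
  var cargo = options.cargo || 1;              // Documented default: 1
  var prefetch = options.prefetch || 0;        // Assumed default: 0
  return concurrency * cargo + prefetch;
}

// The options used in the example above: 4 workers, 1 job each, 1 extra
var held = maxJobsHeld({ concurrency: 4, cargo: 1, prefetch: 1 });
```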

q.pause()

Pause the JobQueue. This means that no more work will be requested from the job collection, and no new workers will be called with jobs that already exist in this local queue. Jobs that are already running locally will run to completion. Note that a JobQueue may be created in the paused state by running q.pause() immediately on the returned new jobQueue.

q.pause()

q.resume()

Undoes a q.pause(), returning the queue to the normal running state.

q.resume()

q.shutdown([options], [callback])

options:

  • level -- May be 'hard' or 'soft'. Any other value will lead to a "normal" shutdown.
  • quiet -- true or false. False by default, which leads to a "Shutting down..." message on stderr.

callback() -- Invoked once the requested shutdown conditions have been achieved.

Shutdown levels:

  • 'soft' -- Allow all local jobs in the queue to start and run to a finish, but do not request any more work. Normal program exit should be possible.
  • 'normal' -- Allow all running jobs to finish, but do not request any more work and fail any jobs that are in the local queue but haven't started to run. Normal program exit should be possible.
  • 'hard' -- Fail all local jobs, running or not. Return as soon as the server has been updated. Note: after a hard shutdown, there may still be outstanding work in the event loop. To exit immediately may require process.exit() depending on how often asynchronous workers invoke 'job.progress()' and whether they die when it fails.

q.shutdown({ quiet: true, level: 'soft' }, function () {
  // shutdown complete
});
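The three shutdown levels differ only in what happens to queued and running jobs, which can be summarized as a lookup. The sketch below restates the descriptions above as data; `shutdownPlan` and its return shape are hypothetical and not part of the JobQueue API.

```javascript
// Hypothetical helper: what each shutdown level does to local jobs.
//   queued:  jobs in the local queue that have not started yet
//   running: jobs a worker is currently processing
function shutdownPlan(level) {
  switch (level) {
    case 'soft':
      return { queued: 'run', running: 'finish' };
    case 'hard':
      return { queued: 'fail', running: 'fail' };
    default:  // Any other value is treated as a "normal" shutdown
      return { queued: 'fail', running: 'finish' };
  }
}

var soft = shutdownPlan('soft');
var normal = shutdownPlan('normal');
var hard = shutdownPlan('hard');
```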

q.length()

Number of tasks ready to run.

q.full()

true if all of the concurrent workers are currently running.

q.running()

Number of concurrent workers currently running.

q.idle()

true if no work is currently running.

Contributors

vsivsi
