Comments (26)
So the way I always envisioned it is that you define the jobs that you want the worker to process (above) before starting it, and then it starts processing jobs that it has definitions for. This lets you do things like specify limited job queues. For example, consider the following project structure and files for a worker.
project:
worker.js
lib/
jobs/
- email.js
- video-processing.js
- image-processing.js
models/
- user-model.js
- blog-post.model.js
Sample job processor (eg. jobs/email.js
)
var email = require('some-email-lib'),
User = require('../models/user-model.js');
module.exports = function(agenda) {
agenda.define('registration email', function(job, done) {
User.get(job.data.userId, function(err, user) {
if(err) return done(err);
email(user.email(), 'Thanks for registering', 'Thanks for registering ' + user.name(), done);
});
});
agenda.define('reset password', function(job, done) {
// etc etc
})
// More email related jobs
}
worker.js
var Agenda = require('agenda');
var jobTypes = process.env.JOB_TYPES.split(',');
var agenda = new Agenda(connectionOpts);
jobTypes.forEach(function(type) {
require('./lib/jobs/' + type)(agenda);
})
agenda.start();
Now you can do the following to spin up a worker that processes jobs of a given type:
JOB_TYPES=email node worker.js
JOB_TYPES=video-processing,image-processing node worker.js
Hope this helps, let me know.
from agenda.
I am facing the same issue but in my case i am using dynamic job creation. here is my use case. i am billing businesses and each business has a different billing date.For each business, the billing should repeat after every x month(which is also different for each business). at the moment this is how am creating and stating jobs
module.exports = async (agenda, jobName) => {
agenda.define(jobName, (job, done) => {
console.log('runing job with name ' + jobName);
const { category } = job.attrs.data;
createBill(category, done);
});
}
jobName will be pass from the calling code.
and this is how i start the job
const every = `${category.billingCycle} ${category.billingCycleUnit}`;
const billingJob = agenda.create(category.name, { category: category });
const cycleStart = new Date(category.cycleStart);
const now = new Date();
const startBillingAt = (now.getTime() >= cycleStart.getTime()) ? now : cycleStart;
billingJob.schedule(startBillingAt);
billingJob.repeatAt(every);
billingJob.save();
However when the server restart the job does not start again. Hope someone can help. thanks
from agenda.
I guess every time i start my node server, i should look through the jobs collection and schedule them back?
from agenda.
Do you have definitions for jobName
?
from agenda.
@rschmukler yes i do. the job name comes from the data passed from socket.io message "createJob"
from agenda.
@MurWade can you try updating to 0.6.16
and see if the issue still persists. I suspect it will, but worth a shot.
from agenda.
@rschmukler awsome thanks.
from agenda.
Did it work?
from agenda.
@rschmukler sorry didn't mention that i will try it. Haven't tried it yet.
from agenda.
Fairly certain this is related to #70, as I was seeing this exact same issue happen to me.
That being said, I think I'm going to add a test to ensure .every() jobs don't get reran when the server restarts.
from agenda.
I am using 0.6.16 and am having this issue as well. Jobs pending in the database when the script is loaded to not ever get scheduled
EDIT: I tried resetting up the jobs manually, but this seems to duplicate the jobs as well. For now I guess the workarount would be to remove the jobs first, then reschedule directly after
from agenda.
Ya i tried it with the latest still the same issue.
from agenda.
I have this problem too. It looks like agenda.start()
will run processJobs()
, which relies on this._definitions
. Perhaps the problem is that this._definitions
is going to be an empty object when the server restarts.
My understanding is that job definitions are not being stored in the database. I originally thought that the fix simply required agenda.start()
to pre-populate this._definitions
by checking the database, but that wouldn't work because there are no definitions in the database. It looks like agenda.define()
needs to store job definitions in the database --not just store them in memory via this._definitions
. Then, agenda.start()
should retrieve any job definitions from the database?
On a related note: It's clear that Agenda doesn't write or read job definitions from the database, but what about actual jobs? Obviously agenda.saveJob()
is storing/writing jobs into the database, but what part of code is reading jobs from the database? (I still need more time to read the source code)
from agenda.
@RebootJeff , thats idea i had. didnt really look at the code, but i am guessing the job _definition is the function that executes every time the job runs. how can this be saved in the database anyway.
from agenda.
@MurWade - Yea pretty much. The processJobs()
function will iterate over the keys in the this._definitions
object. When the server restarts and runs agenda.start()
, the object will be empty so processJobs()
will do nothing.
To answer your question: saving definitions would look something like...
Agenda.prototype.define = function(name, options, processor) {
if(!processor) {
processor = options;
options = {};
}
this._definitions[name] = {
fn: processor,
concurrency: options.concurrency || this._defaultConcurrency,
priority: options.priority || 0,
lockLifetime: options.lockLifetime || this._defaultLockLifetime,
running: 0
};
// NEW CODE STARTS HERE
saveDefinition(name, this._definitions[name]);
};
function saveDefinition(definitionName, definition) {
var props = {
type: 'definition', // this is questionable, but we need some way to show that this database item is a definition rather than another job
name: definitionName,
data: definition
};
// Add new definition to database or update if it already exists in database
this._db.update({ name: name }, props, { upsert: true });
// My mongo code might be missing a callback?
}
Sorry if my code looks crappy. I'm used to mongoose + promises rather than mongo + callbacks.
from agenda.
Not sure this will work as definition contains the function and "scope" for each job. You could serialize and perist the function in mongo, but i'm not sure how you deserialize back the full state.
Looks like wiping and recreating jobs on server startup may be the only option.
from agenda.
Are you guys dynamically generating job definitions? If not, why do they need to be stored in the database? If so, what's the use case?
If possible, jobs should be stored using variables, and those variables used for dynamic jobs. This helps for a few reasons:
- Storing tons of dynamic jobs means a big memory footprint.
- You're hosed if the job queue crashes (as we see here).
So, for example, instead of:
agenda.define('send-registration-' + user.id(), function(job, done) {
emailService.send('Thanks for registering ' + user.name() , user.email(), done);
})
agenda.once('send-registration-' + user.id())
You should do:
agenda.define('send registration email', function(job, done) {
var data = job.data;
emailService.send('Thanks for registering ' + data.name, data.email, done);
})
// Elsewhere in code
agenda.once('send registration email', { name: user.name(), email: user.email() });
If I am missing the boat on this one, please let me know the use case so I can see why you're dynamically generating job definitions.
@wired8 @RebootJeff @MurWade @droppedonjapan
from agenda.
My jobs aren't dynamic, they follow your recommended example.
The issue I believe comes down to repopulating agenda._definitions[name].fn after server restart. This is the function which performs the job.
I did manage to serialize _definitions including job functions in Mongo, but this won't work because you also need to set the dependencies (requires) and state of all job functions.
The example above would require a reference to emailService.
For now I simply purge all jobs and reload them on server restart, this works well enough.
from agenda.
@wired8 if they aren't dynamic, why cant you have something that looks like the following:
var agenda = new Agenda(mongoStuff);
agenda.define('example job 1', function() { /* do stuff */ });
agenda.define('example job 2', function() { /* do stuff */ });
agenda.define('example job 3', function() { /* do stuff */ });
agenda.define('example job 4', function() { /* do stuff */ });
agenda.start();
from agenda.
That exactly what I have. If you restart node, your job queue will always be 0, because the _definitions object will be empty.
Should we be recreating all job definitions on app start by calling agenda.define for each job?
from agenda.
That looks like a better example Ryan. You should add that to the readme, and a donate button.. ;-)
from agenda.
Haha, thanks. I'll definitely add it to the documentation. I'll consider the donate button as well :P
from agenda.
@Osmandiyaka were you able to figure it out? I am stuck with the same issue and I am creating jobs dynamically.
from agenda.
@raunaqkapoor can you try this
- Read the job names from the MongoDB and then follow this ##74 (comment)
(i.e. define the job function before start())
from agenda.
@raunaqkapoor did you find any work around on this?
from agenda.
@raunaqkapoor did you find any work around on this?
@DevanshB10 can't recall for the life of me! Was 4 years back and I don't have access to that repo any longer. Sorry! Maybe try this
from agenda.
Related Issues (20)
- Jobs not running in production HOT 6
- lockLifetime not updatable when job is running HOT 1
- Some feature not work in version 6.X?
- TypeError: Cannot read properties of undefined (reading 'collection') HOT 4
- MongoDB connections are getting increased gradually
- Is npm agenda deprecated? Am I supposed to start using @hokify/agenda? HOT 3
- Agenda repeatEvery('10 seconds') fails processing remaining jobs
- Longer schedule time (more than one hour from current date or next hour) not triggering
- TimeoutOverflowWarning: 2428508862 does not fit into a 32-bit signed integer.
- Agenda sometimes dont restart the job execution
- Canceling jobs HOT 2
- It seems agenda doesn't connect with mongoose 7.x. Any plans to make it compatible with newer versions of mongoose? HOT 1
- Is there a way to update a job's original payload?
- Questions: REST API - safe to instantiate a new Agenda at each received request? Every can handle only a single job at a time? HOT 1
- With pm2 lots of connections are always open HOT 1
- "mongoose": "^8.0.4" and "agenda": "^5.0.0", getting error HOT 2
- RIP HOT 9
- Scheduled job runs multiple times HOT 2
- Unable to build typescript project with Agenda HOT 2
- Agenda 5.0.0 not compatible with Mongoose 8.2.1 HOT 2
Recommend Projects
-
React
A declarative, efficient, and flexible JavaScript library for building user interfaces.
-
Vue.js
🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
-
Typescript
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
-
TensorFlow
An Open Source Machine Learning Framework for Everyone
-
Django
The Web framework for perfectionists with deadlines.
-
Laravel
A PHP framework for web artisans
-
D3
Bring data to life with SVG, Canvas and HTML. 📊📈🎉
-
Recommend Topics
-
javascript
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
-
web
Some thing interesting about web. New door for the world.
-
server
A server is a program made to process requests and deliver data to clients.
-
Machine learning
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
-
Visualization
Some thing interesting about visualization, use data art
-
Game
Some thing interesting about game, make everyone happy.
Recommend Org
-
Facebook
We are working to build community through open source technology. NB: members must have two-factor auth.
-
Microsoft
Open source projects and samples from Microsoft.
-
Google
Google ❤️ Open Source for everyone.
-
Alibaba
Alibaba Open Source for everyone
-
D3
Data-Driven Documents codes.
-
Tencent
China tencent open source team.
from agenda.