taskforcesh / bullmq
BullMQ - Message Queue and Batch processing for NodeJS and Python based on Redis
Home Page: https://bullmq.io
License: MIT License
Sandbox processing is an important feature in Bull 3 that allows processors to run in independent Node.js processes. This enables multi-CPU deployments and protects the main process from crashing if one of the processors crashes.
Sandboxed processors are implemented in Bull 3 in three files:
These should be ported to TypeScript, and the old tests should pass:
The initial port can be identical in feature set, but for the final 4.0 version we would like to change one thing: the concurrency setting for sandboxed processors. Currently concurrency can only be specified per processor, meaning that only one Node.js process is used per processor. It should also be possible to specify the concurrency per Node.js process, so that you could have, for example, 4 Node.js processes each running 10 jobs in parallel (see the sketch below).
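For illustration, a configuration along the lines of this proposal might look like the sketch below. The processes and concurrencyPerProcess option names are invented here to show the intent; they are not existing BullMQ options, and the processor file path follows the Bull 3 sandboxed-processor convention.

import { Worker } from 'bullmq';

// Sketch only: `processes` and `concurrencyPerProcess` are placeholder names
// for the proposed setting, not real BullMQ options (hence the cast).
const worker = new Worker('video-transcode', __dirname + '/processor.js', {
  processes: 4,              // four independent Node.js child processes
  concurrencyPerProcess: 10, // each process runs up to 10 jobs in parallel
} as any);
// Effective concurrency: 4 * 10 = 40 jobs at a time.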
This intermittent error happens when the moveToFinished Lua script is called.
When capturing the failed event [i.e. worker.on('failed', (job, error) => {...})] the following data is produced:
job:
{
"id": "8",
"name": "eml",
"data": "{\"templateId\":\"server_shutdown\",\"subject\":\"[Meeting-Test] Server Report - Shutdown\",\"data\":{\"serverInfo\":{\"env\":\"MOCK_ENV\",\"version\":\"MOCK_VERSION\",\"hostName\":\"MOCK_HOST_NAME\"}},\"options\":{\"isDiagnosticEmail\":true,\"sendAsText\":true,\"skipToAccountBounceVerification\":true},\"toAccounts\":[{\"email\":\"[email protected]\"}]}",
"opts": "{\"attempts\":0,\"delay\":0,\"removeOnComplete\":100}",
"progress": 0,
"attemptsMade": 1,
"processedOn": 1574214599662,
"timestamp": 1574214599674,
"failedReason": "\"ERR Error running script (call to f_ae2c40fe4d28cbdf1e2fe57d1682bc203603a506): @user_script:107: @user_script: 107: Lua redis() command arguments must be strings or integers \"",
"stacktrace": "[\"ReplyError: ERR Error running script (call to f_ae2c40fe4d28cbdf1e2fe57d1682bc203603a506): @user_script:107: @user_script: 107: Lua redis() command arguments must be strings or integers \\n at parseError (~/server/node_modules/ioredis/node_modules/redis-parser/lib/parser.js:179:12)\\n at parseType (~/server/node_modules/ioredis/node_modules/redis-parser/lib/parser.js:302:14)\"]",
"returnvalue": "0"
}
error:
{
"command": {
"name": "evalsha",
"args": [
"ae2c40fe4d28cbdf1e2fe57d1682bc203603a506",
"7",
"m2:emlq:active",
"m2:emlq:completed",
"m2:emlq:8",
"m2:emlq:wait",
"m2:emlq:priority",
"m2:emlq:events",
"m2:emlq:meta",
"8",
"1574214599677",
"returnvalue",
"null",
"completed",
"101",
"{\"jobId\":\"8\"}",
"0",
"m2:emlq:",
"0572fc7e-3069-4155-97b7-aafd738128ac",
"30000"
]
}
}
Notice that all of the args are strings.
Even when I insert my own logging code in scripts.ts, the args are as follows:
moveToFinished ARGS: [
'm2:emlq:active',
'm2:emlq:completed',
'm2:emlq:8',
'm2:emlq:wait',
'm2:emlq:priority',
'm2:emlq:events',
'm2:emlq:meta',
'8',
1574214599677,
'returnvalue',
'null',
'completed',
'101',
'{"jobId":"8"}',
0,
'm2:emlq:',
'0572fc7e-3069-4155-97b7-aafd738128ac',
30000
]
You can see that they are all either strings or integers.
The unit test I am performing does 100 identical jobs in a row but only some produce this error. Any thoughts on how I should track this down?
Currently all events are stored in the event stream. This will grow indefinitely until we run out of memory, so we need an option to trim the stream to a maximum number of events.
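As a rough sketch of what a maximum-length trim could look like at the Redis level (assuming the default bull:<queue>:events stream key and an ioredis client; the actual option name and its placement in BullMQ are still to be decided):

import IORedis from 'ioredis';

const client = new IORedis();

async function trimEvents(queueName: string, maxEvents: number) {
  // XTRIM with MAXLEN keeps roughly the newest `maxEvents` entries;
  // "~" lets Redis trim lazily for better performance.
  await client.xtrim(`bull:${queueName}:events`, 'MAXLEN', '~', maxEvents);
}

trimEvents('myqueue', 10000).catch(console.error);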
bullmq: v1.0.1
I'm not sure how to pass the Redis connection. I tried different opts as shown in the docs (https://docs.bullmq.io/guide/connections), but it either crashes with an error or connects to 127.0.0.1.
Am I doing something wrong?
const IORedis = require('ioredis');
const queue = new Queue('myqueue', new IORedis(process.env.REDIS_URL));
const queueEvents = new QueueEvents('myqueueevents', new IORedis(process.env.REDIS_URL));
const worker = new Worker('myworker', async job => { ... }, new IORedis(process.env.REDIS_URL));
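For comparison, the pattern described in the linked guide passes the client through the connection option, and uses the same queue name for the Queue, Worker and QueueEvents instances that are meant to work together; a minimal sketch:

import { Queue, QueueEvents, Worker } from 'bullmq';
import IORedis from 'ioredis';

const connection = new IORedis(process.env.REDIS_URL);

const queue = new Queue('myqueue', { connection });
const queueEvents = new QueueEvents('myqueue', { connection });
const worker = new Worker('myqueue', async job => { /* process the job */ }, { connection });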
We should run code-coverage and aim for at least 95% coverage.
You should add a GitHub link on the documentation website.
I had to do a Google search to find this repository.
Apologies if this is already documented somewhere, but what is the plan to keep taskforcesh/bullmq in sync with fixes from optimalbits/bull? Are PRs manually cherry-picked?
In order to stay competitive and avoid performance regressions, we need a simple benchmark suite, ideally one that can also be run in the CI pipeline.
Node: 12.12
Platform: Mac OS 10.14.6, Windows 10 (1903)
Problem: Installed bullmq through npm into a fresh project. Trying to run the sample code and getting Error: Cannot find module 'tslib'.
Should tslib be installed manually in the project, or is there another suggested way?
This seems to be the line in question. As a result, JSON.parse(JSON.stringify(job)).data is always a string, which was quite surprising. My particular use case is sending a Job over a socket serialized as JSON. It's easy enough to work around, but since this is still an early release it seemed worth mentioning that it might surprise others as well.
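A small sketch of the observation (queue name and data are placeholders):

import { Queue } from 'bullmq';

const queue = new Queue('example');

async function demo() {
  const job = await queue.add('example', { foo: 'bar' });
  const copy = JSON.parse(JSON.stringify(job));
  console.log(typeof job.data);  // "object" on the live instance
  console.log(typeof copy.data); // "string" - the serialized JSON, as described above
}

demo().catch(console.error);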
(I've missed that emit call, it's already done)
In order to simplify migration between 3.x and 4.0 we should have a compatibility class.
In 4.0 we modularized by dividing responsibilities into different classes such as client, worker, events, scheduler, etc. In 3.x, on the other hand, we have a single class, "Queue", that provides the same functionality.
We could create a class that mimics 3.x as closely as possible (although it probably cannot be 100% backwards compatible) by wrapping all the other classes. It will not be as efficient in terms of Redis client usage, but it will make it much easier for 3.x users to upgrade; a sketch follows.
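A minimal sketch of the idea, with an illustrative name and only a couple of methods; the real compatibility layer would of course cover much more of the 3.x surface:

import { Queue, QueueEvents, Worker, Job } from 'bullmq';

// Illustrative only: a 3.x-style facade wrapping the 4.0 classes.
class CompatQueue {
  private queue: Queue;
  private events: QueueEvents;
  private worker?: Worker;

  constructor(private name: string, private opts: { connection?: any } = {}) {
    this.queue = new Queue(name, opts);
    this.events = new QueueEvents(name, opts);
  }

  // Bull 3's queue.add(name, data, opts).
  add(jobName: string, data: any, opts?: any) {
    return this.queue.add(jobName, data, opts);
  }

  // Bull 3's queue.process(processor) mapped onto a 4.0 Worker.
  process(processor: (job: Job) => Promise<any>, concurrency = 1) {
    this.worker = new Worker(this.name, processor, { ...this.opts, concurrency });
  }

  // Global events are proxied from QueueEvents.
  on(event: string, listener: (...args: any[]) => void) {
    this.events.on(event, listener);
    return this;
  }
}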
Hello @manast,
I am a great fan of the optimalbits/bull library and am very excited about this new repo. I'm wondering if you could share some details on the differences between Bull v3 and v4 (this might already exist, but I have not found it). It looks like a full rewrite in TypeScript, but does it include new features or breaking changes? Furthermore, can you share why you moved to another GitHub organization instead of a v4 branch on the original repo?
Thanks a lot for the insight and thanks a lot for the awesome work.
Hey, I'm trying out Bull 4 and I love the simplicity of the new API. The only thing I'm confused about is why my jobs don't seem to be processing. Looking at Redis, I see the jobs added to the queue, but they are all stuck on waiting.
In Bull 3 I just had to call the process method once to initialize; here I'm a bit at a loss, and I'm sure it's something simple I'm overlooking. My test code is below. Great work on Bull 3, by the way!
import * as express from 'express';
import * as bodyParser from 'body-parser';
import { resolve } from 'path';
import { UI, setQueues } from 'bull-board';
import { Queue, Worker } from 'bullmq'
import Redis from 'ioredis';
const appEnvironment = process.env.NODE_ENV || 'local';
require('dotenv').config({
path: resolve(process.cwd(), `.env.${appEnvironment}`),
debug: true
});
const { REDIS_HOST, REDIS_PORT } = process.env;
const REDIS_DB = parseInt(process.env.REDIS_DB || '0');
const connectionConfig = new Redis({
port: parseInt(REDIS_PORT),
host: REDIS_HOST,
db: REDIS_DB
});
const test = new Queue('test', { connection: connectionConfig });
setQueues(test)
const worker = new Worker('test', async (job) => {
await console.log('starting to process job', job)
})
const app = express();
app.use(bodyParser.json());
app.use('/', UI)
app.post('/add-to-queue', async ({ body }, res) => {
const { type } = body;
await test.add('test', body)
res.status(200).send({
message: 'Job was successfully added to queue'
});
});
app.listen({ port: process.env.PORT || 8000 }, () =>
console.log(`Server ready at http://${process.env.HOST}:${process.env.PORT}`)
);
Posted this on Twitter, but moving here for more visibility.
It seems like in BullMQ you can no longer set up workers per jobName? That seems like a pretty large departure (and makes it tricky to migrate from bull). Does this mean I should be instantiating a queue per job name from now on if I want to control the concurrency of each worker? Additionally, queue.add still takes jobName as its first argument, but at this point it's kind of pointless (you might as well put it into the payload if you need it) unless you're planning to add the per-job workers back in.
This feature was really nice for grouping various job pipelines.
E.g. you might have queue X with job types kick (concurrency 1), calculate (concurrency 5), and send (concurrency 20).
In other words, I would suggest to either:
This put me in a bit of a pickle when I tried migrating, and I'm now migrating back to bull.
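For reference, one possible workaround in the current API (not an official pattern, and it loses per-name concurrency limits) is a single worker that dispatches on job.name:

import { Worker, Job } from 'bullmq';

const handlers: Record<string, (job: Job) => Promise<void>> = {
  kick: async job => { /* ... */ },
  calculate: async job => { /* ... */ },
  send: async job => { /* ... */ },
};

const worker = new Worker('pipeline', async job => {
  const handler = handlers[job.name];
  if (!handler) {
    throw new Error(`No handler registered for job "${job.name}"`);
  }
  return handler(job);
}, { concurrency: 20 }); // one shared concurrency for the whole queue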
This issue is to track unstable tests (collected from travis build logs https://travis-ci.org/taskforcesh/bullmq)
Pause: should be able to pause a running queue and emit relevant events
Compat / events: emits drained and global:drained event when all jobs have been processed
repeat: should emit a waiting event when adding a repeatable job to the waiting list
Delayed jobs: should process a delayed job only after delayed time
Just from a brief glance, it looks like only delay is being used from Bluebird. It really is a non-starter for us to include libraries that depend on Bluebird, since it seriously hinders the async guarantees in a server environment.
Is this something I could help with?
PS: so excited to see a rewrite of Bull! Thanks for all the hard work
This is achieved with the createClient option, which is defined but currently ignored.
Also, we should accept RedisOptions for this to work.
PS: Moreover, it's a bit confusing that we accept an IORedis instance in two places but no plain RedisOptions at all.
// very simplified version of RedisOptions
export interface RedisOpts {
skipVersionCheck?: boolean;
port: number;
host: string;
}
export type ConnectionOptions = RedisOpts | IORedis.Redis; // [1]
export interface QueueBaseOptions {
connection?: ConnectionOptions;
client?: IORedis.Redis; // [2]
...
}
When I add a job to a queue with a long delay (e.g. 60 s) and then call moveToCompleted before the delay has passed, the job goes active and is executed by the QueueScheduler.
My current workaround is to call Job#remove, but I would rather keep the history of all jobs.
Is this by design?
My concern here is that streams would grow indefinitely if my setup consists only of a Queue and a Worker, whereas the trim() functionality is implemented on the QueueScheduler/QueueEvents side. Correct me if I'm wrong.
Found in 3cf2617 (currently this is feat/add-trim-option-#21).
Test code to reproduce
const queue = new Queue("test", { });
queue.add("test", { }, { stackTraceLimit: 100 });
const worker = new Worker("test", async (job) => {
throw new Error("Fail!");
});
This is covered by tests ("marks the job as failed" in test_job.ts), but I still see an empty stacktrace in Redis.
As in Bull 3, we need a locking mechanism where a worker picks a lock every time it starts processing a job. Every worker, and every concurrent processor within a worker, needs a uuid that is stored in the lock. By doing this we also know which worker is processing a given job, and whether a worker has lost the lock (which in the future should imply a cancellation); a sketch of the idea follows.
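As a rough sketch of the token-based lock described above (assuming an ioredis client and a <prefix>:<queue>:<jobId>:lock key; this is not BullMQ's actual Lua implementation):

import IORedis from 'ioredis';
import { v4 as uuid } from 'uuid';

const client = new IORedis();
const token = uuid(); // one uuid per worker / concurrent processor

async function takeLock(lockKey: string, durationMs: number): Promise<boolean> {
  // SET NX PX: the lock is acquired only if nobody else holds it,
  // and it expires automatically unless renewed.
  const result = await client.set(lockKey, token, 'PX', durationMs, 'NX');
  return result === 'OK';
}

async function stillOwnsLock(lockKey: string): Promise<boolean> {
  // If the stored token differs, this worker has lost the lock
  // (which should eventually imply cancelling the job).
  return (await client.get(lockKey)) === token;
}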
Scenario:
1. Run a Redis server at localhost:6379 and create a Worker with default options and a no-op processing callback.
2. Wait for at least one drainDelay period (5 secs).
3. kill -9 the Redis server.
The ECONNREFUSED error will appear (I've also added some logging to trace each brpoplpush):
BRPOPLPUSH at 2019-10-12T09:22:26.934Z
BRPOPLPUSH at 2019-10-12T09:22:31.950Z
Error: connect ECONNREFUSED 127.0.0.1:6379
at TCPConnectWrap.afterConnect [as oncomplete] (net.js:1126:14) {
errno: 'ECONNREFUSED',
code: 'ECONNREFUSED',
syscall: 'connect',
address: '127.0.0.1',
port: 6379
}
<the same message appears again from time to time>
UPD: I've spent some more time debugging this. What's not good is that the brpoplpush() call does not return control on such an error, i.e. the returned promise neither resolves nor rejects.
Hey guys,
Can I use this package to download files from one server to another via FTP? I want to build a queue with this package that I feed with the files I want to download (the paths of the files on the server I want to download from).
Is this possible? I am looking for how to structure this flow; I was searching and found your package.
Thank you for the effort.
Worker will start processing jobs immediately when it is instantiated. Is it possible to add a public method like start()? Then add a boolean option such as autostart to the constructor arguments; its default value would of course be true. Thank you.
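A sketch of what the requested surface could look like; autorun and run() are just illustrative names for the proposal and may not match what eventually ships:

import { Worker } from 'bullmq';

// `autorun: false` and `run()` are the names used here for the proposal.
const worker = new Worker('emails', async job => { /* process the job */ }, {
  autorun: false, // proposed: do not start fetching jobs on construction
});

// ...later, once the application is fully wired up:
worker.run();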
Need a method in the Queue class to wrap the Queue.repeat.addNextRepeatableJob() call.
With NODE_DEBUG=bull we can enable debug mode in Bull 3 and see some helpful output in the console. BullMQ also has support for this debug mode, but there is only a single place where debuglog is called, so in the current implementation it's not very usable.
Also, for BullMQ it would be reasonable to rename the section key to match the new package name.
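For context, the Node.js util.debuglog pattern looks like this when keyed to the new package name (the exact section key BullMQ ends up using is still open):

import { debuglog } from 'util';

// Prints only when NODE_DEBUG=bullmq is set in the environment.
const debug = debuglog('bullmq');

debug('worker %s picked job %s', 'worker-1', '42');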
https://travis-ci.org/taskforcesh/bullmq/builds/576675637
1) workers
retry a job that fails:
Error: Timeout of 2000ms exceeded. For async tests and hooks, ensure "done()" is called; if returning a Promise, ensure it resolves. (/home/travis/build/taskforcesh/bullmq/src/test/test_worker.ts)
UPD: Looks like this test is unstable; sometimes it does not fail.
I have a problem with my code and cannot find the reason:
UnhandledPromiseRejectionWarning: TypeError: client.addJob is not a function
at Function.addJob (C:\...\node_modules\bullmq\dist\classes\scripts.js:38:23)
at Job.addJob (C:\...\node_modules\bullmq\dist\classes\job.js:323:34)
at Function.create (C:\...\node_modules\bullmq\dist\classes\job.js:32:28)
at process._tickCallback (internal/process/next_tick.js:68:7)
Will be grateful for any advice.
My code:
import { Queue, QueueEvents, Worker } from 'bullmq';
import { posix } from 'path';
import IORedis from 'ioredis';
const connection = new IORedis({
port: 6363,
host: 'localhost',
password: 'some-password'
});
let queues: {
[key: string]: Queue
} = {};
let queueEvents: {
[key: string]: QueueEvents
} = {};
let queueWorkers: {
[key: string]: Worker
} = {};
export function addToQueue(queueName: string, data: any): void {
queues[queueName].add(queueName, data);
}
export function setQueue(queueName: string): void {
queues[queueName] = new Queue(
queueName,
{ connection }
);
queueWorkers[queueName] = new Worker(
queueName,
async job => {
console.log(job.data);
},
{ connection }
);
queueEvents[queueName] = new QueueEvents(
queueName,
{ connection }
);
queueEvents[queueName].on('completed', (job, result) => {
console.log('completed job => ', job);
console.log('completed result => ', result);
});
queueEvents[queueName].on('failed', function(job, err) {
console.log('completed job => ', job);
console.log('completed err => ', err);
});
queues[queueName].add(queueName, { text: 'foo' });
queues[queueName].add(queueName, { text: 'bar' });
}
Catching any event since the beginning of time would be a helpful feature in some cases, but I believe most users would expect QueueEvents to work very similarly to global events in Bull 3.
Also, what I found not very obvious is that one should call init() on QueueEvents to start receiving any events. Is there any reason not to place the init() call inside the class constructor?
PS: Looks like instead of the this.client = await this.connection.init(); line in the init() method we could just call super.waitUntilReady(), as is done in QueueScheduler.
Will this rewrite support Redis Cluster natively? We really want to add redundancy to our Redis instance, but in our testing Bull 3 was just completely unreliable with a Redis cluster. Curious whether this is going to be resolved in this version?
I've been toying with an admin dashboard for Bull, with the emphasis being performance metrics (specifically latency and wait times).
For the latter, I've been listening to the waiting event, then writing the current time to the job's data. Then, on the transition to active, I calculate the difference between the stored time and the current timestamp.
It seems that this and similar use cases would make a waitStartedOn job field a useful addition (a sketch of the event-based approach follows).
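A rough sketch of that measurement with QueueEvents and an in-memory map instead of writing the timestamp into the job data (the queue name is a placeholder, and the event payload is assumed to expose the job id):

import { QueueEvents } from 'bullmq';

const queueEvents = new QueueEvents('payments');
const waitingSince = new Map<string, number>();

queueEvents.on('waiting', ({ jobId }: { jobId: string }) => {
  waitingSince.set(jobId, Date.now());
});

queueEvents.on('active', ({ jobId }: { jobId: string }) => {
  const started = waitingSince.get(jobId);
  if (started !== undefined) {
    console.log(`job ${jobId} waited ${Date.now() - started} ms`);
    waitingSince.delete(jobId);
  }
});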
Hi,
I am new to coding and to these scheduling things. I am creating a job that needs to be delayed for a certain time, but I did not succeed. Can anyone give me a proper example of how to create a delayed job? Thanks in advance.
This is the code I have written.
import { Queue } from 'bullmq';
import { Worker } from 'bullmq';
import { QueueScheduler } from 'bullmq';
import nodemailer from 'nodemailer';
import IORedis from 'ioredis';
const connection = new IORedis();
// date and time section
const secs_delay = 60000;
//end
const data = {
email: '[email protected]'
}
const options = {
delay: secs_delay,
attempts: 3
}
const queueScheduler = new QueueScheduler('sendEmail', { connection });
const queue = new Queue('sendEmail', { connection });
queue.add(data.email, options);
function sendRatingMailTo(sender_email: any){
// async..await is not allowed in global scope, must use a wrapper
async function main() {
// Generate test SMTP service account from ethereal.email
// Only needed if you don't have a real mail account for testing
let testAccount = await nodemailer.createTestAccount();
// create reusable transporter object using the default SMTP transport
let transporter = nodemailer.createTransport({
host: 'smtp.gmail.com',
port: 465,
secure: true, // true for 465, false for other ports
auth: {
user: "[email protected]", // generated ethereal user
pass: "mypassword" // generated ethereal password
}
});
// send mail with defined transport object
let info = await transporter.sendMail({
from: sender_email, // sender address
to: '[email protected]', // list of receivers
subject: 'Hello ✔', // Subject line
text: 'Hello world?', // plain text body
html: '<b>Hello world?</b>' // html body
});
console.log('Message sent: %s', info.messageId);
// Message sent: <[email protected]>
// Preview only available when sending through an Ethereal account
console.log('Preview URL: %s', nodemailer.getTestMessageUrl(info));
// Preview URL: https://ethereal.email/message/WaQKMgKddxQDoou...
}
main().catch(console.error);
}
const worker = new Worker('sendEmail', async job => {
await sendRatingMailTo(job.data.email);
});
queueScheduler.on('completed', jobId => {
console.log('Sent emails Successfully');
});
queueScheduler.on('failed', (jobId, err) => {
console.error('error sending mails', err);
});
// queueScheduler.
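For comparison, a minimal sketch of a delayed job under the documented API, where the third argument to queue.add carries the options (the job name, email address and delay here are placeholders):

import { Queue, QueueScheduler, Worker } from 'bullmq';
import IORedis from 'ioredis';

const connection = new IORedis();

// The QueueScheduler is what promotes delayed jobs once their delay has elapsed.
const queueScheduler = new QueueScheduler('sendEmail', { connection });
const queue = new Queue('sendEmail', { connection });

const worker = new Worker('sendEmail', async job => {
  console.log('sending rating mail to', job.data.email);
}, { connection });

async function enqueue() {
  await queue.add(
    'ratingEmail',                  // job name
    { email: 'user@example.com' },  // job data
    { delay: 60000, attempts: 3 },  // options: run roughly 60 s from now
  );
}

enqueue().catch(console.error);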
Using only one connection in the workers is not enough when concurrency > 1. In that case, while one worker is performing some work and another concurrent worker is waiting for a job to arrive (using a blocking call), when the first worker has completed its job there is no free connection available to move the job to the completed set.
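A sketch of the underlying constraint with plain ioredis (not BullMQ internals): the blocking BRPOPLPUSH ties up its connection, so a duplicated client is needed for everything else.

import IORedis from 'ioredis';

const client = new IORedis();               // used for non-blocking commands
const blockingClient = client.duplicate();  // reserved for the blocking fetch

async function waitForNextJob(): Promise<string | null> {
  // Blocks for up to 5 seconds; meanwhile `client` stays free to move
  // a finished job to the completed set.
  return blockingClient.brpoplpush('bull:myqueue:wait', 'bull:myqueue:active', 5);
}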
When providing an already-ready IORedis instance to the queue constructor, scripts are not loaded. The problem is in the waitUntilReady method of the RedisConnection class, which skips script loading for clients that are already ready.
Maybe a class property should be used in RedisConnection to track whether the scripts have already been loaded.
bullmq/src/classes/redis-connection.ts
Lines 42 to 58 in 89bb554
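A sketch of that suggestion (class and method names are illustrative, not the actual redis-connection.ts code): remember whether the Lua scripts were attached, so a client that is already in the ready state still gets them exactly once.

import IORedis from 'ioredis';

class RedisConnectionSketch {
  private scriptsLoaded = false;

  constructor(private client: IORedis.Redis) {}

  async waitUntilReady(): Promise<IORedis.Redis> {
    if (this.client.status !== 'ready') {
      await new Promise<void>(resolve => this.client.once('ready', () => resolve()));
    }
    if (!this.scriptsLoaded) {
      this.loadScripts(this.client); // hypothetical: defineCommand(...) calls live here
      this.scriptsLoaded = true;
    }
    return this.client;
  }

  private loadScripts(client: IORedis.Redis): void {
    // e.g. client.defineCommand('moveToFinished', { numberOfKeys: 7, lua: '...' });
  }
}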
const cronQueue = new Queue(obj.name, { connection: connectionConfig });
const worker = new Worker(obj.name, processor, {
connection: connectionConfig,
});
const result = await cronQueue.add(obj.name, {test: 'ol'},{ repeat: {
every: 10000,
limit: 5
}} );
When I run await cronQueue.getRepeatableJobs() I get:
[
{
key: 'get-token::::10000',
name: 'get-token',
id: null,
endDate: null,
tz: null,
cron: '10000',
next: 1575414520000
}
]
However, the processor never fires. I'm not sure if I'm misunderstanding how repeat jobs work, but I figured it would run once and then rerun at the configured repeat interval; in this case, run at most 5 times, every 10 seconds?
This works fine if I remove repeat, though.
Hey, I could not find it in the docs or the code, but is there support for a UI built into BullMQ?
If not, could you guide me a bit on how I could start building a basic UI that lets me see all the jobs and their associated parameters?
Possible solutions:
- Use "$" instead of "0-0", only streaming new events. Downside of this approach: events are ignored while the QueueEvents listener is unavailable.

I think there is a problem with the queue defaultJobOptions handling.
Options provided to the Queue constructor are not overridden by options provided during job creation.
const queue = new Queue(
"my-queue",
{
connection,
defaultJobOptions: {
attempts: 2,
backoff: {
type: "fixed",
delay: 2000,
},
},
},
);
const job = await queue.add(
"my-job",
payload,
{
attempts: 3,
backoff: {
type: "fixed",
delay: 5000,
},
},
);
console.log(job.opts);
It prints defaultJobOptions instead of the provided options.
The opts parameter provided to the add method should be merged into defaultJobOptions (a sketch follows).
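A sketch of the expected merge (not the actual BullMQ code): per-call options should win over the queue-level defaults, falling back to the defaults for anything omitted.

interface JobOpts {
  attempts?: number;
  backoff?: { type: string; delay: number };
  [key: string]: unknown;
}

function mergeJobOptions(defaults: JobOpts = {}, opts: JobOpts = {}): JobOpts {
  // Shallow merge: the caller's opts override the queue's defaultJobOptions.
  return { ...defaults, ...opts };
}

// With the values from the example above this yields attempts: 3 and a 5000 ms backoff.
console.log(mergeJobOptions(
  { attempts: 2, backoff: { type: 'fixed', delay: 2000 } },
  { attempts: 3, backoff: { type: 'fixed', delay: 5000 } },
));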
It looks like with the current implementation, 'failed' events would be emitted both by the worker (when processFn fails) and by the scheduler when a job has stalled too many times.
I see there is no "stalled" event anymore by design; instead moveStalledJobsToWait emits either a global "waiting" or a "failed" event.
But I think it would be better to indicate that a job was stalled and that this was the reason it was retried or failed.
The way concurrency works in BullMQ 4.x makes it very trivial to implement:
While the worker is processing a job that is delayed (or the Node process is stalling the event loop for longer than lockDuration) and it attempts to call handleCompleted(), an "Error: Missing lock for job 778 finished" error is thrown on line 303, is caught, and handleFailed() is called on line 305.
Lines 298 to 308 in ad1fc3c
However, in handleFailed it attempts to move the job to the failed state on line 293.
Lines 284 to 295 in ad1fc3c
moveToFailed will also throw a "Missing lock for job 778 finished" error, which will not be caught and will exit the Node process.
I think the logic for these types of errors could be handled more gracefully. But, at the very least, a try/catch block should be put around line 293 (await job.moveToFailed()) so that the Node process does not exit and emit('failed') is still executed.
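A sketch of the suggested guard, paraphrasing rather than quoting worker.ts; the helper name and the narrowed job shape are illustrative:

async function handleFailedSafely(
  job: { moveToFailed(err: Error): Promise<void> }, // narrowed shape for the sketch
  err: Error,
  emit: (event: string, ...args: unknown[]) => void,
) {
  try {
    await job.moveToFailed(err);
  } catch (moveErr) {
    // A second "Missing lock for job ... finished" would land here;
    // log it instead of letting the unhandled rejection kill the process.
    console.error('moveToFailed threw', moveErr);
  }
  emit('failed', job, err);
}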