jbielick / faktory_worker_node
A node.js client and worker framework for the Faktory job server
Home Page: https://github.com/contribsys/faktory
License: MIT License
Getting this during the heartbeat for long(ish) running workers:
1/9/2018 4:42:21 AM{ Error: This socket has been ended by the other party
1/9/2018 4:42:21 AM at Socket.writeAfterFIN [as write] (net.js:351:12)
1/9/2018 4:42:21 AM at Promise (/apps/api/node_modules/faktory-client/lib/client.js:164:19)
1/9/2018 4:42:21 AM at new Promise (<anonymous>)
1/9/2018 4:42:21 AM at Client.send (/apps/api/node_modules/faktory-client/lib/client.js:160:12)
1/9/2018 4:42:21 AM at Client.beat (/apps/api/node_modules/faktory-client/lib/client.js:223:17)
1/9/2018 4:42:21 AM at withConnection.c (/apps/api/node_modules/faktory-worker/lib/manager.js:112:34)
1/9/2018 4:42:21 AM at Manager.withConnection (/apps/api/node_modules/faktory-worker/lib/manager.js:46:14)
1/9/2018 4:42:21 AM at <anonymous> code: 'EPIPE' }
node.js version: v11.0.0
npm/yarn version: 6.4.1
faktory-server version: 0.9.1
faktory-worker
package version:
Error: read ETIMEDOUT
at TCP.onStreamRead (internal/stream_base_commons.js:111:27) errno: 'ETIMEDOUT', code: 'ETIMEDOUT', syscall: 'read'
Unhandled Rejection at: Promise Promise {
<rejected> Error: Connection closed
at Client.onClose (/app/node_modules/faktory-worker/lib/client.js:327:26)
at Socket.emit (events.js:182:13)
at Socket.EventEmitter.emit (domain.js:442:20)
at TCP._handle.close (net.js:606:12)
Does this mean I should check for errors? If this error happens, there is no reconnection to Faktory (it never starts processing jobs).
faktory-worker
package version: 3.3.7+
import faktory from 'faktory-worker'
yields the following typescript error:
'faktory' is declared but its value is never read. ts(6133)
I am in fact using it.
This is because of the change from export class faktory to export interface faktory in index.d.ts. An interface can only be used as a type.
node.js version:
16.15.1
npm/yarn version:
1.22.10
(yarn)
faktory-server version:
1.5.1
faktory-worker
package version:
4.1.4 -> 4.3.0
Hi there. Thanks for the excellent project!
I ran into an issue after upgrading from 4.1.4 -> 4.3.0 where workers appear to hang. I am unable to replicate in a small example and the issue does not appear consistently in my environment, but after a good amount of debugging, I am confident it comes from the upgrade in some way.
The workers do a good amount of I/O and are on the default concurrency, and the work function code has not changed between version upgrades. The hanging only shows up in 4.3.0, although I think the hanging is probably a symptom of something else rather than the Node workers actually hanging.
Sorry to be so vague, but I am confused myself and still collecting information! I figured I'd at least create an issue in case someone else has issues after upgrading.
If I can provide any specific information, let me know. I'll need to look at the version diffs as well and see if anything sticks out.
Hi @jbielick -- I have been coming across this issue a lot recently on a Windows worker. I just updated from 2.2.0 to 3.0.2, and am still seeing this issue.
Jobs process normally for a while, and then I hit the error described above which takes down the entire process.
On the worker, I'm running Windows 10, Node 10.16, faktory-worker-node 3.0.2. I do not see this same issue on my Ubuntu workers. Do you have any further insight into this issue on this platform specifically? Is there something I can do in my code to be more defensive? I didn't want to open a new issue yet since all of the context is here.
Here is an example output log from my app: https://pastebin.com/3r9Sq3KT
It shows the process working for a while sending a variety of ACKs and FAILs, and then crashing.
Here is how I'm initiating the worker with a few small redactions. I'm not very great with Node, so I'm including this in case it shows something obviously incorrect.
const faktory = require('faktory-worker');
const worker = require('./worker-__');
const colors = require('colors');
const concurrency = parseInt(process.argv[2]) || 4;
// Setup the worker server & register the faktory worker(s).
(async () => {
faktory.register('Harvest::__::Scraper', async (zipCode, section, aspect, rim, url=null) => {
if (url) {
await worker.extractSingleProduct(zipCode, section, aspect, rim, url);
} else {
await worker.run(zipCode, section, aspect, rim);
}
});
faktory.register('Harvest::__::Scraper::InCartPricing', async (zipCode, section, aspect, rim, listing) => {
await worker.inCartPricing(zipCode, section, aspect, rim, listing);
});
// MIDDLEWARE
faktory.use(async (ctx, next) => {
const start = new Date(Date.now()).toISOString();
try {
console.log(`${start} ${ctx.job.jid} ${ctx.job.jobtype} Start ${ctx.job.args}`);
await next();
const end = new Date(Date.now()).toISOString();
console.log(`${end} ${ctx.job.jid} ${ctx.job.jobtype} ACK ${ctx.job.args}`.green);
} catch (e) {
const errTime = new Date(Date.now()).toISOString();
console.log(`${errTime} ${ctx.job.jid} ${ctx.job.jobtype} FAIL ${ctx.job.args}`.red);
throw e;
}
});
await faktory.work({
queues: ['www.__.com'],
concurrency : concurrency
});
})()
Please let me know if you'd like me to create a separate issue or provide more information, or if there is something I should change on my side.
Originally posted by @ttilberg in #23 (comment)
node.js version:
14
npm/yarn version:
1.22.5
faktory-server version:
Faktory 1.5.1
faktory-worker
package version:
4.1.4
import { db } from '../api/src/lib/db'
import { logger } from '../api/src/lib/logger'
const faktory = require('faktory-worker')
faktory.register('ResizeImage', async ({ id, size }) => {
console.log('run ResizeImage')
})
export default async ({ args }) => {
const worker = await faktory.work()
worker.on('fail', ({ job, error }) => {
logger.debug({ job, error }, `Faktory worker failure event`)
})
}
No error should be produced
Error causes the worker to crash with worker.on is not a function
Use the script provided above.
I would love to see this project ported to deno.
There's actually an existing project, but having both projects maintained in the same place seems to me a better choice for the long term.
Here's a resource that might help: https://medium.com/samsung-internet-dev/using-node-modules-in-deno-2885600ed7a9
node.js version: 9.5.0
npm/yarn version: 5.6.0
faktory-worker
package version: 0.8.0
let run = async () => {
const client = await Faktory.connect()
console.log("connected.")
Faktory.register("AggroJob", () => {
console.log("process job")
})
console.log("starting worker.")
Faktory.work()
}
run().then(() => {
console.log("running")
}).catch((err) => {
console.log("err")
console.error(err)
})
When I hit ctrl-c, I expect the program to gracefully shut down. It looks like it does wait to shut everything down, but then it hangs.
> node jobs/aggro_job.js
connected.
starting worker.
running
process job
^C2018-03-26T21:17:30.897Z faktory-manager wid=85195ce3 Stopping
2018-03-26T21:17:30.898Z faktory-manager wid=85195ce3 Shutting down. In progress: 0
2018-03-26T21:17:30.905Z wid=85195ce3 pid=74732 Error: Connection closed
2018-03-26T21:17:30.905Z wid=85195ce3 pid=74732 Error: Connection closed
2018-03-26T21:17:30.905Z wid=85195ce3 pid=74732 Error: Connection closed
2018-03-26T21:17:30.905Z wid=85195ce3 pid=74732 Error: Connection closed
2018-03-26T21:17:30.906Z wid=85195ce3 pid=74732 Error: Connection closed
2018-03-26T21:17:30.908Z wid=85195ce3 pid=74732 Error: Connection closed
2018-03-26T21:17:30.908Z wid=85195ce3 pid=74732 Error: Connection closed
2018-03-26T21:17:30.908Z wid=85195ce3 pid=74732 Error: Connection closed
2018-03-26T21:17:30.908Z wid=85195ce3 pid=74732 Error: Connection closed
2018-03-26T21:17:30.908Z wid=85195ce3 pid=74732 Error: Connection closed
2018-03-26T21:17:30.908Z wid=85195ce3 pid=74732 Error: Connection closed
2018-03-26T21:17:30.908Z wid=85195ce3 pid=74732 Error: Connection closed
2018-03-26T21:17:30.908Z wid=85195ce3 pid=74732 Error: Connection closed
2018-03-26T21:17:30.908Z wid=85195ce3 pid=74732 Error: Connection closed
2018-03-26T21:17:30.908Z wid=85195ce3 pid=74732 Error: Connection closed
2018-03-26T21:17:30.909Z wid=85195ce3 pid=74732 Error: Connection closed
2018-03-26T21:17:30.909Z wid=85195ce3 pid=74732 Error: Connection closed
2018-03-26T21:17:30.909Z wid=85195ce3 pid=74732 Error: Connection closed
2018-03-26T21:17:30.909Z wid=85195ce3 pid=74732 Error: Connection closed
2018-03-26T21:17:30.909Z wid=85195ce3 pid=74732 Error: Connection closed
^C2018-03-26T21:17:45.647Z faktory-manager wid=85195ce3 Stopping
2018-03-26T21:17:45.647Z faktory-manager wid=85195ce3 Shutting down. In progress: 0
Use the code specified for the worker, and run it. Then try to use ctrl-C to exit.
node.js version: 12
npm/yarn version: 6
faktory-server version: 1.0.1
faktory-worker
package version: 3.2.1
I started faktory-server and the worker in Docker containers; the worker connected to the server, and after some time I stopped the server and started receiving this error from the worker, with the CPU at 100%. I replied to the other issue after you closed it, but I don't know if you saw it.
Error: getaddrinfo ENOTFOUND queue
at GetAddrInfoReqWrap.onlookup [as oncomplete] (dns.js:60:26) {
errno: 'ENOTFOUND',
code: 'ENOTFOUND',
syscall: 'getaddrinfo',
hostname: 'queue'
}
The Client function push is asynchronous and should return a Promise that resolves to a string.
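A minimal sketch of relying on that behavior (the job fields mirror other snippets in these issues; whether push already resolves to the jid depends on the version in use):
const faktory = require('faktory-worker');

(async () => {
  const client = await faktory.connect();
  // if push resolves to the job's jid as described, this is a string
  const jid = await client.push({
    queue: 'default',
    jobtype: 'MyDoWorkJob',
    args: [],
  });
  console.log(typeof jid, jid);
  await client.close();
})();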
Since as of version 3.0.0 per https://github.com/jbielick/faktory_worker_node/blob/master/CHANGELOG.md#300--2019-02-11, a connection pool is added to each client, is it advantageous/recommended to instantiate a single client connection to be used for the lifetime of an app?
Also, can you clarify what is meant by (taken from the changelog):
Client.connect() not necessary before using connection Client.connect() now connects and disconnects to test the connection details. A connection is created in the pool when it is needed.
I'm currently researching Faktory and its usage in Node.js (after looking at other job queues), and I'm wondering how I'd go about implementing a means for a worker or set of workers to be temporarily paused. My use case is that I have to interact with a social media service's API, and that API can sometimes go down or exceed rate limits. In these cases, I think failing the job that encountered the limit would be okay, but then I'd like to instruct all workers of that particular job to actually stop working until the coast is clear and the API is back up; there's no point in all jobs continuing to be started and continuing to fail when a circuit breaker has flipped.
Would this be implemented by having a side channel that subscribes to, say, a redis pub-sub notification that defines whether or not it's okay to call Faktory.work, or by calling worker.quiet()?
I can't find any information in the Faktory wiki about handling circuit-breakers for jobs either, but this can be vital to ensure distributed systems work reliably.
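One possible shape for that side channel, as a rough sketch: the redis channel name and messages below are made up, worker.quiet() is the call mentioned above, and resuming may require booting a fresh worker since I'm not aware of an "un-quiet" API.
const faktory = require('faktory-worker');
const { createClient } = require('redis'); // assumes node-redis v4

(async () => {
  faktory.register('SocialApiJob', async (payload) => {
    // calls the rate-limited external API here
  });

  const worker = await faktory.work();

  const subscriber = createClient();
  await subscriber.connect();
  // hypothetical channel a circuit breaker elsewhere publishes to
  await subscriber.subscribe('circuit:social-api', (message) => {
    if (message === 'open') {
      worker.quiet(); // stop fetching new jobs; in-flight jobs finish
    }
    // when the circuit closes again, one option is to stop() this worker
    // and start a fresh one rather than trying to resume in place
  });
})();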
Are we using npm or yarn as the package manager for this project? I'm also curious why the lockfiles are in the .gitignore? These are files that you should absolutely want to check in. Happy to open a PR for that.
I see from the code that the cli has options for booting up workers on queues other than default, and even specifying weights.
Would be good to have more code examples in the readme for anyone not using the bin scripts
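For instance, a sketch of the non-CLI equivalent (queue names are placeholders; this mirrors the faktory.work() options used in other issues here):
const faktory = require('faktory-worker');

faktory.register('SomeJob', async (...args) => {
  // handle the job
});

// process jobs from these queues instead of only 'default'
faktory.work({
  queues: ['critical', 'default'],
  concurrency: 5,
});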
unknown: No handler for GenerateForm
I can't seem to figure out why I'm getting this error. The job is set up exactly the same as all other jobs in our system.
const client = await faktory.connect({ queue: 'form_generator' });
const job = await client.job('GenerateForm', jobData);
job.queue = 'form_generator';
job.priority = 5;
job.retry = -1;
job.reserve_for = 18000;
logger.info({ job });
await job.push();
await client.close();
return job;
faktory.register('GenerateForm', async (jobData) => {
logger.info('await processor');
await processor(jobData);
});
faktory.work({
queues: ['form_generator'],
});
Thanks for writing this.
I'm running into an issue when I return a rejected promise inside a worker. The job gets stuck in busy until timeout. It works fine if I return a resolved promise.
Any ideas?
Here is my worker
const faktory = require('faktory-worker');
const doWork = async (id, size) => {
return new Promise((resolve, reject) => {
reject('Test')
})
}
faktory.register('MyDoWorkJob', doWork);
faktory.work();
Pushing into the queue
const faktory = require('faktory-worker');
async function run() {
const client = await faktory.connect();
client.push({
queue: 'default',
jobtype: 'MyDoWorkJob',
args: []
});
}
run().then(process.exit)
Here is the error message
> node worker.js
(node:90083) UnhandledPromiseRejectionWarning: Unhandled promise rejection (rejection id: 2): TypeError: Cannot read property 'split' of undefined
(node:90083) [DEP0018] DeprecationWarning: Unhandled promise rejections are deprecated. In the future, promise rejections that are not handled will terminate the Node.js process with a non-zero exit code.
ctx is available for middleware, but isn't accessible to the job handler itself. ctx should be passed to the job handler thunk instead of just the job payload.
const crypto = require('crypto');
const faktory = require('faktory-worker');
faktory.use(async (ctx, next) => {
ctx.db = 'database connection'; // this is not accessible in the job
await ctx.db.connect();
try {
await next();
} catch (e) {
await ctx.db.close();
throw e;
}
});
faktory.register('Lookup', async ({ id }) => async ctx => {
// ^ this is not ctx, it's the job payload
console.log(ctx)
const hash = crypto.createHash('sha256');
const doc = await ctx.db.get(id);
hash.update(doc.word);
doc.result = hash.digest('hex');
await ctx.db.put(doc);
});
faktory.work();
Will you be integrating the Mutate API?
https://github.com/contribsys/faktory/wiki/Mutate-API
https://github.com/contribsys/faktory/blob/master/client/mutate.go
node.js version: 9.8.0
npm/yarn version: 5.6.0
faktory-worker
package version: 0.9.1
$ npm install --save faktory-worker
+ [email protected]
added <n> packages in 1.665s
npm ERR! path <repo_path>/node_modules/faktory-worker/bin/faktory-worker
npm ERR! code ENOENT
npm ERR! errno -2
npm ERR! syscall chmod
npm ERR! enoent ENOENT: no such file or directory, chmod '<repo_path>/node_modules/faktory-worker/bin/faktory-worker'
npm ERR! enoent This is related to npm not being able to find a file.
npm ERR! enoent
$ npm install --save faktory-worker
node.js version: node:8.12-alpine
npm/yarn version: yarn 1.10.1
faktory-server version: 0.9.0
faktory-worker package version: ^2.2.0
faktory.work({
concurrency: 1,
queues: ['node-scraper', 'node-crawler'],
timeout: 25 * 1000
})
If the node-scraper queue is not empty, a job should be processed from there; then, once that is empty, a job from node-crawler should be processed.
A new job appears to be grabbed from the top of the stack. I just tested this by adding a new node-crawler job while there were 130 node-scraper jobs in the queue. The next jobs to run were node-crawler.
Create a worker that listens to two queues. Queue A will be a short running process that generates many long running Queue B processes. The same worker listens to both queues. When long running processes in Queue B have accumulated, try adding another from Queue A, which should be executed only after all of Queue B have completed.
I've been doing some application profiling of our system and noticed that generating a new Faktory connection is surprisingly slow (~400ms), most of which is spent doing the default 4000 (!) hash/salt iterations on the configured password to ensure we have the same password as the scheduler.
Would it be possible to memoize the hash function, so it only needs to be run once per process for the same password + salt + iterations parameters? If so, I can look into opening a PR for that.
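For illustration, a rough sketch of that kind of memoization (the function name, cache, and exact iteration scheme here are approximations, not the library's internals):
const { createHash } = require('crypto');

const hashCache = new Map();

// Approximation of the iterated SHA-256 the handshake requires, cached per
// (password, salt, iterations) so it only runs once per process.
function memoizedPasswordHash(password, salt, iterations) {
  const key = `${iterations}:${salt}:${password}`;
  const cached = hashCache.get(key);
  if (cached) return cached;

  let digest = createHash('sha256').update(`${password}${salt}`).digest();
  for (let i = 1; i < iterations; i += 1) {
    digest = createHash('sha256').update(digest).digest();
  }
  const hex = digest.toString('hex');
  hashCache.set(key, hex);
  return hex;
}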
Exception
(node:43609) UnhandledPromiseRejectionWarning: ReplyError: ERR Unknown worker 22dded38
at parseError (/Users/laksh/Desktop/repos/kaiju/node_modules/redis-parser/lib/parser.js:179:12)
at parseType (/Users/laksh/Desktop/repos/kaiju/node_modules/redis-parser/lib/parser.js:302:14)
node_modules/source-map-support/source-map-support.js:495
(node:43609) UnhandledPromiseRejectionWarning: Unhandled promise rejection. This error originated either by throwing inside of an async function without a catch block, or by rejecting a promise which was not handled with .catch(). To terminate the node process on unhandled promise rejection, use the CLI flag `--unhandled-rejections=strict` (see https://nodejs.org/api/cli.html#cli_unhandled_rejections_mode). (rejection id: 2)
node.js version: v12.16.2
npm/yarn version: 6.14.11
faktory-server version: 1.4.2
faktory-worker
package version: 4.0.1
try {
this.worker = await this.faktory.work({
...config
})
} catch (error) {
logger.error({ error }, 'faktory-worker error')
}
Gracefully return a promise rejection in faktory.work()
(node:43609) UnhandledPromiseRejectionWarning: ReplyError: ERR Unknown worker 22dded38
at parseError (/Users/laksh/Desktop/repos/kaiju/node_modules/redis-parser/lib/parser.js:179:12)
at parseType (/Users/laksh/Desktop/repos/kaiju/node_modules/redis-parser/lib/parser.js:302:14)
node_modules/source-map-support/source-map-support.js:495
(node:43609) UnhandledPromiseRejectionWarning: Unhandled promise rejection. This error originated either by throwing inside of an async function without a catch block, or by rejecting a promise which was not handled with .catch(). To terminate the node process on unhandled promise rejection, use the CLI flag `--unhandled-rejections=strict` (see https://nodejs.org/api/cli.html#cli_unhandled_rejections_mode). (rejection id: 2)
Looking into Faktory-worker-node issues, I found this in #70
Just covering the bases. There's a try/catch around worker.js:76, so I'm trying to make sense of the stack trace for the unhandled rejection.
faktory_worker_node/lib/worker.js
Lines 75 to 81 in 99bab78
I know there are some similar rules around not listening to emitted "error" events, and one is emitted on 79 in the case that fetch or handle fails, so I haven't ruled that out yet, either.
I am suspecting that the error happens before the worker can start the fetch and handle job loop?
faktory_worker_node/src/worker.ts
Line 205 in 8150cf0
faktory_worker_node/src/client.ts
Line 239 in 8150cf0
^^ the above is not wrapped in a try/catch block? Could this be the issue?
Error is returned from Faktory server here => https://github.com/contribsys/faktory/blob/b5549cc1cc6cdb19592db69f2da8a8881f69a13d/server/commands.go#L223
node.js version: n/a
npm/yarn version: n/a
faktory-server version: 0.7.0 (docker image)
faktory-worker
package version: 0.9.2
$ DEBUG=faktory* bin/faktory-work --label one --label two --concurrency 1
faktory-worker:manager creating connection pool with max 3 +0ms
faktory-worker:manager creating pool resource +2ms
faktory-worker:client Connecting to server +0ms
faktory-worker:manager starting 1 worker(s) +3ms
faktory-worker:client:socket connect +0ms
faktory-worker:client Established connection +4ms
faktory-worker:client Shaking hands +0ms
faktory-worker:client:server HI {"v":2} +0ms
faktory-worker:client HELLO {"hostname":"HAL9000.local","v":2,"labels":["one","two"],"pid":48629,"wid":"45309e28"} +3ms
faktory-worker:client:server OK +2ms
faktory-worker:client client='HELLO {"hostname":"HAL9000.local","v":2,"labels":["one","two"],"pid":48629,"wid":"45309e28"}', server={ text: 'OK' } +1ms
faktory-worker:client Connected +2ms
faktory-worker:client:heart BEAT +0ms
faktory-worker:client BEAT {"wid":"45309e28"} +1ms
faktory-worker:client:server OK +3ms
faktory-worker:client client='BEAT {"wid":"45309e28"}', server={ text: 'OK' } +0ms
faktory-worker:client FETCH +2ms
panic: runtime error: invalid memory address or nil pointer dereference
[signal SIGSEGV: segmentation violation code=0x1 addr=0x20 pc=0x794424]
goroutine 35 [running]:
github.com/contribsys/faktory/manager.(*manager).Fetch(0xc4200119e0, 0xf7aa80, 0xc420160120, 0xc420174198, 0x8, 0xc420172060, 0x0, 0x0, 0x500000c4201741c7, 0x7, ...)
/root/go/src/github.com/contribsys/faktory/manager/manager.go:196 +0x2d4
github.com/contribsys/faktory/server.fetch(0xc42017e0c0, 0xc42010e000, 0xc4201741c7, 0x5)
/root/go/src/github.com/contribsys/faktory/server/commands.go:81 +0x14c
github.com/contribsys/faktory/server.(*Server).processLines(0xc42010e000, 0xc42017e0c0)
/root/go/src/github.com/contribsys/faktory/server/server.go:299 +0x232
github.com/contribsys/faktory/server.(*Server).Run.func1(0xc42010e000, 0xf7c640, 0xc420166050)
/root/go/src/github.com/contribsys/faktory/server/server.go:147 +0x5c
created by github.com/contribsys/faktory/server.(*Server).Run
/root/go/src/github.com/contribsys/faktory/server/server.go:142 +0x247
faktory-worker:client:socket close +141ms
faktory-worker:client Connection closed unexpectedly +132ms
Error: Connection closed
at Client.onClose (/Users/josh/code/faktory-worker/lib/client.js:330:26)
at Socket.emit (events.js:160:13)
at TCP._handle.close [as _onclose] (net.js:559:12)
[1] + exit 2 bin/server
faktory-worker:manager destroying pool resource +2s
faktory-worker:manager creating pool resource +0ms
faktory-worker:client Connecting to server +2s
faktory-worker:client:socket error: { Error: connect ECONNREFUSED 127.0.0.1:7419 at Object._errnoException (util.js:1003:13) at _exceptionWithHostPort (util.js:1024:20) at TCPConnectWrap.afterConnect [as oncomplete] (net.js:1194:14) errno: 'ECONNREFUSED', code: 'ECONNREFUSED', syscall: 'connect', address: '127.0.0.1', port: 7419 } +2s
{ Error: connect ECONNREFUSED 127.0.0.1:7419
at Object._errnoException (util.js:1003:13)
at _exceptionWithHostPort (util.js:1024:20)
at TCPConnectWrap.afterConnect [as oncomplete] (net.js:1194:14)
errno: 'ECONNREFUSED',
code: 'ECONNREFUSED',
syscall: 'connect',
address: '127.0.0.1',
port: 7419 }
Labels are added to the worker process and displayed in the UI
The server panics during fetch
Start a server: bin/server &
Start a worker with labels: bin/faktory-work --label one --label two --concurrency 1
Regarding
faktory_worker_node/lib/worker.js
Line 59 in 1ae4482
Is there a reason the poolSize for the client is set to concurrency + 2? We have concurrency set to 100 and have many queues open, and this is causing Faktory to essentially grind to a halt, to the point where using the Web UI does not work. We also use Elixir and the concurrency there is not a problem.
Unless each Redis connection is blocking while the job is running, this seems like overkill.
node.js version: node:8.12-alpine
npm/yarn version: yarn 1.10.1
faktory-server version: 0.9.0
faktory-worker
package version: ^2.2.0
I think that the code isn't entirely applicable here, and would just be noise. It's a crawler that uses puppeteer to continuously crawl pages and take screenshots.
It may be worth noting that this is all running in a dockerized environment, and that these containers are currently experiencing issues with hitting their memory limits because some pages are just heavy.
To continue processing jobs.
(node:10) UnhandledPromiseRejectionWarning: ReplyError: ERR Unknown worker 97d0ef88
at parseError (/app/node_modules/faktory-worker/node_modules/redis-parser/lib/parser.js:179:12)
at parseType (/app/node_modules/faktory-worker/node_modules/redis-parser/lib/parser.js:302:14)
(node:10) UnhandledPromiseRejectionWarning: Unhandled promise rejection. This error originated either by throwing inside of an async function without a catch block, or by rejecting a promise which was not handled with .catch(). (rejection id: 2400)
It throws this error repeatedly.
This error only occurs after hours or days of running. It may well be that my solution is to catch this error and kill the process so that it restarts, as it's so intermittent.
We enabled all faktory-worker logs, and now every time a heartbeat is sent, this shows up as an ERROR level log.
The exact message is "ERROR: faktory-worker:client:heart", occurring every time a heartbeat is sent (i.e., not indicative of an error, just that a heartbeat was sent).
We would like a way to downgrade this heartbeat log message to DEBUG or INFO.
Hi @jbielick
I am using Faktory and this lib in a project and it worked fine at first, but then I came across some issues with error handling.
With the architecture in mind, I want my project to keep running if the Faktory server is down (the other parts of the project which are not Faktory-dependent).
However, if the Faktory server can't be reached at options.host:options.port, the lib crashes the application. This is due to an unhandled 'error' event.
This is likely due to this line:
faktory_worker_node/lib/connection.js
Line 190 in 9e94469
This makes errors from client.connect() uncatchable.
The error:
events.js:174
throw er; // Unhandled 'error' event
^
Error: connect EHOSTUNREACH 172.12.12.10:7419
at TCPConnectWrap.afterConnect [as oncomplete] (net.js:1097:14)
Emitted 'error' event at:
at Connection.onError (/usr/src/app/node_modules/faktory-worker/lib/connection.js:191:10)
at Socket.emit (events.js:194:15)
at emitErrorNT (internal/streams/destroy.js:82:8)
at emitErrorAndCloseNT (internal/streams/destroy.js:50:3)
at process._tickCallback (internal/process/next_tick.js:63:19)
[nodemon] app crashed - waiting for file changes before starting...
I'm using node version 10.15 with faktory-worker package version 3.0.2.
Here is the client helper code that makes the whole application crash:
const {Client} = require('faktory-worker')
module.exports = (config, logger) => {
const client = new Client(config)
return client
.connect()
.then(() => {
logger.info('Connection to faktory has been established successfully.')
return client
})
.catch(err => {
logger.error(`Unable to connect to faktory: ${err}`)
return err
})
}
I played around with the faktory-worker package in my node_modules directory, and commenting out this particular line makes the connect function catchable once again.
Hello!
Thank you for writing a robust Faktory client for NodeJS!
I wonder if you've considered running workers in a multi-process configuration, where the master process responds to health checks, and manages a pool of workers to do the actual job?
If a particular worker blocks the main event loop in the NodeJS process (I know, this is very bad and it shouldn't, alas this happens sometimes), the health check may timeout and the entire worker is considered dead by the Faktory server. In addition, long-running jobs are difficult if not impossible to timeout when concurrency is > 1. The only reliable way of doing so is to exit the process.
I found this pretty impressive process/clustering solution for Node, pm2, and it seems that the combination of the two would be a dream come true.
Have you thought about what the integration between pm2 and faktory-worker would be like? Is it a worthwhile effort? Are there any roadblocks that you see in implementing a clustered set of workers using pm2?
As you know, Faktory is a rewrite of Sidekiq, which is written in Ruby. Sidekiq Enterprise offers multi-process clustering, but you can get it for free using the sidekiq-pool Ruby gem. Adding multi-process support to this module, therefore, makes just as much sense.
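For what it's worth, a sketch of what a pm2 setup might look like (file names and counts are placeholders; this is not an endorsed integration):
// ecosystem.config.js (hypothetical pm2 process file)
module.exports = {
  apps: [
    {
      name: 'faktory-workers',
      script: './worker.js',       // the file that calls faktory.work()
      instances: 4,                // one Faktory worker per process
      exec_mode: 'cluster',
      max_memory_restart: '300M',  // recycle a worker that leaks or bloats
    },
  ],
};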
node.js version: 12.14.1
faktory-server version: pro-1.4.0
faktory-worker
package version: 3.3.6
Worker disconnects should be handled and retried.
Unhandled exception leading to the worker not receiving any jobs.
The stack trace:
at TCP.<anonymous> (net.js:664:12)
at Socket.EventEmitter.emit (domain.js:475:20)
at Socket.emit (events.js:223:5)
at Connection.onClose (/app/node_modules/faktory-worker/lib/connection.js:120:41)
Error: Connection closed
This error originated either by throwing inside of an async function without a catch block, or by rejecting a promise which was not handled with .catch(). The promise rejected with the reason:
at async Worker.tick (/app/node_modules/faktory-worker/lib/worker.js:76:19)
at async Client.fetch (/app/node_modules/faktory-worker/lib/client.js:176:22)
at /app/node_modules/generic-pool/lib/Pool.js:475:16
at Pool.destroy (/app/node_modules/generic-pool/lib/Pool.js:546:10)
at Pool._destroy (/app/node_modules/generic-pool/lib/Pool.js:137:42)
at ConnectionFactory.destroy (/app/node_modules/faktory-worker/lib/connection-factory.js:54:23)
at Connection.close (/app/node_modules/faktory-worker/lib/connection.js:205:12)
at new Promise (<anonymous>)
at /app/node_modules/faktory-worker/lib/connection.js:206:50
at Socket.end (net.js:580:31)
at Socket.Writable.end (_stream_writable.js:617:10)
at Socket.writeAfterFIN [as write] (net.js:447:14)
Error: This socket has been ended by the other party
2020.08.03 17:54:59 LOG5[11]: Connection closed: 8736 byte(s) sent to TLS, 23781 byte(s) sent to socket
node.js version: 12
npm/yarn version: 6
faktory-server version: 1.0.1
faktory-worker
package version: 3.1.0
faktory.register(jobTypes.export, async (data) => {
await job(data);
});
console.log(`Waiting for jobs on ${htmlQueue}...`);
faktory.work({ queues: htmlQueue, concurrency });
If you change FAKTORY_URL to a domain that does not exist, or the Faktory server is down, generic-pool will keep trying to connect to that server and keep the CPU at 100% or more.
The problem is that the ConnectionFactory create function will throw an error, and I read in coopernurse/node-pool#197 that this function should not fail, or should perhaps fail with some delay.
I tried something like this
create() {
debug('+connection');
const conn = new Connection(this.port, this.host);
conn.on('error', this.onError);
return new Promise(async (resolve, reject) => {
try {
const greeting = await conn.open();
await this.handshake(conn, greeting);
resolve(conn);
} catch (e) {
setTimeout(() => {
reject(e);
}, 1000);
}
});
}
node.js version: 14.17.1
npm/yarn version: 7.6.1
faktory-server version: 1.6.1
faktory-worker
package version: 4.5.0
I was trying to connect via a FAKTORY_URL to a staging environment and forgot to put the basic auth in the URL. However, the error I got back was:
[2022-10-03 20:59:27.458] [ERROR] ResourceRequest timed out | {"name":"TimeoutError","stack":"TimeoutError: ResourceRequest timed out\n at ResourceRequest._fireTimeout (<project root>/node_modules/faktory-worker/node_modules/generic-pool/lib/ResourceRequest.js:62:17)\n at Timeout.bound <project root>/node_modules/faktory-worker/node_modules/generic-pool/lib/ResourceRequest.js:8:15)\n at listOnTimeout (internal/timers.js:557:17)\n at processTimers (internal/timers.js:500:7)"}
i.e., a timeout.
It took me a while to figure out the issue. I couldn't see anything in the connection code that would expose this error other than a timeout. Does Faktory not return any kind of auth error to workers trying to do a BEAT?
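For anyone else hitting this, the URL shape with basic auth included matches the format shown in a later issue here (placeholder values; empty username, then :password@ before the host):
FAKTORY_URL=tcp://:your-password@staging-host.example.com:7419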
Running the latest faktory version in brew (0.9.0-1) with the latest version of this library (2.2.0) seems to encounter a timeout when calling faktory.work() a multitude of times. This seems to be easy to trigger by calling await faktory.work() (with or without parameters) ~10 or more times on my machine (2016 quad-core MBP). I've increased the timeout to 20s and that seems to resolve the problem.
A few questions as I'm not entirely sure what's happening here:
1. Should the default acquire timeout be raised (10000 * size also seems to work)?
2. Should there be an option on faktory.work() to input an acquireTimeout if they know this may be an issue?
I think some form of 2 would be the most seamless to the consumer.
node.js version: 14.17.1
npm/yarn version: 7.6.1
faktory-server version: 1.6.1
faktory-worker
package version: 4.4.0
Hi,
I see the CLI option for starting via node_modules/.bin/faktory-work; however, whenever I start the process that way, any jobs get an error saying undefined: No jobtype registered: <jobname>.
This makes sense, as the command line tool doesn't seem to allow for any jobs to be registered. Am I missing something about how jobs can be registered with the CLI-spawned worker?
Related: is there a best practice for how users load a potentially large number of registrations attached to a worker? For example, do people create a jobs or tasks folder where each file contains a worker.register() call and require the whole folder, or have all the register calls in one worker.js file that then calls out to other modules?
node.js version: 8..0
npm/yarn version: 1.7.0
faktory-server version: 0.0.7
faktory-worker
package version: github master
I'm killing my worker every hour via SIGINT as a work around to another problem. This causes an un-handled promise rejection error pointing at lib/manager.js:104:38
. When the kill is done within a shorter timeframe, say 10 seconds, there is no problem.
Is this as simple as wrapping the beat async call in a try/catch block?
try {
const response = await this.pool.use(client => client.beat());
switch (response) {
case 'quiet':
this.quiet();
break;
case 'terminate':
this.stop();
break;
default:
// noop
break;
}
} catch(err) {
this.stop();
}
2018-08-11T21:36:07.359Z pdf-bot-pro:worker Shutting down per shutdownTimer
2018-08-11T21:36:07.360Z faktory-manager wid=2f4eaadf Stopping
(node:1) UnhandledPromiseRejectionWarning: Error: pool is draining and cannot accept work
at Pool.acquire (/usr/src/app/node_modules/generic-pool/lib/Pool.js:434:9)
at Pool.use (/usr/src/app/node_modules/generic-pool/lib/Pool.js:464:17)
at Manager.beat (/usr/src/app/node_modules/faktory-worker/lib/manager.js:104:38)
at ontimeout (timers.js:498:11)
at tryOnTimeout (timers.js:323:5)
at Timer.listOnTimeout (timers.js:290:5)
(node:1) UnhandledPromiseRejectionWarning: Unhandled promise rejection. This error originated either by throwing inside of an async function without a catch block, or by rejecting a promise which was not handled with .catch(). (rejection id: 3)
(node:1) [DEP0018] DeprecationWarning: Unhandled promise rejections are deprecated. In the future, promise rejections that are not handled will terminate the Node.js process with a non-zero exit code.
Graceful shutdown
pdf-bot-pro:worker Shutting down per shutdownTimer +10s
2018-08-11T21:49:45.303Z faktory-manager wid=7d1a53a8 Stopping
setTimeout(() => {
  process.kill(process.pid, 'SIGINT') // process.pid, not process.id
}, 3600000)
faktory.work(...)
node.js version: 16.14.2
npm/yarn version: 8.5.0
faktory-server version: 1.7.0
faktory-worker
package version: 4.5.1
const faktoryTime = "2024-12-20T15:30:17.111222333Z";
const date = new Date(faktoryTime);
const jobData = {
keyId: keyId,
at: date.toISOString(), // store the scheduled time as a string
retry: 3,
};
const data = await client.job("Sql", jobData).push();
node version: 10.19 and 14.15.5
npm/yarn version: 7.12.1
faktory-server version: 1.5.1
faktory-worker
package version: 2.2.3 and 4.1.4
client = await faktory.connect()
client.job("foo", {hi: "mom"}).at(new Date())
it works
TypeError: ).at is not a function
See sample code
Which Faktory package and version?
We are currently using faktory-worker: ^4.2.1
Please include any relevant worker configuration
We have our redis mounted on a t2.small EC2 instance and our workers on Fargate with 5 tasks.
Please include any relevant error messages or stacktraces
We are receiving these logs constantly:
{
"level": 50,
"time": 1678978804856,
"pid": 20,
"source": "faktory",
"method": "enqueueJob",
"message": "Connection closed",
"stack": "Error: Connection closed\n at Connection.onClose (/app/node_modules/faktory-worker/lib/connection.js:102:45)\n at Socket.emit (events.js:400:28)\n at Socket.emit (domain.js:475:12)\n at TCP.<anonymous> (net.js:686:12)"
}
{
"level": 50,
"time": 1678978804856,
"pid": 20,
"source": "faktory",
"method": "enqueueJob",
"message": "read ECONNRESET",
"stack": "Error: read ECONNRESET\n at TCP.onStreamRead (internal/stream_base_commons.js:209:20)"
}
node.js version: 16
npm/yarn version: 8.3
faktory-server version: 1.6.0
faktory-worker
package version: 1.6.0
Currently, the client.js file looks like the following.
constructor(options = {}) {
const url = new url_1.URL(options.url || FAKTORY_URL);
this.password = options.password || querystring_1.unescape(url.password);
this.labels = options.labels || [];
this.wid = options.wid;
this.connectionFactory = new connection_factory_1.ConnectionFactory({
host: options.host || url.hostname,
port: options.port || url.port,
handshake: this.handshake.bind(this),
});
this.pool = generic_pool_1.createPool(this.connectionFactory, {
testOnBorrow: true,
acquireTimeoutMillis: 5000,
idleTimeoutMillis: 10000,
evictionRunIntervalMillis: 11000,
min: 1,
max: options.poolSize || 20,
autostart: false,
});
}
Is it possible to set acquireTimeoutMillis like below:
acquireTimeoutMillis: options.acquireTimeoutMillis || 5000
This can help decide failure timeout strategies; e.g., we might want to fail fast and set it to lower values.
Also, do you think there can be negative impacts caused by this?
If this seems reasonable, we can also raise this as a PR.
Currently it takes quite some time before it timeout, resulting in bad API response times on our systems.
using faktory 0.9.1 docker
Error: Socket closed unexpectedly
at Client.onClose (/reporter/node_modules/faktory-worker/lib/client.js:346:20)
at Socket.emit (events.js:182:13)
at Socket.EventEmitter.emit (domain.js:442:20)
at TCP._handle.close (net.js:599:12)
(node:48) UnhandledPromiseRejectionWarning: TypeError: this.onConnectReject is not a function
at Client.onConnect (/reporter/node_modules/faktory-worker/lib/client.js:318:12)
at process._tickCallback (internal/process/next_tick.js:68:7)
(node:48) UnhandledPromiseRejectionWarning: Unhandled promise rejection. This error originate
node.js version: 9.5.0
npm/yarn version: 5.6.0
faktory-worker
package version: 0.8.1
const Faktory = require('faktory-worker')
Faktory.register("AggroJob", () => {
console.log("processing!")
throw(new Error("some weird error"))
// return Promise.reject(new Error("some weird error"))
})
console.log("starting worker pool.")
Faktory.work()
Used this to push a job
const Faktory = require('faktory-worker')
let run = async () => {
const client = await Faktory.connect()
console.log("connected.")
await client.push({
queue: 'default',
jobtype: 'AggroJob',
args: []
});
console.log("pushed job")
await client.close()
}
run().then(() => {
console.log("pushed dummy job")
}).catch((err) => {
console.error("There was an error")
console.error(err)
})
I expected the job to retry after a couple of seconds. I expected the Faktory UI dashboard to show there are jobs to be retried at some time in the future.
It throws the error, and the job is done. The Faktory UI dashboard also doesn't show that there are any jobs to be retried.
I tried throwing the error, returning the error, returning a rejected promise of an error, and all to no avail. In the code, it does look like handling the job is in a try/catch block, but once the error is caught, it's not put back on the retry queue. It just gets logged and dropped on the ground.
Run the sample code above.
Hi Josh, I'm trying out your library and writing a blog post on it. Since I'm a JS neophyte, I'm struggling with your sample code, especially starting a new worker process:
const faktory = require('faktory-worker');
faktory.register('adder', async (a, b) => {
console.log(a, " + ", b, " = ", a+b)
});
await faktory.work();
// send INT signal to shutdown gracefully
Output:
10:00:32 worker.1 | /Users/mikeperham/src/blog/fworker.js:7
10:00:32 worker.1 | await faktory.work();
10:00:32 worker.1 | ^^^^^
10:00:32 worker.1 |
10:00:32 worker.1 | SyntaxError: await is only valid in async function
10:00:32 worker.1 | at new Script (vm.js:84:7)
10:00:32 worker.1 | at createScript (vm.js:264:10)
10:00:32 worker.1 | at Object.runInThisContext (vm.js:312:10)
10:00:32 worker.1 | at Module._compile (internal/modules/cjs/loader.js:684:28)
10:00:32 worker.1 | at Object.Module._extensions..js (internal/modules/cjs/loader.js:732:10)
10:00:32 worker.1 | at Module.load (internal/modules/cjs/loader.js:620:32)
How do I wrap the await at the top-level?
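One way to wrap it, as a sketch: the same async IIFE pattern that appears in other snippets in these issues.
const faktory = require('faktory-worker');

faktory.register('adder', async (a, b) => {
  console.log(a, " + ", b, " = ", a + b);
});

(async () => {
  await faktory.work();
  // send INT signal to shutdown gracefully
})().catch((err) => {
  console.error(err);
  process.exit(1);
});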
node.js version:
10.1.0
npm/yarn version:
6.1.0
faktory-server version:
0.8
faktory-worker
package version:
0.9.2
FAKTORY_URL=tcp://:abcdefghijklmnop=@someip:7419
FAKTORY_PASSWORD=abcdefghijklmnop=
Faktory worker should be able to connect
Bad password!
Set up Faktory with a password that has a trailing = (maybe even one in the middle; I did not test this), and you'll get bad password errors. Changing the password to drop the = solves this, so I know it's not something to do with the setup.
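A plausible workaround, assuming the password is parsed out of FAKTORY_URL and then unescaped (as the client constructor shown in a later issue does): percent-encode the reserved characters so the URL parser doesn't mangle them. Placeholder values below:
// '=' becomes %3D; encodeURIComponent handles this
const password = 'abcdefghijklmnop=';
process.env.FAKTORY_URL = `tcp://:${encodeURIComponent(password)}@someip:7419`;
// -> tcp://:abcdefghijklmnop%3D@someip:7419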
node.js version: 12.16.2
yarn version: v1.22.10
faktory-server version: 1.4.2
faktory-worker
package version: 4.0.1
const faktory = require("faktory-worker");
faktory.register("TestJob", async (test) => {
console.log("DO NOT BE ALARMED!!! THIS IS A TEST!!!");
});
function exitOnError (err) {
console.log("Faktory worker crashed", err);
}
faktory.work(
{
host: "faktory-headless",
password: "password",
queues: ["node"]
}
)
.then((worker) => { worker.on("error", exitOnError); })
.catch(exitOnError);
Jobs are processed
No jobs are processed
Output from the worker.js script:
2020-11-06T22:41:45.944Z faktory-worker use -
2020-11-06T22:41:45.946Z faktory-worker registered TestJob
2020-11-06T22:41:45.947Z faktory-worker registered Test
2020-11-06T22:41:45.949Z faktory-worker:worker work concurrency=20
2020-11-06T22:41:45.950Z faktory-worker:client:heart BEAT
2020-11-06T22:41:45.952Z faktory-worker:connection-pool +1
2020-11-06T22:41:45.953Z faktory-worker:connection connecting
Error: connect ECONNREFUSED 172.18.0.6:7419
at TCPConnectWrap.afterConnect [as oncomplete] (net.js:1141:16) {
errno: 'ECONNREFUSED',
code: 'ECONNREFUSED',
syscall: 'connect',
address: '172.18.0.6',
port: 7419
}
...
... more of these connection errors until the Istio sidecar is ready
...
2020-11-06T22:41:47.979Z faktory-worker:connection-pool attempts=5
2020-11-06T22:41:47.979Z faktory-worker:connection close
2020-11-06T22:41:48.981Z faktory-worker:connection-pool +1
2020-11-06T22:41:48.981Z faktory-worker:connection connecting
Faktory worker crashed TimeoutError: ResourceRequest timed out
at ResourceRequest._fireTimeout (/usr/src/app/node_modules/generic-pool/lib/ResourceRequest.js:62:17)
at Timeout.bound (/usr/src/app/node_modules/generic-pool/lib/ResourceRequest.js:8:15)
at listOnTimeout (internal/timers.js:549:17)
at processTimers (internal/timers.js:492:7) {
name: 'TimeoutError'
}
2020-11-06T22:41:54.987Z faktory-worker:connection HI {"v":2,"i":7514,"s":"6fcd18472fd92d52"}
2020-11-06T22:41:54.988Z faktory-worker:client handshake
2020-11-06T22:41:55.021Z faktory-worker:connection SEND: HELLO {"hostname":"worker-7c6fcc8c6-zx8jd","v":2,"labels":[],"pid":1,"wid":"c313f705","pwdhash":"1a754d8c77a02a00f4fa01fc7e6e3e89e87444823188f78ffccb18a565844551"}
2020-11-06T22:41:55.025Z faktory-worker:connection OK
2020-11-06T22:41:55.025Z faktory-worker:connection client='HELLO {"hostname":"worker-7c6fcc8c6-zx8jd","v":2,"labels":[],"pid":1,"wid":"c313f705","pwdhash":"1a754d8c77a02a00f4fa01fc7e6e3e89e87444823188f78ffccb18a565844551"}', server='OK'
2020-11-06T22:42:05.030Z faktory-worker:connection timeout
2020-11-06T22:42:07.958Z faktory-worker:connection-pool -1
2020-11-06T22:42:07.959Z faktory-worker:connection-pool +1
2020-11-06T22:42:07.959Z faktory-worker:connection connecting
2020-11-06T22:42:07.961Z faktory-worker:connection close
2020-11-06T22:42:13.930Z faktory-worker:connection HI {"v":2,"i":5430,"s":"52aea839aa553172"}
2020-11-06T22:42:13.930Z faktory-worker:client handshake
2020-11-06T22:42:13.947Z faktory-worker:connection SEND: HELLO {"hostname":"worker-7c6fcc8c6-zx8jd","v":2,"labels":[],"pid":1,"wid":"c313f705","pwdhash":"cd62b616d1090ae685b23bcea4c977a9dcfe5a25c8ff723f915f3deb988f30b2"}
2020-11-06T22:42:13.951Z faktory-worker:connection OK
2020-11-06T22:42:13.951Z faktory-worker:connection client='HELLO {"hostname":"worker-7c6fcc8c6-zx8jd","v":2,"labels":[],"pid":1,"wid":"c313f705","pwdhash":"cd62b616d1090ae685b23bcea4c977a9dcfe5a25c8ff723f915f3deb988f30b2"}', server='OK'
2020-11-06T22:42:23.956Z faktory-worker:connection timeout
2020-11-06T22:42:29.926Z faktory-worker:connection-pool -1
2020-11-06T22:42:29.926Z faktory-worker:connection-pool +1
2020-11-06T22:42:29.927Z faktory-worker:connection connecting
2020-11-06T22:42:29.928Z faktory-worker:connection close
2020-11-06T22:42:35.930Z faktory-worker:connection HI {"v":2,"i":6818,"s":"3166ee0545ecad70"}
2020-11-06T22:42:35.931Z faktory-worker:client handshake
2020-11-06T22:42:35.953Z faktory-worker:connection SEND: HELLO {"hostname":"worker-7c6fcc8c6-zx8jd","v":2,"labels":[],"pid":1,"wid":"c313f705","pwdhash":"55a76fb257dded5b03e03a951ffec691de1cefa673eaadc805c3343a858da1e4"}
2020-11-06T22:42:35.956Z faktory-worker:connection OK
2020-11-06T22:42:35.956Z faktory-worker:connection client='HELLO {"hostname":"worker-7c6fcc8c6-zx8jd","v":2,"labels":[],"pid":1,"wid":"c313f705","pwdhash":"55a76fb257dded5b03e03a951ffec691de1cefa673eaadc805c3343a858da1e4"}', server='OK'
2020-11-06T22:42:45.927Z faktory-worker:connection timeout
... Then these handshakes and timeouts just keep going and no jobs are processed
faktory_worker_node/lib/client.js
Line 242 in a86d1df
I haven't found the culprit error that has no stack trace, but I am getting the following error:
TypeError: Cannot read property 'split' of undefined
at Client.fail (/app/node_modules/faktory-worker/lib/client.js:242:28)
at Worker.handle (/app/node_modules/faktory-worker/lib/worker.js:247:25)
node.js version: 16
npm/yarn version:
faktory-server version: latest
faktory-worker
package version: latest
I need some help setting up the client to submit a job to the Faktory server.
I went through the documentation, but I am confused about two things.
import {Client} from "faktory-worker";
const client = new Client(options);
It throws an error like:
TypeError: _faktoryWorker.Client is not a constructor
I would appreciate help on this.
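For comparison, the pattern that appears in other issues here and sidesteps constructing Client directly (a sketch; the job fields are placeholders):
const faktory = require('faktory-worker');

(async () => {
  // connect() resolves to a ready client
  const client = await faktory.connect();
  await client.push({
    queue: 'default',
    jobtype: 'MyJob',
    args: [{ hello: 'world' }],
  });
  await client.close();
})();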
Hi @jbielick ,
I wanted to see if you had any plans to add support for Faktory Enterprise features or if you would accept contributions to add support for Enterprise features.
My organization may be interested in the Batch and Tracking enterprise features in the future.
Hi, is there a way to check from a worker whether the Faktory server is down or up?
node.js version: 12.22.1
npm/yarn version: npm 6.14.12
faktory-server version: 1.4.0
faktory-worker
package version: 4.1.4
Recently had a job take much longer than its allotted reservation time due to load on the server. This caused Faktory to re-enqueue that job while the original was still being worked without having been terminated. Unfortunately this particular job is difficult to make idempotent and so once a second instance started, bad things happened.
I'm looking into ways to work around this but I was curious if there's something I'm supposed to be doing in my job that I'm not, such as handling a SIGTERM for example.
How does the library respond to this event, if at all (I'm not sure if the client is notified by Faktory when this happens). If it doesn't, but could, I guess this is a feature request. Otherwise, any recommendations?
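One mitigation other issues here use is setting reserve_for on the job when pushing it, so Faktory waits longer than the default reservation before re-enqueueing (a sketch with placeholder values):
const faktory = require('faktory-worker');

(async () => {
  const client = await faktory.connect();
  const job = client.job('SlowJob', { id: 123 });
  // seconds Faktory reserves the job before assuming the worker is gone;
  // size this above the job's worst-case runtime
  job.reserve_for = 3600;
  await job.push();
  await client.close();
})();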
node.js version: 12.14.1
faktory-server version: 1.4.0
faktory-worker
package version: latest
worker.work(opts).catch((err) => {
console.error("Worked crashed, exiting", err);
process.exit(1);
});
When compiling with tsc, I expected the above to compile OK.
It throws an error:
error TS2339: Property 'catch' does not exist on type 'Worker'
I think this is because the declaration file defines Worker as the return type instead of Promise<Worker>.
The faktory-work executable should be a good test case.
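Until the declaration is corrected, a workaround sketch that compiles either way (awaiting a non-Promise value is allowed, so this works whether work() is typed as Worker or Promise<Worker>; the options are placeholders):
const faktory = require('faktory-worker');

const opts = { queues: ['default'] };

async function main() {
  try {
    await faktory.work(opts);
  } catch (err) {
    console.error('Worker crashed, exiting', err);
    process.exit(1);
  }
}

main();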