sindresorhus / p-queue
Promise queue with concurrency control
License: MIT License
Hey everyone!
Here's what I'm trying to accomplish. Please let me know if that's possible with this cool lib:
I need a queue to run 5 concurrent tasks. Let's say I add 10. I'd like to see 5 tasks running, a delay, another 5.
Is it possible to do this with interval and/or intervalCap? I couldn't figure it out just by reading the readme.
Thanks!
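For what it's worth, the batching described above can be expressed standalone; a minimal sketch (the helper name runInBatches is hypothetical, and if I read the readme's rate-limiting options right, { concurrency: 5, intervalCap: 5, interval: delayMs } should approximate the same behavior in p-queue itself):

```javascript
// Run tasks in groups of `batchSize`, waiting `delayMs` between groups.
const delay = ms => new Promise(resolve => setTimeout(resolve, ms));

async function runInBatches(tasks, batchSize, delayMs) {
  const results = [];
  for (let i = 0; i < tasks.length; i += batchSize) {
    const batch = tasks.slice(i, i + batchSize);
    // All tasks in the current batch run concurrently.
    results.push(...await Promise.all(batch.map(task => task())));
    if (i + batchSize < tasks.length) {
      await delay(delayMs); // pause before starting the next batch
    }
  }
  return results;
}
```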
active event
const queue = new PQueue({ concurrency: 5 });
queue.addAll(myTaskQueue);
queue.on('active', () => {
console.log(`${queue.size} requests remain to be scraped, pending workers: ${queue.pending}`);
});
console log
5 requests remain to be scraped, pending workers: 4
4 requests remain to be scraped, pending workers: 4
3 requests remain to be scraped, pending workers: 4
2 requests remain to be scraped, pending workers: 4
1 requests remain to be scraped, pending workers: 4
When a queue is created with autoStart: false, I found it common to call:
queue.start();
await queue.onIdle();
I wonder if it would make sense for start() to return the onIdle() promise, so that a single call could both start the queue and wait for all tasks?
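In the meantime, a tiny userland helper covers this; a sketch (the name startAndWait is hypothetical, and it assumes only an object exposing start() and onIdle(), which p-queue does):

```javascript
// Hypothetical helper: start a paused queue, then resolve when it drains.
function startAndWait(queue) {
  queue.start();
  return queue.onIdle();
}
```

Usage would be `await startAndWait(queue);` in place of the two calls above.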
queue.clear() doesn't reset _pendingCount to 0, preventing newly-added items from running.
These libraries are awesome and make building consistent asynchronous code much easier.
But is there a mechanism in this library, or one of the other p-* libraries, for a queue-like data structure that implements some form of back-pressure, so that if the queue grows past a certain size, the caller of add() pauses until the queue has drained below that size?
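The back-pressure idea can be sketched as a minimal, self-contained queue whose add() only resolves once there is room (illustrative only; BoundedQueue and its option names are not p-queue's API):

```javascript
// Bounded queue with back-pressure: producers calling add() are paused
// (their promise stays pending) while the backlog is at maxSize.
class BoundedQueue {
  constructor({concurrency = 1, maxSize = Infinity} = {}) {
    this.concurrency = concurrency;
    this.maxSize = maxSize;
    this.active = 0;     // tasks currently running
    this.tasks = [];     // tasks waiting to run
    this.waiters = [];   // producers blocked by back-pressure
  }

  async add(task) {
    // Back-pressure: wait until the backlog drops below maxSize.
    while (this.tasks.length >= this.maxSize) {
      await new Promise(resolve => this.waiters.push(resolve));
    }
    return new Promise((resolve, reject) => {
      this.tasks.push({task, resolve, reject});
      this._next();
    });
  }

  _next() {
    while (this.active < this.concurrency && this.tasks.length > 0) {
      const {task, resolve, reject} = this.tasks.shift();
      const waiter = this.waiters.shift();
      if (waiter) waiter(); // a backlog slot freed up; unblock one producer
      this.active++;
      Promise.resolve().then(task).then(resolve, reject).finally(() => {
        this.active--;
        this._next();
      });
    }
  }
}
```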
What if fn() fails? Example:
(async function() {
let queue = new PQueue({ concurrency: 1 });
queue.add(() => { throw 'x' }).catch(console.log);
queue.add(() => { throw 'y' }).catch(console.log);
})();
This is the same as sindresorhus/execa#154, I was surprised to find that both packages are by you @sindresorhus! 🙂
I was upgrading my project dependencies and noticed that p-queue bumped from 2.4.2 to 3.0.0. This looked like a certain breaking change, so I went to search for a CHANGELOG to then update my code if needed.
I could not find any info on the releases page, and a commonly used CHANGELOG.md file was also missing. It'd be great if at least one of these was added to p-queue so that people waste less time trying to figure out whether they can safely bump or not. This could save hundreds of hours for folks around the world.
Hey,
We've recently implemented this as a way to throttle and queue our API calls, and a question was raised about a potential memory leak. The concern is that, since this is what triggers our API calls, there may come a time when the queue ends up very large.
Do you have tests specifically around ensuring there is no chance of a memory leak?
These methods are event oriented and should not have been using promises.
A promise is intended to only be resolved once, however this library is expecting to resolve them multiple times.
https://stackoverflow.com/questions/20328073/is-it-safe-to-resolve-a-promise-multiple-times
These APIs would need to be altered to queue.onIdle(() => { }) and register an array of callbacks, or ideally, use an EventEmitter-style API: queue.on('idle', () => { }).
The existing APIs could forward to the new preferred one by returning something like:
return {
  then(cb) { this.on('idle', cb); return this; },
  catch(e) { } // unused, but part of the promise contract
};
I am currently using p-queue in combination with Puppeteer. What I want to achieve is to add the current function to the queue if an exception gets thrown. I am running with concurrency set to 3.
I have the following snippets:
getAccounts is simply a method for parsing a JSON file.
async init() {
let accounts = await this.getAccounts();
accounts.map(async () => {
await queue.add(() => this.test());
});
await queue.onIdle();
console.log("Done");
}
async test() {
const browser = await puppeteer.launch({headless: false});
try {
const page = await browser.newPage();
await page.goto(this._url);
if (Math.floor(Math.random() * 10) > 4) {
throw new Error("Simulate error");
}
await browser.close();
} catch (error) {
await browser.close();
await queue.add(() => this.test());
console.log(error);
}
}
When it goes into the catch, the queue just stops. I am not sure if my implementation is right, but it also never executes console.log("Done").
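One way to avoid re-adding from inside the catch block is to retry inside the task itself, with an explicit attempt limit; a self-contained sketch (the helper name withRetries is hypothetical, not part of p-queue):

```javascript
// Run `task`, retrying up to `retries` extra times before rejecting
// with the last error. The queue only ever sees one task.
async function withRetries(task, retries) {
  let lastError;
  for (let attempt = 0; attempt <= retries; attempt++) {
    try {
      return await task();
    } catch (error) {
      lastError = error;
    }
  }
  throw lastError;
}
```

Used as `queue.add(() => withRetries(() => this.test(), 3))`, the add() promise then settles only once the retries are exhausted or the task succeeds, so onIdle() keeps its usual meaning.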
From the discussion: the promise from the onEmpty method resolves when the queue is empty, but there are still pending promises in flight. We should add an onIdle method to signal that all work in the queue is completed.
So say I queue up 5-10 async functions, then want to cancel maybe the fifth one. How would I go about doing that?
Looked through the code and couldn't find a remove or delete. There is a dequeue, but that seems to work similarly to the Array pop method.
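Without removal support in the library, one workaround is to make individual tasks cancelable before they run; an illustrative sketch (the cancelable wrapper is hypothetical, not p-queue API):

```javascript
// Wrap a task so it becomes a no-op if cancel() is called before the
// queue gets around to running it. The item stays in the queue, but
// does no work.
function cancelable(task) {
  let canceled = false;
  const wrapped = async () => {
    if (canceled) return undefined; // skipped
    return task();
  };
  wrapped.cancel = () => { canceled = true; };
  return wrapped;
}
```

Usage: `const t = cancelable(fn); queue.add(t);` and later `t.cancel();`.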
Currently it doesn't look like it's possible to wait until all promises are resolved, or am I missing something?
I tried to use onEmpty(). The thing is that onEmpty() can get called before all promises are resolved; it just triggers when next() is called on the queue and there are no more items to be processed.
The promise returned by .add() is not what I need either, because with a concurrency of 5, the last .add() might resolve before the first one.
Now that I'm typing this, I might not be using the correct library. Maybe I should have a look at p-throttle, although I like how PQueue works :).
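One way to get "everything added so far has settled" without onEmpty is to keep the promise that each add() returns and await them all together; a sketch (the helper name drain is hypothetical, and it assumes only that add() returns a per-task promise, which the readme documents):

```javascript
// Collect the promise from every add() call; Promise.all then settles
// only after every task has finished, regardless of completion order.
async function drain(queue, tasks) {
  const promises = tasks.map(task => queue.add(task));
  return Promise.all(promises);
}
```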
Hi, in the new 5.0.0 release there is no default export, so there is a problem when using imports in TypeScript.
const PQueue = require('p-queue');
const queue = new PQueue({ concurrency: 1, timeout: 5000, throwOnTimeout: true });
const delay = async (ms) => {
await new Promise((resolve, reject) => {
setTimeout(() => {
console.log('timeout');
resolve();
}, ms)
});
};
try {
queue.add(() => delay(10000))
} catch(err) {
console.log(err);
}
This is my little demo. I want to catch the timeoutError.
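The issue in the demo is that a synchronous try/catch around the call that creates the promise never fires; the rejection surfaces on the promise itself, so it must be awaited or given a .catch(). A self-contained sketch of the principle (withTimeout is a hypothetical stand-in approximating throwOnTimeout, built on Promise.race):

```javascript
// Reject if `promise` doesn't settle within `ms` milliseconds.
const withTimeout = (promise, ms) =>
  Promise.race([
    promise,
    new Promise((_, reject) =>
      setTimeout(() => reject(new Error('TimeoutError')), ms)
    ),
  ]);

async function demo() {
  const slow = new Promise(resolve => setTimeout(resolve, 500));
  try {
    await withTimeout(slow, 50); // must be awaited for catch to fire
  } catch (error) {
    return error.message;        // the timeout rejection lands here
  }
}
```

The same applies to the original demo: `queue.add(() => delay(10000)).catch(err => console.log(err))` or `await queue.add(...)` inside the try block.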
I found a need to clear/interrupt a long promise queue. If this is wanted, I can submit a PR for this.
Thanks
Add an argument to the .add method to provide a priority for the task. We can use a simple priority queue or something more complicated (see this amazing talk about the Apollo Guidance Computer and how it can save lives).
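The "simple priority queue" part can be sketched with an ordered insert; illustrative only (PriorityList is a hypothetical name, and a heap would be the more complicated option for large queues):

```javascript
// Higher `priority` numbers are dequeued first; equal priorities keep
// FIFO order because new items are inserted after existing peers.
class PriorityList {
  constructor() { this.items = []; }

  enqueue(value, priority = 0) {
    // Insert before the first item with strictly lower priority.
    const index = this.items.findIndex(item => item.priority < priority);
    const entry = {value, priority};
    if (index === -1) this.items.push(entry);
    else this.items.splice(index, 0, entry);
  }

  dequeue() {
    const entry = this.items.shift();
    return entry && entry.value;
  }
}
```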
As far as I can tell, there is no way to get a promise that resolves when all items currently in the queue finish (even if new ones are added later). A hacky workaround is const p = q.add(() => {}), which does get you this, but at the expense of adding an item to the queue.
Hi, I'm finding that when running the queue for extended periods there appear to be handles held and memory not reducing over time.
i.e.:
import PQueue from 'p-queue';
class Cow {
beef: number;
constructor(i: number) {
this.beef = i;
}
}
async function test(queue: any) {
for (let j = 0; j < 30000; j++) {
const promises = [];
console.log(`Start... ${j}`);
for (let i = 0; i < 1000; i++) {
promises.push(
queue.add(async () => {
const arr = [];
for (let i = 0; i < 100; i++) {
arr.push(new Cow(i));
}
}),
);
}
await Promise.all(promises);
}
}
(async () => {
const queue = new PQueue({ concurrency: 5 });
await test(queue);
console.log('Completed');
while (true) {}
})();
When executing this locally with Node 10, the memory used before populating the queue is ~8 MB, but after executing it sits at around ~78 MB, caused by this simple object.
I would have expected function completion to mark the items in the function's closure as safe for GC.
In my bigger use case, there are objects being deleted from a queue elsewhere, but they're stuck in memory when dumping the heap.
Am I using the queue incorrectly? Should I be calling a clear of some kind bound to a completion event?
What are the breaking changes?
What is the upgrade path?
Hi, we encountered a new problem a little while ago, when our npm builds started failing with the following error:
ERROR in vendor.bundle.js from UglifyJs Unexpected token: name (first) [vendor.bundle.js:26373,5]
Which we tracked to p-queue using ES6 syntax in the distributed npm module. The current UglifyJS doesn't support ES6 and crashes unless we specifically include p-queue in our Babel transpilation.
A fix would be to distribute a transpiled "ES5 modules" version on npm, which uses ES5 syntax but keeps import statements intact.
In a use case with more tasks than the concurrency limit, I expected to be able to do:
const assert = require('assert');
const Queue = require('p-queue');
const queue = new Queue({ concurrency: 8 });
const tasks = Array(16).fill(0);
const work = () => new Promise((resolve) => {
setTimeout(resolve, Math.random() * 2000);
});
tasks.forEach(() => {
queue.add(() => work());
});
queue.onEmpty()
.then(() => assert.equal(queue.pending, 0));
However this is not the case: onEmpty() resolves as soon as there's space in the queue for new tasks to launch, not when all the enqueued tasks have resolved. Does this sound right, and if so, would you be interested in another method that provides a promise that resolves when queue.pending reaches 0?
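The requested "pending reaches 0" semantics can also be layered on from the outside by counting in-flight work; a sketch (idleTracker is a hypothetical wrapper, assuming only that the underlying add() returns a per-task promise):

```javascript
// Track in-flight tasks externally and expose an onIdle()-style promise
// that resolves only when every started task has settled.
function idleTracker(queue) {
  let inFlight = 0;
  const waiters = [];
  return {
    add(task) {
      inFlight++;
      const p = queue.add(task);
      p.finally(() => {
        inFlight--;
        if (inFlight === 0) waiters.splice(0).forEach(fn => fn());
      });
      return p;
    },
    onIdle() {
      return inFlight === 0
        ? Promise.resolve()
        : new Promise(resolve => waiters.push(resolve));
    },
  };
}
```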
I just tried updating to 6, and it didn’t go as expected (from 5.0.0) :}
I'm using esm on Node. I have a .mjs file importing this lib, and the loader is not unwrapping the default export. I think this is expected because the lib is compiled TypeScript that outputs CJS with a bundler-friendly ESM flag. Does TypeScript have a compile target of ESM? If so, you could add a "module": "dist/esm.js" entry or something, and that would make this work transparently for esm users (who opt to bite the .mjs bullet).
This can be closed if this idea is not desirable. I’m just playing with esm as it currently stands in node to get a feel.
...so it will be removed from the queue if not resolved within the time frame.
Hello, I'm worried about the case where one of my promises crashes for some reason and never resolves: it will hang in the queue forever!
It would be nice to have a timeout to clean out hanging promises in the queue.
I have a use case where I need to update concurrency based on queue execution results. What I'd like to do is, on a specific output, decrease concurrency (I understand it would not be instant, but would have to wait until items are dequeued). Maybe the call to decrease concurrency could return a promise that resolves once the new concurrency is reached?
There's the onEmpty() method, but if some promise rejects or throws we still end up in onEmpty. It would be great to have the same kind of method that only fires if everything succeeded.
If I only add one promise to the queue, onIdle settles before the added / pending promise is settled.
With multiple promises, this issue does not occur.
There is no transpiled ES5 distribution which could safely be used in the browser.
I want to use it in a web application. Do you plan to create an ES5 distributable?
I will try to transpile it with my Babel config, but that fails with larger libraries, that's why the convention is to distribute a transpiled version.
Do you see any reason why this library shouldn't be usable on the web?
Thank you for your time!
Any exceptions raised in the promise appear to be swallowed, invoking the notorious 'Unhandled promise rejection' message under Node.js.
Is there any way to catch/handle errors that occur while processing a queue?
Perhaps a q.onError(callback) kind of thing? Of course one could just add a global handler for unhandled rejections via process.on('unhandledRejection', ...), although you then lose the ability to trap/handle errors within a particular context.
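The q.onError(callback) idea can be prototyped in userland by routing every task's rejection to registered handlers; a sketch (withErrorHandler and its shape are hypothetical, assuming only that add() returns a per-task promise):

```javascript
// Wrap a queue so task rejections go to onError handlers instead of
// becoming unhandled promise rejections.
function withErrorHandler(queue) {
  const handlers = [];
  return {
    add(task) {
      return queue.add(task).catch(error => {
        handlers.forEach(handler => handler(error));
      });
    },
    onError(handler) { handlers.push(handler); },
  };
}
```

Note the trade-off: the promise returned by the wrapped add() now resolves (with undefined) even on failure, since the error was consumed by the handlers.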
Upgrading from TS version 3.4.5
to 3.5.2
results in the following compile time error:
$ tsc
node_modules/p-queue/index.d.ts:79:47 - error TS2344: Type 'DefaultAddOptions' does not satisfy the constraint 'QueueAddOptions'.
Index signature is missing in type 'DefaultAddOptions'.
79 EnqueueOptionsType extends QueueAddOptions = DefaultAddOptions
~~~~~~~~~~~~~~~~~
Found 1 error.
It looks like QueueAddOptions has an index signature but DefaultAddOptions does not, nor does it extend QueueAddOptions.
See #23
Since upgrading [email protected] I get this compile error:
node_modules/p-queue/index.d.ts:81:54 - error TS2344: Type 'DefaultAddOptions' does not satisfy the constraint 'QueueAddOptions'.
Index signature is missing in type 'DefaultAddOptions'.
81 EnqueueOptionsType extends PQueue.QueueAddOptions = PQueue.DefaultAddOptions
~~~~~~~~~~~~~~~~~~~~~~~~
My tsconfig.json looks like:
{
"compilerOptions": {
"lib": ["esnext"],
"module": "commonjs",
"target": "es5"
}
}
From the readme, concurrency cannot be less than 0, which is good. But currently it allows undefined, in which case it silently does nothing with the tasks. It is a silent error. I suggest throwing an error on undefined here:
Line 61 in 5216b17
I'm trying p-queue after using promise-queue for some time. Given this background, I was expecting to see the whole queue fail if any of its tasks rejects. However, this does not seem to happen.
Here's an example:
// testPQueue.js
import * as PQueue from "p-queue";
async function processQueue() {
const queue = new PQueue();
queue.add(
() =>
new Promise((resolve, reject) =>
setTimeout(() => {
reject();
}, 500),
),
);
await queue.onIdle();
console.log("should not be here");
return 42;
}
(async () => {
try {
const result = await processQueue();
console.log(result);
} catch (e) {
console.log("could not process queue");
}
})();
node ./testPQueue.js
Instead of seeing could not process queue in my stdout, I get this:
should not be here
42
(node:13679) UnhandledPromiseRejectionWarning: undefined
(node:13679) UnhandledPromiseRejectionWarning: Unhandled promise rejection. This error originated either by throwing inside of an async function without a catch block, or by rejecting a promise which was not handled with .catch(). (rejection id: 1)
(node:13679) [DEP0018] DeprecationWarning: Unhandled promise rejections are deprecated. In the future, promise rejections that are not handled will terminate the Node.js process with a non-zero exit code.
How can I stop and reject the whole queue once any of its tasks has failed?
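One pattern that approximates promise-queue's fail-fast behavior is to keep the promise from every add() and run them through Promise.all, which rejects on the first failure; remaining queued tasks can then be discarded (p-queue does expose a clear() method). A sketch (runAllOrFail is a hypothetical helper):

```javascript
// Fail-fast drain: Promise.all rejects as soon as any task rejects,
// so the caller's try/catch fires; leftover queued tasks are cleared.
async function runAllOrFail(queue, tasks) {
  const promises = tasks.map(task => queue.add(task));
  try {
    return await Promise.all(promises);
  } catch (error) {
    if (typeof queue.clear === 'function') queue.clear();
    throw error;
  }
}
```

With this, `await processQueue()` in the example above would reach the catch block instead of "should not be here".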
When I set concurrency to 10, I would expect await queue.add() to let me put 10 jobs in the queue, keeping the promise pending only if the queue is full (10+).
The current implementation always waits for the job to complete, so when adding in a loop, the iteration cannot fill the queue.
This expected behaviour is present, for instance, in async.queue, but without promise support.
The code fails to minify because of this package. The yarn build fails with the following output:
...
Creating an optimized production build...
Failed to compile.
Failed to minify the code from this file:
./node_modules/p-queue/index.js:6
Read more here: http://bit.ly/2tRViJ9
error Command failed with exit code 1.
When using it in our Angular app, the Angular 8 production build optimization fails with:
"An unhandled exception occurred: Unexpected token: keyword (default)"
I think it has something to do with terser, which is used by the Angular CLI.
I'm currently using the following pattern to simulate said behavior:
if (queue.size > 0) await queue.onEmpty();
doSomethingAfterQueueIsFinished();
Which could be simplified to
await queue.onEmpty();
doSomethingAfterQueueIsFinished();
if queue.onEmpty() would return an already-resolved promise when queue.size === 0. Currently, the promise from onEmpty will never resolve unless another item is added to the queue and finishes.
Would love to know other peeps' thoughts.
No main entry is defined in package.json, so bundling fails with webpack.
I'm not sure if this is by design, but when a promise is added to the queue it changes the size before the added promise is resolved.
runkit-p-queue-single-queue-bug
I would imagine the queue size should be updated after the added promise was resolved. I really appreciate all the work you've done btw, I'm a fanboy 😍
In 3.1.0 a dependency on the Node events module was introduced. That means this library no longer works on the web, even though it doesn't otherwise seem to depend on anything from Node.
Currently the queue is auto-executed in p-queue.
Is it possible to add the ability to start executing the queue at a later time? Something like an autorun option and a .run() method:
const queue = new PQueue({ concurrency: 1, autorun: false });
app.on('locationChanged', event => {
queue.add(() => got(endpoint)).then(result => {
// do UI stuff
});
});
view.on('ready', () => queue.run());
What do you think?
Consider the following function, it will throw once run.
function errorFn() {
  throw new Error('Should Throw');
}
We can add this to the queue by the following code.
queue.add(errorFn)
How do we implement a function to re-queue it in case it throws an error? Can we use p-retry and p-queue together to achieve that? Could this be added as a function to p-queue?
Here is a sample implementation,
https://github.com/entrptaher/p-requeue/blob/master/test.js
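The re-queue-on-error idea can be sketched as a small helper that adds the task and, on rejection, adds it again up to a limit (addWithRetry is a hypothetical name, assuming only that add() returns a per-task promise; p-retry would give fancier backoff on top of this):

```javascript
// Add `fn` to the queue; if it rejects, re-add it up to `retries`
// more times before letting the error propagate.
function addWithRetry(queue, fn, retries) {
  return queue.add(fn).catch(error => {
    if (retries <= 0) throw error;
    return addWithRetry(queue, fn, retries - 1);
  });
}
```

So `addWithRetry(queue, errorFn, 3)` would run errorFn up to 4 times before rejecting.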
Hi there:
Thanks for this project, really appreciate it, and I'm getting some excellent use out of it.
Is it intended for throwOnTimeout to default to true as declared in the README?
I notice the default is actually false in the implementation and it must be explicitly enabled.
It would likely be a rather rough breaking change for some people to modify that to an opt-out instead of opt-in, so I'm guessing the documentation should be updated instead?
Also, I'm not clear how the "timeout per operation" functionality is supposed to work. Is it intended to be settable at add time, e.g. schedule.add(() => <promise>, {timeout: <timeout>})?
I believe there's likely something broken in the option overrides if that's the case, as currently it doesn't seem to have any effect. It looks like a fairly easy change to introduce, though.
Including p-throttle, p-debounce, p-limit, etc..
Hi there.
We need broader compatibility with older browsers. The current output target produces spread/rest operators and they can't be interpreted by older engines.
Just a matter of changing https://github.com/sindresorhus/p-queue/blob/master/tsconfig.json#L5 and republishing.
"target": "es5",
I have a concurrency of 10, and the tasks check queue.size to see whether the queue has fallen below a certain number. As soon as a task sees that, it should asynchronously add 20 more tasks to the queue. But how can I do this without the other 9 tasks also trying to add those 20 new tasks?
I'm looking for a way to keep the size of the queue above a certain minimum number (20 in this case) by adding new items to the queue asynchronously (such as calling an API or database to determine what those next 20 queue items should be).
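A single-flight guard solves the "only one task should refill" part: a flag claimed synchronously means concurrent tasks that notice the low-water mark at the same time skip the refill. A sketch (makeRefiller and fetchMore are hypothetical; fetchMore stands in for the API/database call that produces the next batch of tasks):

```javascript
// Only one caller performs the refill at a time; others return false.
function makeRefiller(fetchMore) {
  let refilling = false;
  return async function maybeRefill(queue, lowWater) {
    if (refilling || queue.size >= lowWater) return false;
    refilling = true; // claimed synchronously, so peers see it immediately
    try {
      const tasks = await fetchMore();
      for (const task of tasks) queue.add(task);
      return true;
    } finally {
      refilling = false;
    }
  };
}
```

Each task would call `await maybeRefill(queue, 20)` near its end; only one of the 10 concurrent tasks triggers the fetch.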
We're currently at 90%. Would be great to get to 100% to ensure we don't introduce regressions in untested code paths.
https://codecov.io/gh/sindresorhus/p-queue/src/master/index.js