
p-queue

Promise queue with concurrency control

Useful for rate-limiting async (or sync) operations. For example, when interacting with a REST API or when doing CPU/memory intensive tasks.

For servers, you probably want a Redis-backed job queue instead.

Note that the project is feature complete. We are happy to review pull requests, but we don't plan any further development. We are also not answering email support questions.

Install

npm install p-queue

Warning: This package is native ESM and no longer provides a CommonJS export. If your project uses CommonJS, you'll have to convert to ESM. Please don't open issues for questions regarding CommonJS / ESM.

Usage

Here we run only one promise at a time. For example, set concurrency to 4 to run four promises at the same time.

import PQueue from 'p-queue';
import got from 'got';

const queue = new PQueue({concurrency: 1});

(async () => {
	await queue.add(() => got('https://sindresorhus.com'));
	console.log('Done: sindresorhus.com');
})();

(async () => {
	await queue.add(() => got('https://avajs.dev'));
	console.log('Done: avajs.dev');
})();

(async () => {
	const task = await getUnicornTask();
	await queue.add(task);
	console.log('Done: Unicorn task');
})();

API

PQueue(options?)

Returns a new queue instance, which is an EventEmitter3 subclass.

options

Type: object

concurrency

Type: number
Default: Infinity
Minimum: 1

Concurrency limit.

timeout

Type: number

Per-operation timeout in milliseconds. Operations fulfill once timeout elapses if they haven't already.

throwOnTimeout

Type: boolean
Default: false

Whether or not a timeout is considered an exception.

autoStart

Type: boolean
Default: true

Whether queue tasks within the concurrency limit are auto-executed as soon as they're added.

queueClass

Type: Function

Class with enqueue and dequeue methods, and a size getter. See the Custom QueueClass section.

intervalCap

Type: number
Default: Infinity
Minimum: 1

The max number of runs in the given interval of time.

interval

Type: number
Default: 0
Minimum: 0

The length of time in milliseconds before the interval count resets. Must be finite.

carryoverConcurrencyCount

Type: boolean
Default: false

If true, any pending Promises are carried over into the next interval and counted against the intervalCap. If false, pending Promises do not count towards the next intervalCap.

queue

PQueue instance.

.add(fn, options?)

Adds a sync or async task to the queue. Always returns a promise.

Note: If your items can potentially throw an exception, you must handle those errors from the returned Promise or they may be reported as an unhandled Promise rejection and potentially cause your process to exit immediately.

fn

Type: Function

Promise-returning/async function. When executed, it will receive {signal} as the first argument.

options

Type: object

priority

Type: number
Default: 0

Priority of operation. Operations with greater priority will be scheduled first.

signal

AbortSignal for cancellation of the operation. When aborted, it will be removed from the queue and the queue.add() call will reject with an error. If the operation is already running, the signal will need to be handled by the operation itself.

import PQueue from 'p-queue';
import got, {CancelError} from 'got';

const queue = new PQueue();

const controller = new AbortController();

try {
	await queue.add(async ({signal}) => {
		const request = got('https://sindresorhus.com');

		signal.addEventListener('abort', () => {
			request.cancel();
		});

		try {
			return await request;
		} catch (error) {
			if (!(error instanceof CancelError)) {
				throw error;
			}
		}
	}, {signal: controller.signal});
} catch (error) {
	if (!(error instanceof DOMException)) {
		throw error;
	}
}

.addAll(fns, options?)

Same as .add(), but accepts an array of sync or async functions and returns a promise that resolves when all functions are resolved.

.pause()

Put queue execution on hold.

.start()

Start (or resume) executing enqueued tasks within the concurrency limit. No need to call this if the queue is not paused (via options.autoStart = false or the .pause() method).

Returns this (the instance).

.onEmpty()

Returns a promise that settles when the queue becomes empty.

Can be called multiple times. Useful if you for example add additional items at a later time.

.onIdle()

Returns a promise that settles when the queue becomes empty, and all promises have completed; queue.size === 0 && queue.pending === 0.

The difference with .onEmpty is that .onIdle guarantees that all work from the queue has finished. .onEmpty merely signals that the queue is empty, but it could mean that some promises haven't completed yet.

.onSizeLessThan(limit)

Returns a promise that settles when the queue size is less than the given limit: queue.size < limit.

If you want to avoid having the queue grow beyond a certain size you can await queue.onSizeLessThan(limit) before adding a new item.

Note that this only limits the number of items waiting to start. There could still be up to concurrency jobs already running that this call does not include in its calculation.

.clear()

Clear the queue.

.size

Size of the queue, the number of queued items waiting to run.

.sizeBy(options)

Size of the queue, filtered by the given options.

For example, this can be used to find the number of items remaining in the queue with a specific priority level.

import PQueue from 'p-queue';

const queue = new PQueue();

queue.add(async () => '🦄', {priority: 1});
queue.add(async () => '🦄', {priority: 0});
queue.add(async () => '🦄', {priority: 1});

console.log(queue.sizeBy({priority: 1}));
//=> 2

console.log(queue.sizeBy({priority: 0}));
//=> 1

.pending

Number of running items (no longer in the queue).

.isPaused

Whether the queue is currently paused.

Events

active

Emitted as each item is processed in the queue for the purpose of tracking progress.

import delay from 'delay';
import PQueue from 'p-queue';

const queue = new PQueue({concurrency: 2});

let count = 0;
queue.on('active', () => {
	console.log(`Working on item #${++count}.  Size: ${queue.size}  Pending: ${queue.pending}`);
});

queue.add(() => Promise.resolve());
queue.add(() => delay(2000));
queue.add(() => Promise.resolve());
queue.add(() => Promise.resolve());
queue.add(() => delay(500));

completed

Emitted when an item completes without error.

import delay from 'delay';
import PQueue from 'p-queue';

const queue = new PQueue({concurrency: 2});

queue.on('completed', result => {
	console.log(result);
});

queue.add(() => Promise.resolve('hello, world!'));

error

Emitted if an item throws an error.

import delay from 'delay';
import PQueue from 'p-queue';

const queue = new PQueue({concurrency: 2});

queue.on('error', error => {
	console.error(error);
});

queue.add(() => Promise.reject(new Error('error')));

empty

Emitted every time the queue becomes empty.

Useful if you for example add additional items at a later time.

idle

Emitted every time the queue becomes empty and all promises have completed; queue.size === 0 && queue.pending === 0.

The difference with empty is that idle guarantees that all work from the queue has finished. empty merely signals that the queue is empty, but it could mean that some promises haven't completed yet.

import delay from 'delay';
import PQueue from 'p-queue';

const queue = new PQueue();

queue.on('idle', () => {
	console.log(`Queue is idle.  Size: ${queue.size}  Pending: ${queue.pending}`);
});

const job1 = queue.add(() => delay(2000));
const job2 = queue.add(() => delay(500));

await job1;
await job2;
// => 'Queue is idle.  Size: 0  Pending: 0'

await queue.add(() => delay(600));
// => 'Queue is idle.  Size: 0  Pending: 0'

The idle event is emitted every time the queue reaches an idle state. On the other hand, the promise the onIdle() function returns resolves once the queue becomes idle instead of every time the queue is idle.

add

Emitted every time the add method is called and the number of pending or queued tasks is increased.

next

Emitted every time a task is completed and the number of pending or queued tasks is decreased. This is emitted regardless of whether the task completed normally or with an error.

import delay from 'delay';
import PQueue from 'p-queue';

const queue = new PQueue();

queue.on('add', () => {
	console.log(`Task is added.  Size: ${queue.size}  Pending: ${queue.pending}`);
});

queue.on('next', () => {
	console.log(`Task is completed.  Size: ${queue.size}  Pending: ${queue.pending}`);
});

const job1 = queue.add(() => delay(2000));
const job2 = queue.add(() => delay(500));

await job1;
await job2;
//=> 'Task is added.  Size: 0  Pending: 1'
//=> 'Task is added.  Size: 0  Pending: 2'

await queue.add(() => delay(600));
//=> 'Task is completed.  Size: 0  Pending: 1'
//=> 'Task is completed.  Size: 0  Pending: 0'

Advanced example

A more advanced example to help you understand the flow.

import delay from 'delay';
import PQueue from 'p-queue';

const queue = new PQueue({concurrency: 1});

(async () => {
	await delay(200);

	console.log(`8. Pending promises: ${queue.pending}`);
	//=> '8. Pending promises: 0'

	(async () => {
		await queue.add(async () => '🐙');
		console.log('11. Resolved');
	})();

	console.log('9. Added 🐙');

	console.log(`10. Pending promises: ${queue.pending}`);
	//=> '10. Pending promises: 1'

	await queue.onIdle();
	console.log('12. All work is done');
})();

(async () => {
	await queue.add(async () => '🦄');
	console.log('5. Resolved')
})();
console.log('1. Added 🦄');

(async () => {
	await queue.add(async () => '🐴');
	console.log('6. Resolved')
})();
console.log('2. Added 🐴');

(async () => {
	await queue.onEmpty();
	console.log('7. Queue is empty');
})();

console.log(`3. Queue size: ${queue.size}`);
//=> '3. Queue size: 1'

console.log(`4. Pending promises: ${queue.pending}`);
//=> '4. Pending promises: 1'

$ node example.js
1. Added 🦄
2. Added 🐴
3. Queue size: 1
4. Pending promises: 1
5. Resolved
6. Resolved
7. Queue is empty
8. Pending promises: 0
9. Added 🐙
10. Pending promises: 1
11. Resolved
12. All work is done

Custom QueueClass

For implementing more complex scheduling policies, you can provide a QueueClass in the options:

import PQueue from 'p-queue';

class QueueClass {
	constructor() {
		this._queue = [];
	}

	enqueue(run, options) {
		this._queue.push(run);
	}

	dequeue() {
		return this._queue.shift();
	}

	get size() {
		return this._queue.length;
	}

	filter(options) {
		return this._queue;
	}
}

const queue = new PQueue({queueClass: QueueClass});

p-queue will call corresponding methods to put and get operations from this queue.

FAQ

How do the concurrency and intervalCap options affect each other?

They are just different constraints. The concurrency option limits how many things run at the same time. The intervalCap option limits how many things run in total during the interval (over time).

Related

  • p-limit - Run multiple promise-returning & async functions with limited concurrency
  • p-throttle - Throttle promise-returning & async functions
  • p-debounce - Debounce promise-returning & async functions
  • p-all - Run promise-returning & async functions concurrently with optional limited concurrency
  • More…

Contributors

aikawayataro, ameech, bendingbender, bobjflong, callmehiphop, cap32, codex-, dobesv, edorivai, ericmcornelius, floatdrop, ilaiwi, justinbeckwith, kheyse-oqton, liximomo, ltetzlaff, markherhold, matamatanot, mboperator, onury, przemyslawzalewski, rafael09ed, rexxars, rgoldman-qb, richienb, sindresorhus, szmarczak, timgit, vkbansal, zkochan

p-queue's Issues

Non-transpiled NPM module

Hi, we encountered a new problem a little while ago, when our npm builds started failing with the following error:
ERROR in vendor.bundle.js from UglifyJs Unexpected token: name (first) [vendor.bundle.js:26373,5]

Which we tracked to p-queue using ES6 syntax in the distributed npm module. The current UglifyJS doesn't support ES6 and crashes unless we specifically include p-queue to be babel-transpiled.

A fix would be to distribute a transpiled "ES5 modules" version on NPM, which uses ES5 syntax, but has import statements intact.

Backpressure

These libraries are awesome and make building consistent asynchronous code much easier.

But, is there a mechanism in this library or one of the other p-libraries to have a queue-like data structure like this that implements some type of back-pressure so that if a queue gets to a certain size, the function calling the add() method will pause until the queue size has reduced?

Sync function error breaks everything

What if fn() fails? Example:

(async function() {
	let queue = new PQueue({ concurrency: 1 });
	
	queue.add(() => { throw 'x' }).catch(console.log);
	queue.add(() => { throw 'y' }).catch(console.log);
})();

Add an option to add a timeout for queue members

...so it will be removed from the queue if not resolved within the time frame.

Hello, I'm worried about a case that if any of my Promises crash for some reason, and if it doesn't resolve, it will hang in the queue for ever!
It would be nice to have timeout to clean out hanging Promises in the queue.



Clear queue

I found a need to clear/interrupt a long promise queue. If this is wanted, I can submit a PR for this.

Thanks

Question on usage

Hey,

We've just recently implemented this as a way to throttle and queue our api calls and a question was raised around potential memory leak. The concern is that by having this be what triggers our api calls there may become a time where the queue end up very large.

Do you have tests specifically around ensuring there is no chance of a memory leak?

Helper method to emulate add(() => {})

As far as I can tell, there is no way to get a promise that resolves for all items that are currently in the queue to finish (even if new ones are added later). A hacky workaround is const p = q.add(() => {}) which does get you this, but at the expense of adding an item to the queue.

event:active is not emitted after each item processed while setting concurrency greater than 1

active event

const queue = new PQueue({concurrency: 5});
queue.addAll(myTaskQueue);
queue.on('active', () => {
	console.log(`${queue.size} requests remain to be scraped, pending workers: ${queue.pending}`);
});

console log

5 requests remain to be scraped, pending workers: 4
4 requests remain to be scraped, pending workers: 4
3 requests remain to be scraped, pending workers: 4
2 requests remain to be scraped, pending workers: 4
1 requests remain to be scraped, pending workers: 4

Have queue.onEmpty() instantly resolve when queue.size === 0

I'm currently using the following pattern to simulate said behavior:

if (queue.size > 0) await queue.onEmpty();
doSomethingAfterQueueIsFinished();

Which could be simplified to

await queue.onEmpty();
doSomethingAfterQueueIsFinished();

if queue.onEmpty() would return a resolved promise when queue.size === 0. Currently, the promise from onEmpty will never resolve, unless another item is added to the queue, and finishes.

Would love to know other peeps' thoughts.

Wait until the size <= concurrency

When i set concurrency to 10, i would expect await queue.add() will let me put 10 jobs to queue and will keep Promise pending just if the queue is full (10+)
Current implementation is always waiting for job to complete so if i am adding in iteration, it does not let iteration to fill the queue.
This expected behaviour is present for instance in async.queue but without promises support.

Timeout documentation and questions

Hi there:

Thanks for this project, really appreciate it, and I'm getting some excellent use out of it.

Is it intended for throwOnTimeout to default to true as declared in the README?

I notice the default is actually false in the implementation and it must be explicitly enabled.

It would likely be a rather rough breaking change for some people to modify that to an opt-out instead of opt-in, so I'm guessing the documentation should be updated instead?

Also, I'm not clear how the "timeout per operation" functionality is supposed to work - is it intended to be settable at queue time, e.g. schedule.add(() => <promise>, {timeout: <timeout>})?

I believe there's likely something broken in the option overrides if that's the case, as currently it doesn't seem to have any impact whatsoever. It looks like a fairly easy change to introduce though.

Return a promise from start()?

When queue is created with autoStart: false, I found it common to call:

queue.start();
await queue.onIdle();

I wonder if it would make sense to add a call to onIdle at the end of start() method so that just one could be used to start and wait for all tasks?

How to add to queue asynchronously?

I have a concurrency of 10 and the tasks check queue.size to see whether the queue has fallen underneath a certain number. As soon as a task sees that, it should asynchronously add 20 more tasks to the queue. But how can I do this without the other 9 tasks also trying to add those 20 new tasks to the queue as well?

I'm looking for a way to keep the size of the queue above a certain minimum number (20 in this case) by adding new items to the queue asynchronously (such as calling an API or database to determine what those next 20 queue items should be).

How do I remove a specific queue'd item?

So say I queue up 5-10 async functions, then want to cancel maybe the fifth one. How would I go about doing that?

Looked through the code and couldn't find a remove or delete. Looks like there is a dequeue but that seems to work similar to the Array pop method.

Exceptions swallowed

Any exceptions raised in the promise appear to be swallowed, invoking the notorious 'Unhandled promise rejection' message under Node.js.

Is there any way to catch/handle errors that occur while processing a queue?

Perhaps a q.onError(callback) kind of thing?

Of course one could just add a global handler for unhandled rejections via process.on('unhandledRejection'..., although you then lose the ability to trap/handle errors within a particular context.

onEmpty/onIdle misusing Promises

These methods are event oriented and should not have been using promises.

A promise is intended to only be resolved once, however this library is expecting to resolve them multiple times.
https://stackoverflow.com/questions/20328073/is-it-safe-to-resolve-a-promise-multiple-times

these API's need to be altered to queue.onIdle(() => { }); and register an array of callbacks,
or ideally, use an EventEmitter style API and do queue.on('idle', () => { });

these existing API's can be altered to forward to the new preferred API by returning something like
return {
	then(cb) { this.on('idle', cb); return this; },
	catch(e) {} // unused, but part of the promise expectation
};

Typescript 3.5.2 compatibility issue

Upgrading from TS version 3.4.5 to 3.5.2 results in the following compile time error:

$ tsc 
node_modules/p-queue/index.d.ts:79:47 - error TS2344: Type 'DefaultAddOptions' does not satisfy the constraint 'QueueAddOptions'.
  Index signature is missing in type 'DefaultAddOptions'.

79  EnqueueOptionsType extends QueueAddOptions = DefaultAddOptions
                                                 ~~~~~~~~~~~~~~~~~


Found 1 error.

It looks like QueueAddOptions has an index signature but DefaultAddOptions does not, nor does it extend QueueAddOptions.

onEmpty() is not the same as queue.pending === 0

In a use-case where

  • I have 16 async things to do
  • I don't want more than 8 "in-flight" concurrently
  • I want to know when all 16 are done

I expected to be able to

const assert = require('assert');
const Queue = require('p-queue');
const queue = new Queue({ concurrency: 8 });
const tasks = Array(16).fill(0);

const work = () => new Promise((resolve) => {
  setTimeout(resolve, Math.random() * 2000);
});

tasks.forEach(() => {
  queue.add(() => work());
});

queue.onEmpty()
  .then(() => assert.equal(queue.pending, 0));

However this is not the case -- onEmpty() resolves as soon as there's space in the queue for the new tasks to launch, not when all the tasks that have been enqueued are resolved. Does this sound right, and if so, would you be interested in another method that provides a promise that resolves when queue.pending reaches 0?

exports default

Hi, in the new release 5.0.0 there is no default export, so there is a problem when using imports in TypeScript.

onIdle method

From discussion:

The promise from the onEmpty method resolves when the queue is empty, but there are still pending promises in flight.

We should add an onIdle method to signal that all work in the queue is completed.

Fails to minify the code when used with create-react-app

The code fails to minify because of this package. The yarn build fails with the following output:

...
Creating an optimized production build...
Failed to compile.

Failed to minify the code from this file: 

 	./node_modules/p-queue/index.js:6 

Read more here: http://bit.ly/2tRViJ9

error Command failed with exit code 1.

set timeout options but cann't catch the timeoutError

const PQueue = require('p-queue');

const queue = new PQueue({concurrency: 1, timeout: 5000, throwOnTimeout: true});

const delay = async ms => {
	await new Promise(resolve => {
		setTimeout(() => {
			console.log('timeout');
			resolve();
		}, ms);
	});
};

try {
	queue.add(() => delay(10000));
} catch (err) {
	console.log(err);
}

This is my little demo.
I want to catch the timeoutError

Compile to ES5?

Can we compile index.js to ES5? create-react-app is complaining when I try to build.


Not depend on events?

In 3.1.0 the dependency of EE => events was introduced. That means this library no longer works on the web.

6.x import signature

I just tried updating to 6, and it didn’t go as expected (from 5.0.0) :}

I'm using esm on node. I have a .mjs file importing this lib and the loader is not unwrapping the default export. I think this is expected because the lib is compiled TypeScript and outputs CJS with a bundler-friendly ESM flag. Does TypeScript have a compile target of ESM? If so, you could add a "module": "dist/esm.js" or something and that would make this work transparently with esm users (who opt to bite the mjs bullet).

This can be closed if this idea is not desirable. I’m just playing with esm as it currently stands in node to get a feel.

interval & intervalCap question

Hey everyone!

Here's what I'm trying to accomplish. Please let me know if that's possible with this cool lib:

I need a queue to run 5 concurrent tasks. Let's say I add 10. I'd like to see 5 tasks running, a delay, another 5.

Is it possible to do it with interval and/or intervalCap? I couldn't figure it out just by reading the readme.

Thanks!

Possible memory leak?

Hi, I'm finding that when running the queue for extended periods there appear to be handles held / memory not reducing over time.

i.e.:

import PQueue from 'p-queue';

class Cow {
  beef: number;

  constructor(i: number) {
    this.beef = i;
  }
}

async function test(queue: any) {
  for (let j = 0; j < 30000; j++) {
    const promises = [];
    console.log(`Start... ${j}`);

    for (let i = 0; i < 1000; i++) {
      promises.push(
        queue.add(async () => {
          const arr = [];
          for (let i = 0; i < 100; i++) {
            arr.push(new Cow(i));
          }
        }),
      );
    }

    await Promise.all(promises);
  }
}

(async () => {
  const queue = new PQueue({ concurrency: 5 });
  await test(queue);
  console.log('Completed');
  while (true) {}
})();

When executing this locally with Node 10, the memory used before populating the queue is ~8 MB, but after executing it sits at around ~78 MB, caused by this simple object.

I would have expected the function completion to mark the items in the functions closure as safe for GC.

In my bigger use case, there are objects being deleted from a queue elsewhere, but they're stuck in memory when dumping the heap.

Am I using the queue incorrectly? Should I be calling a clear of some kind bound to a completion event?

Rejecting the whole queue when a task rejects

I'm trying p-queue after using promise-queue for some time. Given this background, I was expecting to see the whole queue failing if any of its tasks rejects. However, this does not seem to happen.

Here's an example:

// testPQueue.js

import * as PQueue from "p-queue";

async function processQueue() {
  const queue = new PQueue();
  queue.add(
    () =>
      new Promise((resolve, reject) =>
        setTimeout(() => {
          reject();
        }, 500),
      ),
  );

  await queue.onIdle();
  console.log("should not be here");
  return 42;
}

(async () => {
  try {
    const result = await processQueue();
    console.log(result);
  } catch (e) {
    console.log("could not process queue");
  }
})();

node ./testPQueue.js

Instead of seeing could not process queue in my stdout, I get this:

should not be here
42
(node:13679) UnhandledPromiseRejectionWarning: undefined
(node:13679) UnhandledPromiseRejectionWarning: Unhandled promise rejection. This error originated either by throwing inside of an async function without a catch block, or by rejecting a promise which was not handled with .catch(). (rejection id: 1)
(node:13679) [DEP0018] DeprecationWarning: Unhandled promise rejections are deprecated. In the future, promise rejections that are not handled will terminate the Node.js process with a non-zero exit code.

How can I stop and reject the whole queue once any of its tasks has failed?

[TypeScript] Index signature is missing in type 'DefaultAddOptions'

Since upgrading p-queue I get this compile error:

node_modules/p-queue/index.d.ts:81:54 - error TS2344: Type 'DefaultAddOptions' does not satisfy the constraint 'QueueAddOptions'.
  Index signature is missing in type 'DefaultAddOptions'.

81  EnqueueOptionsType extends PQueue.QueueAddOptions = PQueue.DefaultAddOptions
                                                        ~~~~~~~~~~~~~~~~~~~~~~~~

My tsconfig.json looks like:

{
	"compilerOptions": {
		"lib": ["esnext"],
		"module": "commonjs",
		"target": "es5"
	}
}

Promise when all actions in the queue are finished entirely

Currently it doesn't look like it's possible to wait until all promises are resolved, or am I missing something?

I tried to use onEmpty(). The thing is that onEmpty() can get called before all promises are resolved. It just triggers when next() is called on the queue and there are no more items to be processed.

The promise returned by .add() is not what I need either, because with a concurrency of 5 the last .add() might resolve as the first one.

Now that I'm typing this, I might not be using the correct library. Maybe I should have a look at p-throttle, although I like how PQueue works :).

Add option to update concurrency

I have a use case where I need to update concurrency based on queue execution results. What I'd like to do is, on a specific output, decrease concurrency (I understand it would not be instant but would have to wait until items are dequeued). Maybe the call to decrease concurrency could return a Promise that resolves once the new concurrency is reached?

CHANGELOG is missing

This is the same as sindresorhus/execa#154, I was surprised to find that both packages are by you @sindresorhus! 🙂

I was upgrading my project dependencies and noticed that p-queue bumped from 2.4.2 to 3.0.0. This looked like a certain breaking change, so I went to search for a CHANGELOG to then update my code if needed.

I could not find any info on the releases page and a commonly used CHANGELOG.md file was also missing. It'd be great if at least one of these things was added to p-queue so that people were wasting less time trying to figure out whether they can safely bump or not. This can save hundreds of hours for folks around the world.

Unusable in browser environment

There is no transpiled ES5 distribution which could safely be used in the browser.

I want to use it in a web application. Do you plan to create a ES5 distributable?

I will try to transpile it with my Babel config, but that fails with larger libraries, that's why the convention is to distribute a transpiled version.

Do you see any reason why this library shouldn't be usable on the web?

Thank you for your time!

Combination with p-retry

Consider the following function, it will throw once run.

function errorFn() {
	throw new Error('Should Throw');
}

We can add this to the queue by the following code.

queue.add(errorFn)

How do we implement a function to re-queue it in case it throws an error? Can we use p-retry and p-queue together to achieve that? Can this be added as a function to p-queue?

Here is a sample implementation,
https://github.com/entrptaher/p-requeue/blob/master/test.js

Add execution control

Currently the queue is auto-executed in p-queue.
Is it possible to add ability to start executing the queue at a later time?

Example Case

  • Some app events are fired (globally) and handled within each controller (of views).
  • When received, there are API calls to be made and some UI lists to be filled in.
  • But operations need to be queued and executed when the UI/view is ready.

Suggestion: an autorun option and .run() method...

const queue = new PQueue({ concurrency: 1, autorun: false });

app.on('locationChanged', event => {
    queue.add(() => got(endpoint)).then(result => {
        // do UI stuff
    });
});

view.on('ready', () => queue.run());

What do you think?

onSuccess?

There's the onEmpty() method, but if some promises reject or throw we can still end up in onEmpty. It would be great to have a similar method for when everything succeeds.

Adding items to current queue?

I am currently using p-queue in combination with Puppeteer. What I want to achieve is to add the current function to the queue if an exception gets thrown. I am running with concurrency set to 3.

I have the following snippets:

getAccounts is simply a method for parsing a json file.

    async init() {
        let accounts = await this.getAccounts();
        accounts.map(async () => {
            await queue.add(() => this.test());
        });
        await queue.onIdle();
        console.log("Done");
    }

async test() {
        const browser = await puppeteer.launch({headless: false});
        try {
            const page = await browser.newPage();
            await page.goto(this._url);

            if (Math.floor(Math.random() * 10) > 4) {
                throw new Error("Simulate error");
            }

            await browser.close();
        } catch (error) {
            await browser.close();
            await queue.add(() => this.test());
            console.log(error);
        }
    }

When it goes into the catch, the queue just stops. I am not sure if my implementation is right. But it also does not execute console.log("Done");.
