Git Product home page Git Product logo

piscina's Introduction

Piscina Logo

piscina - the node.js worker pool

CI

  • ✔ Fast communication between threads
  • ✔ Covers both fixed-task and variable-task scenarios
  • ✔ Supports flexible pool sizes
  • ✔ Proper async tracking integration
  • ✔ Tracking statistics for run and wait times
  • ✔ Cancellation Support
  • ✔ Supports enforcing memory resource limits
  • ✔ Supports CommonJS, ESM, and TypeScript
  • ✔ Custom task queues
  • ✔ Optional CPU scheduling priorities on Linux

Written in TypeScript.

For Node.js 16.x and higher.

MIT Licensed.

Piscina API

Example

In main.js:

const path = require('path');
const Piscina = require('piscina');

const piscina = new Piscina({
  filename: path.resolve(__dirname, 'worker.js')
});

(async function() {
  const result = await piscina.run({ a: 4, b: 6 });
  console.log(result);  // Prints 10
})();

In worker.js:

module.exports = ({ a, b }) => {
  return a + b;
};

The worker may also be an async function or may return a Promise:

const { setTimeout } = require('timers/promises');

module.exports = async ({ a, b }) => {
  // Fake some async activity
  await setTimeout(100);
  return a + b;
};

ESM is also supported for both Piscina and workers:

import { Piscina } from 'piscina';

const piscina = new Piscina({
  // The URL must be a file:// URL
  filename: new URL('./worker.mjs', import.meta.url).href
});

const result = await piscina.run({ a: 4, b: 6 });
console.log(result); // Prints 10

In worker.mjs:

export default ({ a, b }) => {
  return a + b;
};

Exporting multiple worker functions

A single worker file may export multiple named handler functions.

'use strict';

function add({ a, b }) { return a + b; }

function multiply({ a, b }) { return a * b; }

add.add = add;
add.multiply = multiply;

module.exports = add;

The export to target can then be specified when the task is submitted:

'use strict';

const Piscina = require('piscina');
const { resolve } = require('path');

const piscina = new Piscina({
  filename: resolve(__dirname, 'worker.js')
});

(async function() {
  const res = await Promise.all([
    piscina.run({ a: 4, b: 6 }, { name: 'add' }),
    piscina.run({ a: 4, b: 6 }, { name: 'multiply' })
  ]);
})();

Cancelable Tasks

Submitted tasks may be canceled using either an AbortController or an EventEmitter:

'use strict';

const Piscina = require('piscina');
const { resolve } = require('path');

const piscina = new Piscina({
  filename: resolve(__dirname, 'worker.js')
});

(async function() {
  const abortController = new AbortController();
  try {
    const { signal } = abortController;
    const task = piscina.run({ a: 4, b: 6 }, { signal });
    abortController.abort();
    await task;
  } catch (err) {
    console.log('The task was canceled');
  }
})();

Alternatively, any EventEmitter that emits an 'abort' event may be used as an abort controller:

'use strict';

const Piscina = require('piscina');
const EventEmitter = require('events');
const { resolve } = require('path');

const piscina = new Piscina({
  filename: resolve(__dirname, 'worker.js')
});

(async function() {
  const ee = new EventEmitter();
  try {
    const task = piscina.run({ a: 4, b: 6 }, { signal: ee });
    ee.emit('abort');
    await task;
  } catch (err) {
    console.log('The task was canceled');
  }
})();

Delaying Availability of Workers

A worker thread will not be made available to process tasks until Piscina determines that it is "ready". By default, a worker is ready as soon as Piscina loads it and acquires a reference to the exported handler function.

There may be times when the availability of a worker may need to be delayed longer while the worker initializes any resources it may need to operate. To support this case, the worker module may export a Promise that resolves the handler function as opposed to exporting the function directly:

async function initialize() {
  await someAsyncInitializationActivity();
  return ({ a, b }) => a + b;
}

module.exports = initialize();

Piscina will await the resolution of the exported Promise before marking the worker thread available.

Backpressure

When the maxQueue option is set, once the Piscina queue is full, no additional tasks may be submitted until the queue size falls below the limit. The 'drain' event may be used to receive notification when the queue is empty and all tasks have been submitted to workers for processing.

Example: Using a Node.js stream to feed a Piscina worker pool:

'use strict';

const { resolve } = require('path');
const Pool = require('../..');

const pool = new Pool({
  filename: resolve(__dirname, 'worker.js'),
  maxQueue: 'auto'
});

const stream = getStreamSomehow();
stream.setEncoding('utf8');

pool.on('drain', () => {
  if (stream.isPaused()) {
    console.log('resuming...', counter, pool.queueSize);
    stream.resume();
  }
});

stream
  .on('data', (data) => {
    pool.run(data);
    if (pool.queueSize === pool.options.maxQueue) {
      console.log('pausing...', counter, pool.queueSize);
      stream.pause();
    }
  })
  .on('error', console.error)
  .on('end', () => {
    console.log('done');
  });

Out of scope asynchronous code

A worker thread is only active until the moment it returns a result, it can be a result of a synchronous call or a Promise that will be fulfilled/rejected in the future. Once this is done, Piscina will wait for stdout and stderr to be flushed, and then pause the worker's event-loop until the next call. If async code is scheduled without being awaited before returning since Piscina has no way of detecting this, that code execution will be resumed on the next call. Thus, it is highly recommended to properly handle all async tasks before returning a result as it could make your code unpredictable.

For example:

const { setTimeout } = require('timers/promises');

module.exports = ({ a, b }) => {
  // This promise should be awaited
  setTimeout(1000).then(() => {
    console.log('Working'); // This will **not** run during the same worker call
  });
  
  return a + b;
};

Additional Examples

Additional examples can be found in the GitHub repo at https://github.com/piscinajs/piscina/tree/master/examples

Class: Piscina

Piscina works by creating a pool of Node.js Worker Threads to which one or more tasks may be dispatched. Each worker thread executes a single exported function defined in a separate file. Whenever a task is dispatched to a worker, the worker invokes the exported function and reports the return value back to Piscina when the function completes.

This class extends EventEmitter from Node.js.

Constructor: new Piscina([options])

  • The following optional configuration is supported:
    • filename: (string | null) Provides the default source for the code that runs the tasks on Worker threads. This should be an absolute path or an absolute file:// URL to a file that exports a JavaScript function or async function as its default export or module.exports. ES modules are supported.
    • name: (string | null) Provides the name of the default exported worker function. The default is 'default', indicating the default export of the worker module.
    • minThreads: (number) Sets the minimum number of threads that are always running for this thread pool. The default is the number provided by os.availableParallelism.
    • maxThreads: (number) Sets the maximum number of threads that are running for this thread pool. The default is the number provided by os.availableParallelism * 1.5.
    • idleTimeout: (number) A timeout in milliseconds that specifies how long a Worker is allowed to be idle, i.e. not handling any tasks, before it is shut down. By default, this is immediate. Tip: The default idleTimeout can lead to some performance loss in the application because of the overhead involved with stopping and starting new worker threads. To improve performance, try setting the idleTimeout explicitly.
    • maxQueue: (number | string) The maximum number of tasks that may be scheduled to run, but not yet running due to lack of available threads, at a given time. By default, there is no limit. The special value 'auto' may be used to have Piscina calculate the maximum as the square of maxThreads. When 'auto' is used, the calculated maxQueue value may be found by checking the options.maxQueue property.
    • concurrentTasksPerWorker: (number) Specifies how many tasks can share a single Worker thread simultaneously. The default is 1. This generally only makes sense to specify if there is some kind of asynchronous component to the task. Keep in mind that Worker threads are generally not built for handling I/O in parallel.
    • useAtomics: (boolean) Use the Atomics API for faster communication between threads. This is on by default. You can disable Atomics globally by setting the environment variable PISCINA_DISABLE_ATOMICS to 1. If useAtomics is true, it will cause to pause threads (stoping all execution) between tasks. Ideally, threads should wait for all operations to finish before returning control to the main thread (avoid having open handles within a thread).
    • resourceLimits: (object) See Node.js new Worker options
      • maxOldGenerationSizeMb: (number) The maximum size of each worker threads main heap in MB.
      • maxYoungGenerationSizeMb: (number) The maximum size of a heap space for recently created objects.
      • codeRangeSizeMb: (number) The size of a pre-allocated memory range used for generated code.
      • stackSizeMb : (number) The default maximum stack size for the thread. Small values may lead to unusable Worker instances. Default: 4
    • env: (object) If set, specifies the initial value of process.env inside the worker threads. See Node.js new Worker options for details.
    • argv: (any[]) List of arguments that will be stringified and appended to process.argv in the worker. See Node.js new Worker options for details.
    • execArgv: (string[]) List of Node.js CLI options passed to the worker. See Node.js new Worker options for details.
    • workerData: (any) Any JavaScript value that can be cloned and made available as require('piscina').workerData. See Node.js new Worker options for details. Unlike regular Node.js Worker Threads, workerData must not specify any value requiring a transferList. This is because the workerData will be cloned for each pooled worker.
    • taskQueue: (TaskQueue) By default, Piscina uses a first-in-first-out queue for submitted tasks. The taskQueue option can be used to provide an alternative implementation. See Custom Task Queues for additional detail.
    • niceIncrement: (number) An optional value that decreases priority for the individual threads, i.e. the higher the value, the lower the priority of the Worker threads. This value is only used on Linux and requires the optional nice-napi module to be installed. See nice(2) for more details.
    • trackUnmanagedFds: (boolean) An optional setting that, when true, will cause Workers to track file descriptors managed using fs.open() and fs.close(), and will close them automatically when the Worker exits. Defaults to true. (This option is only supported on Node.js 12.19+ and all Node.js versions higher than 14.6.0).
    • closeTimeout: (number) An optional time (in milliseconds) to wait for the pool to complete all in-flight tasks when close() is called. The default is 30000
    • recordTiming: (boolean) By default, run and wait time will be recorded for the pool. To disable, set to false.

Use caution when setting resource limits. Setting limits that are too low may result in the Piscina worker threads being unusable.

Method: run(task[, options])

Schedules a task to be run on a Worker thread.

  • task: Any value. This will be passed to the function that is exported from filename.
  • options:
    • transferList: An optional lists of objects that is passed to [postMessage()] when posting task to the Worker, which are transferred rather than cloned.
    • filename: Optionally overrides the filename option passed to the constructor for this task. If no filename was specified to the constructor, this is mandatory.
    • name: Optionally overrides the exported worker function used for the task.
    • signal: An [AbortSignal][] instance. If passed, this can be used to cancel a task. If the task is already running, the corresponding Worker thread will be stopped. (More generally, any EventEmitter or EventTarget that emits 'abort' events can be passed here.) Abortable tasks cannot share threads regardless of the concurrentTasksPerWorker options.

This returns a Promise for the return value of the (async) function call made to the function exported from filename. If the (async) function throws an error, the returned Promise will be rejected with that error. If the task is aborted, the returned Promise is rejected with an error as well.

Method: runTask(task[, transferList][, filename][, abortSignal])

Deprecated -- Use run(task, options) instead.

Schedules a task to be run on a Worker thread.

  • task: Any value. This will be passed to the function that is exported from filename.
  • transferList: An optional lists of objects that is passed to [postMessage()] when posting task to the Worker, which are transferred rather than cloned.
  • filename: Optionally overrides the filename option passed to the constructor for this task. If no filename was specified to the constructor, this is mandatory.
  • abortSignal: An [AbortSignal][] instance. If passed, this can be used to cancel a task. If the task is already running, the corresponding Worker thread will be stopped. (More generally, any EventEmitter or EventTarget that emits 'abort' events can be passed here.) Abortable tasks cannot share threads regardless of the concurrentTasksPerWorker options.

This returns a Promise for the return value of the (async) function call made to the function exported from filename. If the (async) function throws an error, the returned Promise will be rejected with that error. If the task is aborted, the returned Promise is rejected with an error as well.

Method: destroy()

Stops all Workers and rejects all Promises for pending tasks.

This returns a Promise that is fulfilled once all threads have stopped.

Method: close([options])

  • options:
    • force: A boolean value that indicates whether to abort all tasks that are enqueued but not started yet. The default is false.

Stops all Workers gracefully.

This returns a Promise that is fulfilled once all tasks that were started have completed and all threads have stopped.

This method is similar to destroy(), but with the difference that close() will wait for the worker tasks to finish, while destroy() will abort them immediately.

Event: 'error'

An 'error' event is emitted by instances of this class when:

  • Uncaught exceptions occur inside Worker threads that do not currently handle tasks.
  • Unexpected messages are sent from from Worker threads.

All other errors are reported by rejecting the Promise returned from run() or runTask(), including rejections reported by the handler function itself.

Event: 'drain'

A 'drain' event is emitted whenever the queueSize reaches 0.

Event: 'needsDrain'

Similar to Piscina#needsDrain; this event is triggered once the total capacity of the pool is exceeded by number of tasks enqueued that are pending of execution.

Event: 'message'

A 'message' event is emitted whenever a message is received from a worker thread.

Property: completed (readonly)

The current number of completed tasks.

Property: duration (readonly)

The length of time (in milliseconds) since this Piscina instance was created.

Property: options (readonly)

A copy of the options that are currently being used by this instance. This object has the same properties as the options object passed to the constructor.

Property: runTime (readonly)

A histogram summary object summarizing the collected run times of completed tasks. All values are expressed in milliseconds.

  • runTime.average {number} The average run time of all tasks
  • runTime.mean {number} The mean run time of all tasks
  • runTime.stddev {number} The standard deviation of collected run times
  • runTime.min {number} The fastest recorded run time
  • runTime.max {number} The slowest recorded run time

All properties following the pattern p{N} where N is a number (e.g. p1, p99) represent the percentile distributions of run time observations. For example, p99 is the 99th percentile indicating that 99% of the observed run times were faster or equal to the given value.

{
  average: 1880.25,
  mean: 1880.25,
  stddev: 1.93,
  min: 1877,
  max: 1882.0190887451172,
  p0_001: 1877,
  p0_01: 1877,
  p0_1: 1877,
  p1: 1877,
  p2_5: 1877,
  p10: 1877,
  p25: 1877,
  p50: 1881,
  p75: 1881,
  p90: 1882,
  p97_5: 1882,
  p99: 1882,
  p99_9: 1882,
  p99_99: 1882,
  p99_999: 1882
}

Property: threads (readonly)

An Array of the Worker instances used by this pool.

Property: queueSize (readonly)

The current number of tasks waiting to be assigned to a Worker thread.

Property: needsDrain (readonly)

Boolean value that specifies whether the capacity of the pool has been exceeded by the number of tasks submitted.

This property is helpful to make decisions towards creating backpressure over the number of tasks submitted to the pool.

Property: utilization (readonly)

A point-in-time ratio comparing the approximate total mean run time of completed tasks to the total runtime capacity of the pool.

A pools runtime capacity is determined by multiplying the duration by the options.maxThread count. This provides an absolute theoretical maximum aggregate compute time that the pool would be capable of.

The approximate total mean run time is determined by multiplying the mean run time of all completed tasks by the total number of completed tasks. This number represents the approximate amount of time the pool as been actively processing tasks.

The utilization is then calculated by dividing the approximate total mean run time by the capacity, yielding a fraction between 0 and 1.

Property: waitTime (readonly)

A histogram summary object summarizing the collected times tasks spent waiting in the queue. All values are expressed in milliseconds.

  • waitTime.average {number} The average wait time of all tasks
  • waitTime.mean {number} The mean wait time of all tasks
  • waitTime.stddev {number} The standard deviation of collected wait times
  • waitTime.min {number} The fastest recorded wait time
  • waitTime.max {number} The longest recorded wait time

All properties following the pattern p{N} where N is a number (e.g. p1, p99) represent the percentile distributions of wait time observations. For example, p99 is the 99th percentile indicating that 99% of the observed wait times were faster or equal to the given value.

{
  average: 1880.25,
  mean: 1880.25,
  stddev: 1.93,
  min: 1877,
  max: 1882.0190887451172,
  p0_001: 1877,
  p0_01: 1877,
  p0_1: 1877,
  p1: 1877,
  p2_5: 1877,
  p10: 1877,
  p25: 1877,
  p50: 1881,
  p75: 1881,
  p90: 1882,
  p97_5: 1882,
  p99: 1882,
  p99_9: 1882,
  p99_99: 1882,
  p99_999: 1882
}

Static property: isWorkerThread (readonly)

Is true if this code runs inside a Piscina threadpool as a Worker.

Static property: version (readonly)

Provides the current version of this library as a semver string.

Static method: move(value)

By default, any value returned by a worker function will be cloned when returned back to the Piscina pool, even if that object is capable of being transfered. The Piscina.move() method can be used to wrap and mark transferable values such that they will by transfered rather than cloned.

The value may be any object supported by Node.js to be transferable (e.g. ArrayBuffer, any TypedArray, or MessagePort), or any object implementing the Transferable interface.

const { move } = require('piscina');

module.exports = () => {
  return move(new ArrayBuffer(10));
}

The move() method will throw if the value is not transferable.

The object returned by the move() method should not be set as a nested value in an object. If it is used, the move() object itself will be cloned as opposed to transfering the object it wraps.

Interface: Transferable

Objects may implement the Transferable interface to create their own custom transferable objects. This is useful when an object being passed into or from a worker contains a deeply nested transferable object such as an ArrayBuffer or MessagePort.

Transferable objects expose two properties inspected by Piscina to determine how to transfer the object. These properties are named using the special static Piscina.transferableSymbol and Piscina.valueSymbol properties:

  • The Piscina.transferableSymbol property provides the object (or objects) that are to be included in the transferList.

  • The Piscina.valueSymbol property provides a surrogate value to transmit in place of the Transferable itself.

Both properties are required.

For example,

const {
  move,
  transferableSymbol,
  valueSymbol
} = require('piscina');

module.exports = () => {
  const obj = {
    a: { b: new Uint8Array(5); },
    c: { new Uint8Array(10); },

    get [transferableSymbol]() {
      // Transfer the two underlying ArrayBuffers
      return [this.a.b.buffer, this.c.buffer];
    }

    get [valueSymbol]() {
      return { a: { b: this.a.b }, c: this.c };
    }
  };
  return move(obj);
};

Custom Task Queues

By default, Piscina uses a simple array-based first-in-first-out (fifo) task queue. When a new task is submitted and there are no available workers, tasks are pushed on to the queue until a worker becomes available.

If the default fifo queue is not sufficient, user code may replace the task queue implementation with a custom implementation using the taskQueue option on the Piscina constructor.

Custom task queue objects must implement the TaskQueue interface, described below using TypeScript syntax:

interface Task {
  readonly [Piscina.queueOptionsSymbol] : object | null;
}

interface TaskQueue {
  readonly size : number;
  shift () : Task | null;
  remove (task : Task) : void;
  push (task : Task) : void;
}

An example of a custom task queue that uses a shuffled priority queue is available in examples/task-queue;

The special symbol Piscina.queueOptionsSymbol may be set as a property on tasks submitted to run() or runTask() as a way of passing additional options on to the custom TaskQueue implementation. (Note that because the queue options are set as a property on the task, tasks with queue options cannot be submitted as JavaScript primitives).

Built-In Queues

Piscina also provides the FixedQueue, a more performant task queue implementation based on FixedQueue from Node.js project.

Here are some benchmarks to compare new FixedQueue with ArrayTaskQueue (current default). The benchmarks demonstrate substantial improvements in push and shift operations, especially with larger queue sizes.

Queue size = 1000
┌─────────┬─────────────────────────────────────────┬───────────┬────────────────────┬──────────┬─────────┐
│ (index) │ Task Name                               │ ops/sec   │ Average Time (ns)  │ Margin   │ Samples │
├─────────┼─────────────────────────────────────────┼───────────┼────────────────────┼──────────┼─────────┤
│ 0       │ 'ArrayTaskQueue full push + full shift' │ '9 692'   │ 103175.15463917515 │ '±0.80%' │ 970     │
│ 1       │ 'FixedQueue  full push + full shift'    │ '131 879' │ 7582.696390658352  │ '±1.81%' │ 13188   │
└─────────┴─────────────────────────────────────────┴───────────┴────────────────────┴──────────┴─────────┘

Queue size = 100_000
┌─────────┬─────────────────────────────────────────┬─────────┬────────────────────┬──────────┬─────────┐
│ (index) │ Task Name                               │ ops/sec │ Average Time (ns)  │ Margin   │ Samples │
├─────────┼─────────────────────────────────────────┼─────────┼────────────────────┼──────────┼─────────┤
│ 0       │ 'ArrayTaskQueue full push + full shift' │ '0'     │ 1162376920.0000002 │ '±1.77%' │ 10      │
│ 1       │ 'FixedQueue full push + full shift'     │ '1 026' │ 974328.1553396407  │ '±2.51%' │ 103     │
└─────────┴─────────────────────────────────────────┴─────────┴────────────────────┴──────────┴─────────┘

In terms of Piscina performance itself, using FixedQueue with a queue size of 100,000 queued tasks can result in up to 6 times faster execution times.

Users can import FixedQueue from the Piscina package and pass it as the taskQueue option to leverage its benefits.

Using FixedQueue Example

Here's an example of how to use the FixedQueue:

const { Piscina, FixedQueue } = require('piscina');
const { resolve } = require('path');

// Create a Piscina pool with FixedQueue
const piscina = new Piscina({
  filename: resolve(__dirname, 'worker.js'),
  taskQueue: new FixedQueue()
});

// Submit tasks to the pool
for (let i = 0; i < 10; i++) {
  piscina.runTask({ data: i }).then((result) => {
    console.log(result);
  }).catch((error) => {
    console.error(error);
  });
}

Note The FixedQueue will become the default task queue implementation in a next major version.

Current Limitations (Things we're working on / would love help with)

  • Improved Documentation
  • Benchmarks

Performance Notes

Workers are generally optimized for offloading synchronous, compute-intensive operations off the main Node.js event loop thread. While it is possible to perform asynchronous operations and I/O within a Worker, the performance advantages of doing so will be minimal.

Specifically, it is worth noting that asynchronous operations within Node.js, including I/O such as file system operations or CPU-bound tasks such as crypto operations or compression algorithms, are already performed in parallel by Node.js and libuv on a per-process level. This means that there will be little performance impact on moving such async operations into a Piscina worker (see examples/scrypt for example).

Queue Size

Piscina provides the ability to configure the minimum and maximum number of worker threads active in the pool, as well as set limits on the number of tasks that may be queued up waiting for a free worker. It is important to note that setting the maxQueue size too high relative to the number of worker threads can have a detrimental impact on performance and memory usage. Setting the maxQueue size too small can also be problematic as doing so could cause your worker threads to become idle and be shutdown. Our testing has shown that a maxQueue size of approximately the square of the maximum number of threads is generally sufficient and performs well for many cases, but this will vary significantly depending on your workload. It will be important to test and benchmark your worker pools to ensure you've effectively balanced queue wait times, memory usage, and worker pool utilization.

Queue Pressure and Idle Threads

The thread pool maintained by Piscina has both a minimum and maximum limit to the number of threads that may be created. When a Piscina instance is created, it will spawn the minimum number of threads immediately, then create additional threads as needed up to the limit set by maxThreads. Whenever a worker completes a task, a check is made to determine if there is additional work for it to perform. If there is no additional work, the thread is marked idle. By default, idle threads are shutdown immediately, with Piscina ensuring that the pool always maintains at least the minimum.

When a Piscina pool is processing a stream of tasks (for instance, processing http server requests as in the React server-side rendering example in examples/react-ssr), if the rate in which new tasks are received and queued is not sufficient to keep workers from going idle and terminating, the pool can experience a thrashing effect -- excessively creating and terminating workers that will cause a net performance loss. There are a couple of strategies to avoid this churn:

Strategy 1: Ensure that the queue rate of new tasks is sufficient to keep workers from going idle. We refer to this as "queue pressure". If the queue pressure is too low, workers will go idle and terminate. If the queue pressure is too high, tasks will stack up, experience increased wait latency, and consume additional memory.

Strategy 2: Increase the idleTimeout configuration option. By default, idle threads terminate immediately. The idleTimeout option can be used to specify a longer period of time to wait for additional tasks to be submitted before terminating the worker. If the queue pressure is not maintained, this could result in workers sitting idle but those will have less of a performance impact than the thrashing that occurs when threads are repeatedly terminated and recreated.

Strategy 3: Increase the minThreads configuration option. This has the same basic effect as increasing the idleTimeout. If the queue pressure is not high enough, workers may sit idle indefinitely but there will be less of a performance hit.

In applications using Piscina, it will be most effective to use a combination of these three approaches and tune the various configuration parameters to find the optimum combination both for the application workload and the capabilities of the deployment environment. There are no one set of options that are going to work best.

Thread priority on Linux systems

On Linux systems that support nice(2), Piscina is capable of setting the priority of every worker in the pool. To use this mechanism, an additional optional native addon dependency (nice-napi, npm i nice-napi) is required. Once nice-napi is installed, creating a Piscina instance with the niceIncrement configuration option will set the priority for the pool:

const Piscina = require('piscina');
const pool = new Piscina({
  worker: '/absolute/path/to/worker.js',
  niceIncrement: 20
});

The higher the niceIncrement, the lower the CPU scheduling priority will be for the pooled workers which will generally extend the execution time of CPU-bound tasks but will help prevent those threads from stealing CPU time from the main Node.js event loop thread. Whether this is a good thing or not depends entirely on your application and will require careful profiling to get correct.

The key metrics to pay attention to when tuning the niceIncrement are the sampled run times of the tasks in the worker pool (using the runTime property) and the delay of the Node.js main thread event loop.

Multiple Thread Pools and Embedding Piscina as a Dependency

Every Piscina instance creates a separate pool of threads and operates without any awareness of the other. When multiple pools are created in a single application the various threads may contend with one another, and with the Node.js main event loop thread, and may cause an overall reduction in system performance.

Modules that embed Piscina as a dependency should make it clear via documentation that threads are being used. It would be ideal if those would make it possible for users to provide an existing Piscina instance as a configuration option in lieu of always creating their own.

Release Notes

4.1.0

Features

  • add needsDrain property (#368) (2d49b63)
  • correctly handle process.exit calls outside of a task (#361) (8e6d16e)

Bug Fixes

4.0.0

  • Drop Node.js 14.x support
  • Add Node.js 20.x to CI

3.2.0

  • Adds a new PISCINA_DISABLE_ATOMICS environment variable as an alternative way of disabling Piscina's internal use of the Atomics API. (#163)
  • Fixes a bug with transferable objects. (#155)
  • Fixes CI issues with TypeScript. (#161)

3.1.0

3.0.0

  • Drops Node.js 10.x support
  • Updates minimum TypeScript target to ES2019

2.1.0

  • Adds name property to indicate AbortError when tasks are canceled using an AbortController (or similar)
  • More examples

2.0.0

  • Added unmanaged file descriptor tracking
  • Updated dependencies

1.6.1

  • Bug fix: Reject if AbortSignal is already aborted
  • Bug Fix: Use once listener for abort event

1.6.0

  • Add the niceIncrement configuration parameter.

1.5.1

  • Bug fixes around abortable task selection.

1.5.0

  • Added Piscina.move()
  • Added Custom Task Queues
  • Added utilization metric
  • Wait for workers to be ready before considering them as candidates
  • Additional examples

1.4.0

  • Added maxQueue = 'auto' to autocalculate the maximum queue size.
  • Added more examples, including an example of implementing a worker as a Node.js native addon.

1.3.0

  • Added the 'drain' event

1.2.0

  • Added support for ESM and file:// URLs
  • Added env, argv, execArgv, and workerData options
  • More examples

1.1.0

  • Added support for Worker Thread resourceLimits

1.0.0

  • Initial release!

The Team

Acknowledgements

Piscina development is sponsored by NearForm Research.

piscina's People

Contributors

0xflotus avatar addaleax avatar alan-agius4 avatar alexframe avatar andersonjoseph avatar bellatrick avatar blackglory avatar chocobozzz avatar clayjones-at avatar clydin avatar deivu avatar dependabot[bot] avatar elyahou avatar groozin avatar hhprogram avatar jaoodxd avatar jasnell avatar jasonchoimtt avatar marsup avatar metcoder95 avatar nicholas-l avatar prinzhorn avatar rafaelgss avatar samverschueren avatar simenb avatar skrylnikov avatar theanarkh avatar trivikr avatar xqin avatar zaubernerd avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

piscina's Issues

Feature: Wait for drain API

Currently Piscina will emit a drain event when the queue is empty, which we can await by wrapping it in a events.on() (e.g. await on(piscina, 'drain') ...

It would be nicer to have an async function that avoids having to use the additional events.on() wrapper...

e.g.

await piscina.drain();

Feature: task metrics

It would be helpful if metrics were built in.

  • Number of pending jobs
  • Number of active jobs
  • Number of completed jobs
  • Job duration histogram
  • Idle time histogram

(I will be investigating these)

Feature: Thread/Task scheduling

@addaleax said:

In a similar vein, it would be nice to be able to set scheduling parameters for the spawned threads, but that would require upstream support in libuv first, then in Node.js, then here.

I've been considering this a bit and a tunable scheduling strategy would be interesting but we'd need to play with it a bit to make sure it's actually worthwhile. The strategies I'd like to explore are:

  • OS-level scheduling params (but as @addaleax points out, we have to wait on libuv/node support)
  • Piscina-level queue strategies: Done!
    • fifo - The strategy we currently use for draining the queue
    • lifo - my hypothesis is that this won't give any desirable perf but I'd like to profile various scenarios to see what impact flipping the drain order would have.
    • priority - use a priority queue to determine which tasks are selected next

Improvement: filename can be a module

/cc @addaleax

Because Piscina is using require/import to load the workers, it is actually possible for a worker to be a separate module. For example, given the following directory structure:

* node_modules
  * worker
    * index.js
    * package.json
* index.js

Where /module_modules/worker/index.js is the worker function,

From /index.js we can do the following and it just works...

'use strict';

const Piscina = require('../..');
const { resolve } = require('path');

const piscina = new Piscina({ filename: resolve('./node_modules', 'worker') });

(async function () {
  const result = await piscina.runTask({ a: 4, b: 6 });
  console.log(result); // Prints 10
})();

While it's great that this works but the resolve('./node_modules', ... is a bit unfortunate. Just specifying { filename: 'worker' } fails saying that the module worker cannot be found.

james@ubuntu:~/nearform/piscina/examples/module$ node
Welcome to Node.js v14.1.0.
Type ".help" for more information.
> require('worker')
[Function (anonymous)]
> const Piscina = require('../..')
undefined
> const p = new Piscina({ filename: 'worker' })
undefined
> p.runTask().then(console.log)
Promise { <pending> }
> (node:23776) UnhandledPromiseRejectionWarning: Error: Cannot find module 'worker'
Require stack:
- /home/james/nearform/piscina/dist/src/worker.js
    at Function.Module._resolveFilename (internal/modules/cjs/loader.js:1020:15)
    at Function.Module._load (internal/modules/cjs/loader.js:890:27)
    at Module.require (internal/modules/cjs/loader.js:1080:19)
    at require (internal/modules/cjs/helpers.js:72:18)
    at /home/james/nearform/piscina/dist/src/worker.js:45:67
    at async getHandler (/home/james/nearform/piscina/dist/src/worker.js:45:19)
    at async /home/james/nearform/piscina/dist/src/worker.js:111:29
(Use `node --trace-warnings ...` to show where the warning was created)
(node:23776) UnhandledPromiseRejectionWarning: Unhandled promise rejection. This error originated either by throwing inside of an async function without a catch block, or by rejecting a promise which was not handled with .catch(). To terminate the node process on unhandled promise rejection, use the CLI flag `--unhandled-rejections=strict` (see https://nodejs.org/api/cli.html#cli_unhandled_rejections_mode). (rejection id: 1)
(node:23776) [DEP0018] DeprecationWarning: Unhandled promise rejections are deprecated. In the future, promise rejections that are not handled will terminate the Node.js process with a non-zero exit code.

Ideally new Piscina({ filename: 'worker' }) would Just Work here.

That said, filename likely isn't the best name for the property here either :-)

worker graceful shutdown

how can i handle graceful shutdown (connections and something like this ... ) of worker before terminate.
in case of destroy or no more threads ?

Feature: Canceling tasks

Not all tasks will be cancellable, but it would be nice to have cancellation as an option, either via API or by example to show how it is possible.

Checklist to publishing first version

@mcollina @addaleax ... please update this with whatever additional items you think are needed.

Things to do before publishing version 1:

  • Completed Tests
    • 100% Code Coverage - #6 - CI is currently failing due to < 100 coverage
    • Green CI on all platforms - #6
  • Docs
    • Full API doc
    • Design explanation
    • Getting started
  • Examples
  • Benchmarks
  • Contributing guide
  • Code of conduct file - #4
  • Email to NearForm explaining project and intent
  • Blog post (for Clinic.js or NearForm blogs)
  • Introductory video (introduce project, how to use it, what it's for)
  • npm publish - I've published an empty placeholder to npm currently at version 1.0.0-pre. Once we're ready to flip the switch, any of the three of us should be able to publish the actual module.

Couple of questions:

  1. Where should piscina live? Is having it in my personal github sufficient for now or should we move it somewhere more general?

  2. @mcollina, is there a fastify integration angle here? e.g. any way that a fastify-piscina type plugin makes sense? Is there a graphql angle?

  3. Should we also export an ESM version?

Api allows workers to do initialization but it is not clear how to do cleanup

Hi there,

Docs says that it is possible to do initialization inside worker. I am using that feature to create a database connection like this:

async function initialize() {
  const conn = await connectToDb();
  return (query) => conn.exec(query);
}

module.exports = initialize();

During its lifetime, pool may decide to close inactive threads. In that case I need to close database connection.
But currently I didn't found any api which allows to do that.

Currently I set minThreads and maxThreads to the same value and have the following as a workaround:

const Piscina = require("piscina")

const workerCount = 10;

const pool = new Piscina({
    filename: __dirname + "/searchMessagesWorker.cjs",
    minThreads: workerCount,
    maxThreads: workerCount,
    idleTimeout: Integer.MAX_SAFE_INTEGER
})

// this will be set to `false` each time user calls `pool.runTask`
let poolIsEmpty = true
pool.on('drain', () => {
    poolIsEmpty = true
})
    
function shutdown() {
    // drainPromise will be resolved as soon pool will be drained
    const drainPromise = new Promise((resolve) => {
        if (poolIsEmpty) {
           resolve()
        }
        else {
            pool.once('drain', resolve)
        }
    })

    return drainPromise.then(
        () => {
            // no tasks are scheduled on the pool at this moment.
            // Schedule "workerCount" amount "close" tasks in a single loop,
            // assuming Piscina will assign a single task to each thread.
            const promises = []
            for (let i = 0; i < workerCount; ++i) {
                // this tells worker to close DB connection
                promises.push(pool.runTask({
                    type: 'close'
                }))
            }
            return Promise.all(promises)
        }
    ).then(
        () => pool.destroy()
    )
}

Not only it is complicated, but also fragile (depending on how Piscina schedules tasks under the hood, setting idleTimeout to very high number). Also, this way it is not possible to have dynamic number of threads in the pool.

Interesting, but cleanup functionality is something which is missing in all worker pool implementations I can find on npm. I am wondering why?
Maybe there is a completely different way to do cleanup inside worker thread?

I'll be grateful for any explanation/reading resource on this topic.
If this is something which is in scope of Piscina package I am ready to work on PR as well.

Discuss: Streams in and Streams out

@mcollina @addaleax ... in looking through the possibility of using the worker pool for server side rendering, one of the difficulties is going to be the fact that existing SSR mechanisms (e.g. nextjs when using fastify-nextjs) require the ability to work directly with the request and response objects as streams (e.g. app.render(req.raw, reply.res, '/hello', req.query, {})). Within the worker thread, we do not have direct access to these streams, nor do we have access to the global nextjs application. The best we'd be able to do is create each worker such that it has it's own distinct image of the nextjs application without any streaming of the actual rendered data. That is obviously not ideal...

We should try to find a way of enabling streaming data into and out of a task. We can likely use the worker threads already wrapped process.stdin and process.stdout so long as the concurrency of each worker is exactly 1 task at a time.

Feature request: Worker initiated messages send to parent

Hi, Thanks for the great software!

I'm wondering if Piscina could add support to allow workers actively sending messages to parent main thread, with or without waiting for parent's acknowledgement. This feature may greatly improve the use case when we need to:

  1. Send customized status updates from worker to main thread
  2. Look up resources which only exist in main thread
  3. Delegate I/O centric logic, or connection managements, e.g. DB connection pools to main thread to avoid excessive network connections in many workers
  4. Allow call another worker from the context of current worker, and etc.

For example we could have below code runs in the context of worker threads:

const { callParent, sendMessageToParent } = require('piscina');

(async function () {
const result = await callParent({ a: 4, b: 6 }); // child send message to parent
console.log(result); // waiting for parent acknowledges worker with the result
//or without having to wait for parent's ack:
sendMessageToParent({ a: 4, b: 6 })
})();

And in the context of main thread:
const Piscina = require('piscina');

const piscina = new Piscina({
filename: path.resolve(__dirname, 'worker.js')
});

piscina.on('childmessage', (data) => {
...
});

Renamed master -> current

Created a new current branch and made it the default. Will keep master around for a little while but the plan is to delete that branch

Feature: Interface Transferable / Piscina.move()

Currently, values returned by a worker are not transferable. That is, they will always be cloned because we currently have no way of telling Piscina that the value should be transferred instead.

module.exports = () => {
  const buf = Buffer.alloc(10);
  return buf; // Copies the buffer, rather than transferring it
}

My proposal is to add a new Piscina.move() function that takes an input object capable of being transferred and wraps it in a new Transferable interface (in typescript terms) that effectively marks it as being able to be transferred.

const { move } = require('piscina');

module.exports = () => {
  const buf = Buffer.alloc(10);
  return move(buf);
}

The return value of move() is a Transferable, a wrapper object that (a) marks the object as being transferable and (b) provides a reference to the actual internal transferable object (e.g. the underlying ArrayBuffer in the example above.

This would only work with top level objects... for instance, the following would not cause the buffer to be transferred:

module.exports = () => {
  return { m: move(Buffer.alloc(10)) }
}

The move() function would work out of the box with objects known to be transferable by Node.js (e.g. MessagePorts and TypedArrays). Users would be able to create their own transferable objects by implementing the Transferable interface on their classes.

Crash on startup, no error given

I'm using typescript, __dirname is not defined, so I'm resolving my path manually.

this.workerPool = new Piscina({ filename: path.resolve("test/worker", 'worker.js'), maxThreads: 2 });

Doesn't go past that.

Feature: Task sets

This one I'm not yet convinced about but it came up in a conversation and I promised to consider it... the idea is to allow submitting multiple tasks at once and awaiting on all of them. The way we would do this now is:

await Promise.all([
  piscina.runTask(1),
  piscina.runTask(2),
  piscina.runTask(3),
]);

The suggestion would be for something like:

await piscina.runAll([1, 2, 3]);

Given that we can already do this with the current API, I'm not convinced there's anything we should do here.

Node silently exits with code 0, no error, when using a specific string for String.includes inside worker

Hey jasnell,

Node silently exits with code 0, no error, when using a specific string for String.includes inside worker. This happens during my call to await Promise.all(worker_promises) where worker_promises is an array of Piscina runTask() promises.

If you change it to a different string and run, it will exit normally producing full output.

I can give you my script to test with, but the problem is that it's designed to parse log file data from DDoS attacks (to count occurrences of requests matching certain patterns) and the dataset causing the issue is about 20GiB (1,186 gzipped log files). I can send it to you if you're okay with this.

Here are the two relevant sections of code and the output when it works:
Screen Shot 2021-02-10 at 6 08 44 PM
Screen Shot 2021-02-10 at 6 10 53 PM

Here are the two relevant sections of code and the output when it silently fails:
Screen Shot 2021-02-10 at 6 01 27 PM
Screen Shot 2021-02-10 at 6 02 02 PM

The dataset remains the same between runs and there are no other code changes.

I'm on node v14.15.5, macOS 11.2 x86

piscina with jest

I have an issue in one of my repo where user uses jest and this issue are pop up from the log

I wonder if it's sth to do with piscina?

Jest has detected the following 3 open handles potentially keeping Jest from exiting:

  ●  Piscina

      at new EventEmitterReferencingAsyncResource (../node_modules/eventemitter-asyncresource/src/index.ts:23:5)
      at new EventEmitterAsyncResource (../node_modules/eventemitter-asyncresource/src/index.ts:46:7)
      at new Piscina (../node_modules/piscina/src/index.ts:827:5)
      at Object.<anonymous> (../node_modules/camaro/index.js:4:14)


  ●  WORKER

      at ThreadPool._addNewWorker (../node_modules/piscina/src/index.ts:520:20)
      at ThreadPool._ensureMinimumWorkers (../node_modules/piscina/src/index.ts:514:12)
      at new ThreadPool (../node_modules/piscina/src/index.ts:508:10)
      at new Piscina (../node_modules/piscina/src/index.ts:876:18)
      at Object.<anonymous> (../node_modules/camaro/index.js:4:14)


  ●  MESSAGEPORT

      at ThreadPool._addNewWorker (../node_modules/piscina/src/index.ts:528:30)
      at ThreadPool._ensureMinimumWorkers (../node_modules/piscina/src/index.ts:514:12)
      at new ThreadPool (../node_modules/piscina/src/index.ts:508:10)
      at new Piscina (../node_modules/piscina/src/index.ts:876:18)
      at Object.<anonymous> (../node_modules/camaro/index.js:4:14)

ref: tuananh/camaro#113 (comment)

Feature: Multi-step tasks

This is one I'm not at all convinced about but came up in a conversation and I said I would investigate.

Essentially, it's about submitting a single task specification that goes through multiple steps before finally resolving. The output of one step feeds into the input of the next. Exactly how to define that flow is unclear.

The way we would do this now is simply:

piscina.runTask(await piscina.runTask({}))

Which begs the question about whether an API for this case would even be necessary.

Worker State

Hi,
What if a worker has a state, like a db connection that should be open prior to processing,
Should I pass a JS file opens that connection upon being required, and delays processing of any message until that connection is open?

Feature Request: Seamless stream and/or async iterable support

Would be nice to be able to provide a stream and/or async iterable (of buffers) as argument and have is seamlessly (using transferable) accessible in the worker.

e.g.

const handle = await fs.open(dst)
try {
  for await (const buf of worker.runTask(fs.createReadStream(src))) {
     await handle.write(buf)
  }
} finally {
  await handle.close()
}
module.exports = async function * (source) {
  for await (const buf of source) {
    yield veryExpensiveProcessing(buf)
  }
}

How to update state of each worker instance

Lets say each worker has global configuration state, that is same across all workers.
Is it possible to send new configuration object to each Worker to re-initialise its state?

Typescript examples of use

I can't manange to make it work with typescript.
The big problem is the "worker.ts" file, when compiling to "worker.js" file there is no
module.exports = ({ a, b }) => { return a + b; };
In the JS file compiled.
Using export default function() in worker.ts file is not working at all.

Warnings about experimental ESM module loader while CommonJS is used

Hi,

I was trying to use Piscina in production and I've used CommonJS for all my code, including worker.

Basically, my worker looks like this:

async function initialize() {
    return (action) => {
         // do something cpu-intensive here, based on action received
    }
}
module.exports = initialize();

Each time the worker starts, I am getting a message in the console:

(node:94710) ExperimentalWarning: The ESM module loader is experimental.

That unfortunate, because it is polluting logs and is not in a json format which is the format of my app logger (I use Fastify with Pino).

Is it possible to check is file is written in CommonJS format and use require instead of import to load worker code?
Or maybe use require when file extension is cjs (taking into account package type if any?) ?

Feature: maxQueue = 'auto'

In doing a variety of performance tests, I've found that keeping maxQueue limited to a multiple of maxThreads generally has the best performance in terms of minimizing queue waits and keeping memory bounded. Right now the default maxQueue is Infinity which I think we should keep for now but I'd like to have a mode where maxQueue is set automatically based on the max thread count. maxQueue = 'auto' would do exactly that.

Progress

Hi , can you please add a simple example of a worker that send progress to main file before it's close

Differentiate different errors, export AbortError

I might be missing something, but there doesn't seem to be a way to differentiate between an error that happened with the task or an error that happened because it was aborted. Could AbortError be exported? So we can do:

try {
  await piscina.runTask(data, abortController);
} catch (err) {
  if(err instanceof AbortError) {
    console.log('The task was canceled');
  } else {
    console.log('The task failed');
  }
}

Because I don't need AbortError in my logs, they are an expected part of my usage.

Feature: Allow worker export to be a Promise<Function>

Currently, a worker must export a Function immediately on loading. It would be nice (so we can allow for any async initialization tasks to occur before the ready message is sent) to also allow a worker to export a Promise ... e.g.

module.exports = (async () => {
  // Do any async init tasks here...
  return () => console.log('hello from the handler function')
})();

Refs: #57 (comment)

@addaleax ... whatcha think?

Support explicit registration of workers

Description

Add a mechanism for worker code to register itself with piscina instead of relying on the existing ./src/worker.ts logic that attempts to resolve and load (and cache) worker code.

Use-case

When bundling server-side code, ./dist/worker.js will not be picked up in the same bundle as ./dist/index.js because there are no statically-analyzable dependencies on the worker file. As a result, at runtime, the new location of the now-bundled ./dist/index.js is no longer adjacent to ./dist/worker.js on disk.

To get around this, it is possible to add ./dist/worker.js as a separate entrypoint to make sure it gets tracked by the bundler. An alternative solution, seen in workerpool is that it exposes a worker(functions: Record<string, (...args: any[]) => any>): void API that allows a worker to import the library's coordination code. The way I see this is that it inverts the relationship so that the worker has an explicit opt-in option.

A side-benefit that I'd love to see considered in piscina is that this provides a clean facility for registering functions that are not necessarily default exports and even for registering different named functions so that a dispatching layer isn't needed.

Feature: ESM support

I know you said you're working on this @addaleax but wanted to get the issue documented.

It would be great to have built-in support for ESM such that (a) Piscina can be used as ESM and (b) Workers can be defined as ESM.

Currently, creating a Piscina instance and pointing it at an ESM module fails:

const piscina = new Piscina({
  filename: path.resolve(__dirname, 'worker.mjs')
});
(node:443) UnhandledPromiseRejectionWarning: Error [ERR_REQUIRE_ESM]: Must use import to load ES Module: /home/james/nearform/piscina/examples/simple_esm/worker.mjs
    at Module.load (internal/modules/cjs/loader.js:1038:11)
    at Function.Module._load (internal/modules/cjs/loader.js:929:14)
    at Module.require (internal/modules/cjs/loader.js:1080:19)
    at require (internal/modules/cjs/helpers.js:72:18)
    at /home/james/nearform/piscina/dist/src/worker.js:28:63
    at async getHandler (/home/james/nearform/piscina/dist/src/worker.js:28:15)
    at async /home/james/nearform/piscina/dist/src/worker.js:93:29
(Use `node --trace-warnings ...` to show where the warning was created)
(node:443) UnhandledPromiseRejectionWarning: Unhandled promise rejection. This error originated either by throwing inside of an async function without a catch block, or by rejecting a promise which was not handled with .catch(). To terminate the node process on unhandled promise rejection, use the CLI flag `--unhandled-rejections=strict` (see https://nodejs.org/api/cli.html#cli_unhandled_rejections_mode). (rejection id: 1)
(node:443) [DEP0018] DeprecationWarning: Unhandled promise rejections are deprecated. In the future, promise rejections that are not handled will terminate the Node.js process with a non-zero exit code.

Feature: Dynamic scaling of minimum worker count

Currently the minimum number of workers is fixed at piscina start, with the minimum maintained at any given time. In discussion the idea of a dynamically scaled minimum based on the queue pressure was suggested. Essentially, rather than scale up one new worker at a time as needed when a new task is completed, automatically start launching N new workers (up to max worker count) while tasks are being added to the queue, where the minimum is dynamically calculated as a factor of the pending queue size.

Whether this would have a measurable performance improvement or not remains to be determined.

Feature request: task result streams or AsyncIterators

Hi folks -- thanks for putting together an awesome piece of software! I'd like to bring up a feature request to get an assessment for how hard it'd be to build into piscina: ReadableStream responses moved from worker threads to the calling thread. Instead of producing one Transferrable result at the end of a task, it'd be great to produce a stream or an AsyncIterator of results that the calling thread could access before the whole task is completed.

The concrete use case is using piscina to implement a server side renderer for client side React applications. Piscina is perfect for this: React SSR is a synchronous, CPU intensive task that usually blocks the main node thread for surprisingly long, and it'd be great to offload to a worker thread to keep the other work on the main event loop unblocked.

For maximum performance and the ideal user experience, React has built a streaming server side renderer that renders out chunks of HTML that can be streamed to the browser as they are produced, instead of having to wait until the entire render is complete before sending the first byte. This means the browser can start rendering, start prefetching other resources, etc etc, and is no extra work for the people building the React apps to support. See https://reactjs.org/docs/react-dom-server.html#rendertonodestream for more information.

I think architecturally this'd be a real sweet spot for piscina. To me, this library isn't the same as something like bullmq or pg-boss where tasks are persisted in some other system and worked by some other process. Piscina seems best at ephemeral, short lived tasks where you need the result quickly and don't want to pay as little of a network or serialization cost as possible. It's closer to a function call than those other things, and it'd be awesome if it supported the same return values that other function calls do, like Streams or AsyncIterators.

Discuss: runTasks (plural)

It would be interesting to investigate a variation on the runTask API that allows multiple jobs to be submitted that culminates with a single Promise... using All, Race, and AllSettled semantics...

e.g.

await piscina.all([ ... tasks])

await piscina.race([... tasks])

await piscina.allSettled([ ... tasks])

It would also be interesting to explore the ability to pass in an async generator that can feed tasks to piscina asynchronously ...

piscina.runTasks(my_async_generator)

Or ... a stream option

const my_task_transform = ...

stream.pipeline(source, my_task_transform, piscina.stream(), destination, () => {})

Really, just thinking about alternative ways of feeding the task queue...

Missing tags

It’s currently very hard to identify which commit in this repo corresponds to which npm release. I’ll leave this as a reminder for me to go figure out which match to which and push tags for those.

Piscina vs Cluster

Why and when should I use Piscina instead of Node's Cluster API?
I see there are some nice abstractions but as long as there are issues - when using third party modules - like bundle size, watch for CVEs and vulnerable dependencies etc... I wonder if there is a good reason for using it.

I briefly read the source code and could not figure out when it is a good idea.

Is it safe and advisable to use with expressjs or hapi?

how to know if all the thread is initialized?

we can do some initial work before actually do some task:

async function initialize() {
  await someAsyncInitializationActivity();
  return ({ a, b }) => a + b;
}

module.exports = initialize();

if the minThreads is set, piscina will create threads automatically when new Piscina() .

but how to make sure all the thread is initialized before calling runTask?

use case:
lots of node process, need make sure one process's all threads is initialized before handle user request, otherwise the user request may get overtime error.

is there a 'ready' event?

Order guarantees?

What kind of ordering guarantees does piscina provide? i.e. is it FIFO or whoever finishes first?

Reusable worker pool

Hello, I am staring using this library. I am trying to create a pool of workers then assign them jobs dynamically so as to reuse the same pool of workers. How can i achieve this?

Feature: Take os.loadavg() into account

It would be nice if piscina took os.loadavg() into account when deciding how many workers to use in a given situation, to dynamically respond to the system load. This can be helpful considering that the application that uses piscina may not be the main application running on a given host, and that the up to 1.5 × number of CPUs maxThreads default may be too much in a lot of circumstances.

The idea here is that we would not post new tasks until either less than minThreads tasks are currently running or the system drops below x CPU load (where x could be e.g. 100 % by default, but configurable).

In a similar vein, it would be nice to be able to set scheduling paramters for the spawned threads, but that would require upstream support in libuv first, then in Node.js, then here.

ParentPort with multiple workers

Is there a way to have a single MessageChannel in the main process with multiple tasks like the parentPort

I tried with

(async function () {
  const channel = new MessageChannel()
  channel.port1.on('message', (message) => {
    console.log(message)
  })
  let a = 0
  while (a < 10) {
    piscina.runTask({ hello: `world${a}`, port: channel.port2 }, [channel.port2])
    a++
  }
})()

but it gets DataCloneError: MessagePort in transfer list is already detached

Feature: pool utilization metric

@addaleax @mcollina ... In addition to the run time and wait time histograms, I would like to provide a calculation about the utilization of the pool but want to verify the approach before going too far.

Specifically, the pool utilization is a ratio of the total mean task run time to the total capacity run time of the pool. It is always a point-in-time measurement based on four calculations:

  • The pool duration -- the length of time the pool has been active (determined from the moment the Piscina object was created.

  • The pool capacity -- calculated as duration multiplied by maxThreads. This is the absolute upper bound of potential compute time for any task submitted by the queue. It is a limit that practically can never be reached for many practical reasons but is based on the assumption of 100% of the workers being 100% active for the duration of the Piscina object.

  • The pool approximate total mean run time -- calculated as the mean run time of all completed tasks multiplied by the total number of completed tasks. This is an approximation of the total amount of compute time actually realized by the pool.

  • The pool utilization -- calculated by dividing the approximate total run time time by the capacity to yield.

So, for instance, if the pool has been active for 100 milliseconds, has maxThreads = 10, and has processed 5 tasks with a mean runtime of 5 milliseconds each, we get:

  • duration: 100 milliseconds
  • capacity: 1000 milliseconds
  • approximate total mean run time: 25 milliseconds
  • utilization: 0.025

Our example queue, then, has an approximate upper limit of around 200 tasks that could have been completed within that 100 millisecond duration barring all other performance considerations.

Does this calculation seem reasonable?

In terms of API, there would be two new properties off Piscina:

  • duration -- provides the duration value
  • utilization -- provides the calculated point-in-time utilization value

We could potentially sample the utilization like we do with run times as the completion of each task but that seems generally unnecessary. If a use case for that crops up we can always add it later.

RangeError: Maximum call stack size exceeded

I have the below example

const path = require('path')
const csv = require('csvtojson')
const Pool = require('piscina')

const workerPool = new Pool({
    filename: path.resolve(__dirname, 'worker.js'),
})

csv()
    .fromFile('4mb.csv')
    .on('data', (data) => {
        const line = data.toString('utf8')
        workerPool.runTask(line)
    })
    .on('error', console.error)
    .on('end', () => {
        console.log('done!')
    })

worker.js: just parse json and print it. nothing fancy.

module.exports = (data) => {    
    console.log(JSON.parse(data))
}

I'm seeing a lot of errors like this

internal/worker/io.js:251
  const message = receiveMessageOnPort_(port);
                  ^

RangeError: Maximum call stack size exceeded
    at receiveMessageOnPort (<anonymous>)
    at receiveMessageOnPort (internal/worker/io.js:251:19)
    at WorkerInfo.processPendingMessages (/Users/tuananh/Code/scratchpad/node_modules/piscina/dist/src/index.js:174:29)
    at WorkerInfo.onMessage (/Users/tuananh/Code/scratchpad/node_modules/piscina/dist/src/index.js:249:32)
    at WorkerInfo._handleResponse (/Users/tuananh/Code/scratchpad/node_modules/piscina/dist/src/index.js:132:14)
    at WorkerInfo.processPendingMessages (/Users/tuananh/Code/scratchpad/node_modules/piscina/dist/src/index.js:175:22)
    at WorkerInfo.onMessage (/Users/tuananh/Code/scratchpad/node_modules/piscina/dist/src/index.js:249:32)
    at WorkerInfo._handleResponse (/Users/tuananh/Code/scratchpad/node_modules/piscina/dist/src/index.js:132:14)
    at WorkerInfo.processPendingMessages (/Users/tuananh/Code/scratchpad/node_modules/piscina/dist/src/index.js:175:22)
    at WorkerInfo.onMessage (/Users/tuananh/Code/scratchpad/node_modules/piscina/dist/src/index.js:249:32)
    at WorkerInfo._handleResponse (/Users/tuananh/Code/scratchpad/node_modules/piscina/dist/src/index.js:132:14)
    at WorkerInfo.processPendingMessages (/Users/tuananh/Code/scratchpad/node_modules/piscina/dist/src/index.js:175:22)
    at WorkerInfo.onMessage (/Users/tuananh/Code/scratchpad/node_modules/piscina/dist/src/index.js:249:32)
    at WorkerInfo._handleResponse (/Users/tuananh/Code/scratchpad/node_modules/piscina/dist/src/index.js:132:14)
    at WorkerInfo.processPendingMessages (/Users/tuananh/Code/scratchpad/node_modules/piscina/dist/src/index.js:175:22)
    at WorkerInfo.onMessage (/Users/tuananh/Code/scratchpad/node_modules/piscina/dist/src/index.js:249:32)

Feature: `'drain'` event

We should add a piscina.on('drain', () => {}) event that is fired once all currently pending tasks have been either completed or dispatched to workers. The use case is to provide a signal for when it may be ok to start submitting tasks to the queue again. Specifically, if I am using a stream to push tasks into the queue, but I approach the queue limit, I want to pause the inbound stream to apply backpressure... then, once the queue has reached zero, I want a signal to indicate that I was unpause the feeding stream and resume submitting tasks.

We should consider carefully how this works because the timing of the event can be a performance bottleneck. For instance, if we wait until the all tasks are completed, then our queue will go idle while we wait for the next tasks to be submitted, which could cause our workers to be terminated unnecessarily. However, if we fire the drain event too early we could add churn.

One idea would be to set a load threshold as a percentage of queue limit as a configuration option for the drain event. That is, when queue limit is not unlimited, setting a load threshold of 0.25 would mean that when the queue size is 25% of the queue limit, the 'drain' event is emitted allowing us to start filling the queue back up.

Wanted to make sure we discussed the strategy on this before working on the implementation.

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.