
workerpool's Introduction

workerpool

workerpool offers an easy way to create a pool of workers, both for dynamically offloading computations and for managing a pool of dedicated workers. workerpool basically implements a thread pool pattern: there is a pool of workers to execute tasks, new tasks are put in a queue, and a worker executes one task at a time, picking a new task from the queue once it is finished. Workers can be accessed via a natural, promise-based proxy, as if they were available straight in the main application.

workerpool runs on Node.js and in the browser.

Features

  • Easy to use
  • Runs in the browser and on node.js
  • Dynamically offload functions to a worker
  • Access workers via a proxy
  • Cancel running tasks
  • Set a timeout on tasks
  • Handles crashed workers
  • Small: 7 kB minified and gzipped
  • Supports transferable objects (only for web workers and worker_threads)

Why

JavaScript is based upon a single event loop which handles one event at a time. Jeremy Epstein explains this clearly:

In Node.js everything runs in parallel, except your code. What this means is that all I/O code that you write in Node.js is non-blocking, while (conversely) all non-I/O code that you write in Node.js is blocking.

This means that CPU-heavy tasks will block other tasks from being executed. In a browser environment, the browser will not react to user events like a mouse click while executing a CPU intensive task (the browser "hangs"). In the case of a node.js server, the server will not respond to any new request while executing a single, heavy request.

For front-end processes, this is not a desired situation. Therefore, CPU intensive tasks should be offloaded from the main event loop onto dedicated workers. In a browser environment, Web Workers can be used. In node.js, child processes and worker_threads are available. An application should be split into separate, decoupled parts which can run independently of each other in a parallelized way. Effectively, this results in an architecture that achieves concurrency by means of isolated processes and message passing.

Install

Install via npm:

npm install workerpool

Load

To load workerpool in a node.js application (both the main application and the workers):

const workerpool = require('workerpool');

To load workerpool in the browser:

<script src="workerpool.js"></script>

To load workerpool in a web worker in the browser:

importScripts('workerpool.js');

Use

Offload functions dynamically

In the following example there is a function add, which is offloaded dynamically to a worker to be executed for a given set of arguments.

myApp.js

const workerpool = require('workerpool');
const pool = workerpool.pool();

function add(a, b) {
  return a + b;
}

pool
  .exec(add, [3, 4])
  .then(function (result) {
    console.log('result', result); // outputs 7
  })
  .catch(function (err) {
    console.error(err);
  })
  .then(function () {
    pool.terminate(); // terminate all workers when done
  });

Note that both the function and its arguments must be static and stringifiable, as they need to be sent to the worker in serialized form. For large functions or function arguments, the overhead of sending the data to the worker can be significant.
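For example, a function that relies on variables from its surrounding scope cannot be offloaded this way, because only the function's source code is sent to the worker. A minimal sketch of this failure mode (the variable factor is just an illustration):

const workerpool = require('workerpool');
const pool = workerpool.pool();

const factor = 2; // lives in the main application's scope only

// This will fail: the function is stringified and re-evaluated inside the
// worker, where `factor` does not exist.
function scale(x) {
  return x * factor;
}

pool
  .exec(scale, [21])
  .catch(function (err) {
    console.error(err); // typically a ReferenceError: factor is not defined
  })
  .then(function () {
    pool.terminate();
  });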

Dedicated workers

A dedicated worker can be created in a separate script, and then used via a worker pool.

myWorker.js

const workerpool = require('workerpool');

// a deliberately inefficient implementation of the fibonacci sequence
function fibonacci(n) {
  if (n < 2) return n;
  return fibonacci(n - 2) + fibonacci(n - 1);
}

// create a worker and register public functions
workerpool.worker({
  fibonacci: fibonacci,
});

This worker can be used by a worker pool:

myApp.js

const workerpool = require('workerpool');

// create a worker pool using an external worker script
const pool = workerpool.pool(__dirname + '/myWorker.js');

// run registered functions on the worker via exec
pool
  .exec('fibonacci', [10])
  .then(function (result) {
    console.log('Result: ' + result); // outputs 55
  })
  .catch(function (err) {
    console.error(err);
  })
  .then(function () {
    pool.terminate(); // terminate all workers when done
  });

// or run registered functions on the worker via a proxy:
pool
  .proxy()
  .then(function (worker) {
    return worker.fibonacci(10);
  })
  .then(function (result) {
    console.log('Result: ' + result); // outputs 55
  })
  .catch(function (err) {
    console.error(err);
  })
  .then(function () {
    pool.terminate(); // terminate all workers when done
  });

Workers can also initialize asynchronously:

myAsyncWorker.js

define(['workerpool/dist/workerpool'], function (workerpool) {
  // a deliberately inefficient implementation of the fibonacci sequence
  function fibonacci(n) {
    if (n < 2) return n;
    return fibonacci(n - 2) + fibonacci(n - 1);
  }

  // create a worker and register public functions
  workerpool.worker({
    fibonacci: fibonacci,
  });
});

Examples

Examples are available in the examples directory:

https://github.com/josdejong/workerpool/tree/master/examples

API

The API of workerpool consists of two parts: a function workerpool.pool to create a worker pool, and a function workerpool.worker to create a worker.

pool

A workerpool can be created using the function workerpool.pool:

workerpool.pool([script: string] [, options: Object]) : Pool

When a script argument is provided, the provided script will be started as a dedicated worker. When no script argument is provided, a default worker is started which can be used to offload functions dynamically via Pool.exec. Note that on node.js, script must be an absolute file path like __dirname + '/myWorker.js'. In a browser environment, script can also be a data URL like 'data:application/javascript;base64,...'. This allows embedding the bundled code of a worker in your main application. See examples/embeddedWorker for a demo.

The following options are available:

  • minWorkers: number | 'max'. The minimum number of workers that must be initialized and kept available. Setting this to 'max' will create maxWorkers default workers (see below).
  • maxWorkers: number. The default number of maxWorkers is the number of CPUs minus one. When the number of CPUs cannot be determined (for example in older browsers), maxWorkers is set to 3.
  • maxQueueSize: number. The maximum number of tasks allowed to be queued. Can be used to prevent running out of memory. If the maximum is exceeded, adding a new task will throw an error. The default value is Infinity.
  • workerType: 'auto' | 'web' | 'process' | 'thread'.
    • In case of 'auto' (default), workerpool will automatically pick a suitable type of worker: when in a browser environment, 'web' will be used. When in a node.js environment, worker_threads will be used if available (Node.js >= 11.7.0), else child_process will be used.
    • In case of 'web', a Web Worker will be used. Only available in a browser environment.
    • In case of 'process', child_process will be used. Only available in a node.js environment.
    • In case of 'thread', worker_threads will be used. If worker_threads are not available, an error is thrown. Only available in a node.js environment.
  • workerTerminateTimeout: number. The timeout in milliseconds to wait for a worker to clean up its resources on termination before stopping it forcefully. Default value is 1000.
  • forkArgs: String[]. For the process worker type. An array passed as args to child_process.fork.
  • forkOpts: Object. For the process worker type. An object passed as options to child_process.fork. See the nodejs documentation for available options.
  • workerOpts: Object. For the web worker type. An object passed to the constructor of the web worker. See the WorkerOptions specification for available options.
  • workerThreadOpts: Object. For the thread worker type. An object passed as options to the worker_threads.Worker constructor. See the nodejs documentation for available options.
  • onCreateWorker: Function. A callback that is called whenever a worker is being created. It can be used, for example, to allocate resources for each worker. The callback receives an object with the following properties:
    • forkArgs: String[]: the forkArgs option of this pool
    • forkOpts: Object: the forkOpts option of this pool
    • workerOpts: Object: the workerOpts option of this pool
    • script: string: the script option of this pool
    Optionally, this callback can return an object containing one or more of the above properties. The provided properties will override the pool properties for the worker being created.
  • onTerminateWorker: Function. A callback that is called whenever a worker is being terminated. It can be used to release resources that might have been allocated for this specific worker. The callback receives an object as described for onCreateWorker, with each property set to the value for the worker being terminated.
  • emitStdStreams: boolean. For the process or thread worker type. If true, the worker will emit stdout and stderr events instead of passing them through to the parent streams. Default value is false.

Important note on 'workerType': when sending and receiving primitive data types (plain JSON) to and from a worker, the different worker types ('web', 'process', 'thread') can be used interchangeably. However, when using more advanced data types like buffers, the API and returned results can vary. In these cases, it is best not to use the 'auto' setting but to pick a fixed 'workerType' and have good unit tests in place.
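As an illustration, a pool with explicit sizing and a fixed worker type could be created as follows. This is only a sketch: the option values are arbitrary examples, and myWorker.js refers to the dedicated worker script from the earlier example.

const workerpool = require('workerpool');

const pool = workerpool.pool(__dirname + '/myWorker.js', {
  minWorkers: 2, // keep two workers initialized and available
  maxWorkers: 4, // never spawn more than four workers
  maxQueueSize: 1000, // throw when more than 1000 tasks are queued
  workerType: 'thread', // always use worker_threads (Node.js >= 11.7.0)
  workerTerminateTimeout: 2000, // give workers 2 seconds to clean up on termination
  onCreateWorker: function (opts) {
    console.log('worker created for script', opts.script);
  },
  onTerminateWorker: function (opts) {
    console.log('worker terminated for script', opts.script);
  },
});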

A worker pool contains the following functions:

  • Pool.exec(method: Function | string, params: Array | null [, options: Object]) : Promise<any, Error>
    Execute a function on a worker with given arguments.

    • When method is a string, a method with this name must exist at the worker and must be registered to make it accessible via the pool. The function will be executed on the worker with given parameters.
    • When method is a function, the provided function will be stringified, sent to the worker, and executed there with the provided parameters. The provided function must be static; it must not depend on variables in a surrounding scope.
    • The following options are available:
      • on: (payload: any) => void. An event listener, to handle events sent by the worker for this execution. See Events for more details.
      • transfer: Object[]. A list of transferable objects to send to the worker. Not supported by process worker type. See example for usage.
  • Pool.proxy() : Promise<Object, Error>
    Create a proxy for the worker pool. The proxy contains a proxy for all methods available on the worker. All methods return promises that resolve with the method's result.

  • Pool.stats() : Object
    Retrieve statistics on workers, and active and pending tasks.

    Returns an object containing the following properties:

    {
      totalWorkers: 0,
      busyWorkers: 0,
      idleWorkers: 0,
      pendingTasks: 0,
      activeTasks: 0
    }
    
  • Pool.terminate([force: boolean [, timeout: number]]) : Promise<void, Error>

    If parameter force is false (default), workers will finish the tasks they are working on before terminating themselves. Any pending tasks will be rejected with an error 'Pool terminated'. When force is true, all workers are terminated immediately without finishing running tasks. If a timeout is provided, a worker will be forced to terminate when the timeout expires and the worker has not yet finished.
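For instance, a graceful shutdown that falls back to forced termination could look like this (a sketch based on the signature above, using a pool created as in the earlier examples):

// let running tasks finish, but force-terminate any worker
// that is still busy after 1000 ms
pool.terminate(false, 1000).then(function () {
  console.log('pool terminated');
});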

The function Pool.exec and the proxy functions all return a Promise. The promise has the following functions available:

  • Promise.then(fn: Function<result: any>) : Promise<any, Error>
    Get the result of the promise once resolved.
  • Promise.catch(fn: Function<error: Error>) : Promise<any, Error>
    Get the error of the promise when rejected.
  • Promise.cancel() : Promise<any, Error>
    A running task can be cancelled. The worker executing the task is forced to terminate immediately. The promise will be rejected with a Promise.CancellationError.
  • Promise.timeout(delay: number) : Promise<any, Error>
    Cancel a running task when it is not resolved or rejected within the given delay in milliseconds. The timer starts when the task is actually started, not when the task is created and queued. The worker executing the task is forced to terminate immediately. The promise will be rejected with a Promise.TimeoutError.

Example usage:

const workerpool = require('workerpool');

function add(a, b) {
  return a + b;
}

const pool1 = workerpool.pool();

// offload a function to a worker
pool1
  .exec(add, [2, 4])
  .then(function (result) {
    console.log(result); // will output 6
  })
  .catch(function (err) {
    console.error(err);
  });

// create a dedicated worker
const pool2 = workerpool.pool(__dirname + '/myWorker.js');

// supposed myWorker.js contains a function 'fibonacci'
pool2
  .exec('fibonacci', [10])
  .then(function (result) {
    console.log(result); // will output 55
  })
  .catch(function (err) {
    console.error(err);
  });

// send a transferable object to the worker
// supposed myWorker.js contains a function 'sum'
const toTransfer = new Uint8Array(2).map((_v, i) => i)
pool2
  .exec('sum', [toTransfer], { transfer: [toTransfer.buffer] })
  .then(function (result) {
    console.log(result); // will output 3
  })
  .catch(function (err) {
    console.error(err);
  });

// create a proxy to myWorker.js
pool2
  .proxy()
  .then(function (myWorker) {
    return myWorker.fibonacci(10);
  })
  .then(function (result) {
    console.log(result); // will output 55
  })
  .catch(function (err) {
    console.error(err);
  });

// create a pool with a specified maximum number of workers
const pool3 = workerpool.pool({ maxWorkers: 7 });
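The cancel and timeout helpers described above can be combined with exec as well. A minimal sketch, reusing pool2 and the registered 'fibonacci' function from the example above:

// cancel a running task
const promise = pool2.exec('fibonacci', [40]);
setTimeout(function () {
  promise.cancel(); // the worker executing the task is terminated
}, 100);
promise.catch(function (err) {
  console.error(err); // Promise.CancellationError
});

// reject a task when it does not finish within 5 seconds
pool2
  .exec('fibonacci', [40])
  .timeout(5000)
  .catch(function (err) {
    console.error(err); // Promise.TimeoutError when the task takes too long
  });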

worker

A worker is constructed as:

workerpool.worker([methods: Object<String, Function>] [, options: Object]) : void

Argument methods is optional and can be an object with functions available in the worker. Registered functions will be available via the worker pool.

The following options are available:

  • onTerminate: ([code: number]) => Promise<void> | void. A callback that is called whenever a worker is being terminated. It can be used to release resources that might have been allocated for this specific worker. The difference with pool's onTerminateWorker is that this callback runs in the worker context, while onTerminateWorker is executed on the main thread.

Example usage:

// file myWorker.js
const workerpool = require('workerpool');

function add(a, b) {
  return a + b;
}

function multiply(a, b) {
  return a * b;
}

// create a worker and register functions
workerpool.worker({
  add: add,
  multiply: multiply,
});

Asynchronous results can be handled by returning a Promise from a function in the worker:

// file myWorker.js
const workerpool = require('workerpool');

function timeout(delay) {
  return new Promise(function (resolve, reject) {
    setTimeout(resolve, delay);
  });
}

// create a worker and register functions
workerpool.worker({
  timeout: timeout,
});

Transferable objects can be sent back to the pool using the Transfer helper class:

// file myWorker.js
const workerpool = require('workerpool');

function array(size) {
  var array = new Uint8Array(size).map((_v, i) => i);
  return new workerpool.Transfer(array, [array.buffer]);
}

// create a worker and register functions
workerpool.worker({
  array: array,
});
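The onTerminate option described earlier can be used to clean up worker-local resources when a worker is terminated. A minimal sketch, assuming a hypothetical connection object that must be closed:

// file myWorker.js
const workerpool = require('workerpool');

// a hypothetical worker-local resource, e.g. a database connection
const connection = {
  close: function () {
    return Promise.resolve();
  },
};

function query(sql) {
  // ... use the connection here ...
  return sql;
}

// create a worker, register functions, and register a cleanup callback
workerpool.worker(
  { query: query },
  {
    // runs in the worker context when this worker is being terminated
    onTerminate: function (code) {
      return connection.close();
    },
  }
);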

Events

You can send data back from workers to the pool while the task is being executed using the workerEmit function:

workerEmit(payload: any) : unknown

This function only works inside a worker and during a task.

Example:

// file myWorker.js
const workerpool = require('workerpool');

function eventExample(delay) {
  workerpool.workerEmit({
    status: 'in_progress',
  });

  workerpool.workerEmit({
    status: 'complete',
  });

  return true;
}

// create a worker and register functions
workerpool.worker({
  eventExample: eventExample,
});

To receive those events, you can use the on option of the pool exec method:

pool.exec('eventExample', [], {
  on: function (payload) {
    if (payload.status === 'in_progress') {
      console.log('In progress...');
    } else if (payload.status === 'complete') {
      console.log('Done!');
    }
  },
});

Utilities

The following properties are available for convenience:

  • platform: The JavaScript platform. Either 'node' or 'browser'.
  • isMainThread: Whether the code is running in the main thread or in a worker.
  • cpus: The number of CPUs/cores available.
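For instance, these properties can be used to size a pool or to branch between main-thread and worker code. A small sketch:

const workerpool = require('workerpool');

console.log(workerpool.platform); // 'node' or 'browser'
console.log(workerpool.isMainThread); // true when not running inside a worker
console.log(workerpool.cpus); // number of CPUs/cores available

// for example, leave one core free for the main thread
const pool = workerpool.pool({
  maxWorkers: Math.max(workerpool.cpus - 1, 1),
});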

Roadmap

  • Implement functions for parallel processing: map, reduce, forEach, filter, some, every, ...
  • Implement graceful degradation on old browsers not supporting web workers: fall back to processing tasks in the main application.
  • Implement session support: be able to handle a series of related tasks by a single worker, which can keep a state for the session.

Related libraries

Build

First clone the project from GitHub:

git clone git://github.com/josdejong/workerpool.git
cd workerpool

Install the project dependencies:

npm install

Then, the project can be built by executing the build script via npm:

npm run build

This will build the library workerpool.js and workerpool.min.js from the source files and put them in the folder dist.

Test

To execute tests for the library, install the project dependencies once:

npm install

Then, the tests can be executed:

npm test

To test code coverage of the tests:

npm run coverage

To see the coverage results, open the generated report in your browser:

./coverage/index.html

Publish

  • Describe changes in HISTORY.md
  • Update version in package.json, run npm install to update it in package-lock.json too.
  • Push to Github
  • Deploy to npm via npm publish
  • Add a git tag with the version number like:
    git tag v1.2.3
    git push --tags
    

License

Copyright (C) 2014-2024 Jos de Jong [email protected]

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.


workerpool's Issues

npm i workerpool failed

Running npm i workerpool fails with the following error:

memory.cpp
  win_delay_load_hook.cc
..\src\memory.cpp(6): fatal error C1083: Cannot open include file: 'sys/mman.h': No such file or directory [E:\electron_project\TMAPConver
tSDK\x64\node_modules\shmmap\build\shmmap.vcxproj]
gyp ERR! build error
gyp ERR! stack Error: `C:\Program Files (x86)\MSBuild\14.0\bin\msbuild.exe` failed with exit code: 1
gyp ERR! stack     at ChildProcess.onExit (E:\nvm\v8.11.2\node_modules\npm\node_modules\node-gyp\lib\build.js:258:23)
gyp ERR! stack     at emitTwo (events.js:126:13)
gyp ERR! stack     at ChildProcess.emit (events.js:214:7)
gyp ERR! stack     at Process.ChildProcess._handle.onexit (internal/child_process.js:198:12)
gyp ERR! System Windows_NT 10.0.17134
gyp ERR! command "E:\\nodejs\\node.exe" "E:\\nvm\\v8.11.2\\node_modules\\npm\\node_modules\\node-gyp\\bin\\node-gyp.js" "rebuild"
gyp ERR! cwd E:\electron_project\TMAPConvertSDK\x64\node_modules\shmmap
gyp ERR! node -v v8.11.2
gyp ERR! node-gyp -v v3.6.2
gyp ERR! not ok
npm WARN [email protected] No description
npm WARN [email protected] No repository field.
npm WARN optional SKIPPING OPTIONAL DEPENDENCY: [email protected] (node_modules\shmmap):
npm WARN optional SKIPPING OPTIONAL DEPENDENCY: [email protected] install: `node-gyp rebuild`
npm WARN optional SKIPPING OPTIONAL DEPENDENCY: Exit status 1

Better worker termination

It would be nice to have more advanced worker termination options, at least for node.

  • The timeout is a false one. It just causes the promise to reject. I would expect the timeout to actually force-kill the worker.
  • Termination currently happens with worker.kill(). If one wanted to implement some cleanup, the only way would be to trap SIGTERM. It would be nicer to support a method like {onTermination: myCleanup}, and then have the timeout actually kill the process with worker.kill('SIGKILL').

Promise timeout starts too early

If there are more jobs than minWorkers, they are queued as expected. The problem is that the timeout's timer starts as soon as they are pushed into the queue. It would be great if the timer started when the job is really started, that is, when it is popped off the queue and forked.
One solution could be to pass the timeout value directly in the exec function of the pool, store the timeout value in the promise object, and then start the timer in _next(). I've tested this solution and it works.

Is streaming to proxy supported?

I've been using a standalone Worker in my Node.js app.

My worker created a zip stream and streamed the chunks via Worker's parentPort.postMessage like so:

zip.generateInternalStream({ type: 'uint8array', compression: 'DEFLATE' }).on('data', (streamData, metadata) => {
  parentPort.postMessage(streamData);
});

The main file had:

const worker = new Worker('../../libs/worker/zip.worker.js');
worker.on('message', (message) => {
  socket.emit(message);
});

Using workerpool, how would one achieve this flow? There is no documentation regarding the native Worker .on() function.

Thank you,
Haim

request new feature:

Hello,

Would it be possible to add 2 methods :

  • to tag the workers; that could be useful for the next one.

  • to restart / renew a specific worker, or proxy to a specific worker.

The use case I see is that if the code changes, the pool could slowly restart each worker to pick up the new method.

Thank you.

Node child forks can't use the debugger

Hi Jos,

It looks like I can't use the node debugger while spawning child processes, because each worker tries to use the same port. Example:

Debugger listening on 127.0.0.1:51884
Error: listen EADDRINUSE 127.0.0.1:51884
    at Object.exports._errnoException (util.js:1023:11)
    at exports._exceptionWithHostPort (util.js:1046:20)
    at Agent.Server._listen2 (net.js:1261:14)
    at listen (net.js:1297:10)
    at doListening (net.js:1396:7)
    at _combinedTickCallback (internal/process/next_tick.js:77:11)
    at process._tickCallback (internal/process/next_tick.js:98:9)

It seems that child forks have to be created with the --debugger={port}. My first guess would be to change this line (WorkerHandler.js:79):

    this.worker = node.require('child_process').fork(this.script);

What do you think?

Thanks again for your work, cheers from France

executeAll API request

Is there a way to execute a method on all workers? We are trying to invalidate data the workers could have cached, but we can't call an API that guarantees it will run on each worker. Any help would be greatly appreciated.

Thanks,
Kurt

Send progress/status information from worker to main process

As I understand, there isn't a way of listening for or sending messages between the worker and the executor. I refer to the message listeners and the sender method worker.send in the worker.js and WorkerHandler.js files. Could you give a suggestion on how I can establish message communication? Maybe you've already had some thoughts about it.

P.S. Great package, thank you for the work.

Feature: 1 pool that manages multiple workers

Currently 1 custom worker results in 1 pool, but in my use case (a build system) I would prefer to have 1 pool where I could register multiple workers, similarly to how I can currently register multiple methods.

This would prevent an unnecessary number of subprocesses/threads, which typically remain dormant, as we would begin reusing the existing pool's subprocesses/threads.

If this idea sounds reasonable, I will provide a PR. If not, it can be implemented in user-land, but it seems more appropriate as part of workerpool itself.

Let me know!

Uses browser mode in Jest

I'm having trouble testing my API outputs in Jest. I always get

WorkerPool: Web workers not supported by the browser

In version 3, I would mock the workerpool/lib/environment module and set it to "node" mode. Version 4 seems to be utilizing webpack and the environment module is no longer exposed. Is there any way to force workerpool to run in "node" mode?

As a workaround, I use global.Window = undefined in my jest setup, but I see it as a hack.

Thank you for this amazing library.

Pool statistics

Is there any way to get the number of used and available workers at a given time?

`worker.send = process.send.bind(process)... TypeError: Cannot read property 'bind' of undefined`

I'm on Windows 10 and this may be due to that (I remember the days when windows was the first thing to get oversight, but alas)...

I'm getting this error running on node 10.15.3:

...node_modules\workerpool\lib\worker.js:42
worker.send = process.send.bind(process);
                               ^
TypeError: Cannot read property 'bind' of undefined

I looked through the other issues and couldn't find anything related. I could just be doing something stupid. It works fine in my Quokka REPL...

Could it be that I'm using another const externalLibrary = require('externalLibrary') in the worker.js file (imported)?

Not sure if it's related, but here's the docs on the subject:

https://nodejs.org/api/child_process.html#child_process_options_stdio

issue debugging when more than 1 worker pool exists

Example:

// index.js
'use strict';

const workerpool = require('workerpool');
const pool = workerpool.pool();
const pool2 = workerpool.pool();

const add = (a, b) => a + b;

Promise.all([
  pool.exec(add, [1, 1]),
  pool2.exec(add, [1, 1])
])
  .then(function (result) {
    console.log('result', result); // outputs [ 2, 2 ]
  })
  .catch(function (err) {
    console.error(err);
  }).then(() => {
    pool.terminate();
    pool2.terminate();
  });

Running this with:

node ./index.js

Works correctly.

Unfortunately, running this combined with #58 for better error message output

node --inspect ./index.js

results in:

node --inspect foo.js
Debugger listening on ws://127.0.0.1:9229/194b112a-6b22-4abf-8ad9-4aaa4920475b
For help see https://nodejs.org/en/docs/inspector
Debugger listening on ws://127.0.0.1:43210/a7b46e61-8702-4334-9281-9467bf55c9e5
For help see https://nodejs.org/en/docs/inspector
Starting inspector on 127.0.0.1:43210 failed: address already in use
Error: Workerpool Worker terminated Unexpectedly
    exitCode: `12`
    signalCode: `null`
    workerpool.script: `/Users/spenner/src/josdejong/workerpool/lib/worker.js`
    spawnArgs: `/Users/spenner/src/stefanpenner/dotfiles/.config/node/versions/node-v8.9.4-darwin-x64/bin/node,--inspect=43210,/Users/spenner/src/josdejong/workerpool/lib/worker.js`
    spawnfile: `/Users/spenner/src/stefanpenner/dotfiles/.config/node/versions/node-v8.9.4-darwin-x64/bin/node`
    stdout: `null`
    stderr: `null`

    at ChildProcess.<anonymous> (/Users/spenner/src/josdejong/workerpool/lib/WorkerHandler.js:259:13)
    at emitTwo (events.js:126:13)
    at ChildProcess.emit (events.js:214:7)
    at Process.ChildProcess._handle.onexit (internal/child_process.js:198:12)

This happens because both subprocesses attempt to bind to the same inspector port (--inspect=43210).

respawn delay

Is it possible to set a respawn delay for crashed workers?

Typescript class ready

Hi,

I wanted to offload functions dynamically, but if I do this with a TypeScript object, it breaks with:

TypeError: Converting circular structure to JSON
--> starting at object with constructor 'UserDAO'
| property '_storage' -> object with constructor 'Object'
| property 'username' -> object with constructor 'UserEntity'
| ...
| index 0 -> object with constructor 'DeviceEntity'
--- property 'userDAO' closes the circle
at JSON.stringify ()
at ChildProcess.target._send (internal/child_process.js:778:25)
at ChildProcess.target.send (internal/child_process.js:676:19)

Well, I expected that I could just pass in my Device object (class) and it would be passed along to the generated worker. My code:

function adder (number1, number2, device) {
  console.log(number1, number2, device);

  return number1 + number2;
}

class WorkerService {

  private static instance;

  public pool = workerPool.pool();

  constructor() {
  }
  public executeAdder(device: IDevice): void {
    this.pool.exec(adder, [10, 3, device]).then(result => {
      console.log(result);
    }).catch(err => {
      console.error(err);
    });
  }

}

Hope you can make this work soon
Best Regards

minWorkers is not acting as documented

I have the case where I tried to spin up pool with following options:

const numCPUs = 5
workerpool.pool(path, {
  minWorkers: 'max',
  maxWorkers: numCPUs
})

I expected to see 5 workers spun up, but I saw 3 instead (the machine has 4 CPUs).

It seems that the actual implementation looks at the CPU count instead of copying the maxWorkers value, as implied by the documentation:

minWorkers: number | 'max'. 
The minimum number of workers that must be initialized and kept available. 
Setting this to 'max' will create maxWorkers default workers (see below).

(Question) - Offload functions dynamically

Hi, I am not sure if this is the right place to ask this question, but its regarding "Offload functions dynamically".

Does this mean that a worker will be created every time it is called (following the Offload functions dynamically example)? So let's say I have 1 person initiate a task that gets called according to the Offload functions dynamically example, and while that worker is processing, another person initiates the same task.

Will it spawn another worker while the other is still processing? So if, say, two people initiate the task, would it spawn two workers to process the data for those two requests? (I'm using this in an express application.)

Thank you in advance!

Can't resolve 'child_process'

ERROR in ./node_modules/workerpool/lib/WorkerHandler.js
Module not found: Error: Can't resolve 'child_process' in '/Users/geo/Code/Calcite-proj/calcite-v2/node_modules/workerpool/lib'
 @ ./node_modules/workerpool/lib/WorkerHandler.js 176:83-107 179:81-105
 @ ./node_modules/workerpool/lib/Pool.js
 @ ./node_modules/workerpool/index.js

Trying to get this running client side and running into this error.

Workerpool Worker terminated Unexpectedly exitCode: `1`

Trying to get a dedicated worker to work in Node/Express with a simple example. Constantly getting a ...

Workerpool Worker terminated Unexpectedly exitCode: `1` signalCode: `null` workerpool.script: `path/to/worker/test.worker.js` spawnArgs: `/usr/local/bin/node,--inspect=43210,path/to/worker/test.worker.js` spawnfile: `/usr/local/bin/node` stdout: `null` stderr: `null`

Here is a sample repo of the issue:
https://github.com/stoplion/worker-test

Is there anything I am missing with the setup?

How to import external function to Worker?

Hello!
I am currently in a project that involves:
Electron v4.1.4
Vue v2.6.10
Node v10.11.0
Vuetify v1.5.9
Chromium v69.0.3497.128
V8 v6.9.427.31-electron.0
NPM Packages:

  • ws v6.1.0 (Websocket)
  • pg v7.4.3 (postgreSQL)

Case study:
I need to create a Worker, in a separate file or in the same component (.vue or .js), that receives an "asynchronous" function. This function creates a Websocket connection with a file named "server.js" running through "pm2" on the server.

Situation:
Test the "workerpool" package on electron within a .vue component and it works perfectly. However I can not pass an external function to the worker.

The following examples:

    Example 1 -> RUNS OK!

    // File - Login.vue
    TEST_WORKER: async function () {
      var workerpool = require('workerpool');
      var pool = workerpool.pool();

      function add() {
        let element;
        for (let index = 0; index < 1000000000; index++) {
          element = index;
        }
        return element;
      }

      try {
        let x = await pool.exec(add);
        console.log(x);
      } catch (error) {
        console.log(error);
      }
    }
  Example 2 -> DOESN'T RUN!

  // File - Login.vue
  TEST_WORKER: async function () {
    var workerpool = require('workerpool');
    var pool = workerpool.pool();
    try {
      let x = await pool.exec(this.EXTERNAL_FUNCTION_WORKER());
      console.log(x);
    } catch (error) {
      console.log(error);
    }
  },

  EXTERNAL_FUNCTION_WORKER: function () {
    let element = 0;
    for (let index = 0; index < 1000000000; index++) {
      element = index;
    }
    return element;
  }

Situation 2:
I cannot reference the Worker path.
Example:

 var pool = workerpool.pool('/workers/worker.js');

The file is in a top-level folder and the app does not find it; a 404 error is generated.
How do I proceed?

appreciate!

Support asynchronous initialization

If a worker uses a dynamic module loader, it cannot be used with workerpool directly, because the worker might not have registered its onmessage handler at the time WorkerHandler sends the exec call.

I have implemented support for asynchronous initialization here. If you want this change I can prepare a proper pull request with updated documentation as well.

Thank You.

(Question) - how to import workerpool in es6 module

Our environment is typescript, es6 module and webpack. When I tried to compile my project, i received following webpack errors:

WARNING in ./node_modules/workerpool/lib/environment.js
2:21-28 Critical dependency: require function is used in a way in which dependencies cannot be statically extracted
@ ./node_modules/workerpool/lib/environment.js
@ ./node_modules/workerpool/index.js
@ ./ClientApp/util/Fetcher.ts
@ ./ClientApp/util/index.tsx
@ ./ClientApp/boot-client.tsx

WARNING in ./node_modules/workerpool/lib/WorkerHandler.js
8:21-28 Critical dependency: require function is used in a way in which dependencies cannot be statically extracted
 @ ./node_modules/workerpool/lib/WorkerHandler.js
 @ ./node_modules/workerpool/lib/Pool.js
 @ ./node_modules/workerpool/index.js
 @ ./ClientApp/util/Fetcher.ts
 @ ./ClientApp/util/index.tsx
 @ ./ClientApp/boot-client.tsx

I guess my question is how to import workerpool in an ES6 module, or whether that is possible at all.

Is pool.terminate async?

The documentation treats pool.terminate as synchronous, but in practice (at least under Node.js) it returns a Promise.

It makes sense to me that it would be asynchronous, but I wanted to confirm before updating my own code.

If it is async, I'd be happy to open a PR to make sure the doc matches.

Thanks for an awesome library!

Pool.terminate (true) doesn't clear the task queue

Calling pool.terminate(true) will kill all currently running tasks, but the queue will continue to be processed with the tasks still in it.
This does not seem to be the expected behaviour, as pool.terminate() should not leave any running task after completion.

Example code

'use strict';

const workerpool = require('workerpool');

/**
 * Test worker function
 * @param {number} nb
 * @return {Promise<number>}
 */
async function workerFunction (nb) {
	await new Promise(resolve => setTimeout(resolve, 500 + (Math.random() * 200)));
	if (nb === 5) {
		console.log('Throwing error', nb);
		throw new Error('Error for 5');
	}
	console.log('Finish', nb);
	return nb;
}

/**
 * Main
 * @return {Promise<void>}
 */
async function main () {
	const todoList = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12];

	const pool = await workerpool.pool({
		maxWorkers: 4
	});

	try {
		await Promise.all(todoList.map(async nb => {
			try {
				await pool.exec(workerFunction, [nb]);
				console.log(`Done: ${nb}`);
			} catch (e) {
				console.error(`Error: ${nb}`);
				throw e;
			}
		}));

		// Terminate gracefully
		await pool.terminate();
	} catch (e) {
		console.error('Global error');
		// Force terminate the pool (no need to continue after an error)
		await pool.terminate(true);
		throw e;
	}
}

main().catch(e => {
	console.error(e);
	process.exitCode = 1;
});

Result

Finish 3
Done: 3
Finish 1
Done: 1
Finish 0
Done: 0
Finish 2
Done: 2
Finish 7
Done: 7
Throwing error 5
Error: 5
Global error
Error: 6
Error: 9
Error: 8
Error: 4
Error: Error for 5
    at AsyncFunction.workerFunction (eval at run (/home/congelli501/dev/blg-diffusion-v2/node_modules/workerpool/dist/worker.js:204:11), <anonymous>:5:9)
Finish 10
Done: 10
Finish 12
Done: 12
Finish 11
Done: 11

--- The node process doesn't terminate as some active handles still exist ---

Expected output

Finish 2
Done: 2
Finish 3
Done: 3
Finish 1
Done: 1
Finish 0
Done: 0
Finish 7
Done: 7
Finish 6
Done: 6
Finish 4
Done: 4
Throwing error 5
Error: 5
Global error
Error: 8
Error: 9
Error: 10
Error: 11
Error: Error for 5
    at AsyncFunction.workerFunction (eval at run (/home/congelli501/dev/blg-diffusion-v2/node_modules/workerpool/dist/worker.js:204:11), <anonymous>:5:9)

Process finished with exit code 1

Fix

Add the following code to Pool.prototype.terminate

  if (force) {
    // We are about to force-kill all the workers, clear the task list too
    this.tasks.length = 0;
  }

If this is indeed not the expected behaviour, I can create a merge request.
Otherwise, it would be nice to add clearQueue() and terminateAndClearQueue() methods for that purpose.
In that case, it would also be nice to document that terminate() doesn't clear the queue and the process will continue picking up tasks.

Workaround

From user code, always call:

pool.tasks.length = 0;
await pool.terminate(true);

When an immediate force terminate is needed.

Using Proxies to identify a worker

Can I use a proxy to identify what a child process is working on?

For instance, Say I have a thread that's doing some machine learning on an array of objects with a particular id that identifies them as coming from a certain set.

There may be several such threads running all training on a different data sets.

if I need to interrupt a running thread due to data changes that occur on the main thread, could I identify it via a proxy method and then run another method on the worker that cancels it?

I've been using threads.js to attempt this, but haven't been able to get it to work with that library yet.

inner method calls not working

const workerpool = require('workerpool');
const pool = workerpool.pool();

const job = (n: number): number => {
  return returnsInput(n);
}

function returnsInput (n: number) {
  return n
}

pool.exec(job, [10])
  .then((res: number) => {
    console.log('im done');
    pool.clear();
  });

The promise does not get resolved (probably), so I don't get the output "im done".

If I replace return returnsInput(n); with a plain value instead of a function call, for example n, the method resolves.

Why so many threads?

I have a workerpool with minWorkers 2 and maxWorkers 2. After offloading some work to the pool I can see in htop that the 2 workers spawned multiple threads. As I understood from the docs, there should be only 2 workers processing the incoming work. Is this normal?

PS: I added a screenshot with how htop looks for the workerpool at this link

Export maximum number of workers supported

Exporting the maximum number of workers supported would be a nice addition to workerpool.

Workerpool can queue exec calls if there aren't any workers available, but sometimes you want to know that beforehand so that you don't have to pull data into memory (from some external resource) until there are resources to process it.

Also, since Workers are not in the ECMAScript specification and there is no cross-platform way to get the maximum number of available workers, workerpool might as well provide this feature.

Implemented here.

passing Parameters to workers

It would be cool if you could pass parameters to the workers.
For example, being able to increase the stack size would be nice.
Also, always passing the parent parameters is a problem if you are using an IDE like VS Code, because it uses a debug port which is blocked by the main process when the workers try to open it (so the workers crash).

Feature: pool events

I don't know if we can access this today, but I would like to get pool events such as workerCreated and workerDestroy.

Error when loading via AMD in the browser

The code doesn't seem to work in the browser (tested in Firefox):

HTML:

<html>
  <head>
    <meta http-equiv="Content-Type" content="text/plain; charset=utf-8"></meta>
    <script src="node_modules/requirejs/require.js" data-main="test"></script>
  </head>
  <body>
  </body>
</html>

Javascript:

define(['node_modules/workerpool/dist/workerpool'], function(workerpool) {

  'use strict';

  var pool = workerpool.pool();

  return pool.exec(function(value) {
    return value + 1;
  }, [1]).then(function(results) {

    console.log(results);
    pool.clear();

  });

});

Error: ReferenceError: require is not defined

pool.exec not compatible with ES6 class syntax

I am using ES6 class syntax. I have initialised my pool object inside the class constructor.
But when I try to dynamically offload the function with the syntax below:

this.pool.exec(getUserInside, [userId, apiKey, behanceUrl]); it gives me the error getUserInside is not defined. When I replace it with the syntax below:

this.pool.exec(this.getUserInside, [userId, apiKey, behanceUrl]); it gives me a different error.

But when I replaced everything in my code with the old syntax, without the class structure, everything worked fine.

Is workerpool not compatible with ES6 class syntax?

Child processes not killed when the parent process is killed

This is an issue that is relevant to workerpool, but it's not clear whether it should be the responsibility of the module or not.

When the parent process dies ungracefully, none of the spawned child processes are killed. For my particular use-case, I want to keep around a pool of long-running processes to perform work. If the parent process is killed with, say, SIGTERM, the child processes continue to run. When using a process management system like upstart, it's entirely possible that the parent process could be respawned, creating a whole new generation of worker processes in addition to the orphaned processes.

Here is a script that demonstrates the issue:

#! /usr/bin/env node

const workerpool = require('workerpool');
const count = 4;
const pool = workerpool.pool({
  minWorkers: count,
  maxWorkers: count,
});

// This is meant to represent a long-lived process
const work = (id) => {
  const workForever = (id) => {
    console.log(`Worker ${id} alive and kicking...`);

    return new Promise((resolve) => {
      setTimeout(resolve, 1000);
    }).then(() => workForever(id));
  };

  return workForever(id);
};

const ids = Array.from(Array(count).keys());
const promises = ids.map((id) => pool.exec(work, [id]))

Promise.all(promises)
  .then(() => console.log('Done with all workers'));

// After dispatching all the child processes, simulate the parent process getting
// killed by a user, out-of-memory error, etc.
process.kill(process.pid, 'SIGKILL');

While most signals could be caught with process.on to then explicitly terminate the pool, SIGKILL does not provide such an opportunity. After some googling, I found a solution that may be of help to others who encounter similar situations. Apparently the 'disconnect' event available in process is triggered when the parent process dies, thus making it possible to have the child processes self-terminate. The updated work function then becomes:

const work = (id) => {
  process.on('disconnect', () => {
    process.exit(0);
  });

  const workForever = (id) => {
    console.log(`Worker ${id} alive and kicking...`);

    return new Promise((resolve) => {
      setTimeout(resolve, 1000);
    }).then(() => workForever(id));
  };

  return workForever(id);
};

Node just got worker_threads

Hey, we have been using workerpool for some time to help parallelize ember-cli builds. Thanks for a great project!

Anyways, I wanted to share that node just added experimental support for worker_threads: nodejs/node#20876 And this most likely could be of benefit to this project.

I'll likely kick the tires a bit; if it turns out well, would there be interest in a PR providing support for those who opt in?

Support window.navigator.hardwareConcurrency

Currently, if maxWorkers is not defined, the maximum number of workers is the number of CPUs minus one, except for browsers, where the number defaults to 3.

I propose window.navigator.hardwareConcurrency is to be used in browsers instead. This property is not yet available in all browsers so the previous functionality should be used as a fallback:

var numCPUs = (environment == 'browser') ? window.navigator.hardwareConcurrency || 4 : node.require('os').cpus().length;
this.maxWorkers = Math.max(numCPUs - 1, 1);

Support for Typescript?

I wonder if there is a roadmap to create a wrapper or something to be able to support typescript?

Currently, I am doing the following:

## main.ts
private readonly workersPool: WorkerPool = pool(
    `${__dirname}/worker.js`
  );

workersPool.exec('hello', ['world'])

## worker.js 

const path = require('path');

require('ts-node').register();
require(path.resolve(__dirname, './worker.ts'));


## worker.ts

import { worker } from 'workerpool';

worker({
  hello: (message: string) => `hello ${message}`,
});

Worker child process title

Is there a way of specifying the name of the child processes spawned by the pool? I want them to have different titles from the parent process.

Possible to import libraries into Worker?

Is it possible to import libraries into a Worker?

Worker

import memoize from 'fast-memoize';
import parseLine from './parse_line';
const parseLineMemoized = memoize(parseLine);

export default function parseLines(linesStr, varTable) {
  const lines = linesStr.split(/\n/);

  return lines.map((lineText, lineNum) => {
    return {
      lineNum,
      lineText: parseLineMemoized(lineText, varTable),
    };
  });
}

Using the worker

    pool.exec(parseLines, [linesStr, varTable])
      .then((result) => {
      })
      .catch((err) => {
        console.log(err)
      });

Getting a parseLineMemoized is not defined at eval

Support binary data

Web workers can pass binary data, but this doesn't work in workerpool.

The stdio option for child processes was added a while ago and allows creation of the IPC channel, which can then be leveraged for sending binary data.

Additionally, there's the new cluster native module which automates a lot of this.

I'd love to have the workerpool API available on top of either one of these approaches, so I can have a robust solution that works on both web and node.

(Question) Using workerpool with browserify

Would like to know if workerpool is expected to be used with browserify? I have created a pool of dedicated workers by pointing at a script in another directory (../workers/compressionWorker.js). I build by running browserify, and the browser said it cannot post to localhost:3000/src/compressionWorker.js.

It seems that the directory parameter is passed down directly during browserifying and the browser is referencing the relative path directly.

Implement parallel Map & Reduce

Hi Team ,

We are using workerpool to execute certain functions for an array of items.

In the current implementation, the full array is passed to all the workers for processing.
Ideally we would want map & reduce functionality, where the array is distributed among the workers in chunks and the workers return the processed array upon completion.

Any direction on helping achieve that would be great. We are open to contributing to the code if required.
