Git Product home page Git Product logo

reactphp-child-process-pool's Introduction

Pool wyrihaximus/react-child-process-messenger processes

Linux Build Status Latest Stable Version Total Downloads Code Coverage License PHP 7 ready

Installation

To install via Composer, use the command below, it will automatically detect the latest version and bind it with ~.

composer require wyrihaximus/react-child-process-pool

Pools

  • Dummy - Meant for testing, doesn't do anything but complies to it's contract
  • Fixed - Spawns a given fixed amount of workers
  • Flexible - Spawns workers as a needed basis, given a minimum and maximum it will spawn within those values

Usage

This package pools wyrihaximus/react-child-process-messenger, for basic messaging please see that package for details how to use it.

Creating a pool

This package ships with a set factories, which create different pools. (All the options in the following examples are the default options.)

Dummy

Creates a Dummy pool:

$loop = EventLoopFactory::create();

Dummy::createFromClass(ReturnChild::class, $loop)->then(function (PoolInterface $pool) {
  // Now you have a Dummy pool, which does absolutely nothing
});

Fixed

Creates a Fixed pool:

$loop = EventLoopFactory::create();
$options = [
    Options::SIZE => 5,
];
Fixed::createFromClass(ReturnChild::class, $loop, $options)->then(function (PoolInterface $pool) {
    // You now have a pull with 5 always running child processes 
});

Flexible

Creates a Flexible pool:

$loop = EventLoopFactory::create();
$options = [
    Options::MIN_SIZE => 0,
    Options::MAX_SIZE => 5,
    Options::TTL      => 0,
];
Flexible::createFromClass(ReturnChild::class, $loop, $options)->then(function (PoolInterface $pool) {
    // You now have a pool that spawns no child processes on start.
    // But when you call rpc a new child process will be started for 
    // as long as the pool has work in the queue. With a maximum of five.
});

CpuCoreCountFixed

Creates a Fixed pool with size set to the number of CPU cores:

$loop = EventLoopFactory::create();

CpuCoreCountFlexible::createFromClass(ReturnChild::class, $loop)->then(function (PoolInterface $pool) {
    // You now have a Fixed pool with a child process assigned to each CPU core.
});

CpuCoreCountFlexible

The following example will creates a flexible pool with max size set to the number of CPU cores. Where the create method requires you to give it a React\ChildProcess\Process. The createFromClass lets you pass a classname of a class implementing WyriHaximus\React\ChildProcess\Messenger\ChildInterface that will be used as the worker in the client. Take a look at WyriHaximus\React\ChildProcess\Messenger\ReturnChild to see how that works.

$loop = EventLoopFactory::create();

CpuCoreCountFlexible::createFromClass(ReturnChild::class, $loop)->then(function (PoolInterface $pool) {
    // You now have a Fixed pool with a child process assigned to each CPU core,
    // which, just like the Flexible pool, will only run when there is something
    // in the queue.
});

License

Copyright 2017 Cees-Jan Kiewiet

Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

Contributors beyond the commit log

  • Gabi Davila - Helping test if my github token will be secure for pull requests on AppVeyor

reactphp-child-process-pool's People

Contributors

bartvanhoutte avatar billionaire avatar dependabot-preview[bot] avatar dependabot-support avatar dependabot[bot] avatar djuki avatar gabidavila avatar lucasnetau avatar tianjianjiang avatar wyrihaximus avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar

reactphp-child-process-pool's Issues

Upgrade AppVeyor config

Simple copy & paste issue for hacktoberfest.

The easiest way to do this is to edit the appveyor.yml file in this repo and create a pull request. Then go back to your fork and then use the Create new file button in the right just above the file listing.

Initializing workers with data from parent process

Is there a way to send data once to a process after it has been started, but before it gets work from the queue? This way i could reduce the overhead i'll have to send to the worker every time i've got stuff to do.

My use case is to calculate distances from ~20 million coordinates against a set of ~1000. My current solution is the send batches (e.g. 5000) combined with the other 1k in a rpc-call. But this, of course, results in me managing a buffer myself, until i have those 5k coordinates ready, and 20% communcation/parsing overhead due to the repeating set of 1k coordinates.

Another solution would be for the workers to fetch the 1k coordinates on startup themselves from an independend resource, but that doesnt sounds correct either.

Exception Given class doesn't implement ChildInterface

Hello, when i use Flexible::createFromClass i have got error. - Given class doesn't implement ChildInterface. Here my code:

Flexible::createFromClass(NewTestChild::class, $loop, $options) ->then(function (PoolInterface $pool) { try { $pool->rpc(MessagesFactory::rpc('return', [ 'i' => 1, 'time' => time() ]))->then(function (Payload $payload) use ($pool) { echo $payload['i'], PHP_EOL; echo $payload['time'], PHP_EOL; }); } catch (\Exception $e) { var_dump($e->getMessage()); } }); $loop->run();

My NewTestChild looks like:

`<?php

namespace childs\child;

use React\EventLoop\LoopInterface;
use WyriHaximus\React\ChildProcess\Messenger\ChildInterface;
use WyriHaximus\React\ChildProcess\Messenger\Messages\Payload;
use WyriHaximus\React\ChildProcess\Messenger\Messenger;

class NewTestChild implements ChildInterface
{

/**
 * @param Messenger     $messenger
 * @param LoopInterface $loop
 */
protected function __construct(Messenger $messenger, LoopInterface $loop)
{
    $messenger->registerRpc('return', function (Payload $payload) {
        return \React\Promise\resolve($payload);
    });
    $this->ran = true;
}

/**
 * @param Messenger     $messenger
 * @param LoopInterface $loop
 */
public static function create(Messenger $messenger, LoopInterface $loop)
{
    new static($messenger, $loop);
}

}`

I debuged this code and exception throws here:
on line 52
if (!\is_subclass_of($class, 'WyriHaximus\React\ChildProcess\Messenger\ChildInterface')) { throw new \Exception('Given class doesn\'t implement ChildInterface'); }

But my class NewTestChild implements ChildInterface
Maybe someone knows what the proble may be?

FlexiblePool ttl closes under MIN_SIZE workers

Hi, I have found out, that Pool\Flexible with TTL closes processes under the count of Options:MIN_SIZE.
Example:

$options = [
	Options::MIN_SIZE	=> 1,
	Options::MAX_SIZE	=> 5,
	Options::TTL		=> 10,		// seconds
];
Flexible::create(new Process('exec php ...'), $loop, $options);

Pool starts with 1 worker, but after 10 seconds (Options::TTL) of inactivity, worker is closed and there is 0 workers at that time. Problem can be in function ttl, but I do not understand the logic of this function to make PR.

Release 1.7.0 breaks React\Filesystem\ChildProcess

Release 1.7.0 breaks React\Filesystem\ChildProcess with

PHP Fatal error: Declaration of React\Filesystem\ChildProcess\Process::create(WyriHaximus\React\ChildProcess\Messenger\Messenger $messenger, React\EventLoop\LoopInterface $loop) must be compatible with WyriHaximus\React\ChildProcess\Messenger\ChildInterface::create(WyriHaximus\React\ChildProcess\Messenger\Messenger $messenger, React\EventLoop\LoopInterface $loop): void in /vendor/react/filesystem/src/ChildProcess/Process.php on line 18

library's composer.json recommends "wyrihaximus/react-child-process-messenger": "^3 || ^2.10.1" but version 3 is incompatible with the definition in React Filesystem (no return typehint void).

Workaround is to fix the version of wyrihaximus/react-child-process-messenger to ^2.10.1 in projects composer.json

Perhaps this 1.7.0 needs to be re-released as a version 2.0.0 so that it isn't automatically resolved as a dependancy from React Filesystem's composer.json "wyrihaximus/react-child-process-pool": "^1.3"

Hello PHP 8 !

Hello,

Can you have a plan to php 8 support ?

I see you use wyrihaximus/react-child-process-messenger but it use cakephp without support of php 8. :(

Tank's for considering this issue, because one of my project use lot of async task and i plan to use php 8.

How to handle errors from child process?

I get following error: Communication with process stopped unexpectedly when listening $pool->on('error), but i do not know where in my child process error occures - how can i handle such errors?

Message not sent to parent before closing child

On my case, messages are not sent from child to parent. As if they were ignored, they are not sent, even at the end, when the connection close.

I understand that if my code is blocking, the message can't be send immediately, and must wait for $loop to do an iteration. But it should send the message-s, at least at the end, before shutting down. (I think it is the case in your code. So I must miss something stupid :-/ )

For example, this does not trigger a message interception on my parent:

final class ProcessJob implements ChildInterface {
	public static function create(Messenger $messenger, LoopInterface $loop) {
		$messenger->registerRpc('process', function (Payload $payload) use ($messenger, $loop) {
			$messenger->message(MessagesFactory::message(["Test"]));

			$stopAt = time() + mt_rand(1, 2);
			do {
				// Do nothing
			} while ($stopAt >= time());

			return React\Promise\resolve(['status' => 'ok']);
		});
	}
}

On my parent, I intercept messages like this:

$this->pool->on('message', function (Payload $payload) {
	echo "PARENT: message\n";
	var_dump($payload);
});

Am I missing something?

Here is the full example I use for my tests.

go.php (launcher)

<?php
error_reporting(E_ALL);
require_once __DIR__.'/../../../../vendor/autoload.php';
require_once 'ping.php';

$loop = \React\EventLoop\Factory::create();
$ping = new Ping($loop);
$ping->go();
$loop->run();
echo "Bye, bye\n";

ping.php (the parent)

<?php
declare(strict_types=1);
require_once 'pong.php';

use WyriHaximus\React\ChildProcess\Messenger\Messages\Factory as MessagesFactory;
use WyriHaximus\React\ChildProcess\Pool\PoolInterface;
use WyriHaximus\React\ChildProcess\Pool\Factory\Flexible;
use WyriHaximus\React\ChildProcess\Pool\Options;
use WyriHaximus\React\ChildProcess\Messenger\Messages\Payload;
use React\EventLoop\LoopInterface;
use React\EventLoop\TimerInterface;

class Ping {
	/** @var LoopInterface		React Loop */
	protected $loop;

	/** @var  PoolInterface		Pool of process */
	protected $pool;

	/** @var TimerInterface		Periodic timer (proccess things each tick) */
	protected $timer;

	/** @var int				Just to limit number of operations in this example */
	protected $item;

	/**
	 * @param LoopInterface $loop		React loop
	 */
	public function __construct(LoopInterface $loop) {
		$this->loop = $loop;
		$this->pool = null;
		$this->timer = null;
		$this->item = 0;
	}

	/**
	 * Doing something in sub-processes
	 */
	public function go(): void {
		if ($this->pool !== null)
			throw new \Exception('Only one instance, please');

		// Create a pool
		$options = [
		    Options::MIN_SIZE => 0,
    		Options::MAX_SIZE => 2,
    		Options::TTL      => 0,
		];
		Flexible::createFromClass(ProcessJob::class, $this->loop, $options)
			->then(
				function (PoolInterface $pool) {
					$this->pool = $pool;
					echo "PARENT: ready\n";

					// Handle error reported by the child
					$this->pool->on('error', function (\Error $e) {
						echo "PARENT: error\n";
						var_dump($e);
					});

					// Handle message reported by the child
					$this->pool->on('message', function (Payload $payload) {
						echo "PARENT: message\n";
						var_dump($payload);
					});

					//Do things periodically
					$this->timer = $this->loop->addPeriodicTimer(0.5, function() {
						// Limit things to do (for this example)
						$item = ++$this->item;
						if ($item > 3) {
							$this->loop->cancelTimer($this->timer);
							$this->pool->terminate();
							return;
						}

						// Process something long in sub-process
						echo "PARENT: rpc(process:$item)\n";
						$this->pool->rpc(
							MessagesFactory::rpc('process', [
								'job'	=> ['foo' => 'bar'], 	#serialize($job),
								'item'	=> $item
							])
						)->then(
							function (Payload $payload) use ($item) {
								echo "PARENT: then(process:$item). Status = ".$payload['status']."\n";
							}
						);
					});
				}
			);
	}
}

pong.php (the child)

<?php
declare(strict_types=1);
error_reporting(E_ALL);

use React\EventLoop\LoopInterface;
use WyriHaximus\React\ChildProcess\Messenger\ChildInterface;
use WyriHaximus\React\ChildProcess\Messenger\Messenger;
use WyriHaximus\React\ChildProcess\Messenger\Messages\Payload;
use WyriHaximus\React\ChildProcess\Messenger\Messages\Factory as MessagesFactory;


final class ProcessJob implements ChildInterface {

	public static function create(Messenger $messenger, LoopInterface $loop) {

		$messenger->registerRpc('process', function (Payload $payload) use ($messenger, $loop) {
			$item = $payload['item'];
			echo "CHILD: rpc:process:".$payload['item']."\n";
			$messenger->message(MessagesFactory::message(["Test (".$payload['item'].")"]));

    		$stopAt = time() + mt_rand(1, 2);
			do {
				// Do nothing
			} while ($stopAt >= time());

			return React\Promise\resolve(['status' => 'ok']);
		});
	}
}

Result:

PARENT: ready
PARENT: rpc(process:1)
PARENT: rpc(process:2)
PARENT: rpc(process:3)
CHILD: rpc:process:1
CHILD: rpc:process:2
PARENT: then(process:1). Status = ok
PARENT: then(process:2). Status = ok
CHILD: rpc:process:3
PARENT: then(process:3). Status = ok
Bye, bye

For this example, I hacked your child-process.php (booo) to load pong.php:

require_once '../../../../servers/LPDSpooler/_test_/child-process/pong.php');

MessageFactory doesn't seem to resolve

Hi Cees-Jan,

I'm trying to get the pool running, but something isn't adding up for me. This is the class I'm calling:

<?php

namespace Database;

use React\EventLoop\LoopInterface;
use WyriHaximus\React\ChildProcess\Messenger\ChildInterface;
use WyriHaximus\React\ChildProcess\Messenger\Messenger;
use WyriHaximus\React\ChildProcess\Messenger\Messages\Payload;
use function React\Promise\resolve;

class PhalconSQL implements ChildInterface
{
    private $phql;

    public static function create(Messenger $messenger, LoopInterface $loop)
    {
        $messenger->registerRpc('executeQuery', function (Payload $payload) {
            return resolve([
                'response' => 'works'
            ]);
        });
    }
}

Here's how I'm calling the rpc:

                    $loop = EventLoopFactory::create();
                    CpuCoreCountFlexible::createFromClass(PhalconSQL::class, $loop)->then(function (PoolInterface $pool) {
                        echo 'tack' . PHP_EOL;

                        $pool->rpc(
                            MessageFactory::rpc('executeQuery')
                        )->then(function (Payload $result){
                            echo $result['response'] . PHP_EOL;
                            $pool->terminate();
                        });

                        echo 'tock' . PHP_EOL;

                    });
                    $loop->run();

What should I change to make it all resolve?

Thanks so much!
Maarten

Exit() of child process not counted

I have found out, that if child process exited by function exit() with return code 0, it is not handled as process termination in flexible pool.
The reason is in code reactphp-child-process-messenger/src/Factory.php#L211, where for exitCode zero is not thrown error

if ($exitCode === 0) {
   return;
}

If child process calls exit(), Flexible pool will still count this child, although process is terminated.

There is question, how to resolve this. I have a few ideas:

  1. Is there important to ignore exitCode === 0 in Messenger?
  2. Try to catch exit event on React\ChildProcess\Process
  3. Add to documentation, it is important to Terminate child process with non-zero exit codes to keep Flexible pool working.

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    ๐Ÿ–– Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. ๐Ÿ“Š๐Ÿ“ˆ๐ŸŽ‰

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google โค๏ธ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.