thephpleague / pipeline
League\Pipeline
Home Page: https://pipeline.thephpleague.com
License: MIT License
Hi,
I'm currently stuck on PHP 5.4 and wanted to use this package. The changes to make the package PHP 5.4 compatible were quite trivial; if you want to support 5.4, I can wrap up a PR. If not, I understand, as PHP 5.4 is near EOL.
In PipelineBuilder.php, the docblock for the build method says it returns a PipelineBuilderInterface, when it actually returns a Pipeline.
Hi!
I suggest adding a variadic parameter to the __invoke() method.
Example usage (DTO = Data Transfer Object):
$pipeline = (new Pipeline)
    ->pipe(function ($payload, DTO $dto) {
        $dto['ten'] = 10;
        return $payload * 2;
    })
    ->pipe(function ($payload, DTO $dto) {
        return $payload + $dto['ten'];
    });

$pipeline->process(5, new DTO);
// returns 20
The process method would then have ...$params:

// https://github.com/thephpleague/pipeline/blob/master/src/Pipeline.php

/**
 * Process the payload.
 *
 * @param mixed $payload
 * @param mixed ...$params
 * @return mixed
 */
public function process($payload, ...$params)
{
    foreach ($this->stages as $stage) {
        $payload = $stage($payload, ...$params);
    }

    return $payload;
}
My example (old code, before update): https://github.com/Roquie/pipeline/commits/master
Note: PHP 5.6 is required.
What do you think about it?
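For illustration, a hedged sketch of a class-based stage whose __invoke() accepts the extra parameters forwarded by the proposed process($payload, ...$params); the stage class name and its logic are hypothetical, not part of the library:

```php
<?php

// Sketch only: a hypothetical stage that consumes one of the extra
// parameters forwarded by Pipeline::process($payload, ...$params).
class AddFromContextStage
{
    public function __invoke($payload, ...$params)
    {
        // $params[0] is assumed to be the DTO/context passed to process()
        $context = $params[0];

        return $payload + $context['ten'];
    }
}

$stage = new AddFromContextStage;
echo $stage(10, ['ten' => 10]); // 20
```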
I feel that every part of this library deserves to be Interfaced so that individual applications can choose to implement unique functionality. Hence, I propose interfacing the PipelineBuilder, I'm willing to open a PR myself, but I want to get a sense of what the maintainer's feeling on acceptance would be.
What are you using to generate compile your docs
?
Sometimes based on some condition, a pipeline needs to be forked, meaning there is a need to follow one of several disparate paths which have nothing in common. Optionally, later the disjoint paths may join again.
$pipeline = (new Pipeline)
    ->pipe(new TimeTwoStage)
    ->pipe(new AddOneStage)
    ->fork(function ($payload) {
        if ($payload == 0) return "zero";
        if ($payload < 0) return "-";
        if ($payload > 0) return "+";
        return false; // for short-circuit
    })
    ->disjoin("zero", $zeroProcessingPipeline)
    ->disjoin("-", $negativeNumberProcessingPipeline)
    ->disjoin("+", $positiveNumberProcessingPipeline)
    ->join()
    ->pipe(new DivideByFiveStage);
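Without any new API surface, a fork can also be sketched as an ordinary callable stage that dispatches to one of several sub-pipelines. This is a sketch under the assumption that each branch is itself a pipeline exposing process(); the stage and pipeline names are taken from the proposal above:

```php
<?php

// Sketch: forking as a plain stage, with each branch being its own
// pipeline; the branch result becomes the payload for later stages.
$forkStage = function ($payload) use (
    $zeroProcessingPipeline,
    $negativeNumberProcessingPipeline,
    $positiveNumberProcessingPipeline
) {
    if ($payload == 0) {
        return $zeroProcessingPipeline->process($payload);
    }

    return $payload < 0
        ? $negativeNumberProcessingPipeline->process($payload)
        : $positiveNumberProcessingPipeline->process($payload);
};

$pipeline = (new Pipeline)
    ->pipe(new TimeTwoStage)
    ->pipe(new AddOneStage)
    ->pipe($forkStage)
    ->pipe(new DivideByFiveStage);
```

The trade-off is that the branch keys ("zero", "-", "+") disappear into the closure, but no fork()/disjoin()/join() methods need to be added to the library.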
I think it would be more elegant and usable to write:
$pipeline = (new Pipeline)->pipe(function () {
    throw new LogicException();
});

$pipeline->process($payload, [$this, 'exceptionResolver']);

or

$pipeline = (new Pipeline($errorCallback))->pipe(function () {
    throw new LogicException();
})->process($payload);
instead of an external try/catch wrapper:

try {
    $pipeline->process($payload);
} catch (\Exception | \Throwable $e) {
    $this->errorHandler($e);
}
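One way to get this behavior without changing Pipeline itself would be a custom processor. A hedged sketch follows; the ErrorHandlingProcessor class is hypothetical, and the ProcessorInterface signature shown here is assumed from recent releases (older versions differ):

```php
<?php

use League\Pipeline\ProcessorInterface;

// Sketch: a processor that routes any throwable from a stage to a
// user-supplied error callback instead of letting it bubble up.
class ErrorHandlingProcessor implements ProcessorInterface
{
    private $errorCallback;

    public function __construct(callable $errorCallback)
    {
        $this->errorCallback = $errorCallback;
    }

    public function process($payload, callable ...$stages)
    {
        try {
            foreach ($stages as $stage) {
                $payload = $stage($payload);
            }

            return $payload;
        } catch (\Throwable $e) {
            return call_user_func($this->errorCallback, $e);
        }
    }
}
```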
I noticed that when you pipe, you must do:
$pipeline = $pipeline->pipe(WHATEVER);
as pipe() clones the old pipeline and returns the clone.
Now if we have 100 pipes (just an example), wouldn't it be better to simply return the same pipeline instead of wasting memory on never-used old pipelines that were cloned?
Is it possible to support generators? This would result in much lower memory usage.
Example: when you extract the rows of a CSV file in one stage and process the data in a later stage, the current implementation forces you to hold all the data in memory:
function ($payload) {
    $lines = [];

    if (($handle = fopen("test.csv", "r")) !== FALSE) {
        while (($data = fgetcsv($handle, 1000, ",")) !== FALSE) {
            $lines[] = $data;
        }
        fclose($handle);
    }

    return $lines;
}
It would be much better to use generators here:
function ($payload) {
    if (($handle = fopen("test.csv", "r")) !== FALSE) {
        while (($data = fgetcsv($handle, 1000, ",")) !== FALSE) {
            yield $data;
        }
        fclose($handle);
    }
}
I got inspired by https://github.com/dantleech/p-meter/blob/master/lib/Pipeline.php.
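For what it's worth, generators already pass through a foreach-based process() untouched, since each stage simply receives the previous stage's return value; a later stage can then iterate lazily. A sketch, assuming a version where pipe() accepts plain callables (the file name and stage bodies are illustrative):

```php
<?php

use League\Pipeline\Pipeline;

// Sketch: the first stage returns a Generator; the second stage
// consumes it lazily, so only one CSV row is in memory at a time.
$pipeline = (new Pipeline)
    ->pipe(function ($path) {
        if (($handle = fopen($path, "r")) !== FALSE) {
            while (($data = fgetcsv($handle, 1000, ",")) !== FALSE) {
                yield $data;
            }
            fclose($handle);
        }
    })
    ->pipe(function ($rows) {
        $count = 0;
        foreach ($rows as $row) {
            $count++; // process each row here
        }

        return $count;
    });

$rowCount = $pipeline->process("test.csv");
```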
Hello,
I am curious how pipeline suggests we handle errors in our processors. I think the documentation should cover this case, even if it's just "wrap your calls to the pipeline in your own try/catch" :)
Hi, first let me thank you for this code; it seems like exactly what I was looking for!
I wanted to implement some kind of dependency between the individual stages of a pipeline, with extensive try/catch error handling, but then stumbled across InterruptibleProcessor.php and wondered whether there might be a better way. I am not sure how to use the $check callable: do I read the code right that it allows one $check callable per pipeline, and not one $check per $stage?
I just want pipelines to stop, skip, or trigger a different pipeline based on the results of one stage.
Also I am wondering: what would be the best way to build fallback chains? Something like "if $stage_x fails, try again with $stage_y" or "if $stage_x produces result x, skip to $stage_z"?
I am experimenting to see whether it makes sense to transform an existing large if-then-else monstrosity into a few pipelines, and I do not immediately see how to implement this kind of fallback. Of course I would also like to avoid writing lots of try/catch error handling code; the result should be less code than the if-then-else horror I want to replace... :)
If you could find the time for providing a little example how you would implement such a thing, it would be a great help! Thanks for your attention!
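To make the question concrete, here is a hedged sketch of how InterruptibleProcessor appears to be used: a single $check callable is evaluated after every stage, and returning false interrupts the whole pipeline. The constructor placement and stage callables below are assumptions based on recent releases:

```php
<?php

use League\Pipeline\InterruptibleProcessor;
use League\Pipeline\Pipeline;

// Sketch: the one $check callable is consulted after each stage;
// when it returns false, the remaining stages are skipped and the
// current payload is returned as-is.
$processor = new InterruptibleProcessor(function ($payload) {
    return $payload !== null; // keep going while the payload is usable
});

$pipeline = (new Pipeline($processor))
    ->pipe(function ($payload) {
        return is_numeric($payload) ? $payload * 2 : null;
    })
    ->pipe(function ($payload) {
        return $payload + 1; // skipped when the check fails
    });
```

Per-stage checks or fallback stages would still need a custom processor, since the library's check is pipeline-wide.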
I would like to mention a criticism of recent decisions on this awesome package, and request input from others.
The recent addition of the extra processors feels messy compared to the resistance to bloat this project has shown in the recent past. I think the addition of the Processor interface was a fantastic idea; however, I feel the implementation was overreaching. The InterruptibleProcessor should have been an implementation detail of someone's project using pipelines. Taking that approach would let the library resist the bloat of trying to predict and satisfy every user's desired implementation. I hate BC breaks, but this library is less than 1.0; is this something the maintainers would consider allowing modification of?
Along the same lines: since the FingersCrossedProcessor is the default Processor, and the one we have all loved enough to use this package pre-1.0, doesn't it seem more fitting that it's named with the respect it deserves, such as Processor or DefaultProcessor?
Of course I mean no disrespect to the author of the changes, because the code was thoughtful and well written.
I will happily be providing more of my input in the PRs on this project in the future because I am passionate about the potential of this library, but I am certainly interested in other people's thoughts on this issue.
When an exception is thrown inside a stage, process() causes a warning from array_reduce.
"Warning: array_reduce(): An error occurred while invoking the reduction callback in ..."
For instance, the following example (taken from the Readme):
$pipeline = (new Pipeline)
    ->pipe(CallableStage::forCallable(function () {
        throw new LogicException();
    }));

try {
    $pipeline->process($payload);
} catch (LogicException $e) {
    // Handle the exception.
}
^ Results in a Warning being thrown:
Warning: array_reduce(): An error occurred while invoking the reduction callback in /var/www/pipeline/src/Pipeline.php on line 52
I found a PHP bug ticket that seems to be related: https://bugs.php.net/bug.php?id=55416, which appears to be fixed in PHP 7. I tested it with 3v4l.org: https://3v4l.org/amBuY
Would it be possible to change from array_reduce and use a for loop or something else in the pipeline process to avoid getting these warnings?
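A hedged sketch of the suggested change, replacing array_reduce with a plain foreach; the class below is a minimal stand-in for the real Pipeline, not the library's actual source:

```php
<?php

// Sketch: a foreach-based process() performs the same left-fold as
// array_reduce, but exceptions thrown by a stage propagate directly,
// without triggering the array_reduce() warning.
class Pipeline
{
    private $stages = [];

    public function pipe(callable $stage)
    {
        $clone = clone $this;
        $clone->stages[] = $stage;

        return $clone;
    }

    public function process($payload)
    {
        foreach ($this->stages as $stage) {
            $payload = $stage($payload);
        }

        return $payload;
    }
}
```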
When using a pipeline builder, the return value of the builder is a pipeline interface. While the actual class that is returned (the Pipeline class) includes the process method referenced in the docs, the interface does not include this method.
Seeing as this is probably a critical part of the pipeline and should be included in the interface, I propose we add the process method to the interface, to make other implementations more standard if there were to be any, and also to help with IDE auto-completion.
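A hedged sketch of the proposed interface change; the method names mirror the Pipeline class, but the exact signatures are assumptions:

```php
<?php

// Sketch: adding process() to the pipeline interface so that every
// implementation exposes it and IDEs can auto-complete it.
interface PipelineInterface
{
    /**
     * Add a stage, returning a new pipeline.
     *
     * @return static
     */
    public function pipe(callable $stage);

    /**
     * Run the payload through all stages.
     *
     * @param mixed $payload
     * @return mixed
     */
    public function process($payload);
}
```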
Pipelines are great for data processing. However, there may be cases where the data fed into the pipeline is invalid, causing any stage to fail. That means there can be quite a few pipeline stages that we loaded, configured, et cetera, that are not going to be called. We can optimize performance in these cases by lazily initializing pipeline stages.
Instead of coming up with some bespoke interface to do so, we can instead delegate this to an existing PSR-11 container implementation. PSR-11 can be considered quite mature at this point, and seems like a good match.
So, instead of doing:
$pipeline = (new PipelineBuilder())
    ->add($container->get(MyFirstStage::class))
    ->add($container->get(MySecondStage::class))
    ->add(function ($payload) {
        return $payload * 10;
    })
    ->add($container->get(MyFourthStage::class))
    ->build();

// Every stage has now gone through initialization
We might have something like:
$pipeline = (new ContainerAwarePipelineBuilder($container))
    ->add(MyFirstStage::class)
    ->add(MySecondStage::class)
    ->add(function ($payload) {
        // Adding callable stages directly still works
        return $payload * 10;
    })
    ->add(MyFourthStage::class)
    ->build();

// Stages from the container will not be initialized at this point;
// they will be initialized when the stage is invoked.
As mentioned; lazy loading can do a lot for performance in larger applications. This idea came up because in my application I have a data processing pipeline with various stages that can fail. There are also (class based) stages that interact with a remote database, use configuration files, etc, which are expensive to initialize.
The cleanest way to write these stages would usually be a simple class where dependencies are passed to the constructor and initialization like preparing SQL statements, parsing a configuration file, etc are done in the constructor as well. Then the __invoke()
method is ready to just do its work.
However, that setup is expensive: not only the initialization that happens within the stage itself, but also the dependencies the stage depends upon need to be resolved up front. For example, if a stage depends on a PDO object to do its database work, we need to have already set up a connection to the database.
That means that if the pipeline is processing some payload that fails during the very first stage (i.e. a validation step fails), we already have done the expensive initialization for all the stages that follow it but that are never going to be invoked.
(A currently possible workaround is passing a container instance into the stages and have them lazily load their dependencies and do setup lazily whenever the stage is first invoked. This adds a lot of code complexity to the stages, and passing a container around like that is a bit of an anti-pattern. Solving this within the Pipeline abstraction would generally make for much nicer code.)
The builder's add() parameter would become callable|string: if it's a callable, it is used directly as a stage; if it's a non-callable string, it is used as a key to retrieve the stage from the container (via a ContainerAwarePipeline(Builder), as in my example above). We would still need to widen the callable type used in the interfaces. I'd be happy to do the initial work and make a pull request, if the Pipeline maintainers are interested in having this kind of functionality added.
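A hedged sketch of how the container-aware builder could defer initialization; the LazyStage class is hypothetical, and the only assumed dependency is the PSR-11 ContainerInterface:

```php
<?php

use Psr\Container\ContainerInterface;

// Sketch: wraps a container entry id as a callable stage; the entry
// is only resolved (and thus initialized) on first invocation.
class LazyStage
{
    private $container;
    private $id;
    private $stage;

    public function __construct(ContainerInterface $container, $id)
    {
        $this->container = $container;
        $this->id = $id;
    }

    public function __invoke($payload)
    {
        if ($this->stage === null) {
            $this->stage = $this->container->get($this->id);
        }

        return call_user_func($this->stage, $payload);
    }
}
```

The builder would then wrap each non-callable string in a LazyStage, so a payload that fails in an early stage never triggers initialization of the later ones.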
Hi,
I noticed that I start pretty much every StageInterface implementation I create with the same bits of code:
public function process($payload)
{
    if ($this->isValidPayload($payload)) {
        // the actual code
    }

    throw new InvalidPayloadException('Payload must be a ...');
}

/**
 * Checks if the payload is valid
 */
private function isValidPayload($payload)
{
    // logic to check if payload is an array, instanceof something or whatever
}
If this is common, I'd create a PR adding an abstract base stage providing this functionality. Let me know if this is something you think is valuable for the project.
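Such an abstract base could be sketched like this; all names here (including InvalidPayloadException) are hypothetical, not existing library classes:

```php
<?php

// Sketch: template-method base class that validates the payload
// before delegating to the concrete stage logic.
abstract class AbstractValidatingStage
{
    public function process($payload)
    {
        if (!$this->isValidPayload($payload)) {
            throw new InvalidPayloadException('Invalid payload for ' . static::class);
        }

        return $this->doProcess($payload);
    }

    /** Return true when the payload is acceptable for this stage. */
    abstract protected function isValidPayload($payload);

    /** The actual stage logic, run only on a valid payload. */
    abstract protected function doProcess($payload);
}
```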
Regards,
Ron
When porting Pipeline to League\Uri package, I came across a bug with the following code.
public function process($payload)
{
    foreach ($this->stages as $stage) {
        $payload = $stage($payload);
    }

    return $payload;
}
As it turned out, there's a PHP bug entry for that, and Evert Pot wrote a blog post about it too.
What's sad is that it won't be correctly fixed in PHP until this RFC passes.
So the current workaround is to use call_user_func or call_user_func_array instead, like this:
public function process($payload)
{
    foreach ($this->stages as $stage) {
        $payload = call_user_func($stage, $payload);
    }

    return $payload;
}
Why do you clone the current object when you execute the pipe method?
I have been puzzled by this question for a long time; thanks in advance for the explanation.
Are there any links to publications by well-known OOP evangelists that explain this choice?
Hi,
I think it would be nice to have a short circuiting mechanism that would allow to skip the following stages.
This would allow to decide at runtime when to stop with the pipeline processing. It would be a more general approach than #13
I am thinking to something along these lines:
Pipeline constructor:

public function __construct(array $stages = [], callable $shortCircuiting = null)

process method:

foreach ($this->stages as $stage) {
    $payload = $reducer($payload, $stage);
    if (call_user_func($this->shortCircuiting, $payload)) {
        break;
    }
}
What do you think? Could it be something useful?
Does this work on Windows?
The concept of a pipe sounds very good, but I was wondering if we can attach listeners to each stage.
For example, we could have a PipelineListenerInterface:
interface PipelineListenerInterface
{
    public function before($context, $stageName);

    public function onError($context, $stageName, $exception);
}
Then we could easily log all the stages instead of wrapping every pipeline in try/catch.
It would be very cool if we could register the listener at the project level (not per pipeline).
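A hedged sketch of how such listeners could hook in via a custom processor; the ListeningProcessor class is hypothetical, the listener interface follows the proposal above, and the ProcessorInterface signature is assumed from recent releases:

```php
<?php

use League\Pipeline\ProcessorInterface;

// Sketch: a processor that notifies a listener before each stage and
// on errors, instead of requiring try/catch at every call site.
class ListeningProcessor implements ProcessorInterface
{
    private $listener;

    public function __construct(PipelineListenerInterface $listener)
    {
        $this->listener = $listener;
    }

    public function process($payload, callable ...$stages)
    {
        foreach ($stages as $i => $stage) {
            $stageName = 'stage_' . $i;
            $this->listener->before($payload, $stageName);

            try {
                $payload = $stage($payload);
            } catch (\Throwable $e) {
                $this->listener->onError($payload, $stageName, $e);
                throw $e;
            }
        }

        return $payload;
    }
}
```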
Hi guys, can anyone please help me a little?
I can't get my head around the PipelineBuilder class; I don't understand when I should use it.
The Pipeline object itself can be extended and reused at any time, so I don't quite understand when to use Pipeline and when to use PipelineBuilder.
Thanks for your help!
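To illustrate the distinction as I understand it: the builder is a mutable collector of stages from which build() produces a pipeline, whereas Pipeline itself is immutable and processes payloads directly. A sketch, assuming the documented PipelineBuilder API with add() and build(); the stage callables are illustrative:

```php
<?php

use League\Pipeline\PipelineBuilder;

// Sketch: assemble stages step by step (e.g. conditionally), then
// produce a pipeline to run payloads through.
$needsIncrement = true;

$builder = (new PipelineBuilder)
    ->add(function ($payload) { return $payload * 2; });

if ($needsIncrement) {
    // The builder collects stages, so conditional wiring is easy.
    $builder->add(function ($payload) { return $payload + 1; });
}

$pipeline = $builder->build();
$result = $pipeline->process(10); // 21 with both stages added
```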