pipeline's Introduction

League\Pipeline

This package provides a pipeline pattern implementation.

Pipeline Pattern

The pipeline pattern lets you compose sequential operations by chaining stages.

In this particular implementation the interface consists of two parts:

  • StageInterface
  • PipelineInterface

A pipeline consists of zero, one, or multiple stages. A pipeline can process a payload. During the processing the payload will be passed to the first stage. From that moment on the resulting value is passed on from stage to stage.

In the simplest form, the execution chain can be represented as a foreach:

$result = $payload;

foreach ($stages as $stage) {
    $result = $stage($result);
}

return $result;

Effectively this is the same as:

$result = $stage3($stage2($stage1($payload)));

Immutability

Pipelines are implemented as immutable stage chains. When you pipe a new stage, a new pipeline will be created with the added stage. This makes pipelines easy to reuse, and minimizes side-effects.
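As a rough illustration of what immutability means here, the sketch below uses a hypothetical stand-in class, SketchPipeline, rather than the package's own Pipeline. It assumes that pipe clones the pipeline and appends the stage to the copy, which is how the behaviour is described above.

```php
<?php

// Hypothetical stand-in for an immutable pipeline; not the package's actual class.
final class SketchPipeline
{
    /** @var callable[] */
    private $stages;

    public function __construct(array $stages = [])
    {
        $this->stages = $stages;
    }

    // Returns a new pipeline with the extra stage; $this is left untouched.
    public function pipe(callable $stage): self
    {
        $pipeline = clone $this;
        $pipeline->stages[] = $stage;

        return $pipeline;
    }

    public function process($payload)
    {
        foreach ($this->stages as $stage) {
            $payload = $stage($payload);
        }

        return $payload;
    }
}

$double = (new SketchPipeline)->pipe(function ($payload) {
    return $payload * 2;
});

// Piping onto $double creates a second pipeline and leaves $double as it was.
$doubleThenAddOne = $double->pipe(function ($payload) {
    return $payload + 1;
});

$doubleResult = $double->process(10);             // 20
$extendedResult = $doubleThenAddOne->process(10); // 21
```

Because $double is never modified, it can be reused as the starting point for any number of longer pipelines.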

Usage

Operations in a pipeline, called stages, can be anything that satisfies the callable type hint, so closures and any invokable object will work.

$pipeline = (new Pipeline)->pipe(function ($payload) {
    return $payload * 10;
});

Class-based stages

Class-based stages are also possible. Implementing the StageInterface ensures that your class has the correct signature for the __invoke method.

use League\Pipeline\Pipeline;
use League\Pipeline\StageInterface;

class TimesTwoStage implements StageInterface
{
    public function __invoke($payload)
    {
        return $payload * 2;
    }
}

class AddOneStage implements StageInterface
{
    public function __invoke($payload)
    {
        return $payload + 1;
    }
}

$pipeline = (new Pipeline)
    ->pipe(new TimesTwoStage)
    ->pipe(new AddOneStage);

// Returns 21
$pipeline->process(10);

Re-usable Pipelines

Because the PipelineInterface is an extension of the StageInterface, pipelines can be reused as stages. This makes for a highly composable model in which complex execution patterns can be built while keeping the cognitive load low.

For example, if we'd want to compose a pipeline to process API calls, we'd create something along these lines:

$processApiRequest = (new Pipeline)
    ->pipe(new ExecuteHttpRequest) // 2
    ->pipe(new ParseJsonResponse); // 3
    
$pipeline = (new Pipeline)
    ->pipe(new ConvertToPsr7Request) // 1
    ->pipe($processApiRequest) // (2,3)
    ->pipe(new ConvertToResponseDto); // 4 
    
$pipeline->process(new DeleteBlogPost($postId));

Pipeline Builders

Because pipelines themselves are immutable, pipeline builders are introduced to facilitate distributed composition of a pipeline.

The pipeline builders collect stages and allow you to create a pipeline at any given time.

use League\Pipeline\PipelineBuilder;

// Prepare the builder
$pipelineBuilder = (new PipelineBuilder)
    ->add(new LogicalStage)
    ->add(new AnotherStage)
    ->add(new LastStage);

// Build the pipeline
$pipeline = $pipelineBuilder->build();

Exception handling

This package is completely transparent when dealing with exceptions. In no case will this package catch an exception or silence an error. Exceptions should be dealt with on a per-case basis, either inside a stage or at the time the pipeline processes a payload.

$pipeline = (new Pipeline)->pipe(function () {
    throw new LogicException();
});
    
try {
    $pipeline->process($payload);
} catch(LogicException $e) {
    // Handle the exception.
}

pipeline's People

Contributors

alexandrubau, dannyvankooten, duncan3dc, frankdejonge, hannesvdvreken, jamesdb, jblotus, leoruhland, mstnorris, nyamsprod, pascalheidmann-check24, raphaelstolt, rishipuri, shadowhand, spiliot

pipeline's Issues

quick question on the pipe method

I noticed when you pipe, you must do:

$pipeline = $pipeline->pipe(WHATEVER);

because pipe clones the old pipeline and returns the clone.

Now, if we have 100 pipes (just an example), isn't it better to simply return the same pipeline rather than waste memory on old cloned pipelines that are never used?

Pipeline should use call_user_func

When porting Pipeline to League\Uri package, I came across a bug with the following code.

    public function process($payload)
    {
        foreach ($this->stages as $stage) {
            $payload = $stage($payload);
        }
        return $payload;
    }

As it turned out, there's a PHP bug entry for that, and Evert Pot wrote a blog post about it too 👍

What's sad is that it won't be correctly fixed in PHP until this RFC passes 👎.

So the current workaround is to use call_user_func or call_user_func_array instead like this:

    public function process($payload)
    {
        foreach ($this->stages as $stage) {
            $payload = call_user_func($stage, $payload);
        }
        return $payload;
    }

Add PipelineBuilderInterface

I feel that every part of this library deserves to be interfaced so that individual applications can choose to implement unique functionality. Hence, I propose adding an interface for the PipelineBuilder. I'm willing to open a PR myself, but I want to get a sense of what the maintainer's feeling on acceptance would be.

Adding short circuiting mechanism

Hi,

I think it would be nice to have a short-circuiting mechanism that allows the remaining stages to be skipped.

This would allow to decide at runtime when to stop with the pipeline processing. It would be a more general approach than #13

I am thinking of something along these lines:

  • add an optional parameter to the Pipeline constructor
public function __construct(array $stages = [], callable $shortCircuiting = null)
  • add a check in the process method
foreach ($this->stages as $stage) {
    $payload = $stage($payload);
    // Stop processing as soon as the optional check returns true.
    if ($this->shortCircuiting !== null && call_user_func($this->shortCircuiting, $payload)) {
        break;
    }
}

What do you think? Could it be something useful?
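The proposal above can be tried out without touching the package. The following is only a sketch: processWithShortCircuit and its parameters are hypothetical names, and the stages are plain closures rather than a real Pipeline.

```php
<?php

// Hypothetical helper, not part of the package: runs stages in order and
// stops as soon as $shouldStop returns true for the current payload.
function processWithShortCircuit(array $stages, callable $shouldStop, $payload)
{
    foreach ($stages as $stage) {
        $payload = $stage($payload);

        if ($shouldStop($payload)) {
            break; // skip every remaining stage
        }
    }

    return $payload;
}

$stages = [
    function ($payload) { return $payload - 10; },
    function ($payload) { return $payload * 100; }, // skipped when the check fires
];

$isNegative = function ($payload) {
    return $payload < 0;
};

$stopped = processWithShortCircuit($stages, $isNegative, 5);    // -5
$completed = processWithShortCircuit($stages, $isNegative, 15); // 500
```

Note that the loop uses break rather than continue: continue would still run the remaining stages, which defeats the purpose of short-circuiting.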

PipelineInterface does not have a `process` method

When using a pipeline builder, the return value of the builder is a pipeline interface.

While the actual class that is returned (Pipeline class) includes the process method referenced in the docs, the actual interface does not include this method.

Seeing as this is probably a critical part of the pipeline, and should be included in the interface, I propose we add the process method to the interface, to make other implementations more standard if there were to be any, and also to help with IDE auto completion.

[Feature Request] Attach a listener to a pipeline

The concept of a pipe sounds very good but I was wondering if we can attach listeners for each stage

For example, we could have something like a PipelineListenerInterface:

interface PipelineListenerInterface
{
    function before($context, $stageName);

    function onError($context, $stageName, $exception);
}

Then we can easily log all the stages instead of catching all the pipelines.

It will be very cool if we can register the listener on project level (not on a pipeline).
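A similar effect can be approximated without changes to the package by decorating each stage. This is a sketch under assumptions: withListener is a hypothetical helper, the two callbacks mirror the before and onError methods proposed above, and a plain loop stands in for the pipeline.

```php
<?php

// Hypothetical decorator: wraps a stage so that two callbacks are notified
// before the stage runs and when it throws. None of these names exist in the package.
function withListener(callable $stage, string $stageName, callable $before, callable $onError): callable
{
    return function ($payload) use ($stage, $stageName, $before, $onError) {
        $before($payload, $stageName);

        try {
            return $stage($payload);
        } catch (Exception $exception) {
            $onError($payload, $stageName, $exception);

            throw $exception; // stay transparent: the caller still sees the exception
        }
    };
}

$log = [];
$before = function ($context, $stageName) use (&$log) {
    $log[] = "before {$stageName}";
};
$onError = function ($context, $stageName, $exception) use (&$log) {
    $log[] = "error in {$stageName}";
};

$stages = [
    withListener(function ($payload) { return $payload + 1; }, 'add-one', $before, $onError),
    withListener(function () { throw new LogicException('boom'); }, 'explode', $before, $onError),
];

try {
    $payload = 1;
    foreach ($stages as $stage) {
        $payload = $stage($payload);
    }
} catch (LogicException $e) {
    $log[] = 'caught';
}
// $log is now ['before add-one', 'before explode', 'error in explode', 'caught']
```

The decorator rethrows after notifying the listener, so the caller's own try/catch keeps working as before.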

Adding validation of payload

Hi,

I noticed I start pretty much any StageInterface implementation I create with the same bits of code:

public function process($payload)
{
    if ($this->isValidPayload($payload)) {
        // the actual code
    }

    throw new InvalidPayloadException('Payload must be a ...');
}

 /**
  * Checks if the payload is valid
  */
private function isValidPayload($payload)
{
    // logic to check if payload is an array, instanceof something or whatever
}

If this is common, I'd create a PR to add an abstract StageInterface implementation providing this base functionality. Let me know if this is something you think is valuable for the project.

Regards,
Ron

PHP warning occurs when an exception is thrown inside a stage

When an exception is thrown inside a stage process this causes a warning from array_reduce.
"Warning: array_reduce(): An error occurred while invoking the reduction callback in ..."

For instance, the following example (taken from the Readme):

$pipeline = (new Pipeline)
    ->pipe(CallableStage::forCallable(function () {
        throw new LogicException();
    }));

try {
    $pipeline->process($payload);
} catch(LogicException $e) {
    // Handle the exception.
}

^ Results in a Warning being thrown:
Warning: array_reduce(): An error occurred while invoking the reduction callback in /var/www/pipeline/src/Pipeline.php on line 52

I found a PHP bug ticket that seems to be related to this: https://bugs.php.net/bug.php?id=55416. It appears to be fixed in PHP 7; I tested it with 3v4l.org: https://3v4l.org/amBuY

Would it be possible to change from array_reduce and use a for loop or something else in the pipeline process to avoid getting these warnings?
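For comparison, here is a small sketch of the two formulations side by side. The stages are illustrative closures, not the package's internals; the point is only that a plain loop produces the same result as array_reduce.

```php
<?php

$stages = [
    function ($payload) { return $payload * 2; },
    function ($payload) { return $payload + 1; },
];

// array_reduce variant: each stage is applied to the carried payload.
$reduced = array_reduce($stages, function ($payload, callable $stage) {
    return $stage($payload);
}, 10);

// Plain loop variant producing the same result without array_reduce.
$looped = 10;
foreach ($stages as $stage) {
    $looped = $stage($looped);
}
// Both $reduced and $looped are 21.
```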

PHP 5.4 compatibility

Hi,

I'm currently stuck on PHP 5.4 and wanted to use this package. The changes to make the package PHP 5.4 compatible were quite trivial, if you want to support 5.4 I can wrap up a PR if you like. If not, I understand as PHP 5.4 is near EOL.

[Feature Request] Fork-Join

Sometimes based on some condition, a pipeline needs to be forked, meaning there is a need to follow one of several disparate paths which have nothing in common. Optionally, later the disjoint paths may join again.

$pipeline = (new Pipeline)
    ->pipe(new TimesTwoStage)
    ->pipe(new AddOneStage)
    ->fork(function($payload) {
            if($payload == 0) return "zero";
            if($payload < 0) return "-";
            if($payload > 0) return "+";
            return false; // for short-circuit
        })
        ->disjoin("zero", $zeroProcessingPipeline)
        ->disjoin("-", $negativeNumberProcessingPipeline)
        ->disjoin("+", $positiveNumberProcessingPipeline)
        ->join()
    ->pipe(new DivideByFiveStage);

Generator Processor

Is it possible to support generators? This would result in much lower memory usage.
Example: when you extract the rows of a CSV file in one stage and process them in a later stage, the current implementation requires all of the data to be held in memory:

function ($payload) {
  $lines = [];
  if (($handle = fopen("test.csv", "r")) !== FALSE) {
     while (($data = fgetcsv($handle, 1000, ",")) !== FALSE) {
        $lines[] = $data;
     }
     fclose($handle);
  }
  return $lines; // every row is held in memory at once
}

It would be much better to use generators here:

function ($payload) {
  if (($handle = fopen("test.csv", "r")) !== FALSE) {
     while (($data = fgetcsv($handle, 1000, ",")) !== FALSE) {
        yield $data;
     }
     fclose($handle);
  }
}

I got inspired by https://github.com/dantleech/p-meter/blob/master/lib/Pipeline.php.
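Since stages are plain callables, generator-returning stages can be chained by hand. The sketch below assumes each later stage accepts any iterable; $extract and $uppercaseNames are hypothetical stages, and an in-memory array stands in for the CSV file.

```php
<?php

// Hypothetical first stage: yields rows one at a time instead of building an array.
$extract = function (array $rows) {
    foreach ($rows as $row) {
        yield $row;
    }
};

// Hypothetical second stage: consumes any iterable lazily and yields transformed rows.
$uppercaseNames = function ($rows) {
    foreach ($rows as $row) {
        $row[0] = strtoupper($row[0]);
        yield $row;
    }
};

$payload = [['alice', 30], ['bob', 25]];

// Same left-to-right chaining as a pipeline; nothing runs until iteration starts.
$result = $uppercaseNames($extract($payload));

$processed = [];
foreach ($result as $row) {
    $processed[] = $row;
}
// $processed is [['ALICE', 30], ['BOB', 25]]
```

Only one row is in flight at a time, which is where the memory saving would come from.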

[feature request] Inject errorHandler into processor (optional)

I think this would be more elegant and usable:

 $pipeline = (new Pipeline)->pipe(function () {
    throw new LogicException();
 });
 $pipeline->process($payload,[$this,'exceptionResolver']);

or

   $pipeline = (new Pipeline($errorCallback))->pipe(function () {
        throw new LogicException();
   })->process($payload);

instead of an external try/catch wrapper:

try {
    $pipeline->process($payload);
} catch(\Exception|\Throwable $e) {
   $this->errorHandler($e);
}
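A helper along these lines can approximate the proposal from outside the package. It is a sketch only: processWithHandler is a hypothetical name, a closure stands in for the pipeline, and catching Throwable assumes PHP 7 or later.

```php
<?php

// Hypothetical helper, not part of the package: runs any callable pipeline and
// hands a thrown exception or error to $errorHandler instead of letting it propagate.
function processWithHandler(callable $pipeline, $payload, callable $errorHandler)
{
    try {
        return $pipeline($payload);
    } catch (Throwable $e) {
        return $errorHandler($e);
    }
}

// A closure stands in for a pipeline whose stage throws.
$failingPipeline = function ($payload) {
    throw new LogicException('Stage failed.');
};

$result = processWithHandler($failingPipeline, 42, function (Throwable $e) {
    return 'handled: ' . $e->getMessage();
});
// $result is 'handled: Stage failed.'
```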

How to use $check in InterruptibleProcessor.php?

Hi first let me thank you for this code, seems like this is exactly what I was looking for!
I just wanted to implement some kind of dependency for the individual stages of a pipeline with some extensive try catch error handling but then stumbled across the InterruptibleProcessor.php and I was wondering if there might be a better way to do that, but I am not sure how to use the $check callable - do I understand the code right that it allows for one $check callable for each pipeline and not for one $check for each $stage?

I just want to let pipelines stop or skip or trigger a different pipeline based on the results of one stage.

Also I am wondering: what would be the best way to build fallback chains? Something like "if stage_x fails, try again with $stage_y" or "if $stage_x produces result x skip to stage_z"?

I am just experimenting to learn if it might make sense to transform an existing large if-then-else monstrosity into a few pipelines and do not immediately see how to implement such kind of fallbacks. Also of course I would like to avoid writing lots of try catch error handling code - the result should be less code than the if-then-else horror I would like to replace... :)

If you could find the time for providing a little example how you would implement such a thing, it would be a great help! Thanks for your attention!

Error handling

Hello,

I am curious about how pipeline suggests we handle errors in our processors. I think the documentation should cover this case, even if it's just "wrap your calls to the pipeline in your own try/catches" :)

Remove InterruptibleProcessor and Rename FingersCrossedProcessor

I would like to mention a criticism of recent decisions on this awesome package, and request input from others.

I feel the recent addition of the extra processors was kind of messy compared to the resistance to bloat this project has shown in the recent past. I think the addition of the Processor interface was a fantastic idea; however, I feel the implementation was overreaching. The InterruptibleProcessor should have been an implementation detail of someone's project utilizing pipelines. Taking this approach would allow the library to resist the bloat of trying to predict and satisfy every user's desired implementation. I hate BC breaks, but this library is below 1.0. Is this something the maintainers would consider changing?

To go along with this, since the FingersCrossedProcessor is the default processor and the one we have all loved so much that we have been using this package pre-1.0, doesn't it seem more fitting that it's named with the respect it deserves, such as Processor or DefaultProcessor?

Of course I mean no disrespect to the author of the changes, because the code was thoughtful and well written.

I will happily be providing more of my input in the PRs on this project in the future because I am passionate about the potential of this library, but I am certainly interested in other people's thoughts on this issue.

[Feature Request] Lazily retrieve pipeline stages from PSR-11 container

Description

Pipelines are great for data processing. However, there may be cases where the data fed into the pipeline is invalid, causing any stage to fail. That means there can be quite a few pipeline stages that we loaded, configured, et cetera, that are not going to be called. We can optimize performance in these cases by lazily initializing pipeline stages.

Instead of coming up with some bespoke interface to do so, we can instead delegate this to an existing PSR-11 container implementation. PSR-11 can be considered quite mature at this point, and seems like a good match.

So, instead of doing:

$pipeline = (new PipelineBuilder())
    ->add($container->get(MyFirstStage::class))
    ->add($container->get(MySecondStage::class))
    ->add(function ($payload) {
        return $payload * 10;
    })
    ->add($container->get(MyFourthStage::class))
    ->build();
// Every stage has now gone through initialization

We might have something like:

$pipeline = (new ContainerAwarePipelineBuilder($container))
    ->add(MyFirstStage::class)
    ->add(MySecondStage::class)
    ->add(function ($payload) {
        // Adding callable stages directly still works
        return $payload * 10;
    })
    ->add(MyFourthStage::class)
    ->build();
// Stages from the container will not be initialized at this point,
// they will be initialized when the stage is invoked

What problem does this solve

As mentioned; lazy loading can do a lot for performance in larger applications. This idea came up because in my application I have a data processing pipeline with various stages that can fail. There are also (class based) stages that interact with a remote database, use configuration files, etc, which are expensive to initialize.

The cleanest way to write these stages would usually be a simple class where dependencies are passed to the constructor and initialization like preparing SQL statements, parsing a configuration file, etc are done in the constructor as well. Then the __invoke() method is ready to just do its work.

However, that setup is expensive: not only the initialization that happens within the stage itself, but also the dependencies the stage relies on need to be resolved already. For example, if a stage depends on a PDO object to do its database work, we need to have already set up a connection to the database.

That means that if the pipeline is processing some payload that fails during the very first stage (i.e. a validation step fails), we already have done the expensive initialization for all the stages that follow it but that are never going to be invoked.

(A currently possible workaround is passing a container instance into the stages and have them lazily load their dependencies and do setup lazily whenever the stage is first invoked. This adds a lot of code complexity to the stages, and passing a container around like that is a bit of an anti-pattern. Solving this within the Pipeline abstraction would generally make for much nicer code.)

Brainstorm: If we implement this, how?

  • We can probably support both already ready to go stages and stages that need to be fetched from the container with a single interface: We would widen the types allowable for a stage to callable|string: if it's a callable, it is used directly as a stage. If it's a non-callable string, then it's used as a key to retrieve the stage from the container.
  • Alternatively, we add this functionality to a separate class (for example ContainerAwarePipeline(Builder) as in my example above). We would still need to widen the callable type used in the interfaces.
  • For those interacting with the classes/interfaces widening the types wouldn't be a problem, but for those implementing custom classes based on them that would be a BC break. So yes, this would need a new major version.
  • In documentation and examples, we can use the excellent League Container, which itself already provides documentation on how to do lazy-loading.
  • Another design choice: Do we retrieve the stages once and cache them? Or do we delegate that to the container as well, and just retrieve the stage every time we need it? Probably the latter, but might be worth discussing.
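To make the brainstorm above more concrete, here is a minimal sketch of a lazy stage wrapper. Everything in it is an assumption for illustration: lazyStage and runStages are hypothetical helpers, $resolve stands in for a PSR-11 container's get() call, and the wrapper caches the resolved stage after first use, which is only one of the two options discussed.

```php
<?php

// Hypothetical lazy wrapper: $resolve stands in for a PSR-11 container's get() call.
// The real stage is only created the first time the wrapper is invoked.
function lazyStage(callable $resolve, string $id): callable
{
    $stage = null;

    return function ($payload) use (&$stage, $resolve, $id) {
        if ($stage === null) {
            $stage = $resolve($id);
        }

        return $stage($payload);
    };
}

// Hypothetical runner standing in for a pipeline's process method.
function runStages(array $stages, $payload)
{
    foreach ($stages as $stage) {
        $payload = $stage($payload);
    }

    return $payload;
}

$created = [];
$resolve = function (string $id) use (&$created) {
    $created[] = $id; // record when the "expensive" construction happens

    return function ($payload) {
        return $payload * 10;
    };
};

$stages = [
    function ($payload) {
        if (!is_int($payload)) {
            throw new InvalidArgumentException('Payload must be an integer.');
        }

        return $payload;
    },
    lazyStage($resolve, 'times-ten'),
];

try {
    runStages($stages, 'not a number');
} catch (InvalidArgumentException $e) {
    // Validation failed before the lazy stage was ever resolved.
}
$createdAfterFailure = $created; // []

$result = runStages($stages, 4); // 40
// $created is now ['times-ten']
```

When the first stage rejects the payload, the second stage is never constructed, which is the saving the feature request is after.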

I'd be happy to do the initial work and make a pull request, if the Pipeline maintainers are interested to have this kind of functionality added.

Add variadic variable to __invoke

Hi!
I suggest adding a variadic parameter to the __invoke() method.

Example usages (DTO - Data Transfer Object):

$pipeline = (new Pipeline)
    ->pipe(function ($payload, DTO $dto) {
        $dto['ten'] = 10;
        return $payload * 2;
    })
    ->pipe(function ($payload, DTO $dto) {
        // Read back the value stored by the previous stage.
        return $payload + $dto['ten'];
    });

$pipeline->process(5, new DTO);
// returns 20

process method has ...$params

//https://github.com/thephpleague/pipeline/blob/master/src/Pipeline.php
/**
     * Process the payload.
     * @param $payload
     * @return mixed
     */
    public function process($payload, ...$params)
    {
        foreach ($this->stages as $stage) {
            $payload = $stage($payload, ...$params);
        }
        return $payload;
    }

My example (old code, before update): https://github.com/Roquie/pipeline/commits/master

Note: PHP 5.6 required.
What do you think about it?

When to use a PipelineBuilder?

Hi guys, can anyone please help me a little bit?

I can't get my head around the PipelineBuilder class. I don't understand when I should use it.
The Pipeline object itself can be extended and reused at any time, so I don't quite see when I should use the Pipeline and when the PipelineBuilder.

Thanks for your help!
