Pipes

Realmar.Pipes is a library for composing modular, chainable pipes out of small reusable units called processors. Processors are statically typed, so there is no need to cast objects to their actual type.

var pipe = new Pipe<double>();
pipe.FirstConnector
    .Connect(x => x * x)
    .Connect(x => x == 8)          // 8.Equals(x) would box the double and always return false
    .Finish(VerifyCallback);       // VerifyCallback: your own callback

Getting Started

Prerequisites

This library currently targets netstandard2.0 (usable from .NET Core 2.0 and .NET Framework 4.6.1 or later), net461, net46, net452, net45 and net40.

Mono is not targeted yet.

Installing

Realmar.Pipes is available on nuget.org. It can be installed with the following command from the Package Manager Console in Visual Studio:

PM> Install-Package Realmar.Pipes

Remarks

This library is CLS-compliant, which means it is language independent: it can be used from other .NET languages as well (e.g. C#, VB.NET, IronPython).

SemVer is used for versioning. Currently, Realmar.Pipes has only prerelease versions (0.y.z).

Usage

Overview

This section describes the architecture of Realmar.Pipes.

The idea is that a pipe is constructed from small reusable components called processors. Data is given to the pipe, which then processes it using the composed processors.

           ┌─Pipe─────────────────────────────────┐
Input Data ⇒ [Processor]→[Processor]→[Processor] ⇒ Transformed Data
           └──────────────────────────────────────┘

A processor is the smallest composable unit and is designed to perform exactly one transformation on the data. This keeps processors small and modular.
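A minimal sketch of this idea in plain C#, independent of Realmar.Pipes: `IProcessor`, `DelegateProcessor`, `Chain.Then` and `PipelineSketch` are hypothetical names chosen for illustration, not the library's API. Each processor performs one typed transformation, and chaining only compiles when the output type of one stage matches the input type of the next, which is why no casts are needed.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-in for a processor: exactly one typed transformation.
public interface IProcessor<in TIn, out TOut>
{
    TOut Process(TIn data);
}

// Wraps a delegate so simple transformations need no dedicated class.
public sealed class DelegateProcessor<TIn, TOut> : IProcessor<TIn, TOut>
{
    private readonly Func<TIn, TOut> _func;
    public DelegateProcessor(Func<TIn, TOut> func) => _func = func;
    public TOut Process(TIn data) => _func(data);
}

public static class Chain
{
    // Composes two processors; the compiler checks that TMid lines up,
    // so no casts are needed between the stages.
    public static IProcessor<TIn, TOut> Then<TIn, TMid, TOut>(
        this IProcessor<TIn, TMid> first, IProcessor<TMid, TOut> second) =>
        new DelegateProcessor<TIn, TOut>(x => second.Process(first.Process(x)));
}

public static class PipelineSketch
{
    public static List<string> Run(IEnumerable<double> input)
    {
        // double -> double -> string -> string, checked at compile time
        IProcessor<double, string> pipe =
            new DelegateProcessor<double, double>(x => x * 2)
                .Then(new DelegateProcessor<double, string>(x => $"{x} is your number!"))
                .Then(new DelegateProcessor<string, string>(s => s.ToUpperInvariant()));

        return input.Select(x => pipe.Process(x)).ToList();
    }

    public static void Main()
    {
        foreach (var line in Run(new[] { 2.0, 5.0 }))
            Console.WriteLine(line);
    }
}
```

`PipelineSketch.Run(new[] { 2.0, 5.0 })` returns "4 IS YOUR NUMBER!" and "10 IS YOUR NUMBER!".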

Pipes are the next larger unit and can themselves be connected to each other, either by simply passing the transformed data on to the next pipe or by using a more complex construct called a pipe connector. The 'ConditionalPipeConnector' is such a connector: based on a predicate, it decides which pipe receives the processed data.

     ┌─Pipe─────────────────────┐ ┌─Connector─┐  True:  [ Pipe_True ]
Data ⇒ [Processor]→[Processor]  ⇒ Predicate  ⇒
     └──────────────────────────┘ └───────────┘  False: [ Pipe_False ]

Simple Pipe

Pipes can be easily composed using delegates:

var pipe = new Pipe<double>();
pipe.FirstConnector
    .Connect(x => x * 2)
    .Connect(x => $"{x} is your number!")
    .Connect(x => x.ToUpperInvariant())
    .Finish(Console.WriteLine);

pipe.Process(2);

// prints:
// 4 IS YOUR NUMBER!

Use delegates if the computations are simple. For more complex actions, or actions that require state, implement IPipeProcessor to create your own processor, as described in the following sections.

Processors

A processor is responsible for performing a single transformation on the input data and returning the result.

Let's create a new processor which converts a string to a primitive type. This is done by implementing the IPipeProcessor<TIn, TOut> interface:

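using System.ComponentModel;   // TypeDescriptor
// plus the library namespace that declares IPipeProcessor

public class ParseStringProcessor<TOut> : IPipeProcessor<string, TOut>
{
    public TOut Process(string data)
    {
        // look up the TypeConverter registered for TOut (e.g. Int32Converter for int)
        var tc = TypeDescriptor.GetConverter(typeof(TOut));
        // parse culture-independently; throws if data is not a valid TOut
        var obj = tc.ConvertFromInvariantString(null, data);

        return (TOut)obj;
    }
}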
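Because a processor is an ordinary class, it can keep state between calls. The sketch below is hypothetical: `RunningTotalProcessor` is not part of the library, and to compile on its own it declares a local `IPipeProcessor` with the same `Process` shape as the example above (remove the local interface when referencing Realmar.Pipes).

```csharp
using System;
using System.Threading;

// Local stand-in with the same shape as the library's interface;
// remove it when compiling against Realmar.Pipes.
public interface IPipeProcessor<in TIn, out TOut>
{
    TOut Process(TIn data);
}

// Hypothetical stateful processor: emits the running total of all inputs seen so far.
public sealed class RunningTotalProcessor : IPipeProcessor<int, int>
{
    private int _total;

    // Interlocked.Add updates the total atomically and returns the new value.
    public int Process(int data) => Interlocked.Add(ref _total, data);
}

public static class StatefulDemo
{
    public static void Main()
    {
        var processor = new RunningTotalProcessor();
        foreach (var n in new[] { 1, 2, 3, 4 })
            Console.WriteLine(processor.Process(n)); // 1, 3, 6, 10
    }
}
```

Using `Interlocked.Add` keeps the total consistent if a parallel strategy calls `Process` concurrently, although the intermediate value each caller sees then depends on scheduling.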

Pipes

A pipe is responsible for combining multiple processors into one processing pipeline. Additionally, a pipe uses a process strategy which defines how the data is processed (see Process Strategies).

The processor defined above is used for this example:

// Instantiate a pipe which takes data of type string as input
var pipe = new Pipe<string>();

pipe.FirstConnector
    .Connect(new ParseStringProcessor<int>())
    .Finish(Console.WriteLine);     // print to console


pipe.Process(new List<string> { "1", "2" });

// it is also possible to just give one item to the pipe
pipe.Process("1");

The general shape of a pipe can be illustrated with pseudo processors:

// pseudo code: create a pipe which takes a list of objects of type TIn as input
var pipe = new Pipe<TIn>();
pipe.FirstConnector
    .Connect(new Processor<TIn, T1>())
    .Connect(new Processor<T1, T2>())
    .Finish(callback);             // callback: Action<IList<T2>>

IList<TIn> data = ...;
pipe.Process(data);

The delegate passed to 'Finish' is the callback that is invoked after the data has been processed by all processors of the pipe.

More examples can be found in the tests.

Variance

Processors are variant:

// TIn is contravariant
// TOut is covariant
IPipeProcessor<in TIn, out TOut>

This allows a processor to take input of a "less derived" type than the output of the previous processor, i.e. a base type of that output. (Note: TOut does not need to be covariant to achieve this; it is covariant for convenience.)

class Base { }
class Derived : Base { }

// ---

// pipe takes in a list of objects of type `Derived`
var pipe = new Pipe<Derived>();

pipe.FirstConnector
    // because TIn of IPipeProcessor is contravariant,
    // this processor can take objects
    // of type `Base` as input:
    //
    // Derived --> Base
    .Connect(new Processor<Base, Base>())
    // Base --> object
    .Connect(new CastProcessor<object, Derived>())      // cast object to Derived
    // Derived --> Base
    .Connect(new Processor<Base, object>())
    .Finish(results => { });

pipe.Process(new Derived());

Note that this is not possible with value types (e.g. int --> object): generic variance applies only to reference types, and converting a value type to object requires boxing rather than a reference conversion.
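The variance rules can be checked with a small standalone program. It declares a local `IProcessor<in TIn, out TOut>` with the same variance as `IPipeProcessor`; `NameProcessor` and `ObjectNameProcessor` are hypothetical examples, not library types. It shows contravariant input, covariant output, and that variance does not extend to value types such as int.

```csharp
using System;

// Local stand-in declared with the same variance as the library's IPipeProcessor.
public interface IProcessor<in TIn, out TOut>
{
    TOut Process(TIn data);
}

public class Base { }
public class Derived : Base { }

// Hypothetical processor that accepts any Base and returns its runtime type name.
public sealed class NameProcessor : IProcessor<Base, string>
{
    public string Process(Base data) => data.GetType().Name;
}

// Hypothetical processor that accepts any object.
public sealed class ObjectNameProcessor : IProcessor<object, string>
{
    public string Process(object data) => data.GetType().Name;
}

public static class VarianceDemo
{
    public static void Main()
    {
        // Contravariant TIn: a processor for Base is usable where one for Derived is expected.
        IProcessor<Derived, string> forDerived = new NameProcessor();
        // Covariant TOut: string output can be viewed as object output.
        IProcessor<Derived, object> asObject = forDerived;
        Console.WriteLine(asObject.Process(new Derived())); // Derived

        // Variance needs a reference conversion; int -> object is boxing,
        // so an object processor is not an int processor.
        object boxed = new ObjectNameProcessor();
        Console.WriteLine(boxed is IProcessor<string, string>); // True
        Console.WriteLine(boxed is IProcessor<int, string>);    // False
    }
}
```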

Process Strategies

Process strategies define how the data is processed:

// SerialProcessStrategy is the default when using the parameterless constructor
// of Pipe<TIn>. This strategy processes the items of the list one after another.
var pipe = new Pipe<object>(new SerialProcessStrategy());

        Serial
[data] ┌Strategy─┐
[data] ⇒ data   ⇒ [data][data][data]
[data] └─────────┘

// Process data in parallel.
// Each element of the list is processed using the
// Parallel facility of .NET. All threads are synchronized
// at the end, and further processing is done on the thread
// where the processing was started.
var pipe = new Pipe<object>(new ParallelProcessStrategy());

         Parallel
       ┌─Strategy───┐
       |    data    |
[data] |   ╱    ╲   | [data]
[data] ⇒ ━ data  ━ ⇒ [data]
[data] |   ╲    ╱   | [data]
       |    data    |
       └────────────┘

// Use a thread pool to process the data.
// It is advised to use a NonBlockingPipe with the ThreadPoolProcessStrategy,
// as this strategy does not block while the data is processed (in contrast
// to the ParallelProcessStrategy, which blocks until all data is processed).
// A NonBlockingPipe is optimized to work with such a strategy.
var pipe = new NonBlockingPipe<object>(new ThreadPoolProcessStrategy());

         ThreadPool
       ┌─Strategy───┐
       |    data ━━ ⇒ [data]
[data] |   ╱        |
[data] ⇒ ━ data ━━━ ⇒ [data]
[data] |   ╲        |
       |    data ━━ ⇒ [data]
       └────────────┘
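The difference between the serial and the parallel strategy can be sketched with standard .NET APIs. This is an illustration, not the library's implementation: `StrategySketch`, `ProcessSerial` and `ProcessParallel` are hypothetical. The parallel version writes each result into its own slot, so the output order matches the input order, and `Parallel.For` returns only after all iterations have finished, which matches the blocking behavior described for the `ParallelProcessStrategy`.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class StrategySketch
{
    // Serial: items are processed one after another on the calling thread.
    public static TOut[] ProcessSerial<TIn, TOut>(IList<TIn> data, Func<TIn, TOut> process)
    {
        var results = new TOut[data.Count];
        for (int i = 0; i < data.Count; i++)
            results[i] = process(data[i]);
        return results;
    }

    // Parallel: items are processed concurrently; Parallel.For blocks until every
    // iteration has finished, then execution continues on the calling thread.
    public static TOut[] ProcessParallel<TIn, TOut>(IList<TIn> data, Func<TIn, TOut> process)
    {
        var results = new TOut[data.Count];
        // each iteration writes only its own index, so no locking is needed
        Parallel.For(0, data.Count, i => results[i] = process(data[i]));
        return results;
    }

    public static void Main()
    {
        var input = new List<int> { 1, 2, 3, 4 };
        Console.WriteLine(string.Join(",", ProcessSerial(input, x => x * x)));   // 1,4,9,16
        Console.WriteLine(string.Join(",", ProcessParallel(input, x => x * x))); // 1,4,9,16
    }
}
```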

Non-Blocking Pipe

Combining processors into a pipe resembles Unix pipes ('|'). In a Unix pipeline the chained programs produce output before all input has been processed: they take a stream of data as input and emit the transformed stream as output.

Realmar.Pipes provides this functionality too, through 'NonBlockingPipe's.

A 'NonBlockingPipe' can be given data continuously without blocking the caller.

var pipe = new NonBlockingPipe<string>();

pipe.FirstConnector
    .Connect(new ParseStringProcessor<int>())
    .Finish(Console.WriteLine);

// will not block
pipe.Process(new List<string> { "1", "2" });

// will not block
pipe.Process(new List<string> { "3", "4" });

The default processing strategy of the 'NonBlockingPipe' is the 'ThreadPoolProcessStrategy'.

It is important to note that the 'NonBlockingPipe' uses a worker thread to manage the input data. This means that even with the 'SerialProcessStrategy' the caller is not blocked and the data is processed on a separate thread. However, the pipe cannot start on new data before it has finished the previously given data, because it has to wait for the 'SerialProcessStrategy' to finish before handing it more. The same applies to the 'ParallelProcessStrategy'.
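The worker-thread idea can be sketched with `BlockingCollection<T>`: `Process` only enqueues a batch and returns immediately, while a single background task drains the queue and handles batches in arrival order. `MiniNonBlockingPipe` and `NonBlockingDemo` are hypothetical and are not the library's `NonBlockingPipe`; this sketch processes each batch serially, which also shows why a later batch has to wait until the current one is done.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical sketch of a non-blocking pipe: callers enqueue, a worker processes.
public sealed class MiniNonBlockingPipe<TIn, TOut> : IDisposable
{
    private readonly BlockingCollection<IList<TIn>> _queue = new BlockingCollection<IList<TIn>>();
    private readonly Task _worker;

    public MiniNonBlockingPipe(Func<TIn, TOut> process, Action<TOut> finish)
    {
        // One long-running worker drains the queue; batches are handled in arrival order.
        _worker = Task.Factory.StartNew(() =>
        {
            foreach (var batch in _queue.GetConsumingEnumerable())
                foreach (var item in batch)
                    finish(process(item));
        }, TaskCreationOptions.LongRunning);
    }

    // Returns immediately; the data is processed later on the worker thread.
    public void Process(IList<TIn> data) => _queue.Add(data);

    // Signals that no more data will arrive and waits for the worker to finish.
    public void Dispose()
    {
        _queue.CompleteAdding();
        _worker.Wait();
        _queue.Dispose();
    }
}

public static class NonBlockingDemo
{
    public static List<int> Run()
    {
        var results = new List<int>(); // only touched by the worker until Dispose returns
        using (var pipe = new MiniNonBlockingPipe<string, int>(int.Parse, results.Add))
        {
            pipe.Process(new List<string> { "1", "2" }); // does not block
            pipe.Process(new List<string> { "3", "4" }); // does not block
        } // Dispose waits until both batches are processed
        return results;
    }

    public static void Main() => Console.WriteLine(string.Join(",", Run())); // 1,2,3,4
}
```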

Pipe Connectors

Pipe Connectors are used to pass processed data from one pipe to other pipes.

This example is taken from the tests (test case: Process_ConditionalPipeConnector). The goal is to multiply a number by 2 until it is greater than or equal to 20, then append a string.

var mathPipe = new Pipe<double>();
var stringPipe = new Pipe<double>();

//                                                   false     true        predicate
var connector = new ConditionalPipeConnector<double>(mathPipe, stringPipe, x => x < 20);

mathPipe.FirstConnector
    .Connect(x => x * 2)
    .Finish(connector.Process);

stringPipe.FirstConnector
    .Connect(x => $"{x} is your number!")
    .Finish(Console.WriteLine);

mathPipe.Process(new List<double> { 1, 2, 3, 4, 5, 6 });
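To see what the example is meant to produce, the routing can be simulated without the library. This sketch follows the stated goal (double the value until it reaches 20, then format it); `ConditionalRoutingSketch` and `Route` are hypothetical, and the loop stands in for the connector sending values back into `mathPipe`. For the inputs 1 to 6 it yields 32, 32, 24, 32, 20 and 24.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class ConditionalRoutingSketch
{
    // Mimics a conditional connector: while the predicate holds, the value goes
    // back through the "math" stage; otherwise it goes on to the "string" stage.
    public static string Route(double x, Func<double, bool> predicate)
    {
        x = x * 2;                      // mathPipe: x => x * 2
        while (predicate(x))            // still below 20: feed back into mathPipe
            x = x * 2;
        return $"{x} is your number!";  // stringPipe
    }

    public static List<string> Run(IEnumerable<double> input) =>
        input.Select(x => Route(x, v => v < 20)).ToList();

    public static void Main()
    {
        foreach (var line in Run(new double[] { 1, 2, 3, 4, 5, 6 }))
            Console.WriteLine(line);
    }
}
```

With this predicate, an input of 0 or a negative number never reaches 20, so the loop in this sketch would not terminate.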

Running the Tests

Change into the test project directory and restore the dependencies:

$ cd Realmar.Pipes.Tests
$ dotnet restore

Run the tests:

$ dotnet xunit

Further Work / Ideas

  • Add more useful processors!
  • Distributed pipe connectors that allow sending the processed data to another computer for further processing

License

This project is licensed under the MIT License - see the LICENSE file for details.

Contributors

realmar
