rx's Introduction

Reactive Extensions for D Programming Language

Overview

This is a library like Rx.NET for asynchronous or event-based programs, built on the OutputRange concept.

Operator names follow std.algorithm and ReactiveX.

Example

import rx;
import std.conv : to;
import std.range : iota, put;

void main()
{
    // create own source of int
    auto subject = new SubjectObject!int;

    // define result array
    string[] result;

    // define pipeline and subscribe
    // sequence: source -> filter by even -> map to string -> join to result
    auto disposable = subject.filter!(n => n % 2 == 0).map!(o => to!string(o))
        .doSubscribe!(text => result ~= text);

    // set unsubscribe on exit
    // it is not necessary in this simple example,
    // but in many cases you should call dispose to prevent memory leaks.
    scope (exit)
        disposable.dispose();

    // put values to source. 
    put(subject, iota(10));

    // result is like this
    assert(result == ["0", "2", "4", "6", "8"]);
}

See also: more examples and the documentation.

Usage

Setting dependencies in dub.json

{
    ...
    "dependencies": {
        "rx": "~>0.10.0"
    }
}

or dub.sdl

dependency "rx" version="~>0.10.0"

Concepts

Basic interfaces

All operators are implemented with templates and structs for optimization. The interfaces below are the binary (class-based) interface, similar to std.range.interfaces.

//module rx.disposable
interface Disposable
{
    void dispose();
}

//module rx.observer
interface Observer(E) : OutputRange!E
{
    //void put(E obj); //inherits from OutputRange!E
    void completed();
    void failure(Exception e);
}

//module rx.observable
interface Observable(E)
{
    alias ElementType = E;
    Disposable subscribe(Observer!E observer);
}
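As a rough illustration of how these three interfaces fit together, the sketch below redeclares them locally (copied from the listing above) so that it compiles without rx. CollectingObserver, NopDisposable and ArrayObservable are hypothetical names made up for this example; they are not part of the library.

```d
import std.range.interfaces : OutputRange;

// Local copies of the interfaces listed above, so the sketch builds without rx.
interface Disposable
{
    void dispose();
}

interface Observer(E) : OutputRange!E
{
    void completed();
    void failure(Exception e);
}

interface Observable(E)
{
    alias ElementType = E;
    Disposable subscribe(Observer!E observer);
}

// Hypothetical observer that records everything it receives.
class CollectingObserver : Observer!int
{
    int[] values;
    bool done;

    void put(int value) { values ~= value; }
    void completed() { done = true; }
    void failure(Exception e) { done = true; }
}

// Hypothetical disposable with nothing to release.
class NopDisposable : Disposable
{
    void dispose() {}
}

// Hypothetical observable that replays a fixed array to each subscriber.
class ArrayObservable : Observable!int
{
    private int[] items;

    this(int[] items) { this.items = items; }

    Disposable subscribe(Observer!int observer)
    {
        foreach (item; items)
            observer.put(item);
        observer.completed();
        return new NopDisposable;
    }
}

void main()
{
    auto observer = new CollectingObserver;
    Observable!int source = new ArrayObservable([1, 2, 3]);

    auto subscription = source.subscribe(observer);
    scope (exit)
        subscription.dispose();

    assert(observer.values == [1, 2, 3]);
    assert(observer.done);
}
```

The observer is an output range with two extra notifications, and subscribing hands back a Disposable that ends the subscription.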

Supported Compilers

The latest three versions of dmd and ldc are supported.

License

This library is under the MIT License.
Some code is borrowed from Rx.NET.

Contributing

Issues and pull requests are welcome! 😃

Refer to CONTRIBUTING.md for details.

Development

Build and unittest

git clone https://github.com/lempiji/rx
cd rx
dub test

Update documents

Use https://github.com/adamdruppe/adrdox

Future work

  • generic observable factory
    • create, start, timer, interval
  • more subjects
    • publish, connectable
  • more algorithms
    • window, multicast
  • more tests
  • more documentation

rx's People

Contributors

ghost91-, lempiji, tanitta

rx's Issues

Speed of message distribution?

Let's assume I have many streams (500) and observables (10000). The streams filter a couple of external message streams into more specific ones. How does rx scale? Are there any optimizations that call only the streams/observables that are actually involved?

I'm wondering where the limit of such a framework is, that is, the point at which it simply takes too long to route messages around.

Commented examples / Basic Docs

I think what you have created is pretty cool and way under-recognized in the D community.

The code base is not easy to understand (at least not for a casual D user) and the examples are not self-explanatory.

How about creating minimal documentation, commenting the examples, etc.? I'm happy to help but would need some guidance first.

makeObserver in a loop / why does it use static struct?

makeObserver uses a static struct AnonymousObserver {...} which, from my understanding, means that there is only one instance of this struct per thread. Hence, when I do this:

foreach(thing; myThings){
 myMasterThing.subscribe(
  makeObserver(
   (float f) {thing.myFunc(f);},
   (Exception) {}
  )
 );
}

I get a lot of observers, but they all point to the same function of the last thing in the loop.

But I would like to have an observer per thing. How to do this?
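One possible explanation, which this issue does not confirm, is D's closure capture rather than the struct. In D, `static` on a nested struct only means it carries no hidden pointer to the enclosing scope; it does not limit the number of instances. Delegates created inside a loop body, however, can end up sharing the enclosing function's single frame. A common workaround is to build each delegate in a helper function so that every call gets its own captured copy. The sketch below uses plain D only; makeGetter is a hypothetical helper.

```d
// Hypothetical helper: every call has its own frame, so each returned
// delegate captures its own copy of `value`.
int delegate() makeGetter(int value)
{
    return () => value;
}

void main()
{
    int delegate()[] getters;

    // Build one delegate per loop iteration through the helper.
    foreach (i; 0 .. 3)
        getters ~= makeGetter(i);

    // Each delegate still sees the value it was created with.
    assert(getters[0]() == 0);
    assert(getters[1]() == 1);
    assert(getters[2]() == 2);
}
```

Applied to the loop above, the same idea would be a small function that takes one thing and returns the delegate calling thing.myFunc, so each observer is built from a separate capture.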

Return type of filter!(...)

Subject!int sub = new SubjectObject!int;
auto filtered = sub.filter!(n => n % 2 == 0);

I can get my code to work using auto and filter! but I'm wondering what the explicit return type of filter!(...) is? The type suggested is FilterObservable, but this one can't be used in my code.

Subject!int sub = new SubjectObject!int;
FilterObservable filtered = sub.filter!(n => n % 2 == 0);

This gives: Error: undefined identifier FilterObservable
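In general D terms, an operator's result type is a template instance that needs its arguments, so its bare name is not a usable type. When an explicit type is wanted, `typeof` on the expression can spell it. The sketch below shows the technique with std.algorithm's filter instead of rx, so that it compiles without the library; isEven and EvenInts are names made up for the example. A named predicate is used because an anonymous lambda cannot be referred to again in a later expression.

```d
import std.algorithm : filter;
import std.array : array;

// Named predicate: unlike an anonymous lambda, it can be referred to again.
bool isEven(int n)
{
    return n % 2 == 0;
}

// Spell the operator's return type once with typeof and give it a name.
alias EvenInts = typeof((int[]).init.filter!isEven());

void main()
{
    int[] source = [1, 2, 3, 4];

    // An explicitly typed declaration instead of `auto`.
    EvenInts filtered = source.filter!isEven();

    assert(filtered.array == [2, 4]);
}
```

Whether the same pattern carries over unchanged to rx's filter is an assumption; the typeof approach itself is ordinary D.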

How to get reference to underlying object/function of an observer?

When using observers, is there a way to access the underlying object implementing the observer?

class myClass {...}
myClass mc = new myClass;
auto observer1 = makeObserver(&mc.myPut);

How can I access/get back mc via observer1?

Since there are several different ways to get an observer (makeObserver which returns an AnonymousObserver struct, inheritance, observerObject) how can I get the pointer/reference to the underlying object/function implementing the functions?

struct TestObserver  {
  void put(int n) { }
  void put(Object obj) { }
}
Observer!int observer = observerObject!int(TestObserver());

How to access the pointer created by TestObserver() call?

The use case is: an instance of myClass can have many observers. These observers are handed around, subscribed to streams, etc. Sometimes I need to get back the object an observer belongs to in order to call some functions on it.

Observable pipeline question

How can I create an observable pipeline that runs continuously? It should wait for input, process it, and keep running: wait for another input, process it, and keep waiting until I tell it to stop. I can't figure out the API.

RX fails with DMD 2.089

rx 0.11.0: building configuration "default"...
C:\D\dmd2\windows\bin\..\..\src\druntime\import\core\atomic.d(293,64): Error: template core.internal.atomic.atomicCompareExchangeStrongNoResult cannot deduce function from argument types !(cast(MemoryOrder)5, cast(MemoryOrder)5)(Disposable*, Disposable, Disposable), candidates are:
C:\D\dmd2\windows\bin\..\..\src\druntime\import\core\internal\atomic.d(479,6):        atomicCompareExchangeStrongNoResult(MemoryOrder succ = MemoryOrder.seq, MemoryOrder fail = MemoryOrder.seq, T)(T* dest, const T compare, T value)
  with succ = succ,
       fail = fail,
       T = rx.disposable.Disposable
  must satisfy the following constraint:
       CanCAS!T
C:\D\dmd2\windows\bin\..\..\src\druntime\import\core\atomic.d(313,28): Error: template instance core.atomic.cas!(cast(MemoryOrder)5, cast(MemoryOrder)5, Disposable, Disposable, Disposable) error instantiating
C:\Users\robby\AppData\Local\dub\packages\rx-0.11.0\rx\source\rx\disposable.d(342,17):        instantiated from here: cas!(cast(MemoryOrder)5, cast(MemoryOrder)5, Disposable, shared(Disposable), shared(Disposable))
C:\D\dmd2\windows\bin\..\..\src\druntime\import\core\atomic.d(293,64): Error: template core.internal.atomic.atomicCompareExchangeStrongNoResult cannot deduce function from argument types !(cast(MemoryOrder)5, cast(MemoryOrder)5)(Disposable*, Disposable, Disposable), candidates are:
C:\D\dmd2\windows\bin\..\..\src\druntime\import\core\internal\atomic.d(479,6):        atomicCompareExchangeStrongNoResult(MemoryOrder succ = MemoryOrder.seq, MemoryOrder fail = MemoryOrder.seq, T)(T* dest, const T compare, T value)
  with succ = succ,
       fail = fail,
       T = rx.disposable.Disposable
  must satisfy the following constraint:
       CanCAS!T
C:\D\dmd2\windows\bin\..\..\src\druntime\import\core\atomic.d(313,28): Error: template instance core.atomic.cas!(cast(MemoryOrder)5, cast(MemoryOrder)5, Disposable, Disposable, Cancelable) error instantiating
C:\Users\robby\AppData\Local\dub\packages\rx-0.11.0\rx\source\rx\util.d(47,16):        instantiated from here: cas!(cast(MemoryOrder)5, cast(MemoryOrder)5, Disposable, shared(Disposable), shared(Cancelable))
C:\Users\robby\AppData\Local\dub\packages\rx-0.11.0\rx\source\rx\disposable.d(359,47):        instantiated from here: exchange!(Disposable, shared(Cancelable))
C:\D\dmd2\windows\bin\..\..\src\druntime\import\core\atomic.d(99,46): Error: template core.internal.atomic.atomicLoad cannot deduce function from argument types !(cast(MemoryOrder)5)(shared(Disposable)*), candidates are:
C:\D\dmd2\windows\bin\..\..\src\druntime\import\core\internal\atomic.d(50,10):        atomicLoad(MemoryOrder order = MemoryOrder.seq, T)(inout(T)* src)
  with order = ms,
       T = shared(Disposable)
  must satisfy the following constraint:
       CanCAS!T
C:\Users\robby\AppData\Local\dub\packages\rx-0.11.0\rx\source\rx\util.d(48,22): Error: template instance core.atomic.atomicLoad!(cast(MemoryOrder)5, Disposable) error instantiating
C:\Users\robby\AppData\Local\dub\packages\rx-0.11.0\rx\source\rx\disposable.d(359,47):        instantiated from here: exchange!(Disposable, shared(Cancelable))
C:\D\dmd2\windows\bin\dmd.exe failed with exit code 1.

Thread model questions

Am I right that rx uses threads?

Is every observer running in its own thread? What other threads exist?

CompositeObserver / several times the same observer

It seems that CompositeObserver accepts the same observer several times, appending it each time. When removing an observer, only the first instance found is removed (because .countUntil() is used).

IMO this should be streamlined and made more consistent: either avoid adding the same observer several times, or, when removing an observer, remove all instances and not only the first one, by using .count() instead of .countUntil().
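The difference between the two removal policies can be sketched on a plain array with Phobos. This is not CompositeObserver's code; removeFirst and removeAll are hypothetical helpers, and ints stand in for observers.

```d
import std.algorithm : countUntil, remove;

// Remove only the first occurrence, as a countUntil-based lookup would.
int[] removeFirst(int[] items, int target)
{
    auto index = items.countUntil(target);
    return index < 0 ? items : items.remove(cast(size_t) index);
}

// Remove every occurrence by filtering with a predicate.
int[] removeAll(int[] items, int target)
{
    return items.remove!(item => item == target);
}

void main()
{
    // The value 1 was "added twice", like a duplicated observer.
    assert(removeFirst([1, 2, 1, 3], 1) == [2, 1, 3]);
    assert(removeAll([1, 2, 1, 3], 1) == [2, 3]);

    // Removing something that is not present leaves the list unchanged.
    assert(removeFirst([2, 3], 1) == [2, 3]);
}
```

Note that std.algorithm's remove works in place on the array it is given, so callers that still need the original should pass a copy.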

CompositeObserver.remove() returning Observer?

When I have:

CompositeObserver!E  myCO;

How am I supposed to remove an observer from it?

myCO = myCO.remove(observer);

doesn't work because of an implicit conversion error. Why isn't .remove() returning a CompositeObserver!E? If there is only one observer left, it could still be a CompositeObserver!E with just one element.

Filter subscribers

I have a sequence that takes (x,y) coordinates. Now I want to notify only the subscribers that pass a hit-test for this coordinate. How can this be done?

All algorithms are working against the event streams but not against the subscribers.

subscribeOn is broken with TaskPoolScheduler and ThreadScheduler

The following simple program crashes with a segmentation fault:

import std.stdio : writeln;

import rx;

void main()
{
    auto data = [1, 2, 3, 4];

    data.asObservable().subscribeOn(new ThreadScheduler).doSubscribe((int a){});
}

Backtrace:

#0  0x00007fffffffe520 in ?? ()
#1  0x0000000000449899 in std.range.primitives.doPut!(void(int) pure nothrow @nogc @safe function, int).doPut(ref void(int) pure nothrow @nogc @safe function, ref int) (e=@0x7ffff56dcc10: 6914160, r=@0x7fffffffe378: 0x0) at /usr/include/dlang/dmd/std/range/primitives.d:226
#2  0x0000000000449878 in std.range.primitives.put!(void(int) pure nothrow @nogc @safe function, int).put(ref void(int) pure nothrow @nogc @safe function, int) (e=6914160, r=@0x7fffffffe378: 0x0) at /usr/include/dlang/dmd/std/range/primitives.d:304
#3  0x0000000000449957 in std.range.primitives.put!(void(int) pure nothrow @nogc @safe function, int[]).put(ref void(int) pure nothrow @nogc @safe function, int[]) (e=..., r=@0x7fffffffe378: 0x0) at /usr/include/dlang/dmd/std/range/primitives.d:346
#4  0x0000000000449903 in rx.observable.from!(int[]).from(ref int[]).FromObservable.subscribe!(void(int) pure nothrow @nogc @safe function).subscribe(ref void(int) pure nothrow @nogc @safe function) (this=..., observer=@0x7fffffffe378: 0x0) at ../../source/rx/observable.d:606
#5  0x0000000000449858 in rx.observable.doSubscribe!(rx.observable.from!(int[]).from(ref int[]).FromObservable, void(int) pure nothrow @nogc @safe function).doSubscribe(ref rx.observable.from!(int[]).from(ref int[]).FromObservable, ref void(int) pure nothrow @nogc @safe function) (observer=@0x7fffffffe378: 0x0, observable=...) at ../../source/rx/observable.d:110
#6  0x0000000000449814 in rx.scheduler.SubscribeOnObservable!(rx.observable.from!(int[]).from(ref int[]).FromObservable, rx.scheduler.ThreadScheduler).SubscribeOnObservable.subscribe!(void(int) pure nothrow @nogc @safe function).subscribe(ref void(int) pure nothrow @nogc @safe function).__lambda2() (this=0x7ffff7ec1080) at ../../source/rx/scheduler.d:499
#7  0x000000000045a986 in core.thread.Thread.run() ()
#8  0x000000000045a389 in thread_entryPoint ()
#9  0x00007ffff7bc2297 in start_thread () from /usr/lib/libpthread.so.0
#10 0x00007ffff6fce1ef in clone () from /usr/lib/libc.so.6

The crash also happens with TaskPoolScheduler. With LocalScheduler everything works fine.

Observer not triggered

Since private messages aren't available and I don't have an email address, I'm asking via an issue:
Platform: win-10

I want to observe all WM_* messages that flow to a window-proc. So I created some global (I know bad style but for a playground OK) stuff but the observer is never triggered.

// windows message that should go to the observer
class winMsg {
  HWND    hwnd;
  UINT    message;
  WPARAM  wParam;
  LPARAM  lParam;

  this(HWND chwnd, UINT cmessage, WPARAM cwParam, LPARAM clParam){
    hwnd    = chwnd;
    message = cmessage;
    wParam  = cwParam;
    lParam  = clParam;
  }
  this(){}
}

// observable of windows messages
class osStream {
  static winMsg currentMsg;
  static SubjectObject!winMsg WMStream;
  
  static this() {
    WMStream = new SubjectObject!winMsg;
  }
}

During app initialization I create a filtered stream of osStream.WMStream and want to call an init function.

  // if we receive a WM_CREATE, we initialize the app
  auto initStream = osStream.WMStream.filter!(n => n.message == WM_CREATE).doSubscribe((winMsg msg) {appInit();});

and appInit() is just a simple dummy function:

void appInit(){
  OutputDebugString("App started!");
}

And this function never gets called.

Any idea what I'm doing wrong?

Compile error from v0.11.0 on / .filter() / casts give different sizes error

With every version > 0.10.1 I get a very strange casting error I don't understand:

alias typeof(windows_message_streams[WM_MOUSEMOVE].filter!(win => (win.wParam & MK_LBUTTON))) WM_MOUSEMOVE_LBUTTON_TYPE;
WM_MOUSEMOVE_LBUTTON_TYPE WM_MOUSEMOVE_LBUTTON_STREAM;
pragma(msg,typeof(WM_MOUSEMOVE_LBUTTON_STREAM));
pragma(msg,WM_MOUSEMOVE_LBUTTON_TYPE.sizeof);

>> FilterObservable!(__lambda39, SubjectObject!(OS_State))
>> 16LU

WM_MOUSEMOVE_LBUTTON_STREAM = cast(WM_MOUSEMOVE_LBUTTON_TYPE)(windows_message_streams[WM_MOUSEMOVE].filter!(win => (win.wParam & MK_LBUTTON)));
pragma(msg,typeof(WM_MOUSEMOVE_LBUTTON_STREAM));
pragma(msg,WM_MOUSEMOVE_LBUTTON_STREAM.sizeof);

>> FilterObservable!(__lambda7, SubjectObject!(OS_State))
>> 8LU

..\..\gui.d(317,104): Error: cannot cast expression filter(windows_message_streams[512u]) of type FilterObservable!(__lambda6, SubjectObject!(OS_State)) to FilterObservable!(__lambda39, SubjectObject!(OS_State)) because of different sizes
FilterObservable!(__lambda7, SubjectObject!(OS_State))

My code works with version v0.10.1

Why three different makeObserver(...) functions instead of one?

There exists:

auto makeObserver(E)(void delegate(E) doPut, void delegate() doCompleted, void delegate(Exception) doFailure)
auto makeObserver(E)(void delegate(E) doPut, void delegate() doCompleted)
auto makeObserver(E)(void delegate(E) doPut, void delegate(Exception) doFailure)

Since every delegate is checked anyway with:

if(_doDelegate !is null)

why not use only one function like this?


auto makeObserver(E)(void delegate(E) doPut, void delegate() doCompleted = null, void delegate(Exception) doFailure = null)

IMO this gives a simpler interface and it allows me to only provide a doPut delegate.
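The proposal can be sketched without rx as follows. SketchObserver and makeSketchObserver are hypothetical stand-ins invented for this example, not the library's types; the element type is passed explicitly to keep the sketch simple.

```d
// Hypothetical stand-in for the library's observer struct.
struct SketchObserver(E)
{
    void delegate(E) doPut;
    void delegate() doCompleted;
    void delegate(Exception) doFailure;

    // Each call is forwarded only if the matching delegate was supplied.
    void put(E value) { if (doPut !is null) doPut(value); }
    void completed() { if (doCompleted !is null) doCompleted(); }
    void failure(Exception e) { if (doFailure !is null) doFailure(e); }
}

// One factory with default arguments instead of three overloads.
auto makeSketchObserver(E)(void delegate(E) doPut,
        void delegate() doCompleted = null,
        void delegate(Exception) doFailure = null)
{
    return SketchObserver!E(doPut, doCompleted, doFailure);
}

void main()
{
    int sum;
    bool finished;

    // Only doPut is supplied; completed() is then a no-op.
    auto putOnly = makeSketchObserver!int((int v) { sum += v; });
    putOnly.put(2);
    putOnly.completed();
    assert(!finished);

    // doPut and doCompleted are supplied.
    auto withCompleted = makeSketchObserver!int((int v) { sum += v; },
            () { finished = true; });
    withCompleted.put(3);
    withCompleted.completed();

    assert(sum == 5);
    assert(finished);
}
```

One trade-off of a single signature: a call that supplies only doPut and doFailure, like the third overload listed above, would have to pass null for doCompleted explicitly.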

Implement groupBy

Hi ...
I really like rx for dlang :)
I want to use it in a file watcher program to debounce events from the filesystem.
The problem is that the low-level API produces several events for a single write to a file, and I don't want those published directly to the application.
This can easily be done with e.g. RxJava by doing something like this:
https://stackoverflow.com/a/43431636/204070

merge(
source.groupBy(value -> value.objectId)
.map(observable -> observable.debounce(1, TimeUnit.SECONDS))
)

Unfortunately I do not know enough (yet) about rx for dlang to implement my own groupBy ...
