reactivebayes / rocket.jl

Functional reactive programming extensions library for Julia

Home Page: https://reactivebayes.github.io/Rocket.jl/

License: MIT License

Julia 100.00%
reactive observable asynchronous reactive-programming reactive-frameworks reactive-extensions reactive-library julia-language julia functional-reactive-programming

rocket.jl's Introduction

Reactive extensions library for Julia

Rocket.jl is a Julia package for reactive programming using Observables, to make it easier to work with asynchronous data.

To achieve the best performance and a convenient API, Rocket.jl combines the Observer pattern, the Actor model and functional programming.

Inspired by RxJS and ReactiveX communities.

Rocket.jl has been designed with a focus on performance and modularity.

The essential concepts in Rocket.jl are:

  • Observable: represents a collection of future messages (data and/or events).
  • Actor: an object that knows how to react to incoming messages delivered by the Observable.
  • Subscription: represents the teardown logic, which is useful for cancelling the execution of an Observable.
  • Operators: objects that enable a functional programming style of dealing with collections, with operations like map, filter, reduce, etc.
  • Subject: a way of multicasting a message to multiple Observers.
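
The concepts above can be sketched together in a few lines. This is a minimal illustration rather than canonical usage: it assumes the `from`, `filter`, `map`, `keep`, `Subject`, `next!` and `complete!` functions exported by Rocket.jl, and all variable names are made up for the example.

```julia
using Rocket

# Observable: a finite stream built from an array.
source = from([1, 2, 3, 4])

# Operators: keep the even values, then square them.
evens_squared = source |> filter(iseven) |> map(Int, x -> x^2)

# Actor: `keep(Int)` stores every value it receives in `collector.values`.
collector = keep(Int)

# Subscription: the handle returned by `subscribe!`.
subscription = subscribe!(evens_squared, collector)
unsubscribe!(subscription)

# Subject: multicasts a single message to several actors.
subject = Subject(Int)
first_listener  = keep(Int)
second_listener = keep(Int)
subscribe!(subject, first_listener)
subscribe!(subject, second_listener)
next!(subject, 42)
complete!(subject)

println(collector.values)
println(first_listener.values, " ", second_listener.values)
```

Because `from` emits synchronously, the collector is already filled when `subscribe!` returns, and both listeners of the subject receive the same message.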

Quick start

For a quick start and basic introduction take a look at the demo folder and Quick Start notebook.

using Rocket, Compose, IJulia ; set_default_graphic_size(35cm, 2cm)
function draw_ball(t)
    IJulia.clear_output(true)
    x = -exp(-0.01t) + 1                     # x coordinate
    y = -abs(exp(-0.04t)*(cos(0.1t))) + 0.83 # y coordinate
    display(compose(context(), circle(x, y, 0.01)))
end
source = interval(20) |> take(200) # Take only first 200 emissions

subscription = subscribe!(source, draw_ball)

unsubscribe!(subscription) # It is possible to unsubscribe before the stream ends    
IJulia.clear_output(false);

Documentation

Full documentation is available at reactivebayes website.

It is also possible to build the documentation locally. Just execute

$ julia make.jl

in the docs/ directory to build a local version of the documentation.

First example

Normally you use an array to process some data.

for value in array_of_values
    doSomethingWithMyData(value)
end

In Rocket.jl you will use an observable.

subscription = subscribe!(source_of_values, lambda(
    on_next     = (data)  -> doSomethingWithMyData(data),
    on_error    = (error) -> doSomethingWithAnError(error),
    on_complete = ()      -> println("Completed!")
))

At some point you may decide to stop listening for new messages.

unsubscribe!(subscription)
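
For a version of this example that can be pasted into a REPL, the placeholders can be filled in as follows. This is only a sketch: `source_of_values` is assumed here to be a small array observable created with `from`, and the callbacks just record what they see.

```julia
using Rocket

received  = Int[]
completed = Ref(false)

# Assumption: the source is a finite array observable.
source_of_values = from([10, 20, 30])

subscription = subscribe!(source_of_values, lambda(
    on_next     = (data) -> push!(received, data),
    on_error    = (err)  -> println("Error: ", err),
    on_complete = ()     -> (completed[] = true)
))

# The stream has already finished here, so this call is harmless.
unsubscribe!(subscription)

println(received, " completed = ", completed[])
```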

Actors

To process messages from an observable you have to define an Actor that knows how to react to incoming messages.

struct MyActor <: Rocket.Actor{Int} end

Rocket.on_next!(actor::MyActor, data::Int) = doSomethingWithMyData(data)
Rocket.on_error!(actor::MyActor, error)    = doSomethingWithAnError(error)
Rocket.on_complete!(actor::MyActor)        = println("Completed!")

An Actor can also have its own local state:

struct StoreActor{D} <: Rocket.Actor{D}
    values :: Vector{D}

    StoreActor{D}() where D = new(Vector{D}())
end

Rocket.on_next!(actor::StoreActor{D}, data::D) where D = push!(actor.values, data)
Rocket.on_error!(actor::StoreActor, error)             = doSomethingWithAnError(error)
Rocket.on_complete!(actor::StoreActor)                 = println("Completed: $(actor.values)")

For debugging purposes you can use the general-purpose LambdaActor, or just pass a function object as an actor to the subscribe! function.
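
The two styles can be compared side by side. The sketch below defines a hypothetical `SumActor` (not part of Rocket.jl) with local state and then subscribes a plain function to a second stream; it assumes only the `Actor` interface shown above and the `from` observable.

```julia
using Rocket

# A hypothetical actor that accumulates a running total.
mutable struct SumActor <: Rocket.Actor{Int}
    total :: Int
    done  :: Bool

    SumActor() = new(0, false)
end

Rocket.on_next!(actor::SumActor, data::Int) = (actor.total += data)
Rocket.on_error!(actor::SumActor, err)      = println("Error: ", err)
Rocket.on_complete!(actor::SumActor)        = (actor.done = true)

summer = SumActor()
subscribe!(from([1, 2, 3, 4]), summer)

# A function object used directly as an actor; it only handles `next` messages.
seen = Int[]
subscribe!(from([5, 6]), (x) -> push!(seen, x))

println(summer.total, " ", summer.done, " ", seen)
```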

Operators

What makes Rocket.jl powerful is its ability to help you process, transform and modify the flow of messages through your observables using Operators.

A list of all available operators can be found in the documentation.

squared_int_values = source_of_int_values |> map(Int, (d) -> d ^ 2)
subscribe!(squared_int_values, lambda(
    on_next = (data) -> println(data)
))
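
Operators can be chained with the pipe syntax. The following sketch combines `filter`, `map` and `take`, all of which appear elsewhere in this README; the input array and the `keep` actor are chosen only for illustration.

```julia
using Rocket

source_of_int_values = from(collect(1:10))

# Keep odd values, multiply them by ten, and stop after three emissions.
pipeline = source_of_int_values |>
    filter(isodd) |>
    map(Int, (d) -> d * 10) |>
    take(3)

collector = keep(Int)
subscribe!(pipeline, collector)

println(collector.values) # expected to hold 10, 30 and 50
```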

Rocket.jl is fast

Rocket.jl has been designed with a focus on efficiency, scalability and maximum performance. Below is a benchmark comparison between Rocket.jl, Signals.jl, Reactive.jl and Observables.jl in Julia v1.9.3 (see versioninfo below).

We test the latency of the map and filter operators applied to a finite stream of integers. The code is available in the demo folder.

Rocket.jl outperforms Observables.jl, Reactive.jl and Signals.jl significantly in terms of execution times and memory consumption both in synchronous and asynchronous modes.

Rocket.jl vs Reactive.jl

Rocket.jl vs Signals.jl

Rocket.jl vs Observables.jl

versioninfo()
Julia Version 1.9.3
Commit bed2cd540a1 (2023-08-24 14:43 UTC)
Build Info:
  Official https://julialang.org/ release
Platform Info:
  OS: macOS (arm64-apple-darwin22.4.0)
  CPU: 10 × Apple M2 Pro
  WORD_SIZE: 64
  LIBM: libopenlibm
  LLVM: libLLVM-14.0.6 (ORCJIT, apple-m1)
  Threads: 1 on 6 virtual cores
Environment:
  JULIA_NUM_THREADS = 
] status
  [6e4b80f9] BenchmarkTools v1.3.2
  [510215fc] Observables v0.5.4
  [a223df75] Reactive v0.8.3
  [df971d30] Rocket v1.7.2
  [6303bc30] Signals v1.2.0

License

MIT License Copyright (c) 2021-2024 BIASlab, 2024-present ReactiveBayes

rocket.jl's People

Contributors

bartvanerp, bvdmitri, mroavi, pengwyn, thijsvdlaar, wmkouw, zsoerenm

rocket.jl's Issues

TagBot trigger issue

This issue is used to trigger TagBot; feel free to unsubscribe.

If you haven't already, you should update your TagBot.yml to include issue comment triggers.
Please see this post on Discourse for instructions and more details.

If you'd like for me to do this for you, comment TagBot fix on this issue.
I'll open a PR within a few hours, please be patient!

unsubscribing from asynchronous observables

Apologies for the long MWE. Not sure it is really an issue, but I have not been able to figure this out from the available documentation.

I am developing an agent that subscribes to an unbounded stream of (network) asynchronous events and can be controlled through keyboard presses. My logic to end the execution of the agent and close the program is that upon pressing c, the agent unsubscribes from all observables and should quit. Except that it really doesn't at the moment. See MWE below.

Am I misusing the SyncActor logic?

using Rocket

# get single char from keyboard, excluding return
# https://stackoverflow.com/questions/56888266/how-to-read-keyboard-inputs-at-every-keystroke-in-julia/56893331#56893331
function getchar()
    ret = ccall(:jl_tty_set_mode, Int32, (Ptr{Cvoid},Int32), stdin.handle, true)
    ret == 0 || error("unable to switch to raw mode")
    c = read(stdin, Char)
    ccall(:jl_tty_set_mode, Int32, (Ptr{Cvoid}, Int32), stdin.handle, false)
    return c
end

# a basic logger controllable by keyboard presses
mutable struct Agent <: Actor{Union{Char, Int}} 
    subs :: Vector{Any}
    Agent() = new(Any[])
end

Rocket.on_next!(a::Agent, c::Int) = println("got Int  ", c)

function Rocket.on_next!(a::Agent, c::Char)
    println("got Char ", c )
    # close on pressing c
    if c == 'c'
        foreach(unsubscribe!, a.subs)
        println("done") # hangs after this
    end
end

Rocket.on_complete!(a::Agent) = println("completed")

# int sources
intsource = make(Int) do actor
    i = 0
    while true
        next!(actor, i)
        i+=1
        sleep(1)
    end
end

# keyboard events
keysource = make(Char) do actor
    while true
        next!(actor, getchar())
    end
end

# create sync agent
agent = sync(Agent())

# subscribe to asynchronous event streams
sub1 = subscribe!(intsource |> safe() |> async(), agent)
sub2 = subscribe!(keysource |> safe() |> async(), agent)

# store subscriptions
push!(agent.actor.subs, sub1)
push!(agent.actor.subs, sub2)

yield()
wait(agent)

Schedulers don't appear to be working for me, tried several configurations.

I have two functions that deal with adding to the subject

function source!(src::Source{T}) where T
    Subject(T, scheduler = AsyncScheduler()) do subject
        start_consume!(src, subject)
    end
end

function start_consume!(src::Source{T}, consumer) where T
    while true
        msg = src.pollFn()
        if !isnothing(msg)
            next!(consumer, msg)
        else
            yield()
        end
    end
end

When I replace Subject(T, scheduler = AsyncScheduler()) with make(T), everything works. However, I want to run multiple queues on the same scheduler. I tried looping through the queues and using @spawn, but this didn't work: it ran the first queue, and the second did not execute at all. Consequently, my new approach is to give both queues the same scheduler so that the work is distributed between them. But currently I can't get even one queue running with a scheduler.

Asynchronous execution question

Hi,
I am looking to translate Python code that uses RxPY in Jupyter Notebooks.
It works very well: using the interval function, it allows cells to execute periodically and display their results.
The executions are asynchronous and they don't block the Jupyter Notebook.
How can I do the same async execution using Rocket.jl?
Thanks.

Relevant code in Python:

from rx.concurrency import ThreadPoolScheduler
from rx import Observable

threadpool_scheduler = ThreadPoolScheduler()
observable1 = Observable.interval(15000).observe_on(threadpool_scheduler)
subscription = observable1.subscribe(
         lambda value: specialPrint(out1,"\rRunningThread:{}, Value:{},\t DateTime:{}".\
                     format(threading.get_ident(),value, datetime.datetime.now())),
         lambda error: specialPrint(out1,"\rRunningThread:{}, Error:{}, \t DateTime:({})".\
                     format(threading.get_ident(),error, datetime.datetime.now())),
         lambda: specialPrint(out1,"\rRunningThread:{}, Complete!, \t DateTime:({})".\
                     format(threading.get_ident(),datetime.datetime.now())))

How to unsubscribe from a running stream

Hi

I am trying to use this nice package to handle some real time websocket data. How can one unsubscribe from a running stream midway? Using this example below, how can that work? From this example the stream runs and terminates at the end but how can I terminate the stream say in the middle of the stream using the unsubscribe! method?

using Rocket


struct MyActor <: Rocket.Actor{Int} end

Rocket.on_next!(actor::MyActor, data::Int) = println(data)
Rocket.on_error!(actor::MyActor, error)    = println(error)
Rocket.on_complete!(actor::MyActor)        = println("Completed!")

stream = from([x for x in 1:10_000])

data_stream = subscribe!(stream, MyActor())

unsubscribe!(data_stream)
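
One thing worth noting about the example above: with its default scheduler, `from` pushes all values synchronously inside `subscribe!` (the stack trace in the NamedTuple issue further down shows `on_subscribe!` of an `ArrayObservable` calling `next!` directly), so the stream has already finished by the time `unsubscribe!` is reached. The sketch below is not an answer from the maintainers; it only illustrates, under that reading, that `unsubscribe!` does stop delivery when values arrive over time, using a `Subject` as a stand-in for a live websocket feed.

```julia
using Rocket

# Stand-in for a live data feed: values are pushed in from outside.
feed  = Subject(Int)
actor = keep(Int)

subscription = subscribe!(feed, actor)

next!(feed, 1)
next!(feed, 2)

# Stop listening midway; later messages are no longer delivered to `actor`.
unsubscribe!(subscription)

next!(feed, 3)

println(actor.values)
```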

Couple of quick pointers pls

Hi

thanks, this is quite an awesome library. I am using the library Jib.jl to subscribe to data from Interactive brokers.

quick ques: in Rxpy, there is a method run() to block execution. is there anything similar in Rocket.jl please?

thanks

Unexpected error when calling next! within a subscription to an interval

My use case is something like this:

using Rocket

stream = Subject(Int)
subscribe!(interval(1000), (count) -> next!(stream, count))
subscribe!(stream, (val) -> println("got: ", val))

println("Press enter to exit")
readline()

However, I receive the following error:

Error in Timer:
MethodError(var"#5#6"(), (0,), 0x00000000000082c1)
Stacktrace:
 [1] error(s::MethodError)
   @ Base ./error.jl:44
 [2] on_error!(actor::FunctionActor{Int64, var"#3#4"}, err::MethodError)
   @ Rocket ~/.julia/packages/Rocket/RELb1/src/actor/function.jl:33
 [3] error!(actor::FunctionActor{Int64, var"#3#4"}, err::MethodError)
   @ Rocket ~/.julia/packages/Rocket/RELb1/src/actor.jl:223
 [4] on_error!(actor::Rocket.TimerActor{FunctionActor{Int64, var"#3#4"}}, err::MethodError)
   @ Rocket ~/.julia/packages/Rocket/RELb1/src/observable/timer.jl:106
 [5] error!(actor::Rocket.TimerActor{FunctionActor{Int64, var"#3#4"}}, err::MethodError)
   @ Rocket ~/.julia/packages/Rocket/RELb1/src/actor.jl:223
 [6] (::Rocket.var"#138#139"{Rocket.TimerActor{FunctionActor{Int64, var"#3#4"}}})(timer::Timer)
   @ Rocket ~/.julia/packages/Rocket/RELb1/src/observable/timer.jl:118
 [7] macro expansion
   @ ./asyncevent.jl:281 [inlined]
 [8] (::Base.var"#702#703"{Rocket.var"#138#139"{Rocket.TimerActor{FunctionActor{Int64, var"#3#4"}}}, Timer})()
   @ Base ./task.jl:134

If I call subscribe! on the subject before defining the interval subscription then it works successfully. But I don't quite understand why it fails in the other case. Is this due to the AsapScheduler?

Any help on this would be much appreciated 🙏

Next: A Proposal to easily create Operators from Actors.

Hello,

On a high level, I need to be able to subscribe to Actors. This is nonsensical, but numerous actors do not act as pure sinks, but instead as processing Operators in a data flow.

A theoretical example:

a1 = MyActor()
a2 = MyOtherActor()
subscribe!(a1, a2)

To my understanding, this isn't possible. The workaround I'm using is to pass a Subject and hack subscribe!()

struct MyActor{A} <: Actor{Int}
    # ...
    next::A
end

subject = Subject(Int)
a1 = MyActor(subject)
a2 = MyOtherActor()

Rocket.subscribe!(a::MyActor, b::Actor) = subscribe!(a.next, b)

subscribe!(a1, a2)

This is a similar design to what Actors that are through Proxies in Operators do. While I wish there would be a clean pattern to declare an Actor both an Actor and an Observable/Subject, I thought of an in-between step that matches the current design.

The idea occurred to me while looking at the CircularKeep Actor. I needed that Actor to actually next! its buffer every time new data is received. Since CircularKeep is a 'sink' Actor, the only way to do that would be to write a new Actor that copies the behavior of CircularKeep:

struct CircularKeepForOperators{A} <: Actor
     # ... Same stuff as in CircularKeep
     # Plus a receiving Actor
    actor::A
end

# Then define proxy and operator, on_call, operator_right, etc.

So this was painfully redundant, and there may be some pattern to turn any Actor into its Operator equivalent. (Though turning it into an Observable/Subject still makes sense to me.)

So I came up with a next() Operator. I would love to hear your thoughts as I think it would be a nice addition, possibly reduce the code base and allow for any actor to be turned into an operator easily:

export next

using Rocket

# Operator
next(actor::Actor{L1}, next::Actor{L2}) where {L1,L2} = NextOperator{L1,L2}(actor, next)

struct NextOperator{L1,L2} <: InferableOperator
    actor::Actor{L1}
    next::Actor{L2}
end

Rocket.operator_right(::NextOperator{L1,L2}, ::Type{L1}) where {L1,L2} = L2

function Rocket.on_call!(::Type{L1}, ::Type{L2}, operator::NextOperator, source) where {L1,L2}
    return proxy(L2, source, NextProxy(operator.actor, operator.next))
end

# Proxy
struct NextProxy{L1,L2} <: ActorProxy
    actor::Actor{L1}
    next::Actor{L2}
end

function Rocket.actor_proxy!(::Type{L2}, proxy::NextProxy{L1,L2}, actor::Actor{L2}) where {L1,L2}
    return NextActor{L1,L2}(proxy.actor, proxy.next)
end

# Actor
struct NextActor{L1,L2} <: Actor{L2}
    actor::Actor{L1}
    next::Actor{L2}
end

function Rocket.on_next!(actor::NextActor{L1,L2}, data::L1) where {L1,L2}
    next!(actor.actor, data)
    next!(actor.next, getrecent(actor.actor))
end

function Rocket.on_error!(actor::NextActor, error)
    error!(actor.actor, error)
    error!(actor.next, error)
end

function Rocket.on_complete!(actor::NextActor)
    complete!(actor.actor)
    complete!(actor.next)
end

Best! 🚀

Error in map operator in documentation

Hi,

There is a bug in some code in the documentation, where two type parameters are passed to the map operator instead of one. It is kind of silly, but I figured I should report it so that no misunderstandings arise for new users.

[Screenshot: rocket-error]

buffering asynchronous events closer in time than given `delay`

Hi,

I am trying to implement a custom operator that lumps together in a Vector items from an asynchronous observable that are closer than a specified delay and releases the buffer as a vector once delay has passed since the last item was received. In addition, the buffer is also released if its length exceeds maxsize.

This is an attempt to model what is described here, specifically the first operation of this marble diagram (from the first link).

As far as I understand from the available library in this package, this functionality is not yet available.

This is my current attempt, and I would welcome any help. If this (and future ones too) material is better suited for the discourse forum, please close the issue.

import Rocket
import Dates

struct ThrottleBufferOperator <: Rocket.InferableOperator 
      delay :: Int # delay in millisecond
    maxsize :: Int # maximum buf size
end

throttlebuffer(delay::Int, maxsize::Int) = ThrottleBufferOperator(delay, maxsize)

Rocket.on_call!(::Type{T}, ::Type{Vector{T}}, op::ThrottleBufferOperator, source) where {T} =
    Rocket.proxy(Vector{T}, source, ThrottleBufferProxy(op.delay, op.maxsize))

Rocket.operator_right(operator::ThrottleBufferOperator, ::Type{T}) where T = Vector{T}

struct ThrottleBufferProxy <: Rocket.ActorProxy 
      delay :: Int # delay in millisecond
    maxsize :: Int # maximum buf size
end

Rocket.actor_proxy!(::Type{Vector{T}}, op::ThrottleBufferProxy, actor) where {T} =
    ThrottleBufferActor{T}(actor, op.delay, op.maxsize)

mutable struct ThrottleBufferActor{T, A} <: Rocket.Actor{T}
         actor :: A
          data :: Vector{T}
    lastupdate :: Dates.DateTime
     completed :: Condition
    function ThrottleBufferActor{T}(actor::A, delay::Int, maxsize::Int) where {T, A}

        buf = new{T, A}(actor, T[], Dates.now(), Condition())
       
        @async while true
            # do next! if last update was more than `delay` milliseconds ago
            if length(buf.data) > 0
                if Dates.now() - buf.lastupdate > Dates.Millisecond(delay)
                    __next!(buf)
                end
            end

            # clear buffer if needed
            if length(buf.data) == maxsize
                __next!(buf)
            end

            # granularity
            sleep(0.001)
        end
        
        return buf
    end
end

function __next!(buf::ThrottleBufferActor)
    Rocket.next!(buf.actor, copy(buf.data)) # release copy of buffer
    resize!(buf.data, 0)  # reset buffer
    notify(buf.completed) # necessary for correct completion
end

function Rocket.on_next!(buf::ThrottleBufferActor{T, A}, item::T) where {T, A}
    push!(buf.data, item)
    buf.lastupdate = Dates.now()
    return nothing
end

Rocket.on_error!(buf::ThrottleBufferActor, err) = Rocket.error!(buf.actor, err)

Rocket.on_complete!(buf::ThrottleBufferActor) = 
    (wait(buf.completed); Rocket.complete!(buf.actor))


source = Rocket.make(Int) do actor
    # will print [0, 1, 2]
    Rocket.next!(actor, 0)
    Rocket.next!(actor, 1)
    Rocket.next!(actor, 2)
    sleep(500/1000)
    
    # will print [3, 4]
    Rocket.next!(actor, 3)
    sleep(10/1000)
    Rocket.next!(actor, 4)
    sleep(1000/1000)

    # will print [5]
    Rocket.next!(actor, 5)

    # will complete after [5] was printed
    Rocket.complete!(actor)
end

Rocket.subscribe!(source |> Rocket.async() |> throttlebuffer(100, 10), Rocket.logger())

However, I am not happy with the way the @async task in the constructor of ThrottleBufferActor polls at high frequency. I suspect that there is a different mechanism in Julia's task programming that would be better.

I also worry about possible race conditions, and about potentially losing some items because buf.data is resized in a different task.

Any suggestions are welcome.
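
One direction that avoids the polling loop is to restart a one-shot `Timer` every time an item arrives, so that the flush only runs once the stream has been quiet for `delay`. The sketch below is plain Julia and deliberately independent of Rocket.jl; `DebounceBuffer` and `release!` are hypothetical names, the delay is given in seconds, and the code assumes a single-threaded program (it is not hardened against races between a firing timer and a concurrent `push!`).

```julia
# Hypothetical helper: collects items and flushes them after a quiet period.
mutable struct DebounceBuffer{T}
    items   :: Vector{T}
    delay   :: Float64               # quiet period in seconds
    maxsize :: Int                   # flush immediately at this size
    flush   :: Function              # called with a copy of the buffer
    timer   :: Union{Timer, Nothing}
end

DebounceBuffer{T}(flush::Function, delay::Real, maxsize::Integer) where {T} =
    DebounceBuffer{T}(T[], Float64(delay), Int(maxsize), flush, nothing)

# Hand the buffered items to `flush` and reset the buffer.
function release!(buf::DebounceBuffer)
    if buf.timer !== nothing
        close(buf.timer)
        buf.timer = nothing
    end
    isempty(buf.items) && return nothing
    batch = copy(buf.items)
    empty!(buf.items)
    buf.flush(batch)
    return nothing
end

function Base.push!(buf::DebounceBuffer{T}, item::T) where {T}
    push!(buf.items, item)
    buf.timer !== nothing && close(buf.timer)   # restart the quiet period
    if length(buf.items) >= buf.maxsize
        release!(buf)
    else
        buf.timer = Timer(_ -> release!(buf), buf.delay)
    end
    return buf
end

# Usage: three quick items form one batch, a later item forms another.
batches = Vector{Vector{Int}}()
buf = DebounceBuffer{Int}(b -> push!(batches, b), 0.1, 10)
push!(buf, 1); push!(buf, 2); push!(buf, 3)
sleep(0.4)
push!(buf, 4)
sleep(0.4)
println(batches)
```

Inside an actor, `on_next!` would call `push!` on such a buffer and `flush` would forward the batch downstream; completion would call `release!` once before completing the downstream actor.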

Add optional seed argument to `pairwise` operator.

Hi,

just learning about reactive programming and this repo is great!

I am using the pairwise operator to pair up elements of an observable. However, this means that I need to wait for two elements to be available for the first output element to be emitted. Elements 1, 2, 3, 4, ... would be paired as (1, 2), (3, 4), ...

Would it be possible to add an optional seed argument to pairwise, to be used as a fictitious first element? This would pair elements as (seed, 1), (2, 3), (4, 5), ...

Usability in REPL

Hi, great work, congratulations!
I am new to Julia and ReactiveX. I tried to use Rocket in the REPL console, but I don't know how to synchronize a variable "a": subscribe to it and then show the change whenever I assign a new value to "a". I have tried with schedule and Task, without success. Could you please show how to use it? Thanks a lot.

NamedTuple ERROR: Actor of type LoggerActor expects data to be of type

MWE:

using Rocket
source = from([ 0, 1, 2 ])
result = source |> map(NamedTuple{(:value,), Tuple{Union{Nothing, Int64}}}, val -> (value = val < 2 ? nothing : val, ))
subscribe!(result, logger())

Errors with:

ERROR: Actor of type LoggerActor{NamedTuple{(:value,), Tuple{Union{Nothing, Int64}}}, Nothing} expects data to be of type NamedTuple{(:value,), Tuple{Union{Nothing, Int64}}}, but data of type NamedTuple{(:value,), Tuple{Nothing}} has been found.
Stacktrace:
  [1] next!(actor::LoggerActor{NamedTuple{(:value,), Tuple{Union{Nothing, Int64}}}, Nothing}, data::NamedTuple{(:value,), Tuple{Nothing}})
    @ Rocket ~/.julia/packages/Rocket/EGm1W/src/actor.jl:208
  [2] on_next!(actor::Rocket.MapActor{Int64, LoggerActor{NamedTuple{(:value,), Tuple{Union{Nothing, Int64}}}, Nothing}, var"#19#20"}, data::Int64)
    @ Rocket ~/.julia/packages/Rocket/EGm1W/src/operators/map.jl:62
  [3] scheduled_next!
    @ ~/.julia/packages/Rocket/EGm1W/src/schedulers/asap.jl:23 [inlined]
  [4] next!
    @ ~/.julia/packages/Rocket/EGm1W/src/actor.jl:207 [inlined]
  [5] on_subscribe!(observable::ArrayObservable{Int64, AsapScheduler}, actor::Rocket.MapActor{Int64, LoggerActor{NamedTuple{(:value,), Tuple{Union{Nothing, Int64}}}, Nothing}, var"#19#20"}, scheduler::AsapScheduler)
    @ Rocket ~/.julia/packages/Rocket/EGm1W/src/observable/array.jl:45
  [6] scheduled_subscription!
    @ ~/.julia/packages/Rocket/EGm1W/src/schedulers/asap.jl:21 [inlined]
  [7] subscribe!
    @ ~/.julia/packages/Rocket/EGm1W/src/utils.jl:110 [inlined]
  [8] on_subscribe!
    @ ~/.julia/packages/Rocket/EGm1W/src/observable/proxy.jl:103 [inlined]
  [9] subscribe!
    @ ~/.julia/packages/Rocket/EGm1W/src/utils.jl:108 [inlined]
 [10] check_on_subscribe_with_factory!
    @ ~/.julia/packages/Rocket/EGm1W/src/subscribable.jl:246 [inlined]
 [11] subscribe!(subscribable::ProxyObservable{NamedTuple{(:value,), Tuple{Union{Nothing, Int64}}}, ArrayObservable{Int64, AsapScheduler}, Rocket.MapProxy{Int64, var"#19#20"}}, factory::Rocket.LoggerActorFactory{Nothing})
    @ Rocket ~/.julia/packages/Rocket/EGm1W/src/subscribable.jl:242
 [12] top-level scope
    @ REPL[18]:1
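
The type mismatch in the error message can be reproduced without Rocket.jl. The sketch below shows that a named tuple literal gets the narrowest field type, that `NamedTuple` type parameters are invariant, and that calling the wide type's constructor yields a value of the declared type. The alias `Wide` is a name introduced here for brevity; whether constructing the value this way inside the mapping function is the preferred fix is an assumption.

```julia
# Alias for the element type declared in the `map` call (name is illustrative).
const Wide = NamedTuple{(:value,), Tuple{Union{Nothing, Int64}}}

# A literal is typed by its actual contents: Tuple{Nothing}, not the Union.
narrow = (value = nothing,)
println(typeof(narrow))
println(narrow isa Wide)   # false: NamedTuple parameters are invariant

# Constructing through the wide type keeps the declared field type.
wide = Wide((nothing,))
println(wide isa Wide)     # true

# The same idea applied to the mapping function from the MWE.
to_wide(val) = Wide((val < 2 ? nothing : val,))
println(map(to_wide, [0, 1, 2]))
```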

Pairwise fails with asynchronous inputs

Here is a minimal working example:

subject = Subject(Int)
paired = subject |> pairwise()
subscribe!(paired, logger("paired"))
for i = 1:10
    @async next!(subject, i)
end

output

[paired] Data: (1, 2)
[paired] Data: (1, 4)
[paired] Data: (1, 5)
[paired] Data: (1, 6)
[paired] Data: (1, 7)
[paired] Data: (1, 8)
[paired] Data: (1, 9)
[paired] Data: (1, 10)

It does work with a little sleep in between:

julia> for i = 1:10
           @async next!(subject, i)
           sleep(0.1)
       end
[paired] Data: (1, 2)
[paired] Data: (2, 3)
[paired] Data: (3, 4)
[paired] Data: (4, 5)
[paired] Data: (5, 6)
[paired] Data: (6, 7)
[paired] Data: (7, 8)
[paired] Data: (8, 9)
[paired] Data: (9, 10)

Confusing output order

Consider this MWE:

using Rocket

inputS = RecentSubject(Float64)

verified_inputS = inputS |> map(Float64, x -> x < 0 ? -x : x)

subscribe!(verified_inputS, verified_input -> begin
    if (inputS.recent != verified_input)
        next!(inputS, verified_input)
    end
end
)

subscribe!(inputS, logger())

next!(inputS, -1.0)

Which outputs

[LogActor] Data: 1.0
[LogActor] Data: -1.0

However, I would have expected

[LogActor] Data: -1.0
[LogActor] Data: 1.0

Because next!(inputS, -1.0) is called before next!(inputS, verified_input)

I need to implement a little "hack" with @async to get the expected order:

subscribe!(verified_inputS, verified_input -> begin
    if (inputS.recent != verified_input)
        @async next!(inputS, verified_input)
    end
end
)

How to avoid calling map function multiple times?

Here is a MWE:

fooS = RecentSubject(String)
mappedFooS = fooS |> map(String, x -> begin println("Called"); x end)
subscribe!(mappedFooS, logger())
subscribe!(mappedFooS, logger())
next!(fooS, "Test")

Output

Called
[LogActor] Data: Test
Called
[LogActor] Data: Test

I intentionally subscribed to mappedFooS twice. It doesn't need to be a subscription; it could also be any other map function with a subscription. The problem is that the map function inside mappedFooS gets called twice even though, in theory, it only needs to be called once. If the map function does some heavy calculation, things can become quite slow.
Is there a solution to this?
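
One way to get a single evaluation per message, sketched here without claiming it is the recommended approach, is to subscribe an intermediate `Subject` to the mapped stream once and attach all listeners to that subject. The sketch relies on a Subject being usable both as an actor and as an observable; `sharedS`, `calls` and the `keep` listeners are names made up for the example.

```julia
using Rocket

calls = Ref(0)

fooS       = RecentSubject(String)
mappedFooS = fooS |> map(String, x -> begin calls[] += 1; x end)

# The mapped stream is subscribed exactly once, by the intermediate subject.
sharedS = Subject(String)
subscribe!(mappedFooS, sharedS)

# All listeners attach to the intermediate subject instead of `mappedFooS`.
first_listener  = keep(String)
second_listener = keep(String)
subscribe!(sharedS, first_listener)
subscribe!(sharedS, second_listener)

next!(fooS, "Test")

println("map called ", calls[], " time(s)")
println(first_listener.values, " ", second_listener.values)
```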

Add flatMap operator

Hi

Thanks for putting this together, looks like an awesome library

I have an array of values. For each value, I download relevant data from a website, which creates a data frame with m columns and n rows. For a particular column, each row corresponds to a URL, which I then intend to download; finally, I want to merge the data for all n rows.

  1. Is there a way to do a flatMap, so that I can read each column and then asynchronously download the files for each row?
  2. How do I then merge data from these files into a data frame?

Thanks
Rohit

using Rocket
using DataFrames # needed for the DataFrame constructor below

function downloadDataForMarket(market::String, forceDownload::Bool)
    @info "downloadDataForMarket market: $market thread: $(Threads.threadid())"
    df = DataFrame(A = rand(Int, 2), B = rand(Int, 2))
    return df.A
end

function downloadDataForTicker(market, ticker)
    @info "downloadDataForTicker thread: $(Threads.threadid())"
    return market
end

markets = ["NSE", "US"]
exchanges = ["NSE", "NYSE"]

marketsSource = from(markets, scheduler = AsyncScheduler())

subscribe!(
    marketsSource |>
        map(Vector{Int64}, (d) -> downloadDataForMarket(d, true)) |>
        map(Vector{Int64}, (d) -> downloadDataForTicker(d, true)) |>
        map(Vector{Int64}, (d) -> d),
    logger()
)
