leonoel / missionary Goto Github PK
View Code? Open in Web Editor NEWA functional effect and streaming system for Clojure/Script
License: Eclipse Public License 2.0
A functional effect and streaming system for Clojure/Script
License: Eclipse Public License 2.0
Some flows terminate successfully on cancellation. A publisher backed by such a flow should be subscribable anytime as long as it's not failed.
Proposed changes :
Returns a flow evaluating body (in an implicit do) and producing values of each subsequent fork. Body evaluation can be parked with ? and forked with ?? and ?!.
AFAICT at least ?=
should be noted too, shouldn't it?
A consecutive transfer happens when a continuous flow notifies synchronously with the sampling. The current behavior is to propagate the consecutive transfer, but the point of continuous flow is that only the latest value matters therefore it may be more correct to collapse all consecutive transfers to keep only the latest.
A common scenario would be using reductions
to turn a discrete flow into a continuous flow. If the discrete flow happens to have a first event available immediately, then a consecutive transfer happens.
(->> (m/reductions + (m/ap 1))
(m/latest inc)
(m/reduce conj)
(m/?))
Currently this returns [1 2]
, with a collapsing latest
it would return [2]
.
Impacted operators : latest
signal!
sample
The following code
(m/?
(m/reactor
(let [pub (m/stream! (m/seed [1 2]))]
(m/stream!
(m/ap
(m/? (m/sleep 0))
(println (m/?> pub)))))))
throws
Execution error at missionary.impl.Reactor/<clinit> (Reactor.java:12).
Subscription failure : not in publisher context.
I was just wondering whether this is a bug, or expected behavior.
Hope I can explain this properly:
(require '[missionary.core :as ms])
(def cancel
(do
(when (bound? (resolve 'cancel))
(cancel))
(def mbx (ms/mbx))
(let [mbx-flow (ms/ap
(->> (repeat mbx)
ms/seed
ms/?>
ms/?))
flow (ms/ap
(let [next (ms/?< mbx-flow)]
(try
(let [task-id (ms/?= (ms/seed next))]
(loop []
(println "RUNNING TASK" (ms/? (ms/sleep 1000 task-id)))
(recur)))
(catch Exception ex
(ms/?> ms/none)))))
task (->> (ms/reduce (constantly nil) flow))]
(task println println))))
(cancel)
(mbx [1 2 3]) ;; [1]
(mbx [4 5 6]) ;; [2]
Running [1] starts printing as expecting:
RUNNING TASK 1
RUNNING TASK 2
RUNNING TASK 3
I would expect that running [2] would stop printing 1,2,3 and instead start printing only 4,5,6. However, currently it continues printing 1,2,3 in addition to 4,5,6.
E.g. running [2] after running [1] causes following output:
RUNNING TASK 5
RUNNING TASK 4
RUNNING TASK 6
RUNNING TASK 3
RUNNING TASK 1
RUNNING TASK 2
Calling (cancel)
cancels everything as expected.
If I replace ?= with ?>, then it works as expected (of course running only the first of each mbx value).
Is this expected behavior or a bug?
(m/sp #js{})
No matching clause: :js-object
impl.cljc: 472 cloroutine.impl$fn__20758$add_breaking__20952/invoke
Please share suggestions about ways to improve documentation.
In the following experiment,
(inst)
canceller at any time results in immediate aggregate printing and cancellation being effective(defn printer [t]
(t (fn [res] (println "res" res))
(fn [err] (println "err" err))))
(def inst
(printer (->> (m/observe
(fn [event-fn]
(let [producer
(Thread.
(fn []
(dotimes [n 6]
(println "Producing" n)
(Thread/sleep 1000)
(event-fn n))))]
(.start producer)
(fn []
(println "cancelled")
(.stop producer)))))
(m/transform (take 3))
(m/aggregate conj))))
Rationale : modeling DAG topologies.
Guidelines :
API design :
reactor
takes an initializer thunk and returns a task. Running the task evaluates the thunk within a fresh reactor. A node can be spawned only in the dynamic context of a reactor (in the initializer, or in reaction to an event of another node).stream!
and signal!
take a flow, spawn a node running this flow (resp. discrete and continuous) and return a flow subscribing to this node.Skimming the java impl, I noticed that passing the cancellation callback to event-fn is one possiblity (?), but, while it works, it also results in event-fn being emitted from the observer and doesn't seem like the canonical way.
Probably invoking the event-fn with itself would be nice.
Hi, I'm still quite new to clojure - so be nice lol.
What is the best way to limit concurrency with the ?=
forking function? I'm currently using core.async channels to do this, but it feels hackish.
(defn mergemap
[func concurrency flow]
(let [c (async/chan concurrency)]
(m/ap
(let [value (m/?= flow)]
(async/>!! c true)
(let [r (m/? (func value))]
(async/<!! c)
r)))))
;; Creates a flow that multiplies the numbers by 10 after 500ms.
;; Max concurrency of 5
(mergemap #(m/sleep 500 (* % 10)) 5 (m/enumerate (range 10000)))
Any suggestions?
Running the following code breaks missionary in some way. For example, after I run the code at [1] it throws a null exception, and from then on (m/? (m/sleep 1000 :abc))
freezes indefinitely until I restart the REPL.
(defn test-flow-breaks []
(m/ap
(let [r (m/?> (m/seed [(m/sleep 1 :res)]))]
(m/? r)
(m/? (m/sp (throw (ex-info "TEST" {})))))))
(defn test-flow-works []
(m/ap
(let [r (m/?> (m/seed [(m/sp :res)]))]
(m/? r)
(m/? (m/sp (throw (ex-info "TEST" {})))))))
(comment
;; [1]
(m/? (m/reactor
(let [state (m/stream! (test-flow-breaks))]
(m/stream! (m/ap (println (m/?> state)))))))
(m/? (m/sleep 1000 :a)))
Running this throws an unhandled exception:
Exception in thread "missionary scheduler" java.lang.NullPointerException
at missionary.impl.Reactor.signal(Reactor.java:93)
at missionary.impl.Reactor.emit(Reactor.java:187)
at missionary.impl.Reactor.leave(Reactor.java:210)
at missionary.impl.Reactor$1.invoke(Reactor.java:401)
at missionary.impl.Ambiguous$3.invoke(Ambiguous.java:115)
at missionary.impl.Ambiguous$4.invoke(Ambiguous.java:128)
at missionary.impl.Sequential.step(Sequential.java:52)
at missionary.impl.Sequential$1.invoke(Sequential.java:32)
at missionary.impl.Sleep$Scheduler.trigger(Sleep.java:61)
at missionary.impl.Sleep$Scheduler.run(Sleep.java:75)
This causes further operations like (m/? (m/sleep 1000 :test))
to hang indefinitely.
Trying to see if I've been paying attention in class, CompletableFuture to task should look something like:
(import '(java.util.concurrent Executor CompletableFuture)
'(missionary.impl Thunk))
(defn cf->task
[^CompletableFuture cf]
(fn [success failure]
(.handleAsync
cf
(reify java.util.function.BiFunction
(apply [_ r e]
(if (instance? Exception e)
(failure e)
(success r))))
Thunk/cpu)
(fn [] (.cancel cf true))))
(let [cf (CompletableFuture.)
t (cf->task cf)
success! (fn [res] (println 'yay! res))
fail! (fn [e] (println 'Error! (ex-message e)))]
(t success! fail!)
(.complete cf 2))
(let [cf (CompletableFuture.)
t (cf->task cf)
success! (fn [res] (println 'yay! res))
fail! (fn [e] (println 'Error! (ex-message e)))]
(t success! fail!)
(.completeExceptionally cf (Exception. "failed!")))
Originally posted by @bsless in #41 (comment)
Problem : switching flows is currently possible with ap
/?!
but the behavior is discrete and eager, which is not always what we want.
Solution : provide a new macro cp
(continuous process) returning a continuous flow, with ?!
available as a forking operator. Like in ap
, ?!
runs provided flow, forks current computation and cancels previous branch for each successive value, but each branch is run lazily.
Example :
(def numbers-channel (atom 0))
(def strings-network (atom "hello"))
(def remote-control (atom :strings))
(def tv
(m/cp
(case (m/?! (m/watch remote-control))
:numbers (m/?! (m/watch numbers-channel))
:strings (m/?! (m/watch strings-network))
:both (m/?! (m/latest vector (m/watch numbers-channel) (m/watch strings-network)))
:static-noise)))
(def it (tv #(prn :ready) #(prn :done))) ;; :ready
@it #_=> "hello"
(reset! remote-control :numbers) ;; :ready
@it #_=> 0
(swap! numbers-channel inc) ;; :ready
(swap! numbers-channel inc)
@it #_=> 2
(swap! numbers-channel inc) ;; :ready
(reset! remote-control :both)
(reset! strings-network "world")
@it #_=> [3 "world"]
(reset! remote-control nil) ;; :ready
@it #_=> :static-noise
On a large project a very big concern is tracing, metrics and logging. I assume in a framework like this stack traces only go to the last fiber which it not very helpful.
A couple of projects with interesting takes on this are https://github.com/tokio-rs/tracing (the rust async community in general)
and probably closer for us to emulate ZIO2 logging and tracing https://www.youtube.com/watch?v=vYKea3hGw28
Both communities have concluded that it is important to make it first class in the application framework.
I have only been following a few months so it is possible this is already solved and just not clearly documented.
Following #29, concerns about sigils harming the learning curve. It could be nice to have both short symbols (for experts) and human-friendly aliases (for beginners). Let's find plain english equivalents for ?
!
?>
?<
?=
Allowing deferred initialization has no obvious benefit and goes against the idea of "defined on every point in time".
This is a requirement for cp
#26 , and could simplify implementation of signal!
, latest
and sample
#33 . signal!
, latest
and sample
should fail if input flow is not immediately ready to transfer.
example from #4 in cljs
Unlike the CLJ version, this version yields a result. I. e. it prints
res [0 1 2]
But shortly after that, an uncaught type error is thrown in impl.cljs
(defn process-producer [event-fn]
(let [step
(fn step [i]
(if (= 5 i)
nil
(js/setTimeout (fn []
(event-fn i)
(step (inc i)))
1000)))]
(step 0)
(fn []
(.log js/console "Process producer cancelled"))))
(defn printer [t]
(t (fn [res] (println "res" res))
(fn [err] (println "err" err))))
(def inst
(printer (->> (m/observe
process-producer)
(m/transform (take 3))
(m/aggregate conj))))
Find the right pattern to parallelize processing on some part of a flow pipeline.
Using rdv
and dfv
, we can build a channel-like primitive and use ap
+ ?=
to run a single producer concurrently with an arbitrary amount of consumers. The producer feeds values in the channel, close it on termination and emits nothing. Each consumer reads values from the channel, passes them through a user-provided pipeline and emits resulting values.
cf POC
Problem : the pattern may be too complex to be implemented manually in user space.
?=
+ sem
via Panel on slack
(let [sem (m/sem 2)]
(m/ap
(let [batch (m/?= (->> (fetch-ids)
(m/eduction (partition-all 20))))]
(m/holding sem (->> (fetch-projects batch)
(m/reduce conj [])
m/?)))))
Problem : while the semaphore effectively ensures no more than 2 fetch-projects
instances run concurrently, the memory footprint is unbounded because ?=
pulls input and spawns new branches as soon as possible. If fetch-ids
is infinite and able to produce batches faster than the maximal processing thoughput (in this case, 2 divided by the average delay of fetch-projects
), then the semaphore queue grows steadily.
?=
Add an optional argument to ?=
to specify an upper bound on the number of concurrent branches. When this number is reached, values stop being pulled from input and resume again when a branch terminates.
(m/ap
(let [batch (m/?= 2 (->> (fetch-ids)
(m/eduction (partition-all 20))))]
(->> (fetch-projects batch)
(m/reduce conj [])
m/?)))
Prior art : ReactiveX - Flowable/flatMap
https://rxjs-dev.firebaseapp.com/api/operators/expand
Is there an equivalent way of doing this in missionary right now?
The examples in the "Hello flow" documentation aren't exactly matching what I'm getting in my repl:
(def hello-world
(m/ap
(println (m/?? (m/enumerate ["Hello" "World" "!"])))
(m/? (m/sleep 1000))))
(comment
(m/? (m/aggregate conj hello-world))
#_> #object[java.lang.ThreadLocal$SuppliedThreadLocal 0xb8473c1 "java.lang.ThreadLocal$SuppliedThreadLocal@b8473c1"]
;; should be [nil nil nil]
)
The other examples where you check the result are the same way. This is in a clojure repl.
When the target task fails to complete within given delay, the task fails and the error is not easy to check for recovery (in addition of not being documented). The timeout case could result in a success with a user-provided value, akin to the timed out version of deref
. The timeout value could be optional and default to nil
, like sleep
.
(defn timeout
([task delay] (timeout task delay nil))
([task delay value]
(-> task
(attempt)
(race (sleep delay #(-> value)))
(absolve))))
(m/? (timeout (m/sleep 20 :a) 25 :b)) ;; :a after 20ms
(m/? (timeout (m/sleep 20 :a) 15 :b)) ;; :b after 15ms
(m/? (timeout (m/sleep 20 :a) 15)) ;; nil after 15ms
I believe this issue is related to #39 .
Running the following code gives me a NullPointerException, and the error handling around the reactor call is never called. I would expect the message "ERROR IS VISIBLE" to be printed, however I get the stacktrace below.
(def mbx (ms/mbx))
(do
(def cancel
((ms/sp
(try
(ms/? (ms/reactor
(let [state (ms/stream! (ms/ap
(let [r (ms/?> (ms/ap
(->> (repeat mbx)
ms/seed
ms/?>
ms/?)))]
(if (= r 1)
(throw (ex-info "Invalid" {:a 123}))
r))))]
(ms/stream!
(ms/ap (println (ms/?> state)))))))
(println "DONE")
(catch Exception ex
(println "ERROR IS VISIBLE")
(throw ex))))
prn prn))
(mbx 2)
(mbx 1))
Unhandled java.lang.NullPointerException
(No message)
Ambiguous.java: 45 missionary.impl.Ambiguous/more
Ambiguous.java: 59 missionary.impl.Ambiguous$1/invoke
Ambiguous.java: 115 missionary.impl.Ambiguous$3/invoke
Mailbox.java: 27 missionary.impl.Mailbox/invoke
REPL: 384 mattsum.task-feed.statecharts/eval119791
REPL: 384 mattsum.task-feed.statecharts/eval119791
Compiler.java: 7181 clojure.lang.Compiler/eval
Compiler.java: 7171 clojure.lang.Compiler/eval
Compiler.java: 7136 clojure.lang.Compiler/eval
core.clj: 3202 clojure.core/eval
core.clj: 3198 clojure.core/eval
interruptible_eval.clj: 87 nrepl.middleware.interruptible-eval/evaluate/fn/fn
AFn.java: 152 clojure.lang.AFn/applyToHelper
AFn.java: 144 clojure.lang.AFn/applyTo
core.clj: 667 clojure.core/apply
core.clj: 1977 clojure.core/with-bindings*
core.clj: 1977 clojure.core/with-bindings*
RestFn.java: 425 clojure.lang.RestFn/invoke
interruptible_eval.clj: 87 nrepl.middleware.interruptible-eval/evaluate/fn
main.clj: 437 clojure.main/repl/read-eval-print/fn
main.clj: 437 clojure.main/repl/read-eval-print
main.clj: 458 clojure.main/repl/fn
main.clj: 458 clojure.main/repl
main.clj: 368 clojure.main/repl
RestFn.java: 137 clojure.lang.RestFn/applyTo
core.clj: 667 clojure.core/apply
core.clj: 662 clojure.core/apply
regrow.clj: 20 refactor-nrepl.ns.slam.hound.regrow/wrap-clojure-repl/fn
RestFn.java: 1523 clojure.lang.RestFn/invoke
interruptible_eval.clj: 84 nrepl.middleware.interruptible-eval/evaluate
interruptible_eval.clj: 56 nrepl.middleware.interruptible-eval/evaluate
interruptible_eval.clj: 152 nrepl.middleware.interruptible-eval/interruptible-eval/fn/fn
AFn.java: 22 clojure.lang.AFn/run
session.clj: 202 nrepl.middleware.session/session-exec/main-loop/fn
session.clj: 201 nrepl.middleware.session/session-exec/main-loop
AFn.java: 22 clojure.lang.AFn/run
Thread.java: 832 java.lang.Thread/run
gather
was useful before ap
/?=
was a thing, much less now, especially considering amb=
.
Some pseudocode:
(let [x (m/? (http-x)), y (m/? (http-y)]
(m/? (http-z x y))
So all http-*
calls are tasks and there might be some interdependency between them. Is there a simple way to run this optimally? In this example the results of x
and y
do not depend on each other so they could be run concurrently.
Discussion about operator naming in order to come up with a better scheme while breaking changes are still tolerated. I think the current scheme is needlessly obscure and could be made more consistent.
The reasoning behind current naming is :
?
wait for a result that is non-deterministic and not immediately available!
check for interruptionForking operators share with ?
the properties of non-determinism and lack of immediate availability, that's why they start with ?
as well, and because multiple results are produced an extra character is added. The second character gives information about how the flow is consumed.
?
because conceptually we wait for the entire child flow to be completed before pulling the next value.!
because the child flow should expect to be interrupted if it's too slow=
because it introduces parallelism#27 amb
operators should reuse suffixes ?
and =
for consistency.
I think ?
and !
make sense as single operators, but reusing them as fork suffixes was probably not a good idea, the link is not really obvious and rather confusing.
Just syntactic sugar. Inside an ap
block, amb?
and amb=
evaluate an arbitrary number of branches ambiguously, respectively sequentially or in parallel.
(defmacro amb? [& forms]
`(case (m/?? (m/enumerate (range ~(count forms))))
~@(interleave (range) forms)))
(defmacro amb= [& forms]
`(case (m/?= (m/enumerate (range ~(count forms))))
~@(interleave (range) forms)))
@mjmeintjes via slack :
(m/ap
(let [[g fl] (->> (m/seed [1 2 1 1 3 1])
(m/group-by (comp keyword str))
m/?=)
i (m/?> fl)]
(when (= g :2)
(throw (ex-info "GROUP 2" {})))
(println "GROUP" g " VAL " i)
[g i]))
When the first exception is thrown, the process is cancelled, then
(let [run-task (fn [t]
(t #(.log js/console "success" %)
#(.log js/console "err" %)))
in (atom nil)
in-flow (m/watch in)]
(run-task
(->> (m/ap (let [x (m/?! in-flow)
y (m/?! in-flow)]
(println [x y])))
(m/aggregate conj)))
(reset! in 42))
#object[TypeError TypeError: Cannot read property 'call' of null]
(<NO_SOURCE_FILE>)
missionary$impl$watch_cb (target/node/build/missionary/impl.cljs:738:18)
cljs$core$IWatchable$notify_watches$arity$3 (target/node/build/cljs/core.cljs:4450:8)
cljs$core$reset_BANG (target/node/build/cljs/core.cljs:4491:28)
I'm getting the following error, but I am struggling to reproduce it. Just wondered if it is something obvious that is wrong.
{:via [{:type java.lang.ArrayIndexOutOfBoundsException
:message "Index -1 out of bounds for length 2"
:at [missionary.impl.Latest deref "Latest.java" 88]}]
:trace [[missionary.impl.Latest deref "Latest.java" 88]
[missionary.impl.Sample deref "Sample.java" 102]
[missionary.impl.Transform pull "Transform.java" 74]
[missionary.impl.Transform$2 invoke "Transform.java" 102]
[missionary.impl.Sample$3 invoke "Sample.java" 70]
[missionary.impl.Ambiguous emit "Ambiguous.java" 36]
[missionary.impl.Ambiguous step "Ambiguous.java" 51]
[missionary.impl.Ambiguous$3 invoke "Ambiguous.java" 133]
[missionary.impl.Sleep$Scheduler trigger "Sleep.java" 61]
[missionary.impl.Sleep$Scheduler run "Sleep.java" 75]]
:cause "Index -1 out of bounds for length 2"}
Currently missionary doesn't work on self-hosted clojurescript due to its dependency on core.async
for ioc machinery.
Considered strategies :
core.async
self-hosted-clojurescript compatible, unlikely to happencore.async
core.async
dependencyThis is highly specific to my codebase so feel free to decline any help.
I have a pipeline implementation that weaves a bunch of java.util.Iterator
s together. It's just a bunch of functions composed together, where each one takes the previous Iterator + some options and returns a new Iterator. So something like
(-> (read-from-db nil {:connection "..."}) (transform {:fn some-fn}) (buffer {:size 32}) (write-to-db {:connection "..."}))
is a composition of a bunch of functions that each return a new Iterator. To run it you need to walk (drain) the final Iterator.
This is purely sequential, so there's a lot of stalling happening, i.e. the next read is waiting for the previous write to finish. I'd like to inject a prefetch
function into the pipeline, just before the write-to-db
call so that the next value in the pipeline is eagerly pulled from upstream.
Here's an implementation of that strategy, previously named fork, without missionary:
(defn fork [in opts]
(let [n (:n opts (.availableProcessors (Runtime/getRuntime)))
tq (u/q n), q (u/q n), stop (volatile! false)
_th (u/thread nil "pg-fork-coordinator"
(try
(loop []
(when-not @stop
(u/take tq)
(if (it/next? in)
(do (u/put q [:ok (it/next in)]) (recur))
(vreset! stop true))))
(catch Throwable e (u/put q [:ex e]))))
ini (volatile! nil)]
(reify Iterator
(hasNext [_]
(try (when (nil? @ini) (vreset! ini true) (dotimes [_ n] (u/put tq ::go)))
;; TODO possible race while not stopped and q is empty but tq is not and we're waiting for a value
(boolean (or (not @stop) (u/has? q)))
(catch Throwable e (vreset! stop true) (throw e))))
(next [_] (try (let [[st vl] (u/take q)] (if (= :ok st) (do (u/put tq ::go) vl) (throw vl)))
(catch Throwable e (vreset! stop true) (throw e)))))))
The other functions in that defition are for the alias it
:
(defn next [it] (.next ^Iterator it))
(defn next? [it] (.hasNext ^Iterator it))
and from u
:
(defn q ([] (LinkedTransferQueue.)) ([n] (LinkedBlockingQueue. (int n))))
(defn put [q v] (if (instance? TransferQueue q) (.transfer ^TransferQueue q v) (.put ^BlockingQueue q v)) v)
(defn take [q] (.take ^BlockingQueue q))
(defn has? [q] (not (.isEmpty ^java.util.Collection q)))
(defmacro with {:style/indent 1} [[s v] & body] `(let [rt# ~v ~s rt#] ~@body rt#))
(defmacro thread {:style/indent 2} [^ThreadGroup tg nm & code]
`(with [t# (Thread. (or ~tg (.getThreadGroup (Thread/currentThread))) (fn [] ~@code) ~nm)] (.start t#)))
It basically forks a thread and uses 2 queues, 1 "token queue" that unblocks the loop to prefetch another value and 1 result queue where the computed values get put. There's also 2 volatiles to coordinate the initialization and stopping. As it stands it's pretty tricky code that took some time to get right.
I wonder if a missionary solution would look cleaner? Do you have an idea of how you would go about implementing the function? I haven't written any missionary code yet so I'm having a hard time deciding which primitives would be useful. I guess a similar solution could be built with a mailbox and calculating in blk
?
In Thunk.java
: if the run method terminates concurrently with a cancellation, interruption flag may be reset before the runner thread is interrupted by the canceller thread, resulting with the interruption flag set when the runner thread starts the next task.
Not a problem with executors exposed by missionary because j.u.c.ThreadPoolExecutor
resets the flag between successive tasks anyways, but could be problematic if via
is used with another executor implementation.
(m/?
(m/sp
(let [f (fn [] (m/? (m/sleep 1000 1)))]
(f))))
gives:
1. Unhandled java.lang.NullPointerException
(No message)
impl.cljc: 60 cloroutine.impl$coroutine$fn__66068/invoke
Sequential.java: 49 missionary.impl.Sequential/step
Sequential.java: 32 missionary.impl.Sequential$1/invoke
Sleep.java: 61 missionary.impl.Sleep$Scheduler/trigger
Sleep.java: 75 missionary.impl.Sleep$Scheduler/run
but
(m/?
(m/sp
(let [f (fn [] (future (m/? (m/sleep 1000 1))))]
@(f))))
gives 1
.
I'm not sure if this is expected behaviour or if it is a bug, but I generally see all NullPointerExceptions as bugs, so thought I'd submit it.
Currently when a process is cancelled before termination it throws a new instance of ExceptionInfo
with an entry for key :cancelled
. Two problems :
The only requirement of the cancellation error is to be able to disambiguate it from other errors. A singleton would be a better fit - no runtime costs, fast checking based on identity.
The singleton is immutable and can be exposed :
(def cancelled "The object thrown by processes cancelled before termination." (comment TODO))
Checking could be made easier with simple sugar :
(defmacro on-cancel "
Evaluates body and returns result of last expression.
If evaluation fails due to cancellation, evaluates recovery expression and returns its result.
" [recovery & body]
`(try ~@body
(catch ~(if (:js-globals &env) :default Throwable) e#
(if (identical? e# cancelled) ~recovery (throw e#)))))
(def task
(let [mbx (m/mbx)]
((m/sp
(dotimes [n 4]
(mbx n)
(let [cur (m/? mbx)
]
(println cur)
(m/? (m/sleep 1000)))))
(fn [success] (println 'success success))
(fn [fail] (println 'fail fail)))))
Prints
nil
nil
nil
nil
success nil
https://google.github.io/closure-library/api/goog.async.nextTick.html
Its preferable over setTimeout, because setTimeout induces a 4ms delay.
(def it ((m/sample vector
(m/observe (fn [!] (def continuous !) #()))
(m/observe (fn [!] (def discrete !) #())))
#(prn :ready) #(prn :done)))
(discrete 0) ;; should not be ready, because continuous input is not ready.
If you remove missionary from the example you get the same semantics.
Found by @dustingetz
Repro :
(m/? (m/aggregate conj (m/latest identity (m/enumerate [:a :b]))))
Considering that flows can spawn expensive processes like subscribing to external APIs, it would be useful to be able to multiplex a flow.
(def task (m/aggregate - (m/enumerate [2 1])))
(task prn prn) ;; exception thrown, expected call to failure continuation
gather
is able to merge an arbitrary amount of flows into a single one, but there's currently no way to have gather semantics for a flow of flows.
We could have a gathering operator that would behave like ??
or ?!
except when the flow emits its next value before the previous continuation terminates, a new continuation is run concurrently and the resulting ap
flow emits values as soon as they're ready.
Just like ??
and ?!
respectively match RxJava's concatMap
and switchMap
, this new operator would match flatMap
.
Currently observe
watch
stream!
signal!
don't fail when cancelled before termination. This is inconsistent and sometimes problematic :
observe
and watch
because they don't self-terminate, but they may be able to do so at some point - #6).While playing around with reactor I stumbled across a couple bugs (on CLJS)
;; Setup
(def my-interval
(m/observe (fn [evt]
(println "Starting interval")
(let [ctr (let [ct (atom 0)]
#(swap! ct inc))
id
(js/setInterval (fn [t]
(try
(evt (ctr))
(catch :default e
(println "Can't put!")
(.log js/console e))))
3000)]
(fn []
(println "Cancelling interval")
(js/clearInterval id))))))
(def t
(a/run-task
(m/reactor
(let [i (m/signal! my-interval)]
(m/stream!
(m/ap (println "pr1" (m/?? i))
(println "pr2" (m/?? i))))
))))
(t)
Starting interval
pr1 1
pr2 1
pr2 1
pr2 2
pr2 3
pr2 4
Cancelling interval
pr1 4
Expected: "pr2 1" is printed only once.
(def t
(a/run-task
(m/reactor
(let [i (m/signal! my-interval)
i2 (m/signal! my-interval)]
(m/stream!
(m/ap (println "pr1" (m/?? i))
))
))))
(t)
Printout
Starting interval
Starting interval
pr1 1
pr1 2
Can't put!
pr1 3
Can't put!
pr1 4
Can't put!
Cancelling interval
Cancelling interval
success #object[missionary.impl.Pub]
Expected: signal i2 wouldn't backpressure. N. b. core.asyncs mult doesn't backpressure when there are no takers - but independently I would expect i2 only to backpressure my-interval
if it was a m/stream!
(def t
(a/run-task
(m/reactor
(let [i (m/stream! my-interval)
i2 (m/stream! my-interval)]
(m/stream!
(m/ap (println (m/?? (m/zip vector i i2)))
))
))))
Starting interval
Starting interval
Cancelling interval
Cancelling interval
error #object[Error Error: No such element.]
Opening a new issue to continue the discussion in #23 which was becoming off-topic for that issue. The initial question was
how does
?=
run code concurrently without using a threadpool?
The answer was
Parking happens when the computation requires a value that is not immediately available. It doesn't use a threadpool, the code is run synchronously by the thread responsible for unparking.
This "value that is not immediately available" point is what alludes me. Consider a simple example
% clj -Sdeps '{:deps {missionary {:mvn/version "b.17"}}}'
Clojure 1.10.1
user=> (require '[missionary.core :as m])
nil
user=> (defn fetch [n] (Thread/sleep 100) (* 2 n))
#'user/fetch
user=> (time (m/? (m/aggregate conj (m/ap (let [n (m/?= (m/enumerate (range 10)))] (fetch n))))))
"Elapsed time: 1004.458757 msecs"
[0 2 4 6 8 10 12 14 16 18]
Consider fetch
an IO operation that is a black box, i.e. outside of our reach to reimplement. Can the last example be rewritten to happen concurrently? The only solution I could come up with is
user=> (time (m/? (m/aggregate conj (m/ap (let [n (m/?= (m/enumerate (range 10)))] (m/? (m/via m/blk (fetch n))))))))
"Elapsed time: 102.950868 msecs"
[0 2 4 6 8 10 12 14 16 18]
Was playing around with the mailbox sample. This form doesn't compile
(defn countor
[fail]
(let [self (m/mbx)]
((m/sp
(loop [n 0]
((m/? self) n)
(recur (inc n))))
nil fail)
self))
Unexpected error (NullPointerException) macroexpanding cloroutine.core/cr ...
null
(version b.5)
(def hello-world
(m/sp (println "Hello, world!")))
(hello-world #(println "success" %) #(println "error" %))
This results in a fiber which (according to repl print) contains an exception that error fn is invoked with 0 args. If I provide one with 0 args, it is invoked along with the success continuatino.
A declarative, efficient, and flexible JavaScript library for building user interfaces.
๐ Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
An Open Source Machine Learning Framework for Everyone
The Web framework for perfectionists with deadlines.
A PHP framework for web artisans
Bring data to life with SVG, Canvas and HTML. ๐๐๐
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
Some thing interesting about web. New door for the world.
A server is a program made to process requests and deliver data to clients.
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
Some thing interesting about visualization, use data art
Some thing interesting about game, make everyone happy.
We are working to build community through open source technology. NB: members must have two-factor auth.
Open source projects and samples from Microsoft.
Google โค๏ธ Open Source for everyone.
Alibaba Open Source for everyone
Data-Driven Documents codes.
China tencent open source team.