missionary's People

Contributors

dustingetz, holyjak, leonoel, pbaille, pez, reilysiegel, telekid, vaibhavwakde52, xificurc, yenda

missionary's Issues

Uncatchable NullPointerException on exception in reactor publisher

I believe this issue is related to #39.
Running the following code gives me a NullPointerException, and the error handling around the reactor call is never reached. I would expect the message "ERROR IS VISIBLE" to be printed; instead I get the stacktrace below.

  (def mbx (ms/mbx))
  (do
    (def cancel
      ((ms/sp
        (try
          (ms/? (ms/reactor
                 (let [state (ms/stream! (ms/ap
                                          (let [r (ms/?> (ms/ap
                                                          (->> (repeat mbx)
                                                               ms/seed
                                                               ms/?>
                                                               ms/?)))]
                                            (if (= r 1)
                                              (throw (ex-info "Invalid" {:a 123}))
                                              r))))]
                   (ms/stream!
                    (ms/ap (println (ms/?> state)))))))
          (println "DONE")
          (catch Exception ex
            (println "ERROR IS VISIBLE")
            (throw ex))))
       prn prn))
    (mbx 2)
    (mbx 1))
Unhandled java.lang.NullPointerException
   (No message)

            Ambiguous.java:   45  missionary.impl.Ambiguous/more
            Ambiguous.java:   59  missionary.impl.Ambiguous$1/invoke
            Ambiguous.java:  115  missionary.impl.Ambiguous$3/invoke
              Mailbox.java:   27  missionary.impl.Mailbox/invoke
                      REPL:  384  mattsum.task-feed.statecharts/eval119791
                      REPL:  384  mattsum.task-feed.statecharts/eval119791
             Compiler.java: 7181  clojure.lang.Compiler/eval
             Compiler.java: 7171  clojure.lang.Compiler/eval
             Compiler.java: 7136  clojure.lang.Compiler/eval
                  core.clj: 3202  clojure.core/eval
                  core.clj: 3198  clojure.core/eval
    interruptible_eval.clj:   87  nrepl.middleware.interruptible-eval/evaluate/fn/fn
                  AFn.java:  152  clojure.lang.AFn/applyToHelper
                  AFn.java:  144  clojure.lang.AFn/applyTo
                  core.clj:  667  clojure.core/apply
                  core.clj: 1977  clojure.core/with-bindings*
                  core.clj: 1977  clojure.core/with-bindings*
               RestFn.java:  425  clojure.lang.RestFn/invoke
    interruptible_eval.clj:   87  nrepl.middleware.interruptible-eval/evaluate/fn
                  main.clj:  437  clojure.main/repl/read-eval-print/fn
                  main.clj:  437  clojure.main/repl/read-eval-print
                  main.clj:  458  clojure.main/repl/fn
                  main.clj:  458  clojure.main/repl
                  main.clj:  368  clojure.main/repl
               RestFn.java:  137  clojure.lang.RestFn/applyTo
                  core.clj:  667  clojure.core/apply
                  core.clj:  662  clojure.core/apply
                regrow.clj:   20  refactor-nrepl.ns.slam.hound.regrow/wrap-clojure-repl/fn
               RestFn.java: 1523  clojure.lang.RestFn/invoke
    interruptible_eval.clj:   84  nrepl.middleware.interruptible-eval/evaluate
    interruptible_eval.clj:   56  nrepl.middleware.interruptible-eval/evaluate
    interruptible_eval.clj:  152  nrepl.middleware.interruptible-eval/interruptible-eval/fn/fn
                  AFn.java:   22  clojure.lang.AFn/run
               session.clj:  202  nrepl.middleware.session/session-exec/main-loop/fn
               session.clj:  201  nrepl.middleware.session/session-exec/main-loop
                  AFn.java:   22  clojure.lang.AFn/run
               Thread.java:  832  java.lang.Thread/run

Can't assign mbx in consumer

(def task
  (let [mbx  (m/mbx)]
    ((m/sp
      (dotimes [n 4]
        (mbx n)
        (let [cur (m/? mbx)]
          (println cur)
          (m/? (m/sleep 1000)))))
     (fn [success] (println 'success success))
     (fn [fail] (println 'fail fail)))))

Prints

nil
nil
nil
nil
success nil
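For comparison, a minimal check of the behavior the reporter expects: a value posted with `(mbx x)` should be the next value returned by `(m/? mbx)`. This is a sketch assuming a missionary release that still provides `m/mbx`; the `received` atom is illustrative and replaces the `println` calls so the result can be inspected.

```clojure
(require '[missionary.core :as m])

;; Collect what the consumer reads back instead of printing it.
(def received (atom []))

(def task
  (m/sp
    (let [mbx (m/mbx)]
      (dotimes [n 4]
        (mbx n)                          ; post n to the mailbox
        (swap! received conj (m/? mbx))  ; park until a value is available
        (m/? (m/sleep 10))))))

;; On the JVM, m/? outside a process blocks the calling thread until the task completes.
(m/? task)
;; @received => [0 1 2 3]
```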

Hello flow documentation

The examples in the "Hello flow" documentation aren't exactly matching what I'm getting in my repl:

(def hello-world
  (m/ap
    (println (m/?? (m/enumerate ["Hello" "World" "!"])))
    (m/? (m/sleep 1000))))

(comment
  (m/? (m/aggregate conj hello-world))
  #_> #object[java.lang.ThreadLocal$SuppliedThreadLocal 0xb8473c1 "java.lang.ThreadLocal$SuppliedThreadLocal@b8473c1"]
  ;; should be [nil nil nil]
)

The other examples that check the result behave the same way. This is in a Clojure REPL.
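Later missionary releases renamed the operators used on that page (`enumerate` became `seed`, `??` became `?>`, `aggregate` became `reduce`). A sketch of the same example with those names, assuming a recent release and a shortened sleep. Each branch ends with the `nil` result of `sleep`, so the reduction should yield `[nil nil nil]`.

```clojure
(require '[missionary.core :as m])

(def hello-world
  (m/ap
    ;; ?> forks sequentially: one branch per seeded value, each run to completion
    (println (m/?> (m/seed ["Hello" "World" "!"])))
    ;; the last expression of the branch is the value the flow produces
    (m/? (m/sleep 100))))

(def result (m/? (m/reduce conj hello-world)))
;; result => [nil nil nil]
```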

operator naming

Discussion about operator naming in order to come up with a better scheme while breaking changes are still tolerated. I think the current scheme is needlessly obscure and could be made more consistent.

The reasoning behind current naming is :

  • ? wait for a result that is non-deterministic and not immediately available
  • ! check for interruption

Forking operators share with ? the properties of non-determinism and lack of immediate availability, which is why they also start with ?; because multiple results are produced, an extra character is added. The second character tells how the flow is consumed.

  • concat suffix is ? because conceptually we wait for the entire child flow to be completed before pulling the next value.
  • switch suffix is ! because the child flow should expect to be interrupted if it's too slow
  • gather suffix is = because it introduces parallelism

The amb operators proposed in #27 should reuse the ? and = suffixes for consistency.

I think ? and ! make sense as single operators, but reusing them as fork suffixes was probably not a good idea, the link is not really obvious and rather confusing.

observe callback must be a no-op after cancellation

Example from #4, in CLJS.

Unlike the CLJ version, this one yields a result, i.e. it prints

res [0 1 2]

But shortly after that, an uncaught TypeError is thrown in impl.cljs.


(defn process-producer [event-fn]
  (let [step
        (fn step [i]
          (if (= 5 i)
            nil
            (js/setTimeout (fn []
                             (event-fn i)
                             (step (inc i)))
                           1000)))]
    (step 0)
    (fn []
      (.log js/console "Process producer cancelled"))))

(defn printer [t]
  (t (fn [res] (println "res" res))
     (fn [err] (println "err" err))))

(def inst
  (printer (->> (m/observe
                 process-producer)
                (m/transform (take 3))
                (m/aggregate conj))))

`timeout` usage is cumbersome

When the target task fails to complete within the given delay, the resulting task fails, and the error is not easy to check for recovery (in addition to not being documented). The timeout case could instead result in a success with a user-provided value, akin to the timed-out version of deref. The timeout value could be optional and default to nil, like sleep.

(defn timeout
  ([task delay] (timeout task delay nil))
  ([task delay value]
   (-> task
     (attempt)
     (race (sleep delay #(-> value)))
     (absolve))))
(m/? (timeout (m/sleep 20 :a) 25 :b)) ;; :a after 20ms
(m/? (timeout (m/sleep 20 :a) 15 :b)) ;; :b after 15ms
(m/? (timeout (m/sleep 20 :a) 15))    ;; nil after 15ms
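The proposal can be tried in user space with the existing `attempt`, `race` and `absolve` operators. A sketch with `m/`-qualified names; `timeout-or` is a hypothetical name chosen to avoid clashing with `m/timeout`, and the delays are illustrative. If the task fails before the delay, `absolve` rethrows its error.

```clojure
(require '[missionary.core :as m])

(defn timeout-or
  "Hypothetical user-space version of the proposal: completes with the result of
  `task` if it terminates within `delay` ms, else with `value` (default nil)."
  ([task delay] (timeout-or task delay nil))
  ([task delay value]
   (m/absolve
     (m/race
       ;; attempt wraps success or failure of task in a zero-arg thunk
       (m/attempt task)
       ;; the sleep branch also yields a thunk, so absolve treats both uniformly
       (m/sleep delay (fn [] value))))))

(def on-time  (m/? (timeout-or (m/sleep 20 :a) 200 :b)))
(def too-late (m/? (timeout-or (m/sleep 200 :a) 20 :b)))
(def default  (m/? (timeout-or (m/sleep 200 :a) 20)))
;; on-time => :a, too-late => :b, default => nil
```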

Potential bugs w reactor in cljs

While playing around with reactor I stumbled across a couple bugs (on CLJS)

;; Setup
(def my-interval
    (m/observe (fn [evt]
                 (println "Starting interval")
                 (let [ctr (let [ct (atom 0)]
                             #(swap! ct inc))
                       id
                       (js/setInterval (fn [t]
                                         (try
                                           (evt (ctr))
                                           (catch :default e
                                             (println "Can't put!")
                                             (.log js/console e))))
                                       3000)]
                   (fn []
                     (println "Cancelling interval")
                     (js/clearInterval id))))))

1

(def t
    (a/run-task
     (m/reactor
      (let [i (m/signal! my-interval)]
        (m/stream!
         (m/ap (println "pr1" (m/?? i))
               (println "pr2" (m/?? i))))))))

(t)

Starting interval
pr1 1
pr2 1
pr2 1
pr2 2
pr2 3
pr2 4
Cancelling interval
pr1 4

Expected: "pr2 1" is printed only once.

2


(def t
    (a/run-task
     (m/reactor
      (let [i (m/signal! my-interval)
            i2 (m/signal! my-interval)]
        (m/stream!
         (m/ap (println "pr1" (m/?? i))))))))
(t)

Printout

Starting interval
Starting interval
pr1 1
pr1 2
Can't put!
pr1 3
Can't put!
pr1 4
Can't put!
Cancelling interval
Cancelling interval
success #object[missionary.impl.Pub]

Expected: signal i2 wouldn't backpressure. N.B. core.async's mult doesn't backpressure when there are no takers, but independently of that I would expect i2 to backpressure my-interval only if it were an m/stream!

3

  • irrelevant -

4


(def t
    (a/run-task
     (m/reactor
      (let [i (m/stream! my-interval)
            i2 (m/stream! my-interval)]
        (m/stream!
         (m/ap (println (m/?? (m/zip vector i i2)))))))))

Starting interval
Starting interval
Cancelling interval
Cancelling interval
error #object[Error Error: No such element.]

Exception from reactor source flow breaks missionary (b.21)

Running the following code breaks missionary in some way. For example, after I run the code at [1], it throws a NullPointerException, and from then on (m/? (m/sleep 1000 :abc)) freezes indefinitely until I restart the REPL.

(defn test-flow-breaks []
  (m/ap
   (let [r (m/?> (m/seed [(m/sleep 1 :res)]))]
     (m/? r)
     (m/? (m/sp (throw (ex-info "TEST" {})))))))

(defn test-flow-works []
  (m/ap
   (let [r (m/?> (m/seed [(m/sp :res)]))]
     (m/? r)
     (m/? (m/sp (throw (ex-info "TEST" {})))))))

(comment

  ;; [1]
  (m/? (m/reactor
         (let [state (m/stream! (test-flow-breaks))]
           (m/stream! (m/ap (println (m/?> state)))))))

  (m/? (m/sleep 1000 :a)))

Running this throws an unhandled exception:

Exception in thread "missionary scheduler" java.lang.NullPointerException
	at missionary.impl.Reactor.signal(Reactor.java:93)
	at missionary.impl.Reactor.emit(Reactor.java:187)
	at missionary.impl.Reactor.leave(Reactor.java:210)
	at missionary.impl.Reactor$1.invoke(Reactor.java:401)
	at missionary.impl.Ambiguous$3.invoke(Ambiguous.java:115)
	at missionary.impl.Ambiguous$4.invoke(Ambiguous.java:128)
	at missionary.impl.Sequential.step(Sequential.java:52)
	at missionary.impl.Sequential$1.invoke(Sequential.java:32)
	at missionary.impl.Sleep$Scheduler.trigger(Sleep.java:61)
	at missionary.impl.Sleep$Scheduler.run(Sleep.java:75)

This causes further operations like (m/? (m/sleep 1000 :test)) to hang indefinitely.

ditch gather

gather was useful before ap/?= existed; it is much less so now, especially considering amb=.
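To illustrate, merging two flows as gather did can be expressed with `ap` and `amb=`. A sketch assuming a release that provides `m/amb=`; `merge-flows` is a hypothetical helper. Emission order depends on scheduling, so the usage sorts the result.

```clojure
(require '[missionary.core :as m])

(defn merge-flows
  "Hypothetical helper: emits the values of f1 and f2 as they become available."
  [f1 f2]
  (m/ap
    ;; amb= evaluates both branches concurrently; each branch forks on its flow's values
    (m/amb= (m/?> f1) (m/?> f2))))

(def merged
  (m/? (m/reduce conj (merge-flows (m/seed [1 2]) (m/seed [3 4])))))
```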

Cannot read property 'call' of null

(let [run-task (fn [t]
                   (t #(.log js/console "success" %)
                      #(.log js/console "err" %)))
        in (atom nil)
        in-flow (m/watch in)]
    (run-task
     (->> (m/ap (let [x (m/?! in-flow)
                      y (m/?! in-flow)]
                  (println [x y])))
          (m/aggregate conj)))
    (reset! in 42))

#object[TypeError TypeError: Cannot read property 'call' of null]
(<NO_SOURCE_FILE>)
missionary$impl$watch_cb (target/node/build/missionary/impl.cljs:738:18)
cljs$core$IWatchable$notify_watches$arity$3 (target/node/build/cljs/core.cljs:4450:8)
cljs$core$reset_BANG (target/node/build/cljs/core.cljs:4491:28)

NullPointerException when running a task "outside a process block" inside another task

 (m/?
   (m/sp
    (let [f (fn [] (m/? (m/sleep 1000 1)))]
      (f))))

gives:

1. Unhandled java.lang.NullPointerException
   (No message)

                 impl.cljc:   60  cloroutine.impl$coroutine$fn__66068/invoke
           Sequential.java:   49  missionary.impl.Sequential/step
           Sequential.java:   32  missionary.impl.Sequential$1/invoke
                Sleep.java:   61  missionary.impl.Sleep$Scheduler/trigger
                Sleep.java:   75  missionary.impl.Sleep$Scheduler/run

but

  (m/?
   (m/sp
    (let [f (fn [] (future (m/? (m/sleep 1000 1))))]
      @(f))))

gives 1.

I'm not sure if this is expected behaviour or if it is a bug, but I generally see all NullPointerExceptions as bugs, so thought I'd submit it.
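A `?` that sits inside a nested `fn` is outside the `sp` body that gets rewritten into a coroutine, so in the version used here it was not treated as a parking point of the enclosing process. A sketch of the usual restructuring: let the helper return the task and park on it directly in the `sp` body. The helper `f` mirrors the report.

```clojure
(require '[missionary.core :as m])

;; The helper builds a task but does not run it.
(defn f [] (m/sleep 100 1))

;; The parking ? now appears lexically in the sp body.
(def result (m/? (m/sp (m/? (f)))))
;; result => 1
```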

code flow deduction

Some pseudocode:

(let [x (m/? (http-x)), y (m/? (http-y))]
  (m/? (http-z x y)))

So all http-* calls are tasks and there might be some interdependency between them. Is there a simple way to run this optimally? In this example the results of x and y do not depend on each other so they could be run concurrently.
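Independent tasks can be started together with `m/join`, which runs its tasks concurrently and combines their results with the given function. A sketch where the `http-*` functions are hypothetical stand-ins built from `m/sleep` and `m/sp`:

```clojure
(require '[missionary.core :as m])

;; Hypothetical stand-ins for the http-* tasks from the question.
(defn http-x [] (m/sleep 100 :x))
(defn http-y [] (m/sleep 100 :y))
(defn http-z [x y] (m/sp [x y :z]))

(def xyz
  (m/sp
    ;; join runs both tasks concurrently and calls vector with their results
    (let [[x y] (m/? (m/join vector (http-x) (http-y)))]
      ;; z depends on both, so it starts only once the join has completed
      (m/? (http-z x y)))))

(def result (m/? xyz))
;; result => [:x :y :z]
```

Dependencies are still expressed by ordinary sequencing; only the independent part is wrapped in `join`.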

observe must be guarded against subject/cleanup throwing

In the following experiment,

  • the aggregate is never produced ("res" is not printed)
  • even though the canceller of observe is called after 3 numbers are produced ("cancelled" is printed), (.stop producer) apparently has no effect: "Producing 3", 4 and 5 are still printed
  • without the transform expression, calling the (inst) canceller at any time prints the aggregate immediately and cancellation takes effect

(defn printer [t]
  (t (fn [res] (println "res" res))
     (fn [err] (println "err" err))))

(def inst
  (printer (->> (m/observe
                 (fn [event-fn]
                   (let [producer
                         (Thread.
                          (fn []
                            (dotimes [n 6]
                              (println "Producing" n)
                              (Thread/sleep 1000)
                              (event-fn n))))]
                     (.start producer)
                     (fn []
                       (println "cancelled")
                       (.stop producer)))))
                (m/transform (take 3))
                (m/aggregate conj))))
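One way to keep the cleanup from misbehaving on the user side is to stop the producer cooperatively: interrupt the thread and let it exit when `Thread/sleep` throws, instead of calling `.stop` (deprecated, and throwing UnsupportedOperationException on JDK 20 and later). A sketch with current operator names (`eduction` in place of `transform`, `reduce` in place of `aggregate`), assuming a recent missionary release; `producer-flow` and the 50 ms period are illustrative.

```clojure
(require '[missionary.core :as m])

(defn producer-flow
  "Emits 0..5 from a background thread, one value every period-ms.
  Cancellation interrupts the thread instead of calling Thread.stop."
  [period-ms]
  (m/observe
    (fn [emit!]
      (let [t (Thread.
                (fn []
                  (try
                    (dotimes [n 6]
                      (Thread/sleep (long period-ms))
                      (emit! n))
                    (catch InterruptedException _
                      ;; interrupted by the cleanup thunk: exit quietly
                      nil))))]
        (.start t)
        ;; cleanup thunk, run by observe when the flow is cancelled
        #(.interrupt t)))))

(def first-three
  (m/? (->> (producer-flow 50)
            (m/eduction (take 3))
            (m/reduce conj))))
;; first-three => [0 1 2]
```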

amb operators, sequential and parallel

Just syntactic sugar. Inside an ap block, amb? and amb= evaluate an arbitrary number of branches ambiguously, respectively sequentially or in parallel.

(defmacro amb? [& forms]
  `(case (m/?? (m/enumerate (range ~(count forms))))
     ~@(interleave (range) forms)))
(defmacro amb= [& forms]
  `(case (m/?= (m/enumerate (range ~(count forms))))
     ~@(interleave (range) forms)))
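The same sugar can be written against the later operator names (`seed` instead of `enumerate`, `?>` instead of `??`). A sketch assuming a release where `?>` accepts an optional parallelism argument, `(m/?> par flow)`, which replaces `?=` for the concurrent variant; the macros are hypothetical user-space definitions, not the library's.

```clojure
(require '[missionary.core :as m])

(defmacro amb?
  "Hypothetical: in an ap block, evaluates each form in its own branch, sequentially."
  [& forms]
  `(case (m/?> (m/seed (range ~(count forms))))
     ~@(interleave (range) forms)))

(defmacro amb=
  "Hypothetical: like amb?, but all branches run concurrently."
  [& forms]
  `(case (m/?> ##Inf (m/seed (range ~(count forms))))
     ~@(interleave (range) forms)))

(def sequential (m/? (m/reduce conj (m/ap (amb? 1 2 3)))))

;; The first branch parks on a sleep, so the second one emits first.
(def concurrent
  (m/? (m/reduce conj (m/ap (amb= (do (m/? (m/sleep 50)) :slow) :fast)))))
;; sequential => [1 2 3], concurrent => [:fast :slow]
```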

Unexpected error macroexpanding

I was playing around with the mailbox sample. This form doesn't compile:

(defn countor
  [fail]
  (let [self (m/mbx)]
    ((m/sp
      (loop [n 0]
        ((m/? self) n)
        (recur (inc n))))
     nil fail)
    self))

Unexpected error (NullPointerException) macroexpanding cloroutine.core/cr ...
null
(version b.5)

Reactive programming facilities

Rationale : modeling DAG topologies.

Guidelines :

  • glitch-free. If C depends on A and B, and B depends on A, C must consider event A and B's reaction to event A as simultaneous, and react only once.
  • dynamic topology. A new node can be spawned in reaction to an event in another node. If node B subscribes to node A in reaction to an event in A, the event and the subscription must be considered simultaneous so B must be notified of the event.
  • support for both discrete and continuous nodes (events and behavior, in original FRP). A discrete node consumes its flow eagerly and propagates backpressure from its subscribers. A continuous node consumes its flow lazily, the current value is sampled when the first subscriber asks for it and the value is then cached for next subscribers until invalidated.
  • strict supervision. At any time it must be possible to cancel an entire graph. If a node fails, the graph must be cancelled. A graph is terminated when all of its nodes are terminated.

API design :

  • reactor takes an initializer thunk and returns a task. Running the task evaluates the thunk within a fresh reactor. A node can be spawned only in the dynamic context of a reactor (in the initializer, or in reaction to an event of another node).
  • stream! and signal! take a flow, spawn a node running this flow (resp. discrete and continuous) and return a flow subscribing to this node.

race condition on `via` - interruption flag may not be reset

In Thunk.java : if the run method terminates concurrently with a cancellation, the interruption flag may be reset before the runner thread is interrupted by the canceller thread, leaving the interruption flag set when the runner thread starts the next task.

Not a problem with the executors exposed by missionary, because j.u.c.ThreadPoolExecutor resets the flag between successive tasks anyway, but it could be problematic if via is used with another executor implementation.

continuous switches

Problem : switching flows is currently possible with ap/?! but the behavior is discrete and eager, which is not always what we want.

Solution : provide a new macro cp (continuous process) returning a continuous flow, with ?! available as a forking operator. Like in ap, ?! runs provided flow, forks current computation and cancels previous branch for each successive value, but each branch is run lazily.

Example :

(def numbers-channel (atom 0))
(def strings-network (atom "hello"))
(def remote-control (atom :strings))

(def tv
  (m/cp
    (case (m/?! (m/watch remote-control))
      :numbers (m/?! (m/watch numbers-channel))
      :strings (m/?! (m/watch strings-network))
      :both (m/?! (m/latest vector (m/watch numbers-channel) (m/watch strings-network)))
      :static-noise)))

(def it (tv #(prn :ready) #(prn :done)))                  ;; :ready
@it #_=> "hello"
(reset! remote-control :numbers)                          ;; :ready
@it #_=> 0
(swap! numbers-channel inc)                               ;; :ready
(swap! numbers-channel inc)
@it #_=> 2
(swap! numbers-channel inc)                               ;; :ready
(reset! remote-control :both)
(reset! strings-network "world")
@it #_=> [3 "world"]
(reset! remote-control nil)                               ;; :ready
@it #_=> :static-noise

How can an observer signal termination?

Skimming the Java implementation, I noticed that passing the cancellation callback to event-fn is one possibility (?), but while it works, it also causes the observer to emit a value, and it doesn't seem like the canonical way.
Perhaps invoking event-fn with itself would be a nice convention.
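One user-space workaround, sketched here rather than taken from the library: have the subject emit a sentinel value and let a `take-while` transducer in `m/eduction` turn it into flow termination, which cancels the observe flow and runs its cleanup. `finite-flow` and `::done` are hypothetical names; this assumes a recent release with `m/eduction`.

```clojure
(require '[missionary.core :as m])

(defn finite-flow
  "Hypothetical: the producer thread emits ::done when it has nothing more to say."
  []
  (->> (m/observe
         (fn [emit!]
           (let [t (Thread.
                     (fn []
                       (try
                         (doseq [x [0 1 2 ::done]]
                           (Thread/sleep 20)
                           (emit! x))
                         (catch InterruptedException _ nil))))]
             (.start t)
             ;; cleanup: stop the producer if the consumer cancels early
             #(.interrupt t))))
       ;; the sentinel ends the flow; eduction then cancels observe
       (m/eduction (take-while #(not= % ::done)))))

(def result (m/? (m/reduce conj (finite-flow))))
;; result => [0 1 2]
```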

possible infinite loop with `group-by` + `ap`

@mjmeintjes via slack :

(m/ap
     (let [[g fl] (->> (m/seed [1 2 1 1 3 1])
                       (m/group-by (comp keyword str))
                       m/?=)
           i (m/?> fl)]
       (when (= g :2)
         (throw (ex-info "GROUP 2" {})))
       (println "GROUP" g "  VAL " i)
       [g i]))

When the first exception is thrown, the process is cancelled, then

  1. the next group is transferred
  2. group consumer is run
  3. group consumer is cancelled immediately, because parent process is in cancelled state
  4. group-by creates a new group for the unconsumed value, goto 1

continuous flows not immediately ready should be considered illegal

Allowing deferred initialization has no obvious benefit and goes against the idea of "defined on every point in time".

This is a requirement for cp #26, and could simplify the implementation of signal!, latest and sample #33. signal!, latest and sample should fail if the input flow is not immediately ready to transfer.

ArrayIndexOutOfBoundsException in Latest.java

I'm getting the following error, but I am struggling to reproduce it. Just wondered if it is something obvious that is wrong.

{:via [{:type java.lang.ArrayIndexOutOfBoundsException
        :message "Index -1 out of bounds for length 2"
        :at [missionary.impl.Latest deref "Latest.java" 88]}]
 :trace [[missionary.impl.Latest deref "Latest.java" 88]
         [missionary.impl.Sample deref "Sample.java" 102]
         [missionary.impl.Transform pull "Transform.java" 74]
         [missionary.impl.Transform$2 invoke "Transform.java" 102]
         [missionary.impl.Sample$3 invoke "Sample.java" 70]
         [missionary.impl.Ambiguous emit "Ambiguous.java" 36]
         [missionary.impl.Ambiguous step "Ambiguous.java" 51]
         [missionary.impl.Ambiguous$3 invoke "Ambiguous.java" 133]
         [missionary.impl.Sleep$Scheduler trigger "Sleep.java" 61]
         [missionary.impl.Sleep$Scheduler run "Sleep.java" 75]]
 :cause "Index -1 out of bounds for length 2"}

behavior of continuous operators in face of consecutive transfers

A consecutive transfer happens when a continuous flow notifies synchronously with the sampling. The current behavior is to propagate the consecutive transfer, but the point of continuous flow is that only the latest value matters therefore it may be more correct to collapse all consecutive transfers to keep only the latest.

A common scenario would be using reductions to turn a discrete flow into a continuous flow. If the discrete flow happens to have a first event available immediately, then a consecutive transfer happens.

(->> (m/reductions + (m/ap 1))
  (m/latest inc)
  (m/reduce conj)
  (m/?))

Currently this returns [1 2], with a collapsing latest it would return [2].

Impacted operators : latest signal! sample

self-hosted clojurescript support

Currently missionary doesn't work on self-hosted clojurescript due to its dependency on core.async for ioc machinery.
Considered strategies :

  • convince core maintainers to make core.async self-hosted-clojurescript compatible, unlikely to happen
  • rely on andare instead of core.async
  • duplicate ioc-macros code, make it self-hosted-clojurescript compatible and ditch core.async dependency

requirements for cancellation errors

Currently when a process is cancelled before termination it throws a new instance of ExceptionInfo with an entry for key :cancelled. Two problems :

  • possible conflicts with user-crafted exceptions.
  • suboptimal performance, because it may happen frequently and each time the exception is reconstructed with the entire stacktrace.

The only requirement of the cancellation error is to be able to disambiguate it from other errors. A singleton would be a better fit - no runtime costs, fast checking based on identity.

The singleton is immutable and can be exposed :

(def cancelled "The object thrown by processes cancelled before termination." (comment TODO))

Checking could be made easier with simple sugar :

(defmacro on-cancel "
Evaluates body and returns result of last expression.
If evaluation fails due to cancellation, evaluates recovery expression and returns its result.
" [recovery & body]
  `(try ~@body
     (catch ~(if (:js-globals &env) :default Throwable) e#
       (if (identical? e# cancelled) ~recovery (throw e#)))))
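Later missionary releases went with a dedicated class, `missionary.Cancelled`, rather than the singleton value proposed here. A sketch of recovering from cancellation with a plain catch clause, assuming such a release; `slow`, `log` and the delays are illustrative.

```clojure
(require '[missionary.core :as m])

(def log (atom []))

(def slow
  (m/sp
    (try
      (m/? (m/sleep 1000 :slow))
      ;; thrown into this process when race cancels it
      (catch missionary.Cancelled _
        (swap! log conj :cancelled)
        :recovered))))

;; race completes with the first success (:fast) and cancels the other branch
(def result (m/? (m/race slow (m/sleep 10 :fast))))
;; result => :fast, @log => [:cancelled]
```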

parallel processing

Goal

Find the right pattern to parallelize processing on some part of a flow pipeline.

Solution 1 : emulate channels

Using rdv and dfv, we can build a channel-like primitive and use ap + ?= to run a single producer concurrently with an arbitrary number of consumers. The producer feeds values into the channel, closes it on termination and emits nothing. Each consumer reads values from the channel, passes them through a user-provided pipeline and emits the resulting values.

cf. POC

Problem : the pattern may be too complex to be implemented manually in user space.

Solution 2 : ?= + sem

via Panel on slack

(let [sem (m/sem 2)]
  (m/ap
   (let [batch (m/?= (->> (fetch-ids)
                          (m/eduction (partition-all 20))))]
     (m/holding sem (->> (fetch-projects batch)
                         (m/reduce conj [])
                         m/?)))))

Problem : while the semaphore effectively ensures no more than 2 fetch-projects instances run concurrently, the memory footprint is unbounded because ?= pulls input and spawns new branches as soon as possible. If fetch-ids is infinite and able to produce batches faster than the maximal processing throughput (in this case, 2 divided by the average delay of fetch-projects), then the semaphore queue grows steadily.

Solution 3 : fix ?=

Add an optional argument to ?= to specify an upper bound on the number of concurrent branches. When this number is reached, values stop being pulled from input and resume again when a branch terminates.

(m/ap
  (let [batch (m/?= 2 (->> (fetch-ids)
                        (m/eduction (partition-all 20))))]
    (->> (fetch-projects batch)
      (m/reduce conj [])
      m/?)))

Prior art : ReactiveX - Flowable/flatMap
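A sketch of Solution 3's semantics, assuming a release where `?>` accepts an optional parallelism argument, `(m/?> par flow)`. `fetch-projects` is a hypothetical stand-in that sleeps and sums its batch, and the two atoms only record how many branches run at once.

```clojure
(require '[missionary.core :as m])

(def in-flight (atom 0))
(def max-in-flight (atom 0))

(defn fetch-projects
  "Hypothetical stand-in for a remote call: waits 20 ms, then sums the batch."
  [batch]
  (m/sp
    (swap! max-in-flight max (swap! in-flight inc))
    (m/? (m/sleep 20))
    (swap! in-flight dec)
    (reduce + batch)))

(def pipeline
  (m/ap
    ;; at most 2 branches run concurrently; per Solution 3, input is only
    ;; pulled again when a branch terminates
    (let [batch (m/?> 2 (m/eduction (partition-all 3) (m/seed (range 10))))]
      (m/? (fetch-projects batch)))))

(def results (m/? (m/reduce conj pipeline)))
;; (sort results) => (3 9 12 21), @max-in-flight <= 2
```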

Future interop

Trying to see if I've been paying attention in class: converting a CompletableFuture to a task should look something like this:

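(import '(java.util.concurrent CompletableFuture)
        '(java.util.function BiFunction)
        '(missionary.impl Thunk))

(defn cf->task
  [^CompletableFuture cf]
  (fn [success failure]
    (.handleAsync
     cf
     (reify BiFunction
       (apply [_ r e]
         ;; e is non-nil exactly when cf completed exceptionally,
         ;; with any Throwable, not only an Exception
         (if (some? e)
           (failure e)
           (success r))))
     Thunk/cpu)
    ;; canceller: cancelling cf completes it with a CancellationException,
    ;; which reaches the failure callback above
    (fn [] (.cancel cf true))))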

(let [cf (CompletableFuture.)
      t (cf->task cf)
      success! (fn [res] (println 'yay! res))
      fail! (fn [e] (println 'Error! (ex-message e)))]
  (t success! fail!)
  (.complete cf 2))


(let [cf (CompletableFuture.)
      t (cf->task cf)
      success! (fn [res] (println 'yay! res))
      fail! (fn [e] (println 'Error! (ex-message e)))]
  (t success! fail!)
  (.completeExceptionally cf (Exception. "failed!")))

Originally posted by @bsless in #41 (comment)

Concurrent forking within Preemptive forking not being cancelled

Hope I can explain this properly:

  (require '[missionary.core :as ms])
  (def cancel
    (do
      (when (bound? (resolve 'cancel))
        (cancel))
      (def mbx (ms/mbx))
      (let [mbx-flow (ms/ap
                      (->> (repeat mbx)
                           ms/seed
                           ms/?>
                           ms/?))
            flow (ms/ap
                  (let [next (ms/?< mbx-flow)]
                    (try
                      (let [task-id (ms/?= (ms/seed next))]
                        (loop []
                          (println "RUNNING TASK" (ms/? (ms/sleep 1000 task-id)))
                          (recur)))
                      (catch Exception ex
                        (ms/?> ms/none)))))
            task (->> (ms/reduce (constantly nil) flow))]
        (task println println))))

  (cancel)
  (mbx [1 2 3])  ;; [1]
  (mbx [4 5 6])  ;; [2]

Running [1] starts printing as expected:

RUNNING TASK 1
RUNNING TASK 2
RUNNING TASK 3

I would expect that running [2] would stop printing 1,2,3 and instead start printing only 4,5,6. However, currently it continues printing 1,2,3 in addition to 4,5,6.

E.g. running [2] after running [1] produces the following output:

RUNNING TASK 5
RUNNING TASK 4
RUNNING TASK 6
RUNNING TASK 3
RUNNING TASK 1
RUNNING TASK 2

Calling (cancel) cancels everything as expected.

If I replace ?= with ?>, then it works as expected (of course running only the first of each mbx value).

Is this expected behavior or a bug?

dealing with blocking iterators

This is highly specific to my codebase so feel free to decline any help.

I have a pipeline implementation that weaves a bunch of java.util.Iterators together. It's just a bunch of functions composed together, where each one takes the previous Iterator + some options and returns a new Iterator. So something like

(-> (read-from-db nil {:connection "..."}) (transform {:fn some-fn}) (buffer {:size 32}) (write-to-db {:connection "..."}))

is a composition of a bunch of functions that each return a new Iterator. To run it you need to walk (drain) the final Iterator.

This is purely sequential, so there's a lot of stalling happening, i.e. the next read is waiting for the previous write to finish. I'd like to inject a prefetch function into the pipeline, just before the write-to-db call so that the next value in the pipeline is eagerly pulled from upstream.

Here's an implementation of that strategy, previously named fork, without missionary:

(defn fork [in opts]
  (let [n (:n opts (.availableProcessors (Runtime/getRuntime)))
        tq (u/q n), q (u/q n), stop (volatile! false)
        _th (u/thread nil "pg-fork-coordinator"
              (try
                (loop []
                  (when-not @stop
                    (u/take tq)
                    (if (it/next? in)
                      (do (u/put q [:ok (it/next in)]) (recur))
                      (vreset! stop true))))
                (catch Throwable e (u/put q [:ex e]))))
        ini (volatile! nil)]
    (reify Iterator
      (hasNext [_]
        (try (when (nil? @ini) (vreset! ini true) (dotimes [_ n] (u/put tq ::go)))
             ;; TODO possible race while not stopped and q is empty but tq is not and we're waiting for a value
             (boolean (or (not @stop) (u/has? q)))
             (catch Throwable e (vreset! stop true) (throw e))))
      (next [_] (try (let [[st vl] (u/take q)] (if (= :ok st) (do (u/put tq ::go) vl) (throw vl)))
                     (catch Throwable e (vreset! stop true) (throw e)))))))

The other functions in that definition come from the alias it:

(defn next [it] (.next ^Iterator it))
(defn next? [it] (.hasNext ^Iterator it))

and from u:

(defn q ([] (LinkedTransferQueue.)) ([n] (LinkedBlockingQueue. (int n))))
(defn put   [q v] (if (instance? TransferQueue q) (.transfer ^TransferQueue q v) (.put ^BlockingQueue q v)) v)
(defn take  [q]   (.take ^BlockingQueue q))
(defn has? [q] (not (.isEmpty ^java.util.Collection q)))
(defmacro with   {:style/indent 1} [[s v] & body] `(let [rt# ~v ~s rt#] ~@body rt#))
(defmacro thread {:style/indent 2} [^ThreadGroup tg nm & code]
  `(with [t# (Thread. (or ~tg (.getThreadGroup (Thread/currentThread))) (fn [] ~@code) ~nm)] (.start t#)))

It basically forks a thread and uses 2 queues: 1 "token queue" that unblocks the loop to prefetch another value, and 1 result queue where the computed values are put. There are also 2 volatiles to coordinate initialization and stopping. As it stands, it's pretty tricky code that took some time to get right.

I wonder if a missionary solution would look cleaner? Do you have an idea of how you would go about implementing the function? I haven't written any missionary code yet so I'm having a hard time deciding which primitives would be useful. I guess a similar solution could be built with a mailbox and calculating in blk?

Subscription failure : not in publisher context.

The following code

  (m/?
   (m/reactor
    (let [pub (m/stream! (m/seed [1 2]))]
      (m/stream!
       (m/ap
        (m/? (m/sleep 0))
        (println  (m/?> pub)))))))

throws

Execution error at missionary.impl.Reactor/<clinit> (Reactor.java:12).
Subscription failure : not in publisher context.

I was just wondering whether this is a bug, or expected behavior.

Cryptic operators

Following #29, there are concerns that sigils make the learning curve steeper. It would be nice to have both short symbols (for experts) and human-friendly aliases (for beginners). Let's find plain English equivalents for ? ! ?> ?< ?=

Add a gathering operator for `ap`

gather is able to merge an arbitrary amount of flows into a single one, but there's currently no way to have gather semantics for a flow of flows.

We could have a gathering operator that behaves like ?? or ?!, except that when the flow emits its next value before the previous continuation terminates, a new continuation is run concurrently, and the resulting ap flow emits values as soon as they're ready.

Just like ?? and ?! respectively match RxJava's concatMap and switchMap, this new operator would match flatMap.
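For illustration, here is a minimal sketch of the flatMap-like behaviour described above, written with ?=, the concurrent fork mentioned in the `ap` docstring issue on this page. It only uses operators that appear elsewhere on this page (m/ap, m/?=, m/?, m/sleep, m/enumerate, m/aggregate as of missionary b.17); the var name `gathered` is hypothetical. Each input value starts its own branch immediately, and results are emitted in completion order rather than input order.

```clojure
(require '[missionary.core :as m])

;; Hypothetical example flow: each branch sleeps n*100 ms, then emits n.
(def gathered
  (m/ap
    (let [n (m/?= (m/enumerate [3 1 2]))] ; ?= forks a concurrent branch per value
      (m/? (m/sleep (* n 100) n)))))      ; branches finish out of input order

;; Block the current thread until the flow terminates (JVM only).
;; Values arrive as each branch completes, so the shortest sleep comes first.
(m/? (m/aggregate conj gathered))
```

With ?? (concatMap-like), the same flow would instead wait for each branch to finish before starting the next one.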

update `ap` docstring

Returns a flow evaluating body (in an implicit do) and producing values of each subsequent fork. Body evaluation can be parked with ? and forked with ?? and ?!.

AFAICT at least ?= should be noted too, shouldn't it?

reactor publishers should be disabled on failure, not cancellation

Some flows terminate successfully on cancellation. A publisher backed by such a flow should be subscribable at any time, as long as it hasn't failed.
Proposed changes:

  • Calling a publisher with arity 0 cancels its underlying process, but the publisher can still be subscribed to.
  • When a publisher fails, active subscriptions and subsequent ones are cancelled.

Multiplexing flows

Considering that flows can spawn expensive processes like subscribing to external APIs, it would be useful to be able to multiplex a flow.

execution model

Opening a new issue to continue the discussion in #23 which was becoming off-topic for that issue. The initial question was

how does ?= run code concurrently without using a threadpool?

The answer was

Parking happens when the computation requires a value that is not immediately available. It doesn't use a threadpool, the code is run synchronously by the thread responsible for unparking.

This "value that is not immediately available" point is what eludes me. Consider a simple example:

% clj -Sdeps '{:deps {missionary {:mvn/version "b.17"}}}'
Clojure 1.10.1
user=> (require '[missionary.core :as m])
nil
user=> (defn fetch [n] (Thread/sleep 100) (* 2 n))
#'user/fetch
user=> (time (m/? (m/aggregate conj (m/ap (let [n (m/?= (m/enumerate (range 10)))] (fetch n))))))
"Elapsed time: 1004.458757 msecs"
[0 2 4 6 8 10 12 14 16 18]

Consider fetch an IO operation that is a black box, i.e. outside of our reach to reimplement. Can the last example be rewritten to happen concurrently? The only solution I could come up with is

user=> (time (m/? (m/aggregate conj (m/ap (let [n (m/?= (m/enumerate (range 10)))] (m/? (m/via m/blk (fetch n))))))))
"Elapsed time: 102.950868 msecs"
[0 2 4 6 8 10 12 14 16 18]

Question: Best way to limit concurrency of `?=`?

Hi, I'm still quite new to clojure - so be nice lol.

What is the best way to limit concurrency with the ?= forking function? I'm currently using core.async channels to do this, but it feels hackish.

(require '[clojure.core.async :as async] '[missionary.core :as m])

(defn mergemap
  [func concurrency flow]
  (let [c (async/chan concurrency)]
    (m/ap
     (let [value (m/?= flow)]
       (async/>!! c true)
       (let [r (m/? (func value))]
         (async/<!! c)
         r)))))

;; Creates a flow that multiplies the numbers by 10 after 500ms.
;; Max concurrency of 5
(mergemap #(m/sleep 500 (* % 10)) 5 (m/enumerate (range 10000)))

Any suggestions?
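A possible alternative, sketched under the assumption that your missionary version provides `m/sem` (a semaphore with n permits) and the `m/holding` macro (acquire the permit, run the body, release it afterwards): instead of blocking a thread on a core.async channel, each forked branch parks until a permit is free. Apart from those two, it only uses operators already shown on this page; `mergemap` keeps the signature from the question.

```clojure
(require '[missionary.core :as m])

;; Sketch: bound the number of in-flight (func value) tasks with a semaphore.
;; Assumes m/sem and m/holding exist in the missionary version in use.
(defn mergemap
  [func concurrency flow]
  (m/ap
    (let [permits (m/sem concurrency) ; created per run of the flow, shared by all branches
          value   (m/?= flow)]        ; fork one concurrent branch per input value
      (m/holding permits              ; park (not block) until a permit is available,
        (m/? (func value))))))        ; and release it when the task completes or fails

;; Multiply each number by 10 after 500 ms, at most 5 tasks at a time.
(m/? (m/aggregate conj (mergemap #(m/sleep 500 (* % 10)) 5 (m/enumerate (range 10)))))
```

Because the semaphore is bound inside `ap` before the fork, every run of the returned flow gets its own set of permits, and no thread is blocked while a branch waits.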

Operations (tracing, metrics and logging)

On a large project, tracing, metrics and logging are a major concern. I assume that in a framework like this, stack traces only go back to the last fiber, which is not very helpful.

A couple of projects with interesting takes on this are https://github.com/tokio-rs/tracing (and the Rust async community in general)
and, probably closer to what we could emulate, ZIO2 logging and tracing: https://www.youtube.com/watch?v=vYKea3hGw28

Both communities have concluded that it is important to make it first class in the application framework.

I have only been following a few months so it is possible this is already solved and just not clearly documented.

consistent failures on flow cancellation

Currently, observe, watch, stream! and signal! don't fail when cancelled before termination. This is inconsistent and sometimes problematic:

  • if a transfer is pending and for some reason the value must not be propagated to the rest of the pipeline, there's no way to prevent that. If instead the flow fails on cancellation, then it can cancel pending transfers and short-circuit the pipeline.
  • no way to disambiguate between clean termination and abrupt termination (arguably less of a problem for observe and watch because they don't self-terminate, but they may be able to do so at some point - #6).

Implement custom printer methods

(def hello-world
  (m/sp (println "Hello, world!")))
(hello-world #(println "success" %) #(println "error" %))

According to the REPL print, this results in a fiber containing an exception saying that the error fn is invoked with 0 args. If I provide an error fn that accepts 0 args, it is invoked along with the success continuation.
