Comments (8)
This pattern can be split in two separate parts, the fan-out and the fan-in. ap
already covers the fan-in, therefore it should be enough to implement fan-out. The missing part is actually very close to clojure's group-by
, if we treat the map as a sequence of entries then there is a natural translation to flows. Here is a possible implementation :
(defn group-by "Same as clojure's group-by, but takes a flow instead of a sequence as input,
and returns a flow of key-flow pairs instead of a map from keys to vectors."
[kf >xs]
(m/ap (let [x (m/?> (m/eduction
(fn [rf]
(let [topics (object-array 1)]
(fn
([] (rf))
([r]
(transduce (map #(% %)) rf r (vals (aget topics 0))))
([r x]
(let [k->t (aget topics 0)
k (kf x)]
(if-some [t (get k->t k)]
(rf r (t x))
(let [t (m/rdv)]
(aset topics 0 (assoc k->t k t))
(rf r [k (m/eduction (take-while (complement #{t}))
(m/ap (m/? (m/?> (m/seed (cons (m/sp x) (repeat t)))))))])))))))) >xs))]
(if (vector? x) x (do (m/? x) (m/amb>))))))
(comment
(def words ["Air" "Bud" "Cup" "Awake" "Break" "Chunk" "Ant" "Big" "Check"])
(m/?
(m/reduce conj
(m/ap
(let [[k >x] (m/?= (group-by (juxt first count) (m/seed words)))]
(println "processing group" k)
(let [word (m/?> >x)]
(m/? (m/via m/cpu (str/upper-case word))))))))
;; processing group [A 3]
;; processing group [B 3]
;; processing group [C 3]
;; processing group [A 5]
;; processing group [B 5]
;; processing group [C 5]
#_=> ["AIR" "BUD" "CUP" "AWAKE" "BREAK" "ANT" "BIG" "CHUNK" "CHECK"])
Regarding API design, I think this is an interesting simplification compared to ZIO :
- we don't need the flow transformer, instead we can destructure the pair in an
ap
block - we don't need the task-returning version either, instead we can transform the input flow such that group-by has enough
enough information to derive the key synchronously.
from missionary.
As an aside, group-by
is not needed if topics are known ahead of time.
(m/ap
(let [i (m/?= (m/seed (range 3)))]
(println "handle" i "changed to"
(m/?> (m/eduction (map #(nth % i)) (dedupe) (m/watch !slider))))
(m/amb>)))
from missionary.
This would be useful to have. Something like this:
;; Basic grouping
(->> (m/seed [{:key 1 :val 11} {:key 2 :val 22} {:key 1 :val 33} {:key 3 :val 44}])
(m/group-by-key
:key
(fn [key flow]
(m/ap
(println [key (m/?> flow)])))))
;; Allow grouping function to return a task
(->> (m/seed [{:key 1 :val 11} {:key 2 :val 22} {:key 1 :val 33} {:key 3 :val 44}])
(m/group-by
(fn [{:keys [key]}]
(m/sp key))
(fn [key flow]
(m/ap
(println [key (m/?> flow)])))))
A few use cases:
- I have a flow of datomic transactions, and would like to have a separate flow for each entity. So I could group-by entity-id, and then use
?<
to cancel processing if an entity is changed while doing some long running calculation. - I have a UI slider component with 2 handles, and at each change it returns
[min max]
. Only one can change at one time, and I would like to update another component with the latest change - either min or max, whichever changed. If I could group by nth,
then I would have 2 flows and each flow could independently update the 3rd component.
Current
(ms/ap
(let [[[prev-min prev-max :as prev]
[now-min now-max :as now]]
(->> (observe-event-flow slider "update" (fn [_ _ [min max]] [min max]))
(msu/partition-flow 2 1)
ms/?<)
changed (cond
(and (= now-min prev-min)
(= now-max prev-max)) nil
(= now-min prev-min) now-max
(= now-max prev-max) now-min)]
(when changed
(.currentTime player changed))))
With my imaginary group-by:
(ms/ap
(->> (observe-event-flow s "update" (fn [_ _ [min max]] [min max]))
(m/group-by
nth
(fn [k flow]
(ms/ap
(.currentTime player (ms/?> flow)))))))
from missionary.
RX - groupBy has well defined semantics.
I like : inner branch cancellation can be used to prevent the group-by
internal state to grow indefinitely.
I don't like :
- Subtyping the flow to allow access to the key. Returning a pair is more idiomatic in clojure.
- Optional
valueSelector
complects the API for little added value. Better to do this operation in the fan-in stage. - Failure in input flow is propagated to all inner branches. IMO it is enough to propagate the exception to the fan-in stage and leave it the responsibility for cancelling pending branches. This also obviates the need for the
delayError
flag.
Needs more thinking : only one run allowed for a given inner flow.
from missionary.
@mjmeintjes could you elaborate on the UI slider component use case ? using nth
as the key function of group-by
seems wrong to me.
from missionary.
@mjmeintjes could you elaborate on the UI slider component use case ? using
nth
as the key function ofgroup-by
seems wrong to me.
The slider can have 1 or more handles, and it fires an update event each time any of the handles change, but just passes along all the current values with no indication of which one changed (eg in the case that there are 3 handles, I would get something like [3 8 10] to indicate the current value of each of the handles). So to figure out which handle actually changed I have to compare the current list of values with the previous list of values (eg [3 8 10], then [3 8 13], indicates that the third handle changes from 10 to 13).
The alternative, using group-by by nth, means I can have a flow for each handle's changes.
from missionary.
Thanks, I understand better now. I think it's missing an intermediate stage to decompose the input vector into a succession of focused updates. Here is how I think group-by
could be used to solve this problem.
(def !slider (atom [3 8 10]))
(def cancel
((m/ap
(let [[i >x] (->> (m/watch !slider)
(m/eduction (mapcat (partial map-indexed vector)))
(m/group-by first)
(m/?=))]
(println "handle" i "changed to"
(m/?> (m/eduction (map second) (dedupe) >x)))
(m/amb>))) #() #()))
;; handle 0 changed to 3
;; handle 1 changed to 8
;; handle 2 changed to 10
(swap! !slider assoc 2 13)
;; handle 2 changed to 13
(swap! !slider assoc 0 5)
;; handle 0 changed to 5
from missionary.
Implemented in b.23
from missionary.
Related Issues (20)
- reactor publishers should be disabled on failure, not cancellation HOT 1
- race condition on `via` - interruption flag may not be reset HOT 1
- `timeout` usage is cumbersome HOT 1
- possible infinite loop with `group-by` + `ap` HOT 2
- parallel processing HOT 3
- destructuring in let bindings breaks `sp` HOT 2
- multithreaded reactors ?
- Define behavior for task/flow protocol violations HOT 2
- Custom printers for tasks and flows
- Odd interaction with xtdb HOT 2
- m/sleep hangs indefinitely, scheduler thread seems blocked HOT 3
- immediate switch HOT 3
- continuous operators should skip work on duplicates HOT 1
- Beyond structured concurrency HOT 5
- Cloroutine exception HOT 9
- Performance warning - case has int tests, but tested expression is not primitive.
- Compiler Exception on Clojure 1.11.1 - No matching clause: :static-field HOT 2
- zip doesn't terminate properly HOT 3
- group-by consumers must terminate immediately on input crash HOT 6
- Large dependency: missionary dependency grows uber jar by 26 MiB
Recommend Projects
-
React
A declarative, efficient, and flexible JavaScript library for building user interfaces.
-
Vue.js
🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
-
Typescript
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
-
TensorFlow
An Open Source Machine Learning Framework for Everyone
-
Django
The Web framework for perfectionists with deadlines.
-
Laravel
A PHP framework for web artisans
-
D3
Bring data to life with SVG, Canvas and HTML. 📊📈🎉
-
Recommend Topics
-
javascript
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
-
web
Some thing interesting about web. New door for the world.
-
server
A server is a program made to process requests and deliver data to clients.
-
Machine learning
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
-
Visualization
Some thing interesting about visualization, use data art
-
Game
Some thing interesting about game, make everyone happy.
Recommend Org
-
Facebook
We are working to build community through open source technology. NB: members must have two-factor auth.
-
Microsoft
Open source projects and samples from Microsoft.
-
Google
Google ❤️ Open Source for everyone.
-
Alibaba
Alibaba Open Source for everyone
-
D3
Data-Driven Documents codes.
-
Tencent
China tencent open source team.
from missionary.