Comments (8)

leonoel commented on May 26, 2024

This pattern can be split into two separate parts, the fan-out and the fan-in. ap already covers the fan-in, so it should be enough to implement the fan-out. The missing part is actually very close to clojure's group-by: if we treat the map as a sequence of entries, there is a natural translation to flows. Here is a possible implementation:

(defn group-by "Same as clojure's group-by, but takes a flow instead of a sequence as input,
and returns a flow of key-flow pairs instead of a map from keys to vectors."
  [kf >xs]
  (m/ap (let [x (m/?> (m/eduction
                        (fn [rf]
                          (let [topics (object-array 1)]
                            (fn
                              ([] (rf))
                              ([r]
                               (transduce (map #(% %)) rf r (vals (aget topics 0))))
                              ([r x]
                               (let [k->t (aget topics 0)
                                     k (kf x)]
                                 (if-some [t (get k->t k)]
                                   (rf r (t x))
                                   (let [t (m/rdv)]
                                     (aset topics 0 (assoc k->t k t))
                                     (rf r [k (m/eduction (take-while (complement #{t}))
                                                (m/ap (m/? (m/?> (m/seed (cons (m/sp x) (repeat t)))))))])))))))) >xs))]
          (if (vector? x) x (do (m/? x) (m/amb>))))))

(comment
  (def words ["Air" "Bud" "Cup" "Awake" "Break" "Chunk" "Ant" "Big" "Check"])
  (m/?
    (m/reduce conj
      (m/ap
        (let [[k >x] (m/?= (group-by (juxt first count) (m/seed words)))]
          (println "processing group" k)
          (let [word (m/?> >x)]
            (m/? (m/via m/cpu (str/upper-case word))))))))
  ;; processing group [A 3]
  ;; processing group [B 3]
  ;; processing group [C 3]
  ;; processing group [A 5]
  ;; processing group [B 5]
  ;; processing group [C 5]
  #_=> ["AIR" "BUD" "CUP" "AWAKE" "BREAK" "ANT" "BIG" "CHUNK" "CHECK"])
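For comparison, here is the plain sequence version that the flow-based group-by mirrors. This is only a sketch using clojure.core/group-by on the same words; the name grouped is introduced here for illustration. It shows the "map as a sequence of entries" view: each entry is a [key values] pair, which the flow version turns into a [key flow] pair.

```clojure
(def words ["Air" "Bud" "Cup" "Awake" "Break" "Chunk" "Ant" "Big" "Check"])

;; clojure.core/group-by returns a map from key to vector of values.
;; The key here is [first-char length], e.g. [\A 3].
(def grouped (clojure.core/group-by (juxt first count) words))

(get grouped [\A 3]) ;; => ["Air" "Ant"]
(get grouped [\C 5]) ;; => ["Chunk" "Check"]
(count grouped)      ;; => 6

;; Seen as a sequence of entries, the map is a sequence of [key values] pairs.
(every? (fn [[k vs]] (and (vector? k) (vector? vs))) (seq grouped)) ;; => true
```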

Regarding API design, I think this is an interesting simplification compared to ZIO:

  • we don't need the flow transformer, instead we can destructure the pair in an ap block
  • we don't need the task-returning version either, instead we can transform the input flow such that group-by has
    enough information to derive the key synchronously.
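The second point could look like the sketch below. It assumes missionary is on the classpath and aliased as m; items, fetch-key and >keyed are hypothetical names made up for this illustration. The asynchronous key lookup runs in an ap block ahead of the grouping stage, so each value reaches group-by already paired with its key.

```clojure
(require '[missionary.core :as m])

;; Hypothetical sample data and a hypothetical task-returning key lookup.
(def items [{:key 1 :val 11} {:key 2 :val 22} {:key 1 :val 33}])
(defn fetch-key [item] (m/sp (:key item)))

;; Resolve the key before grouping: each item becomes a [key item] pair.
(def >keyed
  (m/ap (let [item (m/?> (m/seed items))]
          [(m/? (fetch-key item)) item])))

;; JVM only: outside of sp/ap, m/? blocks the calling thread.
(m/? (m/reduce conj >keyed))
;; => [[1 {:key 1, :val 11}] [2 {:key 2, :val 22}] [1 {:key 1, :val 33}]]
```

With the pairs in hand, (group-by first >keyed) can derive the key synchronously, and the fan-in stage can strip it again with (map second).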

from missionary.

leonoel commented on May 26, 2024

As an aside, group-by is not needed if topics are known ahead of time.

(m/ap
  (let [i (m/?= (m/seed (range 3)))]
    (println "handle" i "changed to"
      (m/?> (m/eduction (map #(nth % i)) (dedupe) (m/watch !slider))))
    (m/amb>)))

mjmeintjes commented on May 26, 2024

This would be useful to have. Something like this:

;; Basic grouping
  (->> (m/seed [{:key 1 :val 11} {:key 2 :val 22} {:key 1 :val 33} {:key 3 :val 44}])
       (m/group-by-key
        :key
        (fn [key flow]
          (m/ap
           (println [key (m/?> flow)])))))

;; Allow grouping function to return a task
  (->> (m/seed [{:key 1 :val 11} {:key 2 :val 22} {:key 1 :val 33} {:key 3 :val 44}])
       (m/group-by
        (fn [{:keys [key]}]
          (m/sp key))
        (fn [key flow]
          (m/ap
           (println [key (m/?> flow)])))))       

A few use cases:

  • I have a flow of Datomic transactions, and would like to have a separate flow for each entity. So I could group-by entity-id, and then use ?< to cancel processing if an entity is changed while doing some long-running calculation.
  • I have a UI slider component with 2 handles, and at each change it returns [min max]. Only one can change at a time, and I would like to update another component with the latest change, either min or max, whichever changed. If I could group by nth,
    then I would have 2 flows and each flow could independently update the 3rd component.

Current:
  (ms/ap
   (let [[[prev-min prev-max :as prev]
          [now-min now-max :as now]]
         (->> (observe-event-flow slider "update" (fn [_ _ [min max]] [min max]))
              (msu/partition-flow 2 1)
              ms/?<)
         changed (cond
                   (and (= now-min prev-min)
                        (= now-max prev-max)) nil
                   (= now-min prev-min) now-max
                   (= now-max prev-max) now-min)]
     (when changed
       (.currentTime player changed))))

With my imaginary group-by:

  (ms/ap
   (->> (observe-event-flow s "update" (fn [_ _ [min max]] [min max]))
        (m/group-by
         nth
         (fn [k flow]
           (ms/ap
            (.currentTime player (ms/?> flow)))))))

leonoel commented on May 26, 2024

RX - groupBy has well-defined semantics.

I like: inner branch cancellation can be used to prevent the group-by internal state from growing indefinitely.

I don't like:

  • Subtyping the flow to allow access to the key. Returning a pair is more idiomatic in clojure.
  • The optional valueSelector complects the API for little added value. Better to do this operation in the fan-in stage.
  • Failure in the input flow is propagated to all inner branches. IMO it is enough to propagate the exception to the fan-in stage and leave it the responsibility of cancelling pending branches. This also obviates the need for the delayError flag.

Needs more thinking: only one run is allowed for a given inner flow.

leonoel commented on May 26, 2024

@mjmeintjes could you elaborate on the UI slider component use case? Using nth as the key function of group-by seems wrong to me.

mjmeintjes commented on May 26, 2024

@mjmeintjes could you elaborate on the UI slider component use case? Using nth as the key function of group-by seems wrong to me.

The slider can have 1 or more handles, and it fires an update event each time any of the handles changes, but it just passes along all the current values with no indication of which one changed (e.g. with 3 handles, I would get something like [3 8 10] to indicate the current value of each handle). So to figure out which handle actually changed, I have to compare the current list of values with the previous list of values (e.g. [3 8 10], then [3 8 13], indicates that the third handle changed from 10 to 13).
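The comparison described above can be sketched in plain Clojure. The helper name changed-handles is hypothetical and introduced only for illustration; it returns an [index new-value] pair for every position where the two snapshots differ.

```clojure
(defn changed-handles
  "Returns [index new-value] pairs for the positions that differ
  between the previous and the current snapshot of handle values."
  [prev now]
  (keep-indexed (fn [i [p n]] (when (not= p n) [i n]))
                (map vector prev now)))

(changed-handles [3 8 10] [3 8 13]) ;; => ([2 13])
(changed-handles [3 8 10] [3 8 10]) ;; => ()
```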

The alternative, using group-by by nth, means I can have a flow for each handle's changes.

leonoel commented on May 26, 2024

Thanks, I understand better now. I think it's missing an intermediate stage to decompose the input vector into a succession of focused updates. Here is how I think group-by could be used to solve this problem.

  (def !slider (atom [3 8 10]))
  (def cancel
    ((m/ap
       (let [[i >x] (->> (m/watch !slider)
                      (m/eduction (mapcat (partial map-indexed vector)))
                      (m/group-by first)
                      (m/?=))]
         (println "handle" i "changed to"
           (m/?> (m/eduction (map second) (dedupe) >x)))
         (m/amb>))) #() #()))
  ;; handle 0 changed to 3
  ;; handle 1 changed to 8
  ;; handle 2 changed to 10
  (swap! !slider assoc 2 13)
  ;; handle 2 changed to 13
  (swap! !slider assoc 0 5)
  ;; handle 0 changed to 5
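To see what each stage contributes, the same pipeline can be traced on plain sequences. This is only an eager sketch: it swaps the flow-based group-by for clojure.core/group-by, and snapshots, updates and per-handle are names made up for the illustration. The mapcat stage turns every snapshot into one [index value] pair per handle, and the per-group dedupe keeps only actual changes.

```clojure
;; The three successive states of !slider in the session above.
(def snapshots [[3 8 10] [3 8 13] [5 8 13]])

;; Intermediate stage: one [index value] pair per handle per snapshot.
(def updates
  (into [] (mapcat (partial map-indexed vector)) snapshots))
;; => [[0 3] [1 8] [2 10] [0 3] [1 8] [2 13] [0 5] [1 8] [2 13]]

;; Group by handle index, then keep only the deduplicated values.
(def per-handle
  (into {}
        (map (fn [[i pairs]]
               [i (into [] (comp (map second) (dedupe)) pairs)]))
        (clojure.core/group-by first updates)))
;; => {0 [3 5], 1 [8], 2 [10 13]}
```

Each vector in per-handle lists the values printed for that handle in the session above.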

leonoel commented on May 26, 2024

Implemented in b.23
