Comments (10)

leonoel commented on June 6, 2024

If you need to interoperate with a blocking java.util.Iterator, you can define a flow that consumes its values, like so:

(defn iterator-consumer [^java.util.Iterator iterator]
  (m/ap
    (while (m/?? (m/enumerate (when (m/? (m/via m/blk (.hasNext iterator))) [false true]))))
    (.next iterator)))

Then you can use missionary's operators to build your pipeline. Each stage defines its own logical process, so prefetching becomes a non-issue.

Turning a flow into a blocking iterator is a bit trickier. Here's a possible implementation:

(deftype FlowIterator [^:unsynchronized-mutable iterator
                       ^:unsynchronized-mutable pending?]
  clojure.lang.IFn
  (invoke [this]
    (set! iterator
          (iterator (partial this false)
                    (partial this true)))
    this)
  (invoke [this done?]
    (locking this
      (set! pending? false)
      (when done? (set! iterator nil))
      (.notify this)))
  java.util.Iterator
  (hasNext [this]
    (locking this
      (while pending?
        (try (.wait this)
             (catch InterruptedException _
               (iterator))))
      (some? iterator)))
  (next [this]
    (locking this
      (set! pending? true)
      @iterator)))

(defn flow->iterable [flow]
  (reify Iterable
    (iterator [_]
      ((->FlowIterator flow true)))))

from missionary.

leonoel commented on June 6, 2024

> It took me a day to understand that while loop :) This forking with ?? takes a while to internalize.

TBH I'm not sure it's the best way to write that. It could be a case where an amb operator would help, e.g.:

(defmacro amb [& forms]
  `(case (m/?? (m/enumerate (range ~(count forms))))
     ~@(interleave (range) forms)))

(defn iterator-consumer [^java.util.Iterator iterator]
  (m/ap
    (loop []
      (if (m/? (m/via m/blk (.hasNext iterator)))
        (amb (.next iterator) (recur))
        (amb)))))

> I have no clue what's happening in the second snippet since I don't understand what (iterator (partial this false) (partial this true)) and @iterator do. If I'm reading this correctly iterator is bound to a flow, so I would need to look up their IFn and IDeref implementations respectively.

That's fine, this code relies on the low-level details of the flow protocol, you don't have to understand how it works unless you're writing a library. I'm still considering adding a flow->iterable operator to missionary.core, so if you find it useful feel free to elaborate.

> What do you mean by that? Right now with the fork function I posted above I have complete control over how many pages reside in memory, each fork adds max 1 page. Since some of my jobs are handling large amounts of data in bulk I need to really understand the upper bound of each pipeline memory-wise. Since missionary doesn't give out anything willy-nilly to some thread pool I understand I can retain that same level of control, but not sure how would I go about implementing it. I wanted to try and define the function with missionary, which as you noted means reading from a possibly blocking java Iterator and returning one in the end.

With blocking iterators, each thread is associated with a pipeline stage. You can add more stages with fork, which increases parallelism and also memory footprint, because stage synchronization requires buffering at least 1 item, as you said. With missionary, each flow transformation creates a new pipeline stage. Stages are decoupled from threads, but the parallelism/buffering tradeoff is pretty much the same.
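
To make the buffering tradeoff concrete, here is a minimal sketch of one blocking pipeline stage in plain Clojure. It is not missionary's implementation and not the fork function discussed in this thread. `prefetch-stage` is a hypothetical helper that drains the upstream iterator on its own thread into a bounded queue, so one stage holds at most n queued items, plus one item the producer may be blocked on and one held by the consumer. It assumes non-nil items (ArrayBlockingQueue rejects nil) and does not propagate upstream exceptions.

```clojure
(import '(java.util Iterator NoSuchElementException)
        '(java.util.concurrent ArrayBlockingQueue))

(defn prefetch-stage
  "Hypothetical helper: consumes `in` on a dedicated daemon thread and
  exposes the items as a blocking Iterator backed by a queue of capacity `n`."
  ^Iterator [^Iterator in n]
  (let [done     (Object.)                      ; sentinel marking end of input
        q        (ArrayBlockingQueue. (int n))  ; bounded buffer between the two threads
        cur      (volatile! nil)                ; item taken from the queue, not yet returned
        producer (fn []
                   (while (.hasNext in)
                     (.put q (.next in)))       ; blocks while the queue is full
                   (.put q done))]
    (doto (Thread. ^Runnable producer "prefetch-stage")
      (.setDaemon true)
      (.start))
    (reify Iterator
      (hasNext [_]
        (when (nil? @cur)
          (vreset! cur (.take q)))              ; blocks until an item or the sentinel arrives
        (not (identical? done @cur)))
      (next [this]
        (if (.hasNext this)
          (let [v @cur] (vreset! cur nil) v)
          (throw (NoSuchElementException.)))))))

;; Each additional stage adds one thread and one bounded buffer.
(-> (.iterator ^Iterable (range 5))
    (prefetch-stage 1)
    (prefetch-stage 1)
    iterator-seq
    vec)
;; => [0 1 2 3 4]
```

The upper bound on memory is then a function of the number of stages and their capacities, which is the same accounting that applies when stages are flows instead of threads.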

xificurC commented on June 6, 2024

After understanding your example I would have probably come up with

(m/ap (loop []
        (when (m/? (m/via m/blk (.hasNext it)))
          (if (m/?? (m/enumerate [true false]))
            (.next it)
            (recur)))))

With that written I see a direct resemblance with unix forking and would have come up with:

(defmacro fork [] `(m/?? (m/enumerate [true false])))
(m/ap (loop []
        (when (m/? (m/via m/blk (.hasNext it)))
          (if (fork) (.next it) (recur)))))

Since lisp has macros the if can be nicely hidden, leading to:

(defmacro fork [a b] `(if (m/?? (m/enumerate [true false])) ~a ~b))
(m/ap (loop []
        (when (m/? (m/via m/blk (.hasNext it)))
          (fork (.next it) (recur)))))

At this point I can see amb as a generalization of the unix fork.

xificurC commented on June 6, 2024

I had to fix some edge cases. The current version:

(defn fork [in opts]
  (let [n (opts :n 1), f (opts :fn identity), tq (u/q n), q (u/q n), cl! #(run! future-cancel q)
        _th (u/thread nil "pg-fork-coordinator"
              (try (loop []
                     (u/take tq)
                     (if (it/next? in)
                       (let [v (it/next in)] (u/put q [:ok (future (f v))]) (recur))
                       (u/put q [:no])))
                   (catch Throwable e (cl!) (u/put q [:ex e]))))
        ini (volatile! nil), cur (volatile! :none)]
    (reify Iterator
      (hasNext [_]
        ;; (print (if (future? @cur) @@cur "nil")) (print " ") (pr (vec tq)) (print " ") (prn (mapv first q))
        (when (nil? @ini) (vreset! ini true) (dotimes [_ n] (u/put tq :go)))
        (if (nil? @cur)
          false
          (let [[st vl] (u/take q)]
            (case st
              :ok (do (u/put tq :go) (vreset! cur vl) true)
              :no (do (vreset! cur nil) false)
              :ex (throw vl)))))
      (next [_] (let [v @cur] (if (nil? v) (it/bad!) (try @v (catch Throwable e (cl!) (throw e)))))))))

I need to spawn a separate thread to do the processing (that's the whole point), which brings me to sending messages between the threads, Erlang-style. I don't see how to get rid of that, though. If I could swap the futures out for tasks that depend on one another I could get rid of the cl! function, but I don't know how to model that. Apart from that little piece, I have no clue how to rewrite the rest into missionary.

xificurC commented on June 6, 2024

It took me a day to understand that while loop :) This forking with ?? takes a while to internalize. I have no clue what's happening in the second snippet since I don't understand what (iterator (partial this false) (partial this true)) and @iterator do. If I'm reading this correctly iterator is bound to a flow, so I would need to look up their IFn and IDeref implementations respectively.

> each stage defines its own logical process so prefetching becomes a non-issue.

What do you mean by that? Right now with the fork function I posted above I have complete control over how many pages reside in memory, each fork adds max 1 page. Since some of my jobs are handling large amounts of data in bulk I need to really understand the upper bound of each pipeline memory-wise. Since missionary doesn't give out anything willy-nilly to some thread pool I understand I can retain that same level of control, but not sure how would I go about implementing it. I wanted to try and define the function with missionary, which as you noted means reading from a possibly blocking java Iterator and returning one in the end.

Thank you for your reply! I am closing this since this isn't an issue per se.

leonoel commented on June 6, 2024

> At this point I can see amb as a generalization of the unix fork.

Right, and the other way round. BTW to fully emulate unix fork, ?? should be replaced by ?= to allow both branches to run concurrently.

xificurC commented on June 6, 2024

> That's fine, this code relies on the low-level details of the flow protocol, you don't have to understand how it works unless you're writing a library. I'm still considering adding a flow->iterable operator to missionary.core, so if you find it useful feel free to elaborate.

I like to understand the mechanics of a library to get a feeling for the abstractions, the tradeoffs, the performance etc. For now missionary remains a black box. I managed to understand how cloroutine works, so I guess that's 1 step in the right direction :)

> BTW to fully emulate unix fork, ?? should be replaced by ?= to allow both branches to run concurrently.

This also raises the question: how does ?= run code concurrently without using a thread pool? Is it parking the forked tasks at safepoints? What are those safepoints?

leonoel commented on June 6, 2024

> I like to understand the mechanics of a library to get a feeling for the abstractions, the tradeoffs, the performance etc.

I suggest you take some time to understand task and flow. They're basically callback-based protocols wrapped in constructor functions, so you can easily see what happens at the REPL.
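
As a rough illustration of those protocols, here is a sketch in plain Clojure with no missionary dependency. It assumes the following conventions: a task is a function of a success and a failure callback that returns a cancel thunk, and a flow is a function of a notifier and a terminator that returns an object which is called to cancel and dereferenced to transfer a value. `const-task`, `coll-flow` and `drain` are hypothetical names, and `drain` only works for flows that notify synchronously.

```clojure
(import '(clojure.lang IDeref IFn))

;; A task: call it with success/failure callbacks, get a cancel thunk back.
(defn const-task [v]
  (fn [success! _failure!]
    (success! v)
    (fn cancel [] nil)))

((const-task 42) println println) ; prints 42

(defn coll-flow
  "Hypothetical hand-written flow emitting the items of `coll`."
  [coll]
  (fn [notify! terminate!]
    (let [remaining (atom (seq coll))]
      ;; Signal that a first value is ready, or terminate if there is none.
      (if @remaining (notify!) (terminate!))
      (reify
        IFn
        (invoke [_] nil)                  ; cancel: ignored in this sketch
        IDeref
        (deref [_]                        ; transfer: hand over the current value
          (let [[x & more] @remaining]
            (reset! remaining more)
            (if more (notify!) (terminate!))
            x))))))

(defn drain
  "Hypothetical consumer for flows that notify synchronously."
  [flow]
  (let [ready? (volatile! false)
        done?  (volatile! false)
        ;; Same shape as (iterator (partial this false) (partial this true)).
        it     (flow #(vreset! ready? true) #(vreset! done? true))]
    (loop [acc []]
      (if @ready?
        (do (vreset! ready? false)
            (recur (conj acc @it)))       ; @it plays the role of @iterator
        {:values acc :terminated? @done?}))))

(drain (coll-flow [1 2 3]))
;; => {:values [1 2 3], :terminated? true}
```

A consumer that must wait for asynchronous notifications, such as the FlowIterator posted earlier in this thread, replaces the two volatiles with a monitor and blocks until one of the callbacks fires.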

> Is it parking the forked tasks at safepoints?

Parking happens when the computation requires a value that is not immediately available. It doesn't use a thread pool; the code is run synchronously by the thread responsible for unparking.
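
A small experiment in plain Clojure can make this tangible. It mimics the callback style and is not missionary's implementation. `make-deferred` is a hypothetical helper: its task only stores the success callback (the computation is parked), and whichever thread later calls `resolve!` runs the continuation itself.

```clojure
(defn make-deferred
  "Hypothetical helper returning a task and a function that completes it."
  []
  (let [continuation (promise)]
    {:task     (fn [success! _failure!]
                 (deliver continuation success!)  ; park: just remember what to run next
                 (fn cancel [] nil))              ; cancellation ignored in this sketch
     :resolve! (fn [v] (@continuation v))}))      ; unpark: run the continuation right here

(def observed (promise))

(let [{:keys [task resolve!]} (make-deferred)]
  ;; Registered on the calling thread; nothing runs yet.
  (task (fn [v] (deliver observed [v (.getName (Thread/currentThread))]))
        (fn [_] nil))
  ;; The value becomes available on another thread, which runs the continuation.
  (doto (Thread. ^Runnable (fn [] (resolve! 42)) "unparker")
    (.start)
    (.join)))

@observed
;; => [42 "unparker"]
```

No scheduler or pool is involved: the continuation is an ordinary function call made by the thread that produced the value.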

I'll keep this issue open until the blocking iterator case is settled, please open a new one if you want to discuss the execution model further.

leonoel commented on June 6, 2024

The object returned by this function could reasonably implement java.lang.Iterable, clojure.lang.IReduceInit and clojure.lang.Sequential, and a possible name could be educe to match clojure.core/eduction's semantics (lazy but not memoized).

The ClojureScript version would hardly be useful because we can't block, but we can still provide a degraded version of the same function that throws an exception when a result is not immediately available.
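
A minimal sketch of that interface combination, in plain Clojure and independent of missionary: `iterable-view` is a hypothetical name, and it takes a zero-argument function returning a fresh java.util.Iterator, which stands in for starting the flow. Every traversal calls that function again, which gives the lazy-but-not-memoized behavior of clojure.core/eduction.

```clojure
(import '(clojure.lang IReduceInit Sequential)
        '(java.util Iterator))

(defn iterable-view
  "Hypothetical helper: a reducible, iterable, non-caching view over
  whatever `make-iterator` produces on each call."
  [make-iterator]
  (reify
    Iterable
    (iterator [_] (make-iterator))
    IReduceInit
    (reduce [_ f init]
      (let [^Iterator it (make-iterator)]
        (loop [acc init]
          (if (.hasNext it)
            (let [acc (f acc (.next it))]
              (if (reduced? acc) @acc (recur acc)))  ; honor early termination
            acc))))
    Sequential))

(def traversals (atom 0))
(def view (iterable-view #(do (swap! traversals inc)
                              (.iterator ^Iterable [1 2 3]))))

(reduce + 0 view)          ;; => 6
(into [] (map inc) view)   ;; => [2 3 4]
(sequential? view)         ;; => true
@traversals                ;; => 2, nothing was cached between traversals
```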

dustingetz commented on June 6, 2024

(ns dustingetz.scratch
  (:require [missionary.core :as m]
            [hyperfiddle.rcf :refer [tests]]))

(defn iterator-consumer "blocking iterable pattern"
  [^java.util.Iterator it]
  ; why not one thread tied to the iterator extent?
  ; (future (while (.hasNext it) (! (.next it))))
  (m/ap
    (loop []
      (if (m/? (m/via m/blk (.hasNext it)))
        (m/amb (m/? (m/via m/blk (.next it))) (recur))
        (m/amb)))))

(defn seq-consumer [xs]
  (m/ap
    (loop [xs xs]
      (if (m/? (m/via m/blk (seq xs)))
        (m/amb (m/? (m/via m/blk (first xs))) (recur (rest xs)))
        (m/amb)))))

(tests
  (def !it (.iterator (.keySet (java.lang.System/getProperties))))
  (->> (iterator-consumer !it)
       (m/eduction (take 3))
       (m/reduce conj []) m/?)
  := ["java.specification.version" "sun.jnu.encoding" "java.class.path"]

  ; careful, Java iterator is stateful

  (def xs (iterator-seq (.iterator (.keySet (java.lang.System/getProperties)))))
  (take 3 xs) := ["java.specification.version" "sun.jnu.encoding" "java.class.path"]

  (->> (seq-consumer xs)
       (m/eduction (take 3))
       (m/reduce conj []) m/?)
  := ["java.specification.version" "sun.jnu.encoding" "java.class.path"])
