clj-commons / dirigiste Goto Github PK
View Code? Open in Web Editor NEWcentrally-planned object and thread pools
centrally-planned object and thread pools
It would be really nice to be able to tune the number of threads available at runtime.
Hi Zach,
I was wondering if you'd be opposed to the idea of extracting an interface from the single concrete Pool
implementation? I've been considering wrapping the current Pool
with some extra logic in acquire
.
FWIW I'm considering putting a rate limit in acquire
(per key) that I could plug into aleph. While I could theoretically try to limit requests/second indirectly by controlling connection allocation via a custom Pool.Controller
, (1) I'd prefer to have more immediate control at the point of acquisition (vs delayed adjustments), and (2) it might even be possible that in certain circumstances with short enough requests that I could exceed a rate limit using only a single connection, rendering the controller approach ineffective.
Would my wrapping and delaying calls to the underlying acquire
defeat the semantics of dirigiste's pooling in any way?
If you think extracting an interface would be alright, I'd be more than happy to submit a patch. Thanks for all of your libraries -- I've been finding them really useful.
Thanks,
Brian
I'm just experimenting with dirigiste at this point and am a bit confused by the fact that getUtilization
returns 1.5 for me in some cases. With the below snippet if I run (.acquire p :foo)
three times the Utilization log message will print 1.5. (Acquiring three :foo
acquires without releasing causes the .acquire
call to block since the pool will not allocate any more objects.)
Is that expected or to be considered a bug?
(ns my.pool
(:require [clojure.tools.logging :as log])
(:import [java.util.concurrent TimeUnit]
[io.aleph.dirigiste Pools Pool IPool$Generator IPool$Controller]))
(defn generator [disposed]
(let [cnt (atom 0)]
(reify IPool$Generator
(generate [_ k]
(log/info "Generating new object for key" k)
(Thread/sleep (* @cnt 1000))
(swap! cnt inc))
(destroy [_ k v]
(log/info "Disposing" k v)
(swap! disposed conj [k v])))))
(def stats (atom nil))
(defn controller [min-objects max-total-objects]
(reify IPool$Controller
(shouldIncrement [_ key objects-for-key total-objects]
(and (< total-objects max-total-objects)
(< objects-for-key min-objects)))
(adjustment [_ key->stats]
(when (:foo key->stats)
(log/info "Utilization" (.getUtilization (:foo key->stats) 0.9))
(log/info "Num Workers" (.getNumWorkers (:foo key->stats)))
(log/info "Mean Queue Length" (.getMeanQueueLength (:foo key->stats))))
(reset! stats key->stats)
{:foo (int (max min-objects ))
:bar (int (max min-objects ))})))
(def p
(Pool. (generator (atom nil)) (controller 2 10) 1e5 25 1e4 TimeUnit/MILLISECONDS))
A divide by zero exception can occur in the mean calculation here https://github.com/ztellman/dirigiste/blob/master/src/io/aleph/dirigiste/Stats.java#L227 when vals is not null but does have length 0.
With dirigiste 0.1.0-alpha5 (as used by aleph 0.4.0-SNAPSHOT at 5030a05), I occasionally see a stack trace like the following when doing an http/get
:
Exception in thread "dirigiste-pool-controller-0" java.lang.NullPointerException
at io.aleph.dirigiste.Stats.lerp(Stats.java:156)
at io.aleph.dirigiste.Stats.getTaskLatency(Stats.java:355)
at aleph.flow$stats__GT_map$fn__7585.invoke(flow.clj:37)
at clojure.core$mapv$fn__6689.invoke(core.clj:6611)
at clojure.lang.PersistentVector.reduce(PersistentVector.java:332)
at clojure.core$reduce.invoke(core.clj:6513)
at clojure.core$mapv.invoke(core.clj:6611)
at aleph.flow$stats__GT_map$q__7579.invoke(flow.clj:29)
at aleph.flow$stats__GT_map.invoke(flow.clj:37)
at aleph.flow$stats__GT_map.invoke(flow.clj:26)
at clojure.core$map$fn__4529.invoke(core.clj:2614)
at clojure.lang.LazySeq.sval(LazySeq.java:40)
at clojure.lang.LazySeq.seq(LazySeq.java:49)
at clojure.lang.RT.seq(RT.java:485)
at clojure.core$seq__4109.invoke(core.clj:135)
at clojure.core$zipmap.invoke(core.clj:2942)
at aleph.flow$instrumented_pool$reify__7607.adjustment(flow.clj:100)
at io.aleph.dirigiste.Pool.startControlLoop(Pool.java:359)
at io.aleph.dirigiste.Pool.access$500(Pool.java:8)
at io.aleph.dirigiste.Pool$1.run(Pool.java:419)
at java.lang.Thread.run(Thread.java:745)
Looks like vals
in the (long[], double)
overloading of lerp
is sometimes null
when calculating the task latency:
Howdy,
I see that dirigiste 0.1.3 was released, but github only has commits until 0.1.2?
Cheers,
Reynald
The README file contains a link to the documentation http://ztellman.github.com/dirigiste/, but it gives 404.
For some reason, on my 2019 Macbook Pro, running Clojure 1.11.1, this test fails for me consistently:
FAIL in (test-executor) (executor_test.clj:56)
expected: (< 15 (.getNumWorkers (stress-executor ex 16 100000.0 pause)) 20)
actual: (not (< 15 22 20))
Is it a facet of my machine or does the test need to be fixed in some way?
Hello Zach, I had a problem while using the utilization-executor
in manifold and tracked it down to dirigiste
. I think.
Basically as you can see by running lein run
in this repro repo, the compile-time exception is kind of swallowed when using the dirigiste Executor
. Nothing finishes in the Thread.UncaughtExceptionHandler
basically.
I haven't actually tried the fix, but I smell something is going on here.
I will probably experiment a bit more but I wanted to hear from you if you think I am on the right track.
Thanks!
Please see the following test: dm3@387548b
The issue is in the Pool$Queue
which adds a pending take and never removes if the generator throws. I stumbled into the issue when trying to create an Aleph TCP client with a fixed pool of connections and no room for waiting (:max-queue-size 1
).
I think this is a real issue with Dirigiste, but at the same time please point me in the right direction if my design decision is bonkers :)
Hi,
Java 7 introduced ThreadLocalRandom object that we could use to replace those lines: https://github.com/ztellman/dirigiste/blob/f8d90d81ba324d020ad5bea93e97da9985616491/src/io/aleph/dirigiste/Stats.java#L19-L23
What do you think of such change? I can submit a PR to introduce it if you think it's fine to remove Java 6 compatibility in Dirigiste.
true if this executor terminated and false if the timeout elapsed before termination
However, in practice it seems to return true for a trivial case:
(let [^io.aleph.dirigiste.Executor executor (manifold.executor/fixed-thread-executor 1)]
(.executeWithoutRejection executor #(do
(.println System/out "Hello")
(Thread/sleep 10000)
(.println System/out "Bye, everybody")))
(.shutdown executor)
(.awaitTermination executor 5 java.util.concurrent.TimeUnit/SECONDS))
Maybe it should be <= 0
instead for the check?
It seems that dirigiste is causing my java processes to not exit and I noticed that threads are not being started as daemon threads. Is that intentional? Is there something I should do to make my process exit nicely?
A declarative, efficient, and flexible JavaScript library for building user interfaces.
๐ Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
An Open Source Machine Learning Framework for Everyone
The Web framework for perfectionists with deadlines.
A PHP framework for web artisans
Bring data to life with SVG, Canvas and HTML. ๐๐๐
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
Some thing interesting about web. New door for the world.
A server is a program made to process requests and deliver data to clients.
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
Some thing interesting about visualization, use data art
Some thing interesting about game, make everyone happy.
We are working to build community through open source technology. NB: members must have two-factor auth.
Open source projects and samples from Microsoft.
Google โค๏ธ Open Source for everyone.
Alibaba Open Source for everyone
Data-Driven Documents codes.
China tencent open source team.