
Comments (4)

CJ-Wright commented on September 4, 2024

IMHO this seems like an awesome use case and is pretty similar to what we are hoping to do with experimental x-ray data processing.

  1. This is similar to what I was asking for in #7 (in case this use case does a better job of articulating it)


jrmlhermitte commented on September 4, 2024

I like this idea. It sounds like everything is there to do what you need using dask.

Correct me if I'm wrong, but I think I see two points:

  1. Converting from dask to stream
  2. Allowing asynchronous computations (so that the pipeline is not limited by its slowest part)

For the second, streams may help in the asynchronous bit you mention. You could do it by using a buffer:

from streamz import Stream

s = Stream()
buffered = s.buffer(1000)  # 1000 is the maximum number of items the buffer will hold
# everything downstream of the buffer now runs asynchronously
buffered.map(otherfunctions)
# etc...

The only issue is that you may need to add a component to the stream that checks whether the buffer has filled up and, if so, blocks everything upstream.
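
A minimal sketch of one way to do that check, assuming we count in-flight items ourselves rather than relying on streamz internals (MAX_IN_FLIGHT, entering, leaving, and emit_with_backpressure are all hypothetical names):

import time

MAX_IN_FLIGHT = 1000  # should match the buffer size above
in_flight = 0

def entering(x):
    # first node after the source: count the item as in flight
    global in_flight
    in_flight += 1
    return x

def leaving(x):
    # final node in the pipeline: the item has drained
    global in_flight
    in_flight -= 1
    return x

def emit_with_backpressure(source, item):
    # block the producer until the pipeline has drained enough
    while in_flight >= MAX_IN_FLIGHT:
        time.sleep(0.01)
    source.emit(item)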

You'll need to start the IOLoop that runs the asynchronous calls somewhere. I'm guessing something like this (I haven't tested it, but I think it should work):

from threading import Thread

# run the tornado IOLoop that services the asynchronous calls in a
# background daemon thread, so it doesn't block the main program
th = Thread(target=s.loop.start, daemon=True)
th.start()

Streams is nice because buffer would handle the setup necessary for the asynchronous calls. I really like that.

Also, if #13 goes through, then delayed objects could be easily passed through the stream with:

from dask import delayed, compute
from dask.delayed import Delayed
from streamz import Stream

# stream_map and the raw= keyword are part of the dispatch mechanism
# proposed in #13; this teaches map how to handle Delayed objects
@stream_map.register(Delayed)
def _(obj, func, **kwargs):
    return delayed(func)(obj)

def addone(x):
    return x + 1

data = 1
s = Stream()
s2 = s.map(addone)
s2.map(compute, raw=True).map(print)  # compute the delayed result, then print it
s.emit(delayed(data))

Converting from dask to stream could, I think, be easily written as well (just loop through the tasks in the graph and create mappings one by one); see the sketch below.
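
A minimal sketch of that idea for the simplest possible case, assuming a purely linear graph where each task is a one-argument function feeding the next (linear_graph_to_stream is a hypothetical helper, not an existing API):

from streamz import Stream

def linear_graph_to_stream(tasks):
    # tasks: ordered list of one-argument functions, as in a linear
    # dask graph where each task feeds the next
    source = Stream()
    node = source
    for func in tasks:
        node = node.map(func)
    return source, node

source, sink = linear_graph_to_stream([lambda x: x + 1, lambda x: x * 2])
sink.sink(print)
source.emit(3)  # prints 8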

I'm playing around with this, but in my opinion the power of streams comes from having control over rates and asynchronous calls (otherwise you could do the rest by iterating in a for loop).

I'm wondering whether this resolves your use case, or whether I've misunderstood it. I'm also interested to see what @mrocklin thinks.


mrocklin commented on September 4, 2024

Sorry for the delay in response (I've been mostly away for the last two weeks).

@limx0 can you expand on the statement below?

The only missing piece is being able to move that dask graph into production. This could be done by creating a DAG in dask and calling it repeatedly with new data; however, in the case of nodes completing at different times, the pipeline becomes only as fast as its slowest part, which isn't ideal when you want paths to complete ASAP.

In particular, have you tried using the distributed scheduler's asynchronous computing functionality? You can submit many graphs at once before the first finishes. I suspect that you could achieve what you want with that API and a for loop, though I might not fully understand your situation.

Regarding @ordirules's statement about one key point:

Converting from dask to stream

We need to be clear that a Dask graph doesn't contain enough information to fully create a stream in any but the most trivial of cases. I anticipate (perhaps incorrectly) that people will want to establish how multiple input streams are joined together (there are several choices) or how various points in a stream are buffered or bundled (there are several choices). A dask graph does not contain this information.
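
For instance (a minimal sketch using streamz's own combinators; this illustration is mine rather than anything from the original comment), two input streams can be joined in several ways, each with different semantics that a dask graph does not record:

from streamz import Stream

a = Stream()
b = Stream()

a.zip(b).sink(print)             # pair items strictly one-to-one
a.combine_latest(b).sink(print)  # emit on every update, with the latest from each
a.union(b).sink(print)           # interleave items from both streams as they arrive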

So I guess my response now is: @limx0, are you able to achieve what you want using the asynchronous abilities of dask's distributed scheduler? If so, what common operations do you find yourself doing frequently? If not, what is missing?

# example
from dask.distributed import Client

client = Client()

all_futures = []
for fn in filenames:
    delayed_values = ...  # build the delayed computation for this file
    futures = client.compute(delayed_values)  # submit asynchronously; returns futures immediately
    all_futures.append(futures)
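
One possible way to then consume results as they finish, rather than waiting on the slowest graph (my sketch, not part of the original comment; it assumes each futures entry is a list and that handle is a hypothetical callback):

from itertools import chain
from dask.distributed import as_completed

# process whichever future finishes first, regardless of submission order
for future in as_completed(chain.from_iterable(all_futures)):
    handle(future.result())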


elementace commented on September 4, 2024

@mrocklin, @ordirules, @CJ-Wright
I think the largest differentiator is that in the case @limx0 describes above, he is working with live streaming data. That is, the environment from which we are pushing information through the DAG is dynamic (not a computation over static data sets).

