Comments (23)

CJ-Wright commented on September 4, 2024

Also, since there isn't a sign-up sheet anywhere: I am very interested in this project!

mrocklin commented on September 4, 2024

Would it be possible to parse a dask task graph dictionary and turn it into a streaming pipeline?

Possibly. What did you have in mind? It might be helpful to sketch out a couple of situations in which this would be useful and how you would expect it to behave.

CJ-Wright commented on September 4, 2024

task graph: https://github.com/CJ-Wright/xpdAn/blob/f81c9f26e4e9ad61ca0d71453ef5cc9adee83977/examples/xpd_pipeline.pdf

code-ish: https://github.com/CJ-Wright/xpdAn/blob/f81c9f26e4e9ad61ca0d71453ef5cc9adee83977/examples/pipeline2.py

Edit:
raw is a potentially streaming iterator (the idea was to pipe the data from the instruments, with a whole bunch of stuff in between, via an iterator queue).

mrocklin commented on September 4, 2024

OK, so that graph has exactly one input and one output. Things might become more interesting if there were multiple inputs and outputs operating at different rates.

What is your objective here? What would you want to achieve by "streaming" such a dask graph (whatever that ends up meaning)?

CJ-Wright commented on September 4, 2024

It could have more than one output (currently it has a few: one for the data to be stored and three for visualization code). It could also have more than one input (data taken from multiple sources at once), but I tried to keep it simple for the first go-round.

CJ-Wright commented on September 4, 2024

The task graph/dict combination might work really well for meta-programming. Currently (without streams), if you want to change how the nodes interact, you need to actually change the underlying code/bespoke function you are using to manage the workflow. Using the dict, all you need to do is pop something off or update/add a key/value pair and you are ready to go.
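
For example, something like this (a toy sketch; the functions are made-up stand-ins, but the dict follows dask's (func, *args) task convention):

from dask import get  # dask's synchronous scheduler

def load():
    return 10

def clean(x):
    return x + 1

def analyze(x):
    return x * 2

graph = {'raw': (load,),
         'cleaned': (clean, 'raw'),
         'result': (analyze, 'cleaned')}

# Re-wire the pipeline by editing the dict rather than the source:
# drop the cleaning step and point 'result' straight at 'raw'.
graph['result'] = (analyze, 'raw')
graph.pop('cleaned')

print(get(graph, 'result'))  # 20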

CJ-Wright commented on September 4, 2024

More concrete use case:

  1. User finds data in database
  2. User streams data into existing dict/pipeline
  3. User doesn't like results
  4. User edits dict to change outcome (rather than a blob of source code)
  5. User re-runs pipeline with new task graph
  6. User likes results
  7. Pipeline publishes the paper for you and wins you a Nobel Prize (you could even get it to write the speech for you) /s

Edit:
Essentially this takes bespoke scientific functions (which are usually super tailored to a given group/instrument/data-set) and turns them into a loosely coupled, re-configurable series of steps which takes very little effort to re-arrange.

mrocklin commented on September 4, 2024

Currently (without streams) if you want to change how the nodes interact you need to actually change the underlying code/bespoke function you are using to manage the workflow

Setting up a graph with dask.delayed should ideally be fast enough that you can create these within a few milliseconds.

Playing devil's advocate here, I think that currently the competition for streams is dask.delayed and a for loop.

from dask import delayed

@delayed
def f(x):
    ...

@delayed
def g(x, y):
    ...

results = []
for data in input:  # `input` stands in for some iterable of incoming data
    a = f(data)
    b = g(a, 10)
    c = f(b)
    c = c.compute()
    results.append(c)

User edits dict to change outcome (rather than a blob of source code)

I actually perceive changing source code like what's above to be easier than changing the dict. I may be operating under a different set of application assumptions though.

CJ-Wright commented on September 4, 2024

Ah, but does dask.delayed support generators?
The data is currently streamed as a generator and each node itself is a generator.

I have found editing the code itself and all the generators (and the tees to go with them) rather messy.

mrocklin commented on September 4, 2024

Are you able to produce a toy example that anyone reading this issue can easily play with? Perhaps something involving functions like inc and add rather than anything domain-specific, but which still captures the essence of what you're trying to accomplish. This might help to bring us both to the same perspective more quickly.

CJ-Wright commented on September 4, 2024

I might be able to, although I should put up a disclaimer that my qualifying exam is next month and I am planning on turning into a digital hermit.

mrocklin commented on September 4, 2024

Ideally such an example would be very small and take no more than ten-twenty lines of code and 15-20 minutes of thought.

CJ-Wright commented on September 4, 2024

hmm, ok let me see what I can put together

CJ-Wright commented on September 4, 2024

@ordirules @danielballan might also be interested/have good ideas

CJ-Wright commented on September 4, 2024

How about this? Of course it could get more complex, with generators consuming multiple input generators, filters, branches, etc., but I did try to keep it simple. Let me know if you want a more complex example.

import numpy as np

def start():
    # data header
    yield 'start', {'data_name': 'test', 'maker': 'CJ-Wright', 
                    'reason': 'streams'}
    s = (10, 10)
    # data descriptor
    yield 'descriptor', {'img': {'type': 'array', 'shape': s}}
    # the actual data
    for i in range(5):
        yield 'event', {'img': np.random.random(s)}
    # stop
    yield 'stop', {'status': 'success'}

def mult_2(gen):
    for n, d in gen:
        if n == 'start':
            yield 'start', {'data_name': 'test', 'maker': 'CJ-Wright',
                      'reason': 'streams', 'function_name': 'mult_2'}
        elif n == 'descriptor':
            yield 'descriptor', {'img': {'type': 'array', 
                                         'shape': d['img']['shape']}}
        elif n == 'event':
            yield 'event', {'img': 2*d['img']}
        elif n == 'stop':
            yield 'stop', {'status': 'success'}

def summer(gen):
    summed = None
    for n, d in gen:
        if n == 'start':
            yield 'start', {'data_name': 'test', 'maker': 'CJ-Wright',
                      'reason': 'streams', 'function_name': 'summer'}
        elif n == 'descriptor':
            yield 'descriptor', {'img': {'type': 'array', 
                                         'shape': d['img']['shape']}}
        elif n == 'event':
            if summed is None:
                summed = d['img']
            else:
                summed += d['img']
        elif n == 'stop':
            yield 'event', {'img': summed}  # emit the accumulated sum
            yield 'stop', {'status': 'success'}

for n, d in summer(mult_2(start())):
    print(n)
    print(d)
    

Ideally this would be phrased as a dict:

task_graph = {'start': (start, ),
              'm2': (mult_2, 'start'),
              'summed': (summer, 'm2')
              }
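
A small parser (just a sketch of what I imagine; nothing like this exists yet) could then resolve the dict back into the nested generator calls:

def build(graph, key):
    # Recursively resolve a (func, *arg_keys) tuple into nested
    # generator calls, e.g. summer(mult_2(start())).
    func, *args = graph[key]
    return func(*(build(graph, a) for a in args))

for n, d in build(task_graph, 'summed'):
    print(n)
    print(d)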

CJ-Wright commented on September 4, 2024

Note that with the above example, if you want to branch you need to use tee to make duplicate iterators, which can be a pain. It certainly destroys the otherwise clean nested calls.
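
For instance, even a two-way branch on plain integers forces the tee bookkeeping out into the wiring code (a toy sketch):

from itertools import tee

def mult_2(gen):
    for x in gen:
        yield x * 2

def add_1(gen):
    for x in gen:
        yield x + 1

source = iter(range(5))
# To branch, the source must be split up front; the otherwise clean
# nested-call style is lost once tee enters the picture.
branch_a, branch_b = tee(source, 2)
for doubled, incremented in zip(mult_2(branch_a), add_1(branch_b)):
    print(doubled, incremented)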

mrocklin commented on September 4, 2024

Sorry to ask for a glass of milk after receiving a cookie, but an example that stripped away all domain-specific details would be ideal. It would help to engage others who don't have any knowledge of numpy, for example. Data might be simple integers. Functions might be computations like add.

Is this how you build your computation now, or how you want to build it? It seems like you have already taken care of most of the data flow, so a project like streams probably wouldn't be as helpful in this case. Instead, I wonder what it would take to build up your computation if mult_2 were defined simply as follows:

def mult_2(x):
    return x * 2

Could a project like streams be built to handle both the iteration that you're accomplishing here with yield as well as the signals processing that you handle with 'event', 'stop', etc.?
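
To sketch what I mean (purely hypothetical code, not an existing API), the library would own the plumbing and termination machinery, and user code would stay as plain element-wise functions:

class Stream:
    # Hypothetical push-based node: applies `func` to each element
    # and forwards the result to all downstream nodes.
    def __init__(self, func=lambda x: x):
        self.func = func
        self.children = []

    def map(self, func):
        child = Stream(func)
        self.children.append(child)
        return child

    def emit(self, x):
        result = self.func(x)
        for child in self.children:
            child.emit(result)

def mult_2(x):
    return x * 2

source = Stream()
source.map(mult_2).map(print)
for x in range(3):  # the driver, not user code, pushes data through
    source.emit(x)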

jrmlhermitte commented on September 4, 2024

I had ended up writing a lot of text and found it was too long, so I've trimmed it down to the one issue I think @cjwright may be pointing to here (correct me if I'm wrong): that we need to get streams working. Perhaps the other points I wanted to mention will eventually flow into the conversation. I really enjoyed reading your blog post @mrocklin, and thanks @cjwright for sharing your thoughts.

It is very common in pipeline data for attributes/tags describing the data to also be passed through, in a way that the functions running the computations don't have to worry about them. It seems that the most natural way to do so is by using a network-packet-like structure, like streams (a start, one data packet, and a stop). However, perhaps this is not the most natural way; maybe @mrocklin has some thoughts.
For example:

mydog = [('start', {'mydogsname': 'fido'}),
         ('data', {'data': {'height_m': .5, 'weight_lb': 150}}),
         ('stop', {})]

def calctype(dog):
    for name, doc in dog:
        if name == 'start':
            yield 'start', doc
        elif name == 'data':
            if doc['data']['weight_lb'] > 100:
                doc['data']['dogtype'] = 'obese'
            yield 'data', doc
            # ...
        elif name == 'stop':
            yield 'stop', doc

Here calctype would add a new element to the data that tells you whether your dog is obese or something else (sorry for the lack of creativity here...). It would still allow your attributes to pass through, so that at the end of the day you will know that fido is obese. You could imagine using a decorator to remove a lot of this boilerplate code (and correctly select which elements of the doc dict to extract); a sketch of such a decorator follows.
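
For instance (a hypothetical sketch, nothing that exists today):

def on_data(func):
    # Hypothetical decorator: lifts a plain function over the
    # (name, doc) packet protocol, applying `func` only to the
    # 'data' payload and passing 'start'/'stop' through untouched.
    def wrapper(packets):
        for name, doc in packets:
            if name == 'data':
                doc = dict(doc, data=func(doc['data']))
            yield name, doc
    return wrapper

@on_data
def calctype(data):
    if data['weight_lb'] > 100:
        data = dict(data, dogtype='obese')
    return data

for name, doc in calctype(mydog):
    print(name, doc)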

You could also implement this as follows:

def calctype(height, weight, name):
    dogtype = 'normal'  # default, so dogtype is always defined
    if weight > 100:
        dogtype = 'obese'
    # ...
    return dict(height=height, weight=weight, dogtype=dogtype, name=name)

But this has the serious disadvantage of no longer being flexible about the metadata used to tag the data (it can also lead to naming conflicts if you simply use **kwargs).

My main question would be: is the attribute handling of interest in this streams library, or is it more appropriate elsewhere? Again, if this is not in line with @cjwright's thinking, I'll move it elsewhere for the sake of readers new to this issue. Thanks again @CJ-Wright, and I like your thinking in using databroker streams to tackle this problem.
(For incoming readers, databroker streams are described by the schematic here.)

mrocklin commented on September 4, 2024

A number of other issues in stream processing, like error handling or event-time handling, also rely on passing metadata around with results. I wouldn't be surprised if eventually all elements were accompanied by dictionaries that would pass through operations like map. We could imagine putting arbitrary user metadata in here as well.

Signals like "stop", though, I would expect to be handled by the library rather than by user code. They are likely to occur in many applications. If I were to write your code above using iterators today, I would probably raise StopIteration and rely on the standard Python machinery to handle this.
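
In other words, something like this (an illustration using plain iterator machinery rather than explicit 'stop' packets):

def events():
    # Termination is implicit: when the generator returns, Python
    # raises StopIteration and the consuming loop simply ends.
    for i in range(3):
        yield {'img': i}

def mult_2(gen):
    for doc in gen:
        yield {'img': doc['img'] * 2}

for doc in mult_2(events()):
    print(doc)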

CJ-Wright commented on September 4, 2024

Here is another example, without numpy. Essentially I have foreground and background streams: the foreground gets multiplied by two, the background is remuxed to be the same length as the foreground, and the two are subtracted. Note that I would hope that the task graph parser could handle the tee logic needed in the workflow.

Sorry if this is not simpler.

from itertools import tee

fg = (i for i in range(100))
bg = (1 for i in range(10))

def remux(small_grid, large_grid):
    # repeat the small stream for as long as the large stream produces
    small_grids = tee(small_grid, 2)
    for i in large_grid:
        try:
            s = next(small_grids[0])
        except StopIteration:
            # first copy exhausted: re-tee the untouched copy and restart
            small_grids = tee(small_grids[1], 2)
            s = next(small_grids[0])
        yield s


def m2(gen):
    for g in gen:
        yield g*2

def subs(gen1, gen2):
    for g1, g2 in zip(gen1, gen2):
        yield g1 - g2

fgs = list(tee(fg, 2))
fg2 = m2(fgs.pop())
bg2 = remux(bg, fgs.pop())

s = subs(fg2, bg2)
for g in s:
    print(g)

# could be phrased as
pipeline = {'m2': (m2, fg),
            'rbg': (remux, bg, fg),
            'subs': (subs, 'm2', 'rbg')}
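
Hypothetically, the parser could count how many consumers each stream has and insert the tee calls itself. A rough sketch (nothing that exists today; fg and bg are re-created here since the script above consumed them):

from collections import Counter
from itertools import tee

def build(graph, output):
    # Tally how many nodes consume each argument, so that shared
    # streams can be teed exactly as many times as needed.
    demand = Counter(arg for func, *args in graph.values() for arg in args)
    supply = {}

    def resolve(key):
        if key not in supply:
            if isinstance(key, str):  # a named node in the dict
                func, *args = graph[key]
                stream = func(*(resolve(a) for a in args))
            else:                     # a raw iterator like fg or bg
                stream = key
            supply[key] = list(tee(stream, max(demand[key], 1)))
        return supply[key].pop()

    return resolve(output)

fg = (i for i in range(100))
bg = (1 for i in range(10))
pipeline = {'m2': (m2, fg),
            'rbg': (remux, bg, fg),
            'subs': (subs, 'm2', 'rbg')}
for g in build(pipeline, 'subs'):
    print(g)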

jrmlhermitte commented on September 4, 2024

Thanks for the thoughts @mrocklin.
About the attributes, I'm wondering if they could be zipped in and unzipped at function run time. I guess some convention and input/output normalization must happen if we want to add attributes, and I think just zipping might be easier than a special dict or object, etc.; a sketch of what I mean is below. Any thoughts? Should this be moved onto a separate issue? It sounds like you have given this some consideration already.
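
For example (just a sketch):

def mult_2(x):
    return x * 2

def lift(func, pairs):
    # Unzip each (data, attrs) pair, apply func to the data half,
    # and re-zip the untouched attributes back in.
    for x, attrs in pairs:
        yield func(x), attrs

data = iter([1, 2, 3])
attrs = iter([{'sample': 'fido'}] * 3)
for result, meta in lift(mult_2, zip(data, attrs)):
    print(result, meta)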

@CJ-Wright I like the usage of tee in this case; to me it seems very clean. However, the fact that it is repeated inside a stream-processing function seems stream-unfriendly (I would be tempted to say that streams should not be modified unless branched/merged as a separate operation, but I could be wrong; the multiple branching happening here seems like it could get messy). Maybe there is a way to create a special type of stream, like a RepeatedStream or something (which could use tee inside)? I think repetitions should be done at the source. However, it seems that in the general case the bg stream would have to be synchronized with the fg stream, and I'm not sure how that would be resolved. Just throwing thoughts out there; they may not be right...

CJ-Wright commented on September 4, 2024

I'm closing this as not applicable/won't fix. The tl;dr version is that streams has to contend with some more intricacies than dask, and as such there is not a good mapping between a dask task graph and the streams one. This is especially blatant because streams can have cyclic graphs, which are not supported in dask. It may be possible (maybe?) to go from a streams task graph to a dask task graph that would operate on a static data source.
