Comments (13)

mrocklin commented on September 4, 2024

CJ-Wright commented on September 4, 2024

The data exists as a generator which puts out (name, document) pairs.
There are 4 different name/document types:

  1. start: Metadata from the start of the experiment or processing (one per stream)
  2. descriptor: Describes the data held in the events (many per stream)
  3. event: The actual data (many per descriptor)
  4. stop: Finishing metadata (one per stream)
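
As a rough illustration, a short run emitted by such a generator might look like this (the field names follow the usual document model, but the values are made up):

docs = [
    ('start',      {'uid': 'run-1', 'time': 0.0, 'plan_name': 'count'}),
    ('descriptor', {'uid': 'desc-1', 'run_start': 'run-1',
                    'data_keys': {'det': {'dtype': 'number'}}}),
    ('event',      {'uid': 'ev-1', 'descriptor': 'desc-1',
                    'seq_num': 1, 'data': {'det': 1.2}}),
    ('event',      {'uid': 'ev-2', 'descriptor': 'desc-1',
                    'seq_num': 2, 'data': {'det': 3.4}}),
    ('stop',       {'uid': 'stop-1', 'run_start': 'run-1',
                    'exit_status': 'success'}),
]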

For map I need to issue new start, descriptor, and stop documents. The mapped function applies only to the event documents, and each application issues a new event document. So I need map to be aware that it applies the function only to the event documents.

Similarly for filter, I need to issue new start, descriptor, and stop documents, while the actual filtering only applies at the event level.
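
To make that concrete, here is a minimal sketch of what such a document-aware map might look like, assuming documents arrive as (name, doc) pairs and that the event data lives under doc['data'] (the helper name and field layout are for illustration only):

from streamz import Stream

def document_map(func, name_doc):
    # hypothetical helper: apply func only to event documents and
    # pass start/descriptor/stop documents through untouched
    name, doc = name_doc
    if name == 'event':
        new_doc = dict(doc)
        new_doc['data'] = {k: func(v) for k, v in doc['data'].items()}
        return (name, new_doc)
    return (name, doc)

source = Stream()
mapped = source.map(lambda nd: document_map(lambda x: x * 2, nd))
mapped.sink(print)

A real implementation would also issue fresh uids and new start/descriptor/stop documents rather than reusing the old ones, which is exactly the bookkeeping being discussed here.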

CJ-Wright commented on September 4, 2024

Sink is most likely ok.

mrocklin commented on September 4, 2024

CJ-Wright commented on September 4, 2024

One of the issues with subclassing is that the Stream class is written into most of the functions, so if I use map (which I override), then zip (which I don't), and then map again, zip will have switched me back over to Stream, which does not have my bespoke map method.
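
A rough sketch of the issue, with a hypothetical DocStream subclass (names are illustrative):

from streamz import Stream

class DocStream(Stream):
    """Hypothetical subclass with a document-aware map."""
    def map(self, func, **kwargs):
        # document-aware behaviour would go here
        return super().map(func, **kwargs)

other = Stream()
s = DocStream()
s2 = s.map(len)     # the overridden DocStream.map runs here
s3 = s2.zip(other)  # zip is inherited from Stream and builds a plain Stream node
s4 = s3.map(len)    # so this resolves to Stream.map and the bespoke behaviour is lost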

I tried to look at #13 but I'm not certain I understand it fully yet.

mrocklin commented on September 4, 2024

@CJ-Wright to give some context here there are a number of cases where we want to have different map/filter/etc. behaviors (remote, batched, dataframes, with metadata, etc.). To add complexity we sometimes want to apply multiple such behaviors at the same time (remote-batched). We're trying to think of a clean way to enable this kind of behavior more generally. (By "we" I mean myself, @ordirules, @danielballan, and now yourself.)

CJ-Wright commented on September 4, 2024

Hmm, ok. I will try to think on it.

jrmlhermitte commented on September 4, 2024

@CJ-Wright if I understand correctly, you want to use streams to handle the most general case possible of the event-based architecture (the link is there to provide context again for any newcomer). Correct? That is very interesting.

As a suggestion for map, how about two operations? One that returns a 4-tuple of (name, doc) pairs; for example, the mapped function could return:

(('start', startdoc), ('descriptor', descriptordoc), ('event', eventdoc), ('stop', stopdoc))

followed by a concat stream (which splits the tuples into individual elements in the stream).
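
A rough sketch of that pattern, with a made-up function that takes a packaged run and returns the four (name, doc) pairs; streamz's flatten is used here to play the role of the concat step:

from streamz import Stream

def process_run(run):
    # hypothetical: run is (startdoc, descriptordoc, eventdoc, stopdoc)
    startdoc, descriptordoc, eventdoc, stopdoc = run
    return (('start', startdoc),
            ('descriptor', descriptordoc),
            ('event', eventdoc),
            ('stop', stopdoc))

source = Stream()
# each mapped result is a tuple of (name, doc) pairs;
# flatten splits it back into individual elements in the stream
docs = source.map(process_run).flatten()
docs.sink(print)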

You can use __stream_map__ to output the proper 4-tuple and treat the input in whatever way you choose. On the input side, for the same reason, I would be tempted to use something similar to partition. With a little bit of work, I think this would do the job for this one case.

Here is an example of __stream_map__ being used. Note that __stream_map__ is commented out, but I have used it and it has worked just fine. See the decorator parse_streamdoc on that page for the implementation. Line 304 (note: this line number will change over time), result = f(*args, **kwargs), is where the function is actually run; the rest is details to get the inputs/outputs to come out right.

I would also argue that treating streams in the most general case (start, stop, descriptor, and event documents, where the number of events is unknown until we receive a stop) might not be a good idea. My reasoning is that the stop implies a conditional statement. This isn't easy to delay or submit to a cluster, in my opinion (somewhere a machine will have to make a decision about what code to run, which can create delays). I would rather aggregate results and then pass them through as closed entities (so the delays, from aggregation, happen in a known region of code). However, the general case is very interesting and may well lead to interesting ideas. I am just playing devil's advocate. @danielballan and @tacaswell may have some opinions on this particular paragraph, if they have time.

Please correct me if I misunderstood anything. I'm interested to hear your ideas.

jrmlhermitte commented on September 4, 2024

This just came across my mind. I think in your situation you may want something similar to groupby (not quite): basically an accumulator that caches results in some dictionary. For example, for your events, a dictionary keyed by uid: acc = {'f8213': [eventdoc1, eventdoc2], '134af': [eventdoc3, eventdoc4]}.
You could do this with accumulate as of now. However, there is currently no way of sending external signals to the accumulator (similar to flush in collect), so this would have to be added. The idea is that you could use this accumulator in conjunction with a stream that filters out the stop documents. That stream would then activate the flush method of the accumulator stream, with the uid of the stop event that describes the appropriate events. The flush would cause the accumulator to emit that data in some list. You could then use zip to ensure things arrive at the right time. Here is an example:

from streamz import Stream

sin = Stream()
# split stream into a stream of starts, events and stops
s_starts = sin.filter(lambda x: x[0] == 'start')
s_events = sin.filter(lambda x: x[0] == 'event')
s_stops = sin.filter(lambda x: x[0] == 'stop')

def myacc(acc, x):
    # accumulate docs of the form x = (name, doc) into acc,
    # a dict keyed by uid in which the docs are saved
    name, doc = x
    acc.setdefault(doc['uid'], []).append(doc)
    return acc

def myflushroutine(acc, uid):
    # pops data off; what is returned here would be emitted by the accumulator
    return acc.pop(uid)

# these will only emit when flushed
# (note: accumulate has no flush= keyword today; it would need to be added)
s_start_accum = s_starts.accumulate(myacc, start={}, flush=myflushroutine)
s_events_accum = s_events.accumulate(myacc, start={}, flush=myflushroutine)

# here x[1]['uid'] is the general uid referring to the collection of events,
# so it could be some other index. I'm assuming: stop -> ('stop', dict(uid=N, ...))
# I make the assumption that when the stop emits, *all* events have already arrived
# flush would need to be written
s_stops.map(lambda x: s_start_accum.flush(x[1]['uid']))
s_stops.map(lambda x: s_events_accum.flush(x[1]['uid']))

# this will only emit when all three arrive
sout = s_start_accum.zip(s_events_accum, s_stops)

Anyway, I think this is logic that would help you for the event based model. Happy to hear thoughts.

For @mrocklin, would giving the accumulator some flush option make sense? (the user would have to implement this on their own; if not given, default behaviour is usual accumulator behaviour) Or else, would a more complex version of collect make more sense? (I feel collect and accumulate could converge)

CJ-Wright commented on September 4, 2024

Yes I'd like to work with the event architecture (I don't know if I want the most general case yet (async everything), but moving in that direction may be good).

I'd prefer not to have all the duplication if possible, especially as one may not even have a stop document when running the analysis, and in the most general case one could get different descriptors at different positions.

I don't really understand what you mean by "stop implies a conditional statement". I don't know how useful aggregation will be, especially if we want to use this data live during experiments (autonomous experimentation/variation of parameters). As for the delays, I imagine that the machine making decisions will slow down the execution less than having to wait for the entire experiment to stop (depending on the experiment, of course). Some of this name-dependent processing is already done with bluesky callbacks. To put a bit of a blunt point on it: if you are willing to aggregate the results, why not listify and use Dask?

jrmlhermitte commented on September 4, 2024

I misunderstood you (I thought you meant async); the code above was for that case, sorry. (I still think it's interesting how streams seem to handle that quite nicely, in my opinion.)

I'm mainly worried about the logic handling the full set of event documents being intertwined with other logic. I would still aggregate before doing anything else, but I'm definitely open to suggestions.

I should explain some context, as I think we may have different usage cases. In my case, the start, event and stop document stream is either a full image or a time series of images. Thus, reading in an image is basically the same as reading the full set of start, events and stop. The decision making is outside of this logic. We thus can consider a "unit" in the streams to be the start, events and stop packaged together, and thus aggregate them together. I don't think we'll need to step outside that assumption in our case. It seems like this may not be the case for you.

CJ-Wright commented on September 4, 2024

Yeah, sorry, I should have specified asynchronous document generation/data acquisition.

@danielballan and I had a discussion about whether the stream logic can be separated from the document logic. He made a compelling case as to why it might not be possible, which I am slowly coming around on.

I feel that aggregation is rather limiting. For in-line data processing we can't wait for the stop document to come in (for an experiment that takes hours this could take a while).

My current working model goes something along the lines of this:

  1. We need something that understands both streams and the document model, so make a new class which inherits from both. This way we can have document-aware maps, filters, scans and the like. As much as I'd like to have these things separated, there doesn't seem to be a clean way; too much of update has to be tailored to the document model.
  2. The class which understands documents looks a lot like CallbackBase: it has start, descriptor, event, and stop methods and a dispatcher which knows which documents to send to whom (a rough sketch follows this list).
  3. One problem with this is that in some cases we want to operate on multiple (name, doc) pairs simultaneously (e.g. I zip two streams together and now want to use map on a function with two arguments). So instead of taking in a single doc, the document methods now take in a tuple of docs.
  4. This becomes problematic with the events. Usually we want to perform one of 3 operations on events:
    1. Interrogate the event "guts": we look at the actual data in the event and hand it over to something that makes a judgment, performs an operation on it, or does something else.
    2. Issue a new event: we hand over some information which needs to be re-packaged into a new event, and then issue that event.
    3. Pass the event through: just allow the event to pass without doing anything to it. (This one is of dubious usefulness.)
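
A minimal sketch of what point 2 might look like, assuming streamz's update/_emit node interface (the class name and the pass-through behaviour are illustrative only; a real document-aware map would override event to apply its function and re-issue new documents):

from streamz import Stream

class DocumentStream(Stream):
    """Hypothetical node that dispatches (name, doc) pairs to per-document
    methods, in the spirit of bluesky's CallbackBase."""

    def update(self, x, who=None):
        name, doc = x
        # dispatch on the document name: 'start', 'descriptor', 'event' or 'stop'
        return getattr(self, name)(doc)

    def start(self, doc):
        return self._emit(('start', doc))

    def descriptor(self, doc):
        return self._emit(('descriptor', doc))

    def event(self, doc):
        return self._emit(('event', doc))

    def stop(self, doc):
        return self._emit(('stop', doc))

Point 3 would then generalize update to accept a tuple of (name, doc) pairs when the node has multiple upstreams.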

CJ-Wright commented on September 4, 2024

I'm going to close this as the register_api system seems to be working very well.
