
streamtools's People

Contributors

agness, akamediasystem, bmabey, buth, durple, friej715, jasoncapehart, jprobinson, mikedewar, mreiferson, nickjones, nikhan, nytlabsbot


streamtools's Issues

generalized wrappers for http responses

Currently there is a bunch of duplicate header code in daemon.go that should be put in some kind of wrapper so that we don't have to add it every time we have a new type of HTTP response. blech.
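One way to collapse that duplicated header code is a single helper that every handler in daemon.go calls. A minimal sketch, assuming a hypothetical `writeJSON` name and header set:

```go
package main

import (
	"encoding/json"
	"fmt"
	"net/http"
	"net/http/httptest"
)

// writeJSON is a hypothetical wrapper collecting the header boilerplate
// (content type, status code) currently duplicated across daemon.go handlers.
func writeJSON(w http.ResponseWriter, status int, v interface{}) {
	w.Header().Set("Content-Type", "application/json")
	w.WriteHeader(status)
	json.NewEncoder(w).Encode(v)
}

func main() {
	rec := httptest.NewRecorder()
	writeJSON(rec, http.StatusOK, map[string]string{"daemon": "BLOCK_CREATED"})
	fmt.Println(rec.Code, rec.Header().Get("Content-Type"))
	fmt.Print(rec.Body.String())
}
```

New response types would then only touch the handler body, never the header code.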

toGet

make GET requests by converting specific message keys, or all of the message's key/values, into URL params
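The key-to-param conversion could look like the sketch below; `buildGetURL` and the flat-map input are assumptions about what the block's rule would supply:

```go
package main

import (
	"fmt"
	"net/url"
)

// buildGetURL converts flat message key/values into URL query parameters,
// as a toGet block might before issuing the request. Hypothetical sketch.
func buildGetURL(base string, msg map[string]interface{}) (string, error) {
	u, err := url.Parse(base)
	if err != nil {
		return "", err
	}
	q := u.Query()
	for k, v := range msg {
		q.Set(k, fmt.Sprintf("%v", v))
	}
	u.RawQuery = q.Encode() // Encode sorts keys, giving stable URLs
	return u.String(), nil
}

func main() {
	got, _ := buildGetURL("http://example.com/api",
		map[string]interface{}{"id": 42, "name": "st"})
	fmt.Println(got)
}
```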

graph

a new message contributes a new node if we've not seen that node type before, and increments an edge weight.

store in a standard json graph format (d3 must have specified one) and keep in mind this will be backed by some graph database one day.
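d3's force-layout examples conventionally use a nodes/links layout; a sketch of that shape in Go, with the field names treated as an assumption about the eventual storage format:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Node and Link mirror the nodes/links layout d3's force examples use.
type Node struct {
	Name string `json:"name"`
}
type Link struct {
	Source int `json:"source"` // index into Nodes
	Target int `json:"target"`
	Weight int `json:"weight"` // incremented per observed message
}
type Graph struct {
	Nodes []Node `json:"nodes"`
	Links []Link `json:"links"`
}

func main() {
	g := Graph{
		Nodes: []Node{{"tweet"}, {"retweet"}},
		Links: []Link{{Source: 0, Target: 1, Weight: 3}},
	}
	b, _ := json.Marshal(g)
	fmt.Println(string(b))
}
```

Index-based links keep the JSON compact and map cleanly onto most graph databases later.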

fromFile

should read through a file, line by line, and emit into streamtools
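The core of such a block is a `bufio.Scanner` loop feeding a channel; this sketch reads from an `io.Reader` (a string here, an `*os.File` in the real block):

```go
package main

import (
	"bufio"
	"fmt"
	"io"
	"strings"
)

// emitLines reads r line by line and sends each line downstream, which is
// roughly what a fromFile block routine would do with an *os.File.
func emitLines(r io.Reader, out chan<- string) {
	scanner := bufio.NewScanner(r)
	for scanner.Scan() {
		out <- scanner.Text()
	}
	close(out) // signal end-of-file to the downstream block
}

func main() {
	out := make(chan string)
	go emitLines(strings.NewReader("{\"a\":1}\n{\"a\":2}\n"), out)
	for line := range out {
		fmt.Println(line)
	}
}
```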

vegaBar

emits a bar chart vega object based on the incoming message

example scripts that explain simple functionality of ST

there need to be some updated example scripts that demonstrate simple data processing cases with ST, for example:

  • from file, through mask, count, then back to file
  • from nsq, filter, bunch, back to nsq
  • http streaming/polling/rule setting, etc

fromSNS

collect messages from Amazon SNS

make sure all channels are *simplejson.Json

We need to be passing pointers to JSON around, not the values themselves. This helps simplejson work, as well as massively reducing the copying of data that we are doing currently.
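The win is that a channel send copies one pointer instead of the whole value, and downstream mutation is visible everywhere. A stdlib-only sketch (with a plain struct standing in for `simplejson.Json`):

```go
package main

import "fmt"

// message stands in for the simplejson.Json payload; sending *message over a
// channel copies one pointer instead of the whole struct and its fields.
type message struct {
	Body  string
	Count int
}

func main() {
	ch := make(chan *message, 1)
	m := &message{Body: "hello", Count: 1}
	ch <- m       // one word crosses the channel
	got := <-ch
	got.Count = 2 // visible through every reference to the same message
	fmt.Println(m.Count)
}
```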

vegaForce

emits a force-directed network vis vega object based on the incoming message

proposed standards for blocks

Some ideas for the standardization of blocks, heavily inspired by #34 :

terms

Block Routine
A Block Routine is the func that is called as a go routine and contains only logic. It has a simplejson.Json in chan, a simplejson.Json out chan, or both. Block Routines are wrapped in Blocks. Block Routines reside as part of the /streamtools library.

Block Function
Block Functions are dependencies for Block Routines. They are not run as go routines and are located in the /streamtools library. It may be a good idea to divorce Block Routines from Block Functions in /streamtools.

Blocks
Blocks provide an NSQ wrapper for Block Routines. Blocks take care of wiring up go channels to NSQ and are responsible for initializing and running Block Routines. Typically, there will be 2-3 go routines per block: an NSQ reader, NSQ publisher, and a Block Routine.

A block should act as a standalone executable and be able to interface to a standard mode of execution and introspection.

principles

one Block Routine per Block
All logic for a block should be contained in a single function ( Block Routine ). All Block Routine state should be maintained within that single go routine. All messages in and out of that block should be managed by that single go routine. A Block Routine should not share state with any other go routines, unless it is through a channel.

Block Routines allow introspection
Block Routines should have a chan of some kind that allows reporting on what is currently happening in the routine. A health chan may also be nice, to divorce technical stats (in flight, backed-up queues, processing time, num msgs processed) from stats that come from the block's logic (distribution of X, etc.).

online setting of rules
We have yet to standardize how a Block Routine is initialized with rules that govern its logic. I propose that Block Routines should have a rules chan that initializes the logic and signals it to start processing. This would help us avoid any kind of flag soup, and potentially allow for run-time rule changes.
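The rules-chan proposal above can be sketched as a routine that blocks on its rules chan before touching any messages; all names here are illustrative, not the actual streamtools API:

```go
package main

import "fmt"

// filterRoutine is a block routine per the proposal: it receives its rule
// first, then processes messages with it.
func filterRoutine(rules <-chan string, in <-chan string, out chan<- string) {
	pattern := <-rules // wait for a rule before processing anything
	for msg := range in {
		if msg == pattern {
			out <- msg
		}
	}
	close(out)
}

func main() {
	rules := make(chan string, 1)
	in := make(chan string, 3)
	out := make(chan string, 3)

	rules <- "keep" // online rule setting: send, don't flag
	in <- "drop"
	in <- "keep"
	close(in)

	go filterRoutine(rules, in, out)
	for m := range out {
		fmt.Println(m)
	}
}
```

Run-time rule changes would fall out naturally by selecting on `rules` inside the message loop as well.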

ideas for interesting and helpful examples?

  • creating a stream via sampling endpoint/reading file
  • muxing of some kind
  • outputting/polling on condition
  • online rule changing/blocks setting rules for other blocks
  • block hotswap
  • st talking to st/network IO/NSQ (?)
  • how blocking affects ST
  • performance test
  • end-to-end stream to vis

ideas?

toMongo

write messages into mongo

Arch v3 proposal

So we have a ton of blocks now, and some experience of using this in prod. Let's do one more big architecture review before all this gets serious. Suggestions should go in the comments below!

vegaScatter

emits a scatter chart vega object based on the incoming message

Block creation should alert user if no default-but-required rules are set

  1. create a fromNSQ block
  2. create a count block
  3. connect blocks 1 and 2

...this results in the command line never becoming available again, because count and fromNSQ both are created but none of the required rules are set.

One nice solution would be for the return message (i.e., {"daemon":"BLOCK_CREATED"}) to include text describing which rules must be set.

Another approach is to create defaults for all blocks, but this encourages errors and might reinforce bad habits IMO.

vegaLine

emits a line chart vega object based on the incoming message

kernelDensityEstimate

a kernel density estimate that is updated with each new message.

Are there online methods for this? Gotta be...
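One naive "online" approximation, assuming nothing about what the real block would do, is to cap a sample buffer and evaluate a Gaussian KDE over it on demand; truly online schemes (e.g. merging nearby kernels) exist but this is the simplest sketch:

```go
package main

import (
	"fmt"
	"math"
)

// kde holds recent samples and evaluates a Gaussian kernel density estimate.
// Capping the buffer keeps per-message update cost bounded.
type kde struct {
	samples   []float64
	max       int
	bandwidth float64
}

func (k *kde) update(x float64) {
	k.samples = append(k.samples, x)
	if len(k.samples) > k.max {
		k.samples = k.samples[1:] // drop oldest
	}
}

func (k *kde) density(x float64) float64 {
	sum := 0.0
	for _, s := range k.samples {
		u := (x - s) / k.bandwidth
		sum += math.Exp(-0.5*u*u) / (k.bandwidth * math.Sqrt(2*math.Pi))
	}
	return sum / float64(len(k.samples))
}

func main() {
	k := &kde{max: 1000, bandwidth: 0.5}
	for _, v := range []float64{1, 1.2, 0.8, 1.1} {
		k.update(v)
	}
	fmt.Printf("%.3f\n", k.density(1.0))
}
```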

basic block documentation

Many of our blocks are missing documentation. Every block should have a basic description in the file that contains the block routine. This should include how the block works, what kind of data can be expected from the block, whether or not the block has outputs, and what kind of parameters can be sent to it.

architecture to allow for megablocks, agnostic logic

Consider an example flow:

  1. Import from NSQ
  2. Filter
  3. Synchronize
  4. Export to Web Sockets

The import from NSQ takes a stream from a non-local NSQ and puts it in the local NSQ. The Filter then reads from the local NSQ and publishes back to the local NSQ. The Synchronizer reads from the local NSQ and publishes to the local NSQ, and finally, the Export reads from the local NSQ.

This would work fine for smaller streams, but the load caused by putting things on and off a local NSQ at every hop creates a bunch of redundant work. The thing is that all of the filtering/synchronizing/export logic is still super useful; the only problem is that it is locked away in binaries that include NSQ readers/publishers.

I propose an architecture for the design of megablocks.

something like this:
/streamtools just contains the structs that we use to deal with Go chan messages.
/blocks contains basically everything that is in the root right now (w/ NSQ stuff)

a file in streamtools would look something like this:

package streamtools

type Filter struct {
    in      chan []byte
    out     chan []byte
    pattern string
}

func NewFilter(in chan []byte, out chan []byte, pattern string) *Filter {
    this := &Filter{
        in:      in,
        out:     out,
        pattern: pattern,
    }

    go this.run()
    return this
}

func (this *Filter) run() {
    for {
        select {
        case msg := <-this.in:
            // do filter stuff here, then forward matches
            this.out <- msg
        }
    }
}

This way, all of the logic in streamtools becomes agnostic as to how they are implemented. You could use them as part of the streamtools suite, or if you are just handling Go msgs in your own application you can import from /streamtools and use them without NSQ. Or you could chain them together to make megablocks.

/blocks would be full of NSQ-ready binaries, with really simple code that is basically an NSQ wrapper around the streamtools logic. A filter would look something like this:

import "github.com/nytlabs/stream_tools"

func main(){
    chanA := make(chan []byte)
    chanB := make(chan []byte)
    streamtools.NewNSQReader(params, chanA)
    streamtools.NewFilter(pattern, chanA, chanB)
    streamtools.NewNSQPublisher(params, chanB)
}

and this also means you could do something like

import "github.com/nytlabs/stream_tools"

func main(){
    chanA := make(chan []byte)
    chanB := make(chan []byte)
    chanC := make(chan []byte)
    streamtools.NewNSQReader(params, chanA)
    streamtools.NewFilter(pattern, chanA, chanB)
    streamtools.NewSynchronizer(chanB, chanC)
    streamtools.NewNSQPublisher(params, chanC)
}

basically, it allows the streamtools core to be a library that we use in making the NSQ-based block binaries. It's also good for when we want to start sharing util functions, like flatten, map, etc.

eh?

jqblock pukes with complex commands

... like

jqblock -command="'if .data.stories[0].url == \"http://www.nytimes.com/\" then .data.stories[1] else .data.stories[0] end'" -read-topic="top_stories_by_tweet" -write-topic="top_story_by_tweet" -name="extract_top_story"

toS3

write messages to S3

standardized key/value retrieval

All our blocks use different methods to grab values from a key specified in the rule, and many of them lack the ability to grab nested keys. We should have some standard way of grabbing keys in streamtools/utils that accepts '.'-delimited key paths.

The filter value by set membership block accepts a nested key in the format key_a.key_b.key_c; I think other blocks should probably follow suit.
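A shared accessor of that kind could be a small path walk over nested maps; `getKey` is a hypothetical name for what would live in streamtools/utils:

```go
package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

// getKey walks a '.'-delimited path through nested maps, returning the value
// and whether the full path existed.
func getKey(msg map[string]interface{}, path string) (interface{}, bool) {
	var cur interface{} = msg
	for _, p := range strings.Split(path, ".") {
		m, ok := cur.(map[string]interface{})
		if !ok {
			return nil, false // path descends into a non-object
		}
		cur, ok = m[p]
		if !ok {
			return nil, false // key missing at this level
		}
	}
	return cur, true
}

func main() {
	var msg map[string]interface{}
	json.Unmarshal([]byte(`{"key_a":{"key_b":{"key_c":7}}}`), &msg)
	v, ok := getKey(msg, "key_a.key_b.key_c")
	fmt.Println(v, ok)
}
```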

handling of HTTP streaming/websockets through daemon

Our current architecture is limited in that we cannot use the http server created by daemon to handle websockets or http streaming capabilities on a per block basis. This means that each block that needs to http stream/use websockets has to stand up another http server on a different port.

Ideally we would write our own handler in the future that allows blocks more basic control over the kinds of HTTP traffic they can handle.
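For the HTTP-streaming half, a per-block handler mounted on the daemon's existing server could use `http.Flusher` to push messages as they arrive; this is a sketch of that idea, not the daemon's actual API:

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
)

// streamHandler returns a handler the daemon could mount per block, flushing
// each message to the client instead of standing up a second server.
func streamHandler(msgs <-chan string) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		f, ok := w.(http.Flusher)
		if !ok {
			http.Error(w, "streaming unsupported", http.StatusInternalServerError)
			return
		}
		for m := range msgs {
			fmt.Fprintln(w, m)
			f.Flush() // push the line to the client immediately
		}
	}
}

func main() {
	msgs := make(chan string, 2)
	msgs <- `{"a":1}`
	msgs <- `{"a":2}`
	close(msgs)

	rec := httptest.NewRecorder()
	streamHandler(msgs)(rec, httptest.NewRequest("GET", "/stream", nil))
	fmt.Print(rec.Body.String())
}
```

Websockets would need an upgrade step, but could hang off the same per-block routing.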
