eventstream's Introduction

Event Stream

This is a Python package designed to facilitate event-based communication through Redis Streams.

How to Run

Running an Event Stream application is as simple as calling:

$ python ./event_stream/application.py example.json

I don't want a whole application - I just want an event handler

Good news! The application itself just loads the handlers and adds their tasks to an event loop. Mimic the config and launching logic and you can do pretty much whatever you want. The most difficult part is keeping track of and managing that task while everything else is going on.

For example:

from uuid import uuid1

import event_stream.handlers
from event_stream.configuration.parts import CodeDesignation
from event_stream.configuration.group import HandlerGroup

from event_stream.streams.handlers import HandlerReader

# Describe the handler: the event to react to, the stream to read, and the function to call
handler_config = {
    "name": "Example",
    "event": "execution_complete",
    "stream": "EVENTS",
    "handler": CodeDesignation.from_function(event_stream.handlers.echo_message)
}

# Parse the raw config, then identify the application and this specific instance
handler_configuration = HandlerGroup.parse_obj(handler_config)
handler_configuration.set_application_name("Whatever Service")
handler_configuration.set_instance_identifier(str(uuid1()))

# Create the reader and launch it; keep the result so the task can be tracked and managed
handler = HandlerReader(configuration=handler_configuration, verbose=True)
task = handler.launch()
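
Keeping track of launched tasks can be sketched roughly as follows. This assumes the launched tasks are asyncio tasks, which the source does not confirm; `stand_in_handler` and `run_handlers` are hypothetical names standing in for real handlers, not part of event_stream.

```python
import asyncio
from typing import List, Set


async def stand_in_handler(name: str, iterations: int) -> str:
    """Hypothetical stand-in for a long-running handler."""
    for _ in range(iterations):
        # Yield control so other tasks on the loop can run
        await asyncio.sleep(0)
    return name


async def run_handlers() -> List[str]:
    tasks: Set[asyncio.Task] = set()
    finished: List[str] = []

    def on_done(task: asyncio.Task) -> None:
        # Drop the reference once the task ends and record successful results
        tasks.discard(task)
        if not task.cancelled() and task.exception() is None:
            finished.append(task.result())

    for name in ("first", "second"):
        task = asyncio.create_task(stand_in_handler(name, 3), name=name)
        # Hold a strong reference so the task is not garbage collected mid-run
        tasks.add(task)
        task.add_done_callback(on_done)

    # Wait for everything; exceptions are collected rather than raised here
    await asyncio.gather(*list(tasks), return_exceptions=True)
    return sorted(finished)
```

Holding the tasks in a set and removing them in a done callback keeps a live view of what is still running, which is also the natural place to cancel tasks on shutdown.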

eventstream's People

Contributors

christophertubbs

eventstream's Issues

Properly configure the build scripts

The build scripts currently follow the template logic for the most part (some values have been changed). The pyproject.toml, setup.cfg, make, and Dockerfile need to change to fit this project.

Connect message handlers to message work tracking

Message handlers are currently naive in that a consumer performs xreadgroup, grabs a message, and calls all the functions on it. Functions like update_handler_completion in event_stream.bus need to be called around the message processing in order to ensure operations that failed may be attempted again.
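
One possible shape for this, as a minimal sketch: acknowledge a message only after every handler succeeds, and leave failures unacknowledged so they remain pending for a later attempt. The function `process_batch` is hypothetical, and `client` is assumed to be any object exposing redis-py style `xreadgroup` and `xack` methods. The real `update_handler_completion` is not used here because its signature is not shown.

```python
from typing import Any, Callable, Dict, List, Sequence, Tuple

Payload = Dict[str, Any]
Handler = Callable[[Payload], Any]


def process_batch(
    client: Any,
    stream: str,
    group: str,
    consumer: str,
    handlers: Sequence[Handler],
    count: int = 10,
) -> Tuple[List[str], List[str]]:
    """Read one batch and acknowledge only the messages that were fully handled."""
    completed: List[str] = []
    failed: List[str] = []

    # ">" asks for messages never delivered to any consumer in this group
    response = client.xreadgroup(group, consumer, {stream: ">"}, count=count)

    for _stream_name, messages in response:
        for message_id, payload in messages:
            try:
                for handler in handlers:
                    handler(payload)
            except Exception:
                # No XACK: the message stays in the group's pending list for a retry
                failed.append(message_id)
                continue

            # Every handler succeeded, so the message can be acknowledged
            client.xack(stream, group, message_id)
            completed.append(message_id)

    return completed, failed
```

Tracking calls such as `update_handler_completion` would sit around the inner handler loop, recording per-handler progress so that a retry can skip work that already succeeded.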

Properly implement the __main__ module functionality

Correctly implementing the __main__.py functionality allows calling the application via python -m event_stream -v example.json instead of python ./src/event_stream/application.py -v example.json. That should all be set up correctly to make it easier to work with.
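
A minimal sketch of what `__main__.py` might contain, assuming the only inputs are the `-v` flag and a config path as in the command above. The `--verbose` long option, the argument names, and the hand-off inside `main` are assumptions; the actual entry point in `event_stream.application` is not shown in the source.

```python
import argparse
from typing import Optional, Sequence


def parse_arguments(argv: Optional[Sequence[str]] = None) -> argparse.Namespace:
    """Parse the command line used by `python -m event_stream -v example.json`."""
    parser = argparse.ArgumentParser(prog="event_stream")
    parser.add_argument("config_path", help="Path to the JSON configuration file")
    parser.add_argument("-v", "--verbose", action="store_true", help="Print extra output")
    return parser.parse_args(argv)


def main(argv: Optional[Sequence[str]] = None) -> int:
    arguments = parse_arguments(argv)
    # Assumption: the existing launch logic would be called here with the parsed values
    print(f"Launching with {arguments.config_path} (verbose={arguments.verbose})")
    return 0

# In __main__.py, finish by calling main() and passing its return value to sys.exit
```

Accepting `argv` as a parameter keeps the entry point callable from tests without touching `sys.argv`.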

Delete messages when handling is complete

There needs to be a mechanism for fully deleting old messages once they should no longer be operated on. Currently, messages are xacked once processing completes, or orphaned when it does not. If processing can't complete, handling should be able to continue with any other bus that can operate on the stream and its events.

Convert `event_stream.application.main` into an `App` object

Many similar applications have the user define something like:

app = ExampleApplication(**options_from_somewhere)

@app.register
def some_function(...):
    pass

@app.register
def other_function(...):
    pass

This pattern simplifies interaction and makes the application a little more extensible and less reliant on complex configurations.

This application should follow a similar model and have the ability to create one of these app objects that things may be attached to. The current main functionality does not necessarily need to be removed.
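
As a rough sketch of the idea, the class below keeps a registry of functions and exposes a `register` decorator. `App`, its `handlers` attribute, and `dispatch` are hypothetical names for illustration; none of them exist in event_stream today.

```python
from typing import Any, Callable, Dict


class App:
    """Hypothetical application object that functions can be attached to."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.handlers: Dict[str, Callable[..., Any]] = {}

    def register(self, function: Callable[..., Any]) -> Callable[..., Any]:
        # Store the function under its own name and return it unchanged,
        # so the decorated function can still be called directly
        self.handlers[function.__name__] = function
        return function

    def dispatch(self, handler_name: str, *args: Any, **kwargs: Any) -> Any:
        # Look up a registered function by name and call it
        return self.handlers[handler_name](*args, **kwargs)


app = App("Whatever Service")


@app.register
def echo(message: str) -> str:
    return message
```

Returning the original function from `register` means decoration has no side effects on the function itself, so the existing `main` path and the app object could coexist.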

Add examples

There need to be concrete, specific examples of how to use this package.

Edit references to ParamSpec

Issues arise when defining arguments of type Callable that may or may not accept variable arguments. There are two naive ways to define variable arguments: typing.Callable[[int, str, typing.Tuple[typing.Any, ...], typing.Dict[str, typing.Any]], typing.Any], as read by PyCharm, or:

P = typing.ParamSpec("P")

EXAMPLE = typing.Callable[[int, str, P], typing.Any]

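Both ways can trip up a linter when they are supposed to accept functions like def example(i: int, j: str = "nope", *args) or def example(i: int, j: str, q: bool = False, **kwargs). The first describes the tuple and dict as two ordinary positional parameters rather than variable arguments, so it does not match the exact parameters. The second places a ParamSpec inside the parameter list, which is not a valid use of ParamSpec.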

The solution is to use:

P = typing.ParamSpec("P")

EXAMPLE = typing.Callable[typing.Concatenate[int, str, P], typing.Any]
