arroyosystems / arroyo

Distributed stream processing engine in Rust

Home Page: https://arroyo.dev

License: Apache License 2.0

Languages: Rust 87.26%, TypeScript 12.08%, Smarty 0.19%, Dockerfile 0.15%, CSS 0.10%, Shell 0.10%, HTML 0.09%, JavaScript 0.03%
Topics: data, dev-tools, infrastructure, kafka, rust, sql, stream-processing, data-stream-processing, stream-processing-engine

arroyo's Introduction

Arroyo

Arroyo is a distributed stream processing engine written in Rust, designed to efficiently perform stateful computations on streams of data. Unlike traditional batch processing, streaming engines can operate on both bounded and unbounded sources, emitting results as soon as they are available.

In short: Arroyo lets you ask complex questions of high-volume real-time data with subsecond results.

[Screenshot: a running job in the Arroyo Web UI]

Features

🦀 SQL and Rust pipelines

🚀 Scales up to millions of events per second

🪟 Stateful operations like windows and joins

🔥 State checkpointing for fault-tolerance and recovery of pipelines

🕒 Timely stream processing via the Dataflow model

Use cases

Some example use cases include:

  • Detecting fraud and security incidents
  • Real-time product and business analytics
  • Real-time ingestion into your data warehouse or data lake
  • Real-time ML feature generation

Why Arroyo

A number of streaming engines already exist, including Apache Flink, Spark Streaming, and Kafka Streams. Why create a new one?

  • Serverless operations: Arroyo pipelines are designed to run in modern cloud environments, supporting seamless scaling, recovery, and rescheduling
  • High performance SQL: SQL is a first-class concern, with consistently excellent performance
  • Designed for non-experts: Arroyo cleanly separates the pipeline APIs from its internal implementation. You don't need to be a streaming expert to build real-time data pipelines.

Getting Started

You can get started with a single-node Arroyo cluster by running the following Docker command:

$ docker run -p 8000:8000 ghcr.io/arroyosystems/arroyo-single:latest

or, if you have Cargo installed, you can use the arroyo CLI:

$ cargo install arroyo
$ arroyo start

Then, load the Web UI at http://localhost:8000.

For a more in-depth guide, see the getting started guide.

Once you have Arroyo running, follow the tutorial to create your first real-time pipeline.

Developing Arroyo

We love contributions from the community! See the developer setup guide to get started, and reach out to the team on Discord or create an issue.

Community

Arroyo Cloud

Don't want to self-host? Arroyo Systems provides fully-managed cloud hosting for Arroyo. Sign up here.

arroyo's People

Contributors

akennedy4155, bakjos, breezykermo, chenquan, dependabot[bot], edmondop, fourspaces, gbto, haoxins, harshit2283, jacksonrnewhouse, jbeisen, mwylde, rcjmurillo, rtyler, tqwewe

arroyo's Issues

Stopping a job mid-checkpoint hangs the job

Currently, if you stop a job while it is taking a checkpoint, it will launch a second checkpoint in CheckpointStopping mode. The controller doesn't handle this well and the job ends up hung. Restarting the controller does put it back in a healthy state, but this is still pretty bad. The job controller should cleanly handle a stop signal mid-checkpoint.

Window Function not picking up additional partition expressions

The query below should produce row_num = 1, but it only partitions by the window, ignoring the additional partition expression.

SELECT counter, ROW_NUMBER() OVER (
        PARTITION BY counter, window
        ORDER BY counter DESC) AS row_num
FROM (
    SELECT counter, hop(interval '1 second', interval '4 second') as window FROM impulse GROUP BY 1, 2)

Show backpressure in the UI

When a node in the job graph is not keeping up with its input data, its input queue fills up and its upstreams cannot continue sending it data until it pulls items from the queue. This backpressures the upstream, which is blocked from doing work until it is able to write its messages downstream. In this way, we prevent faster upstreams from overloading slower downstreams. See this blog post for more on the general theory of backpressure in streaming systems (although note the details are a bit different in Arroyo).

Figuring out whether and where backpressure is occurring is important for users to understand the behavior and performance of their pipelines.

In Arroyo, we have a metric arroyo_worker_tx_queue_rem that reports how much space remains in a task's transmit queue. When this is 0, that means that the downstream node is causing backpressure on us.

This data should be visible in the UI. We already have infrastructure to pass metrics back to the UI (which currently powers the data rate graphs), so this would involve extending that API to add the arroyo_worker_tx_queue_rem metric. For visualization, the simplest approach would be to color the nodes in the pipeline graph according to how backpressured they are (for example, based on the remaining queue size as a fraction of the total queue size).

The nodes in the graph represent logical operators, but in the physical execution each operator is subdivided into N parallel subtasks. Similarly, each operator may have M downstream nodes if the edge between them is a shuffle. The backpressure for an operator will be some combination of the backpressure of its parallel subtasks (median or min?).

So it will also be helpful to see the per-subtask backpressure, for example in the operator detail view that currently shows the data rate graphs.
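
As a rough sketch (not actual Arroyo or console code), the per-operator score could be derived from the per-subtask queue metrics like this, reporting the most backpressured subtask:

// Hypothetical helper: turn per-subtask queue metrics into a 0.0-1.0
// backpressure score for an operator. Takes the worst (most backpressured)
// subtask; a median would smooth out outliers instead.
fn backpressure_score(tx_queue_rem: &[f64], tx_queue_size: &[f64]) -> f64 {
    tx_queue_rem
        .iter()
        .zip(tx_queue_size)
        .map(|(rem, size)| 1.0 - rem / size) // 0.0 = queue empty, 1.0 = queue full
        .fold(0.0, f64::max)
}

The UI could then map that score onto a color scale for each node in the pipeline graph.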

For API, some functions use `Transaction` while others use `impl GenericClient`

I think that taking impl GenericClient in these functions makes more sense, since it allows them to be used with either a Transaction or a generic Postgres client.

Would it make sense to refactor all of the functions such as create_connection, delete_source, get_pipelines etc. to all use GenericClient instead of a concrete type such as Transaction?
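
A minimal sketch of the proposed shape, assuming tokio_postgres's GenericClient trait; the function body and table name are illustrative, not Arroyo's actual queries:

use tokio_postgres::{Error, GenericClient};

// Accepts either a plain Client or an open Transaction, since both
// implement GenericClient.
async fn get_pipelines<C: GenericClient>(client: &C) -> Result<u64, Error> {
    let rows = client.query("SELECT id FROM pipelines", &[]).await?;
    Ok(rows.len() as u64)
}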

The UI should allow users to change parallelism

Pipelines have an associated parallelism configuration that controls how many parallel subtasks we run for each operator. (Inside the dataflow itself, operators can have different parallelisms, but the current API only allows setting a single parallelism across the entire job.)

This parallelism is set to an inferred value at pipeline creation; however, that may be too high or too low depending on the actual data volume and the complexity of the query.

There is a gRPC API (UpdateJob) that allows users to change the parallelism of a running job, but it is not currently exposed on the Web UI.

This issue covers adding the ability to change the parallelism from the job details page (http://localhost:8000/jobs/{job_id}).

Note that because we do not currently support dynamic rescaling of pipelines, changing the parallelism triggers this sequence in the controller:

  1. The job is stopped with a final checkpoint
  2. The existing workers are shut down
  3. A new job is scheduled with the new parallelism

This can take several seconds, and the UI should reflect that the change is rolling out.

Auth as Middleware for API gRPC calls

I noticed that most of the functions in the API run self.authenticate. For now, this is just a passthrough that returns some AuthData and always returns OK. Instead of having this logic run individually in each request handler, I'm wondering if it could be run as middleware (for example, a tower::Layer) or somewhere else that abstracts it away from the requests.
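
One possible shape for this, sketched with tonic's interceptor hook rather than a full tower::Layer; the generated server name in the comment is assumed, not confirmed from the codebase:

use tonic::{Request, Status};

// Runs once per request before any handler; a real implementation would
// validate the token and stash the resulting AuthData in request extensions.
fn check_auth(req: Request<()>) -> Result<Request<()>, Status> {
    match req.metadata().get("authorization") {
        Some(_token) => Ok(req),
        None => Err(Status::unauthenticated("missing authorization header")),
    }
}

// Attached once when the server is built, e.g.:
//   ApiGrpcServer::with_interceptor(api_service, check_auth)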

Support Joining windowed and non-windowed datasets

We'd like to support joins between a windowed aggregate and a non-windowed stream. This would allow us to run Nexmark Query 6. The Arroyo version of the query is below. Ideally we'd also inspect the WHERE clause in order to not keep data around after the window has passed, as the only data from B1 we care about is that within active windows.

WITH 
auction as (
    SELECT auction.category as category, 
        auction.datetime as datetime, 
        auction.expires as expires,
        auction.id as id 
    FROM nexmark where auction is not null),
bid as (
    SELECT bid.auction as auction,
        bid.bidder as bidder, 
        bid.extra as extra,
        bid.datetime as datetime,
         bid.price as price
    FROM nexmark  where bid is not null)

SELECT B.auction, B.price, B.bidder, B.dateTime, B.extra
from bid B
JOIN (
  SELECT MAX(B1.price) AS maxprice, tumble(INTERVAL '10' SECOND) as window
  FROM bid B1
  GROUP BY 2
) B1
ON B.price = B1.maxprice
WHERE B.dateTime BETWEEN B1.window.start_time AND B1.window.end_time;

Convert State Machine to `statig`?

I've used statig, which is open source, to implement FSMs in Rust in the past, and I loved the user experience. I noticed that the state/transition logic in the controller is not using any library as its basis; it's all code written for Arroyo.

Would it make more sense to use an external library for that logic, so that Arroyo stays focused on streaming and the FSM machinery is maintained by a project dedicated to it?
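
For context, a deliberately simplified, hypothetical illustration of the hand-written enum-and-match style the controller uses today (state names taken from the controller logs elsewhere on this page), which is the kind of logic a library like statig would take over:

// Not the real controller types; a toy version of hand-rolled FSM logic.
enum JobState {
    Created,
    Compiling,
    Running,
    Stopped,
}

fn next_state(state: JobState) -> JobState {
    match state {
        JobState::Created => JobState::Compiling, // "Created" -> "Compiling"
        JobState::Compiling => JobState::Running,
        JobState::Running | JobState::Stopped => JobState::Stopped,
    }
}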

Catalog in web ui should show SQL types to the user

Users can view their available sources by looking at the catalog, on the left sidebar of the query editor.

[Screenshot: the catalog sidebar in the query editor]

Currently the catalog returns types from our internal naming schema, like Int64 and UnixMillis. These are returned by the GetSources gRPC call, which returns a list of SourceDef messages, each of which contains a list of SourceField. Ultimately, you end up returning various PrimitiveTypes with names like Int64, etc.

This API should instead return SQL types that will be familiar to SQL users, like bigint and timestamp.
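
A minimal sketch of the kind of mapping the API could apply before returning fields to the console; only Int64 and UnixMillis come from this issue, the other names are illustrative:

// Hypothetical mapping from internal primitive type names to SQL type names.
fn sql_type_name(internal: &str) -> &'static str {
    match internal {
        "Int32" => "int",
        "Int64" => "bigint",
        "Float64" => "double precision",
        "String" => "text",
        "UnixMillis" => "timestamp",
        _ => "unknown", // types without a mapping yet
    }
}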

Support Update Outputs

Currently arroyo streams are of the style that Flink calls "append", where every new record is emitted from the operator. This limits the system in two ways:
First, it doesn't allow for updating single records. For example, doing a non-windowed outer join isn't currently supported because there is never a guarantee that the left or right side won't appear later.

Second, some sliding window applications only want to know when a key has entered or left the result set. Think, for example, of a query that detects unusual account behavior. Emitting all currently triggered keys could overload the downstream system. It should be possible to only emit the delta for such queries.

Controller continues in degraded state when psql connection fails

On startup, the controller spawns a tokio task that watches the postgres job_configs table. This is what allows it to respond to changes in configuration by starting, stopping, and modifying pipelines.

However, if that connection fails (because of a configuration issue or because the DB is not available), the task panics but the overall controller process continues on. Because that task is gone, the controller no longer does any useful work.

This should be fixed by:

  1. Failing controller startup if we're unable to talk to postgres
  2. Adding error handling to the config watcher task so that intermittent issues with postgres don't lead to a broken controller
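
A rough sketch of both fixes, assuming tokio_postgres directly; the connection-string handling and the actual job_configs polling are elided, and none of this is the real controller code:

use std::time::Duration;
use tokio_postgres::NoTls;

async fn run_config_watcher(conn_str: &str) {
    // 1. Refuse to start the controller at all if postgres is unreachable.
    tokio_postgres::connect(conn_str, NoTls)
        .await
        .expect("could not connect to postgres; failing controller startup");

    // 2. Once running, treat watcher errors as retryable instead of panicking.
    loop {
        match tokio_postgres::connect(conn_str, NoTls).await {
            Ok((client, connection)) => {
                tokio::spawn(connection); // drive the connection in the background
                // ... watch the job_configs table with `client` until an error ...
                let _ = client;
            }
            Err(e) => eprintln!("config watcher error: {e}; retrying in 5s"),
        }
        tokio::time::sleep(Duration::from_secs(5)).await;
    }
}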

Add ability to stop pipeline without entering the pipeline specific view

Feature Request

Right now, a user cannot stop a pipeline unless they navigate through the console to the pipeline's view.

I propose adding a button in the pipelines list view to stop a pipeline.

Other Considerations

Pipelines cannot be deleted unless they are stopped, so a stop button in the list view would also make the deletion process more streamlined.

Self Joins not Possible with Alias

This query, which performs a self join using an alias:

select nexmark_50000.person as seller_record
from nexmark_50000
join nexmark_50000 as t2
    on nexmark_50000.auction.seller = t2.person.id
where nexmark_50000.person is not null;

does not translate correctly from DataFusion SQL to a syn expression; it fails with:

only support converting column expressions to columns.

Create Intermediate Representation Between arroyo-sql::SqlOperator and arroyo-datastream::Program

Currently this is translated in a single call to get_program_from_operator().

However, this code path is overloaded: it is responsible for code generation, creation of the operator graph, edge construction, and several inlined optimizations. This results in unclear, complex code.

For this issue, the task is to define another representation, SqlPlanGraph, which can be more directly translated to the Program.

Before

[SqlOperator] -(graph construction, code gen, optimization)-> [Program]

After

[SqlOperator] -(graph construction)-> [SqlPlanGraph] -(optimizations)-> [SqlPlanGraph] -(code gen)-> [Program]

Optimize Aggregates for offset Sliding Windows

In the SQL planning logic, aggregates over sliding windows use a two-phase aggregation strategy, rolling up an intermediate aggregate over each step and then updating the overall window aggregate on each step. The current implementation requires that the step evenly divides the width; when that isn't true, it falls back to a much slower approach.

It is possible to modify the current algorithm so that it works with offset sliding windows, as follows:

Let the width be W, the step be S, and the (nonzero) remainder R = W % S. Rather than having everything happen at every step S, you take one set of actions at times T where T % S = 0, and another set at times where T % S = R.

When T % S = 0, calculate the partial aggregate over [T-S, T). This is then consumed by all currently active windows, and any new windows are initialized.

When T % S = R, calculate the final partial aggregate over [T-R, T), have the second-phase aggregation consume it, and emit results for all live windows. If this is the last time a given window will be emitted, evict it from tracking.
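
A tiny worked example of that schedule, with hypothetical numbers W = 10s and S = 4s, so R = W % S = 2s:

// Which of the two actions fires at a given (whole-second) time T?
fn action_at(t_secs: u64) -> &'static str {
    const W: u64 = 10;
    const S: u64 = 4;
    const R: u64 = W % S; // = 2
    if t_secs % S == 0 {
        "roll up the partial aggregate for [T-S, T) and feed every active window"
    } else if t_secs % S == R {
        "roll up [T-R, T), emit all live windows, evict windows ending at T"
    } else {
        "no scheduled work"
    }
}
// e.g. action_at(8) hits the first branch, action_at(10) the second
// (a window boundary at T = 10), and action_at(9) does nothing.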

Spec needed: Error handling strategy in Arroyo SQL Functions

As I tried to implement #25, I ran into two types of errors:

While it is probably true that the Arrow implementation returns an error because it receives a String rather than a regexp, and tries to build the regexp internally, I think unwrapping that error on the assumption that it can never panic is not ideal. That would be relying on internal implementation details rather than on the signature/the types, so I think we should come up with a strategy for mapping lower-level errors into Arroyo-specific errors.

DataFusion also returns an algebraic data type from its API, so I think we need to clarify what the approach should be for Arroyo:
https://github.com/apache/arrow-datafusion/blob/8a112484ac7ae89afc7006d56c65fba2dab106ce/datafusion/physical-expr/src/regex_expressions.rs#L54
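
One possible shape for such a strategy, sketched with the thiserror crate; the error type and variant names are hypothetical, not an agreed-on Arroyo API:

use thiserror::Error;

#[derive(Debug, Error)]
pub enum SqlFunctionError {
    #[error("invalid regular expression: {0}")]
    InvalidRegex(#[from] regex::Error),
    #[error("datafusion error: {0}")]
    DataFusion(String),
}

// Callers propagate with `?` instead of unwrapping; the `#[from]` attribute
// converts regex::Error into SqlFunctionError::InvalidRegex automatically.
fn compile_pattern(pattern: &str) -> Result<regex::Regex, SqlFunctionError> {
    Ok(regex::Regex::new(pattern)?)
}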

Add Discussion to GH

I'd like to see a GH Discussions section for questions from users and devs that aren't necessarily issues, bugs, or feature requests. Would this be possible with the Apache license now?

Allow Users to specify retention windows for JoinWithExpiration in SQL

Currently, non-windowed joins in SQL always have a retention of 24 hours. This can be either much too long or much too short, depending on the use case. Flink handles this with a configurable execution option, table.exec.state.ttl. We could follow suit by allowing settings to be passed into the GrpcApi, or figure out a way to specify it within the body of the query.

Kubernetes scheduler

It should be possible to use Kubernetes as a scheduler for jobs, implemented in a similar fashion to our current Nomad scheduler.

More clearly document a "long time" with regards to compiling pipelines

With a basic developer setup and the simple tutorial pipeline, compiling is taking longer than 30 minutes.

I am seeing this repeated in the API logs when on the main page for the pipeline:

2023-04-13T22:26:11.887964Z ERROR response failed{code="Internal" path="/arroyo_api.ApiGrpc/GetJobMetrics"}: arroyo_server_common: new
...
2023-04-13T22:26:42.494625Z ERROR response failed{code="Internal" path="/arroyo_api.ApiGrpc/GetJobMetrics"}: arroyo_server_common: new

Postgres

Running via postgres -D /usr/local/var/postgres

I made sure to set up the arroyo database with the correct arroyo user/password and ran the migrations.

API

cd arroyo/arroyo-api
cargo run

Controller

cd arroyo/arroyo-controller
cargo run

Console

pnpm run dev

All in separate panes in an iTerm mux


I'm going to let it run for an hour or so and check to see if it's made any progress.

I'm not seeing any logs that indicate progress. The API logs I see are included above; here's what I'm seeing from the controller:

2023-04-13T22:20:54.478312Z  INFO arroyo_controller: Using process scheduler
2023-04-13T22:20:54.486223Z  INFO arroyo_controller: Starting arroyo-controller on 0.0.0.0:9190
2023-04-13T22:20:54.486307Z  INFO arroyo_server_common: Starting arroyo-controller admin server on 0.0.0.0:9191
2023-04-13T22:20:54.507758Z  INFO arroyo_controller::states: starting state machine job_id="hbmgfamd"
2023-04-13T22:20:54.509053Z  INFO arroyo_controller::states: state transition job_id="hbmgfamd" from="Created" to="Compiling"
2023-04-13T22:20:54.516598Z  INFO arroyo_controller::states::compiling: Compiling pipeline job_id="hbmgfamd" hash="f9ubevthoxw2g6c7"
2023-04-13T22:20:54.517804Z  INFO arroyo_controller::compiler: digraph {
    0 [ label = "source_0:UnboundedNexmarkSource<qps: 100>" ]
    1 [ label = "watermark_1:Watermark" ]
    2 [ label = "sink_4:NullSink" ]
    3 [ label = "map_2:expression<fused<map,filter>:OptionalRecord>" ]
    0 -> 1 [ label = "() → arroyo_types::nexmark::Event" ]
    3 -> 2 [ label = "() → generated_struct_16429945049069439673" ]
    1 -> 3 [ label = "() → arroyo_types::nexmark::Event" ]
}
2023-04-13T22:23:43.822146Z  INFO arroyo_controller::states: starting state machine job_id="dtendbjv"
2023-04-13T22:23:43.823730Z  INFO arroyo_controller::states: state transition job_id="dtendbjv" from="Created" to="Compiling"
2023-04-13T22:23:43.832809Z  INFO arroyo_controller::states::compiling: Compiling pipeline job_id="dtendbjv" hash="kbnrqmwueowoiayb"
2023-04-13T22:23:43.834899Z  INFO arroyo_controller::compiler: digraph {
    0 [ label = "source_0:UnboundedNexmarkSource<qps: 100>" ]
    1 [ label = "watermark_1:Watermark" ]
    2 [ label = "sink_10:NullSink" ]
    3 [ label = "map_2:expression<fused<map,aggregator_key>:Record>" ]
    4 [ label = "aggregate_window_4:SlidingWindowAggregator<SlidingWindow(size: 1m, slide: 5s)>" ]
    5 [ label = "aggregation_8:expression<fused<aggregation,map>:Record>" ]
    6 [ label = "aggregation_5:expression<fused<aggregation,aggregator_key>:Record>" ]
    7 [ label = "aggregate_window_7:TumblingWindowAggregator<TumblingWindow(0ms)>" ]
    0 -> 1 [ label = "() → arroyo_types::nexmark::Event" ]
    7 -> 5 [ label = "generated_struct_7077236267445092260 → generated_struct_11296174908488371850" ]
    1 -> 3 [ label = "() → arroyo_types::nexmark::Event" ]
    3 -> 4 [ label = "generated_struct_2862623353463512223 → generated_struct_16429945049069439673" ]
    5 -> 2 [ label = "() → generated_struct_8568894031591935709" ]
    4 -> 6 [ label = "generated_struct_2862623353463512223 → generated_struct_8556445246977061536" ]
    6 -> 7 [ label = "generated_struct_7077236267445092260 → generated_struct_8241898945625653774" ]
}

The two pipelines that I'm trying to compile are:

SELECT count(distinct bid.price) AS count,
    hop(interval '5 seconds', interval '60 seconds') AS window
FROM nexmark
GROUP BY window;

AND

select bid from nexmark where bid is not null;

Implement date_trunc(), date_part(), and extract()

The input will be a possibly optional SystemTime, as that is currently the representation for timestamps. Follow the DataFusion documentation for date_trunc, date_part, and extract.

The most natural place is a new variant on Expression for DateFunction(), with a corresponding DateExpression. Testing should be done by adding additional assertions to arroyo-sql-testing.
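
A rough sketch of that shape; the enums below are deliberately simplified and hypothetical, with only the DateFunction/DateExpression names taken from this issue:

// Placeholder for the real, much larger Expression enum.
enum Expression {
    // ...existing variants elided...
    DateFunction(DateExpression),
}

enum DateExpression {
    DateTrunc { part: DatePart, arg: Box<Expression> },
    DatePart { part: DatePart, arg: Box<Expression> },
    Extract { part: DatePart, arg: Box<Expression> },
}

enum DatePart {
    Year,
    Month,
    Day,
    Hour,
    Minute,
    Second,
}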

First Pipeline tutorial's "First Query" step shows a different table name than the source created in the previous step

Click continue, and finally give the source a name, like "nexmark_100," and click "Publish" to finish creating the source.

SELECT bid FROM nexmark WHERE bid IS NOT NULL; is the query that is displayed for the user to use as their first pipeline; however, the previous step recommends the name nexmark_100.

I knew instinctively that I should change this, but for new users this could be confusing, especially to those unfamiliar with SQL or data systems such as Arroyo.

It seems pedantic, but I believe this could be a barrier to entry, and the first-pipeline tutorial needs to be as smooth as possible for adoption of the tool.

Implement Case Statements

Test Query:

select CASE person.name when null then 'anonymous' else person.name end 
    || ' was a part of this transaction.' 
    || ' Nice!'
from nexmark;

Output:

[Screenshot: query output]

Missing 'schemaType' field in schema registry response

Arroyo version 0.3.0

Using the Confluent schema registry during Kafka source setup, this is the only response I am able to retrieve:

Missing 'schemaType' field in schema registry response

My setup is standard Debezium + Postgres with Confluent Schema Registry v6.1.0 using Avro, self-hosted.
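
A likely direction for a fix, sketched with serde: the Confluent registry omits "schemaType" from the response when the schema is AVRO (the default), so the field should be optional and default to AVRO rather than being treated as required. The struct below is illustrative, not Arroyo's actual response type:

use serde::Deserialize;

fn default_schema_type() -> String {
    "AVRO".to_string()
}

#[derive(Debug, Deserialize)]
struct SchemaResponse {
    subject: String,
    version: u32,
    id: u32,
    schema: String,
    // Absent for AVRO schemas and older registry versions.
    #[serde(rename = "schemaType", default = "default_schema_type")]
    schema_type: String,
}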
