
arroyo's People

Contributors

asottile-sentry, ayirr7, barkbarkimashark, byk, chadwhitacre, cmanallen, dbanda, fpacifici, getsentry-bot, hubertdeng123, kamilogorek, loewenheim, lynnagara, markstory, mattgauntseo-sentry, mcannizz, mdtro, meredithanya, mitsuhiko, mj0nez, nikhars, rahul-kumar-saini, untitaker, volokluev


arroyo's Issues

Automatically record consumer latency in Arroyo

Currently at Sentry we primarily monitor backlog size as an indication of consumer health, and we get paged if the backlog is too big. This is problematic because backlog size varies wildly across different consumers, and a number that is too big on one consumer can be perfectly fine on another. The goal is to move to consumer latency everywhere.

One way to solve this in Arroyo could be to add a message_timestamp property onto the Message class which represents the timestamp of the message from the Kafka broker. Then, when offsets are committed, we can diff the message's timestamp with the current time, which we'll then use as our latency metric.
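
As a rough illustration of the idea, here is a minimal sketch of computing latency at commit time. The simplified Message dataclass and the commit_latency helper are hypothetical stand-ins, not Arroyo's actual API, and the timestamp is assumed to be seconds since the epoch.

```python
import time
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class Message:
    # Hypothetical, simplified stand-in for Arroyo's Message class.
    partition: int
    offset: int
    message_timestamp: float  # broker timestamp, seconds since the epoch


def commit_latency(message: Message, now: Optional[float] = None) -> float:
    """Seconds elapsed between the broker timestamp and the commit time."""
    current = time.time() if now is None else now
    # Clamp at zero so clock skew between broker and consumer never
    # produces a negative latency.
    return max(0.0, current - message.message_timestamp)
```

The now parameter exists only so the calculation can be tested deterministically; a consumer would call commit_latency(message) right before committing and emit the result as a metric.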

Make Arroyo opinionated in resource lifecycle management

There is no clear direction suggested by the API on how to manage the lifecycle of stateful resources like a Producer.
This introduced a variety of different approaches in production that are not consistent with each other.
Most of them are broken in one way or another (for example, we have consumers that fail to close producers).
This is made worse as getting the lifecycle right is hard since most people do not properly grasp the lifecycle of a ProcessingStrategy to begin with.

There are fundamentally three options:

  • If a ProcessingStrategy needs a Producer it can instantiate it, own it, and close it. The processing strategy is fully responsible.
  • If a ProcessingStrategy needs a Producer it can rely on the callsite that instantiates the factory to provide a functioning Producer. The callsite is fully responsible and the processing strategy leaves it open.
  • If a ProcessingStrategy needs a Producer it can rely on the callsite to provide a Producer and it gains ownership at that point, and thus closes it at the end.

I think the third will be massively confusing (also fairly hard to achieve consistently as the ProcessingStrategy can be closed at every rebalance).
The first is problematic for unit tests as of today. If the processing strategy instantiates a Producer, it is impossible to use the Fake Producer in unit tests. This cannot be alleviated by creating the producer in the Factory, as the factory cannot close it.

Without fundamental changes in Producer instantiation, we would have to rely on option two.

So this issue can be addressed in two ways:

  • [enforce Option 2]: Investigate ways to detect when a client tries to use option 1 or 3 and block it. Or restructure the Producer creation process to make it harder to implement 1 or 3. I think the way to address this is, first, to see if we can automatically infer the cases where 1 or 3 are being used.
  • [fundamental changes in Producer instantiation]: Can we study ways to make the API responsible to provide the needed resources given a spec from the client? So the API would have the control of the Producer lifecycle in a way that can be customized for unit tests. Example: provide a spec for the producer needed by a strategy factory. The processor keeps a registry of resources. The strategy factory retrieves the producer instance when it needs it via a context manager. The processor decides when it is time to close it.
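
The registry idea in the second bullet could look roughly like the sketch below. Every name here (ResourceRegistry, register, use, close_all, FakeProducer) is hypothetical and chosen for illustration; none of it is an existing Arroyo API.

```python
from contextlib import contextmanager
from typing import Callable, Dict, Iterator


class FakeProducer:
    # Hypothetical test double; a real producer would wrap a Kafka client.
    def __init__(self) -> None:
        self.closed = False

    def close(self) -> None:
        self.closed = True


class ResourceRegistry:
    """Owned by the processor: builds resources lazily, closes them once."""

    def __init__(self) -> None:
        self._specs: Dict[str, Callable[[], FakeProducer]] = {}
        self._instances: Dict[str, FakeProducer] = {}

    def register(self, name: str, build: Callable[[], FakeProducer]) -> None:
        # The "spec" is reduced to a zero-argument builder in this sketch.
        self._specs[name] = build

    @contextmanager
    def use(self, name: str) -> Iterator[FakeProducer]:
        # Borrowing does not transfer ownership: leaving the block
        # does not close the resource.
        if name not in self._instances:
            self._instances[name] = self._specs[name]()
        yield self._instances[name]

    def close_all(self) -> None:
        # Called by the processor when it decides the resources are done.
        for resource in self._instances.values():
            resource.close()
        self._instances.clear()
```

A unit test would register a fake builder under the same name the production code uses, so the strategy factory never needs to know which kind of producer it received.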

Make Arroyo consistent in the usage of the Message object

There are two competing requirements in how processing strategies are designed. These are emerging from real world usages of the API.
On one hand, there are designs where there is a 1:1 relationship between messages seen by a ProcessingStrategy and messages produced by Kafka. The payload may have been altered but the Message still contains a meaningful partition and offset. The snuba consumer works like this.
On the other hand, there are designs where new messages are created in the pipeline that do not have a direct correspondence to an individual Kafka message. An example is the batching step in the Metrics Indexer. It accumulates messages into a batch and sends the batch as a message to the next step. In this case the Batch, which has to be represented as a message, does not have a corresponding meaningful Kafka partition and offset.

The interface we have today seems to be designed for the first use case and does not work for the second.
This has implications:

  • We cannot have a clean representation of a pipeline that contains aggregation steps or steps that introduce new messages, as these cannot be associated with an individual Kafka message. Or if they can, that association has no meaning. This is troublesome as batching steps tend to be very useful.
  • Following the first approach means we cannot modularize a pipeline as a sequence of steps very much. Any step that performs batching needs to encapsulate all the following steps if it wants to stick to the ProcessingStrategy API correctly. This is because it cannot assign an offset to a Batch.
  • This is the reason the existing CollectStep is so cumbersome.

I think we need to make a decision about how to modularize our pipelines and stick to it:

  • If we want to stick to the current interface: "a processing strategy takes a Message that has a valid partition and offset", then we cannot compose pipelines by introducing steps that batch messages and forward a batch to the following step. This means we need to introduce a different composition mechanism that works at a different abstraction level so that we can still express pipelines as transform, filter, batch.
  • If we want to properly break this constraint and allow steps to forward batches, then the submit interface needs to change to accommodate the case where we forward a message that does not correspond to an individual Kafka message.

Alternatively, we can give up on the idea of providing batching features out of the box and let the client implement their own solution. I would advise against this, as batching is such a common and useful feature. A ton of use cases rely on batching in some way, and adding friction for the customer that wants to use it would defeat the purpose of the API, which is to make the right thing easy.

PyPI sdist does not ship requirements.txt

Environment

sentry-arroyo 1.0.4

Steps to Reproduce

  1. Try to install the sdist from PyPI

Expected Result

Package should build from source and install into the environment.

Actual Result

The requirements.txt isn't shipped in the source distribution.

Traceback (most recent call last):
  File "/build/sentry-arroyo-1.0.4/nix_run_setup", line 8, in <module>
    exec(compile(getattr(tokenize, 'open', open)(__file__).read().replace('\\r\\n', '\\n'), __file__, 'exec'))
  File "setup.py", line 24, in <module>
    install_requires=get_requirements(),
  File "setup.py", line 7, in get_requirements
    with open("requirements.txt") as fp:

Multiprocessing's block sizes are very confusing -- hide them?

This feedback was provided by @Swatinem and @wedamija. input_block_size and output_block_size are not comprehensible to the user, and end up being too hard to tweak even for S&S members sometimes.

We should document them better, but also provide better defaults.

A simple improvement could be to just set them to a much larger value than today, and hope nobody ever runs into them. It wastes a lot of memory but is simple to understand.

I personally want to explore the idea of dynamically resizing buffer blocks when they become too small. I think this is feasible as long as there is some upper limit to how large those blocks are allowed to get (such as 200MB).
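
One possible growth policy, sketched under the assumption that blocks double in size until the data fits. The function name and the doubling strategy are illustrative choices, not something Arroyo implements today.

```python
MAX_BLOCK_SIZE = 200 * 1024 * 1024  # the 200MB ceiling suggested above


def grow_block_size(current: int, required: int, limit: int = MAX_BLOCK_SIZE) -> int:
    """Double the block size until `required` bytes fit, capped at `limit`."""
    if required > limit:
        raise ValueError(
            f"batch of {required} bytes exceeds the {limit} byte limit"
        )
    size = max(current, 1)
    while size < required:
        # Never grow past the configured ceiling.
        size = min(size * 2, limit)
    return size
```

Doubling keeps the number of reallocations logarithmic in the final size, at the cost of up to 2x overallocation; the hard limit keeps a single pathological batch from exhausting memory.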

Make arroyo serve liveness/readiness checks for k8s

This idea was provided by @beezz. Arroyo consumers can die for many reasons. The largest category is the main thread being blocked. The main thread can be blocked due to bugs during rebalancing (as observed in #217 #237), or because the user-defined strategy (or RunTask) misbehaves.

When we fail to poll, Kafka already kicks us out of the consumer group after max.poll.interval.ms. But the pod is still there, doing nothing.

The suggestion by Michal is as follows: every time we poll, we should touch a file (debounced). If the file has not been touched recently, k8s notices and deletes the pod.
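
A minimal sketch of the debounced touch, assuming the consumer calls on_poll() from its main loop. The LivenessFile class and its parameters are hypothetical names, not part of Arroyo.

```python
import time
from pathlib import Path
from typing import Optional


class LivenessFile:
    """Touches a file on poll, at most once per debounce window."""

    def __init__(self, path: str, debounce_seconds: float = 1.0) -> None:
        self._path = Path(path)
        self._debounce = debounce_seconds
        self._last_touch: Optional[float] = None

    def on_poll(self) -> bool:
        """Touch the file unless it was touched recently.

        Returns True when the file was actually touched.
        """
        now = time.monotonic()
        if self._last_touch is not None and now - self._last_touch < self._debounce:
            return False
        self._path.touch()
        self._last_touch = now
        return True
```

On the Kubernetes side, a liveness probe could then compare the file's modification time against a threshold; the exact probe configuration is left out here because it depends on the deployment.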

Support out of order message processing and committing

Most of how arroyo is designed quietly requires messages to be processed and committed in the same order they are received. The only exception is the parallel transform that transforms messages potentially out of order, but then re-orders them before sending them to the next step, thus the problem does not apply.

There was a potential use case in replays recently, and I suspect we will encounter more as we expand to more use cases that need throughput but do not have strong consistency guarantees.

In that scenario most messages would be quick to process, while some took orders of magnitude longer. In a scenario like this, we would write a consumer with two independent internal queues that process messages at different paces. One way to build this is to have a router and two separate consumers for the separate classes of traffic, but at times the right queue is not known upfront.

In scenarios like the one above, messages would reach the last step in the pipeline out of order. Which means that we cannot just trust the committable property as the high watermark of what to commit. We would need to keep track of all the offsets we saw and only commit an offset when all the previous offsets have been processed.

The first question to support this is: should we? Out of order processing can be very useful for throughput though the question is whether the consumer owner should take care of understanding that and reordering the offsets before committing. It seems to me this would go slightly against the idea of making commit easy and as transparent as possible.

The second question, if we say yes to the first, is how to reorder the offsets behind the scenes before committing. One way would be to record all the offsets we see and mark them as "committable" when the application code asks to commit. Though we would only commit the highest watermark.
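
The bookkeeping described above can be sketched for a single partition as follows. OffsetTracker is a hypothetical name, and the sketch assumes offsets are contiguous (a compacted topic with gaps would need a different approach).

```python
from typing import Optional, Set


class OffsetTracker:
    """Tracks completed offsets for one partition (hypothetical helper)."""

    def __init__(self, first_offset: int) -> None:
        self._start = first_offset
        self._next = first_offset  # lowest offset not yet completed
        self._done: Set[int] = set()

    def mark_done(self, offset: int) -> None:
        """Record that `offset` finished processing, in any order."""
        self._done.add(offset)
        # Advance the watermark over every contiguous completed offset.
        while self._next in self._done:
            self._done.remove(self._next)
            self._next += 1

    def committable(self) -> Optional[int]:
        """Offset that is safe to commit, or None if nothing is.

        Follows the Kafka convention of committing the offset of the
        next message to read.
        """
        return self._next if self._next > self._start else None
```

Offsets completed ahead of the watermark stay in the set until the gap closes, so memory use is bounded by how far ahead the fast path can run.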

Idea to reuse strategies between partition assign/revoke

We currently recreate the processing strategy every time we assign/revoke a set of partitions. To my knowledge there are two reasons for that:

  1. We want to flush out old messages to preserve ordering guarantees (messages on the same partition should be submitted in-order, and strategy should commit in-order)
  2. StrategyFactory.create_with_partitions gets the currently assigned partitions passed

However, at the same time:

  1. We only provide ordering guarantees per partition. If a new partition is assigned, we can reuse the processing strategy. If a partition is revoked, we can reuse the processing strategy. The only case where we need to join() is when the same partition is revoked and assigned again to the same consumer.

  2. Almost no factory we write actually cares about the assigned partitions.

And additionally, we observe in production rebalancing where partitions get shuffled around between consumers in the following pattern:

  1. new partition assigned n
  2. partition n revoked
  3. new partition assigned n + 1
  4. partition n+1 revoked
  5. ...

(see attachment out.tsv for full logs pertaining to INC-402)

out.tsv.txt

The logs show that we are closing the strategy on every partition revocation, i.e. on step 2 and 4. In theory it can also happen on partition assignment. But in the pattern above, it is completely unnecessary to recreate the strategy at all in order to preserve ordering of messages per-partition.

Proposal:

  1. Deprecate create_with_partitions and replace it with a simpler create interface that does not take partition count.

    There is one strategy here that uses partition count to determine query concurrency. But could it possibly observe partitions as messages come in, and adjust query concurrency based on that?

    Alternatively there can be a mechanism where strategy factories that do define create_with_partitions still enable today's behavior on rebalancing, while most other factories that don't need partition information can use create and will receive a nice speed boost during rebalancing.

  2. In on_revoke, do not close the strategy. Instead, add the revoked partitions to a set of "unflushed partitions"

  3. In on_assign, also do not close the strategy, but only create it if it is None. If there is overlap between the "unflushed partitions" and the newly assigned partitions, call join() on the strategy (but do not recreate it) and clear the set "unflushed partitions" (only if join() happened)
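
Steps 2 and 3 of the proposal could be sketched like this. RebalanceHandler and CountingStrategy are hypothetical names, and the strategy is reduced to an object that only counts join() calls.

```python
from typing import Callable, Iterable, Optional, Set


class CountingStrategy:
    # Hypothetical stand-in for a processing strategy.
    def __init__(self) -> None:
        self.joins = 0

    def join(self) -> None:
        self.joins += 1


class RebalanceHandler:
    def __init__(self, create: Callable[[], CountingStrategy]) -> None:
        self._create = create
        self.strategy: Optional[CountingStrategy] = None
        self._unflushed: Set[int] = set()

    def on_revoke(self, partitions: Iterable[int]) -> None:
        # Do not close the strategy; just remember what may be in flight.
        self._unflushed |= set(partitions)

    def on_assign(self, partitions: Iterable[int]) -> None:
        if self.strategy is None:
            self.strategy = self._create()
        elif self._unflushed & set(partitions):
            # A revoked partition came back: flush before accepting
            # new messages for it, then reset the bookkeeping.
            self.strategy.join()
            self._unflushed.clear()
```

In the shuffling pattern shown earlier (assign n, revoke n, assign n+1, ...), this handler never calls join() because the revoked and newly assigned partitions do not overlap.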

Unclear:

  • Do we need to actually recreate the strategy, or can we just call join() and continue using it?

Arroyo provides little insight into what is actually being stored in input/output buffers

While working on #270 I noticed that with certain transformation functions, the returned bytestring did not get stored in the output buffer at all. It's not clear to me why this happens.

I noticed a pattern where returning a plain b"x" * 20 did not get stored in the output buffer, but wrapping it in a KafkaPayload, as the tests in #270 do, would store the bytestring out of band.

This behavior could explain some mysterious performance regressions we had in the past (output buffer not being used). Hopefully the new output buffer metrics will provide insight into this. If that metric is much lower than input batch size in bytes, we have a problem in that area.

We also don't know how much data is being transmitted in-band via pool.submit vs what is being sent over the buffer. Right now we only emit metrics for the size of the out-of-band buffer, not whatever else we pickle.

Move committing offsets into its own strategy

Since Arroyo supports building consumers with at-least-once guarantees, committing offsets is always the last step in a chain of processing strategies.

Today committing offsets often happens within a strategy. For example: ProduceAndCommit produces messages then commits the offsets when messages are produced, and RunTaskInThreads runs a task then commits offsets when it is done.

This means that ProduceAndCommit and RunTaskInThreads can both only be used as the last step in a sequence of processing steps. Something like Consume -> RunTaskInThreads -> ProduceAndCommit would not be possible.

In order to make our strategies more flexible, we can make committing offsets a separate step. Every other strategy except the commit strategy takes a next_step which can be any other step (if more processing needs to be done) or the new CommitStep if that was the last step and we are ready to commit offsets.
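
A minimal sketch of that composition, using simplified hypothetical classes (Message, RunTask, CommitStep) rather than Arroyo's real strategies, and ignoring poll, close, and threading for brevity.

```python
from dataclasses import dataclass, replace
from typing import Any, Callable, Dict


@dataclass(frozen=True)
class Message:
    # Simplified, hypothetical message; not Arroyo's real Message class.
    partition: int
    offset: int
    payload: Any


class CommitStep:
    """Terminal step: its only job is to commit offsets."""

    def __init__(self, commit: Callable[[Dict[int, int]], None]) -> None:
        self._commit = commit

    def submit(self, message: Message) -> None:
        # Kafka convention: commit the offset of the next message to read.
        self._commit({message.partition: message.offset + 1})


class RunTask:
    """Non-terminal step: does work, then hands off to any next_step."""

    def __init__(self, function: Callable[[Any], Any], next_step: Any) -> None:
        self._function = function
        self._next_step = next_step

    def submit(self, message: Message) -> None:
        result = self._function(message.payload)
        self._next_step.submit(replace(message, payload=result))
```

Because no step other than CommitStep commits, any number of RunTask-like steps can be chained before it, which is exactly what the Consume -> RunTaskInThreads -> ProduceAndCommit example needs.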

Expose message size stats in arroyo

We currently do not have those and as a result, every consumer writes their own.

I tried to add them in #235, but that PR ended up being a huge performance regression.

Ideas for structuring consumer definitions and isolating independent components

Arroyo Ideas

Intro

I’m thinking of this as more of a discussion point and not a definitive answer to the question: “how should we structure pipelines in our consumer”. If you like it, great. If you want changes, great. If you think it's useless, that’s fine too! I’m happy with the current Arroyo.

Goal

Users of the library should rarely define strategies. Not because it's bad but because it's unnecessary. We should be able to expose 99% of the range of behavior as primitives. The user can then mix and match primitives to achieve a result.

Another goal is to remove all thinking about Kafka. Allow the maintainers of arroyo primitives (in sentry or arroyo) to define sane interactions with the consumer that can be blindly reused by junior developers.

Problems with the current implementation

“Problem” is used loosely here. The current implementation is really good. This refactor changes how steps are composed and normalizes some naming to “XYZ Step” or “XYZ Pipeline”. I think doing so gives better reuse and clearer semantics for new users trying to implement their consumers.

Naming can be updated. These are just examples. Any naming outside of the “Step” and “Pipeline” suffixes should be ignored. The definition language is the most important aspect.

Thoughts About the Data Model

The current implementation uses the decorator pattern. Behavior encloses other behavior. Whereas a pipeline is generally thought of as "my output is your input". Behavior is not enclosed, components are distinct.

I will illustrate this. Consider a pipeline of three functions A, B, and C. The way a developer would mentally conceive of this pipeline is illustrated below:

Input => A => B => C => Output

However, Arroyo defines pipelines that generate this call stack:

Input => A => B => C => B => A => Output

This difference can be described in code. You can see the inversion of the function ordering. This is required to satisfy the decorator pattern. A component must be defined after and execute before its dependency.

# Pipeline Pattern
A = lambda: ...
B = lambda: ...
C = lambda: ...

A()
B()
C()

# Decorator Pattern
C = lambda: ...
B = lambda: C()
A = lambda: B()

A()

This is not a problem per se. It could cause confusion in certain circumstances but I think with the right interface much of that confusion could be limited without changes to the underlying data model.

Desired implementation

Consider the following pipeline. A consumer receives a payload containing type and value fields. We want to multiply the message’s value field by 2 if it is of the multiply type. Otherwise we drop the message.

steps = [
    FilterStep(lambda msg: msg["type"] == "multiply"),
    ApplyStep(lambda msg: {**msg, "value": msg["value"] * 2})  # pseudocode: returns a copy of msg with value doubled.
]

The steps are defined as a list and not a linked-list. The steps function identically to the current steps and implement the submit, poll workflow.

FilterStep removes items from processing. ApplyStep applies functions to a message and returns the output. These are provided by Arroyo. The lambdas are user-defined. Notice the steps do not enclose one another. They are totally independent and can be composed in any order.

However, this is not a complete pipeline. How are the steps called? For this we need another layer.

steps = [
    FilterStep(lambda m: m["type"] == "multiply"),
    ApplyStep(lambda m: {**m, "value": m["value"] * 2})
]

pipeline = SynchronousPipeline(steps=steps)
pipeline.submit(message)
pipeline.poll()

The SynchronousPipeline would be provided by Arroyo in this hypothetical. It works how you would think. It calls submit and poll on all its steps. At this layer we can do things like catch exceptions. We can manage backpressure.

Almost done but not quite. We need to commit the message.

pipeline = SynchronousPipeline(
    steps=[
        FilterStep(lambda m: m["type"] == "multiply"),
        ApplyStep(lambda m: {**m, "value": m["value"] * 2})
    ],
    next_step=CommitStep(commit)
)

pipeline.submit(message)
pipeline.poll()

Pipelines implement a linked list pattern for accessing next steps. They function almost identically to the current Arroyo implementation. Pipelines decorate their next steps. Pipeline steps (the sub-steps defined as a list) do not decorate one another and are independent components.

Why isn’t CommitStep in the steps list? Well it can be but it makes sense to call it as a linked next_step. This will become more clear in the next example. next_step could also be a nested pipeline.

Real world example (Replays)

Replays is like attachments. Chunked messages come in. A capstone comes in. The capstone commits. If the capstone fails the chunks can be retried because they never commit. If a message raises an exception, it should log but NOT commit.

# Catches exceptions and logs them. Exception prevents commit.
pipeline = LogExceptionPipeline(
    message="Recording pipeline failed.",
    steps=[
        # Unpack the message.
        TransformStep(msgpack.unpackb),
        # Cache it if it's the right type.
        TransformStep(cache_if_chunk),
        # Filter out the messages we just cached.  They should not be committed.
        FilterStep(filter_chunk_message),
        # In a thread store the chunks in GCS (chunks are fetched from cache).
        RunTaskInThreads(
            store_chunks,
            concurrency=16,
            max_pending_futures=32,
        ),
    ],

    # Next step is composed the same way Arroyo composes now.
    next_step=CommitStep(commit),  # Can be a pipeline or a step.
)
return pipeline

With the CommitStep defined as a next step it exists outside of the try catch pipeline and is unimpacted by its behavior. In other words, the CommitStep is free to raise.

Compare to the current arroyo implementation:

# Commits are made in batches.
commit_strategy = CommitOffsets(commit)
# In a thread store the chunks in GCS (chunks are fetched from cache).
store_strategy = RunTaskInThreads(
    processing_function=store_chunks,
    concurrency=16,
    max_pending_futures=32,
    next_step=commit_strategy,
)
# Filter out the messages we just cached.  They should not be committed.
filter_chunks_strategy = FilterStep(filter_chunks, next_step=store_strategy)
# Cache it if it's the right type.
cache_strategy = TransformStep(cache_if_chunk, next_step=filter_chunks_strategy)
# Unpack the message.
deserialize_strategy = TransformStep(deserialize, next_step=cache_strategy)
# Catches exceptions and logs them.  Pipeline stops.  Commit is never reached.
log_error_strategy = LogExceptionStep(next_step=deserialize_strategy)

return log_error_strategy

It's less clear how things are composed and how steps interact with one another. By refactoring the steps to be more isolated, we can clearly understand how all of the components interact with one another. The proposed implementation also has the benefit of presenting the pipeline steps in order.

Internal of SynchronousPipeline

# THIS IS PSEUDO CODE :D

class SynchronousPipeline:
    def __init__(self, steps: list[Callable], next_step=None):
        self.steps = steps
        self.next_step = next_step

    def submit(self, message):
        for step in self.steps:
            message = step(message)

        if self.next_step:
            self.next_step.submit(message)

    def poll(self):
        for step in self.steps:
            step.poll()

        if self.next_step:
            self.next_step.poll()
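
For readers who want to experiment, here is one way the pseudocode above could be made runnable. It is a sketch under assumptions: steps expose a hypothetical process() method that returns None to drop a message, and ListSink is an invented terminal step used only to observe output.

```python
from typing import Any, Callable, List, Optional


class FilterStep:
    def __init__(self, test: Callable[[Any], bool]) -> None:
        self._test = test

    def process(self, message: Any) -> Optional[Any]:
        # Returning None signals that the message was dropped.
        return message if self._test(message) else None

    def poll(self) -> None:
        pass


class ApplyStep:
    def __init__(self, function: Callable[[Any], Any]) -> None:
        self._function = function

    def process(self, message: Any) -> Any:
        return self._function(message)

    def poll(self) -> None:
        pass


class ListSink:
    """Terminal step that records what reaches the end of the pipeline."""

    def __init__(self, out: List[Any]) -> None:
        self._out = out

    def submit(self, message: Any) -> None:
        self._out.append(message)

    def poll(self) -> None:
        pass


class SynchronousPipeline:
    def __init__(self, steps: List[Any], next_step: Optional[Any] = None) -> None:
        self.steps = steps
        self.next_step = next_step

    def submit(self, message: Any) -> None:
        for step in self.steps:
            message = step.process(message)
            if message is None:
                return  # dropped by a filter; nothing to forward
        if self.next_step is not None:
            self.next_step.submit(message)

    def poll(self) -> None:
        for step in self.steps:
            step.poll()
        if self.next_step is not None:
            self.next_step.poll()
```

Treating None as "dropped" is a simplification; it means a step cannot legitimately emit None as a payload, so a real implementation would likely use a dedicated sentinel.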

Random Notes

  • Step types do not have a next_step property (they do but it's always None). Only pipelines can move to a next step.

Support committing offsets of filtered messages

Today the FilterStep in Arroyo works by calling a test_function on every message received and either forwarding the message to the next step if it passes or dropping it if it does not.

Since committing must always happen as the last step in any pipeline, this means that the offsets of messages that are dropped by the filter will never be committed.

Most of the time this is not really a problem since we only ever want to commit offsets periodically anyway. Dropping some doesn't matter.

However in some use cases, so many messages might get filtered out that there may be an extended period of time for which no messages will ever get through the filter and nothing will get committed. This is problematic because the backlog will grow and alerts connected to this can fire.

One potential solution to this involves sending a special "filtered message" through the pipeline so it can get committed at the end. All the steps in between would have to recognise and simply forward the filtered message without doing any other processing on it. Though this has the downside that all other steps may need to have additional complexity as they would now need to be aware of the filtered message and handle it.
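
The "filtered message" idea might look like the following sketch. All classes are simplified and hypothetical (they are not Arroyo's implementations), and every message is committed immediately to keep the example short.

```python
from dataclasses import dataclass
from typing import Any, Callable, Dict, Union


@dataclass(frozen=True)
class Message:
    partition: int
    offset: int
    payload: Any


@dataclass(frozen=True)
class FilteredMessage:
    # Carries only what the commit step needs; there is no payload.
    partition: int
    offset: int


AnyMessage = Union[Message, FilteredMessage]


class CommitStep:
    def __init__(self, commit: Callable[[Dict[int, int]], None]) -> None:
        self._commit = commit

    def submit(self, message: AnyMessage) -> None:
        self._commit({message.partition: message.offset + 1})


class TransformStep:
    def __init__(self, function: Callable[[Any], Any], next_step: Any) -> None:
        self._function = function
        self._next_step = next_step

    def submit(self, message: AnyMessage) -> None:
        if isinstance(message, FilteredMessage):
            # Forward untouched: this is the extra awareness every
            # intermediate step would need.
            self._next_step.submit(message)
            return
        result = self._function(message.payload)
        self._next_step.submit(Message(message.partition, message.offset, result))


class FilterStep:
    def __init__(self, test: Callable[[Message], bool], next_step: Any) -> None:
        self._test = test
        self._next_step = next_step

    def submit(self, message: AnyMessage) -> None:
        if isinstance(message, FilteredMessage) or self._test(message):
            self._next_step.submit(message)
        else:
            # Replace the dropped message with a payload-free marker.
            marker = FilteredMessage(message.partition, message.offset)
            self._next_step.submit(marker)
```

The isinstance check in TransformStep is the downside mentioned above made concrete: each step between the filter and the commit has to carry it.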

Write tests for example

so that those writing new consumers can have a boilerplate of what tests should look like for arroyo

Allow the Dead Letter Queue to record the original message

The arroyo DLQ can be applied to any Processing Strategy.
While this is a relatively simple design it does not solve the most common DLQ problem: accumulate invalid messages and replay them after a fix is applied.
This is because only the first transformation step in an Arroyo consumer has access to the original message generally. If an error is identified in a following step, only the partially processed message would be sent to the DLQ, thus making it impossible to replay the messages.

One option would be to leave this in the hands of the application developer and recommend that the original message be passed through as well. I think this is gross. It would make it really cumbersome and discourage people from employing the DLQ, which would be an anti-pattern.

Alternative option:
Make the DLQ more transparent than it is now. Each step wrapped by a DLQ would only provide offset and partition in the invalid message (or invalid messages) exception. The DLQ step would wrap the entire strategy and keep track of the original message indexed by partition and offset. When the DLQ strategy sees an invalid message, it would produce the original message on the DLQ topic. (We would need to figure out a good way to clean up this buffer when offsets are committed.)
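
A rough sketch of this alternative follows. InvalidMessage here is defined locally for illustration and should not be confused with any class Arroyo ships; DlqWrapper, on_commit, and pending are likewise hypothetical names.

```python
from typing import Callable, Dict, Tuple


class InvalidMessage(Exception):
    """Raised by inner steps; carries only partition and offset."""

    def __init__(self, partition: int, offset: int) -> None:
        super().__init__(f"invalid message at {partition}:{offset}")
        self.partition = partition
        self.offset = offset


class DlqWrapper:
    """Wraps a whole strategy and remembers the original payloads."""

    def __init__(
        self,
        inner_submit: Callable[[int, int, bytes], None],
        produce_to_dlq: Callable[[bytes], None],
    ) -> None:
        self._inner_submit = inner_submit
        self._produce_to_dlq = produce_to_dlq
        self._originals: Dict[Tuple[int, int], bytes] = {}

    def submit(self, partition: int, offset: int, payload: bytes) -> None:
        self._originals[(partition, offset)] = payload
        try:
            self._inner_submit(partition, offset, payload)
        except InvalidMessage as exc:
            # Produce the untouched original, not the partially
            # processed message the failing step was holding.
            original = self._originals.pop((exc.partition, exc.offset))
            self._produce_to_dlq(original)

    def on_commit(self, partition: int, offset: int) -> None:
        # Everything below the committed offset can no longer fail.
        stale = [k for k in self._originals if k[0] == partition and k[1] < offset]
        for key in stale:
            del self._originals[key]

    def pending(self) -> int:
        return len(self._originals)
```

The sketch only handles exceptions raised synchronously inside submit; a step that fails later, during poll, would need the same lookup applied there.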
