
marshal's Introduction

Marshal - a Kafka consumer coordination library


Marshal is in beta. We have deployed it in a few places and are working to ensure it's stable and fast. It is not 100% battle tested yet; feedback is very welcome.

Purpose

This project assumes you have some familiarity with Kafka. You should know what a topic is and what partitions are.

In Kafka, the unit of scalability is the partition. If a topic is getting "too busy", you increase its partition count. Consuming from such busy topics requires consumers to be aware of the partitions and to coordinate consumption across all of the consumers.

Traditional setups use Zookeeper or some other system to coordinate consumers. This works in many situations, but it introduces an unnecessary point of failure. Consumer coordination can be done entirely with Kafka alone.

Additionally, getting consumer coordination correct is a rather taxing exercise in development and, frankly, shouldn't need to be done for every single project, company, etc. There should be an open source system that handles it for you.

Marshal is a library that you can drop into your Go programs to coordinate the consumption of partitions across multiple processes, servers, etc. It is implemented in terms of Kafka itself, with zero extra dependencies.

Marshal is designed for use in production environments where there are many topics, each topic having hundreds of partitions, with potentially thousands of consumers working in concert across the infrastructure to consume them. Marshal is designed for big environments with critical needs.

Usage

This module is designed to be extremely simple to use. The basic flow is: create a Marshaler, then use it to create one Consumer for each topic you need to consume. Logically, you want one Marshaler in your program and a single Consumer per topic you consume from.

Here's the simplest example (but see a more complicated example in the example directory):

package main

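import (
    "fmt"
    "log"

    "github.com/dropbox/marshal/marshal"
)

func main() {
    // One Marshaler per program: client ID, group ID, and Kafka brokers.
    marshaler, err := marshal.NewMarshaler(
        "clientid", "groupid", []string{"127.0.0.1:9092"})
    if err != nil {
        log.Fatalf("failed to create Marshaler: %s", err)
    }
    defer marshaler.Terminate()

    // One Consumer per topic you need to consume.
    consumer, err := marshaler.NewConsumer(
        []string{"some-topic"}, marshal.NewConsumerOptions())
    if err != nil {
        log.Fatalf("failed to create Consumer: %s", err)
    }
    defer consumer.Terminate()

    // Messages arrive from whichever partitions this consumer has claimed.
    // Ranging over the channel stops cleanly if it is ever closed.
    for msg := range consumer.ConsumeChannel() {
        fmt.Printf("Consumed message: %s\n", msg.Value)
        // Mark the message as done so the committed offset can advance.
        consumer.Commit(msg)
    }
}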

Suppose you run this against a cluster with a topic named some-topic that has 8 partitions. The program would claim those partitions one by one until it held them all. If you then started a second copy, it would claim only the partitions that are not already claimed. If the first copy dies, the second will pick up the dropped partitions within a few minutes.

In essence, Marshal takes all of the effort of consumer coordination out of your software and puts it where it belongs: on Kafka.

How Coordination Works

Please read this section to understand how Marshal performs coordination and the guarantees it gives you. The failure scenarios are particularly worth reading.

If you want the gory details of the protocol used internally, see the PROTOCOL documentation. You don't need to read or understand it to use Marshal, but it might be useful.

Basic Coordination

In essence, Marshal uses a special topic within Kafka to coordinate the actions of many consumers anywhere in the infrastructure. As long as the consumers can connect to the Kafka cluster you want to coordinate, you can use Marshal. There is no language dependency either: the Marshal algorithm could be implemented in any language, and consumers could coordinate with each other.

We assume that you're familiar with the basics of Kafka -- notably that each partition is effectively a write-ahead log that records an ordered set of events, and that it's not possible (barring unclean leader elections) for two consumers to see different event orderings. Marshal takes advantage of that property to perform distributed coordination.

When a program using Marshal starts up, the first thing it does is read the log in the coordinating topic. This log contains events such as claim partition, heartbeat, and release partition.

Using these events Marshal can know not only what consumers exist, but what partitions they are currently working on and how far along they are. Using that information the local program can decide such things as "which partitions are unclaimed" and then take action to claim and begin consuming those partitions.
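The following is a minimal Go sketch of this replay idea. It is not Marshal's actual rationalizer. It assumes simplified event kinds (`Claim`, `Heartbeat`, `Release`), a hypothetical `Event` struct, and an assumed 60-second heartbeat interval. A claim counts as live only if it was refreshed within the last two heartbeat intervals, which matches the staleness rule described under Consumer Death: Unexpected.

```go
package main

import "fmt"

// Kind names the event types described above; the constants are illustrative.
type Kind int

const (
	Claim Kind = iota
	Heartbeat
	Release
)

// Event is a simplified coordination event (hypothetical shape).
type Event struct {
	Kind      Kind
	Time      int64 // Unix seconds
	Group     string
	Client    string
	Topic     string
	Partition int
	Offset    int64 // last consumed offset, carried by heartbeats
}

type key struct {
	Group, Topic string
	Partition    int
}

// ClaimState is what the log says about one partition within one group.
type ClaimState struct {
	Client        string
	LastHeartbeat int64 // time of the claim or the latest heartbeat
	LastOffset    int64 // where the next claimant would resume
	Released      bool
}

// heartbeatInterval is an assumed value (seconds) for this sketch.
const heartbeatInterval = 60

// Held reports whether the claim is live at time now: not released, and
// refreshed within the last two heartbeat intervals.
func (c ClaimState) Held(now int64) bool {
	return !c.Released && now-c.LastHeartbeat <= 2*heartbeatInterval
}

// Rationalize replays events in log order and returns the state per partition.
func Rationalize(events []Event) map[key]ClaimState {
	claims := make(map[key]ClaimState)
	for _, ev := range events {
		k := key{ev.Group, ev.Topic, ev.Partition}
		cur, seen := claims[k]
		live := seen && cur.Held(ev.Time)
		switch ev.Kind {
		case Claim:
			// A claim succeeds only if nobody holds the partition at that moment;
			// the previous offset is kept so the new owner can resume from it.
			if !live {
				claims[k] = ClaimState{Client: ev.Client, LastHeartbeat: ev.Time, LastOffset: cur.LastOffset}
			}
		case Heartbeat:
			if live && cur.Client == ev.Client {
				cur.LastHeartbeat, cur.LastOffset = ev.Time, ev.Offset
				claims[k] = cur
			}
		case Release:
			if live && cur.Client == ev.Client {
				cur.Released = true
				claims[k] = cur
			}
		}
	}
	return claims
}

func main() {
	events := []Event{
		{Kind: Claim, Time: 100, Group: "g", Client: "a", Topic: "t", Partition: 0},
		{Kind: Heartbeat, Time: 150, Group: "g", Client: "a", Topic: "t", Partition: 0, Offset: 42},
	}
	for k, st := range Rationalize(events) {
		fmt.Printf("%s/%s:%d owner=%s offset=%d held=%v\n",
			k.Group, k.Topic, k.Partition, st.Client, st.LastOffset, st.Held(200))
	}
}
```

Because every consumer replays the same ordered log, each one computes the same claim map without talking to the others.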

Groups and Clients

Coordination happens within "groups". When you create a Marshaler, you specify the group that your consumer is part of. All claims are made on a per-group basis, so you can consume the same topic N times as long as you have N groups. Within a group, each partition can be claimed by only one consumer at a time. The number of consumers that can hold a given partition at once therefore equals the number of groups.
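Here is a toy model of group-scoped claims, assuming an in-memory table. `ClaimTable`, `TryClaim`, and the group names are hypothetical. The claim key includes the group, so two groups can each hold the same partition, while two clients in one group cannot.

```go
package main

import "fmt"

// claimKey scopes a claim to a group: a partition can be held once per group.
type claimKey struct {
	Group     string
	Topic     string
	Partition int
}

// ClaimTable is a toy, in-memory stand-in for the claim state Marshal
// derives from its coordination topic. Values are owning client IDs.
type ClaimTable map[claimKey]string

// TryClaim grants the partition to client unless another client in the
// same group already holds it.
func (t ClaimTable) TryClaim(group, topic string, partition int, client string) bool {
	k := claimKey{group, topic, partition}
	if owner, ok := t[k]; ok && owner != client {
		return false
	}
	t[k] = client
	return true
}

func main() {
	t := ClaimTable{}
	fmt.Println(t.TryClaim("billing", "some-topic", 0, "host-a")) // true
	fmt.Println(t.TryClaim("billing", "some-topic", 0, "host-b")) // false: same group
	fmt.Println(t.TryClaim("search", "some-topic", 0, "host-b"))  // true: different group
}
```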

The "client ID" specified when you create a Marshaler is used to identify a particular instance of a program. These should be unique per instance of software, but they should be reasonably stable. At Dropbox we use the name of the machine the software is running on, plus possibly an instance ID if we run multiple copies on a single box.

Consumption of Messages

Marshal's main work begins when you create a consumer and call consumer.Consume(). This may return a message from one of the partitions you have claimed. You do something with the message, then consume the next one. You don't have to do anything else.

Behind the scenes, the act of consuming updates internal cursors and timers and will possibly generate heartbeat messages into the Marshal event log. These messages contain information about the last offset consumed, allowing other consumers (and monitoring systems) to know where you are within the partition. In case of failure, they can resume at the last point you heartbeated.

Presently, all consumption within Marshal is at least once. For most consumer failures, a block of messages (up to one heartbeat interval's worth) will likely be reprocessed by the next consumer.
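The next sketch shows one way to compute the offset a heartbeat could safely report when messages are committed out of order. It is not Marshal's implementation; `OffsetTracker` and its methods are hypothetical. The safe offset is the lowest offset still outstanding. A consumer resuming there skips nothing, and any messages already committed past it may be reprocessed, which is the at-least-once behavior described above.

```go
package main

import "fmt"

// OffsetTracker records which consumed messages are not yet committed.
type OffsetTracker struct {
	outstanding map[int64]bool // consumed but not yet committed
	next        int64          // one past the highest offset handed out
}

func NewOffsetTracker(start int64) *OffsetTracker {
	return &OffsetTracker{outstanding: map[int64]bool{}, next: start}
}

// Consumed records that offset was handed to the application.
func (t *OffsetTracker) Consumed(offset int64) {
	t.outstanding[offset] = true
	if offset >= t.next {
		t.next = offset + 1
	}
}

// Committed records that the application finished with offset.
func (t *OffsetTracker) Committed(offset int64) { delete(t.outstanding, offset) }

// SafeOffset is the lowest offset not yet committed. Every earlier message
// is done, so another consumer could resume here without skipping data.
func (t *OffsetTracker) SafeOffset() int64 {
	safe := t.next
	for off := range t.outstanding {
		if off < safe {
			safe = off
		}
	}
	return safe
}

func main() {
	tr := NewOffsetTracker(100)
	for off := int64(100); off < 105; off++ {
		tr.Consumed(off)
	}
	tr.Committed(100)
	tr.Committed(102) // out of order: 101 is still being processed
	fmt.Println("heartbeat offset:", tr.SafeOffset()) // 101
}
```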

Message Ordering

Kafka guarantees the ordering of messages committed to a partition, but does not guarantee any ordering across partitions. Marshal will give you messages from any partition it has claimed, so in essence, Marshal does not guarantee ordering. If you need message ordering, this library is not presently appropriate for you.

If you are having throughput problems, increase the number of partitions so that Marshal can have more messages in flight.

Failure Modes

This section documents some of the failure modes and how Marshal handles them. If you have questions about other scenarios, let us know and we can analyze and write about them.

Consumer Too Slow

A consumer is too slow when it consumes from a partition more slowly than data comes in. Marshal detects this, and the consumer starts failing its internal health checks. If this persists long enough, the consumer decides it cannot sustain the load and voluntarily surrenders partitions.

This doubles as a load-balancing mechanism. Suppose one consumer ends up with 8 claims while another has only a handful. If the former cannot keep up, it sheds load and the latter picks it up.

However, it is worth noting that in the unbalanced scenario, as long as the consumers are keeping up with the traffic they won't release partitions. It is perfectly valid right now for Marshal consumers to end up unbalanced -- as long as they're all pulling their weight.
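The following is a minimal sketch of a "keeping up" check. It is not Marshal's actual health check; `Sample` and `Healthy` are hypothetical names. It assumes the consumer periodically samples the partition's newest offset and its own position. The sketch leaves out the "after enough time has passed" part; a real implementation would presumably require repeated failures before surrendering a partition.

```go
package main

import "fmt"

// Sample is one observation of a partition: the newest offset in Kafka and
// the consumer's position, taken at the same moment.
type Sample struct {
	Time    float64 // seconds
	Latest  int64
	Current int64
}

// Healthy reports whether the consumer kept pace over the window. It is
// healthy if it is caught up at the end, or if it consumed at least as
// fast as data arrived.
func Healthy(window []Sample) bool {
	if len(window) < 2 {
		return true // not enough data to judge
	}
	first, last := window[0], window[len(window)-1]
	dt := last.Time - first.Time
	if dt <= 0 {
		return true
	}
	produced := float64(last.Latest-first.Latest) / dt
	consumed := float64(last.Current-first.Current) / dt
	return last.Current >= last.Latest || consumed >= produced
}

func main() {
	slow := []Sample{{Time: 0, Latest: 100, Current: 90}, {Time: 10, Latest: 300, Current: 150}}
	fmt.Println("healthy:", Healthy(slow)) // false: 20 msg/s in, 6 msg/s out
}
```

Note that this check says nothing about how many partitions a consumer holds. That is why unbalanced but healthy consumers keep their claims.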

Consumer Death: Expected

If a consumer dies or shuts down in an expected (controlled) way, Marshal will attempt to commit release partition events into the log. If this happens successfully then other consumers will be able to pick up the partitions within seconds and begin consuming exactly where the last consumer left off.

No data is skipped or double-consumed in this mode and the downtime is extremely minimal.

Consumer Death: Unexpected

If a consumer dies unexpectedly, things are slightly worse. After a hardware failure or a similar issue (network split, etc.), the partition's claim starts to become stale. The rest of the fleet has to wait an appropriate interval (two heartbeat intervals) before it can claim the partition.

Data might be double-consumed, but at most one heartbeat interval's worth. In the worst case, every message consumed since the last heartbeat is consumed again. Consumption downtime is also at most two heartbeat intervals.

Network Partitions

Since Kafka can only have a single leader for a partition, consumers on the same side of the network partition as the leader of the Marshal coordination partition will be able to continue working. Consumers on the other side will fail to heartbeat and will stop working, even if they could otherwise reach the leaders for the topics they were consuming.

The consumers on the side of the Marshal coordination partitions will be able to tell that the other consumers dropped off and will be able to start working. (Of course, this may cause them to overload themselves with too many claims, leading to consumer slowness.)

If the network partition separates the consumers from Kafka, they will be unable to consume and will also fail to heartbeat. This is effectively treated as Consumer Death: Unexpected. When the partition heals, the consumers that lost their claims will know it (assuming machine clocks are synchronized) and will abandon those claims.

Important Notes

This system assumes that timestamps are valid. If your machines are not using NTP to synchronize their clocks, you will not be able to get deterministic behavior. Sorry.

Marshal also relies on all actors being well behaved. Malicious users can make the system behave unpredictably, or in whatever way they choose.

Frequently Asked Questions

Here are some questions we've seen. For anything else, find us on IRC.

My consumers are unbalanced; one has more partitions than the others.

This is a design property of Marshal's implementation. We start from the premise that each consumer can reliably health check itself and determine whether it is keeping up with its current claims. If it is, then it doesn't matter how many partitions it has; it is healthy.

This means that we can end up in a state where one consumer has several partitions and another consumer has fewer (or none), but Marshal guarantees that all of them will be healthy.

My consumer isn't claiming any partitions.

This usually happens when you are reusing Client IDs and your consumer has previously become unhealthy and released partitions. A sick consumer will not reclaim partitions it has previously released.

Make sure you have multiple consumers with different Client IDs, or make sure that in the single consumer use case you are using randomly generated Client IDs every time your program starts.
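For the single-consumer case, a random client ID can be generated at startup with the standard library's `crypto/rand`. This is a sketch; `RandomClientID` and the "worker" prefix are hypothetical. Note the trade-off: a fresh ID on every start gives up the stability that client IDs otherwise should have.

```go
package main

import (
	"crypto/rand"
	"encoding/hex"
	"fmt"
)

// RandomClientID returns prefix followed by 8 random hex characters, so each
// program start gets a fresh identity.
func RandomClientID(prefix string) (string, error) {
	b := make([]byte, 4)
	if _, err := rand.Read(b); err != nil {
		return "", err
	}
	return prefix + "-" + hex.EncodeToString(b), nil
}

func main() {
	id, err := RandomClientID("worker")
	if err != nil {
		panic(err)
	}
	fmt.Println(id) // a different value on every run
}
```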

Bugs and Contact

There may be bugs. This is a new project. There are tests, however, and we very much welcome the submission of bug reports, pull requests, etc.

Github: https://github.com/dropbox/marshal

IRC: #kafka-marshal on Freenode

marshal's People

Contributors

drtall, pengkang, robot-dreams, zorkian


marshal's Issues

Use expvar and track statistics

We should track more data about the Marshal machine. How many messages are we getting from Kafka? How fast is the consumer pulling them? Do we have a queue or are we waiting for messages? What is the message turnaround time? Message size? etc.

This would be really helpful for users to be able to monitor exactly what is going on with their client when they're seeing issues.

cc @bonafidehan @DrTall

Batching consume API?

This came up in discussion today. Particularly for the AMO consumer, once we have it, it would be nice to have batching: when you consume, you get a batch of messages and can deal with them however you want, rather than consuming them one by one as with the current API.

Add Support for Max Topic Claims

In order to support load-balancing consumers that consume messages, but process them at their own pace, we need to limit the number of topics that can be claimed for a given marshal client-id when ClaimEntireTopic is specified.

Remove fatal in double-claim

This is per @bonafidehan, who ran into the fatal double claim.

We should define the Consumer channel interface so that a closed channel means the Consumer (or the Marshal object) has declared itself unhealthy and should be thrown away. The world is in a bad place.

Then we should remove the Fatal and make it a channel close that propagates to all of the Consumers on the same Marshal. Since we can't know who's at fault for the bad state, we need to terminate everything.
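Here is a minimal sketch of the proposed behavior. It uses a toy `Coordinator` type and string messages, neither of which is Marshal's API. A single `Fail` call closes a shared `done` channel, and every consumer goroutine watching it then closes its own output channel. `sync.Once` makes repeated failures harmless.

```go
package main

import (
	"fmt"
	"sync"
)

// Coordinator stands in for a Marshaler. On a bad state it shuts down
// every consumer instead of calling log.Fatal.
type Coordinator struct {
	done chan struct{}
	once sync.Once
}

func NewCoordinator() *Coordinator { return &Coordinator{done: make(chan struct{})} }

// Fail marks the coordinator unhealthy; it is safe to call more than once.
func (c *Coordinator) Fail() { c.once.Do(func() { close(c.done) }) }

// Consume forwards messages from in to the returned channel. It closes that
// channel when the coordinator fails or in is exhausted, so a closed
// channel tells the caller to throw the consumer away.
func (c *Coordinator) Consume(in <-chan string) <-chan string {
	out := make(chan string)
	go func() {
		defer close(out)
		for {
			select {
			case <-c.done:
				return
			case msg, ok := <-in:
				if !ok {
					return
				}
				select {
				case out <- msg:
				case <-c.done:
					return
				}
			}
		}
	}()
	return out
}

func main() {
	coord := NewCoordinator()
	in := make(chan string, 1)
	in <- "hello"
	out := coord.Consume(in)
	fmt.Println(<-out) // hello
	coord.Fail()
	_, open := <-out
	fmt.Println("channel open after Fail:", open) // false
}
```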

Add "start at latest" option for consumer

When starting consumption, some people want to begin with the newest data, for example in streaming use cases. This should be supported when creating a new consumer.

This should probably be implemented like the Python module which lets you specify how to reset the offset when there's an out-of-bounds or no-previous-offset encountered. This way people can restart their client and resume where they just were, but new clients can start at the latest offset.
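The following sketch shows such a reset rule. It is not modeled on any specific library's API; `ResetPolicy` and `StartOffset` are hypothetical names. With a rule like this, a committed offset outside the partition's retained range would be reset instead of causing the partition to be abandoned. The debug dump in the next issue shows such a case: a committed offset of 2892349 against a latest offset of 159991.

```go
package main

import "fmt"

// ResetPolicy says where to start when there is no usable committed offset.
type ResetPolicy int

const (
	ResetEarliest ResetPolicy = iota // replay everything still retained
	ResetLatest                      // skip ahead to new data only
)

// StartOffset returns the committed offset when it lies within
// [earliest, latest], and otherwise falls back to the policy. latest is the
// offset the next produced message will get; committed < 0 means "none".
func StartOffset(committed, earliest, latest int64, policy ResetPolicy) int64 {
	if committed >= earliest && committed <= latest {
		return committed
	}
	if policy == ResetLatest {
		return latest
	}
	return earliest
}

func main() {
	// A new client with no history starts at the newest data.
	fmt.Println(StartOffset(-1, 0, 159991, ResetLatest))
	// A restarted client resumes where it left off.
	fmt.Println(StartOffset(150000, 0, 159991, ResetLatest))
}
```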

Marshal consumer decides not to consume?

Hello

We are trying to use the marshal library in the hope that it can support Kafka topics with multiple partitions. We are still testing with our single-partition topic, and we're not sure whether this is user error or something to do with the heartbeat/health-checking mechanisms in Marshal. When we attach a Marshal Consumer to a very high-traffic topic, it decides it is behind and cannot keep up. Subsequent attempts to reattach use the stored offset, which seems to max out at 2892349; the consumer decides that offset is out of range and abandons the partition. The offset then never changes on subsequent reconnects, releases, or restarts of the consumers.

Here is the short version of the error we are experiencing (topic: ypec, only 1 partition):

2016/02/18 22:49:38 [ypec:0] consumer attempting to claim
2016/02/18 22:49:39 [ypec:0] consumer claimed at offset 2892349 (is -2827884 behind)
2016/02/18 22:49:39 [ypec:0] error consuming: out of range, abandoning partition
2016/02/18 22:49:39 [ypec:0] releasing partition claim

I tried deleting the __marshal topic and restarting from a fresh install and after recreating it and starting up the marshal/debug client I see similar messages:

2016/02/19 14:53:44 rationalize[0]: starting
2016/02/19 14:53:44 Waiting for all rationalizers to come alive.
2016/02/19 14:53:44 rationalize[0]: offsets 0 to 0
2016/02/19 14:53:44 All rationalizers alive, Marshaler now alive.
2016/02/19 14:53:44 <22.29 ms> construct Marshaler
2016/02/19 14:53:44 Topic ypec has 1 partitions.
2016/02/19 14:53:44 <0.10 ms> construct Consumer
2016/02/19 14:53:45 [ypec:0] consumer offsets: early = 0, cur/comm = 0/2892349, late = 159991
2016/02/19 14:53:45 [ypec:0] recovering committed offset of 2892349
2016/02/19 14:53:45 [ypec:0] consumer attempting to claim
2016/02/19 14:53:45 rationalize[0]: @0: [ClaimingPartition/0/1455922425/de9ca559/debug-client/ypec-logdata/ypec/0]
2016/02/19 14:53:46 [ypec:0] consumer claimed at offset 2892349 (is -2732358 behind)
2016/02/19 14:53:46 [ypec:0] error consuming: out of range, abandoning partition
2016/02/19 14:53:46 <9.57 ms> terminate Consumer
2016/02/19 14:53:46 <2079.58 ms> claim all partitions
2016/02/19 14:53:46 Marshal state dump beginning.
2016/02/19 14:53:46
2016/02/19 14:53:46 Group ID:    ypec-logdata
2016/02/19 14:53:46 Client ID:   debug-client
2016/02/19 14:53:46 Instance ID: de9ca559
2016/02/19 14:53:46
2016/02/19 14:53:46 Marshal topic partitions: 1
2016/02/19 14:53:46 Known Kafka topics:       8
2016/02/19 14:53:46 Internal rsteps counter:  1
2016/02/19 14:53:46
2016/02/19 14:53:46 State of the world:
2016/02/19 14:53:46
2016/02/19 14:53:46   GROUP: ypec-logdata
2016/02/19 14:53:46     TOPIC: ypec [on __marshal:0]
2016/02/19 14:53:46       *  0 [CLMD]: GPID ypec-logdata | CLID debug-client | LHB 1455922425 (1) | LOF 0 | PCL 0
2016/02/19 14:53:46
2016/02/19 14:53:46 Consumer states:
2016/02/19 14:53:46
2016/02/19 14:53:46   CONSUMER: 0 messages in queue
2016/02/19 14:53:46     TOPIC: ypec
2016/02/19 14:53:46       *  0 [CL+T]: offsets 0 <= 2892349 <= 159991 | 2892349
2016/02/19 14:53:46                    BC 0 | LHB 1455922426 (0) | OM 0 | CB 0
2016/02/19 14:53:46                    TRACK COMMITTED 0 | TRACK OUTSTANDING 0
2016/02/19 14:53:46                    PV 0.00 | CV 0.00
2016/02/19 14:53:46
2016/02/19 14:53:46 Marshal state dump complete.
2016/02/19 14:53:46 <0.00 ms> terminate Marshaler

Am I doing something wrong? There is definitely traffic on the ypec topic; I can see it using kafkacat and the like. I also see traffic on the __marshal topic, showing a heartbeat from the debug client. Here are the entire contents of that topic after recreating it fresh and running the debug client:

ClaimingPartition/0/1455922425/de9ca559/debug-client/ypec-logdata/ypec/0
Heartbeat/0/1455922425/de9ca559/debug-client/ypec-logdata/ypec/0/2892349
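The two __marshal entries above suggest a slash-delimited event format. The sketch below parses that format. The field names were inferred by matching these lines against the state dump (group ypec-logdata, client debug-client, instance de9ca559); they were not taken from the PROTOCOL documentation. `Event` and `ParseEvent` are hypothetical.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// Event is a hypothetical decoded form of one coordination message.
type Event struct {
	Type       string // e.g. "ClaimingPartition", "Heartbeat"
	Version    int
	Timestamp  int64 // Unix seconds
	InstanceID string
	ClientID   string
	GroupID    string
	Topic      string
	Partition  int
	Offset     int64 // -1 when the event carries no offset
}

// ParseEvent splits a slash-delimited event string into its fields.
// It assumes no field itself contains a "/".
func ParseEvent(s string) (Event, error) {
	parts := strings.Split(s, "/")
	if len(parts) < 8 {
		return Event{}, fmt.Errorf("too few fields in %q", s)
	}
	version, err := strconv.Atoi(parts[1])
	if err != nil {
		return Event{}, fmt.Errorf("bad version: %w", err)
	}
	ts, err := strconv.ParseInt(parts[2], 10, 64)
	if err != nil {
		return Event{}, fmt.Errorf("bad timestamp: %w", err)
	}
	partition, err := strconv.Atoi(parts[7])
	if err != nil {
		return Event{}, fmt.Errorf("bad partition: %w", err)
	}
	ev := Event{
		Type: parts[0], Version: version, Timestamp: ts,
		InstanceID: parts[3], ClientID: parts[4], GroupID: parts[5],
		Topic: parts[6], Partition: partition, Offset: -1,
	}
	if len(parts) > 8 {
		if ev.Offset, err = strconv.ParseInt(parts[8], 10, 64); err != nil {
			return Event{}, fmt.Errorf("bad offset: %w", err)
		}
	}
	return ev, nil
}

func main() {
	ev, err := ParseEvent("Heartbeat/0/1455922425/de9ca559/debug-client/ypec-logdata/ypec/0/2892349")
	if err != nil {
		panic(err)
	}
	fmt.Printf("%s from %s for %s:%d at offset %d\n", ev.Type, ev.ClientID, ev.Topic, ev.Partition, ev.Offset)
}
```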

Support a limit on number of claims

By default Marshal will claim as many as it can until it becomes unhealthy.

A user has requested the ability to put an upper bound. This is useful in the situation where you know that your consumers can never operate with more than X partitions/topics. It does put more work on the part of the client to ensure they never underprovision, but this should be fine.

Deadlock in call to marshal.Terminate from Consumer.tryClaimPartition

This line calls marshal.Terminate while the lock is held (Lock)
marshal/consumer.go#L293

marshal.Terminate calls terminateAndCleanup, which calls terminateAndCleanup for all consumers:
marshal/marshal.go#L149

consumer.terminateAndCleanup attempts to acquire the lock (RLock)
marshal/consumer.go#L576

I think this blocks forever.

We discussed a bit offline, it should be sufficient to call marshal.Terminate from a separate goroutine.

It would be nice to have a test for this code path -- that way we'll know for sure that it works and can correctly handle when this condition is reached in the code that uses this client.
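Here is a self-contained reproduction of the proposed fix. It uses a toy `consumer` type whose method names mirror the issue (`tryClaimPartition`, `terminateAndCleanup`); it is not Marshal's code. `sync.RWMutex.RLock` waits while a write lock is held, so calling cleanup synchronously would block forever. Launching it in a goroutine lets `tryClaimPartition` return and unlock first. The same shape could serve as the basis for the requested test.

```go
package main

import (
	"fmt"
	"sync"
)

// consumer mimics the locking shape described in the issue.
type consumer struct {
	mu      sync.RWMutex
	claims  int
	cleaned chan int // receives the claim count seen during cleanup
}

// terminateAndCleanup reads consumer state under the read lock.
func (c *consumer) terminateAndCleanup() {
	c.mu.RLock()
	defer c.mu.RUnlock()
	c.cleaned <- c.claims
}

// tryClaimPartition hits a fatal condition while holding the write lock.
// A direct call to terminateAndCleanup here would deadlock, because RLock
// waits for the Lock this goroutine still holds. The goroutine runs once
// the deferred Unlock releases the lock.
func (c *consumer) tryClaimPartition() {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.claims++
	go c.terminateAndCleanup()
}

func main() {
	c := &consumer{cleaned: make(chan int, 1)}
	c.tryClaimPartition()
	fmt.Println("cleanup saw", <-c.cleaned, "claims")
}
```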

Create callback for "overdue" work

Right now Marshal is vulnerable to a type of situation where, if you get a message that causes your processing to break, your committed offset will never be advanced since that message will end up the "oldest outstanding" and Marshal will not advance.

One solution is to give Marshal a callback and a timeout. When Marshal detects that a message has been outstanding for too long, it will call you with a copy of the message. You can then take whatever action you want, such as moving the message to a new queue or firing an exception.

The proposed implementation:

  • New consumer option MessageTimeout which specifies the number of seconds before we will let you know a message has "timed out". This is measured by the time from the message being returned in the Consume call.
  • New consumer option MessageTimeoutCallback which is called with a *proto.Message. When this callback returns, Marshal will consider the message to have been committed and knows it can advance the offset.

Thoughts?

State dump when double claimed

We should dump state (claim map, marshal topic) when Marshal detects that a partition has been double claimed. This provides the debugging state that we can use to understand how this situation can be reached.

Allow "topic claim" mode

Marshal should support an option where you can specify that you want a claim to be on the entire topic. This is used to support the sharded production use case, where you want to ensure all messages in a given shard go to a given consumer.
