Comments (21)

savaki commented on May 14, 2024

Could Offset and SetOffset simply return errors if the Reader is configured to be part of a consumer group? Because in this case offset management is automatic so the program cannot be given control over where the Reader is supposed to start.

+1

I'd advocate for getting a working implementation first (no matter what approach you choose), but if connections occur quite often we'll probably want to keep a persistent connection established to the coordinator.

Agreed. I have a working implementation that we're testing internally. However, I'm breaking up the check-ins as per our discussion to ease the review process. As it stands, not reusing connections was simpler to reason about so I went that direction. The main driver of a persistent connection is the commit which currently opens a new conn on commit.

Same answer as the previous question, let's aim for a simple and working solution first, no matter how inefficient, then we can improve it.

Agreed. I'm coming from the pov that I have a working version currently and have capacity to look at additional issues. I'll make sure to submit these changes we're discussing as separate pull requests to not muddy the waters.

My suggestion is that we add a time.Duration field, CommitInterval, to Config. When zero, commits are pushed synchronously with no buffering. When > 0, commits are buffered and pushed at the specified interval. Close on the Reader will push any buffered commits to the broker.

from kafka-go.

savaki commented on May 14, 2024

Thanks for opening this thread. Here was my thinking on each of those:

  • As for the exporting. By exporting, you'll enable people that use kafka-go to build off it to add features that you might not have the time or desire for. If you look at the relationship that bsm/sarama-cluster and burrow have with sarama, the former exists because sarama exposes its broker level API. I can understand if you want to keep it private, but I think there's a real value in opening it up.

  • As for the naming style, I'm happy to change things to match the current style. I prefixed kafka broker api files separately as I was having a hard time parsing the existing code base to know what was specific to kafka-go and what was part of the kafka protocol api.

  • The api prefix follows the same line of thinking as above. As I was working with the code, I really wanted to create a Broker or Client abstraction, as it feels like Conn is becoming overloaded. Also, it would simplify things for other people using the library, as the distinction between what's required for the kafka protocol and what Segment has added would be clearer.

  • Happy to split things up between Kafka protocol and the consumer group work.

  • I considered using the existing types as is, but ended up going with a separate type. Here was my thinking.

    • consumer groups are global within the broker and rebalancing takes place across all topics and partitions
    • consequently, a single service that consumed multiple topic partitions would have to be aware of this and give each Reader a separate group id
    • OR we would need a custom Segment strategy that was aware of the desired topic partition of Reader
    • using a custom Segment strategy would mean that the Segment consumer group reader would be incompatible with other Kafka consumer group readers that use the common range and roundrobin protocols, so it'd be impossible to use them in a heterogeneous environment, say if you were trying to work kafka-go into an existing sarama environment
    • the existing single topic partition case can be supported by a reader that supports multiple topic partitions

achille-roussel commented on May 14, 2024
  • I agree exporting APIs allows people to reuse the code and build other packages on top of it. However, we have no immediate need for this, and once exposed we have to maintain the code forever, because there is no way to tell how many dependencies there will be out there. We'd also end up with a really large public API like sarama has; there's no point redoing what's already been done, and this project intends to take a different approach. If someone ever opens an issue requesting to expose the internal types, we'll probably do so by moving some of the lower-level components into a different package. For now we must aim for the smallest public API needed to use consumer groups (and having access to the low-level message types must not be needed).

  • Conn becoming overloaded is very subjective; to one developer it could seem too large, to another it could seem just fine. I stand on the side of not adding abstractions just to split things up. We must add abstractions when they serve a purpose or clearly represent different concepts. Conn is intended to be the low-level representation of a connection to a Kafka server, and it is OK to add more methods to it if needed; the only thing we must aim for is readability (short methods serving one purpose, for example; your code seemed to be fine in this regard). If you look at files of the standard library (which stands as one of the highest quality Go codebases out there), there are some very large files sometimes, but the code is still easy to read and navigate.

  • As to "what Segment has added", there is nothing Segment-specific in this repository, and I intend to keep it this way, do you mind explaining your concern here?

  • I understand that re-using the Reader concept would be more restrictive (for example, it would only allow balancing on a single topic), but I'm convinced that this is a good first step as it would solve the majority of use cases without requiring new abstractions (which would mean programs using kafka-go have more work to do to support both the Reader and ConsumerGroup types). In my experience, convenience is more important than features, so sacrificing some features for an API that's simpler to use is a trade-off worth taking (kafka-go is actually a great example of this, as the project got good traction even though it had fewer features than sarama). I wouldn't be against supporting multiple topics on the Reader type if that's needed, but I'd advocate for supporting single-topic consumer groups with no breaking changes in the first place.

savaki commented on May 14, 2024
  • You make good points about not wanting to support more than you have to. I'll go ahead and un-export the api and update the naming within Conn.

  • I agree that being overloaded is subjective. I will say that, as a newcomer to the codebase, the blending of api calls with higher-order concepts took me a little while to get used to. I thought an abstraction would have cleared up the confusion for future contributors. That said, I'm happy to take your lead on this.

  • As to Segment's opinion: when I look at the Kafka api, especially with regard to consumer groups, it views the world as a set of topics with their associated partitions; kafka-go, on the other hand, views the world as topic-partition pairs. While putting together consumer groups, I noticed the conflict between these perspectives. It comes front and center during rebalancing.

  • For using Reader, I think there are a couple of questions that would help my thinking:

    • Is compatibility with other consumers desired? e.g. can I gradually co-mingle kafka-go and sarama consumers using consumer groups?
    • What is the expected behavior if the client uses the same consumer group id for multiple topic partitions? For example, if we assume a consumer-group-aware Reader, what would be the expected behavior if one service has two Readers that use the same consumer group id, but point to different topic-partitions?
    • What are your thoughts on using a custom rebalance strategy?

achille-roussel commented on May 14, 2024

When I look at the Kafka api, especially with regard to consumer groups, it views the world as a set of topics with their associated partitions; kafka-go, on the other hand, views the world as topic-partition pairs. While putting together consumer groups, I noticed the conflict between these perspectives. It comes front and center during rebalancing.

Like I said, I'm not against supporting multiple topics on the Reader type, it may even be a great improvement of the current code. I was just saying that as a first step and to reuse as much of the existing code as possible we could start with single topics, then improve the code to be fully featured.

Is compatibility with other consumers desired? e.g. can I gradually co-mingle kafka-go and sarama consumers using consumer groups?

I believe this is a requirement, companies have programs written in multiple languages or using multiple libraries. As library writers we can build much more powerful tools if we offer cross-compatibility instead of building something new. This is also what's recommended by the Kafka documentation:

The primary use case for the membership API is consumer groups, but the requests are intentionally generic to support other cases (e.g. Kafka Connect groups). The cost of this generality is that specific group semantics are pushed into the client. For example, the JoinGroup/SyncGroup requests defined below have no explicit fields supporting partition assignment for consumer groups. Instead, they contain generic byte arrays in which assignments can be embedded by the consumer client implementation.

But although this allows each client implementation to define its own embedded schema, compatibility with Kafka tooling requires clients to use the standard embedded schema used by the client shipped with Kafka. The consumer-groups.sh utility, for example, assumes this format to display partition assignments. We therefore recommend that clients should follow the same schema so that these tools will work for all client implementations.

What is the expected behavior if the client uses the same consumer group id for multiple topic partitions? For example, if we assume a consumer-group-aware Reader, what would be the expected behavior if one service has two Readers that use the same consumer group id, but point to different topic-partitions?

Requiring that one group equals one topic is fine as a first step if that means less public API changes and a simpler implementation. I've never seen groups being used to load balance across topics (I'm sure there are use cases for it but it's not common in my experience).

What are your thoughts on using a custom rebalance strategy?

Let's focus on one implementation first, then once we have it working we can add an abstraction for supporting custom strategies. In my experience, it's always been easier to first solve one problem, then step back and think about the generic solution. It also means we'll already have tests to ensure that as we introduce an abstraction we won't be breaking existing code.

savaki commented on May 14, 2024

These answers help a lot. The reason I ask about compatibility vs multiple topics has to do with how consumer groups rebalance. Conceptually Kafka consumers handle rebalancing as follows:

  1. Find the broker that will coordinate the group
  2. Attempt to join the group. As part of the request, I pass the list of rebalancing strategies I support. The standard-ish ones are roundrobin and range. bsm/sarama-cluster supports these.
  3. The coordinator will (a) select a common strategy among the consumers and return that as part of join group and (b) select a leader among the consumers attempting to join in
  4. The leader is solely responsible for deciding which consumers will be responsible for which topic partitions. The important thing is that a single leader handles assignments for all consumers across all topic partitions that share a common group id
  5. The leader pushes its assignments to the coordinator which in turn forwards them to the other consumers which then subscribe to the appropriate topic partitions

The challenge I see is this: a single leader is responsible for assignments across all topic-partitions, not just a single topic-partition pair.

Consequently, if we had a heterogeneous environment with both sarama-cluster and kafka-go consumers, and leadership were given to a sarama-cluster consumer, the kafka-go consumer may not be able to handle the assignment it is given, e.g. one where both the topic and the partition change.

Conversely, if the kafka-go consumer were elected leader, then without a custom strategy it would be unaware of which topic-partitions each of the existing consumers were bound to. A workaround would be to create a custom rebalancing strategy. However, if we created a custom strategy, there would be no common strategy between sarama-cluster consumers and kafka-go consumers, and new consumers would get an InconsistentGroupProtocol error.

Hence, my thinking in allowing ConsumerGroup to support multiple topic partitions. I considered making the change to Reader, but ran into the following:

  • kafka.Message has neither Topic nor Partition to allow consumers to distinguish what they're receiving
  • ReaderConfig defines Topic and Partition and I was wary of also adding Topics []string and Partitions []int32

achille-roussel commented on May 14, 2024
  • Adding Topic/Partition to kafka.Message seems like a very fine change to me, maybe we can even make it its own PR because it would likely touch a lot of the existing code and we should probably set those fields even for a reader that doesn't use consumer groups (for consistency purposes).

  • I understand the leader is in charge of all topic and partitions of the group, however if we document the restriction of supporting a single topic in the first place, then kafka-go is compatible with programs using sarama clusters for a single topic as well, isn't it?
    I'm not sure I understand the motivation for adding Partitions []int to the configuration, what's the use case for setting the partitions of the topic? This information can be discovered via the metadata API, and is dynamic anyway (the number of partitions may be increased on a topic for example).

savaki commented on May 14, 2024

I like that first suggestion a lot. I'll go ahead and start there.

Partitions[] shouldn't be in the configuration. It was something I experimented with, but clearly didn't remove completely before committing.

savaki commented on May 14, 2024

I noticed that ReaderStats has an embedded partition. How would you like to handle that if the Reader can handle more than one partition? Options I see would include:

  • set value to -1 in cases where there's more than one partition and stats become aggregate
  • add new StatsByPartition => map[int32]ReaderStats that returns the stats by partition
  • separate StatsByPartition into ReaderStats and ReaderStatsByPartition
    • values like Rebalances make more sense at the Reader vs partition level

or a combination of the above.

achille-roussel commented on May 14, 2024

That's a good point. I think it's fine to set the partition to -1 for now when the reader is used with a consumer group. It's not ideal but I'm not a big fan of adding new methods for solving this.

Ideally I think we should return a slice of ReaderStats, one for each topic/partition that the reader handles, but this is an API-breaking change so I'd rather delay it until we have consumer groups fully implemented; maybe we'll make a major version release where we can introduce those kinds of changes.

savaki commented on May 14, 2024

sounds good to me

savaki commented on May 14, 2024

At this point, I have a working version of consumer groups with kafka-go.

As you've suggested, I split the work into smaller commits to make it easier to digest. The first commit, expand Conn to handle kafka api calls, increases the surface area of *Conn to support the api calls required by consumer groups. I've intentionally made the calls shallow so the functions can be simple adapters to Kafka.

In a separate commit, I can introduce the code that modifies Reader to support consumer groups.

I've updated the link above, https://github.com/savaki/kafka-go/tree/consumer-groups, to point to the working version. It still has a number of open issues and needs more tests, but it should serve as a good straw man for discussion.

Some of the open issues include:

  • Handling lag as we've talked about
  • What to do with (*Reader).Offset and (*Reader).SetOffset
  • Should the Reader hold a persistent connection to the coordinator, or just create one on demand
  • Should any attempt be made to buffer commits? Currently I'm using a naive implementation that opens a connection, writes the commit, and closes the connection

achille-roussel commented on May 14, 2024

What to do with (*Reader).Offset and (*Reader).SetOffset

Could Offset and SetOffset simply return errors if the Reader is configured to be part of a consumer group? Because in this case offset management is automatic so the program cannot be given control over where the Reader is supposed to start.

Should the Reader hold a persistent connection to the coordinator, or just create one on demand

I'd advocate for getting a working implementation first (no matter what approach you choose), but if connections occur quite often we'll probably want to keep a persistent connection established to the coordinator.

Should any attempt be made to buffer commits? Currently I'm using a naive implementation that opens a connection, writes the commit, and closes the connection

Same answer as the previous question, let's aim for a simple and working solution first, no matter how inefficient, then we can improve it.

errnoh commented on May 14, 2024

Could Offset and SetOffset simply return errors if the Reader is configured to be part of a consumer group? Because in this case offset management is automatic so the program cannot be given control over where the Reader is supposed to start.

(Not directly about Offset() or SetOffset(), mostly about ReaderConfig.) While I really like the ease of use of the current implementation, this is my main issue with it. Without support for starting the consumer group from the latest offset, I'll have to keep using separate consumers for each partition.

The use case here is having a temporary consumer group read the latest data from a topic. With topics possibly containing terabytes of data and a retention of a couple of weeks, starting from the beginning each time is not OK.

achille-roussel commented on May 14, 2024

That's a good point, would you have the time to work on adding this feature?

errnoh commented on May 14, 2024

I think I can find some time soonish, and I know what to change. Just need to decide where to expose the configuration for this.

achille-roussel commented on May 14, 2024

The reader config seems like the right place for this, but feel free to suggest something else if you think otherwise.

abraithwaite commented on May 14, 2024

@errnoh, I might do some work on this soon. Have you written anything?

errnoh commented on May 14, 2024

Yeah, sorry. A bit busy at work, but my proof-of-concept is now at #88

Any opinion on whether that's the way we should do it, or do you have any other suggestions?

mariusw commented on May 14, 2024

Just so I understand: kafka-go supports consumer groups, but not entirely (or natively)? There has been an effort to implement it better, but that stopped in March?
We really want to use (and contribute to) this library rather than Sarama (with the sarama-cluster lib for consumer groups), but reading the open issues I get the feeling kafka-go is not quite there yet? @achille-roussel @Pryz @abraithwaite

achille-roussel commented on May 14, 2024

Catching up here: @mariusw, consumer group support in kafka-go should be usable, and we are using this code in production at Segment, so you should be able to as well.

If there are features that you think are missing feel free to open a discussion about it or a PR if you have a fix ready to be merged.

Regarding this issue, I think that all the work that it tracked has landed or has other issues/PRs tracking it, so I'll go ahead and close it.

Thank you all for your contributions in the discussions and the code!
