Comments (26)
Looks to be a sync error between getting topic metadata and assigning heartbeats.
from franz-go.
Thanks for the report, and apologies for the panic. I’ll be able to look into this later in the day my time.
I checked a few things quickly, and the code causing this panic comments that the caller must be non-nil at that point, so that comment is clearly based on old information. My offhand guess is that something got lost / moved / added during refactorings.
How reliably are you able to trigger this panic?
At the moment it's happening on every app run, consuming from 200+ topics individually. Our Kafka server looks to be at 80%+ load.
I was in the middle of some debugging to add some extra info:
but it's like the topic metadata is not fully populated before topics_and_partitions.go:73 is run. I only see one topic listed in the metadata list, and not the topic we should be subscribed to for heartbeats.
Code in use:
client, err := kgo.NewClient(
	kgo.SeedBrokers(c.brokers...),
	kgo.Dialer(tlsDialer.DialContext),
)
if err != nil {
	return err
}
defer client.Close()
client.AssignGroup(c.id, kgo.GroupTopics(c.topic), kgo.DisableAutoCommit())
We create 200 clients in the app, one for each topic at the moment, as we're currently testing this library as a drop-in replacement; this would be optimised later. I don't see this being the problem, as the library does not appear to contain any shared state from what I can tell.
Cool, thanks for the update; doesn't seem like an isolated case, then.
I was planning to fully audit the consumer group code soon, and I’ll push up that timeline based off this issue (this weekend or next week).
I do see something that I suspect and can validate later when I’m on a laptop: the code does not currently handle the case where a client is assigned a topic it actually did not express interest in. I’ll also look for a case where it is assigned something but the internal topic partitions struct has not been created for some reason.
What group balancer are you using here?
I don't override the default that this client provides.
Thanks! And lastly, are all consumers in the group using this client?
Interesting, so this is happening in the cooperative case. If it's easy, can you try using the StickyBalancer? And, if that still causes the issue, can you try the RangeBalancer? These can help give clues as to which areas of my code to look into.
Here's what I'm currently looking at:
- A consumer group member indicates only the topics it is interested in when joining a group. In the client, these interests are either static (if you use GroupTopics) or dynamic (if you use GroupTopicsRegex). Topics are only added to the JoinGroup request once they have been discovered from a Metadata response, and they are stored in the client after discovery, before being tracked as usable.
- The sticky balancer has at least a few tests that ensure topics/partitions are assigned correctly, even when some consumers are interested in more topics than others.
- Given that, a client should only be assigned topics that it has already stored internally.
I can still fix the panic itself by fixing up fetchOffsets, but given that you said all consumers are using this client, there's a bug somewhere else.
Can you also confirm that each consumer is using its own dedicated client? That is, no consumer is sharing a client?
I may add some info level logging around what a client expresses interest in when joining a group, and then what that client is assigned.
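The invariant in the points above can be sketched as a small check: a member's assignment should always be a subset of the interests it sent in its JoinGroup request. The helper below is a hypothetical illustration of that check, not franz-go API; all names here are made up.

```go
package main

import "fmt"

// validateAssignment returns any assigned topics that the member never
// expressed interest in when joining. In a correct balance, the result is
// always empty. Hypothetical helper, not part of franz-go.
func validateAssignment(interests map[string]bool, assigned []string) []string {
	var unknown []string
	for _, topic := range assigned {
		if !interests[topic] {
			unknown = append(unknown, topic)
		}
	}
	return unknown
}

func main() {
	interests := map[string]bool{"orders": true, "payments": true}
	// "audit" was never requested, so a correct balancer should not assign it.
	fmt.Println(validateAssignment(interests, []string{"orders", "audit"}))
}
```

A non-empty result from a check like this is exactly the "assigned a topic it did not express interest in" case described above.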
We do get some connection timeouts on this cluster, if that's any help; this happens a lot more frequently in other Go libs, not sure about the Python ones we use.
[unable to open connection to broker addr *:17189 id 51 err dial tcp *:17189: i/o timeout]
Using different balancers
Sticky
client.AssignGroup(c.id, kgo.GroupTopics(c.topic), kgo.DisableAutoCommit(), kgo.Balancers(kgo.StickyBalancer()))
Not crashing, but a lot of topics are not getting any data, like they are stuck. There is a lot of lag on these topics. Only 3 topics look to have got data, from our metrics.
Ran for 10min
Range
client.AssignGroup(c.id, kgo.GroupTopics(c.topic), kgo.DisableAutoCommit(), kgo.Balancers(kgo.RangeBalancer()))
Crashing
panic: runtime error: index out of range [1] with length 1
goroutine 1895 [running]:
github.com/twmb/franz-go/pkg/kgo.(*rangeBalancer).balance(0x152eab8, 0xc00262c000, 0xf4, 0xf4, 0xc00342e3c0, 0xf4)
/go/pkg/mod/github.com/twmb/[email protected]/pkg/kgo/group_balancer.go:306 +0x766
github.com/twmb/franz-go/pkg/kgo.(*groupConsumer).balanceGroup(0xc00062edc0, 0xc003227530, 0x5, 0xc0004f7000, 0xf4, 0xf4, 0xa, 0xc0016b4ed8, 0x7f6f401ae8a0)
/go/pkg/mod/github.com/twmb/[email protected]/pkg/kgo/group_balancer.go:122 +0x2ab
github.com/twmb/franz-go/pkg/kgo.(*groupConsumer).handleJoinResp(0xc00062edc0, 0xc0012a0180, 0x0, 0x0, 0x2, 0xc0000b6001, 0x1, 0x0)
/go/pkg/mod/github.com/twmb/[email protected]/pkg/kgo/consumer_group.go:1064 +0x73b
github.com/twmb/franz-go/pkg/kgo.(*groupConsumer).joinAndSync(0xc00062edc0, 0xc0006eff03, 0xebfc80)
/go/pkg/mod/github.com/twmb/[email protected]/pkg/kgo/consumer_group.go:973 +0x3d3
github.com/twmb/franz-go/pkg/kgo.(*groupConsumer).manage(0xc00062edc0)
/go/pkg/mod/github.com/twmb/[email protected]/pkg/kgo/consumer_group.go:385 +0xc5
created by github.com/twmb/franz-go/pkg/kgo.(*groupConsumer).findNewAssignments
/go/pkg/mod/github.com/twmb/[email protected]/pkg/kgo/consumer_group.go:1316 +0x70a
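For context on the class of bug this trace implicates: a range balancer splits the partitions of each topic into contiguous ranges across the sorted members, with earlier members taking one extra partition when the count doesn't divide evenly. The sketch below is an illustration of that algorithm, not franz-go's actual implementation; an off-by-one in the per-member count or start index in code like this is exactly the kind of mistake that produces "index out of range".

```go
package main

import "fmt"

// rangeAssign divides partitions [0, numPartitions) across members in
// contiguous ranges; the first numPartitions%len(members) members each get
// one extra partition. Illustrative sketch only.
func rangeAssign(members []string, numPartitions int) map[string][]int {
	plan := make(map[string][]int, len(members))
	per := numPartitions / len(members)
	extra := numPartitions % len(members)
	partition := 0
	for i, m := range members {
		count := per
		if i < extra {
			count++ // earlier members absorb the remainder
		}
		for j := 0; j < count; j++ {
			plan[m] = append(plan[m], partition)
			partition++
		}
	}
	return plan
}

func main() {
	// 5 partitions over 2 members: first member gets 3, second gets 2.
	fmt.Println(rangeAssign([]string{"m1", "m2"}, 5))
}
```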
Could it be related to the number of topics we have on our Kafka instance? We are only consuming from a subset.
Total topics: ~350
We want to consume: ~200
Yes, no consumer is sharing a client, as we create a whole new client per topic.
Something like this is what happens internally:
topics := []string{} // Would contain around 200 topics
wg := &sync.WaitGroup{}
for _, topic := range topics {
	topic := topic // capture loop variable for the goroutine
	wg.Add(1)      // Add before starting the goroutine to avoid racing with Wait
	go func() {
		defer wg.Done()
		client, err := kgo.NewClient(
			kgo.SeedBrokers(c.brokers...),
			kgo.Dialer(tlsDialer.DialContext),
		)
		if err != nil {
			return // a goroutine cannot return an error directly
		}
		defer client.Close()
		client.AssignGroup("consumerID", kgo.GroupTopics(topic), kgo.DisableAutoCommit())
		for {
			fetches := client.PollFetches(ctx)
			for _, fetchErr := range fetches.Errors() {
				// Handle errors
				_ = fetchErr
			}
			iter := fetches.RecordIter()
			for !iter.Done() {
				msg := iter.Next()
				// Do something with message
				_ = msg
			}
		}
	}()
}
wg.Wait()
Thanks for testing. If it's not too much, could you try the RoundRobin balancer too?
I think the dialing is unrelated; that's at a lower level than this library.
Also, I see that you're disabling autocommitting -- can you try with autocommitting enabled?
Also, if you're open to joining the Discord channel, I can ask some higher bandwidth questions, and I really appreciate your assisted debugging so far. The surface area of where this could be occurring is so far quite large, so I'm trying to narrow down at least a few things about it.
Ok, round robin: no crash, just stuck.
The reason auto committing is disabled is that we only want to commit once we know the msg has been processed correctly, but I'll happily enable it for a quick test.
Yeah, I should be able to join Discord, but it will most likely be tomorrow.
Thanks!
Here's my current plan:
- the panic in the range balancer looks to be from an obvious mistake; I can fix this and add test cases for it
- the original reported panic is still an unknown; I can prevent it from occurring, but it should only occur on a bad assignment
- I'm going to add some verbose logs to the consumer that is assigned leader; these logs will print what topics every consumer has expressed interest in and the resulting plan, and for each consumer I will add logs printing their assignment
When I get those fixes / logs in, if possible I'd like you to re-run the cooperative sticky test (original balancer), the sticky test, and the range test. My hope is that the balancing logs there can give some more concrete things to look at. I can have these small commits pushed today.
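The kind of leader-side logging described in the plan above might look like the sketch below. Everything here is hypothetical illustration (function names, map shapes, the "topic[partitions]" rendering), not franz-go internals.

```go
package main

import (
	"fmt"
	"sort"
)

// describeBalance renders, for each member, the topics it expressed interest
// in at join time and then its resulting assignment. Illustrative only.
func describeBalance(interests, plan map[string][]string) []string {
	var members []string
	for m := range interests {
		members = append(members, m)
	}
	sort.Strings(members) // deterministic output order

	var lines []string
	for _, m := range members {
		lines = append(lines, fmt.Sprintf("join: %s interested in %v", m, interests[m]))
	}
	for _, m := range members {
		lines = append(lines, fmt.Sprintf("plan: %s assigned %v", m, plan[m]))
	}
	return lines
}

func main() {
	for _, line := range describeBalance(
		map[string][]string{"member-1": {"orders"}, "member-2": {"orders", "audit"}},
		map[string][]string{"member-1": {"orders[0]"}, "member-2": {"orders[1]", "audit[0]"}},
	) {
		fmt.Println(line)
	}
}
```

Output like this, gathered from every member across a rebalance, is what would let the interests and the plan be compared side by side.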
Auto commit: same results. Some topics working with messages. At this point I'm assuming it's pot luck which topics work; the topics that do work are sometimes different between runs.
No problem
I've pushed two commits, can you try those tests with go get github.com/twmb/franz-go@8178f2cc5883304d409f653ff7e5d42163cf3e8e
? Specifically, the logging output for all consumers across a rebalance is what I'm looking for.
Ok here is the log output for
client.AssignGroup(c.id, kgo.GroupTopics(c.topic), kgo.DisableAutoCommit())
client.AssignGroup(c.id, kgo.GroupTopics(c.topic), kgo.DisableAutoCommit(), kgo.Balancers(kgo.StickyBalancer()))
client.AssignGroup(c.id, kgo.GroupTopics(c.topic), kgo.DisableAutoCommit(), kgo.Balancers(kgo.RangeBalancer()))
@owenhaynes I added the new logs at the Debug level, and I think the default is Info -- is it possible to up the level to Debug? I see some logs that are at the info level but none of the new ones I added.
Also, how many members are in the group?
I thought I had set logging to debug; will double check.
Just one member, as we only have 1 pod running.
Here are the fixed debug logs
defualt_balancer.log
range_balancer.log
sticky_balancer.log
FYI, we upgraded to Kafka 2.7 today; the issue still exists, so it's not related to using Kafka 2.6.
As an update: I have an idea of why the topic assignment after the group balance isn't working for you, I am in the middle of fixing that. I'm not sure yet what caused the original issue you reported, though.
For posterity, this was moved to discord and fixed in the following three commits:
- e038916 fixes range balancer
- e168855 fixes an edge case in the sticky balancer (what caused this panic originally)
- a670bc7 allows the leader to balance topics that the leader itself does not know of (the stuck problem from this issue)
- 938651e ensures that the panic will not happen even if a different leader is bugged
These commits are released in v0.6.10