Comments (1)
Additional info:
Every time this happens, I see the same pattern: a goroutine is stuck waiting inside consumergroup.run. It seems the consumer group cannot be cancelled easily during a rebalance?
Either:
goroutine 585773 [semacquire]:
sync.runtime_Semacquire(0x1d90c50?)
/usr/local/go/src/runtime/sema.go:62 +0x25
sync.(*WaitGroup).Wait(0x0?)
/usr/local/go/src/sync/waitgroup.go:116 +0x48
github.com/segmentio/kafka-go.(*Reader).unsubscribe(0xc00016d340)
/go/pkg/mod/github.com/segmentio/[email protected]/reader.go:113 +0x2f
github.com/segmentio/kafka-go.(*Reader).run.func4({0x1493200?, 0xc002a60580?})
/go/pkg/mod/github.com/segmentio/[email protected]/reader.go:345 +0x95
github.com/segmentio/kafka-go.(*Generation).Start.func1()
/go/pkg/mod/github.com/segmentio/[email protected]/consumergroup.go:396 +0x2e
created by github.com/segmentio/kafka-go.(*Generation).Start in goroutine 32
/go/pkg/mod/github.com/segmentio/[email protected]/consumergroup.go:395 +0x15d
goroutine 32 [semacquire]:
sync.runtime_Semacquire(0xc000162440?)
/usr/local/go/src/runtime/sema.go:62 +0x25
sync.(*WaitGroup).Wait(0x1486ac0?)
/usr/local/go/src/sync/waitgroup.go:116 +0x48
github.com/segmentio/kafka-go.(*ConsumerGroup).Close(0xc000162360?)
/go/pkg/mod/github.com/segmentio/[email protected]/consumergroup.go:689 +0x5a
github.com/segmentio/kafka-go.(*Reader).run(0xc00016d340, 0xc000162360)
/go/pkg/mod/github.com/segmentio/[email protected]/reader.go:310 +0x3c2
created by github.com/segmentio/kafka-go.NewReader in goroutine 1
/go/pkg/mod/github.com/segmentio/[email protected]/reader.go:742 +0x7bf
or
goroutine 38 [semacquire]:
sync.runtime_Semacquire(0xc0003be200?)
/usr/local/go/src/runtime/sema.go:62 +0x25
sync.(*WaitGroup).Wait(0x1486ac0?)
/usr/local/go/src/sync/waitgroup.go:116 +0x48
github.com/segmentio/kafka-go.(*ConsumerGroup).Close(0xc0003be120?)
/go/pkg/mod/github.com/segmentio/[email protected]/consumergroup.go:689 +0x5a
github.com/segmentio/kafka-go.(*Reader).run(0xc000127340, 0xc0003be120)
/go/pkg/mod/github.com/segmentio/[email protected]/reader.go:310 +0x3c2
created by github.com/segmentio/kafka-go.NewReader in goroutine 1
/go/pkg/mod/github.com/segmentio/[email protected]/reader.go:742 +0x7bf
goroutine 37 [select]:
net.(*Resolver).lookupIPAddr(0x1dac0a0, {0x1492f60?, 0xc000545b90}, {0x12823c2, 0x3}, {0xc00121b140, 0x33})
/usr/local/go/src/net/lookup.go:332 +0x3fe
net.(*Resolver).internetAddrList(0x1492f60?, {0x1492f60?, 0xc000545b90?}, {0x12823c2, 0x3}, {0xc00121b140?, 0xc0004a5a00?})
/usr/local/go/src/net/ipsock.go:288 +0x4e5
net.(*Resolver).resolveAddrList(0x1de0860?, {0x1492f60, 0xc000545b90}, {0x12828de, 0x4}, {0x12823c2?, 0x2?}, {0xc00121b140, 0x38}, {0x0, ...})
/usr/local/go/src/net/dial.go:282 +0x405
net.(*Dialer).DialContext(0xc000255a78, {0x1492f60, 0xc000545b90}, {0x12823c2, 0x3}, {0xc00121b140, 0x38})
/usr/local/go/src/net/dial.go:488 +0x42c
github.com/segmentio/kafka-go.(*Dialer).dialContext(0x1d9f580, {0x1492f60, 0xc000545b90}, {0x12823c2, 0x3}, {0xc00004a00c?, 0x4aef92?})
/go/pkg/mod/github.com/segmentio/[email protected]/dialer.go:357 +0x19b
github.com/segmentio/kafka-go.(*Dialer).connect(0x1d9f580, {0x1492e80?, 0x1de0860?}, {0x12823c2, 0x3}, {0xc00004a00c, 0x38}, {{0x0, 0x0}, {0x0, ...}, ...})
/go/pkg/mod/github.com/segmentio/[email protected]/dialer.go:278 +0x1ac
github.com/segmentio/kafka-go.(*Dialer).DialContext(...)
/go/pkg/mod/github.com/segmentio/[email protected]/dialer.go:113
github.com/segmentio/kafka-go.(*Dialer).Dial(0xc00024fd18?, {0x12823c2?, 0x0?}, {0xc00004a00c?, 0x401?})
/go/pkg/mod/github.com/segmentio/[email protected]/dialer.go:96 +0x8b
github.com/segmentio/kafka-go.(*ConsumerGroupConfig).Validate.makeConnect.func1(0x1000000010000?, {0xc0000c44a0?, 0x2, 0x0?})
/go/pkg/mod/github.com/segmentio/[email protected]/consumergroup.go:883 +0x8b
github.com/segmentio/kafka-go.(*ConsumerGroup).coordinator(0xc0003be120)
/go/pkg/mod/github.com/segmentio/[email protected]/consumergroup.go:903 +0x66
github.com/segmentio/kafka-go.(*ConsumerGroup).leaveGroup(0xc0003be120, {0xc00015f960, 0x63})
/go/pkg/mod/github.com/segmentio/[email protected]/consumergroup.go:1220 +0x12f
github.com/segmentio/kafka-go.(*ConsumerGroup).run(0xc0003be120)
/go/pkg/mod/github.com/segmentio/[email protected]/consumergroup.go:737 +0x225
github.com/segmentio/kafka-go.NewConsumerGroup.func1()
/go/pkg/mod/github.com/segmentio/[email protected]/consumergroup.go:661 +0x1c
created by github.com/segmentio/kafka-go.NewConsumerGroup in goroutine 1
/go/pkg/mod/github.com/segmentio/[email protected]/consumergroup.go:660 +0x168