Comments (10)
Would it be possible for you to share the code sample thy triggered the panic?
from kafka-go.
it is spread over several functions, but comes down to something like this:
func createConsumer(config map[string]string, partition int) *kafka.Reader {
maxBytes, err := strconv.Atoi(config[max_partition_fetch_bytes])
brokerList := strings.Split(config[brokers], ",")
if err != nil {
log.Fatalf("\ncould not parse a number for 'max_partition_fetch_bytes' from config %+v due to:%+v\n", err)
}
consumer := kafka.NewReader(kafka.ReaderConfig{
Brokers: brokerList,
Topic: config[topic],
Partition: partition,
MaxBytes: maxBytes,
})
fmt.Printf("created consumer: %+v\n", *consumer)
return consumer
}
and in another consume
go routine:
for ctx.Err() == nil {
m, err := consumer.ReadMessage(ctx) ;; <<<<<<< (!) PANICS FROM HERE
if err != nil {
//TODO: fatal for now to fail fast (and learn the reasons) until recover / retry logic is in
log.Fatalf("\ncould not consume from partition: %+v, %+v \ndue to:%+v\n", consumer.Stats(), consumer.Config(), err)
} else {
...
}
}
from kafka-go.
caused by OffsetOutOfRange
. It seems that even when this error occurs, the flow continues into the readLoop:
and fails here:
Line 585 in 1e83cf5
because the connection is nil
(i.e. was not created due to the OffsetOutOfRange
).
the real panic due to the nil
connection is here:
Line 690 in 1e83cf5
minor:
Line 97 in a8872ed
from kafka-go.
Thanks for the report, I'll send a PR with a test to reproduce the issue and a fix.
from kafka-go.
I'm having trouble reproducing the issue, it seems like this code should be taking care of not having offsets out of range issues. Could it be a race between the reader reading from the first offset and kafka expiring those messages?
Would you happen to know what's the range of valid offsets on your partition, and which offset the reader was trying to consume from? (looking at your code snippet it seems like it's reading from the first offset).
from kafka-go.
Could it be a race between the reader reading from the first offset and kafka expiring those messages?
I don't think this is the case due to the next answer
Would you happen to know what's the range of valid offsets on your partition, and which offset the reader was trying to consume from?
I did point it to a different Kafka broker that did not yet have messages for a partition. But I read offsets from Consul, which remembered offsets from a previous broker. i.e. so any offset set to greater than zero for a brand new partition results in this panic
looking at your code snippet it seems like it's reading from the first offset
I do manual offset management: recording them in a sync.Map
, and periodically persisting them to Consul. When the app starts it reads Consul along with all the partition offsets
from kafka-go.
Hm... I tried writing a test that would do r.SetOffset(42)
on an empty topic but it didn't trigger the issue, I'll dig more.
from kafka-go.
Ah! Never mind I got it, in the test the topic didn't exist until messages were produced to it. See the PR for the test and fix.
Thanks for your help making this project better!
from kafka-go.
just in case: Kafka version kafka_2.12-0.10.2.0
if I put continue
after OffsetOutOfRange
case, as in your pull request, it does not panic on a wrong offset.
At this point this is not a blocker for me, since theoretically "it's a user error", but it would be nice to avoid a panic of course.
from kafka-go.
great, sure always welcome, thanks for a quick fix :)
from kafka-go.
Related Issues (20)
- `reader.Close()` stuck randomly HOT 1
- Make `(msg *Message) totalSize()` public
- Asynchronous mode in kafka package not working
- Duplicate consumption occurs when join a new consumer and CommitInterval is configured HOT 1
- After increasing partition count, consumer group is not assigned to the new partitions. HOT 1
- Observability : Kafka Consumer/Producer metrics and tracing.
- Subscribe topic on demand
- A new method for Batch called ReadIntoMessage
- authorization retries
- What is the difference between kafka.NewConsumerGroup and kafka.NewReader,
- What is QueueCapacity param ?
- Offset is going ahead and we are missing message with lag HOT 3
- How do I specify partition when producing messages,I want to implement ordered messages HOT 1
- AllowAutoTopicCreation does not exist in writer config, but only in the writer struct HOT 1
- Go1.16 cannot run tests HOT 2
- Feture request: logger with different levels HOT 1
- Issue: Getting half partitions while Consumer Group Migration
- Do not panic on errors
- Making the producer backoff algorithm configurable
- a lot of unexpected EOF error when reading from partition
Recommend Projects
-
React
A declarative, efficient, and flexible JavaScript library for building user interfaces.
-
Vue.js
🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
-
Typescript
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
-
TensorFlow
An Open Source Machine Learning Framework for Everyone
-
Django
The Web framework for perfectionists with deadlines.
-
Laravel
A PHP framework for web artisans
-
D3
Bring data to life with SVG, Canvas and HTML. 📊📈🎉
-
Recommend Topics
-
javascript
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
-
web
Some thing interesting about web. New door for the world.
-
server
A server is a program made to process requests and deliver data to clients.
-
Machine learning
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
-
Visualization
Some thing interesting about visualization, use data art
-
Game
Some thing interesting about game, make everyone happy.
Recommend Org
-
Facebook
We are working to build community through open source technology. NB: members must have two-factor auth.
-
Microsoft
Open source projects and samples from Microsoft.
-
Google
Google ❤️ Open Source for everyone.
-
Alibaba
Alibaba Open Source for everyone
-
D3
Data-Driven Documents codes.
-
Tencent
China tencent open source team.
from kafka-go.