Comments (7)
Please add this to the README. It is incredibly important for people to know that the Reader (even when calling FetchMessage()) does not replay messages that were never committed; it keeps reading no matter what.
😢 spent a LOT of time trying to figure this out.
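To make the behavior concrete, here is a toy in-memory model of the two independent cursors involved. It is only a sketch: `toyReader`, `Fetch`, `Commit`, and `Restart` are hypothetical names made up for illustration, not kafka-go's API or implementation.

```go
package main

import "fmt"

// toyReader is a hypothetical in-memory stand-in for a consumer-group reader.
// It is NOT kafka-go's implementation; it only models two independent cursors.
type toyReader struct {
	log       []string // messages of a single partition
	next      int      // position of the next message Fetch will return
	committed int      // offset a restarted consumer would resume from
}

// Fetch returns the next message and advances the read position,
// regardless of whether earlier messages were committed.
func (r *toyReader) Fetch() (offset int, value string, ok bool) {
	if r.next >= len(r.log) {
		return 0, "", false
	}
	offset, value = r.next, r.log[r.next]
	r.next++
	return offset, value, true
}

// Commit records that everything up to and including offset is processed.
func (r *toyReader) Commit(offset int) {
	if offset+1 > r.committed {
		r.committed = offset + 1
	}
}

// Restart models a crash or rebalance: reading resumes from the committed offset.
func (r *toyReader) Restart() { r.next = r.committed }

func main() {
	r := &toyReader{log: []string{"a", "b", "c"}}

	_, first, _ := r.Fetch()  // "a", never committed
	_, second, _ := r.Fetch() // "b": the uncommitted "a" is not replayed
	fmt.Println(first, second)

	r.Restart() // only now does the uncommitted message come back
	_, again, _ := r.Fetch()
	fmt.Println(again)
}
```

In this model an uncommitted message is seen again only after a restart, which matches what is described above: skipping the commit does not by itself cause a replay.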
from kafka-go.
You can use a buffered channel to retry a failed message until it succeeds; the worker then proceeds to the next message automatically.
func (s *Server) collectionWorker(ctx context.Context) {
	// Holds at most one failed message that is waiting to be retried.
	ch := make(chan kafka.Message, 1)

	// message processing logic
	do := func(m kafka.Message) error {
		err, o := unmarshal(m)
		if err != nil {
			// A message that cannot be decoded will never succeed, so don't retry it.
			return nil
		}
		err = s.collectionService.Save(o)
		return err
	}

	for {
		select {
		case <-ctx.Done():
			s.collectionService.KafkaReader.Close()
			return
		case m := <-ch:
			// retry back-off
			time.Sleep(3 * time.Second)
			fmt.Printf("*** RETRY message at topic/partition/offset %v/%v/%v\n", m.Topic, m.Partition, m.Offset)
			err := do(m)
			if err != nil {
				// Still failing: queue it again (break only leaves the select).
				ch <- m
				break
			}
			s.collectionService.KafkaReader.CommitMessages(ctx, m)
		default:
			// Nothing is waiting for a retry, so fetch the next message.
			m, err := s.collectionService.KafkaReader.FetchMessage(ctx)
			if err != nil {
				fmt.Printf("fetch messages error: %v\n", err)
				break
			}
			err = do(m)
			if err != nil {
				// Park the message for retry instead of committing it.
				ch <- m
				break
			}
			s.collectionService.KafkaReader.CommitMessages(ctx, m)
		}
	}
}
Thank you for the answer, at least I know I did not confuse things in my head :) I have to admit it was a bit of a surprise (in Kafka itself, not the library) that you can even poll the next message without committing the previous one. I was sure that poll would fetch the same uncommitted message again and that all we needed to do was ensure that message processing is idempotent.
I'll update the documentation to make it more explicit what the behavior of the reader is, then close this issue.
I've been thinking about this problem quite a bit recently.
I've had difficulty coming up with solutions beyond just not processing events in a concurrent context. We can provide a solid guarantee that messages from an individual partition will be returned from the client library in order, but it's the responsibility of the caller to ensure that you don't ack a more recent event before an older event has finished processing.
This obviously is not the dream. The dream is that the library validates acking events in order for you, but that presents a ton of complications as well, such as communicating back to the library consumer when the library hasn't seen a particular offset. What do you do in those cases?
Anyway, at the very least we should document this behavior and come up with some general guidance.
Agree that more documentation would be useful. However, I'm not sure this is a kafka-go issue. While polling for messages and committing offsets seem like they should be tightly coupled activities, they're actually independent activities within Kafka. One can poll for messages and never commit an offset. Likewise, one can commit offsets without ever having polled for a message.
This lack of coupling enables the consumer to provide logic suitable for its scenarios. For example:
- commit after each message - the example the OP gave
- commit after N messages e.g. batch operations
- commit periodically - very useful for high throughput async operations
- commit without ever reading a message - useful for transferring consumers to another topic (copy messages from topic A to topic B, update offsets on topic B for all consumers of topic A)
Rather than changing the core library, I could add utility functions (perhaps in a contrib package) to help with each of these use cases.
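For instance, a "commit after N messages" helper might look roughly like the sketch below. Everything here is hypothetical: `committer` stands in for whatever actually commits offsets, and `batchCommitter` assumes messages from one partition are processed sequentially and in order.

```go
package main

import "fmt"

// committer is a hypothetical interface standing in for whatever commits
// offsets; it is not a kafka-go type.
type committer interface {
	Commit(offset int64) error
}

// batchCommitter commits once every n processed messages.
type batchCommitter struct {
	c       committer
	n       int
	pending int   // messages processed since the last commit
	last    int64 // most recently processed offset
}

// Processed records that the message at offset is done and commits once n
// messages have accumulated.
func (b *batchCommitter) Processed(offset int64) error {
	b.last = offset
	b.pending++
	if b.pending < b.n {
		return nil
	}
	return b.Flush()
}

// Flush commits the most recently processed offset if anything is pending.
func (b *batchCommitter) Flush() error {
	if b.pending == 0 {
		return nil
	}
	if err := b.c.Commit(b.last); err != nil {
		return err // pending is kept so the caller can retry
	}
	b.pending = 0
	return nil
}

// recordingCommitter remembers committed offsets so the demo can print them.
type recordingCommitter struct{ offsets []int64 }

func (r *recordingCommitter) Commit(offset int64) error {
	r.offsets = append(r.offsets, offset)
	return nil
}

func main() {
	rec := &recordingCommitter{}
	b := &batchCommitter{c: rec, n: 3}
	for off := int64(0); off < 7; off++ {
		if err := b.Processed(off); err != nil {
			fmt.Println("commit failed:", err)
			return
		}
	}
	_ = b.Flush() // commit the partial batch on shutdown
	fmt.Println(rec.offsets) // prints [2 5 6]
}
```

The trade-off is the usual one: fewer commits, but after a crash up to n-1 already processed messages are delivered again, so processing still needs to be idempotent.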
We could leave them decoupled, but I don't think any kafka library is complete without an API for offset management.