Comments (10)
Yep, exactly.
from franz-go.
Are you using a consumer group?
from franz-go.
To start consuming from the end in general, you need the ConsumeResetOffset option:
func ConsumeResetOffset(offset Offset) ConsumerOpt
ConsumeResetOffset sets the offset to restart consuming from when a
partition has no commits (for groups) or when beginning to consume a
partition (for direct partition consuming), or when a fetch sees an
OffsetOutOfRange error, overriding the default ConsumeStartOffset.
Use it with NewOffset().AtEnd().
With direct partition consuming, this always starts consuming from the end. With group consuming, it starts at the end only when a partition is seen for the first time.
from franz-go.
If you're using a consumer group, can you use LogLevelInfo? When you shut down, is there a log about committing?
What does your NewClient call look like, and are you waiting for Close to finish when shutting down?
from franz-go.
Thank you so much for your speedy reply; this is exactly what I needed.
from franz-go.
So this is similar to what I am looking for, but I only want to start consuming from the end if the consumer group doesn't already have a current offset.
Essentially trying to change this line:
https://github.com/twmb/franz-go/blob/v1.6.0/pkg/kgo/config.go#L488
to be NewOffset().AtEnd(), since once the consumer group exists I don't want to repeatedly start at the end, only the very first time, when it is initialized and doesn't have any offsets stored.
Is there a way I can pass an offset to ConsumeResetOffset() that will use current offsets if they exist otherwise use the latest offset possible?
from franz-go.
@ekeric13 that's actually how the option works currently -- AtEnd will be used for new groups with no commits. Existing offsets will be used if they are committed. If a client ever receives offset out of range, it will reset to the nearest offset (if you're on a modern broker, it will reset to an exact offset).
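To make that rule concrete, here is a minimal stdlib-only sketch of the decision. It is an illustration, not franz-go's actual implementation: the names ResetPolicy, NoCommit, and startOffset are hypothetical, and out-of-range handling is deliberately left out.

```go
package main

import "fmt"

// ResetPolicy is a hypothetical stand-in for the offset passed to
// kgo.ConsumeResetOffset; it is not a franz-go type.
type ResetPolicy int

const (
	AtStart ResetPolicy = iota // plays the role of kgo.NewOffset().AtStart()
	AtEnd                      // plays the role of kgo.NewOffset().AtEnd()
)

// NoCommit marks a partition for which the group has committed nothing.
const NoCommit int64 = -1

// startOffset returns the offset a group consumer would begin fetching from.
// logStart is the first offset in the partition and logEnd is the offset the
// next produced record will receive.
func startOffset(committed, logStart, logEnd int64, policy ResetPolicy) int64 {
	// A committed offset always wins; the reset policy is not consulted.
	if committed != NoCommit {
		return committed
	}
	// No commit yet: the reset policy decides where to start.
	if policy == AtEnd {
		return logEnd
	}
	return logStart
}

func main() {
	// The partition already holds offsets 0..4, so logEnd is 5.
	fmt.Println("new group, AtEnd:", startOffset(NoCommit, 0, 5, AtEnd)) // 5

	// The group consumed offsets 5 and 6, committed 7, and restarted
	// after two more records were produced (logEnd is now 9).
	fmt.Println("existing group, AtEnd:", startOffset(7, 0, 9, AtEnd)) // 7

	// With AtStart, a new group begins at the first offset instead.
	fmt.Println("new group, AtStart:", startOffset(NoCommit, 0, 5, AtStart)) // 0
}
```

In this model the policy only matters on the first run of a group. Once anything has been committed, restarting with AtEnd or AtStart gives the same starting point.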
from franz-go.
So when creating my client I just need to do:
clientOpts := []kgo.Opt{
	// ... other options
	kgo.ConsumeResetOffset(kgo.NewOffset().AtEnd()),
}
cl, err := kgo.NewClient(clientOpts...)
and it will correctly just use existing offsets if committed?
Likewise I could do:
clientOpts := []kgo.Opt{
	// ... other options
	kgo.ConsumeResetOffset(kgo.NewOffset().AtStart()),
}
cl, err := kgo.NewClient(clientOpts...)
and it will replicate the default behavior: start at the beginning if there are no offsets, otherwise use current committed offsets?
from franz-go.
One thing to keep in mind: when you use kgo.NewOffset().AtEnd() to connect for the first time, any earlier messages that have not been consumed will not show up. Any new messages produced after you connect will show up, even if you exit and restart, because the group then resumes from its committed offsets.
from franz-go.