Comments (15)
Some more info/logs:
kgo[DEBUG] updated committed; group: ----, to: ---1{2126=>2127}]
However, when i restart it, i see:
kgo[INFO] fetching offsets for added partitions; group: -----------------, added: map[-----:[0 1 2 3]]
kgo[DEBUG] sharded request; destinations: [0]
kgo[DEBUG] wrote OffsetFetch v3; broker: 0, bytes_written: 66, write_wait: 32.331µs, time_to_write: 90.121µs, err: <nil>
kgo[DEBUG] read OffsetFetch v3; broker: 0, bytes_read: 94, read_wait: 33.453µs, time_to_read: 21.29811ms, err: <nil>
kgo[DEBUG] assign requires loading offsets
kgo[DEBUG] fetched committed offsets; group: ------, fetched: map[----:map[0:{-2 0 -1 0} 1:{-2 0 -1 0} 2:{-2 0 -1 0} 3:{-2 0 -1 0}]]
(censored hub and group name, but they match)
from franz-go.
What kgo version are you running? Also, are you consuming few enough records such that PollFetches
only runs once?
from franz-go.
If you're using v0.10.2, I'm worried you may be running into this, which was fixed yesterday: c6df11d
If you're using v0.10.1, then this should be working, and I'd be curious to know what the output of kcl group describe -v <yourgroup>
says (if you can use kcl).
from franz-go.
dont do that
fetches.EachRecord(func(record *kgo.Record) {
processor.Process(context.Background(), record)
seen++
rs = append(rs, record)
})
if err := cl.CommitRecords(context.Background(), rs...); err != nil {
fmt.Printf("commit records failed: %v", err)
continue
}
rs = append(rs, records) i'm have something like this and this bring very big garbage overhead
if you have many messages and want to have lower memory overhead (and cpu spikes when gc drains) create own map for CommitOffsetsSync
from franz-go.
$ kcl group describe -v -----------
BROKER 0
ID iothub-ingester
STATE Stable
PROTO TYPE consumer
PROTO range
MEMBERS 1
ERROR
TOPIC PARTITION CURRENT OFFSET LOG END OFFSET LAG MEMBER ID INSTANCE ID CLIENT ID HOST USER DATA LOAD ERR
--- 0 -1 -1 0 ----------------------:kgo-57005fc856714bcd85ad4aab21039212 kgo 0.0.0.0
--- 1 -1 -1 0 ----------------------:kgo-57005fc856714bcd85ad4aab21039212 kgo 0.0.0.0
--- 2 -1 -1 0 ----------------------:kgo-57005fc856714bcd85ad4aab21039212 kgo 0.0.0.0
--- 3 -1 -1 0 ----------------------kgo-57005fc856714bcd85ad4aab21039212 kgo 0.0.0.0
edit: i'm on latest master, did a go get @ master just a couple minutes before.
from franz-go.
Some more info. Spew.dump from the Commit callback:
(*kmsg.OffsetCommitRequest)(0xc00050a4e0)({
Version: (int16) 3,
Group: (string) (len=15) "------------",
Generation: (int32) 7,
MemberID: (string) (len=116) "------------------------:c:------------:I:kgo-ddec0c3ca255495dbac13d64a1c964d3",
InstanceID: (*string)(<nil>),
RetentionTimeMillis: (int64) 0,
Topics: ([]kmsg.OffsetCommitRequestTopic) (len=1 cap=1) {
(kmsg.OffsetCommitRequestTopic) {
Topic: (string) (len=6) "--------",
Partitions: ([]kmsg.OffsetCommitRequestTopicPartition) (len=1 cap=1) {
(kmsg.OffsetCommitRequestTopicPartition) {
Partition: (int32) 1,
Offset: (int64) 2403,
Timestamp: (int64) 0,
LeaderEpoch: (int32) 0,
Metadata: (*string)(0xc00050a500)((len=116) "------------------------:c:------------:I:kgo-ddec0c3ca255495dbac13d64a1c964d3"),
UnknownTags: (kmsg.Tags) {
keyvals: (map[uint32][]uint8) <nil>
}
}
},
UnknownTags: (kmsg.Tags) {
keyvals: (map[uint32][]uint8) <nil>
}
}
},
UnknownTags: (kmsg.Tags) {
keyvals: (map[uint32][]uint8) <nil>
}
})
(*kmsg.OffsetCommitResponse)(0xc0007243c0)({
Version: (int16) 3,
ThrottleMillis: (int32) 0,
Topics: ([]kmsg.OffsetCommitResponseTopic) (len=1 cap=1) {
(kmsg.OffsetCommitResponseTopic) {
Topic: (string) (len=6) "--------",
Partitions: ([]kmsg.OffsetCommitResponseTopicPartition) (len=1 cap=1) {
(kmsg.OffsetCommitResponseTopicPartition) {
Partition: (int32) 1,
ErrorCode: (int16) 0,
UnknownTags: (kmsg.Tags) {
keyvals: (map[uint32][]uint8) <nil>
}
}
},
UnknownTags: (kmsg.Tags) {
keyvals: (map[uint32][]uint8) <nil>
}
}
},
UnknownTags: (kmsg.Tags) {
keyvals: (map[uint32][]uint8) <nil>
}
})
from franz-go.
The output from kcl group describe
is a bit mismatched from the output from the offset commit: the offset commit indicates that the request was successful and committed offset 2403 for partition 1, but the group describe indicates offset 0 (essentially what you're describing).
(also fwiw, kcl has some new commits @ master that would make the group output nicer, I plan to cut a release for that shortly)
Another oddity is that the member ID in the offset commit is different from the member ID in the group describe. Was the group describe run at the same time as the spew.Dump of the offset commit / request? Trying to narrow down variables here. And, is it possible there is some typo mismatch somewhere (group or topic)?
from franz-go.
yeah sorry, IIRC the two outputs were from separate runs.
group names match ( i censored them), and subsequent runs keep re-processing all the old data (as per auto-reset to oldest).
i would not rule out some.. speciality of eventhub, as always. i'm trying out franz because we're hitting again, like every 6months, some incompatibility issues between sarama and eventhub (this time Fetch request can't be unmarshaled).
I wonder if you had any experience like this with eventhub, or if you recently tested franz-go against EH.
from franz-go.
It's been a good bit since I tested franz-go with EH actually, but I never had any issue like that, but I also primarily produced to EH. It probably wouldn't be a today thing if I need to re-figure out how to setup EH, so I'm hoping a few more tries of some random stuff on your end might help.
I have a suspicion as to what this is... the retention time for the offset commit is 0. This field was introduced in Kafka v0.11.0 and removed in v2.0.0. I'm going to push a commit right now that changes this to -1 (the default unset value).
from franz-go.
Can you try this commit 262afb4 and let me know if that changes anything?
from franz-go.
It fixed it. Great!
edit: To write a few more words: thanks so much for reacting quickly, saved me some time and saved me from using confluent-kafka-go :) Keep up the good work!
from franz-go.
Great! I'll tag this soon. I'm going to convert everything the client to use kmsg.NewXyz appropriately (which sets defaults in structs properly), and potentially a few other things. How important is the tag, or is it possible to wait a week while I figure out whether any API should change for 1.0?
from franz-go.
no problem,i can stick to the commit on master for now.
from franz-go.
I will tag this as v0.10.3 tomorrow, and then also release v0.11.0 which will contain a few (breaking) option renames (as part of the step towards a v1 release; tagging this as v0.10.3 will allow for an easy upgrade if you do not want to rename a few config options).
from franz-go.
I've release v0.10.3, which is a stable release that will fix this. Feel free to pin to that, or v0.11.0 which will change some API and serves as an rc2 for v1.0.0.
from franz-go.
Related Issues (20)
- fetchMetadata causing issues when running kafka in docker during dev HOT 4
- Question: best practice when consuming from multiple topics HOT 2
- TransactionalID Partition Suffix HOT 6
- Another kafka NullPointerException on fetch HOT 2
- Feature request: KIP-842 HOT 1
- lz4 decompression causes a lot of allocations due to ioutil.ReadAll HOT 16
- Kotel consumer not setting trace id correctly. HOT 4
- "default commit failed" logs contain no error HOT 4
- Custom number of acks HOT 2
- How to handle TLS certificate renewals from client HOT 3
- Option to set group.instance.id HOT 4
- Question on serde for Protobuf HOT 5
- Allow rebalance question with manual commits HOT 1
- Support for ACL requests in kfake package
- v1.16 release status
- Struggling to catch retryable errors despite KeepRetryableFetchErrors() option HOT 7
- Consumer fetch returns error when coordinator broker can't be reached HOT 1
- Question about fetching timestamp offset HOT 7
- Panic when calling `client.OptValue(kgo.WithLogger)` when logger was not set. HOT 1
- gssapi HOT 1
Recommend Projects
-
React
A declarative, efficient, and flexible JavaScript library for building user interfaces.
-
Vue.js
🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
-
Typescript
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
-
TensorFlow
An Open Source Machine Learning Framework for Everyone
-
Django
The Web framework for perfectionists with deadlines.
-
Laravel
A PHP framework for web artisans
-
D3
Bring data to life with SVG, Canvas and HTML. 📊📈🎉
-
Recommend Topics
-
javascript
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
-
web
Some thing interesting about web. New door for the world.
-
server
A server is a program made to process requests and deliver data to clients.
-
Machine learning
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
-
Visualization
Some thing interesting about visualization, use data art
-
Game
Some thing interesting about game, make everyone happy.
Recommend Org
-
Facebook
We are working to build community through open source technology. NB: members must have two-factor auth.
-
Microsoft
Open source projects and samples from Microsoft.
-
Google
Google ❤️ Open Source for everyone.
-
Alibaba
Alibaba Open Source for everyone
-
D3
Data-Driven Documents codes.
-
Tencent
China tencent open source team.
from franz-go.