Git Product home page Git Product logo

Comments (15)

birdayz avatar birdayz commented on September 26, 2024

Some more info/logs:

kgo[DEBUG] updated committed; group: ----, to: ---1{2126=>2127}]

However, when i restart it, i see:

kgo[INFO] fetching offsets for added partitions; group: -----------------, added: map[-----:[0 1 2 3]]
kgo[DEBUG] sharded request; destinations: [0]
kgo[DEBUG] wrote OffsetFetch v3; broker: 0, bytes_written: 66, write_wait: 32.331µs, time_to_write: 90.121µs, err: <nil>
kgo[DEBUG] read OffsetFetch v3; broker: 0, bytes_read: 94, read_wait: 33.453µs, time_to_read: 21.29811ms, err: <nil>
kgo[DEBUG] assign requires loading offsets
kgo[DEBUG] fetched committed offsets; group: ------, fetched: map[----:map[0:{-2 0 -1 0} 1:{-2 0 -1 0} 2:{-2 0 -1 0} 3:{-2 0 -1 0}]]

(censored hub and group name, but they match)

from franz-go.

twmb avatar twmb commented on September 26, 2024

What kgo version are you running? Also, are you consuming few enough records such that PollFetches only runs once?

from franz-go.

twmb avatar twmb commented on September 26, 2024

If you're using v0.10.2, I'm worried you may be running into this, which was fixed yesterday: c6df11d

If you're using v0.10.1, then this should be working, and I'd be curious to know what the output of kcl group describe -v <yourgroup> says (if you can use kcl).

from franz-go.

vtolstov avatar vtolstov commented on September 26, 2024

dont do that

fetches.EachRecord(func(record *kgo.Record) {
			processor.Process(context.Background(), record)
			seen++
			rs = append(rs, record)
		})
		if err := cl.CommitRecords(context.Background(), rs...); err != nil {
			fmt.Printf("commit records failed: %v", err)
			continue
		}

rs = append(rs, records) i'm have something like this and this bring very big garbage overhead
if you have many messages and want to have lower memory overhead (and cpu spikes when gc drains) create own map for CommitOffsetsSync

from franz-go.

birdayz avatar birdayz commented on September 26, 2024
$ kcl group describe -v -----------
BROKER      0
ID          iothub-ingester
STATE       Stable
PROTO TYPE  consumer
PROTO       range
MEMBERS     1
ERROR
TOPIC   PARTITION  CURRENT OFFSET  LOG END OFFSET  LAG   MEMBER ID                                                                                                             INSTANCE ID  CLIENT ID  HOST     USER DATA  LOAD ERR
---  0          -1              -1              0     ----------------------:kgo-57005fc856714bcd85ad4aab21039212               kgo        0.0.0.0
---  1          -1              -1              0     ----------------------:kgo-57005fc856714bcd85ad4aab21039212               kgo        0.0.0.0
---  2          -1              -1              0     ----------------------:kgo-57005fc856714bcd85ad4aab21039212               kgo        0.0.0.0
---  3          -1              -1              0     ----------------------kgo-57005fc856714bcd85ad4aab21039212               kgo        0.0.0.0

edit: i'm on latest master, did a go get @ master just a couple minutes before.

from franz-go.

birdayz avatar birdayz commented on September 26, 2024

Some more info. Spew.dump from the Commit callback:

(*kmsg.OffsetCommitRequest)(0xc00050a4e0)({
 Version: (int16) 3,
 Group: (string) (len=15) "------------",
 Generation: (int32) 7,
 MemberID: (string) (len=116) "------------------------:c:------------:I:kgo-ddec0c3ca255495dbac13d64a1c964d3",
 InstanceID: (*string)(<nil>),
 RetentionTimeMillis: (int64) 0,
 Topics: ([]kmsg.OffsetCommitRequestTopic) (len=1 cap=1) {
  (kmsg.OffsetCommitRequestTopic) {
   Topic: (string) (len=6) "--------",
   Partitions: ([]kmsg.OffsetCommitRequestTopicPartition) (len=1 cap=1) {
    (kmsg.OffsetCommitRequestTopicPartition) {
     Partition: (int32) 1,
     Offset: (int64) 2403,
     Timestamp: (int64) 0,
     LeaderEpoch: (int32) 0,
     Metadata: (*string)(0xc00050a500)((len=116) "------------------------:c:------------:I:kgo-ddec0c3ca255495dbac13d64a1c964d3"),
     UnknownTags: (kmsg.Tags) {
      keyvals: (map[uint32][]uint8) <nil>
     }
    }
   },
   UnknownTags: (kmsg.Tags) {
    keyvals: (map[uint32][]uint8) <nil>
   }
  }
 },
 UnknownTags: (kmsg.Tags) {
  keyvals: (map[uint32][]uint8) <nil>
 }
})
(*kmsg.OffsetCommitResponse)(0xc0007243c0)({
 Version: (int16) 3,
 ThrottleMillis: (int32) 0,
 Topics: ([]kmsg.OffsetCommitResponseTopic) (len=1 cap=1) {
  (kmsg.OffsetCommitResponseTopic) {
   Topic: (string) (len=6) "--------",
   Partitions: ([]kmsg.OffsetCommitResponseTopicPartition) (len=1 cap=1) {
    (kmsg.OffsetCommitResponseTopicPartition) {
     Partition: (int32) 1,
     ErrorCode: (int16) 0,
     UnknownTags: (kmsg.Tags) {
      keyvals: (map[uint32][]uint8) <nil>
     }
    }
   },
   UnknownTags: (kmsg.Tags) {
    keyvals: (map[uint32][]uint8) <nil>
   }
  }
 },
 UnknownTags: (kmsg.Tags) {
  keyvals: (map[uint32][]uint8) <nil>
 }
})

from franz-go.

twmb avatar twmb commented on September 26, 2024

The output from kcl group describe is a bit mismatched from the output from the offset commit: the offset commit indicates that the request was successful and committed offset 2403 for partition 1, but the group describe indicates offset 0 (essentially what you're describing).

(also fwiw, kcl has some new commits @ master that would make the group output nicer, I plan to cut a release for that shortly)

Another oddity is that the member ID in the offset commit is different from the member ID in the group describe. Was the group describe run at the same time as the spew.Dump of the offset commit / request? Trying to narrow down variables here. And, is it possible there is some typo mismatch somewhere (group or topic)?

from franz-go.

birdayz avatar birdayz commented on September 26, 2024

yeah sorry, IIRC the two outputs were from separate runs.

group names match ( i censored them), and subsequent runs keep re-processing all the old data (as per auto-reset to oldest).

i would not rule out some.. speciality of eventhub, as always. i'm trying out franz because we're hitting again, like every 6months, some incompatibility issues between sarama and eventhub (this time Fetch request can't be unmarshaled).
I wonder if you had any experience like this with eventhub, or if you recently tested franz-go against EH.

from franz-go.

twmb avatar twmb commented on September 26, 2024

It's been a good bit since I tested franz-go with EH actually, but I never had any issue like that, but I also primarily produced to EH. It probably wouldn't be a today thing if I need to re-figure out how to setup EH, so I'm hoping a few more tries of some random stuff on your end might help.

I have a suspicion as to what this is... the retention time for the offset commit is 0. This field was introduced in Kafka v0.11.0 and removed in v2.0.0. I'm going to push a commit right now that changes this to -1 (the default unset value).

from franz-go.

twmb avatar twmb commented on September 26, 2024

Can you try this commit 262afb4 and let me know if that changes anything?

from franz-go.

birdayz avatar birdayz commented on September 26, 2024

It fixed it. Great!

edit: To write a few more words: thanks so much for reacting quickly, saved me some time and saved me from using confluent-kafka-go :) Keep up the good work!

from franz-go.

twmb avatar twmb commented on September 26, 2024

Great! I'll tag this soon. I'm going to convert everything the client to use kmsg.NewXyz appropriately (which sets defaults in structs properly), and potentially a few other things. How important is the tag, or is it possible to wait a week while I figure out whether any API should change for 1.0?

from franz-go.

birdayz avatar birdayz commented on September 26, 2024

no problem,i can stick to the commit on master for now.

from franz-go.

twmb avatar twmb commented on September 26, 2024

I will tag this as v0.10.3 tomorrow, and then also release v0.11.0 which will contain a few (breaking) option renames (as part of the step towards a v1 release; tagging this as v0.10.3 will allow for an easy upgrade if you do not want to rename a few config options).

from franz-go.

twmb avatar twmb commented on September 26, 2024

I've release v0.10.3, which is a stable release that will fix this. Feel free to pin to that, or v0.11.0 which will change some API and serves as an rc2 for v1.0.0.

from franz-go.

Related Issues (20)

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.