
franz-go's People

Contributors

axw, brunsgaard, c-pro, celrenheit, dcrodman, dependabot[bot], dwagin, eduard-netsajev, jacobsmoller, jan-g, jgshaw, kklipsch, ladislavmacoun, lovromazgon, luke-vear, michaelwilner, mihaitodor, neal, owenhaynes, pleasingfungus, r-vasquez, sbuliarca, simon0191, streppel, turkenh, twmb, vtolstov, weeco, yianni, yuzhichang


franz-go's Issues

Add produceSync() method

In order to await the response of a produced record I have to use a callback. This is a bit cumbersome, as I have to use a WaitGroup or a channel so that I can proceed with handling the produce response, like this:

	var wg sync.WaitGroup
	wg.Add(1)
	err = client.Produce(ctx, &rec, func(rec *kgo.Record, err error) {
		defer wg.Done()
		if err != nil {
			c.logger.Error("failed to produce record",
				zap.String("topic_name", rec.Topic),
				zap.Error(err),
			)
		}
	})
	wg.Wait()

Therefore it would be very handy if franz-go offered another method, such as ProduceSync(), that does this for me and only returns once the produce request has been handled, so that I don't need to use a promise function.
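
For illustration, a minimal sketch of what such a wrapper could look like today on top of the callback-based Produce (the produceSync name and helper are hypothetical; the error return from Produce matches the client version used in the snippet above):

// produceSync is a hypothetical helper (not part of kgo) that wraps the
// callback-based Produce and blocks until the promise has fired.
func produceSync(ctx context.Context, cl *kgo.Client, rec *kgo.Record) error {
	var (
		wg      sync.WaitGroup
		prodErr error
	)
	wg.Add(1)
	// Note: Produce returns an error in the client version used above; newer
	// versions report everything through the promise instead.
	err := cl.Produce(ctx, rec, func(_ *kgo.Record, err error) {
		defer wg.Done()
		prodErr = err
	})
	if err != nil {
		return err // buffering the record failed; the promise is not called
	}
	wg.Wait()
	return prodErr
}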

not all messages consumed from topic with many partitions

I have a test topic with 33 partitions, publish 600,000 messages to it, and try to consume them via the PollFetches func.
After consuming messages and blocking on PollFetches, I check the lag in the partitions and get:

GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                               HOST            CLIENT-ID
test            test            14         272719          272719          0               test-2ae9870c-08b0-40dd-96ee-ae3cd7390814 /172.18.0.1     test
test            test            6          95404           95404           0               test-2ae9870c-08b0-40dd-96ee-ae3cd7390814 /172.18.0.1     test
test            test            17         330429          330429          0               test-2ae9870c-08b0-40dd-96ee-ae3cd7390814 /172.18.0.1     test
test            test            20         231589          231591          2               test-2ae9870c-08b0-40dd-96ee-ae3cd7390814 /172.18.0.1     test
test            test            29         252090          252090          0               test-2ae9870c-08b0-40dd-96ee-ae3cd7390814 /172.18.0.1     test
test            test            23         173664          173664          0               test-2ae9870c-08b0-40dd-96ee-ae3cd7390814 /172.18.0.1     test
test            test            24         212880          212880          0               test-2ae9870c-08b0-40dd-96ee-ae3cd7390814 /172.18.0.1     test
test            test            11         173948          173948          0               test-2ae9870c-08b0-40dd-96ee-ae3cd7390814 /172.18.0.1     test
test            test            10         84643           84643           0               test-2ae9870c-08b0-40dd-96ee-ae3cd7390814 /172.18.0.1     test
test            test            28         119807          119807          0               test-2ae9870c-08b0-40dd-96ee-ae3cd7390814 /172.18.0.1     test
test            test            19         239245          239245          0               test-2ae9870c-08b0-40dd-96ee-ae3cd7390814 /172.18.0.1     test
test            test            0          246418          246419          1               test-2ae9870c-08b0-40dd-96ee-ae3cd7390814 /172.18.0.1     test
test            test            7          109678          109678          0               test-2ae9870c-08b0-40dd-96ee-ae3cd7390814 /172.18.0.1     test
test            test            18         477203          477218          15              test-2ae9870c-08b0-40dd-96ee-ae3cd7390814 /172.18.0.1     test
test            test            22         146831          146831          0               test-2ae9870c-08b0-40dd-96ee-ae3cd7390814 /172.18.0.1     test
test            test            25         223192          223192          0               test-2ae9870c-08b0-40dd-96ee-ae3cd7390814 /172.18.0.1     test
test            test            5          230747          230748          1               test-2ae9870c-08b0-40dd-96ee-ae3cd7390814 /172.18.0.1     test
test            test            8          345563          345564          1               test-2ae9870c-08b0-40dd-96ee-ae3cd7390814 /172.18.0.1     test
test            test            1          291728          291728          0               test-2ae9870c-08b0-40dd-96ee-ae3cd7390814 /172.18.0.1     test
test            test            26         280354          280895          541             test-2ae9870c-08b0-40dd-96ee-ae3cd7390814 /172.18.0.1     test
test            test            27         387977          387979          2               test-2ae9870c-08b0-40dd-96ee-ae3cd7390814 /172.18.0.1     test
test            test            13         369057          369057          0               test-2ae9870c-08b0-40dd-96ee-ae3cd7390814 /172.18.0.1     test
test            test            4          227867          227867          0               test-2ae9870c-08b0-40dd-96ee-ae3cd7390814 /172.18.0.1     test
test            test            30         331319          331319          0               test-2ae9870c-08b0-40dd-96ee-ae3cd7390814 /172.18.0.1     test
test            test            16         142291          142291          0               test-2ae9870c-08b0-40dd-96ee-ae3cd7390814 /172.18.0.1     test
test            test            9          345443          345444          1               test-2ae9870c-08b0-40dd-96ee-ae3cd7390814 /172.18.0.1     test
test            test            3          276085          276085          0               test-2ae9870c-08b0-40dd-96ee-ae3cd7390814 /172.18.0.1     test
test            test            21         328815          328815          0               test-2ae9870c-08b0-40dd-96ee-ae3cd7390814 /172.18.0.1     test
test            test            12         233569          233569          0               test-2ae9870c-08b0-40dd-96ee-ae3cd7390814 /172.18.0.1     test
test            test            32         160723          160728          5               test-2ae9870c-08b0-40dd-96ee-ae3cd7390814 /172.18.0.1     test
test            test            15         114914          114914          0               test-2ae9870c-08b0-40dd-96ee-ae3cd7390814 /172.18.0.1     test
test            test            2          192850          192853          3               test-2ae9870c-08b0-40dd-96ee-ae3cd7390814 /172.18.0.1     test
test            test            31         187689          187689          0               test-2ae9870c-08b0-40dd-96ee-ae3cd7390814 /172.18.0.1     test

[Question] Recovering from INVALID_PRODUCER_EPOCH

Hey @twmb, over the course of testing a transactional producer I've written, I've encountered this error from EndTransaction after a rebalance:

INVALID_PRODUCER_EPOCH: Producer attempted an operation with an old epoch.

For a bit of context, this is from a consumer (currently Sarama, will be franz-go soon) that does the following:

  1. client.BeginTransaction
  2. Consumes a record from a topic
  3. Writes a bunch of records to a different topic (client.Send)
  4. If all goes well, client.EndTransaction(ctx, kgo.TryCommit)
  5. If something goes wrong in step 3 or 4, there's a client.AbortBufferedRecords followed by a client.EndTransaction(ctx, kgo.TryAbort)
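
In code, steps 4 and 5 look roughly like this sketch (the endTxn helper is illustrative, not part of kgo):

// endTxn is an illustrative helper: commit on success, otherwise drop the
// buffered records and abort the transaction.
func endTxn(ctx context.Context, cl *kgo.Client, produceErr error) error {
	if produceErr == nil {
		return cl.EndTransaction(ctx, kgo.TryCommit)
	}
	if err := cl.AbortBufferedRecords(ctx); err != nil {
		return err
	}
	return cl.EndTransaction(ctx, kgo.TryAbort)
}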

We're running multiple instances of these consumers, each reading from one or more partitions on the input topic and writing (randomly) to partitions on the output topic. These tasks can be relatively long running and rebalances may occur during any of those steps due to deploys, scaling, kafka operations, etc.

What happens after a rebalance is they start throwing those kerr.InvalidProducerEpoch errors. A little digging suggests this is due to franz-go's handling of KIP-588 and that seems to make sense, but what isn't clear to me is how to recover from this situation. Right now, the producers just spin forever on that error - able to begin a transaction but not write records or end the transaction (though aborting still works).

I remembered this doc from config.TransactionalID, but I'm not sure it applies to me since I'm not using a franz-go consumer.

// Note that, unless using Kafka 2.5.0, a consumer group rebalance may be
// problematic. Production should finish and be committed before the client
// rejoins the group. It may be safer to use an eager group balancer and just
// abort the transaction. Alternatively, any time a partition is revoked, you
// could abort the transaction and reset offsets being consumed.

So with that in mind and with the bold assumption that I'm doing this correctly in the first place, what is the proper way to recover from this situation with the franz-go API?

Bootstrap on assignment and closing on revocation

Hello,

I am experimenting with your client and I am wondering how you handle bootstrap functions on assignment while accounting for possible errors.
My use case is reading a specific partition of a compacted topic to (re)build state when a partition is assigned.
OnAssigned and OnRevoked do not return an error, and for good reason (to avoid exceeding the rebalance interval, as suggested in the comment).

I am wondering if you have suggestions on how to do that with the provided abstractions?
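
For concreteness, the pattern I am currently considering is to run the (re)build in a goroutine started from the assignment callback and surface errors through a channel, roughly like this sketch (the callback signature is an assumption; the option is OnAssigned here and OnPartitionsAssigned in newer releases):

var bootstrapErrs = make(chan error, 1)

// onAssigned would be registered via the OnAssigned option; the signature is
// an assumption and differs between releases.
func onAssigned(ctx context.Context, assigned map[string][]int32) {
	// Run the heavy (re)build off the rebalance path and surface errors
	// through a channel that the main consume loop watches.
	go func() {
		if err := rebuildState(ctx, assigned); err != nil { // rebuildState: application code
			bootstrapErrs <- err
		}
	}()
}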

Thank you

Lazy loading connection with connection errors

Version 0.6.12

I am testing the producer with an invalid DNS name, so the DNS resolver should fail on Dial and this should be treated as a non-retryable error.

What I get instead is infinite attempts to resolve DNS in a loop (OnConnect hook):

OnConnect {-2147483648 9092 nonexistingserver-producer <nil> {}} 2.684883ms dial tcp: lookup nonexistingserver-producer: no such host
OnConnect {-2147483648 9092 nonexistingserver-producer <nil> {}} 1.986152ms dial tcp: lookup nonexistingserver-producer: no such host
OnConnect {-2147483648 9092 nonexistingserver-producer <nil> {}} 2.117893ms dial tcp: lookup nonexistingserver-producer: no such host
OnConnect {-2147483648 9092 nonexistingserver-producer <nil> {}} 2.425091ms dial tcp: lookup nonexistingserver-producer: no such host
OnConnect {-2147483648 9092 nonexistingserver-producer <nil> {}} 2.018065ms dial tcp: lookup nonexistingserver-producer: no such host
OnConnect {-2147483648 9092 nonexistingserver-producer <nil> {}} 2.03519ms dial tcp: lookup nonexistingserver-producer: no such host
OnConnect {-2147483648 9092 nonexistingserver-producer <nil> {}} 2.150969ms dial tcp: lookup nonexistingserver-producer: no such host
OnConnect {-2147483648 9092 nonexistingserver-producer <nil> {}} 2.081916ms dial tcp: lookup nonexistingserver-producer: no such host

I've tried looking at the code and all retry conditions return that this should not be retried. RetryTimeout and RequestRetries do not affect anything, so I am guessing some other mechanism is triggering this undesirable behaviour.

Return more descriptive error messages for kerr.Error

I often want to return a proper error message for one of Kafka's returned error codes. I use kerr.ErrorForCode(partition.ErrorCode) for this purpose, which returns an error of type kerr.Error. I'd like to return only a single descriptive string rather than the whole error object, so I call the Error() method, which unfortunately only returns the Message property of that kerr.Error struct, which is something like UNKNOWN_TOPIC_OR_PARTITION. A more descriptive error string would be something like this:

func (e *Error) Error() string {
	return fmt.Sprintf("%v (%v)", e.Description, e.Message)
}

That would for example return: "This server does not host this topic-partition. (UNKNOWN_TOPIC_OR_PARTITION)"

bug in concurrent map write with latest master

goroutine 114 [running]:
runtime.throw(0x13823a4, 0x26)
	/usr/local/go/src/runtime/panic.go:1116 +0x72 fp=0xc000393cc8 sp=0xc000393c98 pc=0x445332
runtime.mapiternext(0xc000393e30)
	/usr/local/go/src/runtime/map.go:853 +0x554 fp=0xc000393d48 sp=0xc000393cc8 pc=0x41e514
runtime.mapiterinit(0x116e0e0, 0xc00058c420, 0xc000393e30)
	/usr/local/go/src/runtime/map.go:843 +0x1c5 fp=0xc000393d68 sp=0xc000393d48 pc=0x41dec5
github.com/twmb/franz-go/pkg/kgo.(*groupConsumer).getUncommittedLocked(0xc00018e380, 0x1, 0x1366fb9)
	github.com/twmb/franz-go/pkg/kgo/consumer_group.go:1709 +0x495 fp=0xc000393f00 sp=0xc000393d68 pc=0xa8a335
github.com/twmb/franz-go/pkg/kgo.(*groupConsumer).loopCommit(0xc00018e380)
	github.com/twmb/franz-go/pkg/kgo/consumer_group.go:1571 +0x291 fp=0xc000393fd8 sp=0xc000393f00 pc=0xa89251
runtime.goexit()
	/usr/local/go/src/runtime/asm_amd64.s:1374 +0x1 fp=0xc000393fe0 sp=0xc000393fd8 pc=0x47d9c1
created by github.com/twmb/franz-go/pkg/kgo.(*consumer).initGroup
	github.com/twmb/franz-go/pkg/kgo/consumer_group.go:207 +0x708

retriable error bug?

(Apologies for the bad issue title; I don't have enough context yet, so this is the best name I could come up with. Feel free to update it.)

I noticed that after bumping from v0.10.2 to v0.10.3 my producer was not working. I was using the sync producer, so it was just stuck until the context got canceled. I used the bench example to try to reproduce it, and it was not producing anything either:

$ ./bench -brokers localhost:61810 -topic test
0.00 MiB/s; 0.00k records/s
0.00 MiB/s; 0.00k records/s
0.00 MiB/s; 0.00k records/s

After some testing, I think there's something off with the retriable code. This change makes it work:

diff --git a/pkg/kgo/client.go b/pkg/kgo/client.go
index 486708c..9db4d30 100644
--- a/pkg/kgo/client.go
+++ b/pkg/kgo/client.go
@@ -619,6 +619,7 @@ start:
 				// just retry now. If the error is *not* retriable but
 				// is a broker-specific network error, and the next
 				// broker is different than the current, we also retry.
+				fmt.Printf("retriable: err=%v retryErr=%v\n", err, retryErr)
 				if r.cl.shouldRetry(tries, err) || r.cl.shouldRetry(tries, retryErr) {
 					if r.cl.waitTries(ctx, tries) {
 						next, nextErr = r.br()
diff --git a/pkg/kgo/errors.go b/pkg/kgo/errors.go
index e4995bf..af17799 100644
--- a/pkg/kgo/errors.go
+++ b/pkg/kgo/errors.go
@@ -11,7 +11,7 @@ import (

 func isRetriableBrokerErr(err error) bool {
 	if err == nil { // sanity
-		return true
+		// return true
 	}
 	// https://github.com/golang/go/issues/45729
 	//
diff --git a/pkg/kgo/go115.go b/pkg/kgo/go115.go
index 634f131..473672c 100644
--- a/pkg/kgo/go115.go
+++ b/pkg/kgo/go115.go
@@ -6,5 +6,8 @@ package kgo
 import "strings"

 func isNetClosedErr(err error) bool {
+	if err == nil { // sanity
+		return false
+	}
 	return strings.Contains(err.Error(), "use of closed network connection")
 }
$ ./bench -brokers localhost:61810 -topic test
retriable: err=broker is too old; the broker has already indicated it will not know how to handle the request retryErr=<nil>
33.99 MiB/s; 356.41k records/s
12.99 MiB/s; 136.23k records/s
12.99 MiB/s; 136.23k records/s
38.98 MiB/s; 408.69k records/s
38.98 MiB/s; 408.69k records/s

I left the debug log in, reverted the rest, and this is what I get:

$ ./bench -brokers localhost:61810 -topic test
retriable: err=broker is too old; the broker has already indicated it will not know how to handle the request retryErr=<nil>
retriable: err=broker is too old; the broker has already indicated it will not know how to handle the request retryErr=<nil>
retriable: err=broker is too old; the broker has already indicated it will not know how to handle the request retryErr=<nil>
retriable: err=broker is too old; the broker has already indicated it will not know how to handle the request retryErr=<nil>
0.00 MiB/s; 0.00k records/s
retriable: err=broker is too old; the broker has already indicated it will not know how to handle the request retryErr=<nil>
0.00 MiB/s; 0.00k records/s
retriable: err=broker is too old; the broker has already indicated it will not know how to handle the request retryErr=<nil>
0.00 MiB/s; 0.00k records/s
retriable: err=broker is too old; the broker has already indicated it will not know how to handle the request retryErr=<nil>

For what it's worth, I am using the latest Redpanda:

$ rpk version
v21.8.1 (rev 3522ce7e9e4b9fbe62b9474f80138c4deeb8359b)

[Question] Start consuming from the end?

I've successfully consumed all the records on my Kafka server and terminated the application. When I launch my application again, it runs through all the records once more. How do I just start where I left off?
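
A hedged sketch of the two usual approaches, assuming kgo.ConsumerGroup resumes from committed offsets on restart and kgo.ConsumeResetOffset controls where to start when nothing has been committed yet:

package main

import "github.com/twmb/franz-go/pkg/kgo"

func main() {
	cl, err := kgo.NewClient(
		kgo.SeedBrokers("localhost:9092"), // placeholder broker address
		// With a group, committed offsets are used on restart, so the client
		// resumes where it left off instead of re-reading everything.
		kgo.ConsumerGroup("my-group"),
		kgo.ConsumeTopics("my-topic"),
		// If the group has no committed offset yet, start at the end of the
		// log rather than the beginning.
		kgo.ConsumeResetOffset(kgo.NewOffset().AtEnd()),
	)
	if err != nil {
		panic(err)
	}
	defer cl.Close()
}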

how to gracefully stop consuming

In the sarama or segmentio packages I can unsubscribe from a topic before closing the connection.
How do I deal with this in franz-go?
My use case: the ability to unsubscribe from a topic gracefully, commit all pending offsets, and close the connection.
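
A minimal sketch of the shutdown order that seems to match this use case, assuming CommitUncommittedOffsets and Close behave as documented:

func shutdown(ctx context.Context, cl *kgo.Client) error {
	// Commit whatever has been polled but not yet committed, then close the
	// client, which leaves the group and closes all connections.
	if err := cl.CommitUncommittedOffsets(ctx); err != nil {
		return err
	}
	cl.Close()
	return nil
}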

Calculating consumer lag

I think I got it from the documentation but wanted to confirm with you because it's not completely clear.

You would need to use this request to get Latest Committed Offset (LCO):
https://pkg.go.dev/github.com/twmb/[email protected]/pkg/kmsg#OffsetFetchRequest
https://pkg.go.dev/github.com/twmb/[email protected]/pkg/kmsg#OffsetFetchResponseTopicPartition

Then send this request to get Latest Partition Offset (LPO):
https://pkg.go.dev/github.com/twmb/[email protected]/pkg/kmsg#ListOffsetsRequest
https://pkg.go.dev/github.com/twmb/[email protected]/pkg/kmsg#ListOffsetsResponseTopicPartition

And the LAG for a particular group would be:

LAG = 0
for each topic:
    for each partition:
        LAG += LPO - LCO

Is this enough, or are more operations needed?
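
To make the arithmetic concrete, a small sketch that sums the lag from two already-fetched maps (committed offsets from the OffsetFetch response, log-end offsets from the ListOffsets response); the map layout is illustrative:

// lag sums LPO - LCO over every topic/partition. Both maps are keyed by
// topic then partition and are assumed to be filled from the OffsetFetch
// and ListOffsets responses linked above.
func lag(committed, logEnd map[string]map[int32]int64) int64 {
	var total int64
	for topic, partitions := range logEnd {
		for partition, lpo := range partitions {
			total += lpo - committed[topic][partition]
		}
	}
	return total
}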

Eventhub example / trouble committing

Hey! Thanks for the great library.

Do I have to consider anything specific to work with EventHub? It seems that my offset commits have no effect. I do commit, and have tried various methods (all from the group example). However, once my consumer reconnects, I start over again (offsets were apparently not committed correctly).

Excerpt from my setup:

	opts := []kgo.Opt{
		kgo.SeedBrokers(consumerCfg.Brokers...),
		kgo.ConsumerGroup(consumerCfg.ConsumerGroup),
		kgo.ConsumeTopics(iothubTopic),
		// SASL Options
		kgo.SASL(plain.Auth{
			User: "$ConnectionString",
			Pass: connString,
		}.AsMechanism()),

		// Configure TLS. Uses SystemCertPool for RootCAs by default.
		kgo.Dialer(tlsDialer.DialContext),
	}

func consume(ctx context.Context, cl *kgo.Client, log *zap.Logger, processor *processor.Processor) error {
	for {
		fetches := cl.PollFetches(context.Background())
		if fetches.IsClientClosed() {
			return nil
		}
		fetches.EachError(func(t string, p int32, err error) {
			log.Error("Error on partition", zap.String("topic", t), zap.Int32("partition", p), zap.Error(err))
		})

		var rs []*kgo.Record

		var seen int
		fetches.EachRecord(func(record *kgo.Record) {
			processor.Process(context.Background(), record)
			seen++
			rs = append(rs, record)
		})
		if err := cl.CommitRecords(context.Background(), rs...); err != nil {
			fmt.Printf("commit records failed: %v", err)
			continue
		}

		// TODO pass to handler

		//if err := cl.CommitUncommittedOffsets(context.Background()); err != nil {
		//log.Error("Failed to commit offsets", zap.Error(err))
		//		continue
		//	}
		log.Info("Processed records offsets", zap.Int("n", seen))
	}
}

Interface for kgo.Client

Any plans to provide an interface for the kgo.Client struct in this package? It would be very useful when users want to mock their client during unit tests.
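
Until such an interface exists, a common workaround is to declare a small application-side interface covering only the methods you use, which *kgo.Client then satisfies implicitly. A sketch (the interface and its method set are illustrative):

// producer is an application-defined interface that *kgo.Client satisfies,
// so tests can substitute a mock. List only the methods you actually use,
// and match the signatures of the kgo version in use (Produce returns an
// error in the snippets above).
type producer interface {
	Produce(ctx context.Context, rec *kgo.Record, promise func(*kgo.Record, error)) error
	Flush(ctx context.Context) error
}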

1.0 release status

1.0 is coming soon. This issue tracks what I'd like to accomplish beforehand.

  • #57 should be addressed
  • RetryTimeout => rename RetryTimeoutFn, add new RetryTimeout option with a static time.Duration argument
  • RetryBackoff => rename RetryBackoffFn (a static RetryBackoff will not be added, because linear static backoff is not a great backoff strategy)
  • ProduceRetries => rename RecordRetries
  • add func DialTLSConfig(*tls.Config) option, which will trigger dialing by tls (simplifies setting up tls)

Some of these are breaking changes to things that are currently seldom used.

Possible record loss on unclean kill of consumer when using automatic offset commit

Because the offset commit and consumption are done independently, it is possible to lose records on the consumer side if the consumer process has a kill -9 applied to it mid-loop.
You can reproduce this by producing 1000 records, then starting a consumer that sleeps at the 500-record mark. Then issue a kill -9 on the consumer process. When the consumer starts back up, the full 1000 records sometimes don't all get consumed.
This is presumably because the offsets for these records have already been committed.
With the confluent Go library this does not happen, because the offset commit occurs inside the Poll call itself, so there is no message loss.
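
A hedged sketch of the at-least-once shape that avoids this window: disable autocommit and commit records only after they have been processed (assuming kgo.DisableAutoCommit and CommitRecords behave as documented; process is application code):

// consumeAtLeastOnce assumes the client was built with kgo.DisableAutoCommit().
func consumeAtLeastOnce(cl *kgo.Client) {
	ctx := context.Background()
	for {
		fetches := cl.PollFetches(ctx)
		if fetches.IsClientClosed() {
			return
		}
		var processed []*kgo.Record
		fetches.EachRecord(func(r *kgo.Record) {
			process(r) // application logic, hypothetical helper
			processed = append(processed, r)
		})
		// Committing only after processing means an unclean kill can
		// re-deliver records, but never skips ones that were not handled.
		if err := cl.CommitRecords(ctx, processed...); err != nil {
			continue // log and retry on the next iteration in real code
		}
	}
}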

panic: runtime error: invalid memory address or nil pointer dereference

Client version v0.6.9
Kafka Version: kafka 2.6
Connection Auth: MTLS connection
Auto commit disabled

Our Kafka dev cluster can be under high load, so no idea if it's related.

panic: runtime error: invalid memory address or nil pointer dereference
[signal SIGSEGV: segmentation violation code=0x1 addr=0x0 pc=0x7d381f]

goroutine 10119 [running]:
github.com/twmb/franz-go/pkg/kgo.(*topicPartitions).load(...)
        /go/pkg/mod/github.com/twmb/[email protected]/pkg/kgo/topics_and_partitions.go:73
github.com/twmb/franz-go/pkg/kgo.(*consumer).assignPartitions(0xc0007326a0, 0xc0030022d0, 0xc00084d500)
        /go/pkg/mod/github.com/twmb/[email protected]/pkg/kgo/consumer.go:357 +0x47f
github.com/twmb/franz-go/pkg/kgo.(*groupConsumer).fetchOffsets(0xc00084d4a0, 0xfc34a8, 0xc002bcce40, 0xc0024914a0, 0x0, 0x0)
        /go/pkg/mod/github.com/twmb/[email protected]/pkg/kgo/consumer_group.go:1201 +0x4d7
github.com/twmb/franz-go/pkg/kgo.(*groupConsumer).setupAssignedAndHeartbeat.func3(0xc002bd7080, 0xc002554060, 0xc00084d4a0, 0xc0024914a0, 0xfc34a8, 0xc002bcce40)
        /go/pkg/mod/github.com/twmb/[email protected]/pkg/kgo/consumer_group.go:750 +0x145
created by github.com/twmb/franz-go/pkg/kgo.(*groupConsumer).setupAssignedAndHeartbeat
        /go/pkg/mod/github.com/twmb/[email protected]/pkg/kgo/consumer_group.go:746 +0x387

List partition reassignments by topic

Hi,

I've bumped into an issue that has completely stumped me. I'm trying to request reassignments in progress for a specific topic and it doesn't seem to work. I can get the code to return all topics by commenting out the part where it appends to req.Topics, but if I specify a topic that I know has ongoing reassignments it doesn't return it.

I've been working to try and get to the bottom of the problem by switching out my implementation for bits of your implementation here. This is a snippet of what I'm currently trying.

	topic := "my_test_topic"

	req := &kmsg.ListPartitionReassignmentsRequest{
		TimeoutMillis: cl.TimeoutMs(),
	}

	// Copied from kcl for debugging purposes
	tps := make(map[string][]int32)
	if len(topic) > 0 {
		tps[topic] = nil
		// Tried this too, but no luck
		// tps[topic] = []int32{}
	}

	// If I comment out this section then it successfully returns all topics that have reassignments in progress
	for topic, partitions := range tps {
		req.Topics = append(req.Topics, kmsg.ListPartitionReassignmentsRequestTopic{
			Topic:      topic,
			Partitions: partitions,
		})
	}

	kresp, err := cl.Client().Request(context.Background(), req)
	if err != nil {
		return nil, err
	}
	resp := kresp.(*kmsg.ListPartitionReassignmentsResponse)

	if err := kerr.ErrorForCode(resp.ErrorCode); err != nil {
		return nil, fmt.Errorf(*resp.ErrorMessage)
	}

	for _, topic := range resp.Topics {
		for _, p := range topic.Partitions {
			fmt.Printf("Topic: %s Partition %s: +%d -%d\n", topic.Topic, fmt.Sprint(p.Partition), len(p.AddingReplicas), len(p.RemovingReplicas))
		}
	}

Any ideas what I'm doing wrong? Or is this perhaps a bug?

I'm using franz-go v0.11.0 and Kafka 2.8.0.

bug in CommitRecords?

https://pkg.go.dev/github.com/twmb/franz-go/pkg/kgo#Client.CommitRecords

GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                               HOST            CLIENT-ID
test            test            14         113803          113804          1               test-f556d15a-12a5-433d-af67-fe107b303a6b /172.18.0.1     test
test            test            6          70845           70846           1               test-f556d15a-12a5-433d-af67-fe107b303a6b /172.18.0.1     test
test            test            17         83443           83444           1               test-f556d15a-12a5-433d-af67-fe107b303a6b /172.18.0.1     test
test            test            20         92923           92924           1               test-f556d15a-12a5-433d-af67-fe107b303a6b /172.18.0.1     test
test            test            29         115043          115044          1               test-f556d15a-12a5-433d-af67-fe107b303a6b /172.18.0.1     test
test            test            23         91864           91865           1               test-f556d15a-12a5-433d-af67-fe107b303a6b /172.18.0.1     test
test            test            24         163181          163182          1               test-f556d15a-12a5-433d-af67-fe107b303a6b /172.18.0.1     test
test            test            11         126313          126314          1               test-f556d15a-12a5-433d-af67-fe107b303a6b /172.18.0.1     test
test            test            10         26007           26008           1               test-f556d15a-12a5-433d-af67-fe107b303a6b /172.18.0.1     test
test            test            28         49406           49407           1               test-f556d15a-12a5-433d-af67-fe107b303a6b /172.18.0.1     test
test            test            19         189287          189288          1               test-f556d15a-12a5-433d-af67-fe107b303a6b /172.18.0.1     test
test            test            0          143943          143944          1               test-f556d15a-12a5-433d-af67-fe107b303a6b /172.18.0.1     test
test            test            7          47460           47461           1               test-f556d15a-12a5-433d-af67-fe107b303a6b /172.18.0.1     test
test            test            18         293857          293858          1               test-f556d15a-12a5-433d-af67-fe107b303a6b /172.18.0.1     test
test            test            22         78649           78650           1               test-f556d15a-12a5-433d-af67-fe107b303a6b /172.18.0.1     test
test            test            25         130283          130284          1               test-f556d15a-12a5-433d-af67-fe107b303a6b /172.18.0.1     test
test            test            5          139839          139840          1               test-f556d15a-12a5-433d-af67-fe107b303a6b /172.18.0.1     test
test            test            8          229405          229406          1               test-f556d15a-12a5-433d-af67-fe107b303a6b /172.18.0.1     test
test            test            1          177599          177600          1               test-f556d15a-12a5-433d-af67-fe107b303a6b /172.18.0.1     test
test            test            26         143427          143428          1               test-f556d15a-12a5-433d-af67-fe107b303a6b /172.18.0.1     test
test            test            27         227426          227427          1               test-f556d15a-12a5-433d-af67-fe107b303a6b /172.18.0.1     test
test            test            13         48765           48766           1               test-f556d15a-12a5-433d-af67-fe107b303a6b /172.18.0.1     test
test            test            4          205139          205140          1               test-f556d15a-12a5-433d-af67-fe107b303a6b /172.18.0.1     test
test            test            30         217249          217250          1               test-f556d15a-12a5-433d-af67-fe107b303a6b /172.18.0.1     test
test            test            16         118387          118388          1               test-f556d15a-12a5-433d-af67-fe107b303a6b /172.18.0.1     test
test            test            9          183699          183700          1               test-f556d15a-12a5-433d-af67-fe107b303a6b /172.18.0.1     test
test            test            3          227301          227302          1               test-f556d15a-12a5-433d-af67-fe107b303a6b /172.18.0.1     test
test            test            21         91192           91193           1               test-f556d15a-12a5-433d-af67-fe107b303a6b /172.18.0.1     test
test            test            12         180762          180763          1               test-f556d15a-12a5-433d-af67-fe107b303a6b /172.18.0.1     test
test            test            32         88184           88185           1               test-f556d15a-12a5-433d-af67-fe107b303a6b /172.18.0.1     test
test            test            15         38668           38669           1               test-f556d15a-12a5-433d-af67-fe107b303a6b /172.18.0.1     test
test            test            2          77828           77829           1               test-f556d15a-12a5-433d-af67-fe107b303a6b /172.18.0.1     test
test            test            31         26093           26094           1               test-f556d15a-12a5-433d-af67-fe107b303a6b /172.18.0.1     test

Documentation for AdminClient API

Hey @twmb ,
I am currently considering migrating from sarama to your library for https://github.com/cloudhut/kowl (and potentially for other projects too). Your code looks clean, very well (!) documented and it supports many of the modern features while still supporting all the old Kafka versions as well. Great arguments to use this library, good work!

Kowl does way more low-level stuff than common consumers and producers; most Kafka libraries do not expose any of the lower-level functions, which makes them unusable for such projects.

My question

It's not very clear how one can use the AdminClient API. I don't fully understand yet how you generate the code for the Kafka messages. I think there's a lot of potential to improve the documentation so that it matches the rest of the documentation quality-wise :-).

I looked at your CLI to figure out how to use the AdminClient: https://github.com/twmb/kcl

'unable to initialize sasl' exception when connecting to kerberized Kafka

Hi
I am using kminion, which embeds the franz-go client, to connect to kerberized Kafka. After setting up config.yaml, I got an 'unable to initialize sasl' exception. The config.yaml and exception are as follows.
config.yaml:

kafka:
  brokers: [172.16.1.102:21007]
  clientId: "kminion"
  rackId: ""
  tls:
    enabled: false
    caFilepath: ""
    certFilepath: ""
    keyFilepath: ""
    passphrase: ""
    insecureSkipTlsVerify: false
  sasl:
    # Whether or not SASL authentication will be used for authentication
    enabled: true
    # Username to use for PLAIN or SCRAM mechanism
    username: ""
    # Password to use for PLAIN or SCRAM mechanism
    password: ""
    # Mechanism to use for SASL Authentication. Valid values are PLAIN, SCRAM-SHA-256, SCRAM-SHA-512, GSSAPI
    mechanism: "GSSAPI"
    # GSSAPI / Kerberos config properties
    gssapi:
      authType: "KEYTAB_AUTH"
      keyTabPath: "C:\user.keytab"
      kerberosConfigPath: "C:\krb5.conf"
      serviceName: "kafka"
      username: "qdgakk"
      password: ""
      realm: "BOTECH.COM"

Debug Info:

{"level":"info","ts":"2021-06-02T14:54:33.297+0800","msg":"started kminion","version":""}
{"level":"info","ts":"2021-06-02T14:54:33.311+0800","msg":"connecting to Kafka seed brokers, trying to fetch cluster metadata","seed_brokers":"172.16.1.102:21007"}
{"level":"debug","ts":"2021-06-02T14:54:33.311+0800","msg":"opening connection to broker","source":"kafka_client","addr":"172.16.1.102:21007","broker":-2147483648}
{"level":"debug","ts":"2021-06-02T14:54:33.315+0800","msg":"kafka connection succeeded","source":"kafka_client_hooks","host":"172.16.1.102","dial_duration":0.0037961}
{"level":"debug","ts":"2021-06-02T14:54:33.315+0800","msg":"connection opened to broker","source":"kafka_client","addr":"172.16.1.102:21007","broker":-2147483648}
{"level":"debug","ts":"2021-06-02T14:54:33.316+0800","msg":"issuing api versions request","source":"kafka_client","broker":-2147483648,"version":3}
{"level":"debug","ts":"2021-06-02T14:54:33.317+0800","msg":"wrote ApiVersions v3","source":"kafka_client","broker":-2147483648,"bytes_written":33,"write_wait":0,"time_to_write":0.0005889,"err":null}
{"level":"debug","ts":"2021-06-02T14:54:33.319+0800","msg":"read ApiVersions v3","source":"kafka_client","broker":-2147483648,"bytes_read":14,"read_wait":0,"time_to_read":0.0020606,"err":null}
{"level":"debug","ts":"2021-06-02T14:54:33.319+0800","msg":"kafka does not know our ApiVersions version, downgrading to version 0 and retrying","source":"kafka_client","broker":-2147483648}
{"level":"debug","ts":"2021-06-02T14:54:33.319+0800","msg":"issuing api versions request","source":"kafka_client","broker":-2147483648,"version":0}
{"level":"debug","ts":"2021-06-02T14:54:33.319+0800","msg":"wrote ApiVersions v0","source":"kafka_client","broker":-2147483648,"bytes_written":21,"write_wait":0,"time_to_write":0,"err":null}
{"level":"debug","ts":"2021-06-02T14:54:33.325+0800","msg":"read ApiVersions v0","source":"kafka_client","broker":-2147483648,"bytes_read":218,"read_wait":0,"time_to_read":0.0050713,"err":null}
{"level":"debug","ts":"2021-06-02T14:54:33.328+0800","msg":"beginning sasl authentication","source":"kafka_client","broker":-2147483648,"mechanism":"GSSAPI","authenticate":false}
{"level":"debug","ts":"2021-06-02T14:54:33.438+0800","msg":"issuing raw sasl authenticate","source":"kafka_client","broker":-2147483648,"step":0}
{"level":"error","ts":"2021-06-02T14:54:33.443+0800","msg":"unable to initialize sasl","source":"kafka_client","broker":-2147483648,"err":"EOF"}
{"level":"debug","ts":"2021-06-02T14:54:33.443+0800","msg":"connection initialization failed","source":"kafka_client","addr":"172.16.1.102:21007","broker":-2147483648,"err":"EOF"}
{"level":"debug","ts":"2021-06-02T14:54:33.444+0800","msg":"kafka broker disconnected","source":"kafka_client_hooks","host":"172.16.1.102"}
{"level":"debug","ts":"2021-06-02T14:54:33.530+0800","msg":"opening connection to broker","source":"kafka_client","addr":"172.16.1.102:21007","broker":-2147483648}
{"level":"debug","ts":"2021-06-02T14:54:33.534+0800","msg":"kafka connection succeeded","source":"kafka_client_hooks","host":"172.16.1.102","dial_duration":0.0025582}
{"level":"debug","ts":"2021-06-02T14:54:33.534+0800","msg":"connection opened to broker","source":"kafka_client","addr":"172.16.1.102:21007","broker":-2147483648}
{"level":"debug","ts":"2021-06-02T14:54:33.536+0800","msg":"issuing api versions request","source":"kafka_client","broker":-2147483648,"version":3}
{"level":"debug","ts":"2021-06-02T14:54:33.537+0800","msg":"wrote ApiVersions v3","source":"kafka_client","broker":-2147483648,"bytes_written":33,"write_wait":0,"time_to_write":0,"err":null}
{"level":"debug","ts":"2021-06-02T14:54:33.541+0800","msg":"read ApiVersions v3","source":"kafka_client","broker":-2147483648,"bytes_read":14,"read_wait":0,"time_to_read":0.0041925,"err":null}
{"level":"debug","ts":"2021-06-02T14:54:33.541+0800","msg":"kafka does not know our ApiVersions version, downgrading to version 0 and retrying","source":"kafka_client","broker":-2147483648}
{"level":"debug","ts":"2021-06-02T14:54:33.541+0800","msg":"issuing api versions request","source":"kafka_client","broker":-2147483648,"version":0}
{"level":"debug","ts":"2021-06-02T14:54:33.542+0800","msg":"wrote ApiVersions v0","source":"kafka_client","broker":-2147483648,"bytes_written":21,"write_wait":0,"time_to_write":0,"err":null}
{"level":"debug","ts":"2021-06-02T14:54:33.545+0800","msg":"read ApiVersions v0","source":"kafka_client","broker":-2147483648,"bytes_read":218,"read_wait":0,"time_to_read":0.0020165,"err":null}
{"level":"debug","ts":"2021-06-02T14:54:33.545+0800","msg":"beginning sasl authentication","source":"kafka_client","broker":-2147483648,"mechanism":"GSSAPI","authenticate":false}
{"level":"debug","ts":"2021-06-02T14:54:33.545+0800","msg":"issuing raw sasl authenticate","source":"kafka_client","broker":-2147483648,"step":0}
{"level":"error","ts":"2021-06-02T14:54:33.552+0800","msg":"unable to initialize sasl","source":"kafka_client","broker":-2147483648,"err":"EOF"}
{"level":"debug","ts":"2021-06-02T14:54:33.552+0800","msg":"connection initialization failed","source":"kafka_client","addr":"172.16.1.102:21007","broker":-2147483648,"err":"EOF"}
{"level":"debug","ts":"2021-06-02T14:54:33.552+0800","msg":"kafka broker disconnected","source":"kafka_client_hooks","host":"172.16.1.102"}
{"level":"info","ts":"2021-06-02T14:54:33.765+0800","msg":"received a signal, going to shut down KMinion"}
{"level":"debug","ts":"2021-06-02T14:54:33.765+0800","msg":"opening connection to broker","source":"kafka_client","addr":"172.16.1.102:21007","broker":-2147483648}
{"level":"debug","ts":"2021-06-02T14:54:33.767+0800","msg":"kafka connection failed","source":"kafka_client_hooks","broker_host":"172.16.1.102","error":"dial tcp 172.16.1.102:21007: operation was canceled"}
{"level":"warn","ts":"2021-06-02T14:54:33.768+0800","msg":"unable to open connection to broker","source":"kafka_client","addr":"172.16.1.102:21007","broker":-2147483648,"err":"dial tcp 172.16.1.102:21007: operation was canceled"}
{"level":"fatal","ts":"2021-06-02T14:54:33.771+0800","msg":"failed to test connectivity to Kafka cluster","error":"failed to request metadata: unable to dial: dial tcp 172.16.1.102:21007: operation was canceled"}

Do you know how I can avoid this exception? Any help is appreciated.

producer ID request failure

Hi, I'm running franz-go in a production system. We are receiving the error shown in the attached screenshot. The system looked okay in our tests, but after 7 days it triggered a producer ID failure in one microservice, and the same error happened in another microservice's pod after 10 days. Unfortunately verbose logging was not activated, as we use centralized logging for all pods in production. Could anyone shed some light on this? I'm concerned that this could be a non-deterministic issue, maybe caused by starvation or deadlocks, and we like this lib very much.
Thanks
(screenshot of the error attached)

Connecting fails (Unexpected Kafka request during SASL handshake)

Hi,
even though I have already succeeded in connecting to some Kafka clusters with franz-go, I now have a case where the connection or the metadata request fails. I turned on debug logs in the client but I couldn't tell what exactly is wrong.

I'm trying to connect to a remote Kafka cluster that is protected with SASL_SSL (plain mechanism). It fails right at the first request I try to send (which is a metadata request); this fails because the 20s context expires at some point.

Franz-go v0.6.1

func (s *Service) GetMetadata(ctx context.Context) (*kmsg.MetadataResponse, error) {
	req := kmsg.MetadataRequest{
		Topics: nil,
	}

	return req.RequestWith(ctx, s.KafkaClient)
}

Client logs:

(The client logs indicate a successful connection; however, the metadata request never returns until the context is exceeded):

{"level":"info","ts":"2020-11-24T11:41:05.369+0100","msg":"initializing customer service","service":"customer_service"}
{"level":"debug","ts":"2020-11-24T11:41:05.369+0100","msg":"opening connection to broker","source":"kafka_client","addr":"bootstrap.redacted-kafka-dns.company.com:9094","id":-2147483648}
{"level":"debug","ts":"2020-11-24T11:41:05.600+0100","msg":"connection opened to broker","source":"kafka_client","addr":"bootstrap.redacted-kafka-dns.company.com:9094","id":-2147483648}
{"level":"debug","ts":"2020-11-24T11:41:05.600+0100","msg":"issuing api versions request","source":"kafka_client","version":3}
{"level":"debug","ts":"2020-11-24T11:41:05.625+0100","msg":"initialized api versions","source":"kafka_client","versions":[8,11,5,9,4,3,6,3,8,7,3,7,4,4,5,5,4,1,3,5,4,2,3,3,1,1,1,0,3,2,2,2,3,1,1,2,2,2,2,2,2,2,2,2,1,0,0,0,0,0,-1,-1,-1,-1,-1,-1,-1,-1]}
{"level":"debug","ts":"2020-11-24T11:41:05.625+0100","msg":"connection initialized successfully","source":"kafka_client","addr":"bootstrap.redacted-kafka-dns.company.com:9094","id":-2147483648}
{"level":"debug","ts":"2020-11-24T11:41:06.045+0100","msg":"opening connection to broker","source":"kafka_client","addr":"bootstrap.redacted-kafka-dns.company.com:9094","id":-2147483648}
{"level":"debug","ts":"2020-11-24T11:41:06.169+0100","msg":"connection opened to broker","source":"kafka_client","addr":"bootstrap.redacted-kafka-dns.company.com:9094","id":-2147483648}
{"level":"debug","ts":"2020-11-24T11:41:06.169+0100","msg":"issuing api versions request","source":"kafka_client","version":3}
{"level":"debug","ts":"2020-11-24T11:41:06.194+0100","msg":"initialized api versions","source":"kafka_client","versions":[8,11,5,9,4,3,6,3,8,7,3,7,4,4,5,5,4,1,3,5,4,2,3,3,1,1,1,0,3,2,2,2,3,1,1,2,2,2,2,2,2,2,2,2,1,0,0,0,0,0,-1,-1,-1,-1,-1,-1,-1,-1]}
{"level":"debug","ts":"2020-11-24T11:41:06.194+0100","msg":"connection initialized successfully","source":"kafka_client","addr":"bootstrap.redacted-kafka-dns.company.com:9094","id":-2147483648}
{"level":"debug","ts":"2020-11-24T11:41:06.702+0100","msg":"opening connection to broker","source":"kafka_client","addr":"bootstrap.redacted-kafka-dns.company.com:9094","id":-2147483648}
{"level":"debug","ts":"2020-11-24T11:41:06.810+0100","msg":"connection opened to broker","source":"kafka_client","addr":"bootstrap.redacted-kafka-dns.company.com:9094","id":-2147483648}
{"level":"debug","ts":"2020-11-24T11:41:06.810+0100","msg":"issuing api versions request","source":"kafka_client","version":3}
{"level":"debug","ts":"2020-11-24T11:41:06.835+0100","msg":"initialized api versions","source":"kafka_client","versions":[8,11,5,9,4,3,6,3,8,7,3,7,4,4,5,5,4,1,3,5,4,2,3,3,1,1,1,0,3,2,2,2,3,1,1,2,2,2,2,2,2,2,2,2,1,0,0,0,0,0,-1,-1,-1,-1,-1,-1,-1,-1]}
{"level":"debug","ts":"2020-11-24T11:41:06.835+0100","msg":"connection initialized successfully","source":"kafka_client","addr":"bootstrap.redacted-kafka-dns.company.com:9094","id":-2147483648}
{"level":"debug","ts":"2020-11-24T11:41:07.580+0100","msg":"opening connection to broker","source":"kafka_client","addr":"bootstrap.redacted-kafka-dns.company.com:9094","id":-2147483648}
{"level":"debug","ts":"2020-11-24T11:41:07.684+0100","msg":"connection opened to broker","source":"kafka_client","addr":"bootstrap.redacted-kafka-dns.company.com:9094","id":-2147483648}
{"level":"debug","ts":"2020-11-24T11:41:07.684+0100","msg":"issuing api versions request","source":"kafka_client","version":3}
{"level":"debug","ts":"2020-11-24T11:41:07.709+0100","msg":"initialized api versions","source":"kafka_client","versions":[8,11,5,9,4,3,6,3,8,7,3,7,4,4,5,5,4,1,3,5,4,2,3,3,1,1,1,0,3,2,2,2,3,1,1,2,2,2,2,2,2,2,2,2,1,0,0,0,0,0,-1,-1,-1,-1,-1,-1,-1,-1]}
{"level":"debug","ts":"2020-11-24T11:41:07.709+0100","msg":"connection initialized successfully","source":"kafka_client","addr":"bootstrap.redacted-kafka-dns.company.com:9094","id":-2147483648}
{"level":"debug","ts":"2020-11-24T11:41:08.988+0100","msg":"opening connection to broker","source":"kafka_client","addr":"bootstrap.redacted-kafka-dns.company.com:9094","id":-2147483648}
{"level":"debug","ts":"2020-11-24T11:41:09.092+0100","msg":"connection opened to broker","source":"kafka_client","addr":"bootstrap.redacted-kafka-dns.company.com:9094","id":-2147483648}
{"level":"debug","ts":"2020-11-24T11:41:09.092+0100","msg":"issuing api versions request","source":"kafka_client","version":3}
{"level":"debug","ts":"2020-11-24T11:41:09.117+0100","msg":"initialized api versions","source":"kafka_client","versions":[8,11,5,9,4,3,6,3,8,7,3,7,4,4,5,5,4,1,3,5,4,2,3,3,1,1,1,0,3,2,2,2,3,1,1,2,2,2,2,2,2,2,2,2,1,0,0,0,0,0,-1,-1,-1,-1,-1,-1,-1,-1]}
{"level":"debug","ts":"2020-11-24T11:41:09.117+0100","msg":"connection initialized successfully","source":"kafka_client","addr":"bootstrap.redacted-kafka-dns.company.com:9094","id":-2147483648}
{"level":"debug","ts":"2020-11-24T11:41:10.442+0100","msg":"opening connection to broker","source":"kafka_client","addr":"bootstrap.redacted-kafka-dns.company.com:9094","id":-2147483648}
{"level":"debug","ts":"2020-11-24T11:41:10.562+0100","msg":"connection opened to broker","source":"kafka_client","addr":"bootstrap.redacted-kafka-dns.company.com:9094","id":-2147483648}
{"level":"debug","ts":"2020-11-24T11:41:10.563+0100","msg":"issuing api versions request","source":"kafka_client","version":3}
{"level":"debug","ts":"2020-11-24T11:41:10.588+0100","msg":"initialized api versions","source":"kafka_client","versions":[8,11,5,9,4,3,6,3,8,7,3,7,4,4,5,5,4,1,3,5,4,2,3,3,1,1,1,0,3,2,2,2,3,1,1,2,2,2,2,2,2,2,2,2,1,0,0,0,0,0,-1,-1,-1,-1,-1,-1,-1,-1]}
{"level":"debug","ts":"2020-11-24T11:41:10.588+0100","msg":"connection initialized successfully","source":"kafka_client","addr":"bootstrap.redacted-kafka-dns.company.com:9094","id":-2147483648}
{"level":"debug","ts":"2020-11-24T11:41:11.927+0100","msg":"opening connection to broker","source":"kafka_client","addr":"bootstrap.redacted-kafka-dns.company.com:9094","id":-2147483648}
{"level":"debug","ts":"2020-11-24T11:41:12.055+0100","msg":"connection opened to broker","source":"kafka_client","addr":"bootstrap.redacted-kafka-dns.company.com:9094","id":-2147483648}
{"level":"debug","ts":"2020-11-24T11:41:12.056+0100","msg":"issuing api versions request","source":"kafka_client","version":3}
{"level":"debug","ts":"2020-11-24T11:41:12.081+0100","msg":"initialized api versions","source":"kafka_client","versions":[8,11,5,9,4,3,6,3,8,7,3,7,4,4,5,5,4,1,3,5,4,2,3,3,1,1,1,0,3,2,2,2,3,1,1,2,2,2,2,2,2,2,2,2,1,0,0,0,0,0,-1,-1,-1,-1,-1,-1,-1,-1]}
{"level":"debug","ts":"2020-11-24T11:41:12.081+0100","msg":"connection initialized successfully","source":"kafka_client","addr":"bootstrap.redacted-kafka-dns.company.com:9094","id":-2147483648}

Broker logs (broker id: 0)

[2020-11-24 10:33:55,065] INFO (data-plane-kafka-network-thread-0-ListenerName(EXTERNAL)-SASL_SSL-7:Selector) [SocketServer brokerId=0] Failed authentication with gke-clustername-misc-v1-91481fd4-bdx7.c.redacted-project.internal/172.24.127.216 (Unexpected Kafka request of type METADATA during SASL handshake.)
[2020-11-24 10:33:56,525] INFO (data-plane-kafka-network-thread-0-ListenerName(EXTERNAL)-SASL_SSL-8:Selector) [SocketServer brokerId=0] Failed authentication with gke-clustername-zoo-plat-sandbox-v1-5bb0c044-rfwi.c.redacted-project.internal/172.24.127.24 (Unexpected Kafka request of type METADATA during SASL handshake.)
[2020-11-24 10:33:56,525] INFO (data-plane-kafka-network-thread-0-ListenerName(EXTERNAL)-SASL_SSL-8:Selector) [SocketServer brokerId=0] Failed authentication with gke-clustername-zoo-plat-sandbox-v1-5bb0c044-rfwi.c.redacted-project.internal/172.24.127.24 (Unexpected Kafka request of type METADATA during SASL handshake.)
[2020-11-24 10:34:00,943] INFO (data-plane-kafka-network-thread-0-ListenerName(EXTERNAL)-SASL_SSL-6:Selector) [SocketServer brokerId=0] Failed authentication with gke-clustername-kafka-plat-sandbox-v1-d0cad066-qurw.c.redacted-project.internal/172.24.127.4 (Unexpected Kafka request of type METADATA during SASL handshake.)
[2020-11-24 10:34:00,943] INFO (data-plane-kafka-network-thread-0-ListenerName(EXTERNAL)-SASL_SSL-6:Selector) [SocketServer brokerId=0] Failed authentication with gke-clustername-kafka-plat-sandbox-v1-d0cad066-qurw.c.redacted-project.internal/172.24.127.4 (Unexpected Kafka request of type METADATA during SASL handshake.)
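
For comparison, a sketch of how I would expect a SASL_SSL (plain) client to be configured with this franz-go version, combining a TLS dialer with the plain SASL mechanism; credentials are placeholders:

// imports assumed: crypto/tls, net, time, kgo, and github.com/twmb/franz-go/pkg/sasl/plain
func newSASLSSLClient(user, pass string) (*kgo.Client, error) {
	tlsDialer := &tls.Dialer{NetDialer: &net.Dialer{Timeout: 10 * time.Second}}
	return kgo.NewClient(
		kgo.SeedBrokers("bootstrap.redacted-kafka-dns.company.com:9094"),
		// Without both of these the broker sees a plaintext METADATA request
		// on its SASL_SSL listener, which matches the broker log above.
		kgo.Dialer(tlsDialer.DialContext),
		kgo.SASL(plain.Auth{User: user, Pass: pass}.AsMechanism()),
	)
}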

new feature

What do you think about a semi-autocommit feature?
You already have the CommitRecords func, but I would like to specify an autocommit interval so that CommitRecords does not commit the passed records immediately but within the provided interval. That way I could call CommitRecords with 1, 2, 3, or 5 records that are not committed right away but accumulated. For example, if I specify an autocommit interval of 5s, I can call CommitRecords multiple times with one or many records, and once the interval elapses the client sends a commit offsets request to Kafka with the accumulated offsets.
What do you think?
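
In the meantime this can be approximated on the application side by accumulating records and flushing them with CommitRecords on a ticker. A rough sketch (the commitBuffer type is illustrative):

// commitBuffer accumulates records and commits them in one batch on an
// interval, approximating the proposed semi-autocommit behaviour.
type commitBuffer struct {
	mu   sync.Mutex
	recs []*kgo.Record
}

func (b *commitBuffer) Add(recs ...*kgo.Record) {
	b.mu.Lock()
	b.recs = append(b.recs, recs...)
	b.mu.Unlock()
}

func (b *commitBuffer) loop(ctx context.Context, cl *kgo.Client, interval time.Duration) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			b.mu.Lock()
			recs := b.recs
			b.recs = nil
			b.mu.Unlock()
			if len(recs) > 0 {
				_ = cl.CommitRecords(ctx, recs...) // handle the error in real code
			}
		}
	}
}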

round-robin partitioner

Why does franz-go not contain a round-robin partitioner?
My use case: fill all partitions with an equal number of messages.
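
For reference, a rough sketch of what a user-supplied round-robin partitioner could look like, assuming kgo's Partitioner/TopicPartitioner interfaces (ForTopic, RequiresConsistency, Partition) and the RecordPartitioner option; check the docs for the exact signatures:

// roundRobin cycles through partitions per topic so that every partition
// receives an equal share of records. Sketch only; not concurrency-hardened.
type roundRobin struct{}

func (roundRobin) ForTopic(string) kgo.TopicPartitioner { return new(rrTopic) }

type rrTopic struct{ next int }

func (*rrTopic) RequiresConsistency(*kgo.Record) bool { return false }

func (t *rrTopic) Partition(_ *kgo.Record, n int) int {
	p := t.next % n
	t.next++
	return p
}

// Usage: kgo.NewClient(kgo.RecordPartitioner(roundRobin{}), ...)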

User feedback

Hi,
since there hasn't been any user feedback so far, I'd like to give some feedback from a user's perspective on how you can make kgo easier to pick up:

  1. Provide some usage examples for getting started. You could create a dedicated folder for this, let's say "recipes" or "examples".

  2. Add two code samples showing how to connect to a Kafka broker. One in the simplest form (localhost, plaintext, no auth) and another more complex but still fairly common sample (multiple brokers, modified DNS lookup strategy, SASL, TLS).

  3. From just looking at the documentation it is unclear when kgo.NewClient() will return. Some libraries return if at least one Kafka broker is successfully connected. I don't think this is the case here, but I can see that it spins up a goroutine to periodically update the metadata. See: https://github.com/twmb/kafka-go/blob/master/pkg/kgo/client.go#L88-L90

  4. It would be nice if I could access literally all data you ever receive from the Kafka cluster. For instance, right after connecting I might want to log all discovered and connected broker IDs, rack IDs, and their actual addresses. If I use DNS aliases (e.g. all-brokers.hostname.com) I don't know the actual broker hostnames at this point. I assume I can get that data anyway via additional metadata requests, but if the data is already there, why send another request for it?

I'm happy to provide more feedback as I explore Kafka-go if desired :)

Partition numbers

Hi,

This is not an issue and more of a question about the franz-go API / Kafka internals.

I notice that some requests require the client to specify the partition number, and it seems like it's a deliberate choice to have an array of assignment structs rather than a simple [][]int32 array. Is there a reason behind this? Can the numbering of partitions deviate from a strictly incrementing 1, 2, 3, 4, ... and sometimes have missing numbers?

e.g.
https://pkg.go.dev/github.com/twmb/[email protected]/pkg/kmsg#MetadataResponseTopicPartition
https://pkg.go.dev/github.com/twmb/[email protected]/pkg/kmsg#CreateTopicsRequestTopicReplicaAssignment

Support for IAM roles in AWS_MSK_IAM SASL mechanism

Hi, currently (as of v0.7.4) the aws.Auth object only accepts AccessKey and SecretKey, which is good for supporting long-lived IAM user credentials. However, this struct does not support short-lived IAM role credentials, which also need a session token.

Support for IAM roles would be quite useful given the common use of EC2 instance profile roles to identify applications.

This is something that the canonical Java implementation supports. As described there, the session token (if present) will need to be wired into the x-amz-security-token field in the SASL authentication JSON payload.

Would be nice if we could support this in franz-go as well.

ipv6 addr not supported

if colon := strings.IndexByte(addr, ':'); colon > 0 {
	port64, err := strconv.ParseInt(addr[colon+1:], 10, 64)
	if err != nil {
		return nil, fmt.Errorf("unable to parse addr:port in %q", seedBroker)
	}
	addr = addr[:colon]
	port = int32(port64)
}

You need to use the net.SplitHostPort func.
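
For example, the snippet above could be rewritten roughly as follows; net.SplitHostPort handles bracketed IPv6 literals such as "[::1]:9092":

host, portStr, err := net.SplitHostPort(addr)
if err != nil {
	return nil, fmt.Errorf("unable to parse addr:port in %q: %v", seedBroker, err)
}
port64, err := strconv.ParseInt(portStr, 10, 32)
if err != nil {
	return nil, fmt.Errorf("unable to parse port in %q: %v", seedBroker, err)
}
addr = host
port = int32(port64)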

CLUSTER_AUTHORIZATION_FAILED when producing.

Using franz-go 0.6.9 with Go 1.16

I create the client like this:

	client, err := kgo.NewClient(
		kgo.SeedBrokers(fmt.Sprintf("%s:%d", address, port)),
		kgo.SASL(scram.Auth{User: username, Pass: secret}.AsSha256Mechanism()),
		kgo.BatchCompression(kgo.SnappyCompression()),
		kgo.ClientID("my-client-id"),
		kgo.MaxVersions(kversion.V2_6_0()),
		kgo.WithLogger(&loggerForFranz{log, kgo.LogLevelInfo}),
	)

and use it to issue a MetadataRequest and a ListOffsetsRequest, then call client.AssignPartitions() based on the response and consume records from the topic. This works okay. I leave a background goroutine PollFetches-ing.

Using the same client I also try to produce a record to a topic:

	err := client.Produce(ctx,
		&kgo.Record{
			Topic: "my.topic.name",
			Key:   key,
			Value: value,
		},
		func(r *kgo.Record, e error) {
			if e != nil {
				// log error
			}
		})

This logs an error from the promise: "CLUSTER_AUTHORIZATION_FAILED: Cluster authorization failed.".

stack trace

/app/pkg/mod/github.com/twmb/[email protected]/pkg/kgo/producer.go : 147 : github.com/twmb/franz-go/pkg/kgo.(*Client).finishRecordPromise
/app/pkg/mod/github.com/twmb/[email protected]/pkg/kgo/sink.go : 1184 : github.com/twmb/franz-go/pkg/kgo.(*recBuf).lockedFailAllRecords
/app/pkg/mod/github.com/twmb/[email protected]/pkg/kgo/sink.go : 1166 : github.com/twmb/franz-go/pkg/kgo.(*recBuf).failAllRecords
/app/pkg/mod/github.com/twmb/[email protected]/pkg/kgo/sink.go : 305 : github.com/twmb/franz-go/pkg/kgo.(*sink).drain

and logs from the lib:

initializing producer id
producer id initialization errored
map[err:CLUSTER_AUTHORIZATION_FAILED: Cluster authorization failed.]
InitProducerID or AddPartitionsToTxn, failing producer id
map[err:CLUSTER_AUTHORIZATION_FAILED: Cluster authorization failed.]
ignoring a fail producer id request due to current id being different
map[current_epoch:-1 current_err:CLUSTER_AUTHORIZATION_FAILED: Cluster authorization failed. current_id:-1 fail_epoch:0 fail_err:CLUSTER_AUTHORIZATION_FAILED: Cluster authorization failed. fail_id:0]

What am I doing wrong?

wait for reconnect with backoff

I'm simulating the Kafka server being down by adding lag/packet drops or disconnecting.
Your package works fine and tries to reconnect, but:

{"addr":"127.0.0.1:29091","broker":"seed 1","caller":"kgo/broker.go:558","level":"debug","msg":"opening connection to broker","timestamp":"2021-08-23 23:12:59"}
{"addr":"127.0.0.1:29091","broker":"seed 1","caller":"kgo/broker.go:568","err":{"Op":"dial","Net":"tcp","Source":null,"Addr":{"IP":"127.0.0.1","Port":29091,"Zone":""},"Err":{"Syscall":"connect","Err":61}},"level":"warn","msg":"unable to open connection to broker","timestamp":"2021-08-23 23:12:59"}
{"addr":"127.0.0.2:29092","broker":"seed 2","caller":"kgo/broker.go:558","level":"debug","msg":"opening connection to broker","timestamp":"2021-08-23 23:12:59"}
{"addr":"127.0.0.2:29092","broker":"seed 2","caller":"kgo/broker.go:568","err":{"Op":"dial","Net":"tcp","Source":null,"Addr":{"IP":"127.0.0.2","Port":29092,"Zone":""},"Err":{"Syscall":"connect","Err":61}},"level":"warn","msg":"unable to open connection to broker","timestamp":"2021-08-23 23:12:59"}
{"addr":"127.0.0.1:29091","broker":"1001","caller":"kgo/broker.go:558","level":"debug","msg":"opening connection to broker","timestamp":"2021-08-23 23:12:59"}
{"addr":"127.0.0.1:29091","broker":"1001","caller":"kgo/broker.go:568","err":{"Op":"dial","Net":"tcp","Source":null,"Addr":{"IP":"127.0.0.1","Port":29091,"Zone":""},"Err":{"Syscall":"connect","Err":61}},"level":"warn","msg":"unable to open connection to broker","timestamp":"2021-08-23 23:12:59"}
{"addr":"127.0.0.3:29093","broker":"seed 0","caller":"kgo/broker.go:558","level":"debug","msg":"opening connection to broker","timestamp":"2021-08-23 23:12:59"}
{"addr":"127.0.0.3:29093","broker":"seed 0","caller":"kgo/broker.go:568","err":{"Op":"dial","Net":"tcp","Source":null,"Addr":{"IP":"127.0.0.3","Port":29093,"Zone":""},"Err":{"Syscall":"connect","Err":61}},"level":"warn","msg":"unable to open connection to broker","timestamp":"2021-08-23 23:12:59"}

As you can see, it tries to reconnect without waiting, so it may effectively DDoS the Kafka server if it has low resources, and it also spams the logs.
What do you think about adding some func that can apply a backoff for reconnecting?

fetches.EachDistinctPartition

How can I get all the messages that belong to different partitions? I want a slice of records in which every item belongs to a different partition.
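
A sketch of how this could be done with the existing per-partition iteration, assuming Fetches.EachPartition and the FetchTopicPartition fields behave as documented (here taking the first record of each fetched partition):

func firstPerPartition(ctx context.Context, cl *kgo.Client) []*kgo.Record {
	fetches := cl.PollFetches(ctx)
	var out []*kgo.Record
	fetches.EachPartition(func(p kgo.FetchTopicPartition) {
		if len(p.Records) > 0 {
			out = append(out, p.Records[0]) // one record per topic/partition in this poll
		}
	})
	return out
}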

[Question] PollFetch Error Handling

While writing a cooperative consumer, I read in the docs that all FetchErrors should be inspected prior to iterating records. I'm wondering what the best course of action would be for handling retriable and non-retriable errors, respectively, from the consumer side.

My current implementation simply ignores any records from partitions that returned errors, and retries PollFetch() later on, hoping that the next PollFetch() will return records from subscribed partitions (assuming all broker & network errors are fixed).

Is it safe to assume that retrying PollFetch() will return records from partitions that returned either retriable or non-retriable errors?

For a bit of context, this is how my consumer loop currently works (a rough code sketch follows the list):

  1. Calls PollFetch()
  2. Loop Fetches.Errors() and build a map of map[topic][partition] = error
  3. For each Record, checks if the record's topic & partition exists in the map from step 2.
    • If true, skip the record
    • If false, process the record and commit offset
  4. Repeat
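Roughly, in code (a hedged sketch: CommitRecords, Fetches.Errors, and the FetchError field names are written from memory, and error handling is simplified):

package example

import (
	"context"
	"log"

	"github.com/twmb/franz-go/pkg/kgo"
)

// pollOnce is one iteration of the loop described above: poll, note which
// topic/partitions reported errors, and only process + commit records from
// clean partitions.
func pollOnce(ctx context.Context, cl *kgo.Client, process func(*kgo.Record)) {
	fetches := cl.PollFetches(ctx)

	type tp struct {
		topic     string
		partition int32
	}
	bad := make(map[tp]error)
	for _, e := range fetches.Errors() {
		bad[tp{e.Topic, e.Partition}] = e.Err
	}

	fetches.EachRecord(func(r *kgo.Record) {
		if err, ok := bad[tp{r.Topic, r.Partition}]; ok {
			log.Printf("skipping record from %s/%d: %v", r.Topic, r.Partition, err)
			return
		}
		process(r)
		if err := cl.CommitRecords(ctx, r); err != nil {
			log.Printf("commit failed: %v", err)
		}
	})
}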

options to limit max memory usage

I saw in the docs many options to limit memory per partition, per broker, per fetch, and so on.
But in microservices we need to cap the total memory the client can use; the number of brokers and partitions should not matter, because it can change over time.
Can you suggest how to set such a limit?
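For reference, this is a sketch of how I could derive the existing per-fetch / per-broker options (FetchMaxBytes, FetchMaxPartitionBytes, BrokerMaxReadBytes) from an overall budget today; it is not a true client-wide cap, only the closest approximation I can see with what exists:

package main

import "github.com/twmb/franz-go/pkg/kgo"

func main() {
	// Pick a total budget and split it across the existing knobs. This does
	// not give a hard client-wide cap, because each limit applies per fetch
	// request / per broker connection rather than per client.
	const totalBudget = 64 << 20 // 64 MiB, arbitrary example value

	cl, err := kgo.NewClient(
		kgo.SeedBrokers("localhost:9092"),
		kgo.ConsumeTopics("foo"),
		kgo.FetchMaxBytes(totalBudget/4),           // max bytes requested per fetch
		kgo.FetchMaxPartitionBytes(totalBudget/16), // max bytes per partition per fetch
		kgo.BrokerMaxReadBytes(totalBudget/2),      // cap on any single response read
	)
	if err != nil {
		panic(err)
	}
	defer cl.Close()
}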

Data race in consumer

Version: 0.6.14

==================
WARNING: DATA RACE
Write at 0x00c0007e50b0 by goroutine 148:
  runtime.mapassign_fast64()
      runtime/map_fast64.go:92 +0x0
  github.com/twmb/franz-go/pkg/kgo.(*usedCursors).use()
      github.com/twmb/[email protected]/pkg/kgo/consumer.go:134 +0x186
  github.com/twmb/franz-go/pkg/kgo.(*consumerSession).handleListOrEpochResults.func2()
      github.com/twmb/[email protected]/pkg/kgo/consumer.go:1135 +0xef
  github.com/twmb/franz-go/pkg/kgo.(*consumerSession).handleListOrEpochResults()
      github.com/twmb/[email protected]/pkg/kgo/consumer.go:1144 +0x429
  github.com/twmb/franz-go/pkg/kgo.(*consumerSession).listOrEpoch()
      github.com/twmb/[email protected]/pkg/kgo/consumer.go:1070 +0xa44

Previous write at 0x00c0007e50b0 by goroutine 70:
  runtime.mapassign_fast64()
      runtime/map_fast64.go:92 +0x0
  github.com/twmb/franz-go/pkg/kgo.(*usedCursors).use()
      github.com/twmb/[email protected]/pkg/kgo/consumer.go:134 +0x186
  github.com/twmb/franz-go/pkg/kgo.(*consume  github.com/twmb/franz-go/pkg/kgo.(*consumerSession).handleListOrEpochResults()
      github.com/twmb/[email protected]/pkg/kgo/consumer.go:1144 +0x429
  github.com/twmb/franz-go/pkg/kgo.(*consumerSession).listOrEpoch()
      github.com/twmb/[email protected]/pkg/kgo/consumer.go:1070 +0xa44

Goroutine 148 (running) created at:
  github.com/twmb/franz-go/pkg/kgo.listOrEpochLoads.loadWithSession()
      github.com/twmb/[email protected]/pkg/kgo/consumer.go:748 +0xca
  github.com/twmb/franz-go/pkg/kgo.(*consumerSession).handleListOrEpochResults.func1.1()
      github.com/twmb/[email protected]/pkg/kgo/consumer.go:1124 +0x259

Goroutine 70 (finished) created at:
  github.com/twmb/franz-go/pkg/kgo.listOrEpochLoads.loadWithSession()
      github.com/twmb/[email protected]/pkg/kgo/consumer.go:748 +0xca
  github.com/twmb/franz-go/pkg/kgo.(*consumerSession).handleListOrEpochResults.func1.1()
      github.com/twmb/[email protected]/pkg/kgo/consumer.go:1124 +0x259
==================

Predetermine supported API versions before making requests

Thank you for this excellent library 🥇

I'm working on a small client to handle some admin requests, and I'm trying to get my head around how to handle different Kafka API versions for the features I would like to support. Take, for example, the command to alter configs in kcl here. I notice it requires the user to opt in with --inc to use the incremental method introduced in Kafka 2.3.0.

My question is: is there a way to predetermine which API versions the broker(s) support, so that a client can automatically choose to issue an IncrementalAlterConfigsRequest or an AlterConfigsRequest?
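To make the question concrete, this is the kind of probe I imagine, issuing a raw ApiVersions request through the client; the kmsg struct and field names are written from memory, so please treat it as an untested sketch:

package main

import (
	"context"
	"fmt"

	"github.com/twmb/franz-go/pkg/kgo"
	"github.com/twmb/franz-go/pkg/kmsg"
)

func main() {
	cl, err := kgo.NewClient(kgo.SeedBrokers("localhost:9092"))
	if err != nil {
		panic(err)
	}
	defer cl.Close()

	// Ask the broker which API key versions it supports.
	resp, err := cl.Request(context.Background(), &kmsg.ApiVersionsRequest{})
	if err != nil {
		panic(err)
	}
	versions := resp.(*kmsg.ApiVersionsResponse)

	// API key 44 is IncrementalAlterConfigs (Kafka 2.3.0+).
	supportsIncremental := false
	for _, k := range versions.ApiKeys {
		if k.ApiKey == 44 && k.MaxVersion >= 0 {
			supportsIncremental = true
		}
	}

	if supportsIncremental {
		fmt.Println("would issue IncrementalAlterConfigsRequest")
	} else {
		fmt.Println("would fall back to AlterConfigsRequest")
	}
}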

AssignGroup fails with an invalid coordinator ID

The first AssignGroup() call with a new consumer group name always fails with this error: Kafka replied that group xxx has broker coordinator -1, but did not reply with that broker in the broker list. The weird thing is when I restart my consumer, it successfully creates and joins the group. I'm using v0.7.0, but I confirmed that the same issue occurs with v0.6.10.

Here are the logs from the consumer:

DEBUG	kgoclient   opening connection to broker	{"addr": "kafka:9092", "broker": -2147483648}
DEBUG	kgoclient   connection opened to broker	{"addr": "kafka:9092", "broker": -2147483648}
DEBUG	kgoclient   issuing api versions request	{"broker": -2147483648, "version": 3}
DEBUG	kgoclient   wrote ApiVersions v3	{"broker": -2147483648, "bytes_written": 29, "write_wait": "193.9µs", "time_to_write": "270.4µs", "err": null}
DEBUG	kgoclient   read ApiVersions v3	{"broker": -2147483648, "bytes_read": 404, "read_wait": "114.5µs", "time_to_read": "17.6136ms", "err": null}
DEBUG	kgoclient   connection initialized successfully	{"addr": "kafka:9092", "broker": -2147483648}
DEBUG	kgoclient   wrote Metadata v9	{"broker": -2147483648, "bytes_written": 34, "write_wait": "22.8808ms", "time_to_write": "309.6µs", "err": null}
DEBUG	kgoclient   read Metadata v9	{"broker": -2147483648, "bytes_read": 108, "read_wait": "94.4µs", "time_to_read": "5.4235ms", "err": null}
INFO	kgoclient   beginning to manage the group lifecycle
INFO	kgoclient   joining group
DEBUG	kgoclient   wrote FindCoordinator v3	{"broker": -2147483648, "bytes_written": 52, "write_wait": "87.9µs", "time_to_write": "339.1µs", "err": null}
DEBUG	kgoclient   read FindCoordinator v3	{"broker": -2147483648, "bytes_read": 59, "read_wait": "99.9µs", "time_to_read": "48.9366ms", "err": null}
DEBUG	kgoclient   wrote Metadata v9	{"broker": -2147483648, "bytes_written": 23, "write_wait": "121.6µs", "time_to_write": "72.5µs", "err": null}
DEBUG	kgoclient   read Metadata v9	{"broker": -2147483648, "bytes_read": 481, "read_wait": "91.4µs", "time_to_read": "2.9127ms", "err": null}
DEBUG	kgoclient   wrote Metadata v9	{"broker": -2147483648, "bytes_written": 23, "write_wait": "108.7µs", "time_to_write": "62.6µs", "err": null}
DEBUG	kgoclient   read Metadata v9	{"broker": -2147483648, "bytes_read": 481, "read_wait": "292.6µs", "time_to_read": "5.8912ms", "err": null}
ERROR	kgoclient   join and sync loop errored	{"err": "Kafka replied that group XXXXX has broker coordinator -1, but did not reply with that broker in the broker list", "consecutive_errors": 1, "backoff": "2s"}

Maybe my broker isn't set up correctly? I'm running a Kafka broker and a Zookeeper locally with docker-compose, making sure broker.id is set correctly:

zookeeper:
  image: zookeeper
  ports:
    - "2181:2181"
kafka:
  image: wurstmeister/kafka:latest
  ports:
    - "9092:9092"
  environment:
    KAFKA_BROKER_ID: 1
    KAFKA_ADVERTISED_HOST_NAME: kafka
    KAFKA_PORT: 9092
    KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
  depends_on:
      - zookeeper

IPV4 Broker Address Parsing Regression

8da4eaa introduced a regression where using an IPv4 addr:port fails to parse:

package main

import "github.com/twmb/franz-go/pkg/kgo"

func main() {
	seeds := []string{"localhost:9092"}
	cl, err := kgo.NewClient(
		kgo.SeedBrokers(seeds...),
		kgo.ConsumerGroup("my-group-identifier"),
		kgo.ConsumeTopics("foo"),
	)
	if err != nil {
		panic(err)
	}
	defer cl.Close()
}

Results in:

panic: unable to split host port in "localhost:9092:9092": address localhost:9092:9092
: too many colons in address

Looks like the parsing code incorrectly treats this as an IPv6 address without a port and appends the default port:

} else if colon > 0 && seedBroker[colon-1] != ']' { // ipv6 without port (since port would follow bracket)

kadmin package design meta-issue

It may be worth it to add a kadmin package that provides convenience functions for interacting with Kafka in an "admin" like way. This would include creating / deleting topics, listing groups, getting the lag for groups, etc.

I've avoided adding this because I'm not sure how to do it in a forward-compatible way, as Kafka can add new fields to requests or responses at any time. If an API can be designed such that methods are either specific enough to not need extending, or generic enough that they can be extended if necessary, then a kadmin package would be great.

At a high level, the following should be supported:

  • create topics
  • delete topics
  • query topic (likely metadata request for an individual topic)
  • list groups
  • get lag for groups

The list above is to be extended as more needs come up. This comment will grow to eventually encompass an actual design, and once settled, this issue will track adding the kadmin package. Or, in the worst case, a kadmin package just cannot have a clean design and is not worth implementing.
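Purely as a strawman to kick off the design discussion (everything below is hypothetical; none of these types exist in franz-go today), the package could start small and return structs rather than positional values, so that fields can be added without breaking callers:

// Hypothetical sketch of a kadmin package; nothing here exists today.
package kadmin

import (
	"context"

	"github.com/twmb/franz-go/pkg/kgo"
)

// Client wraps a *kgo.Client with admin conveniences.
type Client struct{ cl *kgo.Client }

func NewClient(cl *kgo.Client) *Client { return &Client{cl: cl} }

// TopicDetail is a struct (rather than positional return values) so that new
// fields can be added later without breaking callers.
type TopicDetail struct {
	Topic      string
	Partitions int32
	Replicas   int16
	Err        error
}

// GroupLag maps topic -> partition -> lag.
type GroupLag map[string]map[int32]int64

// Admin is the surface sketched from the list above.
type Admin interface {
	CreateTopic(ctx context.Context, topic string, partitions int32, replicas int16) error
	DeleteTopic(ctx context.Context, topic string) error
	DescribeTopic(ctx context.Context, topic string) (TopicDetail, error)
	ListGroups(ctx context.Context) ([]string, error)
	Lag(ctx context.Context, group string) (GroupLag, error)
}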

Expose producer buffered record count in public API and/or metrics

Providing read-only access to the number of records currently buffered within a producer would be very useful for production monitoring, performance measurements, and estimation of configuration values (primarily MaxBufferedRecords). Currently, producer.bufferedRecords is completely internal, and franz-go provides no public API (or metrics) to read the current value.

I'm not yet 100% sure what the API should be for this; however, I am more than happy to discuss further implementation details if it is a feature that franz-go would consider supporting 😄
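As one entirely hypothetical starting point for the discussion (the accessor name below does not exist in kgo; it is only what I imagine it could look like), a single read-only method would already be enough to drive a gauge metric:

package example

import (
	"log"
	"time"
)

// bufferedCounter is the hypothetical read-only accessor this issue asks for;
// the method name is made up and does not exist in kgo today.
type bufferedCounter interface {
	BufferedProduceRecords() int64
}

// reportBuffered periodically logs the buffered count; in practice this would
// feed a Prometheus gauge or similar.
func reportBuffered(c bufferedCounter, every time.Duration) {
	for range time.Tick(every) {
		log.Printf("producer buffered records: %d", c.BufferedProduceRecords())
	}
}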

Crash in metadata.go

Hi there 👋 😄

I'm getting a crash in franz-go at this line because cursor is nil.
The comment for that field says // Only one of records or cursor is non-nil. and it seems that the code I linked isn't checking for that.

https://github.com/twmb/franz-go/blob/master/pkg/kgo/metadata.go#L566

Not sure if it's helpful, but the issue happens with 0.6.13 and does not happen with 0.6.12.

LMK if there's some more info I can provide.

Support search for offsets by timestamp

Hello there!

It would actually be super useful to have this little thing out of the box:
getting the offset for a given timestamp.

It is supported by sarama, but this lib suits our needs way better,
so we'd be glad to have it here =)

Link to the Offset API: https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetRequest

Link to corresponding KIP: https://cwiki.apache.org/confluence/display/KAFKA/KIP-33+-+Add+a+time+based+log+index
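Until there is first-class support, it looks like the raw kmsg types can already express this. Below is an untested sketch (the field names, including ReplicaID, are written from memory and may be slightly off) that asks for the first offset at or after a timestamp one hour in the past:

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/twmb/franz-go/pkg/kgo"
	"github.com/twmb/franz-go/pkg/kmsg"
)

func main() {
	cl, err := kgo.NewClient(kgo.SeedBrokers("localhost:9092"))
	if err != nil {
		panic(err)
	}
	defer cl.Close()

	ts := time.Now().Add(-time.Hour).UnixNano() / int64(time.Millisecond)

	req := &kmsg.ListOffsetsRequest{
		ReplicaID: -1, // -1 means "normal consumer"
		Topics: []kmsg.ListOffsetsRequestTopic{{
			Topic: "foo",
			Partitions: []kmsg.ListOffsetsRequestTopicPartition{{
				Partition: 0,
				Timestamp: ts, // milliseconds; broker returns first offset with timestamp >= this
			}},
		}},
	}

	resp, err := cl.Request(context.Background(), req)
	if err != nil {
		panic(err)
	}
	list := resp.(*kmsg.ListOffsetsResponse)
	for _, t := range list.Topics {
		for _, p := range t.Partitions {
			// Per-partition ErrorCode checking omitted for brevity.
			fmt.Printf("%s/%d: offset %d\n", t.Topic, p.Partition, p.Offset)
		}
	}
}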

strange profile data

487.41MB 32.71% 32.71%   487.41MB 32.71%  github.com/unistack-org/micro-broker-kgo/v3.(*subscriber).handleFetches.func2
  462.56MB 31.04% 63.75%   462.56MB 31.04%  github.com/twmb/franz-go/pkg/kgo.recordToRecord
  294.14MB 19.74% 83.48%   294.14MB 19.74%  github.com/twmb/franz-go/pkg/kgo.readRawRecords
  125.83MB  8.44% 91.93%   125.83MB  8.44%  github.com/twmb/franz-go/pkg/kgo.(*cursorOffsetNext).maybeKeepRecord (inline)
   59.51MB  3.99% 95.92%    59.51MB  3.99%  github.com/klauspost/compress/s2.Decode
   32.28MB  2.17% 98.09%    32.28MB  2.17%  github.com/unistack-org/micro-broker-kgo/v3.(*subscriber).handleFetches
   21.48MB  1.44% 99.53%    21.48MB  1.44%  github.com/twmb/franz-go/pkg/kgo.(*brokerCxn).readConn.func2
         0     0% 99.53%   942.04MB 63.21%  github.com/twmb/franz-go/pkg/kgo.(*cursorOffsetNext).processRecordBatch
         0     0% 99.53%   942.04MB 63.21%  github.com/twmb/franz-go/pkg/kgo.(*cursorOffsetNext).processRespPartition
         0     0% 99.53%    59.51MB  3.99%  github.com/twmb/franz-go/pkg/kgo.(*decompressor).decompress
         0     0% 99.53%   942.04MB 63.21%  github.com/twmb/franz-go/pkg/kgo.(*source).fetch.func3
         0     0% 99.53%   942.04MB 63.21%  github.com/twmb/franz-go/pkg/kgo.(*source).handleReqResp
         0     0% 99.53%    32.28MB  2.17%  github.com/unistack-org/micro-broker-kgo/v3.(*subscriber).run
         0     0% 99.53%   487.91MB 32.74%  golang.org/x/sync/errgroup.(*Group).Go.func1

I'm not using any compression and pass NoCompression.
