
Comments (10)

achille-roussel avatar achille-roussel commented on May 14, 2024

Would it be possible for you to share the code sample that triggered the panic?

from kafka-go.

tolitius avatar tolitius commented on May 14, 2024

it is spread over several functions, but comes down to something like this:

func createConsumer(config map[string]string, partition int) *kafka.Reader {

	// Parse the fetch size first and fail fast if it is not a number.
	maxBytes, err := strconv.Atoi(config[max_partition_fetch_bytes])
	if err != nil {
		log.Fatalf("\ncould not parse a number for 'max_partition_fetch_bytes' from config %+v due to:%+v\n", config, err)
	}

	brokerList := strings.Split(config[brokers], ",")

	consumer := kafka.NewReader(kafka.ReaderConfig{
		Brokers:   brokerList,
		Topic:     config[topic],
		Partition: partition,
		MaxBytes:  maxBytes,
	})

	fmt.Printf("created consumer: %+v\n", *consumer)

	return consumer
}

and in another goroutine that consumes:

	for ctx.Err() == nil {
		m, err := consumer.ReadMessage(ctx) // <<<<<<< (!) PANICS FROM HERE
		if err != nil {
			//TODO: fatal for now to fail fast (and learn the reasons) until recover / retry logic is in
			log.Fatalf("\ncould not consume from partition: %+v, %+v \ndue to:%+v\n", consumer.Stats(), consumer.Config(), err)
		} else {
			...
		}
	}


tolitius avatar tolitius commented on May 14, 2024

caused by OffsetOutOfRange. It seems that even when this error occurs, the flow continues into the readLoop and fails here:

switch offset, err = r.read(ctx, offset, conn); err {

because the connection is nil (i.e. was not created due to the OffsetOutOfRange).

the real panic due to the nil connection is here:

conn.SetReadDeadline(t0.Add(r.maxWait))

minor:

return "Ouffset Out Of Range"
error message spelling


achille-roussel avatar achille-roussel commented on May 14, 2024

Thanks for the report, I'll send a PR with a test to reproduce the issue and a fix.


achille-roussel avatar achille-roussel commented on May 14, 2024

I'm having trouble reproducing the issue. It seems like this code should already guard against out-of-range offsets. Could it be a race between the reader reading from the first offset and Kafka expiring those messages?

Would you happen to know the range of valid offsets on your partition, and which offset the reader was trying to consume from? (Looking at your code snippet, it seems like it's reading from the first offset.)


tolitius avatar tolitius commented on May 14, 2024

> Could it be a race between the reader reading from the first offset and kafka expiring those messages?

I don't think this is the case; see the next answer.

> Would you happen to know what's the range of valid offsets on your partition, and which offset the reader was trying to consume from?

I pointed it to a different Kafka broker that did not yet have messages for a partition, but I read offsets from Consul, which remembered offsets from the previous broker. So any offset greater than zero on a brand new partition results in this panic.

> looking at your code snippet it seems like it's reading from the first offset

I do manual offset management: recording them in a sync.Map, and periodically persisting them to Consul. When the app starts it reads Consul along with all the partition offsets


achille-roussel avatar achille-roussel commented on May 14, 2024

Hm... I tried writing a test that would do r.SetOffset(42) on an empty topic but it didn't trigger the issue, I'll dig more.


achille-roussel avatar achille-roussel commented on May 14, 2024

Ah! Never mind, I got it: in the test the topic didn't exist until messages were produced to it. See the PR for the test and fix.

Thanks for your help making this project better!


tolitius avatar tolitius commented on May 14, 2024

just in case: Kafka version kafka_2.12-0.10.2.0

if I put continue after the OffsetOutOfRange case, as in your pull request, it does not panic on a wrong offset.

At this point this is not a blocker for me, since theoretically "it's a user error", but it would be nice to avoid a panic of course.


tolitius avatar tolitius commented on May 14, 2024

great, sure always welcome, thanks for a quick fix :)

