Comments (7)

briangmotive avatar briangmotive commented on May 14, 2024 3

Please add this to the README. It is incredibly important for people to know that the Reader's behavior (even when calling FetchMessage()) is not to replay messages that weren't committed back; it keeps reading no matter what.

😢 spent a LOT of time trying to figure this out.
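
For anyone hitting the same surprise, here is a minimal sketch of that behavior (the broker, group, topic, and the process handler are placeholders, not from this thread): FetchMessage always hands back the next message whether or not earlier ones were committed, and offsets only advance durably when CommitMessages is called explicitly.

package main

import (
    "context"
    "log"

    "github.com/segmentio/kafka-go"
)

// process is a hypothetical handler standing in for real message handling.
func process(m kafka.Message) error { return nil }

func main() {
    ctx := context.Background()

    // Placeholder broker, group, and topic values.
    r := kafka.NewReader(kafka.ReaderConfig{
        Brokers: []string{"localhost:9092"},
        GroupID: "example-group",
        Topic:   "example-topic",
    })
    defer r.Close()

    for {
        // FetchMessage always returns the NEXT message, whether or not the
        // previous ones were committed; it does not replay uncommitted ones.
        m, err := r.FetchMessage(ctx)
        if err != nil {
            log.Printf("fetch error: %v", err)
            return
        }

        if err := process(m); err != nil {
            // Skipping the commit does not make the reader re-deliver this
            // message on the next fetch; it is only re-delivered after a
            // rebalance or restart, starting from the last committed offset.
            continue
        }

        // Offsets only advance durably when committed explicitly.
        if err := r.CommitMessages(ctx, m); err != nil {
            log.Printf("commit error: %v", err)
        }
    }
}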

kidlj avatar kidlj commented on May 14, 2024 2

You can use a buffered channel to retry a failed message until it succeeds, then proceed to the next message automatically:

func (s *Server) collectionWorker(ctx context.Context) {
    // Buffered channel (capacity 1) holding a message whose processing failed
    // and needs to be retried before we move on. The default branch of the
    // select below only runs when this channel is empty, so sends into it
    // never block.
    ch := make(chan kafka.Message, 1)

    // Message-processing logic.
    do := func(m kafka.Message) error {
        err, o := unmarshal(m)
        if err != nil {
            // Malformed message: don't retry, just skip it.
            return nil
        }
        return s.collectionService.Save(o)
    }

    for {
        select {
        case <-ctx.Done():
            s.collectionService.KafkaReader.Close()
            return
        case m := <-ch:
            // Retry with a back-off.
            time.Sleep(3 * time.Second)
            fmt.Printf("*** RETRY message at topic/partition/offset %v/%v/%v\n", m.Topic, m.Partition, m.Offset)
            if err := do(m); err != nil {
                // Still failing: put the message back for another retry.
                // Note that break only exits the select, not the for loop.
                ch <- m
                break
            }
            s.collectionService.KafkaReader.CommitMessages(ctx, m)
        default:
            m, err := s.collectionService.KafkaReader.FetchMessage(ctx)
            if err != nil {
                fmt.Printf("fetch messages error: %v\n", err)
                break
            }
            if err := do(m); err != nil {
                // Processing failed: queue the message for retry instead of
                // committing it and moving on.
                ch <- m
                break
            }
            s.collectionService.KafkaReader.CommitMessages(ctx, m)
        }
    }
}

michalkarolik avatar michalkarolik commented on May 14, 2024 1

Thank you for the answer, at least I know I did not confuse things in my head :) I have to admit it was a bit of a surprise (in Kafka itself, not the library) that you can even poll the next message without committing the previous one. I was sure that polling would fetch the same uncommitted message again, and that all we needed to do was ensure message processing is idempotent.

achille-roussel avatar achille-roussel commented on May 14, 2024 1

I'll update the documentation to make the reader's behavior more explicit, then close this issue.

abraithwaite avatar abraithwaite commented on May 14, 2024

I've been thinking about this problem quite a bit recently.

I've had difficulty coming up with solutions beyond just not processing events in a concurrent context. We can provide a solid guarantee that messages from an individual partition will be returned from the client library in order, but it's the responsibility of the caller to ensure that a more recent event isn't acked before an older event has finished processing.

This obviously is not the dream. The dream is that the library validates acking events in order for you, but that presents a ton of complications as well, such as communicating back to the library consumer when the library hasn't seen a particular offset. What do you do in those cases?

Anyway, at the very least we should document this behavior and come up with some general guidance.
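
As a rough illustration of that caller-side responsibility, here is a hedged sketch (the offsetTracker type and all of its names are made up for this example, not part of kafka-go) of per-partition bookkeeping that only advances the committable offset once every earlier offset has finished processing:

package main

import "fmt"

// offsetTracker sketches one way a caller can avoid acking a newer offset
// before all older offsets in the same partition have finished processing.
// kafka-go itself does not enforce this ordering.
type offsetTracker struct {
    nextPending int64          // lowest offset not yet finished
    finished    map[int64]bool // finished offsets at or above nextPending
}

func newOffsetTracker(first int64) *offsetTracker {
    return &offsetTracker{nextPending: first, finished: map[int64]bool{}}
}

// markDone records a finished offset and returns the highest offset that is
// now safe to commit (every offset up to and including it is finished), or
// -1 if the committable frontier did not advance.
func (t *offsetTracker) markDone(offset int64) int64 {
    t.finished[offset] = true
    committable := int64(-1)
    for t.finished[t.nextPending] {
        delete(t.finished, t.nextPending)
        committable = t.nextPending
        t.nextPending++
    }
    return committable
}

func main() {
    t := newOffsetTracker(0)
    // Offsets finish out of order: 1 completes before 0.
    fmt.Println(t.markDone(1)) // -1: offset 0 still in flight, nothing committable
    fmt.Println(t.markDone(0)) // 1: offsets 0 and 1 are both done, commit up to 1
    fmt.Println(t.markDone(2)) // 2
}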

savaki avatar savaki commented on May 14, 2024

Agree that more documentation would be useful. However, I'm not sure this is a kafka-go issue. While polling for messages and committing offsets seem like they should be tightly coupled activities, they're actually independent activities within Kafka. One can poll for messages and never commit an offset. Likewise, one can commit offsets without ever having polled for a message.

This lack of coupling enables the consumer to provide logic suitable for its scenarios. For example:

  • commit after each message - the example the OP gave
  • commit after N messages, e.g. batch operations (see the sketch after this comment)
  • commit periodically - very useful for high-throughput async operations
  • commit without ever reading a message - useful for transferring consumers to another topic (copy messages from topic A to topic B, then update offsets on topic B for all consumers of topic A)

Rather than changing the core library, I could see adding utility functions (perhaps in a contrib package) to help with each of these use cases.
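
To illustrate the batching strategies above, here is a minimal sketch of the "commit after N messages" approach using the variadic CommitMessages call; the broker, group, topic, and batch size are placeholder values, and a time-based flush would need its own goroutine or a fetch timeout.

package main

import (
    "context"
    "log"

    "github.com/segmentio/kafka-go"
)

func main() {
    ctx := context.Background()

    // Placeholder broker, group, and topic values.
    r := kafka.NewReader(kafka.ReaderConfig{
        Brokers: []string{"localhost:9092"},
        GroupID: "example-group",
        Topic:   "example-topic",
    })
    defer r.Close()

    const batchSize = 100
    pending := make([]kafka.Message, 0, batchSize)

    for {
        m, err := r.FetchMessage(ctx)
        if err != nil {
            log.Printf("fetch error: %v", err)
            return
        }

        // ... process m here, before it joins the commit batch ...
        pending = append(pending, m)

        if len(pending) >= batchSize {
            // CommitMessages is variadic, so the whole batch commits in one
            // call; the group's offset advances past the newest message.
            if err := r.CommitMessages(ctx, pending...); err != nil {
                log.Printf("commit error: %v", err)
                continue
            }
            pending = pending[:0]
        }
    }
}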

abraithwaite avatar abraithwaite commented on May 14, 2024

We could leave them decoupled, but I don't think any kafka library is complete without an API for offset management.
