Comments (10)

simonsouter commented on August 24, 2024

Hi,

Thanks for your interest. We have looked into using the async call here, but it's not actually as trivial as you might think. The Consumer Actor would need additional state logic to recognise that it is in an 'Awaiting Commit Confirmation' state and that it can't make further polls. We decided that it was a reasonable compromise to block here to avoid this additional complexity.
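For illustration only, here is a minimal sketch of the extra state an asynchronous commit would introduce. It is not the scala-kafka-client implementation: `ConsumerState`, `Ready`, `AwaitingCommitConfirmation` and `GatedConsumer` are hypothetical names, and the commit callback is simulated by calling `commitCompleted` directly. With a blocking commitSync none of this bookkeeping is needed, because the actor simply does not reach its next poll until the commit returns.

```scala
// Hypothetical model, not the library's code: a consumer that must not
// poll while an asynchronous commit is still outstanding.
sealed trait ConsumerState
case object Ready extends ConsumerState
case object AwaitingCommitConfirmation extends ConsumerState

final class GatedConsumer {
  private var state: ConsumerState = Ready
  private var pendingCommit: Option[Long] = None

  def currentState: ConsumerState = state

  // Polling is only allowed while no commit is in flight.
  def canPoll: Boolean = state == Ready

  // Start a (simulated) async commit and enter the waiting state.
  def startCommit(offset: Long): Unit = {
    require(state == Ready, "a commit is already in flight")
    pendingCommit = Some(offset)
    state = AwaitingCommitConfirmation
  }

  // Simulated commit callback: only the matching commit unblocks polling.
  def commitCompleted(offset: Long): Unit =
    if (pendingCommit.contains(offset)) {
      pendingCommit = None
      state = Ready
    }
}
```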

While we have managed to avoid much of the blocking that would occur if you used the Java driver directly, there are some cases that are impossible or difficult to avoid given the blocking nature of the underlying driver.

We would consider making further optimisations, such as the one you suggest, if we felt there would be any real improvement to performance or thread utilisation. We have profiled the ConsumerActor in detail; it's pretty optimal and provides excellent throughput. If there is any use case or profiling that suggests otherwise, we would certainly look into it.

Cheers

from scala-kafka-client.

haggy commented on August 24, 2024

Is the client not able to poll while it's committing the offsets?

simonsouter commented on August 24, 2024

We don't do that. The call to commitSync will throw an exception if a rebalance is occurring (expected in multi-partition mode). If we used commitAsync and continued to process messages, it's likely that a rebalance would cause unnecessary redeliveries.

This logic is complex due to the Kafka driver not really supporting our async use case very well. If the Kafka driver team make some improvements in this area we will revisit it, for sure. I know it has been discussed on the Kafka mailing list in the past.
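To make the redelivery risk concrete, here is a toy model, assuming that after a rebalance the new owner of a partition resumes from the last committed offset (Kafka's behaviour for a consumer group with committed offsets). With an asynchronous commit the consumer may already have processed past the offset that was actually committed when the partition is revoked, and everything in between is delivered again. `RebalanceModel.redeliveredAfterRebalance` is a hypothetical helper, not a library function.

```scala
object RebalanceModel {
  // Both arguments follow Kafka's convention that an offset is "the next
  // record to read", so `position` = last processed offset + 1.
  // Returns how many already-processed records the new partition owner
  // receives again, assuming it resumes from `committed`.
  def redeliveredAfterRebalance(position: Long, committed: Long): Long =
    math.max(0L, position - committed)

  def main(args: Array[String]): Unit = {
    // Blocking commit: batch [1000, 2000) is committed before the next poll,
    // so a rebalance between batches redelivers nothing.
    println(redeliveredAfterRebalance(position = 2000L, committed = 2000L)) // prints 0
    // Async commit for [1000, 2000) still in flight (and then failing) while
    // batch [2000, 3000) has also been processed: both batches come back.
    println(redeliveredAfterRebalance(position = 3000L, committed = 1000L)) // prints 2000
  }
}
```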

haggy commented on August 24, 2024

Sorry for the long reply time and thanks for the info. I'm seeing pretty slow consumer rates, which I believe is due to running only 1 consumer per node, and that consumer blocks on every commit. I am not using auto-commit because I need to tightly control when messages are considered "processed". Any tips to speed this up? I was thinking of running multiple Kafka consumer actors per node, but I am not sure what the commit semantics look like with that setup. I know that it will log a warning if you try to commit offsets that the client wasn't expecting. Any thoughts?

jkpl commented on August 24, 2024

Hiya,

A couple of questions to start with:

  • What kind of rates are you seeing? What are your expectations?
  • What kind of computation do you do while consuming messages? Any IO calls (e.g. HTTP requests)?
  • What kind of batch sizes are you pulling from Kafka? Tuning the batch size might help you reduce the overhead in your message processing code.

Indeed, auto-commit mode is rarely something that should be used. The consumer actor expects the Offsets to arrive as confirmation of processed messages. The Offsets are only used to distinguish confirmation messages from each other so that duplicate confirmations can be ignored; they are not used for anything else.
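A minimal sketch of that deduplication idea, assuming a consumer that remembers the offsets of the single batch it has handed out and accepts only the first confirmation carrying exactly those offsets. `Offsets` here is a hypothetical stand-in keyed by partition number, not the library's own type, and `BatchTracker` is likewise made up for this example; the warning list mirrors the log warning mentioned in this thread.

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical stand-in for an offsets value: partition -> next offset.
final case class Offsets(byPartition: Map[Int, Long])

// Offsets identify the in-flight batch, so repeated or unexpected
// confirmations can be recognised and ignored.
final class BatchTracker {
  private var inFlight: Option[Offsets] = None
  val warnings: ListBuffer[String] = ListBuffer.empty[String]

  def sendBatch(offsets: Offsets): Unit = {
    inFlight = Some(offsets)
  }

  // True only for the first confirmation matching the in-flight batch.
  def confirm(offsets: Offsets): Boolean = inFlight match {
    case Some(expected) if expected == offsets =>
      inFlight = None
      true
    case _ =>
      warnings += s"Ignoring unexpected confirmation: $offsets"
      false
  }
}
```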

Launching multiple consumers is a valid strategy. Each consumer will be assigned a subset of the available partitions in the topic. To get the most out of it, you want to have one message processor per consumer. If you want to scale message processing beyond a single machine, you can launch multiple instances of your application consuming from the same topic. In auto-partition mode, Kafka will automatically manage the assignment of partitions to consumers. Remember that the parallelism of the topic is bounded by the partition count, and message ordering is only guaranteed per partition.
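The bound on parallelism can be seen with a simple round-robin assignment. This is not Kafka's actual assignor (the built-in assignors differ in detail), and `AssignmentSketch.assign` is a hypothetical helper; it only shows that with more consumers than partitions, the extra consumers receive nothing.

```scala
object AssignmentSketch {
  // Round-robin: partition p goes to consumer p % consumers.size.
  // Consumers that receive no partition are listed with an empty Seq.
  def assign(partitionCount: Int, consumers: Seq[String]): Map[String, Seq[Int]] = {
    require(consumers.nonEmpty, "need at least one consumer")
    val byConsumer = (0 until partitionCount).groupBy(p => consumers(p % consumers.size))
    consumers.map(c => c -> byConsumer.getOrElse(c, Seq.empty[Int])).toMap
  }

  def main(args: Array[String]): Unit = {
    val consumers = (1 to 8).map(i => s"consumer-$i")
    val assignment = assign(6, consumers) // 6 partitions, 8 consumers
    val busy = assignment.count { case (_, partitions) => partitions.nonEmpty }
    println(s"busy consumers: $busy of ${consumers.size}") // busy consumers: 6 of 8
  }
}
```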

You might also find the discussions in issue #32 useful.

haggy commented on August 24, 2024

Some answers below

What kind of rates are you seeing? What are your expectations?

This is surprisingly difficult to calculate due to crappy lag metrics in Kafka, but preliminary (and very simple) metrics show around 50/second, which is much lower than needed even in our development environment. I expect to see hundreds or thousands per second. We are running 3 consumers against the Kafka cluster (these are physical nodes, each currently running just 1 instance of KafkaConsumer).

What kind of computation do you do while consuming messages? Any IO calls (e.g. HTTP requests)?

Yes, there is IO to Cassandra, but Cassandra is currently very bored. The bottleneck isn't there.

What kind of batch sizes are you pulling from Kafka? Tuning the batch size might help you reduce the overhead in your message processing code.

I have max.poll.records set to 1000.

Launching multiple consumers is a valid strategy.

So what I would want to do is put N consumers behind an Akka router. How would this work with offset confirmation? After skimming through the Scala client source quite a bit, it looks like the Kafka consumer actors do keep offset state, and warnings are shown if a consumer is given offsets to confirm that it was not expecting.

You might also find the discussions in issue #32 useful.

I'm looking through that now :)

jkpl commented on August 24, 2024

There are a couple of other tunables in KafkaConsumerActor.Conf I forgot to mention:

  • Try setting unconfirmedTimeout higher or disable it completely. It might be that you're being spammed by redeliveries.
  • Reduce the scheduleInterval. It dictates how often Kafka is polled by the consumer actor.
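As a rough, back-of-the-envelope model (an assumption, not something measured or taken from the library), suppose each consumer has at most one unconfirmed batch of B messages, waits up to t_wait (bounded by scheduleInterval) before polling, and then spends t_process on processing and t_commit on the blocking commit. With N consumers on a topic with P partitions:

```latex
R_{\text{consumer}} \lesssim \frac{B}{t_{\text{wait}} + t_{\text{process}} + t_{\text{commit}}},
\qquad
R_{\text{total}} \lesssim \min(N, P)\, R_{\text{consumer}}
```

In this model, shortening scheduleInterval shrinks t_wait, and adding consumers raises the min(N, P) factor until it reaches the partition count. Redeliveries caused by a too-short unconfirmedTimeout would add repeated work on top of this and are not captured by the bound.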

I'd be interested to hear what settings you've tried, and if you find success in tuning these values.

So what I would want to do is put N consumers behind an Akka router. How would this work with offset confirmation? After skimming through the Scala client source quite a bit, it looks like the Kafka consumer actors do keep offset state, and warnings are shown if a consumer is given offsets to confirm that it was not expecting.

Make sure you send the offsets back to the same consumer actor that originally sent the data. The offsets are used for deduplicating the batch confirmations. Nothing more.

haggy commented on August 24, 2024

Reduce the scheduleInterval. It dictates how often Kafka is polled by the consumer actor.

Ah, good call. I'll try that ASAP.

Make sure you send the offsets back to the same consumer actor

Yeah, that was my main question. Knowing that, I think I can come up with a semi-simple way to map Offsets -> Consumer via some kind of correlation ID setup.
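A minimal sketch of such a correlation-ID setup, assuming each batch is tagged with an ID when it leaves a consumer and the processor echoes that ID back with its confirmation. `ConfirmationRouter`, `register` and `routeConfirmation` are hypothetical names; in an Akka application `ConsumerRef` would typically be an `ActorRef`, and the map would live inside a single actor so it is never accessed concurrently.

```scala
import scala.collection.mutable

// Hypothetical router: remembers which consumer produced each batch so the
// confirmation (offsets) can be sent back to that same consumer.
final class ConfirmationRouter[ConsumerRef] {
  private val origin = mutable.Map.empty[Long, ConsumerRef]
  private var nextId = 0L

  // Register a batch coming from `consumer`; returns its correlation ID.
  def register(consumer: ConsumerRef): Long = {
    val id = nextId
    nextId += 1
    origin(id) = consumer
    id
  }

  // Look up, and forget, the consumer that must receive this confirmation.
  def routeConfirmation(correlationId: Long): Option[ConsumerRef] =
    origin.remove(correlationId)
}
```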

I'll update the scheduleInterval and let you know what difference that makes. Thanks!

jkpl commented on August 24, 2024

I think I can come up with a semi-simple way to map Offsets -> Consumer via some kind of correlation ID setup.

You could start with something simple, like creating a dedicated message processor for each consumer rather than a pool of processors. A consumer will not send its next batch to a message processor until it has received a confirmation for the previous one, so there's not much to load balance. Therefore, if you have more message processors than consumer actors, you will not be fully utilising the pool. If you have fewer message processors than consumer actors, some of the consumers will remain idle and block other consumers from accessing the partitions assigned to the idle consumer. Thus, you'll end up with a 1-to-1 consumer-to-message-processor mapping anyway.
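The sizing argument above can be written down as a small model, assuming every consumer keeps at most one unconfirmed batch in flight and each batch occupies one processor until it is confirmed. `PoolSizing` and `Utilisation` are hypothetical names used only for this illustration.

```scala
object PoolSizing {
  final case class Utilisation(busyProcessors: Int, idleProcessors: Int, waitingConsumers: Int)

  // At most `consumers` batches can be in flight (one per consumer), and at
  // most `processors` of them can be worked on at the same time.
  def atSteadyState(consumers: Int, processors: Int): Utilisation = {
    require(consumers >= 0 && processors >= 0)
    val busy = math.min(consumers, processors)
    Utilisation(
      busyProcessors = busy,
      idleProcessors = processors - busy, // pool larger than needed
      waitingConsumers = consumers - busy // batches queued, partitions stalled
    )
  }

  def main(args: Array[String]): Unit = {
    println(atSteadyState(consumers = 4, processors = 8)) // Utilisation(4,4,0)
    println(atSteadyState(consumers = 4, processors = 2)) // Utilisation(2,0,2)
    println(atSteadyState(consumers = 4, processors = 4)) // Utilisation(4,0,0)
  }
}
```

Only the 1-to-1 case leaves neither idle processors nor waiting consumers.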

haggy commented on August 24, 2024

OK, so a little status update. I ended up going with the N-consumers-per-application-instance strategy, and so far it's paying off quite well. Here's a before/after graph:

[Image: before/after graph]

I deployed that to our dev environment at 19:15 UTC. We are much more parallelized now and are confirming many more batches of offsets than before. I'm going to put together a description and some code for what the solution ended up being after I verify everything is still working as expected.
