Comments (10)
Hi,
Thanks for your interest. We have looked into using the async call here, but it's not actually as trivial as you might think. The Consumer Actor would need additional state logic to recognise that it is in an 'Awaiting Commit Confirmation' state and can't attempt further polls. We decided it was a reasonable compromise to block here and avoid that additional complexity.
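For the curious, here is a rough plain-Scala sketch of the extra state logic being described (names are hypothetical, not the library's internals): the actor would have to track an 'Awaiting Commit Confirmation' state and refuse to poll while in it.

```scala
// Hypothetical sketch of the extra state machine the consumer actor would
// need if commitSync were replaced by an async commit. Not the library's code.
sealed trait ConsumerState
case object Ready                      extends ConsumerState // may poll Kafka
case object AwaitingCommitConfirmation extends ConsumerState // must NOT poll

// Decide whether a poll is allowed in the current state.
def canPoll(state: ConsumerState): Boolean = state match {
  case Ready                      => true
  case AwaitingCommitConfirmation => false
}

// Events the actor would additionally have to handle.
sealed trait Event
case object CommitRequested extends Event
case object CommitConfirmed extends Event

def transition(state: ConsumerState, event: Event): ConsumerState =
  (state, event) match {
    case (Ready, CommitRequested)                      => AwaitingCommitConfirmation
    case (AwaitingCommitConfirmation, CommitConfirmed) => Ready
    case (s, _)                                        => s // ignore out-of-order events
  }
```

Blocking avoids exactly this bookkeeping: the actor simply cannot receive anything else until `commitSync` returns.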
While we have managed to avoid much of the blocking that would occur if you used the Java driver directly, there are some cases that are impossible or difficult to avoid given the blocking nature of the underlying driver.
We would consider further optimisations, such as the one you suggest, if we felt there would be any real improvement to performance or thread utilisation. We have profiled the ConsumerActor in detail; it's pretty optimal and provides excellent throughput. If there is any use case or profiling data to suggest otherwise, we would certainly look into it.
Cheers
from scala-kafka-client.
Is the client not able to poll while it's committing the offsets?
We don't do that. The call to `commitSync` will throw an exception if a rebalance is occurring (expected in multi-partition mode). If we used `commitAsync` and continued to process messages, it's likely that a rebalance would cause unnecessary redeliveries.
This logic is complex because the Kafka driver doesn't really support our async use case very well. If the Kafka driver team make improvements in this area, we will revisit it for sure. I know it has been discussed on the Kafka mailing list in the past.
Sorry for the long reply time, and thanks for the info. I'm seeing pretty slow consumer rates, which I believe is attributable to running only 1 consumer per node, blocking on every commit. I am not using auto-commit because I need to be able to tightly control when messages are considered "processed". Any tips to speed this up? I was thinking of running multiple Kafka consumer actors per node, but I am not sure what the commit semantics look like with that setup. I know that it will log a warning if you try to commit offsets that the client wasn't expecting. Any thoughts?
Hiya,
A couple of questions to start with:
- What kind of rates are you seeing? What are your expectations?
- What kind of computation do you do while consuming messages? Any IO calls (e.g. HTTP requests)?
- What kind of batch sizes are you pulling from Kafka? Tuning the batch size might help you reduce the overhead in your message processing code.
Indeed, auto-commit mode is rarely something that should be used. The consumer actor expects the Offsets to arrive as a confirmation of processed messages. They're actually only used for distinguishing different confirmation messages from each other in order to deduplicate confirmations, and not for anything else.
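A minimal plain-Scala sketch of that deduplication role (`Offsets` here is just a simplified map, not the library's actual type):

```scala
// Simplified model: offsets are used only to tell confirmation messages
// apart, so a repeated confirmation for the same batch can be ignored.
type Offsets = Map[Int, Long] // partition -> offset, simplified stand-in

final class ConfirmationTracker {
  private var lastConfirmed: Option[Offsets] = None

  // Returns true if this confirmation is new, false if it's a duplicate.
  def confirm(offsets: Offsets): Boolean =
    if (lastConfirmed.contains(offsets)) false
    else { lastConfirmed = Some(offsets); true }
}
```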
Launching multiple consumers is a valid strategy. Each consumer will subscribe to a subset of the available partitions in the topic. In order to get the most out of it, you want to have a message processor per consumer. If you want to scale message processing beyond a single machine, you can launch multiple instances of your application to consume from the same topic. In auto-partition mode, Kafka will automatically manage partition-to-node assignment. Remember that the parallelism of a topic is bounded by its partition count, and message ordering is only guaranteed per partition.
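The partition bound on parallelism can be illustrated with a simplified round-robin assignment (this only loosely mimics what Kafka's group coordinator does; it is not the actual assignment algorithm):

```scala
// Simplified round-robin partition assignment: with more consumers than
// partitions, the extra consumers get nothing, so parallelism is capped
// by the partition count.
def assign(partitions: Seq[Int], consumers: Seq[String]): Map[String, Seq[Int]] = {
  val base = consumers.map(_ -> Seq.empty[Int]).toMap
  partitions.zipWithIndex.foldLeft(base) { case (acc, (p, i)) =>
    val c = consumers(i % consumers.size)
    acc.updated(c, acc(c) :+ p)
  }
}
```

For example, 2 partitions spread over 3 consumers leaves one consumer permanently idle, which is why adding consumers beyond the partition count doesn't help.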
You might also find the discussions in issue #32 useful.
Some answers below
> What kind of rates are you seeing? What are your expectations?

This is surprisingly difficult to calculate due to crappy lag metrics in Kafka, but preliminary (and very simple) metrics show around 50/second, which is much lower than needed even in our development env. I expect to see hundreds or thousands per second. We are running 3 consumers on the Kafka cluster (these are physical nodes, with each one currently having just 1 instance of `KafkaConsumer`).
> What kind of computation do you do while consuming messages? Any IO calls (e.g. HTTP requests)?
Yes there is IO to Cassandra but Cassandra is currently very bored. The bottleneck isn't there.
> What kind of batch sizes are you pulling from Kafka? Tuning the batch size might help you reduce the overhead in your message processing code.
I have `max.poll.messages` set to 1000.
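Side note: with the raw Kafka consumer, the standard property controlling how many records a single `poll()` returns is `max.poll.records` (the name quoted above may be specific to this library's config). A minimal sketch of setting it, with placeholder values:

```scala
import java.util.Properties

// Sketch: standard Kafka consumer properties. Values are illustrative,
// not recommendations.
val props = new Properties()
props.put("bootstrap.servers", "localhost:9092") // placeholder address
props.put("group.id", "my-group")                // hypothetical group id
props.put("max.poll.records", "1000")            // max records per poll()
```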
> Launching multiple consumers is a valid strategy.
So what I would want to do is put N consumers behind an Akka router. How would this work with offset confirmation? After skimming through the Scala client source quite a bit, it looks like the Kafka consumer actors do keep offset state, and warnings are shown if a consumer is given offsets to confirm that it was not expecting.
> You might also find the discussions in issue #32 useful.

I'm looking through that now :)
There's a couple of other tunables in `KafkaConsumerActor.Conf` I forgot to mention:
- Try setting `unconfirmedTimeout` higher, or disable it completely. It might be that you're being spammed by redeliveries.
- Reduce the `scheduleInterval`. It dictates how often Kafka is polled by the consumer actor.
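As a rough plain-Scala sketch of tuning those two settings (the case class here is an illustrative stand-in, not the library's actual `KafkaConsumerActor.Conf`, and the default values are made up):

```scala
import scala.concurrent.duration._

// Illustrative stand-in for the two tunables discussed above.
final case class TuningConf(
  scheduleInterval: FiniteDuration,   // how often the actor polls Kafka
  unconfirmedTimeout: FiniteDuration  // how long before a batch is redelivered
)

val default = TuningConf(scheduleInterval = 1.second, unconfirmedTimeout = 3.seconds)

// Poll more often, and give processing longer before redelivery kicks in.
val tuned = default.copy(scheduleInterval = 200.millis, unconfirmedTimeout = 30.seconds)
```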
I'd be interested to hear what settings you've tried, and if you find success in tuning these values.
> So what I would want to do is put N consumers behind an Akka router. How would this work with offset confirmation? After skimming through the Scala client source quite a bit, it looks like the Kafka consumer actors do keep offset state, and warnings are shown if a consumer is given offsets to confirm that it was not expecting.
Make sure you send the offsets back to the same consumer actor that originally sent the data. The offsets are used for deduplicating the batch confirmations. Nothing more.
> Reduce the `scheduleInterval`. It dictates how often Kafka is polled by the consumer actor.
Ah good call. I'll try that asap.
> Make sure you send the offsets back to the same consumer actor
Yeah, that was my main question, so knowing that, I think I can come up with a semi-simple way to map Offsets -> Consumer via some kind of correlation ID setup.
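A minimal sketch of that correlation-ID idea (all names here are hypothetical, not part of the library): tag each delivered batch with the ID of the consumer that sent it, then route the confirmation back through a registry.

```scala
// Hypothetical correlation registry: remembers which consumer sent which
// batch so the confirmation can be routed back to the same one.
final class ConfirmationRouter[Ref] {
  private var pending = Map.empty[Long, Ref] // correlationId -> consumer
  private var nextId  = 0L

  // Called when a consumer delivers a batch; returns the correlation ID
  // to attach to the outgoing records.
  def register(consumer: Ref): Long = {
    val id = nextId; nextId += 1
    pending += (id -> consumer)
    id
  }

  // Called when processing completes; returns the consumer that should
  // receive this confirmation, if the ID is still outstanding.
  def confirm(correlationId: Long): Option[Ref] = {
    val target = pending.get(correlationId)
    pending -= correlationId
    target
  }
}
```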
I'll update the `scheduleInterval` and let you know what difference that makes. Thanks!
> I think I can come up with a semi-simple way to map Offsets -> Consumer via some kind of correlation ID setup.
You could start with something simple, like creating a dedicated message processor for each consumer rather than a pool of processors. The consumers will not send messages to the message processors until they've received a confirmation back, so there's not much to load balance. Therefore, if you have more message processors than consumer actors, you will not fully utilise the pool. If you have fewer message processors than consumer actors, some of the consumers will remain idle and block other consumers from accessing the partitions assigned to the idle consumer. Thus, you'll end up with a 1-to-1 consumer to message processor mapping anyway.
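The sizing argument above boils down to: with one in-flight batch per consumer, effective parallelism is the smaller of the two pool sizes. A trivial sketch:

```scala
// With at most one unconfirmed batch per consumer, throughput is capped by
// whichever side of the consumer/processor pairing is smaller.
def effectiveParallelism(consumers: Int, processors: Int): Int =
  math.min(consumers, processors)
```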
Ok, so a little status update. I ended up going with the N consumers per application instance strategy, and so far it's paying off quite well. Here's a before/after graph:

[before/after throughput graph omitted from this transcript]

At 19:15 UTC is when I deployed that to our dev environment. We are much more parallelized now and confirming many more batches of offsets than before. I'm going to put together a description and some code for what the solution ended up being, after I verify everything is still working as expected.