karafka / waterdrop
Standalone Karafka library for producing Kafka messages
Home Page: https://karafka.io
License: MIT License
/home/mencio/Software/Github/karafka/waterdrop/lib/water_drop/producer/buffer.rb:95: warning: Using the last argument as keyword parameters is deprecated; maybe ** should be added to the call
/home/mencio/Software/Github/karafka/waterdrop/.bundle/ruby/2.7.0/bundler/gems/rdkafka-ruby-d07c9595a063/lib/rdkafka/producer.rb:61: warning: The called method `produce' is defined here
/home/mencio/Software/Github/karafka/waterdrop/lib/water_drop/producer/async.rb:24: warning: Using the last argument as keyword parameters is deprecated; maybe ** should be added to the call
/home/mencio/Software/Github/karafka/waterdrop/.bundle/ruby/2.7.0/bundler/gems/rdkafka-ruby-d07c9595a063/lib/rdkafka/producer.rb:61: warning: The called method `produce' is defined here
/home/mencio/Software/Github/karafka/waterdrop/lib/water_drop/producer/async.rb:46: warning: Using the last argument as keyword parameters is deprecated; maybe ** should be added to the call
/home/mencio/Software/Github/karafka/waterdrop/.bundle/ruby/2.7.0/bundler/gems/rdkafka-ruby-d07c9595a063/lib/rdkafka/producer.rb:61: warning: The called method `produce' is defined here
/home/mencio/Software/Github/karafka/waterdrop/lib/water_drop/producer/sync.rb:28: warning: Using the last argument as keyword parameters is deprecated; maybe ** should be added to the call
Switch from Poseidon to Kafka-Ruby (https://github.com/zendesk/ruby-kafka)
It needs to be a string instead of a symbol.
Similar issue exists in karafka
karafka/karafka#16
Each time I start waterdrop, I receive these messages in the console:
2015-12-28 12:42:13 +0100 | Aspector | INFO | WaterDrop::Aspects::AfterAspect | define-advice | advice 1: AFTER [#<Aspector::DeferredOption:0x007fae0a137b70 @key=:method>] DO stuff in block WITH OPTIONS {:result_arg=>true, :interception_arg=>true}
2015-12-28 12:42:13 +0100 | Aspector | INFO | WaterDrop::Aspects::AroundAspect | define-advice | advice 1: AROUND [#<Aspector::DeferredOption:0x007fae0a134f88 @key=:method>] DO stuff in block WITH OPTIONS {:interception_arg=>true}
2015-12-28 12:42:13 +0100 | Aspector | INFO | WaterDrop::Aspects::BeforeAspect | define-advice | advice 1: BEFORE [#<Aspector::DeferredOption:0x007fae0a13ee98 @key=:method>] DO stuff in block WITH OPTIONS {:interception_arg=>true, :skip_if_false=>false}
I think there should be a way to resolve it without configuring aspector directly in my app.
https://github.com/karafka/waterdrop/blob/master/lib/water_drop/async_producer.rb#L18 calls DeliveryBoy.deliver_async
on outbound messages. But, if you're producing messages faster than the framework or the network can send them off, https://github.com/zendesk/delivery_boy/blob/master/lib/delivery_boy.rb#L37 will drop them.
It would be nice if there were a config option to use deliver_async!
instead, so that the application could respond to the buffer-full state.
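The difference between the silent and the raising behavior can be sketched with a minimal bounded buffer. BoundedBuffer and its methods are hypothetical stand-ins for illustration only, not delivery_boy's API:

```ruby
# Illustration of the two buffer-full strategies discussed above.
# BoundedBuffer is a hypothetical stand-in, not part of delivery_boy.
class BufferOverflow < StandardError; end

class BoundedBuffer
  def initialize(max_size)
    @max_size = max_size
    @messages = []
  end

  # Silent variant: drops the message and returns false when full,
  # roughly how deliver_async discards on a full buffer
  def push(message)
    return false if @messages.size >= @max_size
    @messages << message
    true
  end

  # Raising variant: surfaces the buffer-full state to the caller,
  # the behavior a bang method (deliver_async!) would enable
  def push!(message)
    raise BufferOverflow if @messages.size >= @max_size
    @messages << message
    true
  end
end
```

With the raising variant, the application can rescue and back off or persist the message elsewhere, instead of losing it silently.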
Instrumentation Question:
I'm planning to produce messages from Sidekiq, and I'd have lots of jobs doing the produce. I'm planning on creating a producer in every job and then closing it afterwards.
Is it actually recommended to create a lot of producers, as long as I close them?
In the documentation, the subscription to producer events is per producer... I'm wondering if there's a way for it to be hooked into some initializer once, and not per producer.
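For reference, the per-job lifecycle described above might look like this sketch, assuming a WaterDrop 2.x-style producer API; the broker address and topic are placeholders:

```ruby
# Sketch of a create-use-close producer lifecycle inside a job.
# 'localhost:9092' and 'events' are placeholders.
producer = WaterDrop::Producer.new do |config|
  config.kafka = { 'bootstrap.servers': 'localhost:9092' }
end

# Subscriptions are attached per producer instance, which is what
# the question above is about
producer.monitor.subscribe('error.occurred') do |event|
  # log or report event[:error]
end

producer.produce_sync(topic: 'events', payload: 'data')
producer.close
```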
Recovery Question:
Currently, I am rescuing WaterDrop::Errors::MessageInvalidError and WaterDrop::Errors::ProducerClosedError on every call to produce_many_sync... These are the 2 errors that I saw would make the whole batch stop. Is there anything else that would make a whole batch stop? I understand that the rest would be an array of Rdkafka::Producer::DeliveryReport objects, each of which can individually have an .error.
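The per-report check mentioned above can be sketched like this, with a plain Struct standing in for Rdkafka::Producer::DeliveryReport (the field names here are illustrative assumptions):

```ruby
# Hypothetical stand-in for Rdkafka::Producer::DeliveryReport,
# which carries a per-message error field
DeliveryReport = Struct.new(:topic_name, :error)

# Split a batch's reports into delivered and failed ones, the kind
# of check one could run after a produce_many_sync call returns
def split_reports(reports)
  reports.partition { |report| report.error.nil? }
end

reports = [
  DeliveryReport.new('events', nil),
  DeliveryReport.new('events', :msg_timed_out)
]
delivered, failed = split_reports(reports)
```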
Thank you.
The problem is that Karafka and Waterdrop require options in different formats. Karafka wants options in this format:
config.kafka_hosts = ["123.123.123.23:9092"]
and waterdrop in this one:
config.kafka_host = "123.123.123.23"
config.kafka_ports = ["9092"]
I feel that it's not unified enough; there should be the same option names and structure. Then I'd be able to use the same settingslogic namespace for both karafka and waterdrop.
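A unified shape might look like the following sketch. The shared kafka_hosts option for WaterDrop is the proposal here, not an existing setting:

```ruby
# Hypothetical unified format: both gems accept one list of
# host:port strings, so a single shared value can feed them both
kafka_hosts = ["123.123.123.23:9092", "123.123.123.24:9092"]

Karafka::App.setup do |config|
  config.kafka_hosts = kafka_hosts
end

WaterDrop.setup do |config|
  config.kafka_hosts = kafka_hosts # proposed; today waterdrop splits host and ports
end
```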
Much performance, better async. Such wow!
The waterdrop documentation has options to set up kafka.ssl.ca_cert and kafka.ssl.client_cert, but Waterdrop::Config is missing these options under setting :kafka; it only allows setting kafka.hosts. As a result, this raises an error in the waterdrop config initializer:
/config/initializers/waterdrop.rb:7:in `block in <top (required)>': undefined method `ssl' for #<#<Class:0x007fd3912c0580>:0x007fd3912c0350> (NoMethodError)
Allow setting the message max size and raise an exception when too big.
Sometimes when you look at a project with very little context, you want to know what problem it solves from 1000 meters up, metaphorically speaking.
I sometimes add a "Motivation" section which explains what problem I was trying to solve, and sometimes include a short example of code in the usage, e.g. here is a particular problem you can solve and here is the code to do it. Not just a single line, but an actual working script.
Finally, people coming to your project may not know what Kafka is (I don't). So linking to it and explaining it in half a sentence would be great, e.g.
Waterdrop is a library for producing messages to Kafka (link), an open-source distributed messaging system.
We should have the same instrumentation engine in WaterDrop, to follow karafka conventions.
We can assume that it is broken and that we should regenerate it
We've been using it for months now. I think it's time to craft the 2.0 release.
Unfortunately, we need to add basic rdkafka validations, because for example invalid seed brokers fail silently and asynchronously.
Let's start simple: seed brokers should have a format like so: host1:9092,host2:9092,192.168.1.2:9091
, i.e. a string of comma-separated hosts with ports.
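A minimal format check for such strings might look like this. The regex is a sketch; real hostname validation is stricter:

```ruby
# Sketch: validate "host1:9092,host2:9092" style seed broker strings.
# Accepts letters, digits, dots and dashes in the host part, and
# requires a numeric port after each colon.
SEED_BROKER_FORMAT = /\A[a-zA-Z0-9.\-]+:\d+(,[a-zA-Z0-9.\-]+:\d+)*\z/

def valid_seed_brokers?(value)
  value.is_a?(String) && value.match?(SEED_BROKER_FORMAT)
end
```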
kafka = { hosts: %w() } instead of kafka_hosts
After removing to_json from the message:
@message = message.respond_to?(:to_json) ? message.to_json : message.to_s
we cannot send messages using aspects. Even if I manually convert the message I want to send to JSON, waterdrop sends a hash built by the Formatter class:
def message
  {
    topic: @options[:topic],
    method: @options[:method],
    message: @result,
    args: @args
  }
end
Then this error appears:
NoMethodError: undefined method `bytesize' for #<Hash:0x007fe5841235d8>
NoMethodError: undefined method `produce' for nil:NilClass
lib/water_drop/producer/async.rb:24
This was thrown in a Sidekiq worker just once and then the issue resolved itself.
When the producer uses ruby-kafka and it is not authorized to publish messages to a certain topic (because of Kafka's ACLs) the Kafka::TopicAuthorizationFailed
exception is raised:
Traceback (most recent call last):
4: from produce_sasl.rb:16:in `<main>'
3: from /Users/rupert/.rbenv/versions/2.5.5/lib/ruby/gems/2.5.0/gems/ruby-kafka-0.7.9/lib/kafka/client.rb:152:in `deliver_message'
2: from /Users/rupert/.rbenv/versions/2.5.5/lib/ruby/gems/2.5.0/gems/ruby-kafka-0.7.9/lib/kafka/cluster.rb:145:in `partitions_for'
1: from /Users/rupert/.rbenv/versions/2.5.5/lib/ruby/gems/2.5.0/gems/ruby-kafka-0.7.9/lib/kafka/protocol/metadata_response.rb:134:in `partitions_for'
/Users/rupert/.rbenv/versions/2.5.5/lib/ruby/gems/2.5.0/gems/ruby-kafka-0.7.9/lib/kafka/protocol.rb:160:in `handle_error': Kafka::TopicAuthorizationFailed (Kafka::TopicAuthorizationFailed)
However, when using waterdrop we get a Kafka::DeliveryFailed
exception with a message such as Failed to assign partitions to 1 messages in test-topic
, which hides the real reason behind the failure:
E, [2019-10-01T17:00:44.672771 #35917] ERROR -- : Failed to assign partitions to 1 messages in e2
W, [2019-10-01T17:00:44.672910 #35917] WARN -- : Failed to send all messages to ; attempting retry 1 of 2 after 1s
E, [2019-10-01T17:00:45.687680 #35917] ERROR -- : Failed to assign partitions to 1 messages in e2
W, [2019-10-01T17:00:45.687861 #35917] WARN -- : Failed to send all messages to ; attempting retry 2 of 2 after 1s
E, [2019-10-01T17:00:46.705185 #35917] ERROR -- : Failed to assign partitions to 1 messages in e2
E, [2019-10-01T17:00:46.705252 #35917] ERROR -- : Failed to send all messages to ; keeping remaining messages in buffer
E, [2019-10-01T17:00:46.720882 #35917] ERROR -- : Failed to assign partitions to 1 messages in e2
W, [2019-10-01T17:00:46.720940 #35917] WARN -- : Failed to send all messages to ; attempting retry 1 of 2 after 1s
E, [2019-10-01T17:00:47.736301 #35917] ERROR -- : Failed to assign partitions to 1 messages in e2
W, [2019-10-01T17:00:47.736369 #35917] WARN -- : Failed to send all messages to ; attempting retry 2 of 2 after 1s
E, [2019-10-01T17:00:48.750387 #35917] ERROR -- : Failed to assign partitions to 1 messages in e2
E, [2019-10-01T17:00:48.750574 #35917] ERROR -- : Failed to send all messages to ; keeping remaining messages in buffer
E, [2019-10-01T17:00:48.762185 #35917] ERROR -- : Failed to assign partitions to 1 messages in e2
W, [2019-10-01T17:00:48.762278 #35917] WARN -- : Failed to send all messages to ; attempting retry 1 of 2 after 1s
E, [2019-10-01T17:00:49.785085 #35917] ERROR -- : Failed to assign partitions to 1 messages in e2
W, [2019-10-01T17:00:49.785191 #35917] WARN -- : Failed to send all messages to ; attempting retry 2 of 2 after 1s
E, [2019-10-01T17:00:50.810873 #35917] ERROR -- : Failed to assign partitions to 1 messages in e2
E, [2019-10-01T17:00:50.811024 #35917] ERROR -- : Failed to send all messages to ; keeping remaining messages in buffer
Traceback (most recent call last):
7: from produce_sasl_waterdrop.rb:28:in `<main>'
6: from /Users/rupert/.rbenv/versions/2.5.5/lib/ruby/gems/2.5.0/bundler/gems/waterdrop-74b2c35bdd82/lib/water_drop/sync_producer.rb:19:in `call'
5: from /Users/rupert/.rbenv/versions/2.5.5/lib/ruby/gems/2.5.0/bundler/gems/delivery_boy-d875e1e791cc/lib/delivery_boy.rb:28:in `deliver'
4: from /Users/rupert/.rbenv/versions/2.5.5/lib/ruby/gems/2.5.0/bundler/gems/delivery_boy-d875e1e791cc/lib/delivery_boy/instance.rb:14:in `deliver'
3: from /Users/rupert/.rbenv/versions/2.5.5/lib/ruby/gems/2.5.0/gems/ruby-kafka-0.7.10/lib/kafka/producer.rb:246:in `deliver_messages'
2: from /Users/rupert/.rbenv/versions/2.5.5/lib/ruby/gems/2.5.0/gems/ruby-kafka-0.7.10/lib/kafka/instrumenter.rb:23:in `instrument'
1: from /Users/rupert/.rbenv/versions/2.5.5/lib/ruby/gems/2.5.0/gems/ruby-kafka-0.7.10/lib/kafka/producer.rb:253:in `block in deliver_messages'
/Users/rupert/.rbenv/versions/2.5.5/lib/ruby/gems/2.5.0/gems/ruby-kafka-0.7.10/lib/kafka/producer.rb:426:in `deliver_messages_with_retries': Failed to assign partitions to 1 messages (Kafka::DeliveryFailed)
I reproduced this issue locally on confluentinc/cp-kafka:5.1.2
after configuring SASL and ACLs, as well as on a Confluent Cloud Enterprise cluster.
Any chance we could get a new release for #109? I'd love to not have to fork this until there's another one!
Cheers!
Hello! I came by to ask about the advantages of this Kafka client compared to the others. I'm looking for a performant producer, which I unfortunately can't achieve with my current producer client.
Some of my questions:
Thanks in advance.
I would like to be able to provide a batch of messages that are supposed to be sent, rather than looping and sending them one after another, for both sync and async.
I want the ability to confirm that a message was successfully produced to the topic on the broker. I don't care or want to rely on an acknowledgement from a downstream consumer. I simply wish to know if I successfully posted in the first place with some sort of return value or callback. It looks like neither responders nor WaterDrop::SyncProducer provide return values to validate the successful delivery of your message to the topic.
Consider the scenario where perhaps the topic does not exist or there's an ACL error, etc.
Topic does not exist:
irb(main):001:0> WaterDrop::SyncProducer.call({'test' => 'test'}.to_json, topic: 'not.a.real.topic')
I, [2019-11-21T17:05:45.387698 #2436] INFO -- : New topics added to target list: not.a.real.topic
I, [2019-11-21T17:05:45.388066 #2436] INFO -- : Fetching cluster metadata from kafka://broker.kafka:9093
I, [2019-11-21T17:05:51.377430 #2436] INFO -- : Discovered cluster metadata; nodes: broker2.kafka:9093 (node_id=218), broker3.kafka:9093 (node_id=254), broker1.kafka:9093 (node_id=175)
E, [2019-11-21T17:05:51.378117 #2436] ERROR -- : Failed to assign partitions to 1 messages in not.a.real.topic
W, [2019-11-21T17:05:51.378265 #2436] WARN -- : Failed to send all messages to ; attempting retry 1 of 2 after 1s
I, [2019-11-21T17:05:52.380422 #2436] INFO -- : Fetching cluster metadata from kafka://broker.kafka:9093
I, [2019-11-21T17:05:52.692198 #2436] INFO -- : Discovered cluster metadata; nodes: broker3.kafka:9093 (node_id=254), broker2.kafka:9093 (node_id=218), broker1.kafka:9093 (node_id=175)
I, [2019-11-21T17:05:52.692766 #2436] INFO -- : Sending 1 messages to broker1.kafka:9093 (node_id=175)
=> nil
We can see above that ruby-kafka logs some useful information indicating that the message failed to send. But functionality that should behave one way or another based on whether the message was successfully sent has no way to roll back when it was not. Ideally I'd want something to the effect of:
ActiveRecord::Base.transaction do
  # modify some local data
  # notify downstream services of this change
  responder_call = Responders::MyCoolResponder.call(message)
  # rollback, i.e. do not commit local changes, if unable to publish
  # the message for downstream consumption
  raise ActiveRecord::Rollback unless responder_call.successful?
end
I could in theory convert this to a fully async process which then waits for the consuming service to publish an acknowledgement message to a topic I consume from. But in a workflow like this I don't actually need to take action based on the downstream systems receiving it; I simply wish to know that I posted the original message to a topic for their consumption. The current error handling for responders, as I understand it, protects against responding to a topic which has not been registered, or against total broker failures, but not against failing to produce a message in an acks=all model.
I have followed the example app instructions and gotten the Ping Pong game to work. However, when I try to send a message with either WaterDrop::SyncProducer.call('message', topic: 'my-topic')
or WaterDrop::AsyncProducer.call('message', topic: 'my-topic')
, I receive the error syntax error near unexpected token 'message','.
How do I resolve this?
  DeliveryBoy
    .send(:instance)
    .send(:async_producer)
    .instance_variable_get('@worker')
    .instance_variable_get('@producer')
    .instance_variable_get('@cluster')
    .disconnect
rescue
  false
end

on_worker_boot do
  DeliveryBoy
    .send(:instance)
    .send(:async_producer)
    .instance_variable_get('@worker')
    .instance_variable_get('@producer')
    .instance_variable_get('@cluster')
    .topics
end
Hello,
(Follow-up from this thread.)
Here are the specs I'm using in my app: karafka 1.2.13, ruby-kafka 0.7.9, rails 5.2.3, ruby 2.6.3.
Karafka depends on waterdrop, which depends on delivery_boy.
There is a new option added quite recently in ruby-kafka: ssl_verify_hostname
, which defaults to true (for both consumers and producers). On the consumer side, everything works well; karafka is in sync with ruby-kafka, and this can be disabled by changing the config in karafka.rb
I have a similar error message as the one described in this issue.
OpenSSL::SSL::SSLError: SSL_connect returned=1 errno=0 state=error: certificate verify failed (unspecified certificate verification error)
Basically, the idea is to sync waterdrop the same way karafka was synced, by changing the "dry config" file and adding the same setting (ssl_verify_hostname
), so that when the async producer is created, it can pass a value other than the default.
delivery_boy has this fix, but only on master (not released yet :/).
So to make this work with karafka, all I had to do was fork delivery_boy and change the default from true to false, until delivery_boy releases the patch, waterdrop bumps delivery_boy, and karafka bumps waterdrop, lol.
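For reference, the end goal would be a plain config switch, something like this sketch; the exact key depends on what waterdrop ends up exposing once the setting is plumbed through:

```ruby
# Hypothetical: assumes waterdrop forwards this setting through
# delivery_boy down to ruby-kafka's ssl_verify_hostname option
WaterDrop.setup do |config|
  config.kafka.ssl_verify_hostname = false
end
```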
Keep getting this error. Not able to send messages to kafka cluster.
This is what I'm testing:
message = WaterDrop::Message.new('hits', {test: 76}.to_json)
message.send!
And this is what it results in:
Kafka::DeliveryFailed: Failed to send messages to hits/0
from /path/to/gem/ruby-kafka-0.3.15/lib/kafka/producer.rb:332:in `deliver_messages_with_retries'
WaterDrop.config
#<#<Class:0x007ffc8356a090>:0x007ffc835698e8
@config={:connection_pool_timeout=>5, :send_messages=>true, :raise_on_failure=>true, :connection_pool_size=>25, :kafka=>#<Dry::Configurable::NestedConfig:0x007ffc850ca768 @klass=#<Class:0x007ffc850ca740>>}>
WaterDrop.config.kafka.hosts
["kafka://ubuntudock:9092", "kafka://ubuntudock:9093", "kafka://ubuntudock:9094"]
Result of operation.execute from /ruby-kafka-0.3.15/lib/kafka/producer.rb:`deliver_messages_with_retries'
Attempt 1
{#<Kafka::Broker:0x007ffe1e811a90 @connection=#<Kafka::Connection:0x007ffe1e811bf8 @client_id="ruby-kafka", @port=9093, @host="kafka2", @logger=#<Logger:0x007ffe1e858b20 @progname=nil, @level=0, @default_formatter=#<Logger::Formatter:0x007ffe1e858af8 @datetime_format=nil>, @formatter=nil, @logdev=nil>, @instrumenter=#<Kafka::Instrumenter:0x007ffe1e858a80 @default_payload={:client_id=>"ruby-kafka"}, @backend=ActiveSupport::Notifications>, @connect_timeout=10, @socket_timeout=10, @ssl_context=nil>, @node_id=2, @logger=#<Logger:0x007ffe1e858b20 @progname=nil, @level=0, @default_formatter=#<Logger::Formatter:0x007ffe1e858af8 @datetime_format=nil>, @formatter=nil, @logdev=nil>>=>#<Kafka::MessageBuffer:0x007ffe1e811860 @buffer={"hits"=>{0=>[#<Kafka::Protocol::Message:0x007ffe1e8125f8 @key=nil, @value="{\"test\":76}", @codec_id=0, @offset=-1, @create_time=2017-06-06 12:33:05 -0400, @bytesize=11>]}}, @size=1, @bytesize=11>}
Attempt 2
{#<Kafka::Broker:0x007ffe1e811a90 @connection=#<Kafka::Connection:0x007ffe1e811bf8 @client_id="ruby-kafka", @port=9093, @host="kafka2", @logger=#<Logger:0x007ffe1e858b20 @progname=nil, @level=0, @default_formatter=#<Logger::Formatter:0x007ffe1e858af8 @datetime_format=nil>, @formatter=nil, @logdev=nil>, @instrumenter=#<Kafka::Instrumenter:0x007ffe1e858a80 @default_payload={:client_id=>"ruby-kafka"}, @backend=ActiveSupport::Notifications>, @connect_timeout=10, @socket_timeout=10, @ssl_context=nil>, @node_id=2, @logger=#<Logger:0x007ffe1e858b20 @progname=nil, @level=0, @default_formatter=#<Logger::Formatter:0x007ffe1e858af8 @datetime_format=nil>, @formatter=nil, @logdev=nil>>=>#<Kafka::MessageBuffer:0x007ffe187eeb68 @buffer={"hits"=>{0=>[#<Kafka::Protocol::Message:0x007ffe1e8125f8 @key=nil, @value="{\"test\":76}", @codec_id=0, @offset=-1, @create_time=2017-06-06 12:33:05 -0400, @bytesize=11>]}}, @size=1, @bytesize=11>}
Attempt 3
{#<Kafka::Broker:0x007ffe1e811a90 @connection=#<Kafka::Connection:0x007ffe1e811bf8 @client_id="ruby-kafka", @port=9093, @host="kafka2", @logger=#<Logger:0x007ffe1e858b20 @progname=nil, @level=0, @default_formatter=#<Logger::Formatter:0x007ffe1e858af8 @datetime_format=nil>, @formatter=nil, @logdev=nil>, @instrumenter=#<Kafka::Instrumenter:0x007ffe1e858a80 @default_payload={:client_id=>"ruby-kafka"}, @backend=ActiveSupport::Notifications>, @connect_timeout=10, @socket_timeout=10, @ssl_context=nil>, @node_id=2, @logger=#<Logger:0x007ffe1e858b20 @progname=nil, @level=0, @default_formatter=#<Logger::Formatter:0x007ffe1e858af8 @datetime_format=nil>, @formatter=nil, @logdev=nil>>=>#<Kafka::MessageBuffer:0x007ffe1d880590 @buffer={"hits"=>{0=>[#<Kafka::Protocol::Message:0x007ffe1e8125f8 @key=nil, @value="{\"test\":76}", @codec_id=0, @offset=-1, @create_time=2017-06-06 12:33:05 -0400, @bytesize=11>]}}, @size=1, @bytesize=11>}
Attempt 1
{#<Kafka::Broker:0x007ffe187833b8 @connection=#<Kafka::Connection:0x007ffe187834a8 @client_id="ruby-kafka", @port=9093, @host="kafka2", @logger=#<Logger:0x007ffe1878dcc8 @progname=nil, @level=0, @default_formatter=#<Logger::Formatter:0x007ffe1878dc50 @datetime_format=nil>, @formatter=nil, @logdev=nil>, @instrumenter=#<Kafka::Instrumenter:0x007ffe1878db10 @default_payload={:client_id=>"ruby-kafka"}, @backend=ActiveSupport::Notifications>, @connect_timeout=10, @socket_timeout=10, @ssl_context=nil>, @node_id=2, @logger=#<Logger:0x007ffe1878dcc8 @progname=nil, @level=0, @default_formatter=#<Logger::Formatter:0x007ffe1878dc50 @datetime_format=nil>, @formatter=nil, @logdev=nil>>=>#<Kafka::MessageBuffer:0x007ffe18782ff8 @buffer={"hits"=>{0=>[#<Kafka::Protocol::Message:0x007ffe18783bd8 @key=nil, @value="{\"test\":76}", @codec_id=0, @offset=-1, @create_time=2017-06-06 12:33:08 -0400, @bytesize=11>]}}, @size=1, @bytesize=11>}
Attempt 2
{#<Kafka::Broker:0x007ffe187833b8 @connection=#<Kafka::Connection:0x007ffe187834a8 @client_id="ruby-kafka", @port=9093, @host="kafka2", @logger=#<Logger:0x007ffe1878dcc8 @progname=nil, @level=0, @default_formatter=#<Logger::Formatter:0x007ffe1878dc50 @datetime_format=nil>, @formatter=nil, @logdev=nil>, @instrumenter=#<Kafka::Instrumenter:0x007ffe1878db10 @default_payload={:client_id=>"ruby-kafka"}, @backend=ActiveSupport::Notifications>, @connect_timeout=10, @socket_timeout=10, @ssl_context=nil>, @node_id=2, @logger=#<Logger:0x007ffe1878dcc8 @progname=nil, @level=0, @default_formatter=#<Logger::Formatter:0x007ffe1878dc50 @datetime_format=nil>, @formatter=nil, @logdev=nil>>=>#<Kafka::MessageBuffer:0x007ffe17691f50 @buffer={"hits"=>{0=>[#<Kafka::Protocol::Message:0x007ffe18783bd8 @key=nil, @value="{\"test\":76}", @codec_id=0, @offset=-1, @create_time=2017-06-06 12:33:08 -0400, @bytesize=11>]}}, @size=1, @bytesize=11>}
Attempt 3
{#<Kafka::Broker:0x007ffe187833b8 @connection=#<Kafka::Connection:0x007ffe187834a8 @client_id="ruby-kafka", @port=9093, @host="kafka2", @logger=#<Logger:0x007ffe1878dcc8 @progname=nil, @level=0, @default_formatter=#<Logger::Formatter:0x007ffe1878dc50 @datetime_format=nil>, @formatter=nil, @logdev=nil>, @instrumenter=#<Kafka::Instrumenter:0x007ffe1878db10 @default_payload={:client_id=>"ruby-kafka"}, @backend=ActiveSupport::Notifications>, @connect_timeout=10, @socket_timeout=10, @ssl_context=nil>, @node_id=2, @logger=#<Logger:0x007ffe1878dcc8 @progname=nil, @level=0, @default_formatter=#<Logger::Formatter:0x007ffe1878dc50 @datetime_format=nil>, @formatter=nil, @logdev=nil>>=>#<Kafka::MessageBuffer:0x007ffe16d24698 @buffer={"hits"=>{0=>[#<Kafka::Protocol::Message:0x007ffe18783bd8 @key=nil, @value="{\"test\":76}", @codec_id=0, @offset=-1, @create_time=2017-06-06 12:33:08 -0400, @bytesize=11>]}}, @size=1, @bytesize=11>}
Kafka::DeliveryFailed: Failed to send messages to hits/0
from /path/to/gem/ruby-2.3.3@goliath/gems/ruby-kafka-0.3.15/lib/kafka/producer.rb:332:in `deliver_messages_with_retries'
I think it's useful to know who (what kind of app) the sender was. Of course, I can just add sender: :api_name
to the message, but it'd be great to have it at the waterdrop internals level.
We should check that the message someone wants to send is less than or equal to whatever is defined as the max.
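A minimal sketch of such a guard follows; the names and the limit are hypothetical, and the real check would live in WaterDrop's validation layer:

```ruby
# Hypothetical guard: reject payloads larger than a configured maximum.
class MessageTooLargeError < StandardError; end

MAX_PAYLOAD_SIZE = 1_000_000 # bytes; placeholder for a configurable limit

def validate_payload_size!(payload)
  size = payload.to_s.bytesize
  if size > MAX_PAYLOAD_SIZE
    raise MessageTooLargeError, "payload is #{size} bytes (max #{MAX_PAYLOAD_SIZE})"
  end
  payload
end
```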