
waterdrop's Introduction

WaterDrop

Join the chat at https://slack.karafka.io

WaterDrop is a standalone gem for producing messages to Kafka, with an extra validation layer on top. It is part of the Karafka ecosystem.

It:

  • Is thread-safe
  • Supports sync producing
  • Supports async producing
  • Supports transactions
  • Supports buffering
  • Supports producing messages to multiple clusters
  • Supports multiple delivery policies
  • Works with Kafka 1.0+ and Ruby 2.7+
  • Works with and without Karafka

Documentation

Documentation for the Karafka ecosystem components, including WaterDrop, is available in the Karafka documentation.

Getting Started

If you want to both produce and consume messages, please use Karafka. It integrates WaterDrop automatically.

To get started with WaterDrop:

  1. Add it to your Gemfile:
bundle add waterdrop
  2. Create and configure a producer:
producer = WaterDrop::Producer.new do |config|
  config.deliver = true
  config.kafka = {
    'bootstrap.servers': 'localhost:9092',
    'request.required.acks': 1
  }
end
  3. Use it as follows:
# sync producing
producer.produce_sync(topic: 'my-topic', payload: 'my message')

# or for async
producer.produce_async(topic: 'my-topic', payload: 'my message')

# or in sync batches
producer.produce_many_sync(
  [
    { topic: 'my-topic', payload: 'my message'},
    { topic: 'my-topic', payload: 'my message'}
  ]
)

# and async batches
producer.produce_many_async(
  [
    { topic: 'my-topic', payload: 'my message'},
    { topic: 'my-topic', payload: 'my message'}
  ]
)

# transactions (require 'transactional.id' to be set in config.kafka)
producer.transaction do
  producer.produce_async(topic: 'my-topic', payload: 'my message')
  producer.produce_async(topic: 'my-topic', payload: 'my message')
end
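Batches are plain arrays of message hashes, so in practice they are often built from application records. A minimal sketch, assuming records are hashes with an `:id` used as the message key; `build_messages` is a hypothetical helper, and the per-message `key:` field is an assumption about the message format rather than something shown above.

```ruby
require 'json'

# Hypothetical helper (not part of WaterDrop): maps record hashes to message
# hashes suitable for produce_many_sync / produce_many_async.
def build_messages(topic, records)
  records.map do |record|
    {
      topic: topic,
      # Kafka payloads are strings, so each record is serialized to JSON.
      payload: record.to_json,
      # Assumed per-message `key:`; with the default partitioner,
      # messages sharing a key go to the same partition.
      key: record.fetch(:id).to_s
    }
  end
end

records = [{ id: 1, name: 'a' }, { id: 2, name: 'b' }]
messages = build_messages('my-topic', records)

# With a configured producer:
# producer.produce_many_sync(messages)
```

Keeping message construction in a pure function like this makes the batch easy to inspect or test without a running broker.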

waterdrop's People

Contributors

agwozdowski, bruno-b-martins, dependabot[bot], filiptepper, isturdy, konalegi, kylekthompson, mach-kernel, mensfeld, nijikon, olleolleolle, pavlo-vavruk, renovate[bot], sandipsubedi, solnic, stellarxo, tabdollahi, wallin, webandtech

waterdrop's Issues

Introduce batch API

I would like to be able to provide a batch of messages to be sent, instead of looping and sending them one after another, for both sync and async producing.

Logger for Aspector

Each time I start WaterDrop, I receive these messages in the console:

2015-12-28 12:42:13 +0100 | Aspector | INFO  | WaterDrop::Aspects::AfterAspect | define-advice | advice 1: AFTER [#<Aspector::DeferredOption:0x007fae0a137b70 @key=:method>] DO stuff in block WITH OPTIONS {:result_arg=>true, :interception_arg=>true}
2015-12-28 12:42:13 +0100 | Aspector | INFO  | WaterDrop::Aspects::AroundAspect | define-advice | advice 1: AROUND [#<Aspector::DeferredOption:0x007fae0a134f88 @key=:method>] DO stuff in block WITH OPTIONS {:interception_arg=>true}
2015-12-28 12:42:13 +0100 | Aspector | INFO  | WaterDrop::Aspects::BeforeAspect | define-advice | advice 1: BEFORE [#<Aspector::DeferredOption:0x007fae0a13ee98 @key=:method>] DO stuff in block WITH OPTIONS {:interception_arg=>true, :skip_if_false=>false}

I think there should be a way to resolve it without configuring aspector directly in my app.

ssl_verify_hostname sync with ruby-kafka/delivery_boy

Hello,
(Follow-up from this thread.)

Here are the specs I'm using in my app: karafka 1.2.13, ruby-kafka 0.7.9, rails 5.2.3, ruby 2.6.3

Karafka depends on waterdrop which depends on delivery_boy.
There is a new option added quite recently in ruby-kafka, ssl_verify_hostname, which defaults to true (for both consumers and producers). On the consumer side everything works well: karafka is in sync with ruby-kafka, and the check can be disabled by changing the config in karafka.rb.

I have a similar error message as the one described in this issue.
OpenSSL::SSL::SSLError: SSL_connect returned=1 errno=0 state=error: certificate verify failed (unspecified certificate verification error)

Basically, the idea is to sync waterdrop the same way karafka was synced: change the "dry config" file and add the same setting (ssl_verify_hostname), so that when the async producer is created, it can pass a value other than the default.

Now delivery_boy has this "fix", but only on master (not released yet :/).

So to make this work with karafka, all I had to do was fork delivery_boy and change the default from true to false, until delivery_boy releases the patch, waterdrop bumps delivery_boy, and karafka bumps waterdrop. lol.

Missing config options

The waterdrop documentation has options to set up kafka.ssl.ca_cert and kafka.ssl.client_cert, but WaterDrop::Config is missing these options under setting :kafka. As a result, it only allows setting kafka.hosts.

This raises an error in the waterdrop config initializer:

/config/initializers/waterdrop.rb:7:in `block in <top (required)>': undefined method `ssl' for #<#<Class:0x007fd3912c0580>:0x007fd3912c0350> (NoMethodError)

Unfortunate error message when missing ACLs

When the producer uses ruby-kafka directly and is not authorized to publish messages to a certain topic (because of Kafka's ACLs), a Kafka::TopicAuthorizationFailed exception is raised:

Traceback (most recent call last):
	4: from produce_sasl.rb:16:in `<main>'
	3: from /Users/rupert/.rbenv/versions/2.5.5/lib/ruby/gems/2.5.0/gems/ruby-kafka-0.7.9/lib/kafka/client.rb:152:in `deliver_message'
	2: from /Users/rupert/.rbenv/versions/2.5.5/lib/ruby/gems/2.5.0/gems/ruby-kafka-0.7.9/lib/kafka/cluster.rb:145:in `partitions_for'
	1: from /Users/rupert/.rbenv/versions/2.5.5/lib/ruby/gems/2.5.0/gems/ruby-kafka-0.7.9/lib/kafka/protocol/metadata_response.rb:134:in `partitions_for'
/Users/rupert/.rbenv/versions/2.5.5/lib/ruby/gems/2.5.0/gems/ruby-kafka-0.7.9/lib/kafka/protocol.rb:160:in `handle_error': Kafka::TopicAuthorizationFailed (Kafka::TopicAuthorizationFailed)

However, when using waterdrop we get Kafka::DeliveryFailed with a message such as "Failed to assign partitions to 1 messages in test-topic", which hides the real reason behind the failure:

E, [2019-10-01T17:00:44.672771 #35917] ERROR -- : Failed to assign partitions to 1 messages in e2
W, [2019-10-01T17:00:44.672910 #35917]  WARN -- : Failed to send all messages to ; attempting retry 1 of 2 after 1s
E, [2019-10-01T17:00:45.687680 #35917] ERROR -- : Failed to assign partitions to 1 messages in e2
W, [2019-10-01T17:00:45.687861 #35917]  WARN -- : Failed to send all messages to ; attempting retry 2 of 2 after 1s
E, [2019-10-01T17:00:46.705185 #35917] ERROR -- : Failed to assign partitions to 1 messages in e2
E, [2019-10-01T17:00:46.705252 #35917] ERROR -- : Failed to send all messages to ; keeping remaining messages in buffer
E, [2019-10-01T17:00:46.720882 #35917] ERROR -- : Failed to assign partitions to 1 messages in e2
W, [2019-10-01T17:00:46.720940 #35917]  WARN -- : Failed to send all messages to ; attempting retry 1 of 2 after 1s
E, [2019-10-01T17:00:47.736301 #35917] ERROR -- : Failed to assign partitions to 1 messages in e2
W, [2019-10-01T17:00:47.736369 #35917]  WARN -- : Failed to send all messages to ; attempting retry 2 of 2 after 1s
E, [2019-10-01T17:00:48.750387 #35917] ERROR -- : Failed to assign partitions to 1 messages in e2
E, [2019-10-01T17:00:48.750574 #35917] ERROR -- : Failed to send all messages to ; keeping remaining messages in buffer
E, [2019-10-01T17:00:48.762185 #35917] ERROR -- : Failed to assign partitions to 1 messages in e2
W, [2019-10-01T17:00:48.762278 #35917]  WARN -- : Failed to send all messages to ; attempting retry 1 of 2 after 1s
E, [2019-10-01T17:00:49.785085 #35917] ERROR -- : Failed to assign partitions to 1 messages in e2
W, [2019-10-01T17:00:49.785191 #35917]  WARN -- : Failed to send all messages to ; attempting retry 2 of 2 after 1s
E, [2019-10-01T17:00:50.810873 #35917] ERROR -- : Failed to assign partitions to 1 messages in e2
E, [2019-10-01T17:00:50.811024 #35917] ERROR -- : Failed to send all messages to ; keeping remaining messages in buffer
Traceback (most recent call last):
	7: from produce_sasl_waterdrop.rb:28:in `<main>'
	6: from /Users/rupert/.rbenv/versions/2.5.5/lib/ruby/gems/2.5.0/bundler/gems/waterdrop-74b2c35bdd82/lib/water_drop/sync_producer.rb:19:in `call'
	5: from /Users/rupert/.rbenv/versions/2.5.5/lib/ruby/gems/2.5.0/bundler/gems/delivery_boy-d875e1e791cc/lib/delivery_boy.rb:28:in `deliver'
	4: from /Users/rupert/.rbenv/versions/2.5.5/lib/ruby/gems/2.5.0/bundler/gems/delivery_boy-d875e1e791cc/lib/delivery_boy/instance.rb:14:in `deliver'
	3: from /Users/rupert/.rbenv/versions/2.5.5/lib/ruby/gems/2.5.0/gems/ruby-kafka-0.7.10/lib/kafka/producer.rb:246:in `deliver_messages'
	2: from /Users/rupert/.rbenv/versions/2.5.5/lib/ruby/gems/2.5.0/gems/ruby-kafka-0.7.10/lib/kafka/instrumenter.rb:23:in `instrument'
	1: from /Users/rupert/.rbenv/versions/2.5.5/lib/ruby/gems/2.5.0/gems/ruby-kafka-0.7.10/lib/kafka/producer.rb:253:in `block in deliver_messages'
/Users/rupert/.rbenv/versions/2.5.5/lib/ruby/gems/2.5.0/gems/ruby-kafka-0.7.10/lib/kafka/producer.rb:426:in `deliver_messages_with_retries': Failed to assign partitions to 1 messages (Kafka::DeliveryFailed)

I reproduced this issue locally on confluentinc/cp-kafka:5.1.2 after configuring SASL and ACLs as well as on Confluent Cloud Enterprise cluster.
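In plain Ruby, raising a new exception inside a `rescue` block records the original one as `cause`, so a wrapper can keep the underlying reason reachable. A minimal sketch with hypothetical stand-in classes; it illustrates the Ruby mechanism only and does not describe how ruby-kafka or WaterDrop actually handle this error.

```ruby
# Hypothetical stand-ins for the library-level exceptions.
class TopicAuthorizationFailed < StandardError; end
class DeliveryFailed < StandardError; end

def deliver
  raise TopicAuthorizationFailed, 'not authorized to write to test-topic'
rescue TopicAuthorizationFailed
  # Raising inside rescue sets DeliveryFailed#cause automatically,
  # so the original reason is not lost.
  raise DeliveryFailed, 'Failed to assign partitions to 1 messages'
end

# Walk the cause chain down to the innermost (root) exception.
def root_cause(error)
  error = error.cause while error.cause
  error
end

error = begin
  deliver
  nil
rescue DeliveryFailed => e
  e
end

puts "#{error.class}: #{error.message} (root cause: #{root_cause(error).class})"
```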

Motivation/example

Sometimes when you look at a project with very little context, you want to know what problem it solves from 1000 meters (metaphor).

I sometimes add a "Motivation" section which explains what problem I was trying to solve, and sometimes include a short example of code in the usage, e.g. here is a particular problem you can solve and here is the code to do it. Not just a single line, but an actual working script.

Finally, people coming to your project may not know what Kafka is (I don't). So linking to it and explaining it in half a sentence would be great, e.g.

Waterdrop is a messaging system for Kafka (link), which is an open source job processing tool.

Kafka::DeliveryFailed: Failed to send messages to topic/partition

I keep getting this error and am not able to send messages to the Kafka cluster.

This is what I'm testing:

message = WaterDrop::Message.new('hits', {test: 76}.to_json)
message.send!

And this is what it results in:

Kafka::DeliveryFailed: Failed to send messages to hits/0
from /path/to/gem/ruby-kafka-0.3.15/lib/kafka/producer.rb:332:in `deliver_messages_with_retries'

WaterDrop.config

#<#<Class:0x007ffc8356a090>:0x007ffc835698e8
 @config={:connection_pool_timeout=>5, :send_messages=>true, :raise_on_failure=>true, :connection_pool_size=>25, :kafka=>#<Dry::Configurable::NestedConfig:0x007ffc850ca768 @klass=#<Class:0x007ffc850ca740>>}>

WaterDrop.config.kafka.hosts

["kafka://ubuntudock:9092", "kafka://ubuntudock:9093", "kafka://ubuntudock:9094"]

Result of operation.execute from /ruby-kafka-0.3.15/lib/kafka/producer.rb:`deliver_messages_with_retries'

Attempt 1
{#<Kafka::Broker:0x007ffe1e811a90 @connection=#<Kafka::Connection:0x007ffe1e811bf8 @client_id="ruby-kafka", @port=9093, @host="kafka2", @logger=#<Logger:0x007ffe1e858b20 @progname=nil, @level=0, @default_formatter=#<Logger::Formatter:0x007ffe1e858af8 @datetime_format=nil>, @formatter=nil, @logdev=nil>, @instrumenter=#<Kafka::Instrumenter:0x007ffe1e858a80 @default_payload={:client_id=>"ruby-kafka"}, @backend=ActiveSupport::Notifications>, @connect_timeout=10, @socket_timeout=10, @ssl_context=nil>, @node_id=2, @logger=#<Logger:0x007ffe1e858b20 @progname=nil, @level=0, @default_formatter=#<Logger::Formatter:0x007ffe1e858af8 @datetime_format=nil>, @formatter=nil, @logdev=nil>>=>#<Kafka::MessageBuffer:0x007ffe1e811860 @buffer={"hits"=>{0=>[#<Kafka::Protocol::Message:0x007ffe1e8125f8 @key=nil, @value="{\"test\":76}", @codec_id=0, @offset=-1, @create_time=2017-06-06 12:33:05 -0400, @bytesize=11>]}}, @size=1, @bytesize=11>}
Attempt 2
{#<Kafka::Broker:0x007ffe1e811a90 @connection=#<Kafka::Connection:0x007ffe1e811bf8 @client_id="ruby-kafka", @port=9093, @host="kafka2", @logger=#<Logger:0x007ffe1e858b20 @progname=nil, @level=0, @default_formatter=#<Logger::Formatter:0x007ffe1e858af8 @datetime_format=nil>, @formatter=nil, @logdev=nil>, @instrumenter=#<Kafka::Instrumenter:0x007ffe1e858a80 @default_payload={:client_id=>"ruby-kafka"}, @backend=ActiveSupport::Notifications>, @connect_timeout=10, @socket_timeout=10, @ssl_context=nil>, @node_id=2, @logger=#<Logger:0x007ffe1e858b20 @progname=nil, @level=0, @default_formatter=#<Logger::Formatter:0x007ffe1e858af8 @datetime_format=nil>, @formatter=nil, @logdev=nil>>=>#<Kafka::MessageBuffer:0x007ffe187eeb68 @buffer={"hits"=>{0=>[#<Kafka::Protocol::Message:0x007ffe1e8125f8 @key=nil, @value="{\"test\":76}", @codec_id=0, @offset=-1, @create_time=2017-06-06 12:33:05 -0400, @bytesize=11>]}}, @size=1, @bytesize=11>}
Attempt 3
{#<Kafka::Broker:0x007ffe1e811a90 @connection=#<Kafka::Connection:0x007ffe1e811bf8 @client_id="ruby-kafka", @port=9093, @host="kafka2", @logger=#<Logger:0x007ffe1e858b20 @progname=nil, @level=0, @default_formatter=#<Logger::Formatter:0x007ffe1e858af8 @datetime_format=nil>, @formatter=nil, @logdev=nil>, @instrumenter=#<Kafka::Instrumenter:0x007ffe1e858a80 @default_payload={:client_id=>"ruby-kafka"}, @backend=ActiveSupport::Notifications>, @connect_timeout=10, @socket_timeout=10, @ssl_context=nil>, @node_id=2, @logger=#<Logger:0x007ffe1e858b20 @progname=nil, @level=0, @default_formatter=#<Logger::Formatter:0x007ffe1e858af8 @datetime_format=nil>, @formatter=nil, @logdev=nil>>=>#<Kafka::MessageBuffer:0x007ffe1d880590 @buffer={"hits"=>{0=>[#<Kafka::Protocol::Message:0x007ffe1e8125f8 @key=nil, @value="{\"test\":76}", @codec_id=0, @offset=-1, @create_time=2017-06-06 12:33:05 -0400, @bytesize=11>]}}, @size=1, @bytesize=11>}
Attempt 1
{#<Kafka::Broker:0x007ffe187833b8 @connection=#<Kafka::Connection:0x007ffe187834a8 @client_id="ruby-kafka", @port=9093, @host="kafka2", @logger=#<Logger:0x007ffe1878dcc8 @progname=nil, @level=0, @default_formatter=#<Logger::Formatter:0x007ffe1878dc50 @datetime_format=nil>, @formatter=nil, @logdev=nil>, @instrumenter=#<Kafka::Instrumenter:0x007ffe1878db10 @default_payload={:client_id=>"ruby-kafka"}, @backend=ActiveSupport::Notifications>, @connect_timeout=10, @socket_timeout=10, @ssl_context=nil>, @node_id=2, @logger=#<Logger:0x007ffe1878dcc8 @progname=nil, @level=0, @default_formatter=#<Logger::Formatter:0x007ffe1878dc50 @datetime_format=nil>, @formatter=nil, @logdev=nil>>=>#<Kafka::MessageBuffer:0x007ffe18782ff8 @buffer={"hits"=>{0=>[#<Kafka::Protocol::Message:0x007ffe18783bd8 @key=nil, @value="{\"test\":76}", @codec_id=0, @offset=-1, @create_time=2017-06-06 12:33:08 -0400, @bytesize=11>]}}, @size=1, @bytesize=11>}
Attempt 2
{#<Kafka::Broker:0x007ffe187833b8 @connection=#<Kafka::Connection:0x007ffe187834a8 @client_id="ruby-kafka", @port=9093, @host="kafka2", @logger=#<Logger:0x007ffe1878dcc8 @progname=nil, @level=0, @default_formatter=#<Logger::Formatter:0x007ffe1878dc50 @datetime_format=nil>, @formatter=nil, @logdev=nil>, @instrumenter=#<Kafka::Instrumenter:0x007ffe1878db10 @default_payload={:client_id=>"ruby-kafka"}, @backend=ActiveSupport::Notifications>, @connect_timeout=10, @socket_timeout=10, @ssl_context=nil>, @node_id=2, @logger=#<Logger:0x007ffe1878dcc8 @progname=nil, @level=0, @default_formatter=#<Logger::Formatter:0x007ffe1878dc50 @datetime_format=nil>, @formatter=nil, @logdev=nil>>=>#<Kafka::MessageBuffer:0x007ffe17691f50 @buffer={"hits"=>{0=>[#<Kafka::Protocol::Message:0x007ffe18783bd8 @key=nil, @value="{\"test\":76}", @codec_id=0, @offset=-1, @create_time=2017-06-06 12:33:08 -0400, @bytesize=11>]}}, @size=1, @bytesize=11>}
Attempt 3
{#<Kafka::Broker:0x007ffe187833b8 @connection=#<Kafka::Connection:0x007ffe187834a8 @client_id="ruby-kafka", @port=9093, @host="kafka2", @logger=#<Logger:0x007ffe1878dcc8 @progname=nil, @level=0, @default_formatter=#<Logger::Formatter:0x007ffe1878dc50 @datetime_format=nil>, @formatter=nil, @logdev=nil>, @instrumenter=#<Kafka::Instrumenter:0x007ffe1878db10 @default_payload={:client_id=>"ruby-kafka"}, @backend=ActiveSupport::Notifications>, @connect_timeout=10, @socket_timeout=10, @ssl_context=nil>, @node_id=2, @logger=#<Logger:0x007ffe1878dcc8 @progname=nil, @level=0, @default_formatter=#<Logger::Formatter:0x007ffe1878dc50 @datetime_format=nil>, @formatter=nil, @logdev=nil>>=>#<Kafka::MessageBuffer:0x007ffe16d24698 @buffer={"hits"=>{0=>[#<Kafka::Protocol::Message:0x007ffe18783bd8 @key=nil, @value="{\"test\":76}", @codec_id=0, @offset=-1, @create_time=2017-06-06 12:33:08 -0400, @bytesize=11>]}}, @size=1, @bytesize=11>}
Kafka::DeliveryFailed: Failed to send messages to hits/0
from /path/to/gem/ruby-2.3.3@goliath/gems/ruby-kafka-0.3.15/lib/kafka/producer.rb:332:in `deliver_messages_with_retries'

Add some basic validations of the kafka scope of the config

Unfortunately, we need to add basic validations of the rdkafka settings: invalid seed brokers fail silently and asynchronously.

Let's start simple: seed brokers should have a format like host1:9092,host2:9092,192.168.1.2:9091, that is, a string of comma-separated hosts with ports.
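A minimal sketch of such a check in plain Ruby, assuming only the rule stated above (comma-separated `host:port` entries). `valid_seed_brokers?` is hypothetical; it rejects whitespace, empty entries and ports outside 1..65535, and does not handle IPv6 literals.

```ruby
# One "host:port" entry: letters, digits, dots and dashes, then a port.
SEED_BROKER = /\A[A-Za-z0-9.-]+:\d{1,5}\z/

# Hypothetical validator for a bootstrap.servers-style string.
def valid_seed_brokers?(value)
  return false unless value.is_a?(String) && !value.empty?

  # -1 keeps trailing empty fields, so "host1:9092," is rejected.
  value.split(',', -1).all? do |entry|
    next false unless SEED_BROKER.match?(entry)

    port = entry.split(':').last.to_i
    port.between?(1, 65_535)
  end
end

valid_seed_brokers?('host1:9092,host2:9092,192.168.1.2:9091')
```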

Sender option

I think it's useful to know who (what kind of app) the sender was. Of course, I can just add sender: :api_name to the message, but it'd be great to have this at the WaterDrop internals level.
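One way to carry the sender without changing the payload is a message header. A minimal sketch, assuming the producer accepts a `headers:` hash of string keys and values on each message (WaterDrop 2.x message hashes do); `with_sender` and the `'sender'` header name are hypothetical.

```ruby
# Hypothetical helper: tags every message hash with a "sender" header,
# keeping any headers the message already has.
def with_sender(messages, sender)
  messages.map do |message|
    headers = (message[:headers] || {}).merge('sender' => sender.to_s)
    message.merge(headers: headers)
  end
end

tagged = with_sender([{ topic: 'my-topic', payload: 'my message' }], :api_name)

# With a configured producer:
# producer.produce_many_sync(tagged)
```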

Is instrumentation per producer only? + recovery question

Instrumentation Question:
I'm planning to produce messages from Sidekiq, with lots of jobs doing the producing. I'm planning on creating a producer in every job and then closing it afterwards.

Is it actually recommended to create a lot of producers, as long as I close them?

In the documentation, subscriptions to producer events are per producer. I'm wondering if there's a way to hook them up once, in some initializer, rather than per producer.

Recovery Question:
Currently, I am rescuing WaterDrop::Errors::MessageInvalidError and WaterDrop::Errors::ProducerClosedError on every call to produce_many_sync ... These are the 2 errors that I saw that would make the whole batch stop. Is there anything else that would make a whole batch stop? I understand that the rest would be an array of Rdkafka::Producer::DeliveryReport and individually can have an .error.

Thank you.
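Taking the premise of the question (a batch call returns an array of delivery reports, each of which may carry an `error`), failed deliveries can be separated from successful ones after the call. A minimal sketch; `Report` is a hypothetical stand-in struct so the example runs without a broker, and the error value used here is only a placeholder.

```ruby
# Hypothetical stand-in for Rdkafka::Producer::DeliveryReport.
Report = Struct.new(:partition, :offset, :error)

# Split delivery reports into [successful, failed] by their error field.
def partition_reports(reports)
  reports.partition { |report| report.error.nil? }
end

reports = [Report.new(0, 10, nil), Report.new(1, -1, :placeholder_error)]
ok, failed = partition_reports(reports)
```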

Can't use waterdrop in aspect way

After removing to_json from message:

@message = message.respond_to?(:to_json) ? message.to_json : message.to_s

We cannot send messages using aspects. Even if I manually convert the message I want to send to JSON, waterdrop sends a Hash built by the Formatter class:

def message
  {
    topic:  @options[:topic],
    method: @options[:method],
    message: @result,
    args: @args
  }
end

Then this error appears:
NoMethodError: undefined method `bytesize' for #Hash:0x007fe5841235d8
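The `bytesize` error suggests the client received a Hash where it expected a String payload. A minimal sketch of serializing the formatted message before it is produced, using Ruby's standard `json` library; `serialize_payload` is hypothetical and the sample values only mirror the keys of the Formatter's hash.

```ruby
require 'json'

# Hypothetical helper: pass strings through, serialize anything else to JSON.
def serialize_payload(message)
  message.is_a?(String) ? message : JSON.generate(message)
end

# Same shape as Formatter#message above; the values are made up.
formatted = {
  topic: 'my-topic',
  method: 'create',
  message: { id: 1 },
  args: [1, 2]
}

payload = serialize_payload(formatted)
payload.bytesize # works now: payload is a String
```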

kafka_host, kafka_hosts and kafka_ports options

The problem is that Karafka and WaterDrop require options in different formats. Karafka wants options in this format:

config.kafka_hosts = ["123.123.123.23:9092"]

and waterdrop in this one:

config.kafka_host = "123.123.123.23"
config.kafka_ports = ["9092"]

I feel that this is not unified enough and that both should use the same option names and structure. Then I'd be able to use the same settingslogic namespace for both karafka and waterdrop.
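Until the option names are unified, a small adapter can derive one format from the other. A minimal sketch, assuming every port in `kafka_ports` belongs to the single `kafka_host`; `to_kafka_hosts` is hypothetical.

```ruby
# Hypothetical adapter: WaterDrop-style host + ports => Karafka-style hosts.
def to_kafka_hosts(host, ports)
  Array(ports).map { |port| "#{host}:#{port}" }
end

to_kafka_hosts('123.123.123.23', ['9092'])
# => ["123.123.123.23:9092"]
```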

Ability to validate successful responder call

Is your feature request related to a problem? Please describe.

I want the ability to confirm that a message was successfully produced to the topic on the broker. I don't care or want to rely on an acknowledgement from a downstream consumer. I simply wish to know if I successfully posted in the first place with some sort of return value or callback. It looks like neither responders nor WaterDrop::SyncProducer provide return values to validate the successful delivery of your message to the topic.

Describe the solution you'd like

Consider the scenario where perhaps the topic does not exist or there's an ACL error, etc.

Topic does not exist:

irb(main):001:0> WaterDrop::SyncProducer.call({'test' => 'test'}.to_json, topic: 'not.a.real.topic')
I, [2019-11-21T17:05:45.387698 #2436]  INFO -- : New topics added to target list: not.a.real.topic
I, [2019-11-21T17:05:45.388066 #2436]  INFO -- : Fetching cluster metadata from kafka://broker.kafka:9093
I, [2019-11-21T17:05:51.377430 #2436]  INFO -- : Discovered cluster metadata; nodes: broker2.kafka:9093 (node_id=218), broker3.kafka:9093 (node_id=254), broker1.kafka:9093 (node_id=175)
E, [2019-11-21T17:05:51.378117 #2436] ERROR -- : Failed to assign partitions to 1 messages in not.a.real.topic
W, [2019-11-21T17:05:51.378265 #2436]  WARN -- : Failed to send all messages to ; attempting retry 1 of 2 after 1s
I, [2019-11-21T17:05:52.380422 #2436]  INFO -- : Fetching cluster metadata from kafka://broker.kafka:9093
I, [2019-11-21T17:05:52.692198 #2436]  INFO -- : Discovered cluster metadata; nodes: broker3.kafka:9093 (node_id=254), broker2.kafka:9093 (node_id=218), broker1.kafka:9093 (node_id=175)
I, [2019-11-21T17:05:52.692766 #2436]  INFO -- : Sending 1 messages to broker1.kafka:9093 (node_id=175)
=> nil

We can see above that ruby-kafka logs some useful information showing that the message failed to send, yet the call returns nil. Code that should behave differently depending on whether the message was sent successfully has no way to roll back in that case. Ideally I'd want something to the effect of:

ActiveRecord::Base.transaction do
  # modify some local data

  # notify downstream services of this change
  responder_call = Responders::MyCoolResponder.call(message)
  
  # roll back (i.e. do not commit local changes) if unable to publish the
  # message for downstream consumption
  raise ActiveRecord::Rollback unless responder_call.successful?
end

Describe alternatives you've considered

I could in theory convert this to a fully async process which then waits for the consuming service to publish an acknowledgement message to a topic I consume from. But in a workflow like this I don't actually care or need to take action based on the downstream systems receiving it. I simply wish to know that I posted the original message to a topic for their consumption. The current error handling for responders as I understand it protects against responding to a topic which has not been registered or total broker failures but not whether or not you can successfully produce a message in an acks=all model.

Release 2.0-beta1

We've been using it for months now. I think it's time to craft the 2.0 release.

Fix deprecations for Ruby 2.7.0 in Waterdrop 2.0

.........../home/mencio/Software/Github/karafka/waterdrop/lib/water_drop/producer/buffer.rb:95: warning: Using the last argument as keyword parameters is deprecated; maybe ** should be added to the call
/home/mencio/Software/Github/karafka/waterdrop/.bundle/ruby/2.7.0/bundler/gems/rdkafka-ruby-d07c9595a063/lib/rdkafka/producer.rb:61: warning: The called method `produce' is defined here
..................................................................................................................................................................................../home/mencio/Software/Github/karafka/waterdrop/lib/water_drop/producer/async.rb:24: warning: Using the last argument as keyword parameters is deprecated; maybe ** should be added to the call
/home/mencio/Software/Github/karafka/waterdrop/.bundle/ruby/2.7.0/bundler/gems/rdkafka-ruby-d07c9595a063/lib/rdkafka/producer.rb:61: warning: The called method `produce' is defined here
...../home/mencio/Software/Github/karafka/waterdrop/lib/water_drop/producer/async.rb:46: warning: Using the last argument as keyword parameters is deprecated; maybe ** should be added to the call
/home/mencio/Software/Github/karafka/waterdrop/.bundle/ruby/2.7.0/bundler/gems/rdkafka-ruby-d07c9595a063/lib/rdkafka/producer.rb:61: warning: The called method `produce' is defined here
....................................................../home/mencio/Software/Github/karafka/waterdrop/lib/water_drop/producer/sync.rb:28: warning: Using the last argument as keyword parameters is deprecated; maybe ** should be added to the call
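The warnings point at calls that pass a Hash where the callee expects keyword arguments. A minimal sketch of the `**` fix the warning itself suggests; `produce` here is a hypothetical stand-in with keyword parameters, not the rdkafka method.

```ruby
# Hypothetical stand-in with keyword parameters.
def produce(topic:, payload:, key: nil)
  [topic, payload, key]
end

message = { topic: 'my-topic', payload: 'my message' }

# Ruby 2.7: produce(message) still works but emits the deprecation warning.
# Ruby 3.0+: produce(message) raises ArgumentError.
# Splatting the Hash into keywords is explicit and works on both:
produce(**message)
```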

Provide a way to connect and disconnect from the cluster for both sync and async producers

  DeliveryBoy
    .send(:instance)
    .send(:async_producer)
    .instance_variable_get('@worker')
    .instance_variable_get('@producer')
    .instance_variable_get('@cluster')
    .disconnect
rescue
  false
end

on_worker_boot do
  DeliveryBoy
    .send(:instance)
    .send(:async_producer)
    .instance_variable_get('@worker')
    .instance_variable_get('@producer')
    .instance_variable_get('@cluster')
    .topics
end

Waterdrop's Performance Advantage?

Hello! I came by to ask about the advantages of this Kafka client compared to the others. I'm looking for a performant producer, which unfortunately I can't achieve with my current producer client.

Some of my questions:

  • do you have some performance metrics to easily compare at a glance?
  • what was your main motivation for this gem? How is it different from the others?
  • what was the best producer throughput that you have ever achieved?
  • why did you shift to rdkafka?

Thanks in advance.

New release

Any chance we could get a new release for #109 ? I'd love to not have to fork this until there's another one!

Cheers 🍻

Cannot send message with producer

I have followed the example app instructions and gotten the Ping Pong game to work. However, when I try to send a message with either WaterDrop::SyncProducer.call('message', topic: 'my-topic') or WaterDrop::AsyncProducer.call('message', topic: 'my-topic'), I receive the error syntax error near unexpected token 'message','.

How do I resolve this?
