Git Product home page Git Product logo

ruby-kafka's People

Contributors

abicky avatar beneddy avatar bquorning avatar charkost avatar cstyles avatar dasch avatar elyahou avatar gilibelz2 avatar isaacseymour avatar jasondoc3 avatar jobeus avatar jturkel avatar kirkokada avatar klippx avatar lairen avatar lmduc avatar mensfeld avatar nguyenquangminh0711 avatar pmustel avatar pt2pham avatar rkrage avatar sparrovv avatar spuun avatar tcrayford avatar tjwp avatar tomerpe avatar trthomps avatar vvuibert avatar yingnanliu avatar zerosign avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

ruby-kafka's Issues

Always getting Kafka::LeaderNotAvailable

kafka = Kafka.new(:seed_brokers => ["127.0.0.1:9093", "127.0.0.1:9094", "127.0.0.1:9095"], :client_id => "my-app", :logger => Logger.new(STDOUT))
producer = kafka.get_producer
producer.produce("value", :topic => "my-test")
I, [2016-02-06T20:27:55.716158 #12271]  INFO -- : Trying to initialize broker pool from node 127.0.0.1:9093
I, [2016-02-06T20:27:55.716365 #12271]  INFO -- : Opening connection to 127.0.0.1:9093 with client id my-app...
D, [2016-02-06T20:27:55.716890 #12271] DEBUG -- : Sending request 1 to 127.0.0.1:9093
D, [2016-02-06T20:27:55.717281 #12271] DEBUG -- : Waiting for response 1 from 127.0.0.1:9093
D, [2016-02-06T20:27:55.736754 #12271] DEBUG -- : Received response 1 from 127.0.0.1:9093
D, [2016-02-06T20:27:55.737007 #12271] DEBUG -- : Closing socket to 127.0.0.1:9093
Kafka::LeaderNotAvailable: Kafka::LeaderNotAvailable
    from /var/lib/gems/2.1.0/gems/ruby-kafka-0.1.2/lib/kafka/protocol.rb:32:in `handle_error'
    from /var/lib/gems/2.1.0/gems/ruby-kafka-0.1.2/lib/kafka/broker.rb:37:in `block (2 levels) in fetch_metadata'
    from /var/lib/gems/2.1.0/gems/ruby-kafka-0.1.2/lib/kafka/broker.rb:35:in `each'
    from /var/lib/gems/2.1.0/gems/ruby-kafka-0.1.2/lib/kafka/broker.rb:35:in `block in fetch_metadata'
    from /var/lib/gems/2.1.0/gems/ruby-kafka-0.1.2/lib/kafka/broker.rb:32:in `each'
    from /var/lib/gems/2.1.0/gems/ruby-kafka-0.1.2/lib/kafka/broker.rb:32:in `fetch_metadata'
    from /var/lib/gems/2.1.0/gems/ruby-kafka-0.1.2/lib/kafka/broker_pool.rb:95:in `block in fetch_cluster_info'
    from /var/lib/gems/2.1.0/gems/ruby-kafka-0.1.2/lib/kafka/broker_pool.rb:81:in `each'
    from /var/lib/gems/2.1.0/gems/ruby-kafka-0.1.2/lib/kafka/broker_pool.rb:81:in `fetch_cluster_info'
    from /var/lib/gems/2.1.0/gems/ruby-kafka-0.1.2/lib/kafka/broker_pool.rb:69:in `cluster_info'
    from /var/lib/gems/2.1.0/gems/ruby-kafka-0.1.2/lib/kafka/broker_pool.rb:44:in `partitions_for'
    from /var/lib/gems/2.1.0/gems/ruby-kafka-0.1.2/lib/kafka/producer.rb:158:in `produce'
    from (irb):36
    from /usr/bin/irb:11:in `<main>'

Using kafka_2.11-0.9.0.0. It's working with poseidon, as well as a python kafka client.

The last message in each group of fetched messages can have a truncated value

When I put 100,000 lorem ipsum messages into a topic they can all be retrieved fine from the kafka command line client, but the Ruby Kafka consumer was only getting 99,913 complete messages and 87 incomplete messages.

After a lot of digging I determined that the broker was running 87 requests that received 1148 messages and then one final request that received the remaining 124. The incomplete message was always the last message of each "full-size" batch of 1148.

Setting the min_bytes key did not seem to have any effect.

In my testing I've been able to get a consumer to retrieve all 100,000 complete records by adding a value size check to the loop. With this check the incomplete messages are not "processed" and the offset isn't incremented.

next unless message.value.size > 800

I'm going to keep digging and see if I can find out where the actual cut off is happening. I've done a fair bit of debugging in fetch_operation.rb and it looks like the problem is farther in.

# fetch_operation.rb#L66
response = broker.fetch_messages(**options) # => exhibits the truncated value problem

Logger is not currently optional

The README says by default nothing is logged, but currently exceptions are raised unless a logger is passed to Kafka.new

require "kafka"
kafka = Kafka.new(seed_brokers: BROKERS)
producer = kafka.producer
producer.produce("foo", topic: "test")
producer.deliver_messages

NoMethodError: undefined method `info' for nil:NilClass
    from .../ruby-kafka-0.2.0/lib/kafka/cluster.rb:39:in `add_target_topics'

#info is trying to be called on the nil logger object. This could be fixed by either setting a default logger (e.g. Logger.new("/dev/null")) or fixing all the logger calls to guard against nil.

Add timeouts to all network calls

Timeouts should be added when

  • connecting to a broker;
  • writing requests; and
  • reading responses.

We should never hang indefinitely.

Purge buffered messages for a topic if it doesn't exist

We ran into this in production: a topic hadn't been properly created in a production environment. Rather than dropping messages to that topic and raising an error, the producer held on to them. Eventually, the producer buffers filled up and new messages were dropped.

There are several options for how to handle this:

  • Raise an exception when delivering messages to a non-existing topic. This can be tricky with the async producer.
  • Drop messages to the topic and log the error.

Set up a full integration test suite

In order to ensure we don't introduce regressions, we should have a test suite running with realistic scenarios. Currently, setting up the Kafka cluster needed to run the tests is not even fully automated. Ideally, the suite would also allow simulating network failures, partition leader changes, etc.

I'm thinking that a Docker based setup would work the best.

Consumer sporadically fails to read offsets from the Kafka offset store on startup

I've been playing around with consumer APIs and I've noticed that consumer sporadically fails to read the last committed offset from the Kafka offset store on startup. My consumer looks pretty similar to the example from the doc:

require "kafka"

kafka = Kafka.new(seed_brokers: ["kafka.server.local:9092"])
consumer = kafka.consumer(group_id: "rules-service")
consumer.subscribe("rule_events")

consumer.each_message do |message|
  puts "Topic = #{message.topic}, Partition = #{message.partition}, Offset = #{message.offset}"
end

Every couple runs of this consumer it fails to fetch the last committed offset from the Kafka store and falls back to reading the earliest available offset.

Here's an extract from the logs before and after the consumer restart (note I added the "Left group" log statement):

Waiting for response 77 from 192.168.99.100:9092
Received response 77 from 192.168.99.100:9092
Sending heartbeat...
Sending request 78 to 192.168.99.100:9092
Waiting for response 78 from 192.168.99.100:9092
Received response 78 from 192.168.99.100:9092
Fetching batch from rule_events/1 starting at offset 5
Fetching batch from rule_events/0 starting at offset 8
Sending request 79 to 192.168.99.100:9092
Waiting for response 79 from 192.168.99.100:9092
Leaving group `rules-service`
Sending request 80 to 192.168.99.100:9092
Received response 79 from 192.168.99.100:9092
Received out-of-order response id 79, was expecting 80
Waiting for response 80 from 192.168.99.100:9092
Received response 80 from 192.168.99.100:9092
Left group `rules-service`

<CONSUMER RESTARTED HERE>

New topics added to target list: rule_events
Fetching cluster metadata from kafka.server.local:9092
Opening connection to kafka.server.local:9092 with client id ruby-kafka...
Sending request 1 to kafka.server.local:9092
Waiting for response 1 from kafka.server.local:9092
Received response 1 from kafka.server.local:9092
Discovered cluster metadata; nodes: 192.168.99.100:9092 (node_id=0)
Closing socket to kafka.server.local:9092
Joining group `rules-service`
Getting group coordinator for `rules-service`
Opening connection to 192.168.99.100:9092 with client id ruby-kafka...
Sending request 1 to 192.168.99.100:9092
Waiting for response 1 from 192.168.99.100:9092
Received response 1 from 192.168.99.100:9092
Coordinator for group `rules-service` is 192.168.99.100:9092 (node_id=0)
Sending request 2 to 192.168.99.100:9092
Waiting for response 2 from 192.168.99.100:9092
Received response 2 from 192.168.99.100:9092
Joined group `rules-service` with member id `ruby-kafka-a12a21d8-025a-45fb-b37d-4b2a5ebd4d04`
Chosen as leader of group `rules-service`
Sending request 3 to 192.168.99.100:9092
Waiting for response 3 from 192.168.99.100:9092
Received response 3 from 192.168.99.100:9092
Partitions assigned for `rule_events`: 1, 0
Sending request 4 to 192.168.99.100:9092
Waiting for response 4 from 192.168.99.100:9092
Received response 4 from 192.168.99.100:9092
Fetching batch from rule_events/1 starting at offset earliest
Fetching batch from rule_events/0 starting at offset earliest
Resolving offset `-2` for rule_events/1...
Resolving offset `-2` for rule_events/0...
Sending request 5 to 192.168.99.100:9092
Waiting for response 5 from 192.168.99.100:9092
Received response 5 from 192.168.99.100:9092
Offset for rule_events/1 is 0
Offset for rule_events/0 is 0
Sending request 6 to 192.168.99.100:9092
Waiting for response 6 from 192.168.99.100:9092
Received response 6 from 192.168.99.100:9092

I'm happy to work on a reproducible test case but wanted to see if anything obvious jumps out from the logs first.

Determine a partition based on a proc

Hey,

Most kafka producer implementation have an ability to decide on the partition based on some supplied logic that depends on a key and the partition count.

It is critical for us to have this ability.

Currently, a partition is created by
Zlib.crc32(key) % partition_count

Would it be possible to have it by a supplied proc and if not supplied, the above will be the default?

I can send a PR, which if a partition.respond_to?(:call) it will calculate it based on the proc

Would that be possible/acceptable?

Retry produce requests

There are several reasons why a produce request could fail:

  1. the broker is no longer leader of a partition being written to;
  2. network timeouts and stuff;
  3. not enough replicas are available;
  4. etc.

For some errors we need to re-fetch the cluster metadata.

Implement automatic partitioning

There are several strategies for assigning partitions to messages:

  1. Based on a digest of the message key;
  2. round-robin;
  3. randomized;
  4. based on a digest of a user-supplied partition key; or
  5. based on a user-supplied custom strategy.

I think if we have 1) and 4) we should have all cases covered, as the rest can be implemented in terms of those.

.partitions_for method and auto-create topics don't work well together

We encountered a problem with the Kafka::Producer. We have several services that publish to specific topics, each with a different number of paritions and different partition keys.

Our production and staging environments work fine because the Kafka clusters are already defined and running.

Our CI on the other hand uses docker images for integration tests between services, and specifically the Kafka cluster is set up with the auto-create topics flag set to true (which is not suitable for production environments).

Before publishing we use the number of partitions for each topic for our hashing function. When calling .partitions_for if the topic is not created ahead of time raises a Kafka::LeaderNotAvailable error.

These unveils two problems:

  1. The call to .partitions_for does cause the topic to be created, but consecutive calls to .partitions_for still raise the error (I'm not sure why this happens).
  2. There's no way around this in ruby-kafka. Topics can't be created via the gem, and we're forced to create ahead of time the topics, which makes the auto-create topics flag irrelevant.

I'd recommend either making available an API for creation of topics, or maybe having .partitions_for return the default value for auto-created topics? it's called num.partitions in the configuration (see here)

WDYT?

Implement the Consumer API

In Kafka, consumers form consumer groups around topics, dividing the responsibility for consuming individual topic partitions among themselves. Furthermore, a consumer can subscribe to multiple topics, thus participating in several groups at once. This is useful for e.g. joining two topics.

We need to figure out what level of abstraction to provide to users of ruby-kafka. Since processing frameworks can be built on top of the library, flexibility is important. At the same time, it should be possible to just use this library, so simplicity is also desirable.

The Java API is based on the consumer subscribing to one or more topics, and then polling for new messages:

KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records)
        System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());
}

The most low-level Ruby API could look like this:

consumer = kafka.get_consumer
consumer.subcribe("ad-impressions", "ad-clicks")

loop do
  messages = consumer.poll
  messages.each do |message|
    puts message.topic, message.partition, message.offset, message.key, message.value
  end
  consumer.commit_offsets
end

This would mix messages from all topics that a consumer subscribes to. Higher level consumer loops can then be added to e.g. direct messages to topic-specific handlers. Another option is to instead return a list of message batches, with each batch being specific to a topic partition:

batches = consumer.poll
batches.each do |batch|
  puts batch.topic, batch.partition, batch.highwater_mark_offset

  batch.messages.each do |message|
    puts message.offset, message.key, message.value
  end

  batch.commit_offset
end

One important issue to consider is offset commits. In the above examples, we either commit all offsets for a consumer or, in the second example, individually commit offsets for specific partitions.

I want ruby-kafka to align itself with the protocol so that we can achieve maximum performance, hopefully without the API being too weird. The protocol specifically allows offset commits to include the offsets for several topic partitions at a time, so if we intend to minimize the number of round trips to the brokers, we should commit offsets in bulk (the first example.) That does, however, provide less granular control. I think allowing both would be preferable.

Furthermore, we can provide a high-level message loop that automatically commits offsets after each batch has been processed:

consumer.each_message do |message|
  puts message.topic, message.partition, message.offset, message.key, message.value
end
Tasks
  • Implement high level message oriented API.
  • Implement low level batch oriented API.

Add instrumentation hooks

There are several moving parts in the producer, and it will be important to measure:

  • The size of local buffers, both in number of pending messages and relative to the max buffer size.
  • Latency and throughput of individual API requests/responses.
  • Latency and throughput of high-level operations such as produce operations: #81.
  • Errors, timeouts, and retries.
  • Throughput and latency for individual topics/partitions: #60.
  • Per-message delivery delay – time spent in the buffer: #137.
  • Producer delivery rate – time between deliveries.

Exactly how the hooks will work also needs to be decided. One option would be to simply use ActiveSupport::Notifications if it's available, or an API compatible null sink if it's not. An alternative is a custom callback system, but that feels a bit like overkill to me.

Allow blocking when producing with the async producer

Currently, when calling #produce on an async producer when the message queue is full a BufferOverflow exception is raised. It would be nice to have the option of blocking instead. Bonus points if there's a configurable timeout.

Note that in order to do this correctly a more sophisticated bounded queue would need to be implemented.

No exception raised when `assign_partitions!` fails

deliver_messages silently (only logs an error) fails when assign_partitions! doesn't succeed.

rescue Kafka::Error => e
@logger.error "Failed to assign pending message to a partition: #{e}"
@cluster.mark_as_stale!
end

unless @buffer.empty?
partitions = @buffer.map {|topic, partition, _| "#{topic}/#{partition}" }.join(", ")
raise DeliveryFailed, "Failed to send messages to #{partitions}"
end

Messages still in @pending_message_queue might get lost after deliver_messages returns.

Namespace conflicts

Hey,

we use a different gem for the consumer, and there is a naming conflict since both gems (this and the one we use) use the Kafka namespace

Would it be possible to have this gem in a different namespace? (like ZenDesk::Kafka or RubyKafka or ZKafka, or whatever)?

Can't produce async messages

I tried adding the example configuration for async producing of messages to an initializer in my Rails app as follows:

$kafka = Kafka.new(seed_brokers: ['localhost:9092'], logger: Rails.logger)

$kafka_producer = $kafka.async_producer(delivery_interval: 1)

at_exit { $kafka_producer.shutdown }

Nothing happens when I try to produce a message from my Rails console.

If I instantiate a new producer it works however. Any ideas?

Consumer group API writes invalid user_data field

I have an app using ruby-kafka 0.3.2 to consume from a Kafka 0.9.0.0 broker. When I use the kafka-consumer-groups.sh admin script provided with the Kafka 0.9.0.0 distribution, e.g. to check the lag of the consumer group, I get the following error:

$ ~/opt/kafka/bin/kafka-consumer-groups.sh --new-consumer --command-config client-ssl.properties --bootstrap-server my-broker:9096 --describe --group searchify-1
Error while executing consumer group command Error reading field 'user_data': java.lang.IllegalArgumentException
org.apache.kafka.common.protocol.types.SchemaException: Error reading field 'user_data': java.lang.IllegalArgumentException
    at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:71)
    at org.apache.kafka.clients.consumer.internals.ConsumerProtocol.deserializeAssignment(ConsumerProtocol.java:109)
    at kafka.admin.AdminClient$$anonfun$describeConsumerGroup$1.apply(AdminClient.scala:165)
    at kafka.admin.AdminClient$$anonfun$describeConsumerGroup$1.apply(AdminClient.scala:164)
    at scala.collection.immutable.List.map(List.scala:273)
    at kafka.admin.AdminClient.describeConsumerGroup(AdminClient.scala:164)
    at kafka.admin.ConsumerGroupCommand$KafkaConsumerGroupService.describeGroup(ConsumerGroupCommand.scala:314)
    at kafka.admin.ConsumerGroupCommand$ConsumerGroupService$class.describe(ConsumerGroupCommand.scala:84)
    at kafka.admin.ConsumerGroupCommand$KafkaConsumerGroupService.describe(ConsumerGroupCommand.scala:302)
    at kafka.admin.ConsumerGroupCommand$.main(ConsumerGroupCommand.scala:63)
    at kafka.admin.ConsumerGroupCommand.main(ConsumerGroupCommand.scala)

We have verified that the broker logs don't contain any errors corresponding to this error, so it seems like the broker is returning a response that the admin tool can't parse. I haven't dug into the source of kafka-consumer-groups.sh yet, but it seems like the ruby-kafka consumer group implementation may be setting some metadata in a way that trips up the admin tool?

I don't think it's a problem with the kafka-consumer-groups.sh script communicating with the broker, because the --list option works, with the same configuration:

$ ~/opt/kafka/bin/kafka-consumer-groups.sh --new-consumer --command-config client-ssl.properties --bootstrap-server my-broker:9096 --list
searchify-1

Adding a CHANGELOG

Would it be possible to add a changelog file so we will know what changed between versions?

thanks

Parallelize produce requests

Currently, we sequentially send requests to each broker involved when flushing messages from the producer. We could parallelize these requests in order to improve performance.

individual exceptions for connect timeouts / socket wait timeouts

Hi again,

question:

when i fetch messages, i regularly get a "connection timed out" after 10 seconds filling up my logfile. Increasing max_wait_time doesn't help because it's the connect/socket timeouts firing.
Sure, i could increase these timeout values, but when i plug off power from my kafka node, my client will timeout later than i'd like it to. So, what i'd really want is to know whether the timeout occurred while connecting/sending the fetch messages request to kafka or while waiting for messages. However, currently it seems i get a Errno::ETIMEDOUT for all cases. Maybe it's possible to subclass Errno::ETIMEDOUT for the individual cases of ConnectTimeout, ReadTimeout, WriteTimeout?

fetch_messages doesn't refresh metadata / doesn't add target topic

and thus under certain cases thinks a topic doesn't exist, because it operates on incomplete memoized cluster info. See:

irb> kafka = Kafka.new(...)
irb> producer = kafka.get_producer

irb> kafka.fetch_messages :topic => "my-topic", :partition => 0, :offset => 0
=> [#<Kafka::FetchedMessage:...>, #<Kafka::FetchedMessage:...>, ...]

# so far so good, let's produce a message

irb> producer.produce "message", :topic => "other-topic"
irb> producer.send_messages

# now fetching no longer works:

irb> kafka.fetch_messages :topic => "my-topic", :partition => 0, :offset => 0
Kafka::UnknownTopicOrPartition: no topic "my-topic"
    from .../lib/kafka/protocol/metadata_response.rb:99:in `find_leader_id'
    from .../lib/kafka/broker_pool.rb:87:in `get_leader_id'
    from .../lib/kafka/broker_pool.rb:66:in `get_leader'
    from .../lib/kafka/fetch_operation.rb:46:in `block (2 levels) in execute'
    from .../lib/kafka/fetch_operation.rb:45:in `each'
        ....

FetchOperation#execute should IMHO call @broker_pool.add_target_topics(target_topics) to refresh topic metadata like a producer does

Implement a broker connection pool

  • We need to discover the broker topology with a metadata request.
  • When producing message, we must figure out the leader for each message, group messages that should go to the same leader, and send each batch in a separate request.

Implement an asynchronous producer

A worker should run in a background thread, writing buffered messages to the brokers according to some policy, e.g. periodically, when the buffer reaches a specified size, etc.

Clients can write to the buffer through e.g. a queue.

Producer#send_messages signature

Looking at Producer#send_messages, we expect the producer instance to already have messages (which makes sense).

However, I was thinking about a more ruby-ish interface could be implemented as well. Something like:

producer.send_messages do |buffer|
  buffer << Message.new payload, key: key
end

# ...

def send_messages(&block)
  # ...
  yield block if block_given?
  # ...
  shutdown
end

I'm thinking there might be a clean way to relieve the user of the setup/teardown procedure. I am also wondering if you could use Enumerable#lazy on the buffer to flush every X number of messages passed in, or even Y number of bytes.

I don't have any concrete ideas, but I've always thought a block was a natural construct for interfacing with kafka (both producer and consumer).

Add a Consumer API without groups

The Kafka Offset API allows un-grouped consumers. It may make sense for some use cases.

From the protocol spec:

Note that when this API is used for a "simple consumer," which is not part of a consumer group, then the generationId must be set to -1 and the memberId must be empty (not null). Additionally, if there is an active consumer group with the same groupId, then the commit will be rejected (typically with an UNKNOWN_MEMBER_ID or ILLEGAL_GENERATION error).

The thing is, for offset commits to work you still need a group id. I'm not sure what a nice API would look like.

Avro Support ?

Just wondering if you have any plans to add Avro support and have an AvroProducer and AvroConsumer that integrate with the SchemaRegistry.

I'd be happy to help with the implementation if needed.

Creating a topic if it doesn't yet exist

I believe that the auto.create.topics.enable setting of the Kafka server is not being honoured. By default the value of this setting is true, which means that when trying to produce to a non-existing topic, Kafka should create it instead of returning an error.

Example:

> kafka = Kafka.new(
>         seed_brokers: [Settings.kafka.broker],
>         client_id: Settings.kafka.client_id,
>         logger: Logger.new($stderr),
>     ) 
> prod = kafka.get_producer
> prod.produce('{"a":1}', topic: "testtopic")
I, [2016-02-10T17:03:55.881822 #26727]  INFO -- : Trying to initialize broker pool from node localhost:9092
I, [2016-02-10T17:03:55.882356 #26727]  INFO -- : Opening connection to localhost:9092 with client id sessions_api...
D, [2016-02-10T17:03:55.888256 #26727] DEBUG -- : Sending request 1 to localhost:9092
D, [2016-02-10T17:03:55.888625 #26727] DEBUG -- : Waiting for response 1 from localhost:9092
D, [2016-02-10T17:03:55.898070 #26727] DEBUG -- : Received response 1 from localhost:9092
I, [2016-02-10T17:03:55.898310 #26727]  INFO -- : Initialized broker pool with brokers: [192.168.1.98:9092 (node_id=0)]
D, [2016-02-10T17:03:55.898408 #26727] DEBUG -- : Closing socket to localhost:9092
Kafka::UnknownTopicOrPartition: unknown topic testtopic
  from /Users/fcorreia/.rvm/gems/ruby-2.3.0/gems/ruby-kafka-0.1.3/lib/kafka/protocol/metadata_response.rb:131:in `partitions_for'
  from /Users/fcorreia/.rvm/gems/ruby-2.3.0/gems/ruby-kafka-0.1.3/lib/kafka/broker_pool.rb:44:in `partitions_for'
  from /Users/fcorreia/.rvm/gems/ruby-2.3.0/gems/ruby-kafka-0.1.3/lib/kafka/producer.rb:158:in `produce'
  from (irb):54

Improve API for specifying number of required ACKs

Currently, users need to pass in a number as the value of required_acks. If the number is greater than zero, it is interpreted as the number of replicas (the leader included) that must acknowledge the write before the request succeeds (not greater than the actual number of replicas.) If the number is -1, then all replicas must ACK. If the number is 0 then none need to ACK, not even the leader, and no response is sent back to the client (fire-and-forget).

Having the user need to understand these numbers is not good for usability, so I think we should use meaningful names instead. Maybe use a new parameter name for this, e.g. durability or consistency (update: even better, durability_guarantee?).

  • none – no durability guarantees.
  • one – at least one replica (the current leader) has acknowledged the write.
  • quorum – a majority of replicas have acknowledged the write.
  • all – all replicas have acknowledged the write.

Allow custom offset commit stores

While the default Kafka based offset commit & fetch API covers many use cases, sometimes it makes sense to use a custom data store. This is especially true if you store your offsets alongside your data.

manual reconnect

hi,

is it possible to manually reconnect? i have a consumer that "delays" messages by sleeping for a while if the message's timestamp is to "low". But after waking up i always get a "Connection Error: EOFError" during the following fetch_messages call. So, is it possible to proactively reconnect?

Thx for your help

Run functional specs in CI

Currently, only unit specs are run. We need to be able to dynamically set up a Kafka cluster in order to get the specs to work.

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.