Git Product home page Git Product logo

poseidon's Introduction

Unmaintained

This project is currently unmaintained. There are a handful of other options for interacting with Kafka from Ruby:

  • A pure ruby client, ruby-kafka, which is 0.9 compatible and support consumer groups.
  • A REST proxy, Kafka Rest.
  • For JRuby there is jruby-kafka which wraps the Java consumer.

Poseidon Build Status Code Climate

Poseidon is a Kafka client. Poseidon only supports the 0.8 API and above.

Until 1.0.0 this should be considered ALPHA software and not neccessarily production ready.

Usage

API Documentation

Installing a Kafka broker locally

Follow the instructions on the Kafka wiki to build Kafka 0.8 and get a test broker up and running.

Sending messages to Kafka

require 'poseidon'

producer = Poseidon::Producer.new(["localhost:9092"], "my_test_producer")

messages = []
messages << Poseidon::MessageToSend.new("topic1", "value1")
messages << Poseidon::MessageToSend.new("topic2", "value2")
producer.send_messages(messages)

More detailed Poseidon::Producer documentation.

Fetching messages from Kafka

require 'poseidon'

consumer = Poseidon::PartitionConsumer.new("my_test_consumer", "localhost", 9092,
                                            "topic1", 0, :earliest_offset)

loop do
  messages = consumer.fetch
  messages.each do |m|
    puts m.value
  end
end

More detailed Poseidon::PartitionConsumer documentation.

Using snappy compression

To use snappy compression in your producers or consumers, install the snappy gem or simply add gem 'snappy' to your project's Gemfile.

Semantic Versioning

This gem follows SemVer. In particular, the public API should not be considered stable and anything may change without warning until Version 1.0.0. Additionally, for the purposes of the versioning the public API is everything documented in the public API docs.

Requirements

  • Ruby 1.9.3 or higher (1.9.2 and below not supported!!!)
  • Kafka 0.8 or higher

Integration Tests

In order to run integration tests you must specify a KAFKA_PATH environment variable which points to a built Kafka installation. To build Kafka locally follow the instructions provided by the project.

# cd ~/src/poseidon/
# bundle
# KAFKA_PATH=~/src/kafka bundle exec rake spec:all # run all unit and integration specs

The poseidon test suite will take care of spinning up and down the broker(s) needed for the integration tests.

poseidon's People

Contributors

aaronlasseigne avatar airhorns avatar bpot avatar dim avatar hadronzoo avatar kazjote avatar mnogu avatar rb2k avatar sclasen avatar sirupsen avatar tamird avatar v-a avatar zachmargolis avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

poseidon's Issues

Does not handle a single down broker

I have two brokers, and am configuring a partition producer as such:

I am getting this error when broker1 is down:

brokers = [ 'broker1.example.com:9092', 'broker2.example.com:9092' ]

end

gems/poseidon-0.0.5/lib/poseidon/sync_producer.rb:66:in `send_messages': Failed to send all messages: [list]

Any idea how to make this resilient against a broker being down?

Exception in ensure_connected is unhelpful

If someone specifies host.name=localhost in kafka server.properties or omits this config, the exception encountered when using poseidon to connect from another host is not helpful.

A common case would be to use vagrant and specify its ip address (as kafka_host).

producer = Poseidon::Producer.new(["#{kafka_host}:#{port}"], "sfx_producer")

poseidon reassigns @host (from kafka_host) to a value returned by kafka.

The subsequent exception is confusing, considering the ip address is valid. poseidon will obtain a valid socket, then fail when attempting to obtain another with the hostname provided by kafka.

If this hostname were identified via the exception (or the issue otherwise anticipated), users would be able to fix their hosts file or the kafka configuration -- without needing to obtain this information via debugging connection.rb.

host, port==>
192.168.111.88
9092
<==host,port

host, port==>
kafka.beatport.com
9092
<==host,port

/usr/local/Cellar/ruby/2.0.0-p247/lib/ruby/gems/2.0.0/gems/poseidon-0.0.4/lib/poseidon/connection.rb:101:in `initialize': getaddrinfo: nodename nor servname provided, or not known (SocketError)
    from /usr/local/Cellar/ruby/2.0.0-p247/lib/ruby/gems/2.0.0/gems/poseidon-0.0.4/lib/poseidon/connection.rb:101:in `new'
    from /usr/local/Cellar/ruby/2.0.0-p247/lib/ruby/gems/2.0.0/gems/poseidon-0.0.4/lib/poseidon/connection.rb:101:in `ensure_connected'
    from /usr/local/Cellar/ruby/2.0.0-p247/lib/ruby/gems/2.0.0/gems/poseidon-0.0.4/lib/poseidon/connection.rb:39:in `produce'
    from /usr/local/Cellar/ruby/2.0.0-p247/lib/ruby/gems/2.0.0/gems/poseidon-0.0.4/lib/poseidon/broker_pool.rb:42:in `execute_api_call'
    from /usr/local/Cellar/ruby/2.0.0-p247/lib/ruby/gems/2.0.0/gems/poseidon-0.0.4/lib/poseidon/sync_producer.rb:112:in `send_to_broker'
    from /usr/local/Cellar/ruby/2.0.0-p247/lib/ruby/gems/2.0.0/gems/poseidon-0.0.4/lib/poseidon/sync_producer.rb:53:in `block (2 levels) in send_messages'
    from /usr/local/Cellar/ruby/2.0.0-p247/lib/ruby/gems/2.0.0/gems/poseidon-0.0.4/lib/poseidon/sync_producer.rb:52:in `each'
    from /usr/local/Cellar/ruby/2.0.0-p247/lib/ruby/gems/2.0.0/gems/poseidon-0.0.4/lib/poseidon/sync_producer.rb:52:in `block in send_messages'
    from /usr/local/Cellar/ruby/2.0.0-p247/lib/ruby/gems/2.0.0/gems/poseidon-0.0.4/lib/poseidon/sync_producer.rb:43:in `times'
    from /usr/local/Cellar/ruby/2.0.0-p247/lib/ruby/gems/2.0.0/gems/poseidon-0.0.4/lib/poseidon/sync_producer.rb:43:in `send_messages'
    from /usr/local/Cellar/ruby/2.0.0-p247/lib/ruby/gems/2.0.0/gems/poseidon-0.0.4/lib/poseidon/producer.rb:159:in `send_messages'
    from /Users/direwolf/Desktop/producer_test.rb:31:in `<main>'

Use Zookeeper to discover brokers?

Other Kafka client libraries do only need Zookeeper node locations to start their clients. I believe that they discover the Kafka broker locations from Zookeeper then. Poseidon on the other hand needs both, Kafka Broker and Zookeeper locations to start the client.

I was wondering what the reasoning here was? Is this by design or something that just is not implement yet?

CPU consumption from consumers?

I was just experimenting with the gem, running on 2.0.0p353. When I fired up two consumers patterned after the example in the readme, my laptop fan spun up so I checked top; I noticed the 2 ruby processes at 60% in top and java at 111% CPU

I recall running into this before and on investigating, it seemed like the consumer never blocked even the default 100ms and was constantly fetching even on no data, so it drove the CPU crazy. Anybody ever seen this?

I can try additional debug and present extra info if suggested.

timeouts not handled gracefully

Poseidon doesn't appear to handle network latency or timeouts gracefully.

specifically, the documented :request_timeout_ms option doesn't appear to be implemented, and as such I don't have any control over the length I'm willing to wait for kafka to respond (on the producer side, at least).

additionally, when creating a producer within rails it seems wrapping calls within Timeout::timeout() has no effect.

definitely difficult to roll into production without granular control over timeouts. any help appreciated.

Release to rubygems

Can you push the gem to rubygems.org? I'd like to depend on it from a gemspec.

Sporadic error fetching data from consumer caused by checksum and decoding issue

I guess I should also investigate on my end how the Kafka messages in the subscribed topic are encoded, but for now, I get this error every once in a while when fetching messages as consumer. And this is from code based on the sample code.

When it doesn't happen, messages fetch fine, for messages in same topic. It can happen real quick, or after some time, I never ran it long enough (past 1-5 minutes or more) to be able to know if it might never happen at all. I believe the messages I fetch are of all the same format with varying data.

Since it works half the time at least, this seems to be some communications reliability issue than basic functional issue. Could it be timing related?

Is there a way to see what the consumer got when it failed at decoding the message? To help narrow down the problem. Also like if for checksum error, what was the (expected) computed and received checksums.

require 'poseidon'

consumer = Poseidon::PartitionConsumer.new("my_test_consumer", "localhost", 9092,
                                            "topic1", 0, :earliest_offset)

loop do
  messages = consumer.fetch
  messages.each do |m|
    puts m.value # or do whatever to m.value
  end
end

and here's the error message I often get:

/Users/daluu/.rvm/gems/ruby-2.2.1/gems/poseidon-0.0.5/lib/poseidon/protocol/protocol_struct.rb:192:in rescue in block in read': Error while reading messages in Poseidon::Protocol::MessageSetStructWithSize (Poseidon::Errors::ChecksumError: Poseidon::Errors::ChecksumError)) (Poseidon::Protocol::ProtocolStruct::DecodingError) from /Users/daluu/.rvm/gems/ruby-2.2.1/gems/poseidon-0.0.5/lib/poseidon/protocol/protocol_struct.rb:186:inblock in read'
from /Users/daluu/.rvm/gems/ruby-2.2.1/gems/poseidon-0.0.5/lib/poseidon/protocol/protocol_struct.rb:185:in each' from /Users/daluu/.rvm/gems/ruby-2.2.1/gems/poseidon-0.0.5/lib/poseidon/protocol/protocol_struct.rb:185:inread'
from /Users/daluu/.rvm/gems/ruby-2.2.1/gems/poseidon-0.0.5/lib/poseidon/protocol/protocol_struct.rb:155:in read' from /Users/daluu/.rvm/gems/ruby-2.2.1/gems/poseidon-0.0.5/lib/poseidon/message_set.rb:12:inread'
from /Users/daluu/.rvm/gems/ruby-2.2.1/gems/poseidon-0.0.5/lib/poseidon/protocol/protocol_struct.rb:239:in read_type' from /Users/daluu/.rvm/gems/ruby-2.2.1/gems/poseidon-0.0.5/lib/poseidon/protocol/protocol_struct.rb:230:inread_member'
from /Users/daluu/.rvm/gems/ruby-2.2.1/gems/poseidon-0.0.5/lib/poseidon/protocol/protocol_struct.rb:187:in block in read' from /Users/daluu/.rvm/gems/ruby-2.2.1/gems/poseidon-0.0.5/lib/poseidon/protocol/protocol_struct.rb:185:ineach'
from /Users/daluu/.rvm/gems/ruby-2.2.1/gems/poseidon-0.0.5/lib/poseidon/protocol/protocol_struct.rb:185:in read' from /Users/daluu/.rvm/gems/ruby-2.2.1/gems/poseidon-0.0.5/lib/poseidon/protocol/protocol_struct.rb:155:inread'
from /Users/daluu/.rvm/gems/ruby-2.2.1/gems/poseidon-0.0.5/lib/poseidon/protocol/protocol_struct.rb:239:in read_type' from /Users/daluu/.rvm/gems/ruby-2.2.1/gems/poseidon-0.0.5/lib/poseidon/protocol/protocol_struct.rb:227:inblock in read_member'
from /Users/daluu/.rvm/gems/ruby-2.2.1/gems/poseidon-0.0.5/lib/poseidon/protocol/protocol_struct.rb:227:in times' from /Users/daluu/.rvm/gems/ruby-2.2.1/gems/poseidon-0.0.5/lib/poseidon/protocol/protocol_struct.rb:227:ineach'
from /Users/daluu/.rvm/gems/ruby-2.2.1/gems/poseidon-0.0.5/lib/poseidon/protocol/protocol_struct.rb:227:in map' from /Users/daluu/.rvm/gems/ruby-2.2.1/gems/poseidon-0.0.5/lib/poseidon/protocol/protocol_struct.rb:227:inread_member'
from /Users/daluu/.rvm/gems/ruby-2.2.1/gems/poseidon-0.0.5/lib/poseidon/protocol/protocol_struct.rb:187:in block in read' from /Users/daluu/.rvm/gems/ruby-2.2.1/gems/poseidon-0.0.5/lib/poseidon/protocol/protocol_struct.rb:185:ineach'
from /Users/daluu/.rvm/gems/ruby-2.2.1/gems/poseidon-0.0.5/lib/poseidon/protocol/protocol_struct.rb:185:in read' from /Users/daluu/.rvm/gems/ruby-2.2.1/gems/poseidon-0.0.5/lib/poseidon/protocol/protocol_struct.rb:155:inread'
from /Users/daluu/.rvm/gems/ruby-2.2.1/gems/poseidon-0.0.5/lib/poseidon/protocol/protocol_struct.rb:239:in read_type' from /Users/daluu/.rvm/gems/ruby-2.2.1/gems/poseidon-0.0.5/lib/poseidon/protocol/protocol_struct.rb:227:inblock in read_member'
from /Users/daluu/.rvm/gems/ruby-2.2.1/gems/poseidon-0.0.5/lib/poseidon/protocol/protocol_struct.rb:227:in times' from /Users/daluu/.rvm/gems/ruby-2.2.1/gems/poseidon-0.0.5/lib/poseidon/protocol/protocol_struct.rb:227:ineach'
from /Users/daluu/.rvm/gems/ruby-2.2.1/gems/poseidon-0.0.5/lib/poseidon/protocol/protocol_struct.rb:227:in map' from /Users/daluu/.rvm/gems/ruby-2.2.1/gems/poseidon-0.0.5/lib/poseidon/protocol/protocol_struct.rb:227:inread_member'
from /Users/daluu/.rvm/gems/ruby-2.2.1/gems/poseidon-0.0.5/lib/poseidon/protocol/protocol_struct.rb:187:in block in read' from /Users/daluu/.rvm/gems/ruby-2.2.1/gems/poseidon-0.0.5/lib/poseidon/protocol/protocol_struct.rb:185:ineach'
from /Users/daluu/.rvm/gems/ruby-2.2.1/gems/poseidon-0.0.5/lib/poseidon/protocol/protocol_struct.rb:185:in read' from /Users/daluu/.rvm/gems/ruby-2.2.1/gems/poseidon-0.0.5/lib/poseidon/protocol/protocol_struct.rb:155:inread'
from /Users/daluu/.rvm/gems/ruby-2.2.1/gems/poseidon-0.0.5/lib/poseidon/connection.rb:120:in read_response' from /Users/daluu/.rvm/gems/ruby-2.2.1/gems/poseidon-0.0.5/lib/poseidon/connection.rb:76:infetch'
from /Users/daluu/.rvm/gems/ruby-2.2.1/gems/poseidon-0.0.5/lib/poseidon/partition_consumer.rb:108:in `fetch'

handle broker rebalances

I've noticed that if run the kafka preferred leader election tool and a new broker becomes leader for a given partition, I will see this kind of thing in the broker logs.

Produce request with correlation id X from client X partition [topic,partition] failed due to 
Leader not local for partition [topic,partition] on broker XXXX (kafka.server.KafkaApis)

Poseidon is the only producer in this case.

From the kafka ml, this is supposedly a transient state when using the java client, as it will eventually figure out the new leader.

Poseidon doesnt seem to. It keeps showing those warnings. If I restart the broker in question (forcibly closing the poseidon connections) things get back to a correct state.

Consumer does not appear to fetch all messages from a topic?

I'm new to Kafka, so let me know if this is a config/setup issue instead (and how I might workaround it), but I don't think it is. I'll try to test with test/dummy topics but so far using existing topics we have in our system. Not sure if I can replicate with a dummy topic.

Trying out this Ruby client, I notice that the client seems to only fetch a portion of the messages. We have messages that contain info like IP address and some unique ID, and we get a lot of messages through the system, too many to view on screen manually (big JSON strings), so I filter the output through grep on command line (or filter in the Ruby client code against the message collection object).

When I filter for certain IP address or ID that I know should exist in the messages, I get no output. Performing the same thing with the _kafka-console-consumer.sh_ script that's part of Kafka, it works fine, although executing that, we do specify and use zookeeper and not a simple consumer.

I just started trying out Kafka clients for other language bindings to compare. It looks like pykafka Python client works fine for me also with their simple consumer (not using zookeeper). I can filter by IP or ID and get output.

The messages all come from a single partition (zero) on one leader Kafka server, with two others that are not leaders. I connect to the leader. There is zookeeper running, and I'm only testing with one consumer client, although there might be other consumers running from other folks in my organization (or when I simultaneously debug with the Kafka consumer shell script, etc.). I also tried poseidon_cluster using their sample code (that uses zookeeper) but that didn't seem to help with this problem at all.

Has anyone encountered a similar issue? FYI, I'm just using consumer like in the sample code for consumer. And topics come from existing Kafka system, not publishing from this Ruby client to test with (yet).

I can try and attach message info or logs, whatever's needed, but I'm not sure what I can share, with respect to my organization's data. Don't have anything to post right now other than this.

Is there no support for consumer groups?

I'm sorry this isn't really an issue but I wasn't sure where to ask.

When creating a consumer instance the format is

consumer = Poseidon::PartitionConsumer.new(client_id, host, port, topic, partition)

Do I understand correctly that client_id is nothing related to a consumer group, and that poseidon does not currently support consumer groups?

If so, whats the purpose of the client_id? I see its used in the connection but I can't find references on the kafka documents to it.

Thanks

Write to Kafka when loosing a Broker Connection

Greetings,

we are running a 3 node broker kafka cluster with:

num.partitions=3
default.replication.factor=3

with losing a broker we can no longer write to kafka via poseidon.
the reference java producer works fine.

working with poseidon (ruby) and java:

root@kafka-001:/opt/kafka# bin/kafka-topics.sh --zookeeper zookeeper-001.com --describe
Topic:click     PartitionCount:3        ReplicationFactor:3     Configs:
        Topic: click    Partition: 0    Leader: 3       Replicas: 3,1,2 Isr: 3,2,1
        Topic: click    Partition: 1    Leader: 3       Replicas: 1,2,3 Isr: 3,2,1
        Topic: click    Partition: 2    Leader: 2       Replicas: 2,3,1 Isr: 3,2,1
Topic:flow_id_tracking  PartitionCount:3        ReplicationFactor:3     Configs:
        Topic: flow_id_tracking Partition: 0    Leader: 3       Replicas: 3,2,1 Isr: 3,2,1
        Topic: flow_id_tracking Partition: 1    Leader: 3       Replicas: 1,3,2 Isr: 3,2,1
        Topic: flow_id_tracking Partition: 2    Leader: 2       Replicas: 2,1,3 Isr: 3,2,1

Non working with poseidon (ruby) but working with java:

root@kafka-001:/opt/kafka# bin/kafka-topics.sh --zookeeper zookeeper-001.com --describe
Topic:click     PartitionCount:3        ReplicationFactor:3     Configs:
        Topic: click    Partition: 0    Leader: 3       Replicas: 3,1,2 Isr: 3,2
        Topic: click    Partition: 1    Leader: 3       Replicas: 1,2,3 Isr: 3,2
        Topic: click    Partition: 2    Leader: 2       Replicas: 2,3,1 Isr: 3,2
Topic:flow_id_tracking  PartitionCount:3        ReplicationFactor:3     Configs:
        Topic: flow_id_tracking Partition: 0    Leader: 3       Replicas: 3,2,1 Isr: 3,2
        Topic: flow_id_tracking Partition: 1    Leader: 3       Replicas: 1,3,2 Isr: 3,2
        Topic: flow_id_tracking Partition: 2    Leader: 2       Replicas: 2,1,3 Isr: 3,2

This is the error message I have :

 #<RuntimeError: Failed to send all messages: [#<Poseidon::Message:0x007f7def47cb78 @struct=#<struct Poseidon::Protocol::MessageWithOffsetStruct offset=0, message=#<struct Poseidon::Protocol::MessageStruct magic_type=0, attributes=0, key=nil, value="{"something"}}}}">>, @topic="click">] remaining>!!!

0.0.4 broken for Kafka 0.8.1.1 partitioned topics

Hello,

The latest release of the gem (0.0.4) does not work properly with the latest release of Kafka. Non partitioned topics are fine, but the strategy for sending messages for partitions fails, with no writes actually being committed to Kafka.

I swapped out the last release for what you have in master, and once you account for the fix needed here (#61), the gem works for partitioned topics.

Are you planning on issuing a new release, even if it's a beta one, to make working with Poseidon easier?

What Kafka version is supported exactly? 0.8 doesn't seem to work.

I tried 0.8.0-beta1, 0.8.0 and 0.8.1 Kafkas, but none of them seem to work with Poseidon. Same error message:

[2014-04-21 14:27:45,526] ERROR Closing socket for /127.0.0.1 because of error (kafka.network.Processor)
kafka.common.KafkaException: Wrong request type 768
    at kafka.api.RequestKeys$.deserializerForKey(RequestKeys.scala:53)
    at kafka.network.RequestChannel$Request.<init>(RequestChannel.scala:49)
    at kafka.network.Processor.read(SocketServer.scala:345)
    at kafka.network.Processor.run(SocketServer.scala:245)
    at java.lang.Thread.run(Thread.java:744)

Here is a snippet I use to send a message

require 'poseidon'

producer = Poseidon::Producer.new(["localhost:9092"], "my_test_producer")
messages = []
topic = "test"
messages << Poseidon::MessageToSend.new(topic, "Hello from Ruby producer")
producer.send_messages(messages)

Investigating how to keep consumers up if a kafka node is lost

Apologies if this isn't the proper forum to ask questions about design/behavior.

I have some ruby daemon/services (made with poseidon and poseidon_cluster) consuming from a kafka cluster. In trying to do maintenance on the kafka cluster, I was attempting to some rolling shutdowns with the services still running, operating on the remaining kafka nodes. But, as soon as the first node of the cluster goes down, my ruby services crash calling PartitionConsumer#fetch.

As a side-note, when I look through the kafka docs and java consumer clients, there doesn't appear to ever be an assumption of broker location when starting consumers, it's all looked up from zookeeper as far as I can tell. I'm curious if that's a goal of poseidon as well?

Question is though, are others building ruby consumers that tolerate kafka node failures? Are there things to consider to failover more seamlessly?

Poseidon not fetching from within Redstorm

I'm using Redstorm to run a Storm topology in JRuby (jruby 1.7.19 (1.9.3p551)) and need to consume messages from Kafka topics. I want to build my own Kafka spout (input stream for Storm) but this fails to fetch messages, although a similar JRuby consumer outside of Redstorm does successfully fetch messages.

The RedStorm spout is written as follows, adapted from the example Poseidon consumer, but split into a init and consume part:

require 'red_storm'
require 'poseidon'

class KafkaSpout < RedStorm::DSL::Spout

  on_init do
    # Poseidon consumer to consume Kafka messages
    @consumer = Poseidon::PartitionConsumer.new "storm_consumer", "localhost", 9092, "cashless_transactions", 0, :earliest_offset

    # message buffer with initial set of messages, we do a fetch to make sure that all attributes of the consumer are set
    @messages = @consumer.fetch
    print_debug
  end

  on_send do
    # Try to consume messages from Kafka if the @messages buffer is empty
    if @messages.empty?
      @messages = @consumer.fetch
      print_debug
    end

    # If we received new messages, output them from the spout, one by one
    @messages.shift if @messages.size > 0
  end

  def print_debug
    # Print some debug info to verify that all behaves as expected
    puts "Highwater mark = #{@consumer.highwater_mark}"
    puts "Offset         = #{@consumer.offset}"
    puts "Next offset    = #{@consumer.next_offset}"
    puts "Host           = #{@consumer.host}"
    puts "Port           = #{@consumer.port}"
    puts "Topic          = #{@consumer.topic}"
    puts "Message buffer = #{@messages.size}"
  end

end

After every fetch, I'm doing a bit of debugging to check on Poseidon's behavior. When the Redstorm topology is executed (with JRuby), no messages are being fetched. The debug repeats itself and looks as follows:

[...]
Highwater mark = 261037
Offset         = 0
Next offset    = 0
Host           = localhost
Port           = 9092
Topic          = cashless_transactions
Message buffer = 0
Highwater mark = 261037
Offset         = 0
Next offset    = 0
Host           = localhost
Port           = 9092
Topic          = cashless_transactions
Message buffer = 0
[...]

Poseidon correctly detects that I have 261037 (highwater mark) messages in my topic, but isn't advancing, not fetching messages and remaining at offset 0 after each fetch. It should be noted that all messages are small (~100 bytes). I have attempted a few fetch option settings, but neither of these help.

Poseidon's example consumer on the same Kafka instance and topic works just fine (without Redstorm, but with JRuby):

require 'poseidon'

consumer = Poseidon::PartitionConsumer.new("my_test_consumer", "localhost", 9092,
                                            "cashless_transactions", 0, :earliest_offset)

loop do
  messages = consumer.fetch
  puts "#{messages.size} messages fetched"
end

with output:

3016 messages fetched
2955 messages fetched
2938 messages fetched
2930 messages fetched
2923 messages fetched
2912 messages fetched
[...]

I have no idea how to further debug/resolve this issue. All seems fine, except for fetching messages. Any input into getting Poseidon to fetch the messages, or help debugging the issue that only seems to exist when ran from within Redstorm (JRuby works fine on the example consumer), would be very appreciated!

Publisher topic generates successfully but message doesn't post.

Consumer works like a charm but the producer is another story. When attempting to post to a topic the library will generate the topic, if not already there, but the message to be posted never posts. Infact I get the following error.

/Users/foo/.rbenv/versions/2.1.0/gemsets/kafka/gems/poseidon-0.0.5/lib/poseidon/sync_producer.rb:66:in `send_messages': Failed to send all messages: [#<Poseidon::Message:0x007fbb7a9f1ef0 @struct=#<struct Poseidon::Protocol::MessageWithOffsetStruct offset=0, message=#<struct Poseidon::Protocol::MessageStruct magic_type=0, attributes=0, key=nil, value="hello">>, @topic="test">] remaining (RuntimeError)
    from /Users/foo/.rbenv/versions/2.1.0/gemsets/kafka/gems/poseidon-0.0.5/lib/poseidon/producer.rb:163:in `send_messages'
    from producer.rb:6:in `<main>'

My implementation is very simple.

require 'poseidon'
producer = Poseidon::Producer.new(['foo:9092'], 'test_producer')
message_buffer = []
message_buffer << Poseidon::MessageToSend.new('test_topic', 'bar')
producer.send_messages(message_buffer)

Kafka install: kafka_2.9.2-0.8.1.1

Please advise...

1.0 Release

What additional functionality do we want for a 1.0 release?

  • Unified async/sync callback oriented producer interface like Java producer in 0.8.2.
  • Separate reactor style communication thread. Instead of communicating with brokers in serial we will have a single thread which communicates with the brokers and triggers callbacks.
  • High level consumer in new API.
  • Investigate adding activesupport notification events.
  • LZ4?

There will be a 0.0.9 release with these API changes and then a 1.0.0 release once they are solid.

Partition Consumer - "fetch_one" option

Hi bpot. Thanks for this GEM. I've been putting it through its paces and its great.

I'm looking for advice - I have a use case where I need to read the next message for a topic queue. I want only one message (or none if there are none available). Message sizes vary.

I realize that Kafka may not be really engineered for this, but all of my other use cases work well with Kafka with this one exception. I really don't want to introduce another messaging system into my architecture just to solve this issue.

Would you entertain on option to the PartitionConsumer (e.g. "fetch_one") which would only acknowledge the first message, and mark its offset? I can make the changes if you like.

I realize it will be somewhat inefficient...Thoughts?

Another feature that would be really useful is knowing how big the next message is (e.g. it was truncated, so not returned). It looks like truncated messages are discarded, but I think we know how big they actually will be. My use case for this would be used in conjunction with the above option - set max_bytes to the expected message size (making the "fetch_one" option more efficient), but if no messages are retrieved (and highwater_mark > current offset), then we need to re-read but with a larger max_bytes. Instead of guessing how large, it would be helpful to know exactly how big we need to make the request. I've tried to figure out how to do this, but my meta programming skills are not up to the task. Makes my head hurt. LOL.

Thoughts?

Again - thanks for this awesome GEM.

Logging?

Is there a way to turn on additional logging or anything?

I created a producer that appears to connect - in fact, I see the topics get created. The result of "@producer.send_messages" returns true. However, no message appears to get sent.

(Well - kafka-avro-console-consumer tells me it consumed 0 messages. I'm new to kafka and am almost certainly doing something silly.)

In any case, I'm hoping there is a way to turn on some level of extended logging to determine what is not right...

Ray

Obtaining topic metadata

I would like to use Poseidon as part of a Kafka monitoring tool. This means I'd like to get the topic metadata periodically, which I hope includes the first and last offset on the topic, the current leader, and the ISR status.

Ideally, this would also have a way to list the topics as well.

Is this possible with the current code, and if so, how? If not, what approach would you use?

Thanks!

Message#value seems to be escaped

I publish, using kafka-console-producer, escaped JSON such as {\"name\":\"kris\",\"color\":\"blue\"}.

When I receive it using Poseidon I get "{\\\"name\\\":\\\"kris\\\",\\\"color\\\":\\\"blue\\\"}", which is doubly escaped.

Using ``kafka-console-consumer` I get the message as published.

Is this expected?

Handle request timeout in Producer

If required_acks is > 1 and the leader is unable to get acks from replicas quick enough it will return a timeout error. We should add a setting that allows a user to indicate that they want to retry when this happens. We don't want to do this by default because retrying timeout errors is not idempotent.

Prior to implementing this investigate what other conditions could cause a timeout error. If required_acks is 0 will a leader issue a timeout if it's under high-load and the write to disk takes too long?

This would also be a good time to fix #31

Unexpected behavior with keyless messages when batch size < partitions

A behavior that makes sense after looking at the code, but you might want to document, is as follows.

If you are sending keyless messages to a topic that has more partitions than your usual message batch size, some partitions will never get messages. Found this testing sending to a topic with 8 partitions where the batch size was always 1,2 or 3.

Even if you have a batch size that is larger than num partitions, if it is a fixed size, some partitions will get more messages than others. May be a good idea to suggest producer batch sizes are some multiple of the number of partitons to evenly spread messages?

This is because MessagesToSendBatch creates a new MessageConductor and hence a new @partition_counter each batch.

Poseidon::Protocol::ProtocolStruct::EncodingError

I'm frequently getting the following error (similar to #11):

Poseidon::Protocol::ProtocolStruct::EncodingError: Error writting messages in Poseidon::Protocol::MessageSetStructWithSize (Poseidon::Protocol::ProtocolStruct::EncodingError: Error writting message in Poseidon::Protocol::MessageWithOffsetStruct (Poseidon::Protocol::ProtocolStruct::EncodingError: Error writting value in Poseidon::Protocol::MessageStruct (Encoding::CompatibilityError: incompatible character encodings: ASCII-8BIT and UTF-8)))

When calling:

Poseidon::MessageToSend.new(topic, JSON.dump(data))

Here's the backtrace:

/lib/poseidon/protocol/protocol_struct.rb:106:in `rescue in block (3 levels) in write`
/lib/poseidon/protocol/protocol_struct.rb:103:in `block (3 levels) in write`
/lib/poseidon/protocol/protocol_struct.rb:102:in `each_pair`
/lib/poseidon/protocol/protocol_struct.rb:102:in `block (2 levels) in write`
/lib/poseidon/protocol/protocol_struct.rb:129:in `maybe_prepend_crc32`
/lib/poseidon/protocol/protocol_struct.rb:101:in `block in write`
/lib/poseidon/protocol/protocol_struct.rb:116:in `block in maybe_prepend_size`
/lib/poseidon/protocol/request_buffer.rb:69:in `prepend_size`
/lib/poseidon/protocol/protocol_struct.rb:115:in `maybe_prepend_size`
/lib/poseidon/protocol/protocol_struct.rb:100:in `write`
/lib/poseidon/message_set.rb:73:in `to_compressed_message`
/lib/poseidon/message_set.rb:52:in `compress`
/lib/poseidon/messages_for_broker.rb:30:in `block (2 levels) in build_protocol_objects`
/lib/poseidon/messages_for_broker.rb:27:in `each`
/lib/poseidon/messages_for_broker.rb:27:in `map`
/lib/poseidon/messages_for_broker.rb:27:in `block in build_protocol_objects`
/lib/poseidon/messages_for_broker.rb:24:in `each`
/lib/poseidon/messages_for_broker.rb:24:in `map`
/lib/poseidon/messages_for_broker.rb:24:in `build_protocol_objects`
/lib/poseidon/sync_producer.rb:111:in `send_to_broker`
/lib/poseidon/sync_producer.rb:53:in `block (2 levels) in send_messages`
/lib/poseidon/sync_producer.rb:52:in `each`
/lib/poseidon/sync_producer.rb:52:in `block in send_messages`
/lib/poseidon/sync_producer.rb:43:in `times`
/lib/poseidon/sync_producer.rb:43:in `send_messages`
/lib/poseidon/producer.rb:159:in `send_messages`

The resulting JSON is UTF-8 encoded, and I can manually encode it to ASCII-8BIT without exception. What are the encoding requirements for Poseidon::MessageToSend.new that will prevent this error?

Trouble producing if too many brokers are down

I was experimenting with different failure conditions and found that if I shut down two of five Kafka brokers poseidon would often fail to produce, and instead responded with "Failed to send all messages". More details:

  • I ran zookeeper and 5 Kafka brokers locally (more or less as described here).
  • I created a topic with 5 partitions and a replication factor of 3:
    bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 3 --topic 3
  • I killed two of the brokers using an interrupt signal, and allowed time for the brokers to rebalance.
  • I attempted to produce:
producer = Poseidon::Producer.new(["127.0.0.1:9091", "127.0.0.1:9092", "127.0.0.1:9093", "127.0.0.1:9094", "127.0.0.1:9095"], "my_test_producer", {required_acks: 0 })
  messages = [Poseidon::MessageToSend.new('3', "value1")]
  producer.send_messages(messages)
  • I saw a RuntimeError with the message 'Failed to send all messages'

For certain combinations of brokers being down I was able to produce, for other combinations I was not able to produce.

Seeking a consumer by offset

Heya,

I have a need for seeking a consumer, i.e. the equivalent of reopening a consumer with the desired offset. Simply setting the @offset variable in the current implementation would work just fine, I believe [1]. Will you take a patch for this?

[1] More than believing, if I do "consumer.instance_variable_set(:@offset, N)" , everything works as expected :)

Handle partitions with no leader for Partition.consumer_for_partition

  1) producing with broke failures produces a bunch of messages and consumes all without error
     Failure/Error: consumer = PartitionConsumer.consumer_for_partition("consumer_failure_spect",
     NoMethodError:
       undefined method `host' for nil:NilClass
     # ./lib/poseidon/partition_consumer.rb:29:in `consumer_for_partition'
     # ./spec/integration/multiple_brokers/failure_spec.rb:46:in `block (3 levels) in <top (required)>'
     # ./spec/integration/multiple_brokers/failure_spec.rb:45:in `upto'
     # ./spec/integration/multiple_brokers/failure_spec.rb:45:in `block (2 levels) in <top (required)>'

Produce request failed on 0.8.1.1 cluster

I have a 3 node kafka cluster running 0.8.1.1, recently updated from 0.8.1 and noticing now that producing from Ruby/Poseidon is having trouble. If I'm reading correctly, it appears that Poseidon is attempting to produce on partition 1 on kafka3, but partition 1 is on kafka1.

Does this look like a client problem (Poseidon)?

client code

producer = Poseidon::Producer.new(["kafka1.internal:9092"], "my_test_producer")
messages = []
topic = "topic1"
messages << Poseidon::MessageToSend.new(topic, "Hello from Ruby producer")
producer.send_messages(messages)
=> true

Error message on kafka3

2014-09-03 06:10:04,553] WARN [KafkaApi-3] Produce request with correlation id 1 from client my_test_producer on partition [topic1,1] failed due to Partition [topic1,1] doesn't exist on 3 (kafka.server.KafkaApis)

Describe

Topic:topic1 PartitionCount:2 ReplicationFactor:1 Configs:
Topic: topic1 Partition: 0 Leader: 3 Replicas: 3 Isr: 3
Topic: topic1 Partition: 1 Leader: 1 Replicas: 1 Isr: 1

kafka1 log

[2014-09-03 06:05:01,359] INFO Verifying properties (kafka.utils.VerifiableProperties)
[2014-09-03 06:05:01,404] INFO Property broker.id is overridden to 1 (kafka.utils.VerifiableProperties)
[2014-09-03 06:05:01,405] INFO Property host.name is overridden to stage-kafka1.internal (kafka.utils.VerifiableProperties)
[2014-09-03 06:05:01,405] WARN Property log.cleanup.interval.mins is not valid (kafka.utils.VerifiableProperties)
[2014-09-03 06:05:01,405] INFO Property log.dirs is overridden to /var/log/kafka-logs (kafka.utils.VerifiableProperties)
[2014-09-03 06:05:01,406] INFO Property log.flush.interval.messages is overridden to 10000 (kafka.utils.VerifiableProperties)
[2014-09-03 06:05:01,406] INFO Property log.flush.interval.ms is overridden to 1000 (kafka.utils.VerifiableProperties)
[2014-09-03 06:05:01,406] INFO Property log.retention.hours is overridden to 168 (kafka.utils.VerifiableProperties)
[2014-09-03 06:05:01,406] INFO Property log.segment.bytes is overridden to 536870912 (kafka.utils.VerifiableProperties)
[2014-09-03 06:05:01,407] INFO Property num.io.threads is overridden to 2 (kafka.utils.VerifiableProperties)
[2014-09-03 06:05:01,407] INFO Property num.network.threads is overridden to 2 (kafka.utils.VerifiableProperties)
[2014-09-03 06:05:01,407] INFO Property num.partitions is overridden to 2 (kafka.utils.VerifiableProperties)
[2014-09-03 06:05:01,407] INFO Property port is overridden to 9092 (kafka.utils.VerifiableProperties)
[2014-09-03 06:05:01,407] INFO Property socket.receive.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties)
[2014-09-03 06:05:01,408] INFO Property socket.request.max.bytes is overridden to 104857600 (kafka.utils.VerifiableProperties)
[2014-09-03 06:05:01,408] INFO Property socket.send.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties)
[2014-09-03 06:05:01,408] INFO Property zookeeper.connect is overridden to 10.208.106.230:2181,10.208.9.144:2181,10.208.0.161:2181 (kafka.utils.VerifiableProperties)
[2014-09-03 06:05:01,408] INFO Property zookeeper.connection.timeout.ms is overridden to 1000000 (kafka.utils.VerifiableProperties)
[2014-09-03 06:05:01,424] INFO [Kafka Server 1], starting (kafka.server.KafkaServer)
[2014-09-03 06:05:01,426] INFO [Kafka Server 1], Connecting to zookeeper on 10.208.106.230:2181,10.208.9.144:2181,10.208.0.161:2181 (kafka.server.KafkaServer)
[2014-09-03 06:05:01,642] INFO Log directory '/var/log/kafka-logs' not found, creating it. (kafka.log.LogManager)
[2014-09-03 06:05:01,659] INFO Starting log cleanup with a period of 300000 ms. (kafka.log.LogManager)
[2014-09-03 06:05:01,664] INFO Starting log flusher with a default period of 9223372036854775807 ms. (kafka.log.LogManager)
[2014-09-03 06:05:01,708] INFO Awaiting socket connections on stage-kafka1.internal:9092. (kafka.network.Acceptor)
[2014-09-03 06:05:01,709] INFO [Socket Server on Broker 1], Started (kafka.network.SocketServer)
[2014-09-03 06:05:01,793] INFO Will not load MX4J, mx4j-tools.jar is not in the classpath (kafka.utils.Mx4jLoader$)
[2014-09-03 06:05:01,835] INFO 1 successfully elected as leader (kafka.server.ZookeeperLeaderElector)
[2014-09-03 06:05:01,928] INFO Registered broker 1 at path /brokers/ids/1 with address stage-kafka1.internal:9092. (kafka.utils.ZkUtils$)
[2014-09-03 06:05:01,947] INFO [Kafka Server 1], started (kafka.server.KafkaServer)
[2014-09-03 06:05:02,014] INFO New leader is 1 (kafka.server.ZookeeperLeaderElector$LeaderChangeListener)
[2014-09-03 06:10:04,287] INFO Topic creation {"version":1,"partitions":{"1":[1],"0":[3]}} (kafka.admin.AdminUtils$)
[2014-09-03 06:10:04,291] INFO [KafkaApi-1] Auto creation of topic topic1 with 2 partitions and replication factor 1 is successful! (kafka.server.KafkaApis)
[2014-09-03 06:10:04,402] INFO [ReplicaFetcherManager on broker 1] Removed fetcher for partitions topic1,1
[2014-09-03 06:10:04,431] INFO Completed load of log topic1-1 with log end offset 0 (kafka.log.Log)
[2014-09-03 06:10:04,433] INFO Created log for partition [topic1,1] in /var/log/kafka-logs with properties {segment.index.bytes -> 10485760, file.delete.delay.ms -> 60000, segment.bytes -> 536870912, flush.ms -> 1000, delete.retention.ms -> 86400000, index.interval.bytes -> 4096, retention.bytes -> -1, cleanup.policy -> delete, segment.ms -> 604800000, max.message.bytes -> 1000012, flush.messages -> 10000, min.cleanable.dirty.ratio -> 0.5, retention.ms -> 604800000}. (kafka.log.LogManager)
[2014-09-03 06:10:04,434] WARN Partition [topic1,1] on broker 1: No checkpointed highwatermark is found for partition topic1,1

kafka2 log

[2014-09-03 06:05:23,451] INFO Verifying properties (kafka.utils.VerifiableProperties)
[2014-09-03 06:05:23,496] INFO Property broker.id is overridden to 2 (kafka.utils.VerifiableProperties)
[2014-09-03 06:05:23,496] INFO Property host.name is overridden to stage-kafka2.internal (kafka.utils.VerifiableProperties)
[2014-09-03 06:05:23,497] WARN Property log.cleanup.interval.mins is not valid (kafka.utils.VerifiableProperties)
[2014-09-03 06:05:23,497] INFO Property log.dirs is overridden to /var/log/kafka-logs (kafka.utils.VerifiableProperties)
[2014-09-03 06:05:23,497] INFO Property log.flush.interval.messages is overridden to 10000 (kafka.utils.VerifiableProperties)
[2014-09-03 06:05:23,497] INFO Property log.flush.interval.ms is overridden to 1000 (kafka.utils.VerifiableProperties)
[2014-09-03 06:05:23,498] INFO Property log.retention.hours is overridden to 168 (kafka.utils.VerifiableProperties)
[2014-09-03 06:05:23,498] INFO Property log.segment.bytes is overridden to 536870912 (kafka.utils.VerifiableProperties)
[2014-09-03 06:05:23,498] INFO Property num.io.threads is overridden to 2 (kafka.utils.VerifiableProperties)
[2014-09-03 06:05:23,498] INFO Property num.network.threads is overridden to 2 (kafka.utils.VerifiableProperties)
[2014-09-03 06:05:23,498] INFO Property num.partitions is overridden to 2 (kafka.utils.VerifiableProperties)
[2014-09-03 06:05:23,499] INFO Property port is overridden to 9092 (kafka.utils.VerifiableProperties)
[2014-09-03 06:05:23,499] INFO Property socket.receive.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties)
[2014-09-03 06:05:23,499] INFO Property socket.request.max.bytes is overridden to 104857600 (kafka.utils.VerifiableProperties)
[2014-09-03 06:05:23,499] INFO Property socket.send.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties)
[2014-09-03 06:05:23,500] INFO Property zookeeper.connect is overridden to 10.208.106.230:2181,10.208.9.144:2181,10.208.0.161:2181 (kafka.utils.VerifiableProperties)
[2014-09-03 06:05:23,500] INFO Property zookeeper.connection.timeout.ms is overridden to 1000000 (kafka.utils.VerifiableProperties)
[2014-09-03 06:05:23,515] INFO [Kafka Server 2], starting (kafka.server.KafkaServer)
[2014-09-03 06:05:23,517] INFO [Kafka Server 2], Connecting to zookeeper on 10.208.106.230:2181,10.208.9.144:2181,10.208.0.161:2181 (kafka.server.KafkaServer)
[2014-09-03 06:05:23,656] INFO Log directory '/var/log/kafka-logs' not found, creating it. (kafka.log.LogManager)
[2014-09-03 06:05:23,673] INFO Starting log cleanup with a period of 300000 ms. (kafka.log.LogManager)
[2014-09-03 06:05:23,678] INFO Starting log flusher with a default period of 9223372036854775807 ms. (kafka.log.LogManager)
[2014-09-03 06:05:23,722] INFO Awaiting socket connections on stage-kafka2.internal:9092. (kafka.network.Acceptor)
[2014-09-03 06:05:23,723] INFO [Socket Server on Broker 2], Started (kafka.network.SocketServer)
[2014-09-03 06:05:23,813] INFO Will not load MX4J, mx4j-tools.jar is not in the classpath (kafka.utils.Mx4jLoader$)
[2014-09-03 06:05:23,869] INFO conflict in /controller data: {"version":1,"brokerid":2,"timestamp":"1409724323819"} stored data: {"version":1,"brokerid":1,"timestamp":"1409724301799"} (kafka.utils.ZkUtils$)
[2014-09-03 06:05:24,004] INFO Registered broker 2 at path /brokers/ids/2 with address stage-kafka2.internal:9092. (kafka.utils.ZkUtils$)
[2014-09-03 06:05:24,019] INFO [Kafka Server 2], started (kafka.server.KafkaServer)

kafka3 log

[2014-09-03 06:05:44,300] INFO Verifying properties (kafka.utils.VerifiableProperties)
[2014-09-03 06:05:44,342] INFO Property broker.id is overridden to 3 (kafka.utils.VerifiableProperties)
[2014-09-03 06:05:44,342] INFO Property host.name is overridden to stage-kafka3.internal (kafka.utils.VerifiableProperties)
[2014-09-03 06:05:44,343] WARN Property log.cleanup.interval.mins is not valid (kafka.utils.VerifiableProperties)
[2014-09-03 06:05:44,343] INFO Property log.dirs is overridden to /var/log/kafka-logs (kafka.utils.VerifiableProperties)
[2014-09-03 06:05:44,343] INFO Property log.flush.interval.messages is overridden to 10000 (kafka.utils.VerifiableProperties)
[2014-09-03 06:05:44,344] INFO Property log.flush.interval.ms is overridden to 1000 (kafka.utils.VerifiableProperties)
[2014-09-03 06:05:44,344] INFO Property log.retention.hours is overridden to 168 (kafka.utils.VerifiableProperties)
[2014-09-03 06:05:44,344] INFO Property log.segment.bytes is overridden to 536870912 (kafka.utils.VerifiableProperties)
[2014-09-03 06:05:44,344] INFO Property num.io.threads is overridden to 2 (kafka.utils.VerifiableProperties)
[2014-09-03 06:05:44,344] INFO Property num.network.threads is overridden to 2 (kafka.utils.VerifiableProperties)
[2014-09-03 06:05:44,345] INFO Property num.partitions is overridden to 2 (kafka.utils.VerifiableProperties)
[2014-09-03 06:05:44,345] INFO Property port is overridden to 9092 (kafka.utils.VerifiableProperties)
[2014-09-03 06:05:44,345] INFO Property socket.receive.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties)
[2014-09-03 06:05:44,345] INFO Property socket.request.max.bytes is overridden to 104857600 (kafka.utils.VerifiableProperties)
[2014-09-03 06:05:44,346] INFO Property socket.send.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties)
[2014-09-03 06:05:44,346] INFO Property zookeeper.connect is overridden to 10.208.106.230:2181,10.208.9.144:2181,10.208.0.161:2181 (kafka.utils.VerifiableProperties)
[2014-09-03 06:05:44,346] INFO Property zookeeper.connection.timeout.ms is overridden to 1000000 (kafka.utils.VerifiableProperties)
[2014-09-03 06:05:44,361] INFO [Kafka Server 3], starting (kafka.server.KafkaServer)
[2014-09-03 06:05:44,362] INFO [Kafka Server 3], Connecting to zookeeper on 10.208.106.230:2181,10.208.9.144:2181,10.208.0.161:2181 (kafka.server.KafkaServer)
[2014-09-03 06:05:44,476] INFO Log directory '/var/log/kafka-logs' not found, creating it. (kafka.log.LogManager)
[2014-09-03 06:05:44,491] INFO Starting log cleanup with a period of 300000 ms. (kafka.log.LogManager)
[2014-09-03 06:05:44,496] INFO Starting log flusher with a default period of 9223372036854775807 ms. (kafka.log.LogManager)
[2014-09-03 06:05:44,537] INFO Awaiting socket connections on stage-kafka3.internal:9092. (kafka.network.Acceptor)
[2014-09-03 06:05:44,538] INFO [Socket Server on Broker 3], Started (kafka.network.SocketServer)
[2014-09-03 06:05:44,618] INFO Will not load MX4J, mx4j-tools.jar is not in the classpath (kafka.utils.Mx4jLoader$)
[2014-09-03 06:05:44,662] INFO conflict in /controller data: {"version":1,"brokerid":3,"timestamp":"1409724344623"} stored data: {"version":1,"brokerid":1,"timestamp":"1409724301799"} (kafka.utils.ZkUtils$)
[2014-09-03 06:05:44,786] INFO Registered broker 3 at path /brokers/ids/3 with address stage-kafka3.internal:9092. (kafka.utils.ZkUtils$)
[2014-09-03 06:05:44,799] INFO [Kafka Server 3], started (kafka.server.KafkaServer)
[2014-09-03 06:10:04,437] INFO [ReplicaFetcherManager on broker 3] Removed fetcher for partitions topic1,0
[2014-09-03 06:10:04,471] INFO Completed load of log topic1-0 with log end offset 0 (kafka.log.Log)
[2014-09-03 06:10:04,476] INFO Created log for partition [topic1,0] in /var/log/kafka-logs with properties {segment.index.bytes -> 10485760, file.delete.delay.ms -> 60000, segment.bytes -> 536870912, flush.ms -> 1000, delete.retention.ms -> 86400000, index.interval.bytes -> 4096, retention.bytes -> -1, cleanup.policy -> delete, segment.ms -> 604800000, max.message.bytes -> 1000012, flush.messages -> 10000, min.cleanable.dirty.ratio -> 0.5, retention.ms -> 604800000}. (kafka.log.LogManager)
[2014-09-03 06:10:04,477] WARN Partition [topic1,0] on broker 3: No checkpointed highwatermark is found for partition topic1,0
[2014-09-03 06:10:04,553] WARN [KafkaApi-3] Produce request with correlation id 1 from client my_test_producer on partition [topic1,1] failed due to Partition [topic1,1] doesn't exist on 3 (kafka.server.KafkaApis)
[2014-09-03 06:10:04,558] INFO [KafkaApi-3] Send the close connection response due to error handling produce request [clientId = my_test_producer, correlationId = 1, topicAndPartition = [topic1,1]] with Ack=0 (kafka.server.KafkaApis)

Missing messages in consumer

Hi,
I have a simple consumer script exactly the same as the one in the readme. I put numbers from 1 to 5 into a topic using a producer. I can consume and see these numbers from 1 to 5 using the kafka-console-consumer.sh, but using poseidon I receive the messages like below.

1
3
5

Any suggestions?

zookeeper support

AFAICT, the Java client is able to store its offsets in ZooKeeper, which makes sure that a client will not lose messages even if the consumer goes down for a while.

Are there any plans to add ZooKeeper support to poseidon? Would you want a patch that adds this?

Don't throw an exception when no leader is available.

/home/bpot/.gem/ruby/1.9.3/gems/poseidon-0.0.1/lib/poseidon/message_conductor.rb:32:in `destination': undefined method `leader' for nil:NilClass (NoMethodError)
        from /home/bpot/.gem/ruby/1.9.3/gems/poseidon-0.0.1/lib/poseidon/messages_to_send_batch.rb:20:in `block in messages_for_brokers'
        from /home/bpot/.gem/ruby/1.9.3/gems/poseidon-0.0.1/lib/poseidon/messages_to_send_batch.rb:18:in `each'
        from /home/bpot/.gem/ruby/1.9.3/gems/poseidon-0.0.1/lib/poseidon/messages_to_send_batch.rb:18:in `messages_for_brokers'
        from /home/bpot/.gem/ruby/1.9.3/gems/poseidon-0.0.1/lib/poseidon/messages_to_send.rb:30:in `messages_for_brokers'
        from /home/bpot/.gem/ruby/1.9.3/gems/poseidon-0.0.1/lib/poseidon/sync_producer.rb:45:in `block in send_messages'
        from /home/bpot/.gem/ruby/1.9.3/gems/poseidon-0.0.1/lib/poseidon/sync_producer.rb:40:in `times'
        from /home/bpot/.gem/ruby/1.9.3/gems/poseidon-0.0.1/lib/poseidon/sync_producer.rb:40:in `send_messages'
        from /home/bpot/.gem/ruby/1.9.3/gems/poseidon-0.0.1/lib/poseidon/producer.rb:159:in `send_messages'
        from wut.rb:10:in `<main>'

encoding compatibility errors

When writing UTF-8 chars in a kafka message we are getting

Encoding::CompatibilityError: incompatible character encodings: ASCII-8BIT and UTF-8

 app error: Error writting messages_for_topics in Poseidon::Protocol::ProduceRequest (Poseidon::Protocol::ProtocolStruct::EncodingError: Error writting messages_for_partitions in Poseidon::Protocol::MessagesForTopic (Poseidon::Protocol::ProtocolStruct::EncodingError: Error writting message_set in Poseidon::Protocol::MessagesForPartition (Poseidon::Protocol::ProtocolStruct::EncodingError: Error writting messages in Poseidon::Protocol::MessageSetStructWithSize (Poseidon::Protocol::ProtocolStruct::EncodingError: Error writting message in Poseidon::Protocol::MessageWithOffsetStruct (Poseidon::Protocol::ProtocolStruct::EncodingError: Error writting value in Poseidon::Protocol::MessageStruct (Encoding::CompatibilityError: incompatible character encodings: ASCII-8BIT and UTF-8)))))) (Poseidon::Protocol::ProtocolStruct::EncodingError)

Probably due to

https://github.com/bpot/poseidon/blob/master/lib/poseidon/protocol/request_buffer.rb#L10

Can/Should that encoding be made configurable?

Fetching metadata when no routing to leader does not comply to socket_timeout_ms parameter.

How to reproduce:

  • initialise poseidon publisher, start kafka console consumer, publish test message (works)
  • on kafka leader machine:
    sudo iptables -A INPUT -p tcp --dport 7200 -j DROP && sudo service kafka stop
  • publish test message (message not delivered)
  • refresh metadata - hangs for > 1min (no aggressive timeout here). After this time publishing messages works again

Will poseidon be maintained continuously in the coming future?

I'd like to use poseidon in production environment, but I found the message in README.md--"Until 1.0.0 this should be considered ALPHA software and not neccessarily production ready.".
And unfortunately, it seems that poseidon have not been updated for a long time.

So I wondered that if poseidon will be maitained continuously in the coming future ?
I'd like to help maitain poseidon if possible.

Sporadic NoMethodErrors when calling Poseidon::PartitionConsumer.consumer_for_partition

I'm using Poseidon::PartitionConsumer.consumer_for_partition to get a PartitionConsumer on a cluster of kafka servers. Every once in a while it will blow up with a stack trace like this:

NoMethodError: undefined method `partition_leader' for nil:NilClass
  from poseidon/partition_consumer.rb:26:in `consumer_for_partition'
  from poseidon/broker_pool.rb:12:in `open'
  from poseidon/partition_consumer.rb:30:in `block in consumer_for_partition'
  from poseidon/cluster_metadata.rb:52:in `lead_broker_for_partition'

My calling code is like so:

  def consumer
    @consumer ||= Poseidon::PartitionConsumer.consumer_for_partition(@app, kafka_servers,
                                                                     @topic, @partition, @offset)
  end

  def kafka_servers
   ["host1:9092", "host2:9092", "host3:9092"]
  end

I'm able to rescue and retry (at which point it recovers), but it seems like a bad thing. What am I doing wrong?

Don't attempt to send to partitions with unavailable brokers

For keyless messages we should exclude these partitions from the set of available partitions. Right now we assume the leader value in the topic metadata will be nil, but Kafka actually returns a -1 if a partition has no leader, it also sets the error code which we should probably be checking.

https://github.com/bpot/poseidon/blob/master/lib/poseidon/topic_metadata.rb#L54

For keyed messages we shouldn't attempt to send those messages at all and fail after a period of time if we're not getting a new leader from the refreshed metadata.

Poseidon::Connection#read_response hides Timeout errors

I want to be able to read from Kafka indefinitely in an infinite loop. When developing locally on my laptop, I often have no messages to read. I'd like to just block the process calling #fetch by rescuing timeouts and rerunning #fetch. However, the exception class ConnectionFailedError is raised in place of not only TimeoutException, but also Errno::ECONNRESET and SocketError.

The consequence for me is that my spin loop can't distinguish between network errors and a partition that simply has no messages to send down to the client. Is there another way I should be doing this, e.g. detecting if I have no messages before I call fetch?

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    ๐Ÿ–– Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. ๐Ÿ“Š๐Ÿ“ˆ๐ŸŽ‰

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google โค๏ธ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.