fluent-plugin-kafka, a plugin for Fluentd

A fluentd plugin to both consume and produce data for Apache Kafka.

Installation

Add this line to your application's Gemfile:

gem 'fluent-plugin-kafka'

And then execute:

$ bundle

Or install it yourself as:

$ gem install fluent-plugin-kafka --no-document

If you want to use zookeeper-related parameters, you also need to install the zookeeper gem. The zookeeper gem includes a native extension, so development tools such as ruby-devel, gcc, and make are required.

Requirements

  • Ruby 2.1 or later
  • Input plugins work with kafka v0.9 or later
  • Output plugins work with kafka v0.8 or later

Usage

Common parameters

SSL authentication

  • ssl_ca_cert
  • ssl_client_cert
  • ssl_client_cert_key
  • ssl_client_cert_key_password
  • ssl_ca_certs_from_system

Set the paths to SSL-related files. See Encryption and Authentication using SSL for more details.
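As an illustration, a TLS-enabled source might look like the following (all hosts and paths below are placeholders, not defaults):

```conf
<source>
  @type kafka_group
  brokers kafka01:9093
  consumer_group my-group
  topics app_event
  format json

  # Paths to the CA and client certificate/key (placeholders)
  ssl_ca_cert /etc/fluentd/ssl/ca.crt
  ssl_client_cert /etc/fluentd/ssl/client.crt
  ssl_client_cert_key /etc/fluentd/ssl/client.key
</source>
```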

SASL authentication

with GSSAPI
  • principal
  • keytab

Set principal and path to keytab for SASL/GSSAPI authentication. See Authentication using SASL for more details.

with Plain/SCRAM
  • username
  • password
  • scram_mechanism
  • sasl_over_ssl

Set username, password, scram_mechanism and sasl_over_ssl for SASL/PLAIN or SCRAM authentication. See Authentication using SASL for more details.
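For example, a SASL/SCRAM setup might look like this (hosts and credentials are placeholders):

```conf
<match app.**>
  @type kafka2
  brokers kafka01:9094
  default_topic app_event

  # SASL/SCRAM credentials (placeholders)
  username alice
  password secret
  scram_mechanism sha256
  sasl_over_ssl true
</match>
```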

Input plugin (@type 'kafka')

Consume events with a single consumer.

<source>
  @type kafka

  brokers <broker1_host>:<broker1_port>,<broker2_host>:<broker2_port>,..
  topics <listening topics(separate with comma',')>
  format <input text type (text|json|ltsv|msgpack)> :default => json
  message_key <key (Optional, for text format only, default is message)>
  add_prefix <tag prefix (Optional)>
  add_suffix <tag suffix (Optional)>

  # Optionally, you can manage topic offset by using zookeeper
  offset_zookeeper    <zookeeper node list (<zookeeper1_host>:<zookeeper1_port>,<zookeeper2_host>:<zookeeper2_port>,..)>
  offset_zk_root_node <offset path in zookeeper> default => '/fluent-plugin-kafka'

  # ruby-kafka consumer options
  max_bytes     (integer) :default => nil (Use default of ruby-kafka)
  max_wait_time (integer) :default => nil (Use default of ruby-kafka)
  min_bytes     (integer) :default => nil (Use default of ruby-kafka)
</source>

Starting from an assigned offset for specific topics is also supported.

<source>
  @type kafka

  brokers <broker1_host>:<broker1_port>,<broker2_host>:<broker2_port>,..
  format <input text type (text|json|ltsv|msgpack)>
  <topic>
    topic     <listening topic>
    partition <listening partition: default=0>
    offset    <listening start offset: default=-1>
  </topic>
  <topic>
    topic     <listening topic>
    partition <listening partition: default=0>
    offset    <listening start offset: default=-1>
  </topic>
</source>

See also ruby-kafka README for more detailed documentation about ruby-kafka.

The consumed topic name is used as the event tag, so when the target topic name is app_event, the tag is app_event. If you want to modify the tag, use the add_prefix or add_suffix parameters. With add_prefix kafka, the tag is kafka.app_event.
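The tag construction described above can be sketched as follows (a simplified illustration, not the plugin's actual code):

```ruby
# Build the event tag from the consumed topic name, optionally
# applying add_prefix / add_suffix.
def event_tag(topic, add_prefix: nil, add_suffix: nil)
  tag = topic.dup
  tag = "#{add_prefix}.#{tag}" if add_prefix
  tag = "#{tag}.#{add_suffix}" if add_suffix
  tag
end

event_tag("app_event")                       # => "app_event"
event_tag("app_event", add_prefix: "kafka")  # => "kafka.app_event"
```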

Input plugin (@type 'kafka_group', supports kafka group)

Consume events using Kafka's consumer group feature.

<source>
  @type kafka_group

  brokers <broker1_host>:<broker1_port>,<broker2_host>:<broker2_port>,..
  consumer_group <consumer group name, must set>
  topics <listening topics(separate with comma',')>
  format <input text type (text|json|ltsv|msgpack)> :default => json
  message_key <key (Optional, for text format only, default is message)>
  kafka_message_key <key (Optional, If specified, set kafka's message key to this key)>
  add_headers <If true, add kafka's message headers to record>
  add_prefix <tag prefix (Optional)>
  add_suffix <tag suffix (Optional)>
  retry_emit_limit <Wait retry_emit_limit x 1s when BufferQueueLimitError happens. The default is nil, which means waiting until BufferQueueLimitError is resolved>
  use_record_time (Deprecated. Use 'time_source record' instead.) <If true, replace event time with contents of 'time' field of fetched record>
  time_source <source for message timestamp (now|kafka|record)> :default => now
  time_format <string (Optional when use_record_time is used)>

  # ruby-kafka consumer options
  max_bytes               (integer) :default => 1048576
  max_wait_time           (integer) :default => nil (Use default of ruby-kafka)
  min_bytes               (integer) :default => nil (Use default of ruby-kafka)
  offset_commit_interval  (integer) :default => nil (Use default of ruby-kafka)
  offset_commit_threshold (integer) :default => nil (Use default of ruby-kafka)
  fetcher_max_queue_size  (integer) :default => nil (Use default of ruby-kafka)
  refresh_topic_interval  (integer) :default => nil (Use default of ruby-kafka)
  start_from_beginning    (bool)    :default => true
</source>

See also ruby-kafka README for more detailed documentation about ruby-kafka options.

topics supports regex patterns since v0.13.1. If you want to use a regex pattern, wrap it in slashes, like /foo.*/.
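For example, to subscribe to every topic whose name starts with app_ (broker hosts and group name are placeholders):

```conf
<source>
  @type kafka_group
  brokers broker1:9092
  consumer_group my-group
  # Regex patterns are wrapped in slashes (supported since v0.13.1)
  topics /app_.*/
  format json
</source>
```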

The consumed topic name is used as the event tag, so when the target topic name is app_event, the tag is app_event. If you want to modify the tag, use the add_prefix or add_suffix parameter. With add_prefix kafka, the tag is kafka.app_event.

Input plugin (@type 'rdkafka_group', supports kafka consumer groups, uses rdkafka-ruby)

โš ๏ธ The in_rdkafka_group consumer was not yet tested under heavy production load. Use it at your own risk!

With the introduction of the rdkafka-ruby based input plugin we hope to support Kafka brokers above version 2.1, where we saw compatibility issues when using the ruby-kafka based @kafka_group input type. The rdkafka-ruby lib wraps the highly performant and production-ready librdkafka C lib.

<source>
  @type rdkafka_group
  topics <listening topics(separate with comma',')>
  format <input text type (text|json|ltsv|msgpack)> :default => json
  message_key <key (Optional, for text format only, default is message)>
  kafka_message_key <key (Optional, If specified, set kafka's message key to this key)>
  add_headers <If true, add kafka's message headers to record>
  add_prefix <tag prefix (Optional)>
  add_suffix <tag suffix (Optional)>
  retry_emit_limit <Wait retry_emit_limit x 1s when BufferQueueLimitError happens. The default is nil, which means waiting until BufferQueueLimitError is resolved>
  use_record_time (Deprecated. Use 'time_source record' instead.) <If true, replace event time with contents of 'time' field of fetched record>
  time_source <source for message timestamp (now|kafka|record)> :default => now
  time_format <string (Optional when use_record_time is used)>

  # kafka consumer options
  max_wait_time_ms 500
  max_batch_size 10000
  kafka_configs {
    "bootstrap.servers": "brokers <broker1_host>:<broker1_port>,<broker2_host>:<broker2_port>",
    "group.id": "<consumer group name>"
  }
</source>

See also rdkafka-ruby and librdkafka for more detailed documentation about Kafka consumer options.

The consumed topic name is used as the event tag, so when the target topic name is app_event, the tag is app_event. If you want to modify the tag, use the add_prefix or add_suffix parameter. With add_prefix kafka, the tag is kafka.app_event.

Output plugin

The kafka2 plugin is for fluentd v1 or later. This plugin uses the ruby-kafka producer for writing data. If ruby-kafka doesn't fit your kafka environment, check the rdkafka2 plugin instead. This plugin will become the out_kafka plugin in the future.

<match app.**>
  @type kafka2

  brokers               <broker1_host>:<broker1_port>,<broker2_host>:<broker2_port>,.. # Set brokers directly

  # Kafka topic, placeholders are supported. Chunk keys are required in the Buffer section in order for placeholders
  # to work.
  topic                 (string) :default => nil
  topic_key             (string) :default => 'topic'
  partition_key         (string) :default => 'partition'
  partition_key_key     (string) :default => 'partition_key'
  message_key_key       (string) :default => 'message_key'
  default_topic         (string) :default => nil
  default_partition_key (string) :default => nil
  record_key            (string) :default => nil
  default_message_key   (string) :default => nil
  exclude_topic_key     (bool)   :default => false
  exclude_partition_key (bool)   :default => false
  exclude_partition     (bool)   :default => false
  exclude_message_key   (bool)   :default => false
  get_kafka_client_log  (bool)   :default => false
  headers               (hash)   :default => {}
  headers_from_record   (hash)   :default => {}
  use_event_time        (bool)   :default => false
  use_default_for_unknown_topic (bool) :default => false
  discard_kafka_delivery_failed (bool) :default => false (No discard)
  partitioner_hash_function (enum) (crc32|murmur2) :default => 'crc32'
  share_producer        (bool)   :default => false

  # If you intend to rely on AWS IAM auth to MSK with long lived credentials
  # https://docs.aws.amazon.com/msk/latest/developerguide/iam-access-control.html
  #
  # For AWS STS support, see status in
  # - https://github.com/zendesk/ruby-kafka/issues/944
  # - https://github.com/zendesk/ruby-kafka/pull/951
  sasl_aws_msk_iam_access_key_id (string) :default => nil
  sasl_aws_msk_iam_secret_key_id (string) :default => nil
  sasl_aws_msk_iam_aws_region    (string) :default => nil

  <format>
    @type (json|ltsv|msgpack|attr:<record name>|<formatter name>) :default => json
  </format>

  # Optional. See https://docs.fluentd.org/v/1.0/configuration/inject-section
  <inject>
    tag_key tag
    time_key time
  </inject>

  # See fluentd document for buffer related parameters: https://docs.fluentd.org/v/1.0/configuration/buffer-section
  # Buffer chunk key should be the same as topic_key. If the value is not found in the record, default_topic is used.
  <buffer topic>
    flush_interval 10s
  </buffer>

  # ruby-kafka producer options
  idempotent        (bool)    :default => false
  sasl_over_ssl     (bool)    :default => true
  max_send_retries  (integer) :default => 1
  required_acks     (integer) :default => -1
  ack_timeout       (integer) :default => nil (Use default of ruby-kafka)
  compression_codec (string)  :default => nil (No compression. Depends on ruby-kafka: https://github.com/zendesk/ruby-kafka#compression)
</match>

The <formatter name> in <format> uses fluentd's formatter plugins. See formatter article.

Note: The Java-based Kafka client uses murmur2 as the partitioner function by default. If you want fluent-plugin-kafka to use the same partitioning behavior, change partitioner_hash_function to murmur2 instead of crc32. Note that to use the murmur2 hash partitioner function, you must install the digest-murmurhash gem.
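To illustrate why the partitioner function matters: both crc32 and murmur2 map a partition key to a partition number, but they hash differently, so the same key can land on different partitions depending on the function. A crc32-style assignment can be sketched like this (a simplified illustration; the real partitioners also handle nil keys and other details):

```ruby
require 'zlib'

# Simplified crc32-style partition assignment: hash the key and take
# it modulo the partition count. The mapping is deterministic, so the
# same key always lands on the same partition.
def crc32_partition(key, partition_count)
  Zlib.crc32(key) % partition_count
end

crc32_partition("user-42", 12) == crc32_partition("user-42", 12)  # => true
```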

ruby-kafka sometimes returns a Kafka::DeliveryFailed error without useful information. In this case, get_kafka_client_log is useful for identifying the error cause. ruby-kafka's log is routed to the fluentd log, so you can see ruby-kafka's log in fluentd logs.

The following ruby-kafka producer options are supported:

  • max_send_retries - default: 2 - Number of times to retry sending of messages to a leader.
  • required_acks - default: -1 - The number of acks required per request. If you need flush performance, set lower value, e.g. 1, 2.
  • ack_timeout - default: nil - How long the producer waits for acks. The unit is seconds.
  • compression_codec - default: nil - The codec the producer uses to compress messages.
  • max_send_limit_bytes - default: nil - Max byte size of a message to send, to avoid MessageSizeTooLarge. For example, if you set 1000000 (message.max.bytes in kafka), messages larger than 1000000 bytes will be dropped.
  • discard_kafka_delivery_failed - default: false - discard the record where Kafka::DeliveryFailed occurred

For details about monitoring, see also https://github.com/zendesk/ruby-kafka#monitoring

See also Kafka::Client for more detailed documentation about ruby-kafka.

This plugin also supports the "snappy" compression codec. Install the snappy module before using snappy compression.

$ gem install snappy --no-document

The snappy gem uses a native extension, so you need to install several packages first. On Ubuntu, development packages and the snappy library are needed.

$ sudo apt-get install build-essential autoconf automake libtool libsnappy-dev

On CentOS 7, installation of similar packages is also necessary.

$ sudo yum install gcc autoconf automake libtool snappy-devel

This plugin also supports the "lz4" compression codec. Install the extlz4 module before using lz4 compression.

$ gem install extlz4 --no-document

This plugin also supports the "zstd" compression codec. Install the zstd-ruby module before using zstd compression.

$ gem install zstd-ruby --no-document

Load balancing

By default, ruby-kafka assigns each message to a random partition, but messages with the same partition key are always assigned to the same partition when default_partition_key is set in the config file. If the key named by partition_key_key exists in a message, this plugin uses its value as the partition key.

| default_partition_key | partition_key_key | behavior |
|-----------------------|-------------------|----------|
| Not set | Not exists | All messages are assigned a partition at random |
| Set | Not exists | All messages are assigned to the specific partition |
| Not set | Exists | Messages which have a partition_key_key record are assigned to the specific partition, others are assigned a partition at random |
| Set | Exists | Messages which have a partition_key_key record are assigned to the specific partition with partition_key_key, others are assigned to the specific partition with default_partition_key |

If the key named by message_key_key exists in a message, this plugin publishes its value to kafka as the message key, which can be read by consumers. The same message key is assigned to all messages when default_message_key is set in the config file. If message_key_key exists and partition_key_key is not set explicitly, message_key_key is also used for partitioning.
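The key selection rules above can be sketched as follows (an illustration of the documented behavior, not the plugin's actual code):

```ruby
# Pick the partition key for a record: a per-record value under
# partition_key_key wins, otherwise fall back to default_partition_key
# (nil means the message gets a random partition).
def partition_key_for(record, partition_key_key: "partition_key", default_partition_key: nil)
  record.fetch(partition_key_key, default_partition_key)
end

partition_key_for({ "partition_key" => "user-1" })      # => "user-1"
partition_key_for({}, default_partition_key: "shared")  # => "shared"
partition_key_for({})                                   # => nil
```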

Headers

It is possible to set headers on Kafka messages. This only works for the kafka2 and rdkafka2 output plugins.

The format is like key1:value1,key2:value2. For example:

<match app.**>
  @type kafka2
  [...]
  headers some_header_name:some_header_value
</match>

You may set header values based on a value of a fluentd record field. For example, imagine a fluentd record like:

{"source": { "ip": "127.0.0.1" }, "payload": "hello world" }

And the following fluentd config:

<match app.**>
  @type kafka2
  [...]
  headers_from_record source_ip:$.source.ip
</match>

The Kafka message will have a header of source_ip=127.0.0.1.

The configuration format is jsonpath. It is described in https://docs.fluentd.org/plugin-helper-overview/api-plugin-helper-record_accessor
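Conceptually, a path like $.source.ip walks nested hash keys, much like Ruby's Hash#dig (a simplified sketch; the real record_accessor helper supports more syntax, such as bracket notation):

```ruby
# Resolve a "$.a.b" style path against a record hash.
def access_record(record, path)
  keys = path.sub(/\A\$\./, "").split(".")
  record.dig(*keys)
end

record = { "source" => { "ip" => "127.0.0.1" }, "payload" => "hello world" }
access_record(record, "$.source.ip")  # => "127.0.0.1"
```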

Excluding fields

Fields can be excluded from the output data. This only works for the kafka2 and rdkafka2 output plugins.

Fields must be specified as a comma-separated list in dot notation ($.), for example:

<match app.**>
  @type kafka2
  [...]
  exclude_fields $.source.ip,$.HTTP_FOO
</match>

This config can be used to remove fields used by other configs.

For example, $.source.ip can be extracted with the headers_from_record config and excluded from the message payload.

Using this config to remove unused fields is discouraged. A filter plugin can be used for this purpose.
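The exclusion can be thought of as deleting the addressed key from the record before formatting (a simplified sketch of the documented behavior, not the plugin's actual code):

```ruby
# Remove a "$.a.b" style path from a record hash in place.
def exclude_field!(record, path)
  keys = path.sub(/\A\$\./, "").split(".")
  leaf = keys.pop
  parent = keys.empty? ? record : record.dig(*keys)
  parent.delete(leaf) if parent.is_a?(Hash)
  record
end

record = { "source" => { "ip" => "127.0.0.1" }, "payload" => "hello" }
exclude_field!(record, "$.source.ip")
record  # => { "source" => {}, "payload" => "hello" }
```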

Send only a sub field as a message payload

If record_key is provided, the plugin sends only a sub field given by that key. The configuration format is jsonpath.

e.g. When the following configuration and the incoming record are given:

configuration:

<match **>
  @type kafka2
  [...]
  record_key '$.data'
</match>

record:

{
    "specversion" : "1.0",
    "type" : "com.example.someevent",
    "id" : "C234-1234-1234",
    "time" : "2018-04-05T17:31:00Z",
    "datacontenttype" : "application/json",
    "data" : {
        "appinfoA" : "abc",
        "appinfoB" : 123,
        "appinfoC" : true
    },
    ...
}

only the data field will be serialized by the formatter and sent to Kafka. The top-level data key will be removed.
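The effect of record_key can be sketched like this (a simplified illustration; the real plugin resolves the path with the record_accessor helper):

```ruby
# With record_key '$.data', only the sub-record under "data" is
# passed to the formatter; the rest of the record is dropped.
def record_payload(record, record_key)
  keys = record_key.sub(/\A\$\./, "").split(".")
  record.dig(*keys)
end

record = {
  "id"   => "C234-1234-1234",
  "data" => { "appinfoA" => "abc", "appinfoB" => 123 }
}
record_payload(record, "$.data")  # => { "appinfoA" => "abc", "appinfoB" => 123 }
```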

Buffered output plugin

This plugin uses the ruby-kafka producer for writing data. This plugin is for fluentd v0.12; if you use v1, see kafka2. Support for fluentd v0.12 has ended, so kafka_buffered will become an alias of kafka2 and will be removed in the future.

<match app.**>
  @type kafka_buffered

  # Brokers: you can choose either brokers or zookeeper. If you are not familiar with zookeeper, use brokers parameters.
  brokers             <broker1_host>:<broker1_port>,<broker2_host>:<broker2_port>,.. # Set brokers directly
  zookeeper           <zookeeper_host>:<zookeeper_port> # Set brokers via Zookeeper
  zookeeper_path      <broker path in zookeeper> :default => /brokers/ids # Set path in zookeeper for kafka

  topic_key             (string) :default => 'topic'
  partition_key         (string) :default => 'partition'
  partition_key_key     (string) :default => 'partition_key'
  message_key_key       (string) :default => 'message_key'
  default_topic         (string) :default => nil
  default_partition_key (string) :default => nil
  default_message_key   (string) :default => nil
  exclude_topic_key     (bool)   :default => false
  exclude_partition_key (bool)   :default => false
  exclude_partition     (bool)   :default => false
  exclude_message_key   (bool)   :default => false
  output_data_type      (json|ltsv|msgpack|attr:<record name>|<formatter name>) :default => json
  output_include_tag    (bool) :default => false
  output_include_time   (bool) :default => false
  exclude_topic_key     (bool) :default => false
  exclude_partition_key (bool) :default => false
  get_kafka_client_log  (bool) :default => false
  use_event_time        (bool) :default => false
  partitioner_hash_function (enum) (crc32|murmur2) :default => 'crc32'

  # See fluentd document for buffer related parameters: https://docs.fluentd.org/v/0.12/buffer

  # ruby-kafka producer options
  idempotent                   (bool)    :default => false
  sasl_over_ssl                (bool)    :default => true
  max_send_retries             (integer) :default => 1
  required_acks                (integer) :default => -1
  ack_timeout                  (integer) :default => nil (Use default of ruby-kafka)
  compression_codec            (string)  :default => nil (No compression. Depends on ruby-kafka: https://github.com/zendesk/ruby-kafka#compression)
  kafka_agg_max_bytes          (integer) :default => 4096
  kafka_agg_max_messages       (integer) :default => nil (No limit)
  max_send_limit_bytes         (integer) :default => nil (No drop)
  discard_kafka_delivery_failed   (bool) :default => false (No discard)
  monitoring_list              (array)   :default => []
</match>

kafka_buffered supports the following ruby-kafka parameters:

  • max_send_retries - default: 2 - Number of times to retry sending of messages to a leader.
  • required_acks - default: -1 - The number of acks required per request. If you need flush performance, set lower value, e.g. 1, 2.
  • ack_timeout - default: nil - How long the producer waits for acks. The unit is seconds.
  • compression_codec - default: nil - The codec the producer uses to compress messages.
  • max_send_limit_bytes - default: nil - Max byte size of a message to send, to avoid MessageSizeTooLarge. For example, if you set 1000000 (message.max.bytes in kafka), messages larger than 1000000 bytes will be dropped.
  • discard_kafka_delivery_failed - default: false - discard the record where Kafka::DeliveryFailed occurred
  • monitoring_list - default: [] - library to be used to monitor. statsd and datadog are supported

kafka_buffered has two additional parameters:

  • kafka_agg_max_bytes - default: 4096 - Maximum value of total message size to be included in one batch transmission.
  • kafka_agg_max_messages - default: nil - Maximum number of messages to include in one batch transmission.

Note: The Java-based Kafka client uses murmur2 as the partitioner function by default. If you want fluent-plugin-kafka to use the same partitioning behavior, change partitioner_hash_function to murmur2 instead of crc32. Note that to use the murmur2 hash partitioner function, you must install the digest-murmurhash gem.

Non-buffered output plugin

This plugin uses the ruby-kafka producer for writing data. For performance and reliability reasons, use the kafka_buffered output instead. This plugin is mainly for testing.

<match app.**>
  @type kafka

  # Brokers: you can choose either brokers or zookeeper.
  brokers        <broker1_host>:<broker1_port>,<broker2_host>:<broker2_port>,.. # Set brokers directly
  zookeeper      <zookeeper_host>:<zookeeper_port> # Set brokers via Zookeeper
  zookeeper_path <broker path in zookeeper> :default => /brokers/ids # Set path in zookeeper for kafka

  default_topic         (string) :default => nil
  default_partition_key (string) :default => nil
  default_message_key   (string) :default => nil
  output_data_type      (json|ltsv|msgpack|attr:<record name>|<formatter name>) :default => json
  output_include_tag    (bool) :default => false
  output_include_time   (bool) :default => false
  exclude_topic_key     (bool) :default => false
  exclude_partition_key (bool) :default => false
  partitioner_hash_function (enum) (crc32|murmur2) :default => 'crc32'

  # ruby-kafka producer options
  max_send_retries    (integer) :default => 1
  required_acks       (integer) :default => -1
  ack_timeout         (integer) :default => nil (Use default of ruby-kafka)
  compression_codec   (string)  :default => nil (No compression. Depends on ruby-kafka: https://github.com/zendesk/ruby-kafka#compression)
  max_buffer_size     (integer) :default => nil (Use default of ruby-kafka)
  max_buffer_bytesize (integer) :default => nil (Use default of ruby-kafka)
</match>

This plugin also supports ruby-kafka related parameters. See Buffered output plugin section.

Note: The Java-based Kafka client uses murmur2 as the partitioner function by default. If you want fluent-plugin-kafka to use the same partitioning behavior, change partitioner_hash_function to murmur2 instead of crc32. Note that to use the murmur2 hash partitioner function, you must install the digest-murmurhash gem.

rdkafka based output plugin

This plugin uses rdkafka instead of ruby-kafka for kafka client. You need to install rdkafka gem.

# rdkafka is a C extension library, so development tools such as ruby-devel, gcc, etc. need to be installed
# for v0.12 or later
$ gem install rdkafka --no-document
# for v0.11 or earlier
$ gem install rdkafka -v 0.6.0 --no-document

rdkafka2 is for fluentd v1.0 or later.

<match app.**>
  @type rdkafka2

  brokers <broker1_host>:<broker1_port>,<broker2_host>:<broker2_port>,.. # Set brokers directly

  topic_key             (string) :default => 'topic'
  default_topic         (string) :default => nil
  partition_key         (string) :default => 'partition'
  partition_key_key     (string) :default => 'partition_key'
  message_key_key       (string) :default => 'message_key'
  use_default_for_unknown_topic           (bool) :default => false
  use_default_for_unknown_partition_error (bool) :default => false
  default_partition_key (string) :default => nil
  default_message_key   (string) :default => nil
  exclude_topic_key     (bool) :default => false
  exclude_partition_key (bool) :default => false
  discard_kafka_delivery_failed (bool) :default => false (No discard)
  discard_kafka_delivery_failed_regex (regexp) :default => nil (No discard)
  use_event_time        (bool) :default => false

  # same with kafka2
  headers               (hash) :default => {}
  headers_from_record   (hash) :default => {}
  record_key            (string) :default => nil

  <format>
    @type (json|ltsv|msgpack|attr:<record name>|<formatter name>) :default => json
  </format>

  # Optional. See https://docs.fluentd.org/v/1.0/configuration/inject-section
  <inject>
    tag_key tag
    time_key time
  </inject>

  # See fluentd document for buffer section parameters: https://docs.fluentd.org/v/1.0/configuration/buffer-section
  # Buffer chunk key should be the same as topic_key. If the value is not found in the record, default_topic is used.
  <buffer topic>
    flush_interval 10s
  </buffer>

  # You can set any rdkafka configuration via this parameter: https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
  rdkafka_options {
    "log_level" : 7
  }

  # rdkafka2 specific parameters

  # Share the kafka producer between flush threads. This mainly reduces Kafka operations such as Kerberos authentication.
  share_producer (bool) :default => false
  # Timeout for polling message wait. If 0, no wait.
  rdkafka_delivery_handle_poll_timeout (integer) :default => 30
  # If the record size is larger than this value, such records are ignored. Default is no limit
  max_send_limit_bytes (integer) :default => nil
  # The maximum number of enqueued bytes per second. This can reduce the
  # load on both Fluentd and Kafka when attempting to send excessive
  # messages. Default is no limit.
  max_enqueue_bytes_per_second (integer) :default => nil
</match>

rdkafka2 supports discard_kafka_delivery_failed_regex parameter:

  • discard_kafka_delivery_failed_regex - default: nil - discard the record where the Kafka::DeliveryFailed occurred and the emitted message matches the given regex pattern, such as /unknown_topic/.
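The discard decision can be sketched as follows (an illustration of the documented behavior, not the plugin's actual code):

```ruby
# Discard a failed delivery only when a pattern is configured and the
# delivery error message matches it; otherwise the record is retried.
def discard_failed_delivery?(error_message, pattern)
  !pattern.nil? && pattern.match?(error_message)
end

discard_failed_delivery?("unknown_topic_or_partition", /unknown_topic/)  # => true
discard_failed_delivery?("request timed out", /unknown_topic/)           # => false
discard_failed_delivery?("unknown_topic_or_partition", nil)              # => false
```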

If you use v0.12, use rdkafka instead.

<match kafka.**>
  @type rdkafka

  default_topic kafka
  flush_interval 1s
  output_data_type json

  rdkafka_options {
    "log_level" : 7
  }
</match>

FAQ

Why can't fluent-plugin-kafka send data to our kafka cluster?

We receive lots of similar questions. In almost all cases, this problem is caused by a version mismatch between ruby-kafka and the kafka cluster. See the ruby-kafka README for more details: https://github.com/zendesk/ruby-kafka#compatibility

To avoid the problem, there are 2 approaches:

  • Upgrade your kafka cluster to the latest version. This is better because recent versions are faster and more robust.
  • Downgrade ruby-kafka/fluent-plugin-kafka to a version that works with your older kafka.

Contributing

  1. Fork it
  2. Create your feature branch (git checkout -b my-new-feature)
  3. Commit your changes (git commit -am 'Added some feature')
  4. Push to the branch (git push origin my-new-feature)
  5. Create new Pull Request

-rw-r--r-- 1 td-agent td-agent 8.0M Oct 27 14:31 /var/log/td-agent/kafka/proxy_logs.production.proxy.logs.log.q53fd2de6c3794b38.log
-rw-r--r-- 1 td-agent td-agent 8.0M Oct 27 14:29 /var/log/td-agent/kafka/proxy_logs.production.proxy.logs.log.q53fd2d95cf66b65f.log
-rw-r--r-- 1 td-agent td-agent 8.0M Oct 27 14:28 /var/log/td-agent/kafka/proxy_logs.production.proxy.logs.log.q53fd2d4baa05160d.log
-rw-r--r-- 1 td-agent td-agent 8.0M Oct 27 14:27 /var/log/td-agent/kafka/proxy_logs.production.proxy.logs.log.q53fd2d00f24fb738.log
-rw-r--r-- 1 td-agent td-agent 8.0M Oct 27 14:25 /var/log/td-agent/kafka/proxy_logs.production.proxy.logs.log.q53fd2cb0e42ca88d.log
-rw-r--r-- 1 td-agent td-agent 8.0M Oct 27 14:24 /var/log/td-agent/kafka/proxy_logs.production.proxy.logs.log.q53fd2c6380be6f7d.log
-rw-r--r-- 1 td-agent td-agent 8.0M Oct 27 14:23 /var/log/td-agent/kafka/proxy_logs.production.proxy.logs.log.q53fd2c15e122d715.log
-rw-r--r-- 1 td-agent td-agent 8.0M Oct 27 14:21 /var/log/td-agent/kafka/proxy_logs.production.proxy.logs.log.q53fd2bc852e82b47.log
-rw-r--r-- 1 td-agent td-agent 8.0M Oct 27 14:20 /var/log/td-agent/kafka/proxy_logs.production.proxy.logs.log.q53fd2b76ac931b6c.log
-rw-r--r-- 1 td-agent td-agent 8.0M Oct 27 14:18 /var/log/td-agent/kafka/proxy_logs.production.proxy.logs.log.q53fd2b1dcea32979.log
-rw-r--r-- 1 td-agent td-agent 8.0M Oct 27 14:17 /var/log/td-agent/kafka/proxy_logs.production.proxy.logs.log.q53fd2ac5466baf9d.log
-rw-r--r-- 1 td-agent td-agent 8.0M Oct 27 14:15 /var/log/td-agent/kafka/proxy_logs.production.proxy.logs.log.q53fd2a6d688e8393.log
-rw-r--r-- 1 td-agent td-agent 8.0M Oct 27 14:14 /var/log/td-agent/kafka/proxy_logs.production.proxy.logs.log.q53fd2a13624bb112.log
-rw-r--r-- 1 td-agent td-agent 8.0M Oct 27 14:12 /var/log/td-agent/kafka/proxy_logs.production.proxy.logs.log.q53fd29bb18d088f4.log
-rw-r--r-- 1 td-agent td-agent 8.0M Oct 27 14:11 /var/log/td-agent/kafka/proxy_logs.production.proxy.logs.log.q53fd29641e397293.log
-rw-r--r-- 1 td-agent td-agent 8.0M Oct 27 14:09 /var/log/td-agent/kafka/proxy_logs.production.proxy.logs.log.q53fd290b8028dc6a.log
-rw-r--r-- 1 td-agent td-agent 7.8M Oct 27 15:34 /var/log/td-agent/kafka/proxy_logs.production.proxy.logs.log.b53fd3c04448a6273.log

Does kafka_buffered perform worse than the old kafka output?

The 0.1.2 kafka direct output seems to have better performance than the 0.3.1 kafka_buffered output.

Test results:
0.1.2 kafka: 7200 records/s
0.3.1 kafka_buffered: 6700 records/s

Test configuration:

<match applog.test>
  type                kafka
  brokers             {brokers}
  default_topic       test
  output_data_type    attr:all
</match>
<match applog.test>
  type                kafka_buffered
  brokers             {brokers}
  default_topic       test

  buffer_type         memory
  buffer_chunk_limit  256m
  buffer_queue_limit  128
  flush_interval      0.1s
  kafka_agg_max_bytes 20000000

  output_data_type    attr:all
</match>

Unable to stop td-agent, continuous errors from kafka plugin

When I restarted the zookeeper nodes, I started getting continuous errors from td-agent, and it kept opening new connections to the zookeeper nodes until they ran out of file handles.

I am also unable to stop the agent.

2016-01-04 09:51:53 +0000 [warn]: emit transaction failed: error_class=Zookeeper::Exceptions::ContinuationTimeoutError error="response for meth: :get_children, args: [0, "/brokers/ids", nil, nil], not received within 30 seconds" tag="log.hits"
2016-01-04 09:51:53 +0000 [warn]: suppressed same stacktrace
2016-01-04 09:51:54 +0000 [warn]: Send exception occurred: Poseidon::Errors::UnableToFetchMetadata
2016-01-04 09:51:53 +0000 [warn]: emit transaction failed: error_class=Zookeeper::Exceptions::ContinuationTimeoutError error="response for meth: :get_children, args: [0, "/brokers/ids", nil, nil], not received within 30 seconds" tag="log.hits"
2016-01-04 09:51:53 +0000 [warn]: suppressed same stacktrace
2016-01-04 09:51:54 +0000 [warn]: Send exception occurred: Poseidon::Errors::UnableToFetchMetadata

Had to do a kill -3.

Kafka 0.9 plugin

Does this plugin support Kafka 0.9? Also, our cluster has Kerberos enabled for Kafka, which requires producers and consumers to use a JAAS file. Any suggestions on what needs to change to accomplish this?
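For reference, later ruby-kafka-based releases of this plugin expose SASL/GSSAPI (Kerberos) settings directly through the principal and keytab parameters described in the README, instead of a JAAS file. A hedged sketch, in which the broker, principal, keytab path, and topic are all placeholders:

<match **>
  @type kafka2
  brokers broker1.example.com:9092
  default_topic logs
  # SASL/GSSAPI (Kerberos)
  principal fluentd/client.example.com@EXAMPLE.COM
  keytab /etc/security/keytabs/fluentd.keytab
</match>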

Buffered Message Sending Fails after broker restart in background

Hi,

We have one of our FluentD setup boxes streaming into one of the brokers (the only one in our test cluster). I noticed that after restarting the broker service (on a different server) that the plugin was connected to, further sending of backlogged/buffered messages fails with the error below. The issue is corrected by restarting the Fluentd service.

It looks like the Poseidon connection does not reconnect to the Kafka broker after it loses the connection upon broker restart. Any help with this?

2015-01-06 12:31:07 +0000 [warn]: temporarily failed to flush the buffer. next_retry=2015-01-06 12:33:54 +0000 error_class="RuntimeError" error="Failed to send all messages: [#<Poseidon::Message:0x007ff01d1617e0 @struct=#<struct Poseidon::Protocol::MessageWithOffsetStruct offset=0, message=#<struct Poseidon::Protocol::MessageStruct magic_type=0, attributes=0, key=nil, value="{\"tstamp\":\"2015-01-06T12:27:59+00:00\",\"ver\":\"v1\",\"host\":......... some more data ......\"}">>, @topic="zmi-weblogs">, #<Poseidon::Message:0x007ff01d1616a0 @struct=#<struct Poseidon::Protocol::MessageWithOffsetStruct offset=0, message=#<struct Poseidon::Protocol::MessageStruct magic_type=0, attributes=0, key=nil, value="{\"tstamp\":\"2015-01-06T12:28:01+00:00\",\"ver\":\"v1\",\"host\":\"....... some more data .....\"}">>, @topic="zmi-weblogs">] remaining" plugin_id="object:d57dc0"
2015-01-06 12:31:07 +0000 [warn]: /usr/lib64/fluent/ruby/lib/ruby/gems/1.9.1/gems/poseidon-0.0.5/lib/poseidon/sync_producer.rb:66:in `send_messages'
2015-01-06 12:31:07 +0000 [warn]: /usr/lib64/fluent/ruby/lib/ruby/gems/1.9.1/gems/poseidon-0.0.5/lib/poseidon/producer.rb:163:in `send_messages'
2015-01-06 12:31:07 +0000 [warn]: /usr/lib64/fluent/ruby/lib/ruby/gems/1.9.1/gems/fluent-plugin-kafka-0.0.8/lib/fluent/plugin/out_kafka_buffered.rb:112:in `block (2 levels) in publish'
2015-01-06 12:31:07 +0000 [warn]: /usr/lib64/fluent/ruby/lib/ruby/gems/1.9.1/gems/fluent-plugin-kafka-0.0.8/lib/fluent/plugin/out_kafka_buffered.rb:109:in `each'
2015-01-06 12:31:07 +0000 [warn]: /usr/lib64/fluent/ruby/lib/ruby/gems/1.9.1/gems/fluent-plugin-kafka-0.0.8/lib/fluent/plugin/out_kafka_buffered.rb:109:in `each_with_index'
2015-01-06 12:31:07 +0000 [warn]: /usr/lib64/fluent/ruby/lib/ruby/gems/1.9.1/gems/fluent-plugin-kafka-0.0.8/lib/fluent/plugin/out_kafka_buffered.rb:109:in `block in publish'
2015-01-06 12:31:07 +0000 [warn]: /usr/lib64/fluent/ruby/lib/ruby/gems/1.9.1/gems/fluent-plugin-kafka-0.0.8/lib/fluent/plugin/out_kafka_buffered.rb:108:in `each'
2015-01-06 12:31:07 +0000 [warn]: /usr/lib64/fluent/ruby/lib/ruby/gems/1.9.1/gems/fluent-plugin-kafka-0.0.8/lib/fluent/plugin/out_kafka_buffered.rb:108:in `publish'
2015-01-06 12:31:07 +0000 [warn]: /usr/lib64/fluent/ruby/lib/ruby/gems/1.9.1/gems/fluent-plugin-kafka-0.0.8/lib/fluent/plugin/out_kafka_buffered.rb:104:in `write'
2015-01-06 12:31:07 +0000 [warn]: /usr/lib64/fluent/ruby/lib/ruby/gems/1.9.1/gems/fluentd-0.10.57/lib/fluent/buffer.rb:300:in `write_chunk'
2015-01-06 12:31:07 +0000 [warn]: /usr/lib64/fluent/ruby/lib/ruby/gems/1.9.1/gems/fluentd-0.10.57/lib/fluent/buffer.rb:280:in `pop'
2015-01-06 12:31:07 +0000 [warn]: /usr/lib64/fluent/ruby/lib/ruby/gems/1.9.1/gems/fluentd-0.10.57/lib/fluent/output.rb:311:in `try_flush'
2015-01-06 12:31:07 +0000 [warn]: /usr/lib64/fluent/ruby/lib/ruby/gems/1.9.1/gems/fluentd-0.10.57/lib/fluent/output.rb:132:in `run'

UTF-8 encoding problem

When tailing a UTF-8 file with the in_tail plugin, the following error occurs:

emit transaction failed  error_class=Encoding::UndefinedConversionError  error=#<Encoding::UndefinedConversionError: "\xE9" from ASCII-8BIT to UTF-8>

I tried Yajl instead of JSON, and it works.
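The error class in that log can be reproduced outside Fluentd: Ruby raises it whenever ASCII-8BIT (binary) data containing bytes above 0x7F is transcoded into UTF-8. A minimal sketch, assuming (for illustration only) that the stray bytes are really Latin-1:

```ruby
# Binary-tagged data containing a byte > 0x7F, as in_tail can produce.
bytes = "caf\xE9".dup.force_encoding(Encoding::ASCII_8BIT)

begin
  bytes.encode(Encoding::UTF_8)   # what a strict serializer attempts
rescue Encoding::UndefinedConversionError => e
  e.message                       # "\xE9" from ASCII-8BIT to UTF-8
end

# Naming the actual source encoding makes the conversion well-defined.
fixed = bytes.encode(Encoding::UTF_8, Encoding::ISO_8859_1)
fixed  # => "café"
```

Yajl presumably avoids the error because it does not attempt that transcoding step, which would explain why switching JSON libraries works around it.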

How should partitions be configured?

My Kafka topic has 8 partitions, and I use the input plugin. How do I configure the partition setting so that I read all the data?
I am currently configured as follows:

<source>
  type kafka
  #host c3-b2c-b2cop-log-kafka01.bj,c3-b2c-b2cop-log-kafka02.bj,c3-b2c-b2cop-log-kafka03.bj
  host c3-b2c-b2cop-log-kafka01.bj
  port 9092
  topic ngx_order_log
  partition 0
  offset -1
  format json
  add_prefix new
</source>

<match *.**>
type elasticsearch
host 10.108.173.29
port 9200
path /
logstash_format true
utc_index false
flush_interval 3s
include_tag_key true
tag_key tag
num_threads 5
logstash_prefix new_ngx_order_log
</match>

However, with this configuration I can only read one of the 8 partitions, and cannot read them all.
If I set the partition parameter to 8, I just get this error:

2015-10-22 11:00:30 +0800 [error]: Poseidon::Errors::UnknownTopicOrPartition
2015-10-22 11:00:30 +0800 [error]: suppressed same stacktrace
2015-10-22 11:00:31 +0800 [error]: Poseidon::Errors::UnknownTopicOrPartition
2015-10-22 11:00:31 +0800 [error]: suppressed same stacktrace

1ใ€How I can read complete How to configure,๏ผŸ
2ใ€If I use fluent-plugin-kafka to read data on multiple machines, how to configure?

2015-08-14 13:20:09 +0800 [warn]: fluent/engine.rb:330:emit: no patterns matched tag="topic1"

1. fluentd configuration
[root@localhost fluent-plugin-kafka_0.15]# cat /etc/td-agent/td-agent.conf

<source>
  type kafka
  topics topic1
  host 192.168.9.6
  port 9092
  format json
  #client_id TTTTTTTTTTTTTTTTTTTTTT
  max_bytes 1048576
  max_wait_ms 5000
  min_bytes 1
  socket_timeout_ms 50000
</source>

2. kafka topic description
[root@localhost td-agent]# /usr/local/kafka/bin/kafka-topics.sh --describe --zookeeper 192.168.9.6:2181,192.168.9.7:2181,192.168.9.8:2181
[2015-08-14 13:47:54,456] WARN Connected to an old server; r-o mode will be unavailable (org.apache.zookeeper.ClientCnxnSocket)
Topic:topic1 PartitionCount:1 ReplicationFactor:3 Configs:
Topic: topic1 Partition: 0 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0

3. error log
2015-08-14 13:19:32 +0800 [warn]: fluent/engine.rb:330:emit: no patterns matched tag="topic1"
2015-08-14 13:19:32 +0800 [warn]: fluent/engine.rb:330:emit: no patterns matched tag="topic1"
2015-08-14 13:19:33 +0800 [warn]: fluent/engine.rb:330:emit: no patterns matched tag="topic1"
2015-08-14 13:20:09 +0800 [warn]: fluent/engine.rb:330:emit: no patterns matched tag="topic1"
2015-08-14 13:20:46 +0800 [warn]: fluent/engine.rb:330:emit: no patterns matched tag="topic1"

4. Why?
Why does the "no patterns matched tag" warning appear for "topic1", and what should I do? Looking forward to your help. Thank you!
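The warning itself is not Kafka-specific: the input emits events tagged with the topic name (topic1 here), and Fluentd found no <match> section accepting that tag. A minimal sketch that would consume the events (stdout is chosen only for illustration):

<match topic1>
  type stdout
</match>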

plugin puts duplicate messages in kafka topic

The plugin puts duplicate messages into Kafka for some percentage of the messages, with both the buffered and unbuffered outputs (though the duplicate rate seems lower with the buffered output).
The input comes from the tail plugin, and it was verified that the tailed files have no duplicates.
I also tried writing events both to Kafka and to local files with http://docs.fluentd.org/articles/out_copy, with one store going to Kafka and the other to http://docs.fluentd.org/articles/out_file.
The files written by out_file have no duplicates while the Kafka topic does.

This happens even with an increased ack_timeout_ms, with required_acks off, and with max_send_retries set to zero.
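Some duplication is inherent to at-least-once delivery: if the broker persists a write but the acknowledgement is lost (timeout, broker restart, reconnect), the sender must assume failure and send again. With the buffered output this can also happen one level up: a flush that fails midway leaves the whole chunk in Fluentd's buffer, so messages the broker already accepted are re-sent on the next attempt, even with max_send_retries 0. A toy model of the ack-loss case (not the plugin's code):

```ruby
# Toy model of at-least-once delivery: the broker persists the write,
# the first ack is lost, so the producer retries and a duplicate lands.
def send_with_retry(broker_log, msg, lose_first_ack:)
  attempt = 0
  loop do
    attempt += 1
    broker_log << msg                     # broker persists the message
    ack_received = !(lose_first_ack && attempt == 1)
    break if ack_received                 # only a delivered ack stops retries
  end
end

log = []
send_with_retry(log, "event-1", lose_first_ack: true)
send_with_retry(log, "event-2", lose_first_ack: false)
log  # => ["event-1", "event-1", "event-2"]
```

Exactly-once delivery was not available to Kafka clients of this era, so deduplicating downstream (for example by a record key) is the usual mitigation.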

Does not work with td-agent

I installed it via gem install fluent-plugin-kafka, but I still keep getting:

[error]: fluent/supervisor.rb:373:rescue in main_process: config error file="/etc/td-agent/td-agent.conf" error="Unknown input plugin 'kafka'. Run 'gem search -rd fluent-plugin' to find plugins"

High CPU usage at 1000 req/sec

Hi,

Whenever I apply a 1000 req/sec load to fluentd with the kafka buffered output plugin, CPU usage goes beyond 90%. Is this a problem with Poseidon (since it is implemented in pure Ruby)?

Thanks,
Gaurav

Remove zookeeper dependency by default

ruby-kafka doesn't depend on zookeeper because it uses newer Kafka APIs, and the current input and output plugins work without zookeeper by default.
So we can remove the zookeeper dependency from fluent-plugin-kafka.

Zookeeper dependency fails on ppc64le

This fluent-plugin-kafka gem depends on zookeeper, and the zookeeper dependency fails for a known reason: zk-ruby/zookeeper#79
I have been able to fix the zookeeper build locally and am looking forward to contributing my changes back to the zookeeper repository. Can someone suggest how I can build this fluent-plugin-kafka gem locally, incorporating my local zookeeper changes, and then test fluent-plugin-kafka?

Lost messages, strange buffer files after upgrading

We recently bumped our version of the plugin to the latest on Sept 21.

Later, when trying to consolidate data, we realized a lot of data from that day is missing, and in the folders where we store the fluentd buffers I see a bunch of strange "..*.log" files from that day that I can't explain:

Wed Oct 05 09:56:20 [6:0]$ ls -lah
total 1.4M
drwxr-xr-x. 2 lumoslabs lumoslabs 4.0K Oct  5 09:56 .
drwxr-xr-x. 4 root      root        41 Jul 14 16:42 ..
-rw-r--r--. 1 root      root       93K Sep 21 14:01 ..b53d0adad2c926a4e.log
-rw-r--r--. 1 root      root       78K Oct  5 09:56 buffer.b53e210e8a540d529c7409b2af3b263c8.log
-rw-r--r--. 1 root      root        75 Oct  5 09:56 buffer.b53e210e8a540d529c7409b2af3b263c8.log.meta
-rw-r--r--. 1 root      root      1.1M Oct  5 09:56 buffer.q53e210df2541167ab5cede2521dd8af6.log
-rw-r--r--. 1 root      root        77 Oct  5 09:56 buffer.q53e210df2541167ab5cede2521dd8af6.log.meta

Do these files contain our lost data (specifically the ..b53d0adad2c926a4e.log file)? Are they indicative that something broke, and can we recover any data that might be in them? Is there a different procedure we should follow when bumping the version to prevent any chance of data loss?

output_data_type msgpack support

Can msgpack be supported? In our environment the message type is msgpack, which is much smaller and faster than JSON. It would be great if output_data_type supported msgpack.

Discover brokers via zookeeper in the output plugin

If the output plugin could discover brokers via zookeeper, it would keep working when the set of Kafka brokers changes or some broker goes down.

e.g.
<match *.**>
  type kafka
  zookeepers localhost:2181
  default_topic
</match>
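The buffered output did later grow a zookeeper parameter for exactly this; a hedged sketch of how it is used (the topic name is a placeholder):

<match *.**>
  type kafka_buffered
  zookeeper localhost:2181
  default_topic my_topic
</match>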

I want you to take over the plugin name "fluent-plugin-kafka"

Hi @htgc,

My name is kiyoto, and I just saw your blog post about your Kafka plugin.

First and foremost, thanks for the contribution! Also, sorry about my prototype plugin (which I wrote at one of those hack day events last year) taking up the plugin name.

Here is something I want to suggest: I would like to deprecate my project and have you take over "fluent-plugin-kafka" with your fluent-plugin-kafka-poseidon codebase.

There are a couple of reasons for this:

  1. I don't use Kafka day to day, and it's unrealistic for me to stay up to date with its API. So, I would rather have someone else maintain the "de facto" Kafka output/input plugins.
  2. My day (and night!) job is not programming but developer marketing.
  3. I want to stop the proliferation of "similar but slightly different" plugins. This has already become quite an issue with filter/tail plugins, and I don't want the same thing to happen with Kafka.

Let me know what you think! If you are willing to take over the name "fluent-plugin-kafka", all we will need to do is:

  1. Announce on my fluent-plugin-kafka repo that it is going away and will be taken over by you.
  2. Kill my gem on rubygems by yanking all versions.
  3. You rename your repo to fluent-plugin-kafka on GitHub
  4. You release your gem to fluent-plugin-kafka
  5. Now you are the developer and maintainer of fluent-plugin-kafka!
  6. I will link my old repo to your new repo.

Again, thanks for the great work!

output_data_type single_value not working

Hi,

Please find my configuration below; it's not able to fetch the message key value from the JSON.

type kafka_buffered
zookeeper 10.100.1.36:2181
default_topic testlogs
flush_interval 5s
buffer_chunk_limit 20m
buffer_queue_limit 2000
num_threads 1
output_data_type single_value
add_newline false
message_key message
flush_at_shutdown true
buffer_type memory
output_include_time false
output_include_tag false
disable_retry_limit false
retry_limit 5

type file
path /home/ubuntu/testlogs

"Failed to send all messages" exception

Kafka version: 0.8.1
OS: Ubuntu

No idea why the fluentd plugin can't send logs to Kafka. Kafka is up and running. I tried playing with the timeout and acks attributes in the match config, but that didn't work. Any ideas?

==> hotels_impression_rates_from_tail_log.conf <==
<source>
  type tail
  tag staging.logs.hotels.impressions.rates
  path /var/apps/a/hoteru/shared/log/fluentd/rates.staging.log
  pos_file /var/apps/a/hoteru/shared/log/fluentd/rates.staging.log.pos
  format json
</source>

==> hotels_impression_rates_to_kafka.conf <==
<match staging.logs.hotels.impressions.rates>
  type kafka
  brokers kafka-staging-a-1.bezurk.org:9092
  default_topic alex
  output_data_type json
</match>
2015-04-20 13:23:04 +0800 [info]: brokers has been set directly: ["kafka-staging-a-1.bezurk.org:9092"]
2015-04-20 13:23:04 +0800 [info]: adding match pattern="td.*.*" type="tdlog"
2015-04-20 13:23:04 +0800 [info]: adding match pattern="debug.**" type="stdout"
2015-04-20 13:23:04 +0800 [info]: adding source type="tail"
2015-04-20 13:23:04 +0800 [info]: adding source type="forward"
2015-04-20 13:23:04 +0800 [info]: adding source type="debug_agent"
2015-04-20 13:23:04 +0800 [info]: initialized producer kafka
2015-04-20 13:23:04 +0800 [info]: following tail of /var/apps/a/hoteru/shared/log/fluentd/rates.staging.log
2015-04-20 13:23:04 +0800 [info]: listening fluent socket on 0.0.0.0:24224
2015-04-20 13:23:04 +0800 [info]: listening dRuby uri="druby://127.0.0.1:24230" object="Engine"
2015-04-20 13:23:21 +0800 [warn]: Send exception occurred: Failed to send all messages: [#<Poseidon::Message:0x007f97be415fe8 @struct=#<struct Poseidon::Protocol::MessageWithOffsetStruct offset=0, message=#<struct Poseidon::Protocol::MessageStruct magic_type=0, attributes=0, key=nil, value="{\"search_id\":\"39790490\",\"location_id\":\"6667\",\"country_code\":\"SG\",\"location_name\":\"Singapore\",\"locale\":\"en\",\"site_code\":\"US\",\"wego_visitor_id\":\"1429252550160-QmuHwg\",\"wego_session_id\":\"1429506752934-lJt96p\",\"device_type\":\"desktop_linux\",\"ip\":\"219.74.18.84\",\"timestamp\":\"2015-04-20T13:23:14.849+08:00\",\"hotel_name\":\"Marina Bay Sands\",\"hotel_id\":\"258101\",\"ecpc_max\":0.8,\"ecpc_min\":0.05,\"ecpc_card_min\":0.7,\"hotel_rank\":1,\"ecpc\":0.8,\"provider_hotel_id\":\"245881\",\"provider_code\":\"booking.com\",\"price_usd\":394.1027,\"description\":\"Premier Double or Twin Room - Free cancellation - Book now, pay when you stay - Free WiFi - Found your room online at a lower price? We'll match it.\",\"rate_rank\":1,\"ecpc_rank\":1,\"bucket\":\"0.8\",\"placement\":\"listing_sponsored\",\"params\":{}}">>, @topic="alex">] remaining
2015-04-20 13:23:21 +0800 [info]: initialized producer kafka
2015-04-20 13:23:21 +0800 [warn]: emit transaction failed: error_class=RuntimeError error="Failed to send all messages: [#<Poseidon::Message:0x007f97be415fe8 @struct=#<struct Poseidon::Protocol::MessageWithOffsetStruct offset=0, message=#<struct Poseidon::Protocol::MessageStruct magic_type=0, attributes=0, key=nil, value=\"{\\\"search_id\\\":\\\"39790490\\\",\\\"location_id\\\":\\\"6667\\\",\\\"country_code\\\":\\\"SG\\\",\\\"location_name\\\":\\\"Singapore\\\",\\\"locale\\\":\\\"en\\\",\\\"site_code\\\":\\\"US\\\",\\\"wego_visitor_id\\\":\\\"1429252550160-QmuHwg\\\",\\\"wego_session_id\\\":\\\"1429506752934-lJt96p\\\",\\\"device_type\\\":\\\"desktop_linux\\\",\\\"ip\\\":\\\"219.74.18.84\\\",\\\"timestamp\\\":\\\"2015-04-20T13:23:14.849+08:00\\\",\\\"hotel_name\\\":\\\"Marina Bay Sands\\\",\\\"hotel_id\\\":\\\"258101\\\",\\\"ecpc_max\\\":0.8,\\\"ecpc_min\\\":0.05,\\\"ecpc_card_min\\\":0.7,\\\"hotel_rank\\\":1,\\\"ecpc\\\":0.8,\\\"provider_hotel_id\\\":\\\"245881\\\",\\\"provider_code\\\":\\\"booking.com\\\",\\\"price_usd\\\":394.1027,\\\"description\\\":\\\"Premier Double or Twin Room - Free cancellation - Book now, pay when you stay - Free WiFi - Found your room online at a lower price? We'll match it.\\\",\\\"rate_rank\\\":1,\\\"ecpc_rank\\\":1,\\\"bucket\\\":\\\"0.8\\\",\\\"placement\\\":\\\"listing_sponsored\\\",\\\"params\\\":{}}\">>, @topic=\"alex\">] remaining" tag="staging.logs.hotels.impressions.rates"
  2015-04-20 13:23:21 +0800 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/poseidon-0.0.5/lib/poseidon/sync_producer.rb:67:in `send_messages'
  2015-04-20 13:23:21 +0800 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/poseidon-0.0.5/lib/poseidon/producer.rb:163:in `send_messages'
  2015-04-20 13:23:21 +0800 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluent-plugin-kafka-0.0.12/lib/fluent/plugin/out_kafka.rb:135:in `block in emit'
  2015-04-20 13:23:21 +0800 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluentd-0.12.7/lib/fluent/event.rb:128:in `call'
  2015-04-20 13:23:21 +0800 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluentd-0.12.7/lib/fluent/event.rb:128:in `block in each'
  2015-04-20 13:23:21 +0800 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluentd-0.12.7/lib/fluent/event.rb:127:in `each'
  2015-04-20 13:23:21 +0800 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluentd-0.12.7/lib/fluent/event.rb:127:in `each'
  2015-04-20 13:23:21 +0800 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluent-plugin-kafka-0.0.12/lib/fluent/plugin/out_kafka.rb:128:in `emit'
  2015-04-20 13:23:21 +0800 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluentd-0.12.7/lib/fluent/event_router.rb:88:in `emit_stream'
  2015-04-20 13:23:21 +0800 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluentd-0.12.7/lib/fluent/plugin/in_tail.rb:229:in `receive_lines'
  2015-04-20 13:23:21 +0800 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluentd-0.12.7/lib/fluent/plugin/in_tail.rb:320:in `call'
  2015-04-20 13:23:21 +0800 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluentd-0.12.7/lib/fluent/plugin/in_tail.rb:320:in `wrap_receive_lines'
  2015-04-20 13:23:21 +0800 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluentd-0.12.7/lib/fluent/plugin/in_tail.rb:513:in `call'
  2015-04-20 13:23:21 +0800 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluentd-0.12.7/lib/fluent/plugin/in_tail.rb:513:in `on_notify'
  2015-04-20 13:23:21 +0800 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluentd-0.12.7/lib/fluent/plugin/in_tail.rb:345:in `on_notify'
  2015-04-20 13:23:21 +0800 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluentd-0.12.7/lib/fluent/plugin/in_tail.rb:446:in `call'
  2015-04-20 13:23:21 +0800 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluentd-0.12.7/lib/fluent/plugin/in_tail.rb:446:in `on_change'
  2015-04-20 13:23:21 +0800 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/cool.io-1.3.0/lib/cool.io/loop.rb:88:in `run_once'
  2015-04-20 13:23:21 +0800 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/cool.io-1.3.0/lib/cool.io/loop.rb:88:in `run'
  2015-04-20 13:23:21 +0800 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluentd-0.12.7/lib/fluent/plugin/in_tail.rb:214:in `run'

Failed to connect to localhost:9092

Hi,

I started Kafka broker successfully (port 9092)
But I can't access the broker when I start fluentd. What's happening?

Thanks

Instance

Kafka * 1
Zookeeper * 1
Mongod * 1

Send some messages (Kafka)

$ kafka-console-producer.sh --broker-list localhost:9092 --topic test
This is a message
This is another message

Start a consumer (Kafka)

$ kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
This is a message
This is another message     (kafka is OK)

Config file (Fluentd)

<source>
  type   kafka
  host   localhost
  port   9092
  topics test
  format ltsv
  message_key message
  max_bytes           nil
  max_wait_ms         nil
  min_bytes           nil
  socket_timeout_ms   nil
</source>

<match *.**>
  type copy
  <store>
    buffer_type memory
    buffer_queue_limit 100000000
    buffer_chunk_limit 8m
    flush_interval 1s
    flush_at_shutdown true

    type mongo
    host localhost
    port 27017
    database test
    collection kafka_output
  </store>
  <store>
    type file
    path /home/vagrant/fluentd/out_file/kafka_output
    time_slice_format %Y%m%d
    time_slice_wait 10m
    time_format %Y%m%dT%H%M%S%z
    compress gzip
    utc
  </store>
</match>

Error message (Fluentd)

2016-01-07 09:43:08 +0000 [error]: Failed to connect to localhost:9092
gems/poseidon-0.0.5/lib/poseidon/connection.rb:166:in `raise_connection_failed_error'     
gems/poseidon-0.0.5/lib/poseidon/connection.rb:123:in `rescue in read_response'           
gems/poseidon-0.0.5/lib/poseidon/connection.rb:113:in `read_response'                     
gems/poseidon-0.0.5/lib/poseidon/connection.rb:85:in `offset'                             
gems/poseidon-0.0.5/lib/poseidon/partition_consumer.rb:190:in `resolve_offset_if_necessary'
gems/poseidon-0.0.5/lib/poseidon/partition_consumer.rb:136:in `next_offset'               
gems/poseidon-0.0.5/lib/poseidon/partition_consumer.rb:218:in `build_topic_fetch_request' 
gems/poseidon-0.0.5/lib/poseidon/partition_consumer.rb:107:in `fetch'                     
gems/fluent-plugin-kafka-0.1.2/lib/fluent/plugin/in_kafka.rb:159:in `consume'             
gems/fluent-plugin-kafka-0.1.2/lib/fluent/plugin/in_kafka.rb:142:in `call'                 
gems/fluent-plugin-kafka-0.1.2/lib/fluent/plugin/in_kafka.rb:142:in `on_timer'             
gems/cool.io-1.4.2/lib/cool.io/loop.rb:88:in `run_once'                                   
gems/cool.io-1.4.2/lib/cool.io/loop.rb:88:in `run'                                         
gems/fluent-plugin-kafka-0.1.2/lib/fluent/plugin/in_kafka.rb:110:in `run'   
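Two hedged guesses worth checking. First, Fluentd's config format has no nil literal, so lines like max_bytes nil pass the string "nil" (coerced to 0 for integer parameters) rather than leaving the option unset; dropping the optional lines lets the plugin fall back to its defaults:

<source>
  type   kafka
  host   localhost
  port   9092
  topics test
  format ltsv
  message_key message
</source>

Second, with Poseidon-era clients this error is also commonly caused by the broker's advertised.host.name resolving to something the client cannot reach, so that setting in server.properties is worth checking too.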

Doesn't seem to compile on Arm architecture (e.g. Raspberry Pi)

This won't compile on my Raspberry Pi, and my research suggests that some of the assembler instructions are not appropriate for the ARM architecture (e.g. https://www.raspberrypi.org/forums/viewtopic.php?t=43410&p=346925).

It would be great if this could work on IoT devices running on ARM!

Actual error message:

$ /usr/bin/gem install --user-install --no-document fluent-plugin-kafka
Error installing fluent-plugin-kafka:
    ERROR: Failed to build gem native extension.

    current directory: /home/smc/.gem/ruby/2.3.0/gems/zookeeper-1.4.11/ext
/usr/bin/ruby2.3 -r ./siteconf20160830-8302-7m694p.rb extconf.rb
Building zkc.
tar xzf zkc-3.4.5.tar.gz 2>&1
patch -p0 < patches/zkc-3.4.5-out-of-order-ping.patch 2>&1
patching file zkc-3.4.5/c/src/zookeeper.c
patching file zkc-3.4.5/c/tests/TestOperations.cc
patch -p0 < patches/zkc-3.4.5-yosemite-htonl-fix.patch 2>&1
patching file zkc-3.4.5/c/include/recordio.h
patching file zkc-3.4.5/c/src/recordio.c
patching file zkc-3.4.5/c/src/zookeeper.c
Hunk #1 succeeded at 1403 (offset -5 lines).
Hunk #2 succeeded at 1411 (offset -5 lines).
Hunk #3 succeeded at 1442 (offset -5 lines).
patching file zkc-3.4.5/c/tests/ZKMocks.cc
patch -p0 < patches/zkc-3.4.5-logging.patch 2>&1
patching file zkc-3.4.5/c/src/zookeeper.c
Hunk #1 succeeded at 1644 (offset -6 lines).
Hunk #2 succeeded at 1665 (offset -6 lines).
./configure --prefix=/home/smc/.gem/ruby/2.3.0/gems/zookeeper-1.4.11/ext --with-pic --without-cppunit --disable-dependency-tracking  2>&1
checking for doxygen... no
checking for perl... /usr/bin/perl
checking for dot... no
checking for a BSD-compatible install... /usr/bin/install -c
checking whether build environment is sane... yes
checking for gawk... no
checking for mawk... mawk
checking whether make sets $(MAKE)... yes
checking for generated/zookeeper.jute.c... yes
checking for generated/zookeeper.jute.h... yes
checking for gcc... gcc
checking for C compiler default output file name... a.out
checking whether the C compiler works... yes
checking whether we are cross compiling... no
checking for suffix of executables... 
checking for suffix of object files... o
checking whether we are using the GNU C compiler... yes
checking whether gcc accepts -g... yes
checking for gcc option to accept ANSI C... none needed
checking for style of include used by make... GNU
checking dependency style of gcc... none
checking whether gcc and cc understand -c and -o together... yes
checking for g++... g++
checking whether we are using the GNU C++ compiler... yes
checking whether g++ accepts -g... yes
checking dependency style of g++... none
checking for a BSD-compatible install... /usr/bin/install -c
checking whether ln -s works... yes
checking build system type... armv7l-unknown-linux-gnu
checking host system type... armv7l-unknown-linux-gnu
checking for a sed that does not truncate output... /bin/sed
checking for egrep... grep -E
checking for ld used by gcc... /usr/bin/ld
checking if the linker (/usr/bin/ld) is GNU ld... yes
checking for /usr/bin/ld option to reload object files... -r
checking for BSD-compatible nm... /usr/bin/nm -B
checking how to recognise dependent libraries... pass_all
checking how to run the C preprocessor... gcc -E
checking for ANSI C header files... yes
checking for sys/types.h... yes
checking for sys/stat.h... yes
checking for stdlib.h... yes
checking for string.h... yes
checking for memory.h... yes
checking for strings.h... yes
checking for inttypes.h... yes
checking for stdint.h... yes
checking for unistd.h... yes
checking dlfcn.h usability... yes
checking dlfcn.h presence... yes
checking for dlfcn.h... yes
checking how to run the C++ preprocessor... g++ -E
checking for g77... no
checking for f77... no
checking for xlf... no
checking for frt... no
checking for pgf77... no
checking for fort77... no
checking for fl32... no
checking for af77... no
checking for f90... no
checking for xlf90... no
checking for pgf90... no
checking for epcf90... no
checking for f95... no
checking for fort... no
checking for xlf95... no
checking for ifc... no
checking for efc... no
checking for pgf95... no
checking for lf95... no
checking for gfortran... no
checking whether we are using the GNU Fortran 77 compiler... no
checking whether  accepts -g... no
checking the maximum length of command line arguments... 32768
checking command to parse /usr/bin/nm -B output from gcc object... ok
checking for objdir... .libs
checking for ar... ar
checking for ranlib... ranlib
checking for strip... strip
checking if gcc supports -fno-rtti -fno-exceptions... no
checking for gcc option to produce PIC... -fPIC
checking if gcc PIC flag -fPIC works... yes
checking if gcc static flag -static works... yes
checking if gcc supports -c -o file.o... yes
checking whether the gcc linker (/usr/bin/ld) supports shared libraries... yes
checking whether -lc should be explicitly linked in... no
checking dynamic linker characteristics... GNU/Linux ld.so
checking how to hardcode library paths into programs... immediate
checking whether stripping libraries is possible... yes
checking if libtool supports shared libraries... yes
checking whether to build shared libraries... yes
checking whether to build static libraries... yes
configure: creating libtool
appending configuration tag "CXX" to libtool
checking for ld used by g++... /usr/bin/ld
checking if the linker (/usr/bin/ld) is GNU ld... yes
checking whether the g++ linker (/usr/bin/ld) supports shared libraries... yes
checking for g++ option to produce PIC... -fPIC
checking if g++ PIC flag -fPIC works... yes
checking if g++ static flag -static works... yes
checking if g++ supports -c -o file.o... yes
checking whether the g++ linker (/usr/bin/ld) supports shared libraries... yes
checking dynamic linker characteristics... GNU/Linux ld.so
checking how to hardcode library paths into programs... immediate
appending configuration tag "F77" to libtool
checking for pthread_mutex_lock in -lpthread... yes
configure: building with SyncAPI support
checking for ANSI C header files... (cached) yes
checking arpa/inet.h usability... yes
checking arpa/inet.h presence... yes
checking for arpa/inet.h... yes
checking fcntl.h usability... yes
checking fcntl.h presence... yes
checking for fcntl.h... yes
checking netdb.h usability... yes
checking netdb.h presence... yes
checking for netdb.h... yes
checking netinet/in.h usability... yes
checking netinet/in.h presence... yes
checking for netinet/in.h... yes
checking for stdlib.h... (cached) yes
checking for string.h... (cached) yes
checking sys/socket.h usability... yes
checking sys/socket.h presence... yes
checking for sys/socket.h... yes
checking sys/time.h usability... yes
checking sys/time.h presence... yes
checking for sys/time.h... yes
checking for unistd.h... (cached) yes
checking sys/utsname.h usability... yes
checking sys/utsname.h presence... yes
checking for sys/utsname.h... yes
checking for an ANSI C-conforming const... yes
checking for inline... inline
checking whether time.h and sys/time.h may both be included... yes
checking for nfds_t... yes
checking whether to enable ipv6... yes
checking for getcwd... yes
checking for gethostbyname... yes
checking for gethostname... yes
checking for getlogin... yes
checking for getpwuid_r... yes
checking for gettimeofday... yes
checking for getuid... yes
checking for memmove... yes
checking for memset... yes
checking for poll... yes
checking for socket... yes
checking for strchr... yes
checking for strdup... yes
checking for strerror... yes
checking for strtol... yes
configure: creating ./config.status
config.status: creating Makefile
config.status: creating config.h
config.status: executing depfiles commands
make  2>&1
make  all-am
make[1]: Entering directory '/home/smc/.gem/ruby/2.3.0/gems/zookeeper-1.4.11/ext/zkc-3.4.5/c'
/bin/bash ./libtool --tag=CC --mode=compile gcc -DHAVE_CONFIG_H -I. -I. -I.  -I./include -I./tests -I./generated  -Wall -Werror  -g -O2 -D_GNU_SOURCE -c -o zookeeper.lo `test -f 'src/zookeeper.c' || echo './'`src/zookeeper.c
mkdir .libs
 gcc -DHAVE_CONFIG_H -I. -I. -I. -I./include -I./tests -I./generated -Wall -Werror -g -O2 -D_GNU_SOURCE -c src/zookeeper.c  -fPIC -DPIC -o .libs/zookeeper.o
 gcc -DHAVE_CONFIG_H -I. -I. -I. -I./include -I./tests -I./generated -Wall -Werror -g -O2 -D_GNU_SOURCE -c src/zookeeper.c  -fPIC -DPIC -o zookeeper.o >/dev/null 2>&1
/bin/bash ./libtool --tag=CC --mode=compile gcc -DHAVE_CONFIG_H -I. -I. -I.  -I./include -I./tests -I./generated  -Wall -Werror  -g -O2 -D_GNU_SOURCE -c -o recordio.lo `test -f 'src/recordio.c' || echo './'`src/recordio.c
 gcc -DHAVE_CONFIG_H -I. -I. -I. -I./include -I./tests -I./generated -Wall -Werror -g -O2 -D_GNU_SOURCE -c src/recordio.c  -fPIC -DPIC -o .libs/recordio.o
 gcc -DHAVE_CONFIG_H -I. -I. -I. -I./include -I./tests -I./generated -Wall -Werror -g -O2 -D_GNU_SOURCE -c src/recordio.c  -fPIC -DPIC -o recordio.o >/dev/null 2>&1
/bin/bash ./libtool --tag=CC --mode=compile gcc -DHAVE_CONFIG_H -I. -I. -I.  -I./include -I./tests -I./generated  -Wall -Werror  -g -O2 -D_GNU_SOURCE -c -o zookeeper.jute.lo `test -f 'generated/zookeeper.jute.c' || echo './'`generated/zookeeper.jute.c
 gcc -DHAVE_CONFIG_H -I. -I. -I. -I./include -I./tests -I./generated -Wall -Werror -g -O2 -D_GNU_SOURCE -c generated/zookeeper.jute.c  -fPIC -DPIC -o .libs/zookeeper.jute.o
 gcc -DHAVE_CONFIG_H -I. -I. -I. -I./include -I./tests -I./generated -Wall -Werror -g -O2 -D_GNU_SOURCE -c generated/zookeeper.jute.c  -fPIC -DPIC -o zookeeper.jute.o >/dev/null 2>&1
/bin/bash ./libtool --tag=CC --mode=compile gcc -DHAVE_CONFIG_H -I. -I. -I.  -I./include -I./tests -I./generated  -Wall -Werror  -g -O2 -D_GNU_SOURCE -c -o zk_log.lo `test -f 'src/zk_log.c' || echo './'`src/zk_log.c
 gcc -DHAVE_CONFIG_H -I. -I. -I. -I./include -I./tests -I./generated -Wall -Werror -g -O2 -D_GNU_SOURCE -c src/zk_log.c  -fPIC -DPIC -o .libs/zk_log.o
 gcc -DHAVE_CONFIG_H -I. -I. -I. -I./include -I./tests -I./generated -Wall -Werror -g -O2 -D_GNU_SOURCE -c src/zk_log.c  -fPIC -DPIC -o zk_log.o >/dev/null 2>&1
/bin/bash ./libtool --tag=CC --mode=compile gcc -DHAVE_CONFIG_H -I. -I. -I.  -I./include -I./tests -I./generated  -Wall -Werror  -g -O2 -D_GNU_SOURCE -c -o zk_hashtable.lo `test -f 'src/zk_hashtable.c' || echo './'`src/zk_hashtable.c
 gcc -DHAVE_CONFIG_H -I. -I. -I. -I./include -I./tests -I./generated -Wall -Werror -g -O2 -D_GNU_SOURCE -c src/zk_hashtable.c  -fPIC -DPIC -o .libs/zk_hashtable.o
 gcc -DHAVE_CONFIG_H -I. -I. -I. -I./include -I./tests -I./generated -Wall -Werror -g -O2 -D_GNU_SOURCE -c src/zk_hashtable.c  -fPIC -DPIC -o zk_hashtable.o >/dev/null 2>&1
/bin/bash ./libtool --tag=CC --mode=compile gcc -DHAVE_CONFIG_H -I. -I. -I.  -I./include -I./tests -I./generated  -Wall -Werror  -g -O2 -D_GNU_SOURCE -c -o st_adaptor.lo `test -f 'src/st_adaptor.c' || echo './'`src/st_adaptor.c
 gcc -DHAVE_CONFIG_H -I. -I. -I. -I./include -I./tests -I./generated -Wall -Werror -g -O2 -D_GNU_SOURCE -c src/st_adaptor.c  -fPIC -DPIC -o .libs/st_adaptor.o
 gcc -DHAVE_CONFIG_H -I. -I. -I. -I./include -I./tests -I./generated -Wall -Werror -g -O2 -D_GNU_SOURCE -c src/st_adaptor.c  -fPIC -DPIC -o st_adaptor.o >/dev/null 2>&1
/bin/bash ./libtool --tag=CC --mode=link gcc -Wall -Werror  -g -O2 -D_GNU_SOURCE   -o libzkst.la   zookeeper.lo recordio.lo zookeeper.jute.lo zk_log.lo zk_hashtable.lo st_adaptor.lo -lm 
ar cru .libs/libzkst.a .libs/zookeeper.o .libs/recordio.o .libs/zookeeper.jute.o .libs/zk_log.o .libs/zk_hashtable.o .libs/st_adaptor.o
ar: `u' modifier ignored since `D' is the default (see `U')
ranlib .libs/libzkst.a
creating libzkst.la
(cd .libs && rm -f libzkst.la && ln -s ../libzkst.la libzkst.la)
/bin/bash ./libtool --tag=CC --mode=compile gcc -DHAVE_CONFIG_H -I. -I. -I.  -I./include -I./tests -I./generated  -Wall -Werror  -g -O2 -D_GNU_SOURCE -c -o hashtable_itr.lo `test -f 'src/hashtable/hashtable_itr.c' || echo './'`src/hashtable/hashtable_itr.c
 gcc -DHAVE_CONFIG_H -I. -I. -I. -I./include -I./tests -I./generated -Wall -Werror -g -O2 -D_GNU_SOURCE -c src/hashtable/hashtable_itr.c  -fPIC -DPIC -o .libs/hashtable_itr.o
 gcc -DHAVE_CONFIG_H -I. -I. -I. -I./include -I./tests -I./generated -Wall -Werror -g -O2 -D_GNU_SOURCE -c src/hashtable/hashtable_itr.c  -fPIC -DPIC -o hashtable_itr.o >/dev/null 2>&1
/bin/bash ./libtool --tag=CC --mode=compile gcc -DHAVE_CONFIG_H -I. -I. -I.  -I./include -I./tests -I./generated  -Wall -Werror  -g -O2 -D_GNU_SOURCE -c -o hashtable.lo `test -f 'src/hashtable/hashtable.c' || echo './'`src/hashtable/hashtable.c
 gcc -DHAVE_CONFIG_H -I. -I. -I. -I./include -I./tests -I./generated -Wall -Werror -g -O2 -D_GNU_SOURCE -c src/hashtable/hashtable.c  -fPIC -DPIC -o .libs/hashtable.o
 gcc -DHAVE_CONFIG_H -I. -I. -I. -I./include -I./tests -I./generated -Wall -Werror -g -O2 -D_GNU_SOURCE -c src/hashtable/hashtable.c  -fPIC -DPIC -o hashtable.o >/dev/null 2>&1
/bin/bash ./libtool --tag=CC --mode=link gcc -Wall -Werror  -g -O2 -D_GNU_SOURCE   -o libhashtable.la   hashtable_itr.lo hashtable.lo  
ar cru .libs/libhashtable.a .libs/hashtable_itr.o .libs/hashtable.o
ar: `u' modifier ignored since `D' is the default (see `U')
ranlib .libs/libhashtable.a
creating libhashtable.la
(cd .libs && rm -f libhashtable.la && ln -s ../libhashtable.la libhashtable.la)
/bin/bash ./libtool --tag=CC --mode=link gcc -Wall -Werror  -g -O2 -D_GNU_SOURCE   -o libzookeeper_st.la -rpath /home/smc/.gem/ruby/2.3.0/gems/zookeeper-1.4.11/ext/lib -no-undefined -version-info 2 -export-symbols-regex '(zoo_|zookeeper_|zhandle|Z|format_log_message|log_message|logLevel|deallocate_|zerror|is_unrecoverable)'  libzkst.la libhashtable.la 
generating symbol list for `libzookeeper_st.la'
/usr/bin/nm -B   ./.libs/libzkst.a ./.libs/libhashtable.a | sed -n -e 's/^.*[     ]\([ABCDGIRSTW][ABCDGIRSTW]*\)[     ][     ]*\([_A-Za-z][_A-Za-z0-9]*\)$/\1 \2 \2/p' | /bin/sed 's/.* //' | sort | uniq > .libs/libzookeeper_st.exp
grep -E -e "(zoo_|zookeeper_|zhandle|Z|format_log_message|log_message|logLevel|deallocate_|zerror|is_unrecoverable)" ".libs/libzookeeper_st.exp" > ".libs/libzookeeper_st.expT"
mv -f ".libs/libzookeeper_st.expT" ".libs/libzookeeper_st.exp"
echo "{ global:" > .libs/libzookeeper_st.ver
 cat .libs/libzookeeper_st.exp | sed -e "s/\(.*\)/\1;/" >> .libs/libzookeeper_st.ver
 echo "local: *; };" >> .libs/libzookeeper_st.ver
 gcc -shared  -Wl,--whole-archive ./.libs/libzkst.a ./.libs/libhashtable.a -Wl,--no-whole-archive  -lm  -Wl,-soname -Wl,libzookeeper_st.so.2 -Wl,-version-script -Wl,.libs/libzookeeper_st.ver -o .libs/libzookeeper_st.so.2.0.0
(cd .libs && rm -f libzookeeper_st.so.2 && ln -s libzookeeper_st.so.2.0.0 libzookeeper_st.so.2)
(cd .libs && rm -f libzookeeper_st.so && ln -s libzookeeper_st.so.2.0.0 libzookeeper_st.so)
rm -fr .libs/libzookeeper_st.lax
mkdir .libs/libzookeeper_st.lax
rm -fr .libs/libzookeeper_st.lax/libzkst.a
mkdir .libs/libzookeeper_st.lax/libzkst.a
(cd .libs/libzookeeper_st.lax/libzkst.a && ar x /home/smc/.gem/ruby/2.3.0/gems/zookeeper-1.4.11/ext/zkc-3.4.5/c/./.libs/libzkst.a)
rm -fr .libs/libzookeeper_st.lax/libhashtable.a
mkdir .libs/libzookeeper_st.lax/libhashtable.a
(cd .libs/libzookeeper_st.lax/libhashtable.a && ar x /home/smc/.gem/ruby/2.3.0/gems/zookeeper-1.4.11/ext/zkc-3.4.5/c/./.libs/libhashtable.a)
ar cru .libs/libzookeeper_st.a   .libs/libzookeeper_st.lax/libzkst.a/zk_hashtable.o .libs/libzookeeper_st.lax/libzkst.a/recordio.o .libs/libzookeeper_st.lax/libzkst.a/zk_log.o .libs/libzookeeper_st.lax/libzkst.a/st_adaptor.o .libs/libzookeeper_st.lax/libzkst.a/zookeeper.jute.o .libs/libzookeeper_st.lax/libzkst.a/zookeeper.o  .libs/libzookeeper_st.lax/libhashtable.a/hashtable_itr.o .libs/libzookeeper_st.lax/libhashtable.a/hashtable.o 
ar: `u' modifier ignored since `D' is the default (see `U')
ranlib .libs/libzookeeper_st.a
rm -fr .libs/libzookeeper_st.lax
creating libzookeeper_st.la
(cd .libs && rm -f libzookeeper_st.la && ln -s ../libzookeeper_st.la libzookeeper_st.la)
/bin/bash ./libtool --tag=CC --mode=compile gcc -DHAVE_CONFIG_H -I. -I. -I.  -I./include -I./tests -I./generated  -DTHREADED -g -O2 -D_GNU_SOURCE -c -o libzkmt_la-zookeeper.lo `test -f 'src/zookeeper.c' || echo './'`src/zookeeper.c
 gcc -DHAVE_CONFIG_H -I. -I. -I. -I./include -I./tests -I./generated -DTHREADED -g -O2 -D_GNU_SOURCE -c src/zookeeper.c  -fPIC -DPIC -o .libs/libzkmt_la-zookeeper.o
 gcc -DHAVE_CONFIG_H -I. -I. -I. -I./include -I./tests -I./generated -DTHREADED -g -O2 -D_GNU_SOURCE -c src/zookeeper.c  -fPIC -DPIC -o libzkmt_la-zookeeper.o >/dev/null 2>&1
/bin/bash ./libtool --tag=CC --mode=compile gcc -DHAVE_CONFIG_H -I. -I. -I.  -I./include -I./tests -I./generated  -DTHREADED -g -O2 -D_GNU_SOURCE -c -o libzkmt_la-recordio.lo `test -f 'src/recordio.c' || echo './'`src/recordio.c
 gcc -DHAVE_CONFIG_H -I. -I. -I. -I./include -I./tests -I./generated -DTHREADED -g -O2 -D_GNU_SOURCE -c src/recordio.c  -fPIC -DPIC -o .libs/libzkmt_la-recordio.o
 gcc -DHAVE_CONFIG_H -I. -I. -I. -I./include -I./tests -I./generated -DTHREADED -g -O2 -D_GNU_SOURCE -c src/recordio.c  -fPIC -DPIC -o libzkmt_la-recordio.o >/dev/null 2>&1
/bin/bash ./libtool --tag=CC --mode=compile gcc -DHAVE_CONFIG_H -I. -I. -I.  -I./include -I./tests -I./generated  -DTHREADED -g -O2 -D_GNU_SOURCE -c -o libzkmt_la-zookeeper.jute.lo `test -f 'generated/zookeeper.jute.c' || echo './'`generated/zookeeper.jute.c
 gcc -DHAVE_CONFIG_H -I. -I. -I. -I./include -I./tests -I./generated -DTHREADED -g -O2 -D_GNU_SOURCE -c generated/zookeeper.jute.c  -fPIC -DPIC -o .libs/libzkmt_la-zookeeper.jute.o
 gcc -DHAVE_CONFIG_H -I. -I. -I. -I./include -I./tests -I./generated -DTHREADED -g -O2 -D_GNU_SOURCE -c generated/zookeeper.jute.c  -fPIC -DPIC -o libzkmt_la-zookeeper.jute.o >/dev/null 2>&1
/bin/bash ./libtool --tag=CC --mode=compile gcc -DHAVE_CONFIG_H -I. -I. -I.  -I./include -I./tests -I./generated  -DTHREADED -g -O2 -D_GNU_SOURCE -c -o libzkmt_la-zk_log.lo `test -f 'src/zk_log.c' || echo './'`src/zk_log.c
 gcc -DHAVE_CONFIG_H -I. -I. -I. -I./include -I./tests -I./generated -DTHREADED -g -O2 -D_GNU_SOURCE -c src/zk_log.c  -fPIC -DPIC -o .libs/libzkmt_la-zk_log.o
 gcc -DHAVE_CONFIG_H -I. -I. -I. -I./include -I./tests -I./generated -DTHREADED -g -O2 -D_GNU_SOURCE -c src/zk_log.c  -fPIC -DPIC -o libzkmt_la-zk_log.o >/dev/null 2>&1
/bin/bash ./libtool --tag=CC --mode=compile gcc -DHAVE_CONFIG_H -I. -I. -I.  -I./include -I./tests -I./generated  -DTHREADED -g -O2 -D_GNU_SOURCE -c -o libzkmt_la-zk_hashtable.lo `test -f 'src/zk_hashtable.c' || echo './'`src/zk_hashtable.c
 gcc -DHAVE_CONFIG_H -I. -I. -I. -I./include -I./tests -I./generated -DTHREADED -g -O2 -D_GNU_SOURCE -c src/zk_hashtable.c  -fPIC -DPIC -o .libs/libzkmt_la-zk_hashtable.o
 gcc -DHAVE_CONFIG_H -I. -I. -I. -I./include -I./tests -I./generated -DTHREADED -g -O2 -D_GNU_SOURCE -c src/zk_hashtable.c  -fPIC -DPIC -o libzkmt_la-zk_hashtable.o >/dev/null 2>&1
/bin/bash ./libtool --tag=CC --mode=compile gcc -DHAVE_CONFIG_H -I. -I. -I.  -I./include -I./tests -I./generated  -DTHREADED -g -O2 -D_GNU_SOURCE -c -o libzkmt_la-mt_adaptor.lo `test -f 'src/mt_adaptor.c' || echo './'`src/mt_adaptor.c
 gcc -DHAVE_CONFIG_H -I. -I. -I. -I./include -I./tests -I./generated -DTHREADED -g -O2 -D_GNU_SOURCE -c src/mt_adaptor.c  -fPIC -DPIC -o .libs/libzkmt_la-mt_adaptor.o
/tmp/cc2n5n3c.s: Assembler messages:
/tmp/cc2n5n3c.s:1732: Error: bad instruction `lock xaddl r1,[r0]'
Makefile:747: recipe for target 'libzkmt_la-mt_adaptor.lo' failed
make[1]: *** [libzkmt_la-mt_adaptor.lo] Error 1
make[1]: Leaving directory '/home/smc/.gem/ruby/2.3.0/gems/zookeeper-1.4.11/ext/zkc-3.4.5/c'
Makefile:468: recipe for target 'all' failed
make: *** [all] Error 2
*** extconf.rb failed ***
Could not create Makefile due to some reason, probably lack of necessary
libraries and/or headers.  Check the mkmf.log file for more details.  You may
need configuration options.

Provided configuration options:
    --with-opt-dir
    --without-opt-dir
    --with-opt-include
    --without-opt-include=${opt-dir}/include
    --with-opt-lib
    --without-opt-lib=${opt-dir}/lib
    --with-make-prog
    --without-make-prog
    --srcdir=.
    --curdir
    --ruby=/usr/bin/$(RUBY_BASE_NAME)2.3
extconf.rb:51:in `safe_sh': command failed! make  2>&1 (RuntimeError)
    from extconf.rb:76:in `block (2 levels) in <main>'
    from extconf.rb:71:in `chdir'
    from extconf.rb:71:in `block in <main>'
    from extconf.rb:55:in `chdir'
    from extconf.rb:55:in `<main>'

extconf failed, exit code 1

Gem files will remain installed in /home/smc/.gem/ruby/2.3.0/gems/zookeeper-1.4.11 for inspection.
Results logged to /home/smc/.gem/ruby/2.3.0/extensions/arm-linux/2.3.0/zookeeper-1.4.11/gem_make.out

in_kafka_group swallows error in run and stops reading input

def run
  @consumer.each_batch(@fetch_opts) { |batch|
    ...
  }
rescue => e
  $log.error "unexpected error", :error => e.to_s
  $log.error_backtrace
end

This logic breaks out of the consumer.each_batch loop, and does not resume it. We had a kafka cluster failover this morning, resulting in the following stacktrace:

  2016-10-03 09:07:16 +0000 [error]: unexpected error error="Kafka::NotLeaderForPartition"
  2016-10-03 09:07:16 +0000 [error]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/ruby-kafka-0.3.12/lib/kafka/protocol.rb:50:in `handle_error'
  2016-10-03 09:07:16 +0000 [error]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/ruby-kafka-0.3.12/lib/kafka/fetch_operation.rb:73:in `block (3 levels) in execute'
  2016-10-03 09:07:16 +0000 [error]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/ruby-kafka-0.3.12/lib/kafka/fetch_operation.rb:71:in `map'
  2016-10-03 09:07:16 +0000 [error]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/ruby-kafka-0.3.12/lib/kafka/fetch_operation.rb:71:in `block (2 levels) in execute'
  2016-10-03 09:07:16 +0000 [error]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/ruby-kafka-0.3.12/lib/kafka/fetch_operation.rb:70:in `each'
  2016-10-03 09:07:16 +0000 [error]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/ruby-kafka-0.3.12/lib/kafka/fetch_operation.rb:70:in `flat_map'
  2016-10-03 09:07:16 +0000 [error]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/ruby-kafka-0.3.12/lib/kafka/fetch_operation.rb:70:in `block in execute'
  2016-10-03 09:07:16 +0000 [error]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/ruby-kafka-0.3.12/lib/kafka/fetch_operation.rb:59:in `each'
  2016-10-03 09:07:16 +0000 [error]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/ruby-kafka-0.3.12/lib/kafka/fetch_operation.rb:59:in `flat_map'
  2016-10-03 09:07:16 +0000 [error]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/ruby-kafka-0.3.12/lib/kafka/fetch_operation.rb:59:in `execute'
  2016-10-03 09:07:16 +0000 [error]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/ruby-kafka-0.3.12/lib/kafka/consumer.rb:244:in `fetch_batches'
  2016-10-03 09:07:16 +0000 [error]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/ruby-kafka-0.3.12/lib/kafka/consumer.rb:160:in `block in each_batch'
  2016-10-03 09:07:16 +0000 [error]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/ruby-kafka-0.3.12/lib/kafka/consumer.rb:195:in `consumer_loop'
  2016-10-03 09:07:16 +0000 [error]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/ruby-kafka-0.3.12/lib/kafka/consumer.rb:159:in `each_batch'
  2016-10-03 09:07:16 +0000 [error]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluent-plugin-kafka-0.3.1/lib/fluent/plugin/in_kafka_group.rb:120:in `run'

No more messages were consumed after this.

I think the rescue block probably wants to retry, or perhaps reraise the exception so that fluentd can retry the input as a whole.
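A minimal sketch of the retry idea. `ResilientConsumer` is a hypothetical class that simulates the consume loop; the real plugin would wrap `@consumer.each_batch` in the same kind of loop so a transient broker error does not end `run`:

```ruby
# Sketch only: keep the consume loop alive across transient errors
# instead of letting `run` return. @batches stands in for the stream
# of batches from @consumer.each_batch; exception elements simulate
# broker failovers such as Kafka::NotLeaderForPartition.
class ResilientConsumer
  attr_reader :processed, :retries

  def initialize(batches)
    @batches = batches
    @processed = []
    @retries = 0
  end

  def run
    loop do
      begin
        batch = @batches.shift
        break if batch.nil?
        raise batch if batch.is_a?(Exception)
        @processed << batch
      rescue => e
        # Log and retry rather than fall out of the loop, so that
        # consumption resumes once the cluster recovers.
        @retries += 1
        warn "unexpected error, retrying: #{e.class}"
      end
    end
  end
end
```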

Thanks,

remove message key

In out_kafka_buffered, can the message key be removed so the raw data is captured?
Instead of {"message": [json]} we would just get [json].
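A sketch of the requested behavior, assuming a hypothetical `raw_output` option (not an existing plugin parameter): when enabled, the output would publish the value under `message_key` untouched instead of JSON-encoding the whole record.

```ruby
require 'json'

# Hypothetical serializer illustrating the request: with raw_output
# enabled, emit the message field as-is instead of wrapping it as
# {"message": ...}. This is a sketch, not the plugin's actual API.
def serialize(record, message_key: 'message', raw_output: false)
  if raw_output && record.key?(message_key)
    record[message_key]   # raw payload, e.g. the original JSON array
  else
    record.to_json        # current behavior
  end
end
```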

kafka_buffered emits duplicated messages endlessly

I have a very simple config: tail from a file and send via kafka. However, the same messages are sent again and again, as you can see in the logs. The input file (access.log) only has two lines of content.

2016-09-20 17:28:38 +0200 [info]: fluent/supervisor.rb:471:read_config: reading config file path="td-agent-kafka.conf"
2016-09-20 17:28:38 +0200 [info]: fluent/supervisor.rb:337:supervise: starting fluentd-0.12.26
2016-09-20 17:28:38 +0200 [info]: fluent/engine.rb:126:block in configure: gem 'fluent-mixin-config-placeholders' version '0.4.0'
2016-09-20 17:28:38 +0200 [info]: fluent/engine.rb:126:block in configure: gem 'fluent-mixin-plaintextformatter' version '0.2.6'
2016-09-20 17:28:38 +0200 [info]: fluent/engine.rb:126:block in configure: gem 'fluent-plugin-elasticsearch' version '1.5.0'
2016-09-20 17:28:38 +0200 [info]: fluent/engine.rb:126:block in configure: gem 'fluent-plugin-forest' version '0.3.1'
2016-09-20 17:28:38 +0200 [info]: fluent/engine.rb:126:block in configure: gem 'fluent-plugin-kafka' version '0.3.1'
2016-09-20 17:28:38 +0200 [info]: fluent/engine.rb:126:block in configure: gem 'fluent-plugin-mongo' version '0.7.13'
2016-09-20 17:28:38 +0200 [info]: fluent/engine.rb:126:block in configure: gem 'fluent-plugin-rewrite-tag-filter' version '1.5.5'
2016-09-20 17:28:38 +0200 [info]: fluent/engine.rb:126:block in configure: gem 'fluent-plugin-s3' version '0.6.8'
2016-09-20 17:28:38 +0200 [info]: fluent/engine.rb:126:block in configure: gem 'fluent-plugin-scribe' version '0.10.14'
2016-09-20 17:28:38 +0200 [info]: fluent/engine.rb:126:block in configure: gem 'fluent-plugin-td' version '0.10.28'
2016-09-20 17:28:38 +0200 [info]: fluent/engine.rb:126:block in configure: gem 'fluent-plugin-td-monitoring' version '0.2.2'
2016-09-20 17:28:38 +0200 [info]: fluent/engine.rb:126:block in configure: gem 'fluent-plugin-webhdfs' version '0.4.2'
2016-09-20 17:28:38 +0200 [info]: fluent/engine.rb:126:block in configure: gem 'fluentd' version '0.12.26'
2016-09-20 17:28:38 +0200 [info]: fluent/agent.rb:128:add_match: adding match pattern="*.**" type="kafka_buffered"
2016-09-20 17:28:39 +0200 [info]: plugin/out_kafka_buffered.rb:103:configure: brokers has been set directly: ["hdp1:6667"]
2016-09-20 17:28:39 +0200 [info]: fluent/root_agent.rb:147:add_source: adding source type="tail"
2016-09-20 17:28:39 +0200 [info]: fluent/engine.rb:133:configure: using configuration file: <ROOT>
  <source>
    @type tail
    path /var/log/apache2/access.log
    read_from_head true
    pos_file /home/user/access.log.pos
    tag apache.access
    format apache2
  </source>
  <match *.**>
    @type kafka_buffered
    flush_interval 6s
    buffer_type file
    buffer_path /tmp/kafka.*.buffer
    brokers hdp1:6667
    default_topic testTopic
  </match>
</ROOT>
2016-09-20 17:28:39 +0200 [info]: plugin/out_kafka_buffered.rb:83:refresh_client: initialized kafka producer: kafka
2016-09-20 17:28:39 +0200 [info]: plugin/in_tail.rb:529:initialize: following tail of /var/log/apache2/access.log
2016-09-20 17:28:46 +0200 [debug]: plugin/out_kafka_buffered.rb:236:write: (records|bytes) ({"testTopic"=>3}|{"testTopic"=>142})
2016-09-20 17:28:47 +0200 [debug]: plugin/out_kafka_buffered.rb:236:write: (records|bytes) ({"testTopic"=>2}|{"testTopic"=>520})
2016-09-20 17:28:52 +0200 [debug]: plugin/out_kafka_buffered.rb:236:write: (records|bytes) ({"testTopic"=>2}|{"testTopic"=>138})
2016-09-20 17:28:58 +0200 [debug]: plugin/out_kafka_buffered.rb:236:write: (records|bytes) ({"testTopic"=>1}|{"testTopic"=>69})
2016-09-20 17:29:04 +0200 [debug]: plugin/out_kafka_buffered.rb:236:write: (records|bytes) ({"testTopic"=>1}|{"testTopic"=>68})

After the correct two lines are put in the kafka queue, the following lines are received by kafka again and again:

{"message":"(records|bytes) ({\"testTopic\"=>1}|{\"testTopic\"=>142})"}
{"message":"(records|bytes) ({\"testTopic\"=>2}|{\"testTopic\"=>520})"}
{"message":"(records|bytes) ({\"testTopic\"=>2}|{\"testTopic\"=>138})"}
{"message":"(records|bytes) ({\"testTopic\"=>1}|{\"testTopic\"=>69})"}

It seems as if the debug logs get emitted to kafka.
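This looks like a feedback loop: fluentd routes its own log messages as internal events tagged `fluent.*`, and the `*.**` pattern in the config above matches them, so every debug line the kafka output writes becomes a new event sent to Kafka, which produces another debug line, and so on. One way to break the loop, sketched against the config above:

```
# Drop fluentd's internal events instead of shipping them to Kafka
<match fluent.**>
  @type null
</match>

# Match only the application tag, not *.**
<match apache.**>
  @type kafka_buffered
  flush_interval 6s
  buffer_type file
  buffer_path /tmp/kafka.*.buffer
  brokers hdp1:6667
  default_topic testTopic
</match>
```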

install fluent-plugin-kafka problem

$ sudo gem install fluent-plugin-kafka
Fetching: msgpack-0.5.12.gem (100%)
Building native extensions. This could take a while...
Successfully installed msgpack-0.5.12
Fetching: yajl-ruby-1.2.1.gem (100%)
Building native extensions. This could take a while...
Successfully installed yajl-ruby-1.2.1
Fetching: cool.io-1.4.3.gem (100%)
Building native extensions. This could take a while...
Successfully installed cool.io-1.4.3
Fetching: http_parser.rb-0.6.0.gem (100%)
Building native extensions. This could take a while...
Successfully installed http_parser.rb-0.6.0
Fetching: sigdump-0.2.4.gem (100%)
Successfully installed sigdump-0.2.4
Fetching: tzinfo-data-1.2016.1.gem (100%)
Successfully installed tzinfo-data-1.2016.1
Fetching: string-scrub-0.0.5.gem (100%)
Building native extensions. This could take a while...
Successfully installed string-scrub-0.0.5
Fetching: fluentd-0.12.20.gem (100%)
Successfully installed fluentd-0.12.20
Fetching: poseidon-0.0.5.gem (100%)
Successfully installed poseidon-0.0.5
Fetching: zookeeper-1.4.11.gem (100%)
Building native extensions. This could take a while...
ERROR: Error installing fluent-plugin-kafka:
ERROR: Failed to build gem native extension.

/usr/bin/ruby2.0 extconf.rb

Building zkc.
tar xzf zkc-3.4.5.tar.gz 2>&1
patch -p0 < patches/zkc-3.4.5-yosemite-htonl-fix.patch 2>&1
sh: patch: command not found
*** extconf.rb failed ***
Could not create Makefile due to some reason, probably lack of necessary
libraries and/or headers. Check the mkmf.log file for more details. You may
need configuration options.

Provided configuration options:
    --with-opt-dir
    --without-opt-dir
    --with-opt-include
    --without-opt-include=${opt-dir}/include
    --with-opt-lib
    --without-opt-lib=${opt-dir}/lib64
    --with-make-prog
    --without-make-prog
    --srcdir=.
    --curdir
    --ruby=/usr/bin/ruby2.0
extconf.rb:51:in `safe_sh': command failed! patch -p0 < patches/zkc-3.4.5-yosemite-htonl-fix.patch 2>&1 (RuntimeError)
    from extconf.rb:64:in `block (2 levels) in <main>'
    from extconf.rb:63:in `each'
    from extconf.rb:63:in `block in <main>'
    from extconf.rb:55:in `chdir'
    from extconf.rb:55:in `<main>'

Gem files will remain installed in /usr/local/share/ruby/gems/2.0/gems/zookeeper-1.4.11 for inspection.
Results logged to /usr/local/share/ruby/gems/2.0/gems/zookeeper-1.4.11/ext/gem_make.out

Feature Request - support with :trail

Hi there,
I see there is an options[:trail] parameter right here in ConsumerGroup, which is used to skip all the old messages in a topic. It's very useful if you want to consume an existing topic from the latest offset.

I would like to have it supported in this fluent plugin.
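A simulation of the :trail semantics, to make the request concrete. `TopicSim` is a stand-in, not Poseidon's actual API: a trailing subscriber starts at the latest offset and only sees messages produced after it joins.

```ruby
# Stand-in for a Kafka topic; not the real Poseidon API.
class TopicSim
  def initialize(messages)
    @messages = messages
  end

  # With trail: true the subscriber skips everything already in the
  # topic, mirroring what options[:trail] does in ConsumerGroup.
  def subscribe(trail: false)
    start = trail ? @messages.size : 0
    -> { @messages.drop(start) }
  end

  def produce(msg)
    @messages << msg
  end
end
```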

ZK was flooded with thousands of connections if the topic is not found

Steps to produce:

  1. Use kafka cluster with Automatic Topic Creation disabled
  2. Create td-agent with kafka output, using some random topic name
  3. Started td-agent

Behavior:
Since the topic was not found, td-agent keeps sending requests to ZK without closing the previous connections.
Results:
Connections from td-agent to ZK increase rapidly, making ZK unresponsive.
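A sketch of the suspected leak and the obvious fix, with a hypothetical `Zk` class standing in for the zookeeper client: when the topic lookup raises, the client must still be closed, e.g. in an `ensure` block.

```ruby
# Hypothetical ZK client that counts open connections; brokers_for
# raises when the topic does not exist (auto-creation disabled).
class Zk
  @@open = 0
  def self.open_count; @@open; end
  def initialize; @@open += 1; end
  def close; @@open -= 1; end
  def brokers_for(topic)
    raise "topic not found: #{topic}"
  end
end

# Leaky pattern: the client is never closed when the lookup fails,
# so every retry adds a connection.
def lookup_leaky(topic)
  Zk.new.brokers_for(topic)
rescue
  nil
end

# Fixed pattern: ensure releases the connection on every path.
def lookup_safe(topic)
  zk = Zk.new
  zk.brokers_for(topic)
rescue
  nil
ensure
  zk.close
end
```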

When "output_data_type" is set to "single_value" or "csv", the output is empty

It appears that neither "single_value" nor "csv" is a supported format for "output_data_type". When I try either, the events are empty. Other formats like "json" or "ltsv" work fine. Below is a sample of my config:

type tail
tag ts.log
format none
path /tmp/*-access.log
refresh_interval 5
read_from_head false
message_key log
pos_file /tmp/ts.log.pos

type kafka_buffered
brokers localhost:9092
default_topic test
flush_interval 1
buffer_type file
buffer_path /tmp/fluentd.kafka.buffer
output_include_time true
#output_data_type ltsv
#output_data_type json
output_data_type single_value

Doesn't detect newly added logs occasionally

Kafka version: kafka_2.11-0.9.0.1
fluent-plugin-kafka : 0.3.0

Hi, guys

With the very latest version, 0.3.0, the plugin sometimes doesn't detect newly added logs and doesn't send them to my kafka. Sometimes added logs are sent to kafka, and sometimes they aren't. Very irregular.
It worked fine with older version 0.1.3.

My /etc/td-agent/td-agent.conf is something like this.

<source>
  @type       tail
  format      none
  path        /var/log/app.log
  pos_file    /var/log/app.log.pos
  message_key body
  tag         test_tag
</source>

<match test_tag>
  @type            kafka
  brokers          broker001:9092
  default_topic    test_topic
  output_data_type json
</match>

Thanks

Toru

Some questions about using fluent-plugin-kafka


After I started using fluent-plugin-kafka, I found that I had no time field in my log. It looks like this:
{"client_ip":"39.76.243.121","domain":"i.xxx.com","method":"GET","url":"/login/callback?pwd=0&d=emXYDnLHD6emC279&tsl=0&auth=jBDNN%2BpeV7JDqZ%2Fxmyu4RVGo1HuNiMjd8gj98oMbtWbgFyQ8ZkO3FVKOyo3LIi0jC3WV5nDH%2Bi%2BWZA2RmW%2F%2FpmtmzkaivDgErjuk1x%2F07XZqUa9uo5aYZkbTAA%2FqYXMQdi4HXf4HtErNWQZx7GfZcyoq5OtiAQ%2FTG8bz%2Bn6y%2F8Q%3D&m=1&pass_eas=6.0&pass_uas=8.0&pass_ss=6.0&nonce=smvZw3bR4S4Bbxre&_ssign=5AsKKQzLv2YYHrBwDt0j1OYaN3I%3D&clientSign=VyhhPhkYecG9vmaUSOxwrtCAe4c%3D&_userIdNeedEncrypt=true","http_ver":"HTTP/1.0","http_code":"200","http_length":"741","referer":"-","ua":"2014811/wt88047; MIUI/V6.7.1.0.KHJCNCH E/V6 B/S L/zh-CN LO/CN","proxy_ip":"10.108.70.12","upstream_addr":"127.0.0.1:9000","request_time":"0.001","response_time":"0.001","upstream_status":"200","custom_status":"-","userid":"","logid":"496401249220","time":1443515401,"tag":"ngx_log_xxx"}

My fluentd configuration is as follows:

##td-agent.conf
type tail
format /^(?[^ ]_) - (?[^ ]_) [(?[^]]_)] "(?[^ ]_) (?[^ ]_) (?[^ ]_)" (?[^ ]_) (?[^ ]_) "(?[^ ]_)" "(?[^\"]_)" "(?[^\/]_)/(?[^\"]_)" "(?[^\/]_)/(?[^\/]_)/(?[^\/]_)/(?[^\/]_)/(?[^\"]_)"( "(?[^"]_)")?$/
time_format %d/%b/%Y:%H:%M:%S %z
pos_file /tmp/td-agent/nginx_log.pos
#refresh_interval 60
path /home/work/logs/nginx/*.log
tag ngx_log_xxx

type kafka
brokers kafka01:9092,kafka02:9092,kafka03:9092
zookeeper zk01:2181,zk02:2181,zk03:2181

default_topic kafka_test
output_data_type json
#output_include_tag true
#output_include_time true


But I want the result to be like this:

{"client_ip":"222.84.167.100","domain":"xmevent.xxx.com","time":"29/Sep/2015:16:31:43 +0800","method":"GET","url":"/login/callback?pwd=0&d=eSjZpzJmym0c7dlS&tsl=0&auth=SB0AyMVyyVx0lyLE%2FubXxiCvcdqbYNLWuWmDDKW9PDBb2Xjmep%2FoEL5BUjWgmUc1gR2AXosiQzvRLQMDkPN7vHtJwNHSEWaJw%2FOpFIF8LWXaoXDU%2B5DJKi5fP24SWkb7dJREJA07b%2BFFf9lLme8ODUn2dDRuSErex2EJtGbYXQg%3D&m=1&pass_eas=9.0&pass_uas=4.0&pass_ss=7.0&nonce=1JQ30TqRERYBbxrf&_ssign=ENDBT9SB%2BwjrAyypY1PLOFSBByE%3D&_userIdNeedEncrypt=true&clientSign=vJxpZfnzcw6rCcc6ZrEGtWXRcgU%3D","http_ver":"HTTP/1.0","http_code":"200","http_length":"849","referer":"-","ua":"Redmi Note 2/Redmi Note 2; MIUI/V6.7.10.0.LHMCNCH E/V6 B/S L/zh-CN LO/CN","proxy_ip":"10.108.47.18","upstream_addr":"127.0.0.1:9000","request_time":"0.003","response_time":"0.003","upstream_status":"200","custom_status":"0","userid":"","logid":"603503536912","time":1443515503,"tag":"ngx_log_xxx"}

I tried renaming time to log_time in the log format so that it would be output, but I need the time field to create the index pattern, and I now find that I can't create it from the time I collect myself.
1. Does the time field have a special significance in fluent-plugin-kafka and td-agent? Is there any way to solve this problem?
2. Is it possible with fluent-plugin-kafka to create a tag like ngx_xxx_log-2015.09.29?
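Regarding question 1: with a format regex plus time_format in the tail source, td-agent parses the timestamp into the event's time and drops it from the record, which would explain the missing field. If I read the plugin options correctly, the commented-out output_include_time line in the match section above should add the event time back into each record, e.g.:

```
# In the kafka match section (illustrative placement):
output_data_type json
output_include_time true   # add the event time back into each record
```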

Fluentd crash when zookeeper returns no brokers or if zookeeper call fails

Hi @htgc ,

I am using v0.0.9 of this plugin. There are two specific exceptions we can see in our logs, and both lead to the entire Fluentd agent crashing. These look like uncaught errors in the code. Any input here would be helpful.

In both scenarios we were using the 'zookeeper' configuration and not the 'brokers'.

Error 1:
In this case Zookeeper is UP but no brokers are registered as they were down for maintenance.

2015-03-20 09:08:12 +0000 [error]: unexpected error error="undefined method `each' for nil:NilClass"
  2015-03-20 09:08:12 +0000 [error]: /usr/lib64/fluent/ruby/lib/ruby/gems/1.9.1/gems/fluent-plugin-kafka-0.0.9/lib/fluent/plugin/out_kafka_buffered.rb:39:in `configure'
  2015-03-20 09:08:12 +0000 [error]: /usr/lib64/fluent/ruby/lib/ruby/gems/1.9.1/gems/fluentd-0.12.5/lib/fluent/plugin/out_copy.rb:43:in `block in configure'
  2015-03-20 09:08:12 +0000 [error]: /usr/lib64/fluent/ruby/lib/ruby/gems/1.9.1/gems/fluentd-0.12.5/lib/fluent/plugin/out_copy.rb:32:in `each'
  2015-03-20 09:08:12 +0000 [error]: /usr/lib64/fluent/ruby/lib/ruby/gems/1.9.1/gems/fluentd-0.12.5/lib/fluent/plugin/out_copy.rb:32:in `configure'
  2015-03-20 09:08:12 +0000 [error]: /usr/lib64/fluent/ruby/lib/ruby/gems/1.9.1/gems/fluentd-0.12.5/lib/fluent/agent.rb:127:in `add_match'
  2015-03-20 09:08:12 +0000 [error]: /usr/lib64/fluent/ruby/lib/ruby/gems/1.9.1/gems/fluentd-0.12.5/lib/fluent/agent.rb:60:in `block in configure'
  2015-03-20 09:08:12 +0000 [error]: /usr/lib64/fluent/ruby/lib/ruby/gems/1.9.1/gems/fluentd-0.12.5/lib/fluent/agent.rb:54:in `each'
  2015-03-20 09:08:12 +0000 [error]: /usr/lib64/fluent/ruby/lib/ruby/gems/1.9.1/gems/fluentd-0.12.5/lib/fluent/agent.rb:54:in `configure'
  2015-03-20 09:08:12 +0000 [error]: /usr/lib64/fluent/ruby/lib/ruby/gems/1.9.1/gems/fluentd-0.12.5/lib/fluent/root_agent.rb:82:in `configure'
  2015-03-20 09:08:12 +0000 [error]: /usr/lib64/fluent/ruby/lib/ruby/gems/1.9.1/gems/fluentd-0.12.5/lib/fluent/engine.rb:97:in `configure'
  2015-03-20 09:08:12 +0000 [error]: /usr/lib64/fluent/ruby/lib/ruby/gems/1.9.1/gems/fluentd-0.12.5/lib/fluent/engine.rb:77:in `run_configure'
  2015-03-20 09:08:12 +0000 [error]: /usr/lib64/fluent/ruby/lib/ruby/gems/1.9.1/gems/fluentd-0.12.5/lib/fluent/supervisor.rb:399:in `run_configure'
  2015-03-20 09:08:12 +0000 [error]: /usr/lib64/fluent/ruby/lib/ruby/gems/1.9.1/gems/fluentd-0.12.5/lib/fluent/supervisor.rb:138:in `block in start'
  2015-03-20 09:08:12 +0000 [error]: /usr/lib64/fluent/ruby/lib/ruby/gems/1.9.1/gems/fluentd-0.12.5/lib/fluent/supervisor.rb:266:in `call'
  2015-03-20 09:08:12 +0000 [error]: /usr/lib64/fluent/ruby/lib/ruby/gems/1.9.1/gems/fluentd-0.12.5/lib/fluent/supervisor.rb:266:in `main_process'
  2015-03-20 09:08:12 +0000 [error]: /usr/lib64/fluent/ruby/lib/ruby/gems/1.9.1/gems/fluentd-0.12.5/lib/fluent/supervisor.rb:241:in `block in supervise'
  2015-03-20 09:08:12 +0000 [error]: /usr/lib64/fluent/ruby/lib/ruby/gems/1.9.1/gems/fluentd-0.12.5/lib/fluent/supervisor.rb:240:in `fork'
  2015-03-20 09:08:12 +0000 [error]: /usr/lib64/fluent/ruby/lib/ruby/gems/1.9.1/gems/fluentd-0.12.5/lib/fluent/supervisor.rb:240:in `supervise'
  2015-03-20 09:08:12 +0000 [error]: /usr/lib64/fluent/ruby/lib/ruby/gems/1.9.1/gems/fluentd-0.12.5/lib/fluent/supervisor.rb:134:in `start'
  2015-03-20 09:08:12 +0000 [error]: /usr/lib64/fluent/ruby/lib/ruby/gems/1.9.1/gems/fluentd-0.12.5/lib/fluent/command/fluentd.rb:167:in `<top (required)>'
  2015-03-20 09:08:12 +0000 [error]: /usr/lib64/fluent/ruby/lib/ruby/1.9.1/rubygems/custom_require.rb:55:in `require'
  2015-03-20 09:08:12 +0000 [error]: /usr/lib64/fluent/ruby/lib/ruby/1.9.1/rubygems/custom_require.rb:55:in `require'
  2015-03-20 09:08:12 +0000 [error]: /usr/lib64/fluent/ruby/lib/ruby/gems/1.9.1/gems/fluentd-0.12.5/bin/fluentd:6:in `<top (required)>'
  2015-03-20 09:08:12 +0000 [error]: /usr/lib64/fluent/ruby/bin/fluentd:23:in `load'
  2015-03-20 09:08:12 +0000 [error]: /usr/lib64/fluent/ruby/bin/fluentd:23:in `<top (required)>'
  2015-03-20 09:08:12 +0000 [error]: /usr/sbin/td-agent:7:in `load'
  2015-03-20 09:08:12 +0000 [error]: /usr/sbin/td-agent:7:in `<main>'
2015-03-20 09:08:12 +0000 [info]: process finished code=256
2015-03-20 09:08:12 +0000 [warn]: process died within 1 second. exit.
2015-03-20 09:10:29 +0000 [info]: reading config file path="/home/ec2-user/cloudmi/streaming/accounting/ph3-etl/td-agent.conf"

Error 2:

In this case Zookeeper itself was down for maintenance.

2015-03-20 07:54:50 +0000 [error]: unexpected error error="response for meth: :get_children, args: [0, \"/brokers/ids\", nil, nil], not received within 30 seconds"
  2015-03-20 07:54:50 +0000 [error]: /usr/lib64/fluent/ruby/lib/ruby/gems/1.9.1/gems/zookeeper-1.4.10/lib/zookeeper/continuation.rb:117:in `block in value'
  2015-03-20 07:54:50 +0000 [error]: /usr/lib64/fluent/ruby/lib/ruby/1.9.1/monitor.rb:211:in `mon_synchronize'
  2015-03-20 07:54:50 +0000 [error]: /usr/lib64/fluent/ruby/lib/ruby/gems/1.9.1/gems/zookeeper-1.4.10/lib/zookeeper/continuation.rb:107:in `value'
  2015-03-20 07:54:50 +0000 [error]: /usr/lib64/fluent/ruby/lib/ruby/gems/1.9.1/gems/zookeeper-1.4.10/ext/c_zookeeper.rb:231:in `submit_and_block'
  2015-03-20 07:54:50 +0000 [error]: /usr/lib64/fluent/ruby/lib/ruby/gems/1.9.1/gems/zookeeper-1.4.10/ext/c_zookeeper.rb:40:in `get_children'
  2015-03-20 07:54:50 +0000 [error]: /usr/lib64/fluent/ruby/lib/ruby/gems/1.9.1/gems/zookeeper-1.4.10/lib/zookeeper/client_methods.rb:69:in `get_children'
  2015-03-20 07:54:50 +0000 [error]: /usr/lib64/fluent/ruby/lib/ruby/gems/1.9.1/gems/fluent-plugin-kafka-0.0.9/lib/fluent/plugin/out_kafka_buffered.rb:39:in `configure'
  2015-03-20 07:54:50 +0000 [error]: /usr/lib64/fluent/ruby/lib/ruby/gems/1.9.1/gems/fluentd-0.12.5/lib/fluent/plugin/out_copy.rb:43:in `block in configure'
  2015-03-20 07:54:50 +0000 [error]: /usr/lib64/fluent/ruby/lib/ruby/gems/1.9.1/gems/fluentd-0.12.5/lib/fluent/plugin/out_copy.rb:32:in `each'
  2015-03-20 07:54:50 +0000 [error]: /usr/lib64/fluent/ruby/lib/ruby/gems/1.9.1/gems/fluentd-0.12.5/lib/fluent/plugin/out_copy.rb:32:in `configure'
  2015-03-20 07:54:50 +0000 [error]: /usr/lib64/fluent/ruby/lib/ruby/gems/1.9.1/gems/fluentd-0.12.5/lib/fluent/agent.rb:127:in `add_match'
  2015-03-20 07:54:50 +0000 [error]: /usr/lib64/fluent/ruby/lib/ruby/gems/1.9.1/gems/fluentd-0.12.5/lib/fluent/agent.rb:60:in `block in configure'
  2015-03-20 07:54:50 +0000 [error]: /usr/lib64/fluent/ruby/lib/ruby/gems/1.9.1/gems/fluentd-0.12.5/lib/fluent/agent.rb:54:in `each'
  2015-03-20 07:54:50 +0000 [error]: /usr/lib64/fluent/ruby/lib/ruby/gems/1.9.1/gems/fluentd-0.12.5/lib/fluent/agent.rb:54:in `configure'
  2015-03-20 07:54:50 +0000 [error]: /usr/lib64/fluent/ruby/lib/ruby/gems/1.9.1/gems/fluentd-0.12.5/lib/fluent/root_agent.rb:82:in `configure'
  2015-03-20 07:54:50 +0000 [error]: /usr/lib64/fluent/ruby/lib/ruby/gems/1.9.1/gems/fluentd-0.12.5/lib/fluent/engine.rb:97:in `configure'
  2015-03-20 07:54:50 +0000 [error]: /usr/lib64/fluent/ruby/lib/ruby/gems/1.9.1/gems/fluentd-0.12.5/lib/fluent/engine.rb:77:in `run_configure'
  2015-03-20 07:54:50 +0000 [error]: /usr/lib64/fluent/ruby/lib/ruby/gems/1.9.1/gems/fluentd-0.12.5/lib/fluent/supervisor.rb:399:in `run_configure'
  2015-03-20 07:54:50 +0000 [error]: /usr/lib64/fluent/ruby/lib/ruby/gems/1.9.1/gems/fluentd-0.12.5/lib/fluent/supervisor.rb:138:in `block in start'
  2015-03-20 07:54:50 +0000 [error]: /usr/lib64/fluent/ruby/lib/ruby/gems/1.9.1/gems/fluentd-0.12.5/lib/fluent/supervisor.rb:266:in `call'
  2015-03-20 07:54:50 +0000 [error]: /usr/lib64/fluent/ruby/lib/ruby/gems/1.9.1/gems/fluentd-0.12.5/lib/fluent/supervisor.rb:266:in `main_process'
  2015-03-20 07:54:50 +0000 [error]: /usr/lib64/fluent/ruby/lib/ruby/gems/1.9.1/gems/fluentd-0.12.5/lib/fluent/supervisor.rb:241:in `block in supervise'
  2015-03-20 07:54:50 +0000 [error]: /usr/lib64/fluent/ruby/lib/ruby/gems/1.9.1/gems/fluentd-0.12.5/lib/fluent/supervisor.rb:240:in `fork'
  2015-03-20 07:54:50 +0000 [error]: /usr/lib64/fluent/ruby/lib/ruby/gems/1.9.1/gems/fluentd-0.12.5/lib/fluent/supervisor.rb:240:in `supervise'
  2015-03-20 07:54:50 +0000 [error]: /usr/lib64/fluent/ruby/lib/ruby/gems/1.9.1/gems/fluentd-0.12.5/lib/fluent/supervisor.rb:134:in `start'
  2015-03-20 07:54:50 +0000 [error]: /usr/lib64/fluent/ruby/lib/ruby/gems/1.9.1/gems/fluentd-0.12.5/lib/fluent/command/fluentd.rb:167:in `<top (required)>'
  2015-03-20 07:54:50 +0000 [error]: /usr/lib64/fluent/ruby/lib/ruby/1.9.1/rubygems/custom_require.rb:55:in `require'
  2015-03-20 07:54:50 +0000 [error]: /usr/lib64/fluent/ruby/lib/ruby/1.9.1/rubygems/custom_require.rb:55:in `require'
  2015-03-20 07:54:50 +0000 [error]: /usr/lib64/fluent/ruby/lib/ruby/gems/1.9.1/gems/fluentd-0.12.5/bin/fluentd:6:in `<top (required)>'
  2015-03-20 07:54:50 +0000 [error]: /usr/lib64/fluent/ruby/bin/fluentd:23:in `load'
  2015-03-20 07:54:50 +0000 [error]: /usr/lib64/fluent/ruby/bin/fluentd:23:in `<top (required)>'
  2015-03-20 07:54:50 +0000 [error]: /usr/sbin/td-agent:7:in `load'
  2015-03-20 07:54:50 +0000 [error]: /usr/sbin/td-agent:7:in `<main>'
2015-03-20 07:54:50 +0000 [info]: process finished code=256
2015-03-20 07:54:50 +0000 [error]: fluentd main process died unexpectedly. restarting.
2015-03-20 07:54:50 +0000 [info]: starting fluentd-0.12.5

Expected Behaviour:

Temporary network glitches can occur, or brokers could get migrated from one zookeeper to another, leading to intermittent downtime. This should not lead to a complete Fluentd agent crash. In such scenarios, can the retry configuration be used? Or can it retry forever until the issue is resolved?
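The guard being asked for could be sketched as a retry loop around broker discovery, so a transient Zookeeper failure backs off and retries instead of escaping configure and killing the agent. This is a hypothetical illustration (with_retries is a made-up helper), not the plugin's actual code:

```ruby
# Hypothetical retry wrapper: keeps retrying a broker lookup with
# exponential backoff instead of letting a transient failure crash
# the whole agent.
def with_retries(max_attempts: 5, base_delay: 0.1)
  attempts = 0
  begin
    attempts += 1
    result = yield
    raise "no brokers registered" if result.nil? || result.empty?
    result
  rescue
    raise if attempts >= max_attempts  # give up after the final attempt
    sleep(base_delay * (2 ** (attempts - 1)))
    retry
  end
end

# Example: the lookup succeeds on the third attempt.
calls = 0
brokers = with_retries(base_delay: 0) do
  calls += 1
  calls < 3 ? nil : ["kafka01:9092"]
end
```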

Feature request: use poseidon_cluster to support consumer group and zookeeper

Hi @htgc, thanks for your great work on this plugin. I am working on a log aggregation solution with Kafka and Fluentd, and I would like to know: do you have any plan to support consumer groups and zookeeper in the kafka input plugin?

I have tried poseidon_cluster, and with minimal changes I made it work in my environment. It needs more work to be solid, but it's not a big change. If you want, I can work on a pull request; I just wanted to touch base first in case you already have plans. Thanks!

Support for partitioning

Hi,

I would like to know if partitioning is supported by the plugin.
Poseidon round-robins messages by default, but if a key is specified, it will use it to send the message to the corresponding partition.

If it is supported, could you give an example of the fluentd configuration?
If not, do you have any plan to implement it?

Thanks,

Fabien
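For reference, later ruby-kafka-based releases of fluent-plugin-kafka do support keyed partitioning: the output reads the partition key from a record field named by partition_key_key (default: "partition_key"). A configuration sketch, where the tag pattern, topic, and user_id field are illustrative:

```
<match app.**>
  @type kafka2
  brokers kafka01:9092
  default_topic logs
  # partition_key_key names the record field whose value is used as the
  # partition key; records sharing a user_id land on the same partition.
  partition_key_key user_id
  <format>
    @type json
  </format>
</match>
```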

Flush error with invalid fluentd record

Hello,

Yesterday, out_kafka_buffered in my Fluentd cluster hit the following error.

2016-10-06 17:10:27 +0900 [warn]: temporarily failed to flush the buffer. next_retry=2016-10-06 17:10:28 +0900 error_class="NoMethodError" error="undefined method `[]=' for nil:NilClass" plugin_id="to_kafka"
  2016-10-06 17:10:27 +0900 [warn]: /home/myuser/local/ruby-2.2/lib/ruby/gems/2.2.0/gems/fluent-plugin-kafka-0.2.2/lib/fluent/plugin/out_kafka_buffered.rb:215:in `block in write'
  2016-10-06 17:10:27 +0900 [warn]: /home/myuser/local/ruby-2.2/lib/ruby/gems/2.2.0/gems/fluentd-0.12.28/lib/fluent/buffer.rb:123:in `each'
  2016-10-06 17:10:27 +0900 [warn]: /home/myuser/local/ruby-2.2/lib/ruby/gems/2.2.0/gems/fluentd-0.12.28/lib/fluent/buffer.rb:123:in `block in msgpack_each'
  2016-10-06 17:10:27 +0900 [warn]: /home/myuser/local/ruby-2.2/lib/ruby/gems/2.2.0/gems/fluentd-0.12.28/lib/fluent/plugin/buf_file.rb:71:in `open'
  2016-10-06 17:10:27 +0900 [warn]: /home/myuser/local/ruby-2.2/lib/ruby/gems/2.2.0/gems/fluentd-0.12.28/lib/fluent/buffer.rb:120:in `msgpack_each'
  2016-10-06 17:10:27 +0900 [warn]: /home/myuser/local/ruby-2.2/lib/ruby/gems/2.2.0/gems/fluent-plugin-kafka-0.2.2/lib/fluent/plugin/out_kafka_buffered.rb:210:in `write'
  2016-10-06 17:10:27 +0900 [warn]: /home/myuser/local/ruby-2.2/lib/ruby/gems/2.2.0/gems/fluentd-0.12.28/lib/fluent/buffer.rb:354:in `write_chunk'
  2016-10-06 17:10:27 +0900 [warn]: /home/myuser/local/ruby-2.2/lib/ruby/gems/2.2.0/gems/fluentd-0.12.28/lib/fluent/buffer.rb:333:in `pop'
  2016-10-06 17:10:27 +0900 [warn]: /home/myuser/local/ruby-2.2/lib/ruby/gems/2.2.0/gems/fluentd-0.12.28/lib/fluent/output.rb:338:in `try_flush'
  2016-10-06 17:10:27 +0900 [warn]: /home/myuser/local/ruby-2.2/lib/ruby/gems/2.2.0/gems/fluentd-0.12.28/lib/fluent/output.rb:149:in `run'
2016-10-06 17:10:28 +0900 [warn]: Send exception occurred: undefined method `[]=' for nil:NilClass
2016-10-06 17:10:28 +0900 [info]: initialized kafka producer: kafka  

From the errors on another Fluentd instance that receives the same logs, I realized that some of our clients sent records containing nil.

2016-10-06 17:10:08 +0900 [warn]: dump an error event: error_class=NoMethodError error="undefined method `/' for \"POST\":String" tag="sometag" time=1475741408 record={"time"=>"POST", "record"=>nil}

I know this is the sender's issue, but since our Fluentd cluster aggregates logs from many different clients, this can happen. And once it does, other normal logs also get stuck in the buffer. So it would be great if out_kafka_buffered could detect such invalid records and drop them, so that normal logs are not affected.
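The check being requested could be sketched as a filter that drops records which are not serializable hashes before they reach the producer. This is a hypothetical illustration (drop_invalid_records is a made-up helper), not the plugin's code:

```ruby
require 'json'

# Hypothetical guard: keep only records that are non-nil hashes and
# actually serialize to JSON, so one broken record cannot wedge the
# whole buffer chunk. Dropped records could also be logged.
def drop_invalid_records(records)
  records.select do |record|
    next false unless record.is_a?(Hash)
    begin
      JSON.generate(record)
      true
    rescue JSON::GeneratorError
      false
    end
  end
end

good = { "time" => 1475741408, "message" => "ok" }
kept = drop_invalid_records([good, nil, "not a hash"])
```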

Undefined method `buffer_chunk_limit'

On line 130 (https://github.com/htgc/fluent-plugin-kafka/blob/master/lib/fluent/plugin/out_kafka_buffered.rb#L130), out_kafka_buffered.rb refers to @buffer.buffer_chunk_limit, which does not appear to be defined. However, changing that line to refer to chunk_limit_size fixes the problem:

 max_buffer_size: @buffer.chunk_limit_size / 10, max_buffer_bytesize: @buffer.chunk_limit_size * 2}

Being new to Fluentd plugin internals, I'm not confident this is how the problem should be resolved, or whether what I'm running into is actually a bug. I'm eager to discuss this and figure out what's right and what's wrong.
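One way to keep that line working across Fluentd versions (a sketch of the idea, not necessarily how the maintainers would resolve it) is to probe for whichever accessor the buffer object actually provides:

```ruby
require 'ostruct'

# Hypothetical compatibility shim: Fluentd v0.12 buffers expose
# buffer_chunk_limit, while v0.14+ buffers expose chunk_limit_size.
# Use whichever method the buffer responds to.
def chunk_limit_for(buffer)
  if buffer.respond_to?(:chunk_limit_size)
    buffer.chunk_limit_size
  elsif buffer.respond_to?(:buffer_chunk_limit)
    buffer.buffer_chunk_limit
  else
    8 * 1024 * 1024  # conservative fallback (an assumption, not a plugin default)
  end
end

# Stand-ins for the two buffer generations:
new_style = OpenStruct.new(chunk_limit_size: 256)
old_style = OpenStruct.new(buffer_chunk_limit: 128)
```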

Kafka::LeaderNotAvailable

{"message":"Kafka::LeaderNotAvailable"}
{"host":"192.168.0.11","port":24224,"hard_timeout":true,"message":"detached forwarding server '192.168.0.11:24224' host=\"192.168.0.11\" port=24224 hard_timeout=true"}
{"host":"192.168.0.12","port":24224,"hard_timeout":true,"message":"detached forwarding server '192.168.0.12:24224' host=\"192.168.0.12\" port=24224 hard_timeout=true"}

What does this mean?

can not set ack_level

Hi,
I use the kafka output plugin to send logs to a Kafka server.
But I found that I cannot set the ack_level, so when I use this plugin against a clustered Kafka setup, there are problems.
The default ack_level is 0; how can I set it to 1?
Thanks a lot
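In later ruby-kafka-based releases of the plugin, the corresponding setting is required_acks (check the README of your installed version; the older poseidon-based releases predate it). A sketch, with the tag pattern and topic as illustrative values:

```
<match app.**>
  @type kafka_buffered
  brokers kafka01:9092,kafka02:9092
  default_topic logs
  # 0 = no ack, 1 = leader ack, -1 = all in-sync replicas
  required_acks 1
</match>
```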

Kafka::DeliveryFailed

I can successfully connect and create a topic, but message delivery fails. As there is no detail about the error, it's hard to pinpoint the issue. Also, where are the messages buffered? Can we specify the location if it's in a file?

2016-10-12 12:11:44 +0800 [warn]: temporarily failed to flush the buffer. next_retry=2016-10-12 12:11:43 +0800 error_class="Kafka::DeliveryFailed" error="Failed to send messages to EnigmaProxyStaging/0" plugin_id="object:3fe70b58a354"
  2016-10-12 12:11:44 +0800 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/ruby-kafka-0.3.15/lib/kafka/producer.rb:329:in `deliver_messages_with_retries'
  2016-10-12 12:11:44 +0800 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/ruby-kafka-0.3.15/lib/kafka/producer.rb:243:in `block in deliver_messages'
  2016-10-12 12:11:44 +0800 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/ruby-kafka-0.3.15/lib/kafka/instrumenter.rb:21:in `call'
  2016-10-12 12:11:44 +0800 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/ruby-kafka-0.3.15/lib/kafka/instrumenter.rb:21:in `instrument'
  2016-10-12 12:11:44 +0800 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/ruby-kafka-0.3.15/lib/kafka/producer.rb:236:in `deliver_messages'
  2016-10-12 12:11:44 +0800 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluent-plugin-kafka-0.3.1/lib/fluent/plugin/out_kafka_buffered.rb:220:in `block in write'
  2016-10-12 12:11:44 +0800 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluentd-0.12.29/lib/fluent/plugin/buf_memory.rb:67:in `feed_each'
  2016-10-12 12:11:44 +0800 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluentd-0.12.29/lib/fluent/plugin/buf_memory.rb:67:in `msgpack_each'
  2016-10-12 12:11:44 +0800 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluent-plugin-kafka-0.3.1/lib/fluent/plugin/out_kafka_buffered.rb:200:in `write'
  2016-10-12 12:11:44 +0800 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluentd-0.12.29/lib/fluent/buffer.rb:354:in `write_chunk'
  2016-10-12 12:11:44 +0800 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluentd-0.12.29/lib/fluent/buffer.rb:333:in `pop'
  2016-10-12 12:11:44 +0800 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluentd-0.12.29/lib/fluent/output.rb:338:in `try_flush'
  2016-10-12 12:11:44 +0800 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluentd-0.12.29/lib/fluent/output.rb:149:in `run'
2016-10-12 12:11:46 +0800 [warn]: Send exception occurred: Failed to send messages to EnigmaProxyStaging/0
2016-10-12 12:11:46 +0800 [info]: initialized kafka producer: kafka
2016-10-12 12:11:46 +0800 [warn]: temporarily failed to flush the buffer. next_retry=2016-10-12 12:11:45 +0800 error_class="Kafka::DeliveryFailed" error="Failed to send messages to EnigmaProxyStaging/0" plugin_id="object:3fe70b58a354"
  2016-10-12 12:11:46 +0800 [warn]: suppressed same stacktrace
2016-10-12 12:11:48 +0800 [warn]: Send exception occurred: Failed to send messages to EnigmaProxyStaging/0
2016-10-12 12:11:48 +0800 [info]: initialized kafka producer: kafka
2016-10-12 12:11:48 +0800 [warn]: temporarily failed to flush the buffer. next_retry=2016-10-12 12:11:49 +0800 error_class="Kafka::DeliveryFailed" error="Failed to send messages to EnigmaProxyStaging/0" plugin_id="object:3fe70b58a354"
  2016-10-12 12:11:48 +0800 [warn]: suppressed same stacktrace
2016-10-12 12:11:51 +0800 [warn]: Send exception occurred: Failed to send messages to EnigmaProxyStaging/0
2016-10-12 12:11:51 +0800 [info]: initialized kafka producer: kafka
2016-10-12 12:11:51 +0800 [warn]: temporarily failed to flush the buffer. next_retry=2016-10-12 12:11:57 +0800 error_class="Kafka::DeliveryFailed" error="Failed to send messages to EnigmaProxyStaging/0" plugin_id="object:3fe70b58a354"
  2016-10-12 12:11:51 +0800 [warn]: suppressed same stacktrace
2016-10-12 12:11:59 +0800 [warn]: Send exception occurred: Failed to send messages to EnigmaProxyStaging/0
2016-10-12 12:11:59 +0800 [info]: initialized kafka producer: kafka
2016-10-12 12:11:59 +0800 [warn]: temporarily failed to flush the buffer. next_retry=2016-10-12 12:12:14 +0800 error_class="Kafka::DeliveryFailed" error="Failed to send messages to EnigmaProxyStaging/0" plugin_id="object:3fe70b58a354"

Buffer seems to fill up when I set flush_interval to more than 2 seconds

I tried setting flush_interval to a less frequent 5 seconds, but my client always reports javax.net.ssl.SSLPeerUnverifiedException: peer not authenticated. I guess the connection is rejected by Fluentd's source. I tried increasing the buffer as follows:
  buffer_chunk_limit 512m
  buffer_queue_limit 64
It made no difference. But when I set flush_interval to 2, it works fine with no loss. Can anyone give me a hint?
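For reference, a buffered-output sketch in the v0.12 configuration syntax the reporter is using; the tag pattern, topic, paths, and sizes are illustrative values, not recommendations:

```
<match app.**>
  @type kafka_buffered
  brokers kafka01:9092
  default_topic logs
  flush_interval 5s
  # A file buffer survives restarts and makes queue growth visible on disk.
  buffer_type file
  buffer_path /var/log/td-agent/buffer/kafka
  buffer_chunk_limit 8m
  buffer_queue_limit 256
</match>
```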

Segmentation fault?

Sometimes, on certain messages, I seem to be getting a segmentation fault. If I clear out the buffer to get rid of all messages, the fault stops. Here's what I'm seeing:

2016-03-27_18:09:09.88924 /opt/rbenv/versions/2.1.6/lib/ruby/gems/2.1.0/gems/yajl-ruby-1.2.0/lib/yajl.rb:73: [BUG] Segmentation fault at 0x0000007ffffffb
2016-03-27_18:09:09.88928 ruby 2.1.6p336 (2015-04-13 revision 50298) [x86_64-linux]
2016-03-27_18:09:09.88928
2016-03-27_18:09:09.88928 -- Control frame information -----------------------------------------------
2016-03-27_18:09:09.88929 c:0016 p:---- s:0087 e:000086 CFUNC  :encode
2016-03-27_18:09:09.88929 c:0015 p:0048 s:0082 e:000081 METHOD /opt/rbenv/versions/2.1.6/lib/ruby/gems/2.1.0/gems/yajl-ruby-1.2.0/lib/yajl.rb:73
2016-03-27_18:09:09.88929 c:0014 p:0017 s:0074 e:000073 BLOCK  /opt/rbenv/versions/2.1.6/lib/ruby/gems/2.1.0/gems/fluent-plugin-kafka-0.1.3/lib/fluent/plugin/out_kafka_buffered.rb:117 [FINISH]
2016-03-27_18:09:09.88930 c:0013 p:---- s:0069 e:000068 CFUNC  :call
2016-03-27_18:09:09.88931 c:0012 p:0131 s:0063 e:000062 BLOCK  /opt/rbenv/versions/2.1.6/lib/ruby/gems/2.1.0/gems/fluent-plugin-kafka-0.1.3/lib/fluent/plugin/out_kafka_buffered.rb:157 [FINISH]
2016-03-27_18:09:09.88932 c:0011 p:---- s:0054 e:000053 CFUNC  :each
2016-03-27_18:09:09.88933 c:0010 p:0031 s:0051 e:000050 BLOCK  /opt/rbenv/versions/2.1.6/lib/ruby/gems/2.1.0/gems/fluentd-0.12.22/lib/fluent/buffer.rb:123
2016-03-27_18:09:09.88933 c:0009 p:0018 s:0047 e:000046 METHOD /opt/rbenv/versions/2.1.6/lib/ruby/gems/2.1.0/gems/fluentd-0.12.22/lib/fluent/plugin/buf_file.rb:71
2016-03-27_18:09:09.88934 c:0008 p:0007 s:0043 E:000de0 METHOD /opt/rbenv/versions/2.1.6/lib/ruby/gems/2.1.0/gems/fluentd-0.12.22/lib/fluent/buffer.rb:120
2016-03-27_18:09:09.88935 c:0007 p:0031 s:0039 E:000fa8 METHOD /opt/rbenv/versions/2.1.6/lib/ruby/gems/2.1.0/gems/fluent-plugin-kafka-0.1.3/lib/fluent/plugin/out_kafka_buffered.rb:148
2016-03-27_18:09:09.88935 c:0006 p:0010 s:0030 e:000029 METHOD /opt/rbenv/versions/2.1.6/lib/ruby/gems/2.1.0/gems/fluentd-0.12.22/lib/fluent/buffer.rb:351
2016-03-27_18:09:09.88936 c:0005 p:0038 s:0025 e:000024 METHOD /opt/rbenv/versions/2.1.6/lib/ruby/gems/2.1.0/gems/fluentd-0.12.22/lib/fluent/buffer.rb:330
2016-03-27_18:09:09.88936 c:0004 p:0157 s:0019 e:000018 METHOD /opt/rbenv/versions/2.1.6/lib/ruby/gems/2.1.0/gems/fluentd-0.12.22/lib/fluent/output.rb:338
2016-03-27_18:09:09.88937 c:0003 p:0063 s:0009 e:000008 METHOD /opt/rbenv/versions/2.1.6/lib/ruby/gems/2.1.0/gems/fluentd-0.12.22/lib/fluent/output.rb:149 [FINISH]
2016-03-27_18:09:09.88938 c:0002 p:---- s:0004 e:000003 IFUNC
2016-03-27_18:09:09.88939 c:0001 p:---- s:0002 e:000001 TOP    [FINISH]
2016-03-27_18:09:09.88940
2016-03-27_18:09:09.88940 /opt/rbenv/versions/2.1.6/lib/ruby/gems/2.1.0/gems/fluentd-0.12.22/lib/fluent/output.rb:149:in `run'
2016-03-27_18:09:09.88940 /opt/rbenv/versions/2.1.6/lib/ruby/gems/2.1.0/gems/fluentd-0.12.22/lib/fluent/output.rb:338:in `try_flush'
2016-03-27_18:09:09.88940 /opt/rbenv/versions/2.1.6/lib/ruby/gems/2.1.0/gems/fluentd-0.12.22/lib/fluent/buffer.rb:330:in `pop'
2016-03-27_18:09:09.88940 /opt/rbenv/versions/2.1.6/lib/ruby/gems/2.1.0/gems/fluentd-0.12.22/lib/fluent/buffer.rb:351:in `write_chunk'
2016-03-27_18:09:09.88941 /opt/rbenv/versions/2.1.6/lib/ruby/gems/2.1.0/gems/fluent-plugin-kafka-0.1.3/lib/fluent/plugin/out_kafka_buffered.rb:148:in `write'
2016-03-27_18:09:09.88941 /opt/rbenv/versions/2.1.6/lib/ruby/gems/2.1.0/gems/fluentd-0.12.22/lib/fluent/buffer.rb:120:in `msgpack_each'
2016-03-27_18:09:09.88941 /opt/rbenv/versions/2.1.6/lib/ruby/gems/2.1.0/gems/fluentd-0.12.22/lib/fluent/plugin/buf_file.rb:71:in `open'
2016-03-27_18:09:09.88941 /opt/rbenv/versions/2.1.6/lib/ruby/gems/2.1.0/gems/fluentd-0.12.22/lib/fluent/buffer.rb:123:in `block in msgpack_each'
2016-03-27_18:09:09.88941 /opt/rbenv/versions/2.1.6/lib/ruby/gems/2.1.0/gems/fluentd-0.12.22/lib/fluent/buffer.rb:123:in `each'
2016-03-27_18:09:09.88942 /opt/rbenv/versions/2.1.6/lib/ruby/gems/2.1.0/gems/fluent-plugin-kafka-0.1.3/lib/fluent/plugin/out_kafka_buffered.rb:157:in `block in write'

It looks like it could be an incompatibility between yajl and one of my messages, but I'm not sure.
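If the fault is data-dependent, one way to narrow it down (a hypothetical debugging sketch; sanitize_utf8 is a made-up helper) is to scrub strings to valid UTF-8 before they reach the JSON encoder, since malformed byte sequences are a common trigger for encoder crashes:

```ruby
# Hypothetical sanitizer: recursively replace invalid byte sequences so
# the JSON encoder never sees malformed UTF-8 strings.
def sanitize_utf8(value)
  case value
  when String
    value.scrub('?')  # String#scrub (Ruby >= 2.1) replaces invalid bytes
  when Hash
    value.each_with_object({}) { |(k, v), h| h[sanitize_utf8(k)] = sanitize_utf8(v) }
  when Array
    value.map { |v| sanitize_utf8(v) }
  else
    value
  end
end

# "\xFF" is an invalid byte in UTF-8; scrubbing turns it into "?".
bad = { "msg" => "ok\xFFbad" }
clean = sanitize_utf8(bad)
```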
