
Comments (23)

htgc commented on July 24, 2024

I confirmed that it works in my local environment under the following conditions...

Log

{"search_id":"39790490","location_id":"6667","country_code":"SG","location_name":"Singapore","locale":"en","site_code":"US","wego_visitor_id":"1429252550160-QmuHwg","wego_session_id":"1429506752934-lJt96p","device_type":"desktop_linux","ip":"219.74.18.84","timestamp":"2015-04-20T13:23:14.849+08:00","hotel_name":"Marina Bay Sands","hotel_id":"258101","ecpc_max":0.8,"ecpc_min":0.05,"ecpc_card_min":0.7,"hotel_rank":1,"ecpc":0.8,"provider_hotel_id":"245881","provider_code":"booking.com","price_usd":394.1027,"description":"Premier Double or Twin Room - Free cancellation - Book now, pay when you stay - Free WiFi - Found your room online at a lower price? We'll match it.","rate_rank":1,"ecpc_rank":1,"bucket":"0.8","placement":"listing_sponsored","params":{}}

Environment

  • Ubuntu 14.10
  • Kafka 0.8.1
  • Scala 2.10
  • fluent-plugin-kafka (0.0.12)
  • poseidon (0.0.5)
  • yajl-ruby (1.2.1)

Did it also happen with just a single log line? (I mean, it was not under a load test or anything, right?)

from fluent-plugin-kafka.

uohzxela commented on July 24, 2024

Hey, thanks for the prompt reply. The log file it's tailing contains a lot more lines. Maybe something is wrong with my Kafka config. Could you tell me what the possible causes of this error are?


uohzxela commented on July 24, 2024

Here are the logs from Kafka:

==> controller.log <==
2015-04-20 12:14:49,519 INFO kafka.controller.RequestSendThread: [Controller-100237-to-broker-100237-send-thread], Controller 100237 connected to id:100237,host:localhost,port:9092 for sending state change requests
2015-04-20 12:14:49,522 INFO kafka.controller.KafkaController: [Controller 100237]: New broker startup callback for 100237
2015-04-20 12:14:49,525 INFO kafka.controller.RequestSendThread: [Controller-100237-to-broker-100237-send-thread], Starting 
2015-04-20 12:15:32,241 INFO kafka.controller.PartitionStateMachine$TopicChangeListener: [TopicChangeListener on Controller 100237]: New topics: [Set(alex)], deleted topics: [Set()], new partition replica assignment [Map([alex,0] -> List(100237))]
2015-04-20 12:15:32,242 INFO kafka.controller.KafkaController: [Controller 100237]: New topic creation callback for [alex,0]
2015-04-20 12:15:32,256 INFO kafka.controller.KafkaController: [Controller 100237]: New partition creation callback for [alex,0]
2015-04-20 12:15:32,257 INFO kafka.controller.PartitionStateMachine: [Partition state machine on Controller 100237]: Invoking state change to NewPartition for partitions [alex,0]
2015-04-20 12:15:32,350 INFO kafka.controller.ReplicaStateMachine: [Replica state machine on controller 100237]: Invoking state change to NewReplica for replicas [Topic=alex,Partition=0,Replica=100237]
2015-04-20 12:15:32,357 INFO kafka.controller.PartitionStateMachine: [Partition state machine on Controller 100237]: Invoking state change to OnlinePartition for partitions [alex,0]
2015-04-20 12:15:32,476 INFO kafka.controller.ReplicaStateMachine: [Replica state machine on controller 100237]: Invoking state change to OnlineReplica for replicas [Topic=alex,Partition=0,Replica=100237]

==> kafka_init_stderr.log <==
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.

==> kafka_init_stdout.log <==
[2015-04-21 12:29:29,515] INFO Closing socket connection to /10.0.1.217. (kafka.network.Processor)
[2015-04-21 12:29:29,617] INFO Closing socket connection to /10.0.1.217. (kafka.network.Processor)
[2015-04-21 12:29:29,719] INFO Closing socket connection to /10.0.1.217. (kafka.network.Processor)
[2015-04-21 12:29:29,822] INFO Closing socket connection to /10.0.1.217. (kafka.network.Processor)
[2015-04-21 12:29:29,924] INFO Closing socket connection to /10.0.1.217. (kafka.network.Processor)
[2015-04-21 12:29:30,026] INFO Closing socket connection to /10.0.1.217. (kafka.network.Processor)
[2015-04-21 12:29:30,129] INFO Closing socket connection to /10.0.1.217. (kafka.network.Processor)
[2015-04-21 12:29:30,231] INFO Closing socket connection to /10.0.1.217. (kafka.network.Processor)
[2015-04-21 12:29:30,333] INFO Closing socket connection to /10.0.1.217. (kafka.network.Processor)
[2015-04-21 12:29:30,435] INFO Closing socket connection to /10.0.1.217. (kafka.network.Processor)

==> kafka-offset-monitor.log <==
serving resources from: jar:file:/opt/kafka-offset-monitor/KafkaOffsetMonitor-assembly-0.2.0.jar!/offsetapp
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
2015-04-20 12:21:16.556:INFO:oejs.Server:jetty-7.x.y-SNAPSHOT
log4j:WARN No appenders could be found for logger (org.I0Itec.zkclient.ZkConnection).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
2015-04-20 12:21:16.828:INFO:oejsh.ContextHandler:started o.e.j.s.ServletContextHandler{/,jar:file:/opt/kafka-offset-monitor/KafkaOffsetMonitor-assembly-0.2.0.jar!/offsetapp}
2015-04-20 12:21:16.837:INFO:oejs.AbstractConnector:Started [email protected]:8080

==> kafka-request.log <==

==> server.log <==
2015-04-21 12:29:29,515 INFO kafka.network.Processor: Closing socket connection to /10.0.1.217.
2015-04-21 12:29:29,617 INFO kafka.network.Processor: Closing socket connection to /10.0.1.217.
2015-04-21 12:29:29,719 INFO kafka.network.Processor: Closing socket connection to /10.0.1.217.
2015-04-21 12:29:29,822 INFO kafka.network.Processor: Closing socket connection to /10.0.1.217.
2015-04-21 12:29:29,924 INFO kafka.network.Processor: Closing socket connection to /10.0.1.217.
2015-04-21 12:29:30,026 INFO kafka.network.Processor: Closing socket connection to /10.0.1.217.
2015-04-21 12:29:30,129 INFO kafka.network.Processor: Closing socket connection to /10.0.1.217.
2015-04-21 12:29:30,231 INFO kafka.network.Processor: Closing socket connection to /10.0.1.217.
2015-04-21 12:29:30,333 INFO kafka.network.Processor: Closing socket connection to /10.0.1.217.
2015-04-21 12:29:30,435 INFO kafka.network.Processor: Closing socket connection to /10.0.1.217.


htgc commented on July 24, 2024

Thank you for the additional information!
Unfortunately, I have no idea yet. Let me keep debugging it.
Thanks.


htgc commented on July 24, 2024

Hmm, it doesn't happen for me even though I tried writing over 100K log lines.
However, I found a similar issue in the Poseidon repository (bpot/poseidon#80).
Please let me know if you still have the same issue in your environment.
By the way, you should use the kafka_buffered plugin rather than the kafka plugin, to avoid losing logs.
Thanks,


IngoS11 commented on July 24, 2024

Hi guys,
I have the very same issue now, and it used to work before. I am baffled as to what has changed, as no updates have happened on the machines.


htgc commented on July 24, 2024

@IngoS11 Thank you for the information.
Please tell me about your environment. Also, does it always happen?


IngoS11 commented on July 24, 2024

We are currently using the latest td-agent install for Ubuntu, and:

  • Ubuntu 12.04 LTS
  • Kafka 0.8.1.1
  • Scala 2.9
  • fluent-plugin-kafka (0.0.12)
  • poseidon (0.0.5)

and I get the same error as uohzxela: 'emit transaction failed: error_class=RuntimeError'. It is happening all the time now. Stopping and rebooting the td-agent machine has not helped at all. What we have not done so far is reboot Kafka, as it is used by other developers. I also cannot recall any updates or changes to the environment that could have caused this.
Best
Best

Ingo


uohzxela commented on July 24, 2024

@htgc I've tried kafka_buffered, but I still get the same error. Maybe the issue lies with Poseidon, not with your plugin?


htgc commented on July 24, 2024

Thanks for the update! Can you try accessing Kafka via ZooKeeper instead of a direct broker connection?
I mean, change
from

brokers  xxxx.yyy.zz:9092

to

zookeeper aaa.bbb.cc:2181

in your td-agent config file.

It had been working in @IngoS11's environment, so this plugin may indeed have a problem.


htgc commented on July 24, 2024

Hey guys, please let me know what other plugins you are using together with this plugin, if any.
Thanks,


IngoS11 commented on July 24, 2024

@htgc I switched over to the zookeeper config and was amazed to get only two of our three brokers in that cluster back. I found out that one of the brokers, while running, was in some weird zombie state. So this could very well be the same issue as bpot/poseidon#80, where poseidon fails to continue to the next broker when the first one responds with a problem.

I have yet to see what happens with the zookeeper config if one of the brokers goes into this zombie state again, but I am assuming it is no different. I was also surprised to see that I can only configure one zookeeper and not a list. In this particular cluster we run three zookeepers for failover.

We have no other plugins on that td-agent, only two matches that allow sending to two topics depending on the incoming URL. I am not sure if this is the right way to do it, but the config looks like this:

<match ceitrial**>
   type kafka_buffered
   default_topic        ceitrial
   zookeeper mo-49fd29fe7.mo.sap.corp:2181
   output_data_type     json
   buffer_type          memory
   output_include_tag   false
   output_include_time  false
   max_send_retries     3
   required_acks        0
   ack_timeout_ms       1500
</match>

<match *.**>
    type kafka_buffered
    zookeeper mo-49fd29fe7.mo.sap.corp:2181
    default_topic        cu-test
    flush_interval       10
    buffer_type          memory
    output_data_type     json
    output_include_tag   false
    output_include_time false
    max_send_retries    3
    required_acks       0
    log_level debug
</match>


htgc commented on July 24, 2024

@IngoS11 Thank you for the quick action! Let me confirm: did it work well when you switched to the zookeeper config?
In addition, you can also specify a ZooKeeper cluster by giving a comma-separated list (e.g. zookeeper1:2181,zookeeper2:2181).
This plugin selects the next producer when bpot/poseidon fails to send a message.
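For example, a minimal match block with a clustered ZooKeeper setting would look roughly like this (hostnames and topic are placeholders):

<match app.**>
    type kafka_buffered
    zookeeper zookeeper1:2181,zookeeper2:2181,zookeeper3:2181
    default_topic my-topic
</match>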
Please let me know if I have misunderstanding.
Thanks,


uohzxela commented on July 24, 2024

Strangely, accessing Kafka via zookeeper has never worked for me. I tried it before and it didn't work; it still doesn't work now. I keep getting localhost:9092 as a broker, even though the broker is on a remote host (kafka-staging-a-1.bezurk.org:9092). I am not sure if this is related to the error above; if it isn't, you can ignore it.

2015-04-24 14:34:42 +0800 [info]: adding match pattern="td.*.*" type="tdlog"
2015-04-24 14:34:42 +0800 [info]: adding match pattern="debug.**" type="stdout"
2015-04-24 14:34:42 +0800 [info]: adding source type="tail"
2015-04-24 14:34:42 +0800 [info]: adding source type="forward"
2015-04-24 14:34:42 +0800 [info]: adding source type="debug_agent"
2015-04-24 14:34:42 +0800 [info]: brokers has been refreshed via Zookeeper: ["localhost:9092"]
2015-04-24 14:34:42 +0800 [info]: initialized producer kafka
2015-04-24 14:34:42 +0800 [info]: following tail of /var/apps/a/hoteru/shared/log/fluentd/rates.staging.log
2015-04-24 14:34:42 +0800 [info]: listening fluent socket on 0.0.0.0:24224
2015-04-24 14:34:42 +0800 [info]: listening dRuby uri="druby://127.0.0.1:24230" object="Engine"
2015-04-24 14:46:42 +0800 [warn]: Send exception occurred: Poseidon::Errors::UnableToFetchMetadata
2015-04-24 14:46:42 +0800 [info]: brokers has been refreshed via Zookeeper: ["localhost:9092"]
2015-04-24 14:46:42 +0800 [info]: initialized producer kafka
2015-04-24 14:46:42 +0800 [warn]: emit transaction failed: error_class=Poseidon::Errors::UnableToFetchMetadata error="Poseidon::Errors::UnableToFetchMetadata" tag="staging.logs.hotels.impressions.rates"
  2015-04-24 14:46:42 +0800 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/poseidon-0.0.5/lib/poseidon/broker_pool.rb:33:in `fetch_metadata'
  2015-04-24 14:46:42 +0800 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/poseidon-0.0.5/lib/poseidon/sync_producer.rb:134:in `refresh_metadata'
  2015-04-24 14:46:42 +0800 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/poseidon-0.0.5/lib/poseidon/sync_producer.rb:46:in `send_messages'
  2015-04-24 14:46:42 +0800 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/poseidon-0.0.5/lib/poseidon/producer.rb:163:in `send_messages'
  2015-04-24 14:46:42 +0800 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluent-plugin-kafka-0.0.12/lib/fluent/plugin/out_kafka.rb:135:in `block in emit'
  2015-04-24 14:46:42 +0800 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluentd-0.12.7/lib/fluent/event.rb:128:in `call'
  2015-04-24 14:46:42 +0800 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluentd-0.12.7/lib/fluent/event.rb:128:in `block in each'
  2015-04-24 14:46:42 +0800 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluentd-0.12.7/lib/fluent/event.rb:127:in `each'
  2015-04-24 14:46:42 +0800 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluentd-0.12.7/lib/fluent/event.rb:127:in `each'
  2015-04-24 14:46:42 +0800 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluent-plugin-kafka-0.0.12/lib/fluent/plugin/out_kafka.rb:128:in `emit'
  2015-04-24 14:46:42 +0800 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluentd-0.12.7/lib/fluent/event_router.rb:88:in `emit_stream'
  2015-04-24 14:46:42 +0800 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluentd-0.12.7/lib/fluent/plugin/in_tail.rb:229:in `receive_lines'
  2015-04-24 14:46:42 +0800 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluentd-0.12.7/lib/fluent/plugin/in_tail.rb:320:in `call'
  2015-04-24 14:46:42 +0800 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluentd-0.12.7/lib/fluent/plugin/in_tail.rb:320:in `wrap_receive_lines'
  2015-04-24 14:46:42 +0800 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluentd-0.12.7/lib/fluent/plugin/in_tail.rb:513:in `call'
  2015-04-24 14:46:42 +0800 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluentd-0.12.7/lib/fluent/plugin/in_tail.rb:513:in `on_notify'
  2015-04-24 14:46:42 +0800 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluentd-0.12.7/lib/fluent/plugin/in_tail.rb:345:in `on_notify'
  2015-04-24 14:46:42 +0800 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluentd-0.12.7/lib/fluent/plugin/in_tail.rb:446:in `call'
  2015-04-24 14:46:42 +0800 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluentd-0.12.7/lib/fluent/plugin/in_tail.rb:446:in `on_change'
  2015-04-24 14:46:42 +0800 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/cool.io-1.3.0/lib/cool.io/loop.rb:88:in `run_once'
  2015-04-24 14:46:42 +0800 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/cool.io-1.3.0/lib/cool.io/loop.rb:88:in `run'
  2015-04-24 14:46:42 +0800 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluentd-0.12.7/lib/fluent/plugin/in_tail.rb:214:in `run'

Td-agent conf:

<match staging.logs.hotels.impressions.rates>
  type kafka
  zookeeper zookeeper-staging-a-1.bezurk.org:2181/kafka
  default_topic alex
  output_data_type json
  max_send_retries 60
  output_include_tag true
</match>


htgc commented on July 24, 2024

I keep getting localhost:9092 as a broker, even though the broker is on a remote host (kafka-staging-a-1.bezurk.org:9092)

Perhaps you should set the hostname in the server.properties file on the broker host.
https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-Whycan%27tmyconsumers/producersconnecttothebrokers?
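For example, a minimal server.properties sketch (using the broker hostname from your log; adjust to your setup):

# server.properties on the broker host
host.name=kafka-staging-a-1.bezurk.org
port=9092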


IngoS11 commented on July 24, 2024

@htgc thanks for the quick replies, I appreciate it very much. Yes, when I replace the broker setting with the zookeeper one, it works. I can also shut down a broker and it continues to work. What I do not know is what happens when a broker goes into that zombie state where it is running but responds with errors to producers. I cannot recreate that at will; I need to investigate further and basically wait until the Kafka broker goes into this state again.

I confirm that adding multiple zookeepers works. Maybe I had a typo there or something when I did it the first time. Sorry about that.

Quick question about sending to multiple topics: as you have seen, we have to serve multiple topics, and I would love to drive this via URLs. This is currently only possible by setting up multiple matches. I was wondering if there is a chance to change the plugin so that it sends to different topics depending on what URL you post to. Essentially something like this:

<match *.**>
    type kafka_buffered
    default_topic        cu-test
    ...
</match>

if you POST to
http://my.td-agent.server:1080/ -> it gets sent to the default topic
if you POST to
http://my.td-agent.server:1080/somenewtopic -> it gets sent to somenewtopic

That would be great in combination with the automatic topic creation on Kafka when the first message is sent to a non-existent topic. I would love to help, but I unfortunately have zero experience in Ruby :(.

Best

Ingo


htgc commented on July 24, 2024

@IngoS11 Great to hear!

Quick question about sending to multiple topics: As you have seen we have to serve multiple topics and I would love to drive this via URLs

You can choose one of the following options:
Option 1: Add a 'topic' key to your log.
This plugin sends the message to the indicated topic if one is present; otherwise, it sends to the default topic.
e.g.

{ "topic": "another", "message": "hello" } # send to 'another' topic
{ "message": "hello" } # send to default topic

Option 2: Select by tag
You can configure your Fluentd config file as follows:

<match kafka.default.**>
    type kafka_buffered
    default_topic default
</match>
<match kafka.another.**>
    type kafka_buffered
    default_topic  another_topic
</match>

If you'd like to send all logs to the default topic and also send some logs to another topic, you can use fluent-plugin-forest as follows:

<match kafka.**>
    type forest
    subtype copy
    <template>
        <store>
            type kafka_buffered
            default_topic default
        </store>
    </template>
    <case kafka.another.**>
        # Also send to the following topic when the tag matches kafka.another.**
        <store>
            type kafka_buffered
            default_topic another
        </store>
    </case>
</match>


IngoS11 commented on July 24, 2024

@htgc thanks for the reply. Option 1 would not work in most cases, as I do not have a whole lot of influence on what the senders send. Option 2 is what we currently have; it works quite well but requires configuration. I have never tried fluent-plugin-forest, but I will try it out on Monday. Anyway, it's not a big problem; I was just wondering if we could also automate it via URLs.

Thanks again for the help.


uohzxela commented on July 24, 2024

Perhaps, you should setup hostname in your server.properties file on broker host.
https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-Whycan%27tmyconsumers/producersconnecttothebrokers?

Thanks for the help! I tried setting host.name, and it can now detect the remote Kafka host, but I got a new error instead. Any ideas?

2015-04-27 11:00:59 +0800 [info]: adding match pattern="td.*.*" type="tdlog"
2015-04-27 11:00:59 +0800 [info]: adding match pattern="debug.**" type="stdout"
2015-04-27 11:00:59 +0800 [info]: adding source type="tail"
2015-04-27 11:00:59 +0800 [info]: adding source type="forward"
2015-04-27 11:00:59 +0800 [info]: adding source type="debug_agent"
2015-04-27 11:00:59 +0800 [info]: brokers has been refreshed via Zookeeper: ["kafka-staging-a-1.bezurk.org:9092"]
2015-04-27 11:00:59 +0800 [info]: initialized producer kafka
2015-04-27 11:00:59 +0800 [info]: following tail of /var/apps/a/hoteru/shared/log/fluentd/rates.staging.log
2015-04-27 11:00:59 +0800 [info]: listening fluent socket on 0.0.0.0:24224
2015-04-27 11:00:59 +0800 [info]: listening dRuby uri="druby://127.0.0.1:24230" object="Engine"
2015-04-27 11:01:06 +0800 [warn]: Send exception occurred: Poseidon::Errors::UnableToFetchMetadata
2015-04-27 11:01:06 +0800 [info]: brokers has been refreshed via Zookeeper: ["kafka-staging-a-1.bezurk.org:9092"]
2015-04-27 11:01:06 +0800 [info]: initialized producer kafka
2015-04-27 11:01:06 +0800 [warn]: emit transaction failed: error_class=Poseidon::Errors::UnableToFetchMetadata error="Poseidon::Errors::UnableToFetchMetadata" tag="staging.logs.hotels.impressions.rates"
  2015-04-27 11:01:06 +0800 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/poseidon-0.0.5/lib/poseidon/broker_pool.rb:33:in `fetch_metadata'
  2015-04-27 11:01:06 +0800 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/poseidon-0.0.5/lib/poseidon/sync_producer.rb:134:in `refresh_metadata'
  2015-04-27 11:01:06 +0800 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/poseidon-0.0.5/lib/poseidon/sync_producer.rb:46:in `send_messages'
  2015-04-27 11:01:06 +0800 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/poseidon-0.0.5/lib/poseidon/producer.rb:163:in `send_messages'
  2015-04-27 11:01:06 +0800 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluent-plugin-kafka-0.0.12/lib/fluent/plugin/out_kafka.rb:135:in `block in emit'
  2015-04-27 11:01:06 +0800 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluentd-0.12.7/lib/fluent/event.rb:128:in `call'
  2015-04-27 11:01:06 +0800 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluentd-0.12.7/lib/fluent/event.rb:128:in `block in each'
  2015-04-27 11:01:06 +0800 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluentd-0.12.7/lib/fluent/event.rb:127:in `each'
  2015-04-27 11:01:06 +0800 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluentd-0.12.7/lib/fluent/event.rb:127:in `each'
  2015-04-27 11:01:06 +0800 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluent-plugin-kafka-0.0.12/lib/fluent/plugin/out_kafka.rb:128:in `emit'
  2015-04-27 11:01:06 +0800 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluentd-0.12.7/lib/fluent/event_router.rb:88:in `emit_stream'
  2015-04-27 11:01:06 +0800 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluentd-0.12.7/lib/fluent/plugin/in_tail.rb:229:in `receive_lines'
  2015-04-27 11:01:06 +0800 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluentd-0.12.7/lib/fluent/plugin/in_tail.rb:320:in `call'
  2015-04-27 11:01:06 +0800 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluentd-0.12.7/lib/fluent/plugin/in_tail.rb:320:in `wrap_receive_lines'
  2015-04-27 11:01:06 +0800 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluentd-0.12.7/lib/fluent/plugin/in_tail.rb:513:in `call'
  2015-04-27 11:01:06 +0800 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluentd-0.12.7/lib/fluent/plugin/in_tail.rb:513:in `on_notify'
  2015-04-27 11:01:06 +0800 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluentd-0.12.7/lib/fluent/plugin/in_tail.rb:345:in `on_notify'
  2015-04-27 11:01:06 +0800 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluentd-0.12.7/lib/fluent/plugin/in_tail.rb:446:in `call'
  2015-04-27 11:01:06 +0800 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluentd-0.12.7/lib/fluent/plugin/in_tail.rb:446:in `on_change'
  2015-04-27 11:01:06 +0800 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/cool.io-1.3.0/lib/cool.io/loop.rb:88:in `run_once'
  2015-04-27 11:01:06 +0800 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/cool.io-1.3.0/lib/cool.io/loop.rb:88:in `run'
  2015-04-27 11:01:06 +0800 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluentd-0.12.7/lib/fluent/plugin/in_tail.rb:214:in `run'


htgc commented on July 24, 2024

@uohzxela I got this error when Fluentd couldn't connect to Kafka.
Can you check whether the port is open on the Kafka server, and also whether you can push some data to Kafka with kafka-console-producer (i.e. the CLI)?
Let me know if you still have the problem.
Thanks,


uohzxela commented on July 24, 2024

@htgc Appreciate your reply. Port 9092 is open on the Kafka host. I have pushed some data locally using the kafka-console-producer CLI tool and it works. But I am still getting the UnableToFetchMetadata error...


htgc commented on July 24, 2024

@uohzxela Hmm, that's weird.
Do you use any firewall, such as ufw, iptables, or a hardware firewall, between Fluentd and the Kafka brokers?
If so, can you check it?
I confirmed that the same error occurs when ufw denies port 9092.
In addition, can you check whether you can connect from the Fluentd server with telnet or something similar?
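As a rough illustration of that connectivity check, a small Python sketch (host and port below are placeholders for your broker):

```python
import socket

def can_connect(host, port, timeout=2.0):
    """Return True if a TCP connection to host:port can be established."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False

# Example: probe a Kafka broker (placeholder hostname)
# can_connect("kafka-staging-a-1.bezurk.org", 9092)
```

If this returns False from the Fluentd server but True locally on the broker, a firewall or a loopback-only listener is the likely cause.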
Thanks,


uohzxela commented on July 24, 2024

@htgc

Sorry for the delay in my reply, and thanks for the help! I have finally managed to resolve this error, and the fluent-plugin-kafka plugin works fine.

This was not related to a firewall issue, although using telnet to probe my Kafka cluster was a great help in pinpointing the actual problem. The output of the netstat command showed that my Kafka service was listening on 127.0.0.1:9092, i.e. only on the loopback interface, meaning it only accepted local connections, not remote ones. The problem occurred because my Kafka service was not configured to advertise its hostname to producers/consumers. After setting the 'advertised.host.name' field in the server.properties config file, it works. netstat now shows ':::9092', which I believe accepts remote connections.
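For reference, a sketch of the server.properties change that fixed it for me (hostname as in my setup above):

# server.properties — advertise a hostname that remote producers/consumers can reach
advertised.host.name=kafka-staging-a-1.bezurk.org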

