revpoint / logstash-codec-avro_schema_registry
This project forked from logstash-plugins/logstash-codec-avro
A logstash codec plugin for decoding and encoding Avro records
License: Other
I'm trying to publish JSON input to a Kafka message in Logstash
(in essence, trying to replicate the behavior of the kafka-avro-console-producer).
The message gets added to the queue but is different from the one produced by the kafka-avro-console-producer.
Message produced by kafka-avro-console-producer:
{
  "topic": "BellData7",
  "key": "�ée",
  "value": "\u0000\u0000\u0000\u0000\u0001\nBMORR\fCELTRR",
  "partition": 0,
  "offset": 1
},
Message produced by logstash (with the configuration below):
{
  "topic": "BellData7",
  "key": "�ée",
  "value": "\u0000\u0000\u0000\u0000\u0001\nBMORR\fCELTRR",
  "partition": 0,
  "offset": 1
},
I can see the magic bytes are clearly missing and the data appears to be encoded differently. What am I doing wrong?
input {
  stdin {
  }
}
filter {
  json {
    source => "message"
    remove_field => [ "path", "@timestamp", "@version", "host", "message" ]
    remove_tag => [ "message" ]
  }
}
I enter the following input on stdin:
{"Customer": "BMORR", "Device": "logstash"}
Schema.avsc is as below:
I'm running the Logstash 8.12.2 Docker image with the plugin installed.
This is my config, which always used to work:
output {
  kafka {
    codec => avro_schema_registry {
      endpoint => "https://reg.my.net:8443"
      schema_id => "761"
      register_schema => "false"
      base64_encoded => "false"
      client_key => "/vault/kafka.key"
      client_certificate => "/vault/kafka.pem"
      ca_certificate => "/etc/pki/ca-trust/source/anchors/ca-bundle.crt"
    }
    bootstrap_servers => "${KAFKA_BOOTSTRAP_SERVERS}"
    topic_id => "${KAFKA_AUDIT_TOPIC_ID}"
    value_serializer => "org.apache.kafka.common.serialization.ByteArraySerializer"
    ssl_truststore_location => "/vault/truststore.jks"
    ssl_truststore_password => "${LOGSTASH_KEYSTORE_PASS}"
    ssl_keystore_location => "/vault/kafka-keystore.jks"
    ssl_keystore_password => "${LOGSTASH_KEYSTORE_PASS}"
    security_protocol => "${KAFKA_SECURITY_PROTOCOL}"
  }
}
Logstash fails with:
[ERROR] 2024-05-24 13:06:15.614 [Converge PipelineAction::Create<audit-log>] agent - Failed to execute action {:action=>LogStash::PipelineAction::Create/pipeline_id:audit-log, :exception=>"Java::JavaLang::IllegalStateException", :message=>"Unable to configure plugins: (ArgumentError) wrong number of arguments (given 4, expected 1..3)", :backtrace=>["org.logstash.config.ir.CompiledPipeline.<init>(CompiledPipeline.java:120)", "org.logstash.execution.AbstractPipelineExt.initialize(AbstractPipelineExt.java:186)", "org.logstash.execution.AbstractPipelineExt$INVOKER$i$initialize.call(AbstractPipelineExt$INVOKER$i$initialize.gen)", "org.jruby.internal.runtime.methods.JavaMethod$JavaMethodN.call(JavaMethod.java:847)", "org.jruby.ir.runtime.IRRuntimeHelpers.instanceSuper(IRRuntimeHelpers.java:1319)", "org.jruby.ir.instructions.InstanceSuperInstr.interpret(InstanceSuperInstr.java:139)", "org.jruby.ir.interpreter.InterpreterEngine.processCall(InterpreterEngine.java:367)", "org.jruby.ir.interpreter.StartupInterpreterEngine.interpret(StartupInterpreterEngine.java:66)", "org.jruby.internal.runtime.methods.MixedModeIRMethod.INTERPRET_METHOD(MixedModeIRMethod.java:128)", "org.jruby.internal.runtime.methods.MixedModeIRMethod.call(MixedModeIRMethod.java:115)", "org.jruby.runtime.callsite.CachingCallSite.cacheAndCall(CachingCallSite.java:446)", "org.jruby.runtime.callsite.CachingCallSite.call(CachingCallSite.java:92)", "org.jruby.RubyClass.newInstance(RubyClass.java:931)", "org.jruby.RubyClass$INVOKER$i$newInstance.call(RubyClass$INVOKER$i$newInstance.gen)", "org.jruby.runtime.callsite.CachingCallSite.cacheAndCall(CachingCallSite.java:446)", "org.jruby.runtime.callsite.CachingCallSite.call(CachingCallSite.java:92)", "org.jruby.ir.instructions.CallBase.interpret(CallBase.java:548)", "org.jruby.ir.interpreter.InterpreterEngine.processCall(InterpreterEngine.java:367)", "org.jruby.ir.interpreter.StartupInterpreterEngine.interpret(StartupInterpreterEngine.java:66)", "org.jruby.ir.interpreter.InterpreterEngine.interpret(InterpreterEngine.java:88)", "org.jruby.internal.runtime.methods.MixedModeIRMethod.INTERPRET_METHOD(MixedModeIRMethod.java:238)", "org.jruby.internal.runtime.methods.MixedModeIRMethod.call(MixedModeIRMethod.java:225)", "org.jruby.internal.runtime.methods.DynamicMethod.call(DynamicMethod.java:228)", "org.jruby.runtime.callsite.CachingCallSite.cacheAndCall(CachingCallSite.java:476)", "org.jruby.runtime.callsite.CachingCallSite.call(CachingCallSite.java:293)", "org.jruby.ir.interpreter.InterpreterEngine.processCall(InterpreterEngine.java:328)", "org.jruby.ir.interpreter.StartupInterpreterEngine.interpret(StartupInterpreterEngine.java:66)", "org.jruby.ir.interpreter.Interpreter.INTERPRET_BLOCK(Interpreter.java:116)", "org.jruby.runtime.MixedModeIRBlockBody.commonYieldPath(MixedModeIRBlockBody.java:136)", "org.jruby.runtime.IRBlockBody.call(IRBlockBody.java:66)", "org.jruby.runtime.IRBlockBody.call(IRBlockBody.java:58)", "org.jruby.runtime.Block.call(Block.java:144)", "org.jruby.RubyProc.call(RubyProc.java:352)", "org.jruby.internal.runtime.RubyRunnable.run(RubyRunnable.java:111)", "java.base/java.lang.Thread.run(Thread.java:840)"]}
If I remove the codec settings from the kafka output, Logstash starts again.
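One likely cause, given the client signature quoted later in this thread (def initialize(endpoint, username = nil, password = nil, **http_options)): Logstash 8.12 bundles JRuby 9.4, which follows Ruby 3 keyword-argument separation, so a trailing hash passed positionally is no longer auto-splatted into **http_options. A hypothetical illustration of the call shape that would produce exactly this error (the call site is an assumption, not quoted from the codec source):

# Ruby 2.x: a trailing hash auto-splats into **http_options, so this works.
# Ruby 3.x: it stays a fourth positional argument and raises
#   ArgumentError: wrong number of arguments (given 4, expected 1..3)
client = SchemaRegistry::Client.new(endpoint, username, password, http_options)

# Ruby 3-safe call: splat the options hash into keywords explicitly.
client = SchemaRegistry::Client.new(endpoint, username, password, **http_options)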
Does this require a schema to already exist in the Schema Registry, or for the user to provide an avsc?
I'm getting io.confluent.rest.exceptions.RestNotFoundException: Schema not found
using this config:
codec => avro_schema_registry {
  endpoint => "http://localhost:8081"
  schema_id => 1
  register_schema => true
  binary_encoded => true
}
value_serializer => "org.apache.kafka.common.serialization.ByteArraySerializer"
I want to use a value schema that exists in the schema registry.
If the endpoint is the URL of the schema registry, should the subject string be the (namespace+)name of the schema (the subject)?
Kafka: 0.10.0.1
Logstash: 5.6
Plugin version: 0.9.0
magic_byte, schema_id = datum.read(5).unpack("cI>")
gives me the wrong schema_id. Has anyone been able to resolve this issue before?
I can see on the Elastic forums that it has happened before, but the thread had no answers: https://discuss.elastic.co/t/kakfa-is-reading-off-the-wrong-schema-id/53298
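For reference, the Confluent wire format that the unpack line above parses is a single zero magic byte followed by a 4-byte big-endian schema ID, then the Avro payload. A minimal sketch of an unambiguous way to read the header in Ruby (message_bytes is a stand-in for the raw Kafka value, not plugin code); the N directive always means 32-bit big-endian unsigned, so it avoids any doubt about the I> modifier:

require "stringio"

# Confluent wire format: [0x00 magic byte][4-byte big-endian schema ID][Avro payload]
datum = StringIO.new(message_bytes)                 # message_bytes: raw Kafka value (assumed)
magic_byte, schema_id = datum.read(5).unpack("cN")  # "c" = signed 8-bit, "N" = big-endian uint32
raise "unexpected magic byte #{magic_byte}" unless magic_byte == 0
# datum now points at the Avro-encoded body for schema_id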
Do you support Logstash 5.1.1?
Bundler::VersionConflict: Bundler could not find compatible versions for gem "logstash-core":
  In snapshot (Gemfile.lock):
    logstash-core (= 5.1.1)

  In Gemfile:
    logstash-core-plugin-api (>= 0) java depends on
      logstash-core (= 5.1.1) java

    logstash-codec-avro_schema_registry (>= 0) java depends on
      logstash-core (< 3.0.0, >= 2.0.0.beta2) java

    logstash-core (>= 0) java
Requested by @malonej7 and others.
We need to develop the encode capabilities using the Confluent Avro Schema Registry. Luckily, schema registration is already taken care of by https://github.com/wvanbergen/schema_registry, so we just need to figure out the settings interface.
The options that need to be added:
If anyone can think of anything else they would need, feel free to add them here. I might have time to take a crack at this soon unless someone else wants to give it a try. I will be happy to help walk anyone through it.
Error:
It should convert it directly
[2024-07-31T06:25:33,133][ERROR][org.logstash.Logstash ][test][a9ed325a79c7c8b8081d2f9e256338696c246c834c93fa755e87ffe03ed66ff6] uncaught exception (in thread kafka-input-worker-logstash-0)
org.jruby.exceptions.RangeError: (RangeError) integer 1722256768896 too big to convert to `int'
at org.jruby.ext.stringio.StringIO.read(org/jruby/ext/stringio/StringIO.java:857) ~[stringio.jar:?]
at RUBY.read(/usr/share/logstash/vendor/bundle/jruby/3.1.0/gems/avro-1.10.2/lib/avro/io.rb:106) ~[?:?]
at RUBY.read_bytes(/usr/share/logstash/vendor/bundle/jruby/3.1.0/gems/avro-1.10.2/lib/avro/io.rb:93) ~[?:?]
at RUBY.read_string(/usr/share/logstash/vendor/bundle/jruby/3.1.0/gems/avro-1.10.2/lib/avro/io.rb:99) ~[?:?]
at RUBY.read_data(/usr/share/logstash/vendor/bundle/jruby/3.1.0/gems/avro-1.10.2/lib/avro/io.rb:276) ~[?:?]
at RUBY.read_record(/usr/share/logstash/vendor/bundle/jruby/3.1.0/gems/avro-1.10.2/lib/avro/io.rb:364) ~[?:?]
at org.jruby.RubyArray.each(org/jruby/RubyArray.java:1981) ~[jruby.jar:?]
at RUBY.read_record(/usr/share/logstash/vendor/bundle/jruby/3.1.0/gems/avro-1.10.2/lib/avro/io.rb:361) ~[?:?]
at RUBY.read_data(/usr/share/logstash/vendor/bundle/jruby/3.1.0/gems/avro-1.10.2/lib/avro/io.rb:287) ~[?:?]
at RUBY.read(/usr/share/logstash/vendor/bundle/jruby/3.1.0/gems/avro-1.10.2/lib/avro/io.rb:252) ~[?:?]
at RUBY.decode(/usr/share/logstash/vendor/bundle/jruby/3.1.0/gems/logstash-codec-avro_schema_registry-1.2.0/lib/logstash/codecs/avro_schema_registry.rb:233) ~[?:?]
at RUBY.handle_record(/usr/share/logstash/vendor/bundle/jruby/3.1.0/gems/logstash-integration-kafka-11.4.2-java/lib/logstash/inputs/kafka.rb:360) ~[?:?]
at RUBY.thread_runner(/usr/share/logstash/vendor/bundle/jruby/3.1.0/gems/logstash-integration-kafka-11.4.2-java/lib/logstash/inputs/kafka.rb:329) ~[?:?]
at RUBY.thread_runner(/usr/share/logstash/vendor/bundle/jruby/3.1.0/gems/logstash-integration-kafka-11.4.2-java/lib/logstash/inputs/kafka.rb:329) ~[?:?]
[2024-07-31T06:25:33,209][INFO ][logstash.javapipeline ][test] Pipeline terminated {"pipeline.id"=>"test"}
Is there a way to install this plugin when using the Logstash Docker image?
My compose file is just like this:
logstash:
  image: docker.elastic.co/logstash/logstash:7.13.2
  restart: unless-stopped
  volumes:
    - ./logstash/logstash.conf:/usr/share/logstash/pipeline/logstash
Hello, I keep getting errors when trying to connect to my Schema Registry. I don't think it handles the exception the server returns well.
Logstash.javapipeline ][main] Pipeline worker error, the pipeline will be stopped {:pipeline_id=>"main", :error=>"(ArgumentError) wrong number of arguments (given 1, expected 2)", :exception=>Java::OrgJrubyExceptions::ArgumentError, :backtrace=>["org.jruby.RubyException.exception(org/jruby/RubyException.java:129)", "C_3a_.datos.logstash.vendor.bundle.jruby.$2_dot_5_dot_0.gems.schema_registry_minus_0_dot_1_dot_1.lib.schema_registry.client.request(C:/datos/logstash/vendor/bundle/jruby/2.5.0/gems/schema_registry-0.1.1/lib/schema_registry/client.rb:127)", "uri_3a_classloader_3a_.META_minus_INF.jruby_dot_home.lib.ruby.stdlib.net.http.start(uri:classloader:/META-INF/jruby.home/lib/ruby/stdlib/net/http.rb:914)", "uri_3a_classloader_3a_.META_minus_INF.jruby_dot_home.lib.ruby.stdlib.net.http.start(uri:classloader:/META-INF/jruby.home/lib/ruby/stdlib/net/http.rb:609)", "RUBY.request(C:/datos/logstash/vendor/bundle/jruby/2.5.0/gems/schema_registry-0.1.1/lib/schema_registry/client.rb:101)", "RUBY.version(C:/datos/logstash/vendor/bundle/jruby/2.5.0/gems/schema_registry-0.1.1/lib/schema_registry/subject.rb:30)", "RUBY.get_write_schema_id(C:/datos/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-codec-avro_schema_registry-1.2.0/lib/logstash/codecs/avro_schema_registry.rb:189)", ... ...
In my case, I'm trying to output into Kafka, serializing with Avro:
output {
  file {
    path => "C:/datos/logstash/out/logfile.json"
    codec => "json_lines"
  }
  kafka {
    acks => "1"
    client_id => "My client"
    bootstrap_servers => "my servers"
    topic_id => "my topic"
    compression_type => "gzip"
    retries => 0
    codec => avro_schema_registry {
      endpoint => "schema registry URI"
      subject_name => "the AVRO schema in SR"
      schema_version => "5"
    }
    value_serializer => "org.apache.kafka.common.serialization.ByteArraySerializer"
  }
}
Hello, I maintain a repository that I forked from this one (because I needed to add some local concerns that wouldn't be appropriate to feed back to this repository). Recently I found that my pipeline would occasionally crash because of web access logs that contained path-traversal attempts using overlong character sequences.
Because the codec didn't sanitize for this condition (for example, by replacing illegal character sequences with a Unicode replacement character, which is what the standard codecs seem to do), it caused an unhandled exception when Logstash evaluated a conditional such as
if [message] =~ /foo/ { ... }
This caused the pipeline to shut down. The error in the log (using Logstash 7.14.0) would appear like this:
[2021-08-11T13:04:18,744][ERROR][logstash.javapipeline ][main] Pipeline worker error, the pipeline will be stopped {:pipeline_id=>"main", :error=>"(ArgumentError) invalid byte sequence in UTF-8", :exception=>Java::OrgJrubyExceptions::ArgumentError, :backtrace=>["org.jruby.RubyRegexp.match?(org/jruby/RubyRegexp.java:1180)", "usr.share.logstash.logstash_minus_core.lib.logstash.java_pipeline.start_workers(/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:295)"], :thread=>"#<Thread:0x2230aabb sleep>"}
[2021-08-11T13:04:18,749][INFO ][filewatch.observingread ][main] QUIT - closing all files and shutting down.
[2021-08-11T13:04:18,750][WARN ][logstash.javapipeline ][main] Waiting for input plugin to close {:pipeline_id=>"main", :thread=>"#<Thread:0x2230aabb run>"}
I have a post at https://discuss.elastic.co/t/logstash-7-14-0-invalid-byte-sequence-in-utf-8-in-logstash-javapipeline/280976/5
The short version is that for each string field you should use a LogStash::Util::Charset object and .convert it; this uses String#valid_encoding? to detect whether a string is dodgy and then does a hack to clean it up a bit.
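A minimal sketch of that approach, assuming decode ends up with a plain Ruby hash of Avro field values (the sanitize_strings helper is hypothetical, not part of this plugin):

require "logstash/util/charset"

# Set up once, e.g. in the codec's register method:
@utf8 = LogStash::Util::Charset.new("UTF-8")
@utf8.logger = @logger

# Hypothetical helper: walk the decoded datum and replace invalid UTF-8 byte
# sequences before the values reach the pipeline, so a later conditional like
# [message] =~ /foo/ cannot raise "invalid byte sequence in UTF-8".
def sanitize_strings(value)
  case value
  when String then @utf8.convert(value)
  when Hash   then value.each_with_object({}) { |(k, v), h| h[k] = sanitize_strings(v) }
  when Array  then value.map { |v| sanitize_strings(v) }
  else value
  end
end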
We want to use this lib to use Logstash with schemas from our schema registry. But when deserializing the data, we get this error:
Avro::SchemaParseError: Error validating default for workday_hours: at . expected type double, got BigDecimal with value 0.0
The relevant part of the schema:
{
  "name": "workday_hours",
  "type": {
    "type": "double",
    "connect.default": 0.0
  },
  "default": 0.0
}
The column in our database was float(10, 2) (imported using Debezium).
The call stack:
[2019-08-14T10:59:25,376][ERROR][logstash.javapipeline ] A plugin had an unrecoverable error. Will restart this plugin.
Pipeline_id:main
Plugin: <LogStash::Inputs::Kafka value_deserializer_class=>"org.apache.kafka.common.serialization.ByteArrayDeserializer", codec=><LogStash::Codecs::AvroSchemaRegistry endpoint=>"http://kafka.local:8081", id=>"9ec6aef1-7f4a-4db3-98e9-d4273a5cebbc", enable_metric=>true, check_compatibility=>false, register_schema=>false, binary_encoded=>true, tag_on_failure=>false, verify_mode=>"verify_peer">, auto_offset_reset=>"earliest", topics_pattern=>"^paymodb\\.paymoapp\\..*", id=>"kafka_paymodb", bootstrap_servers=>"kafka.local:9092", client_id=>"logstash", enable_metric=>true, auto_commit_interval_ms=>"5000", consumer_threads=>1, enable_auto_commit=>"true", group_id=>"logstash", key_deserializer_class=>"org.apache.kafka.common.serialization.StringDeserializer", topics=>["logstash"], poll_timeout_ms=>100, ssl_endpoint_identification_algorithm=>"https", security_protocol=>"PLAINTEXT", sasl_mechanism=>"GSSAPI", decorate_events=>false>
Error: Error validating default for workday_hours: at . expected type double, got BigDecimal with value 0.0
Exception: Avro::SchemaParseError
Stack: /usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/avro-1.9.0/lib/avro/schema.rb:403:in `validate_default!'
/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/avro-1.9.0/lib/avro/schema.rb:377:in `initialize'
/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/avro-1.9.0/lib/avro/schema.rb:216:in `block in make_field_objects'
org/jruby/RubyArray.java:1792:in `each'
org/jruby/RubyEnumerable.java:1194:in `each_with_index'
/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/avro-1.9.0/lib/avro/schema.rb:209:in `make_field_objects'
/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/avro-1.9.0/lib/avro/schema.rb:240:in `initialize'
/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/avro-1.9.0/lib/avro/schema.rb:72:in `real_parse'
/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/avro-1.9.0/lib/avro/schema.rb:161:in `subparse'
/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/avro-1.9.0/lib/avro/schema.rb:295:in `block in initialize'
org/jruby/RubyArray.java:1792:in `each'
org/jruby/RubyEnumerable.java:1204:in `each_with_object'
/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/avro-1.9.0/lib/avro/schema.rb:294:in `initialize'
/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/avro-1.9.0/lib/avro/schema.rb:90:in `real_parse'
/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/avro-1.9.0/lib/avro/schema.rb:161:in `subparse'
/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/avro-1.9.0/lib/avro/schema.rb:372:in `initialize'
/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/avro-1.9.0/lib/avro/schema.rb:216:in `block in make_field_objects'
org/jruby/RubyArray.java:1792:in `each'
org/jruby/RubyEnumerable.java:1194:in `each_with_index'
/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/avro-1.9.0/lib/avro/schema.rb:209:in `make_field_objects'
/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/avro-1.9.0/lib/avro/schema.rb:240:in `initialize'
/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/avro-1.9.0/lib/avro/schema.rb:72:in `real_parse'
/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/avro-1.9.0/lib/avro/schema.rb:38:in `parse'
/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-codec-avro_schema_registry-1.1.1/lib/logstash/codecs/avro_schema_registry.rb:158:in `get_schema'
/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-codec-avro_schema_registry-1.1.1/lib/logstash/codecs/avro_schema_registry.rb:228:in `decode'
/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-input-kafka-9.0.0/lib/logstash/inputs/kafka.rb:256:in `block in thread_runner'
/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-input-kafka-9.0.0/lib/logstash/inputs/kafka.rb:255:in `block in thread_runner'
[2019-08-14T10:59:25,405][FATAL][logstash.runner ] An unexpected error occurred! {:error=>#<Avro::SchemaParseError: Error validating default for workday_hours: at . expected type double, got BigDecimal with value 0.0>, :backtrace=>["/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/avro-1.9.0/lib/avro/schema.rb:403:in `validate_default!'", "/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/avro-1.9.0/lib/avro/schema.rb:377:in `initialize'", "/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/avro-1.9.0/lib/avro/schema.rb:216:in `block in make_field_objects'", "org/jruby/RubyArray.java:1792:in `each'", "org/jruby/RubyEnumerable.java:1194:in `each_with_index'", "/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/avro-1.9.0/lib/avro/schema.rb:209:in `make_field_objects'", "/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/avro-1.9.0/lib/avro/schema.rb:240:in `initialize'", "/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/avro-1.9.0/lib/avro/schema.rb:72:in `real_parse'", "/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/avro-1.9.0/lib/avro/schema.rb:161:in `subparse'", "/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/avro-1.9.0/lib/avro/schema.rb:295:in `block in initialize'", "org/jruby/RubyArray.java:1792:in `each'", "org/jruby/RubyEnumerable.java:1204:in `each_with_object'", "/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/avro-1.9.0/lib/avro/schema.rb:294:in `initialize'", "/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/avro-1.9.0/lib/avro/schema.rb:90:in `real_parse'", "/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/avro-1.9.0/lib/avro/schema.rb:161:in `subparse'", "/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/avro-1.9.0/lib/avro/schema.rb:372:in `initialize'", "/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/avro-1.9.0/lib/avro/schema.rb:216:in `block in make_field_objects'", "org/jruby/RubyArray.java:1792:in `each'", "org/jruby/RubyEnumerable.java:1194:in `each_with_index'", "/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/avro-1.9.0/lib/avro/schema.rb:209:in `make_field_objects'", "/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/avro-1.9.0/lib/avro/schema.rb:240:in `initialize'", "/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/avro-1.9.0/lib/avro/schema.rb:72:in `real_parse'", "/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/avro-1.9.0/lib/avro/schema.rb:38:in `parse'", "/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-codec-avro_schema_registry-1.1.1/lib/logstash/codecs/avro_schema_registry.rb:158:in `get_schema'", "/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-codec-avro_schema_registry-1.1.1/lib/logstash/codecs/avro_schema_registry.rb:228:in `decode'", "/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-input-kafka-9.0.0/lib/logstash/inputs/kafka.rb:256:in `block in thread_runner'", "/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-input-kafka-9.0.0/lib/logstash/inputs/kafka.rb:255:in `block in thread_runner'"]}
Hello,
I've got this error on Logstash 5.6.4 with plugin version 1.1.1 installed:
Sending Logstash's logs to /var/log/logstash which is now configured via log4j2.properties
21:38:59.734 [main] INFO logstash.modules.scaffold - Initializing module {:module_name=>"fb_apache", :directory=>"/usr/share/logstash/modules/fb_apache/configuration"}
21:38:59.737 [main] INFO logstash.modules.scaffold - Initializing module {:module_name=>"netflow", :directory=>"/usr/share/logstash/modules/netflow/configuration"}
21:38:59.743 [main] INFO logstash.setting.writabledirectory - Creating directory {:setting=>"path.queue", :path=>"/var/lib/logstash/queue"}
21:38:59.744 [main] INFO logstash.setting.writabledirectory - Creating directory {:setting=>"path.dead_letter_queue", :path=>"/var/lib/logstash/dead_letter_queue"}
21:38:59.772 [LogStash::Runner] INFO logstash.agent - No persistent UUID file found. Generating new UUID {:uuid=>"45ee9744-50b2-41bc-a468-39c68e0713f6", :path=>"/var/lib/logstash/uuid"}
SyntaxError: /usr/share/logstash/vendor/bundle/jruby/1.9/gems/schema_registry-0.1.0/lib/schema_registry/client.rb:33: syntax error, unexpected tPOW
def initialize(endpoint, username = nil, password = nil, **http_options)
^
require at org/jruby/RubyKernel.java:1040
require at /usr/share/logstash/vendor/bundle/jruby/1.9/gems/polyglot-0.3.5/lib/polyglot.rb:65
(root) at /usr/share/logstash/vendor/bundle/jruby/1.9/gems/schema_registry-0.1.0/lib/schema_registry.rb:16
require at org/jruby/RubyKernel.java:1040
require at /usr/share/logstash/vendor/bundle/jruby/1.9/gems/polyglot-0.3.5/lib/polyglot.rb:65
(root) at /usr/share/logstash/vendor/bundle/jruby/1.9/gems/logstash-codec-avro_schema_registry-1.1.1/lib/logstash/codecs/avro_schema_registry.rb:1
require at org/jruby/RubyKernel.java:1040
require at /usr/share/logstash/vendor/bundle/jruby/1.9/gems/polyglot-0.3.5/lib/polyglot.rb:65
(root) at /usr/share/logstash/vendor/bundle/jruby/1.9/gems/logstash-codec-avro_schema_registry-1.1.1/lib/logstash/codecs/avro_schema_registry.rb:4
(root) at /usr/share/logstash/logstash-core/lib/logstash/plugins/registry.rb:1
legacy_lookup at /usr/share/logstash/logstash-core/lib/logstash/plugins/registry.rb:156
lookup at /usr/share/logstash/logstash-core/lib/logstash/plugins/registry.rb:138
lookup_pipeline_plugin at /usr/share/logstash/logstash-core/lib/logstash/plugins/registry.rb:180
lookup at /usr/share/logstash/logstash-core/lib/logstash/plugin.rb:140
eval at org/jruby/RubyKernel.java:1079
plugin at /usr/share/logstash/logstash-core/lib/logstash/pipeline.rb:103
initialize at (eval):8
initialize at /usr/share/logstash/logstash-core/lib/logstash/pipeline.rb:75
initialize at /usr/share/logstash/logstash-core/lib/logstash/pipeline.rb:165
create_pipeline at /usr/share/logstash/logstash-core/lib/logstash/agent.rb:296
register_pipeline at /usr/share/logstash/logstash-core/lib/logstash/agent.rb:95
execute at /usr/share/logstash/logstash-core/lib/logstash/runner.rb:313
run at /usr/share/logstash/vendor/bundle/jruby/1.9/gems/clamp-0.6.5/lib/clamp/command.rb:67
(root) at /usr/share/logstash/lib/bootstrap/environment.rb:71
This is my Dockerfile:
FROM logstash:5.6.4
# Install Logstash Codec Plugin for Avro Schema Registry
# https://github.com/revpoint/logstash-codec-avro_schema_registry#logstash-codec---avro-schema-registry
RUN logstash-plugin install --version 1.1.1 logstash-codec-avro_schema_registry
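The unexpected tPOW points at the double splat (**http_options), which is Ruby 2.0 syntax; the vendor path (jruby/1.9/gems) shows this Logstash runs JRuby in Ruby 1.9 compatibility mode, which cannot parse it. A hypothetical 1.9-compatible rewrite of that signature (not the gem's actual code) would accept a plain options hash instead:

# Ruby 2.0+ keyword splat, as shipped in schema_registry 0.1.0 (fails to parse on Ruby 1.9):
def initialize(endpoint, username = nil, password = nil, **http_options)
end

# Ruby 1.9-compatible equivalent: take an ordinary trailing options hash.
def initialize(endpoint, username = nil, password = nil, http_options = {})
end

Moving to a Logstash version that bundles JRuby 9k (Ruby 2.x compatible) should also avoid the problem without patching the gem.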
Hello, I'm doing a bit of development with this codec (thank you for making it available, much appreciated). I've struck an issue that may be due to a change in Logstash.
With Logstash 6.3 (I haven't tried other versions), I get the following stack trace:
Jun 18 12:47:02 node-1 logstash[11790]: [FATAL] 2018-06-18 00:47:02.731 [LogStash::Runner] runner - An unexpected error occurred! {:error=>#<NoMethodError: undefined method `to_java_bytes' for #<#<Class:0x427befaa>:0x3ad546be>
Jun 18 12:47:02 node-1 logstash[11790]: Did you mean? to_java_object>, :backtrace=>[
"/usr/share/logstash/vendor/bundle/jruby/2.3.0/gems/logstash-output-kafka-7.0.10/lib/logstash/outputs/kafka.rb:199:in `block in register'",
"/usr/share/logstash/vendor/bundle/jruby/2.3.0/gems/logstash-codec-avro_schema_registry-1.1.0/lib/logstash/codecs/avro_schema_registry.rb:251:in `encode'",
"/usr/share/logstash/vendor/bundle/jruby/2.3.0/gems/logstash-output-kafka-7.0.10/lib/logstash/outputs/kafka.rb:221:in `block in multi_receive'",
"org/jruby/RubyArray.java:1734:in `each'",
"/usr/share/logstash/vendor/bundle/jruby/2.3.0/gems/logstash-output-kafka-7.0.10/lib/logstash/outputs/kafka.rb:219:in `multi_receive'",
"org/logstash/config/ir/compiler/OutputStrategyExt.java:109:in `multi_receive'",
"org/logstash/config/ir/compiler/OutputDelegatorExt.java:156:in `multi_receive'", "/usr/share/logstash/logstash-core/lib/logstash/pipeline.rb:475:in `block in output_batch'",
"org/jruby/RubyHash.java:1343:in `each'",
"/usr/share/logstash/logstash-core/lib/logstash/pipeline.rb:474:in `output_batch'",
"/usr/share/logstash/logstash-core/lib/logstash/pipeline.rb:426:in `worker_loop'",
"/usr/share/logstash/logstash-core/lib/logstash/pipeline.rb:384:in `block in start_workers'"]}
Jun 18 12:47:03 node-1 logstash[11790]: [ERROR] 2018-06-18 00:47:03.076 [LogStash::Runner] Logstash - java.lang.IllegalStateException: Logstash stopped processing because of an error: (SystemExit) exit
This appears to occur because of the following code in avro_schema_registry.rb
(/usr/share/logstash/vendor/bundle/jruby/2.3.0/gems/logstash-codec-avro_schema_registry-1.1.0/lib/logstash/codecs/avro_schema_registry.rb):
if @binary_encoded
  @on_event.call(event, buffer.string.to_java_bytes) # <--- line 251
This calls the to_java_bytes method, but that seems to cause issues in kafka.rb
(/usr/share/logstash/vendor/bundle/jruby/2.3.0/gems/logstash-output-kafka-7.0.10/lib/logstash/outputs/kafka.rb):
elsif value_serializer == 'org.apache.kafka.common.serialization.ByteArraySerializer'
  @codec.on_event do |event, data|
    write_to_kafka(event, data.to_java_bytes) # <--- line 199
  end
So data here is presumably already a Java byte array, which is why JRuby complains that it doesn't have a to_java_bytes method.
Changing avro_schema_registry.rb to pass in buffer.string instead of buffer.string.to_java_bytes seems to fix this problem, and I get the results I expect:
if @binary_encoded
  # @on_event.call(event, buffer.string.to_java_bytes)
  @on_event.call(event, buffer.string)
else
  @on_event.call(event, Base64.strict_encode64(buffer.string))
end
Disclaimer: I am doing some custom development on your plugin to support my own use case (it's not in any public repo at this time, too early). In my use case, I have an Avro schema that I use to encapsulate the event in a Logstash reception tier. I have various fields that are used in a backbone format, and the event goes into a 'message' field (and could be JSON, some binary payload, etc.). That said, the same behaviour occurs when I use avro_schema_registry.rb as per your current HEAD.
Possibly related to logstash-plugins/logstash-output-kafka#123
Here is my configuration and testing, after fixing the issue.
My Logstash configuration:
input {
  tcp {
    host => "0.0.0.0"
    port => 5140
    mode => "server"
    codec => "json_lines"
  }
}
output {
  kafka {
    topic_id => "wapiti_backbone_sandpit"
    compression_type => "snappy"
    codec => avro_schema_registry {
      endpoint => "http://127.0.0.1:8081"
      subject_name => "wapiti_backbone_submitted-value"
      schema_version => "3"
      register_schema => false
      binary_encoded => true
    }
    value_serializer => "org.apache.kafka.common.serialization.ByteArraySerializer"
  }
}
Current version of the schema (I will change 'message' to bytes soon):
{
  "name": "wapiti_backbone",
  "namespace": "local.wapiti",
  "doc": "A log event on a Wapiti Backbone topic",
  "type": "record",
  "fields": [
    {
      "name": "submission_time",
      "type": "long",
      "logical-type": "timestamp-millis",
      "doc": "When first seen at log submission. ms since Unix epoch"
    },
    {
      "name": "submitted_from",
      "type": "string",
      "doc": "Source Hostname or IP at entry of log submission"
    },
    {
      "name": "originating_host",
      "type": "string",
      "doc": "Hostname of the host the log was first created on"
    },
    {
      "name": "vertical",
      "type": "string",
      "doc": "Business application, eg. 'corporate-website'"
    },
    {
      "name": "environment",
      "type": "string",
      "doc": "Business environment, eg. 'dev' or 'prod'"
    },
    {
      "name": "processing_key",
      "type": "string",
      "doc": "Processing key, eg. 'apache-httpd-access-combined'"
    },
    {
      "name": "message_format",
      "type": "string",
      "doc": "format of data contained in 'message' field, eg. 'json', 'syslog', ..."
    },
    {
      "name": "message",
      "type": "string",
      "doc": "The encapsulated message payload."
    }
  ]
}
Test input:
[vagrant@node-1 ~]$ echo "{\"breath_in\": \"$(date --iso-8601=ns)\"}" | nc 127.0.0.1 5140
Output when viewed using kafka-avro-console-consumer:
{"submission_time":0,"submitted_from":"SUBMITTED_FROM","originating_host":"ORIGINATING_HOST","vertical":"VERTICAL","environment":"ENVIRONMENT","processing_key":"PROCESSING_KEY","message_format":"json","message":"{\"host\":\"node-1\",\"@timestamp\":\"2018-06-18T01:27:59.221Z\",\"breath_in\":\"2018-06-18T13:27:59,081679157+1200\",\"@version\":\"1\",\"port\":48710}"}
Using Logstash version 8.13.0.
When tombstones are sent to the input topic, they are swallowed by the plugin and never surface in the pipeline.
Kafka tombstones are events with a null value.
They are used to signal the deletion of the entity behind the key of the message.
Here, we use an Avro schema for the topic: this may be the source of the problem.
See an example pipeline below.
This null handling is crucial because we use Logstash to ingest Kafka topics into Elasticsearch, so we can index the entities present in the events, and entities can be deleted (they are streamed from a database): such deleted entities should be unindexed in Elasticsearch.
It seems like the problem lies in this file:
https://github.com/revpoint/logstash-codec-avro_schema_registry/blob/master/lib/logstash/codecs/avro_schema_registry.rb#L218
We can clearly see the warning that Logstash prints when it encounters a null value:
public
def decode(data)
  if data.length < 5
    @logger.error('message is too small to decode')
  else
It could be just a test for data.length == 0 that triggers the tombstone handling,
giving an empty event with one field: [@metadata][kafka][tombstone] = true. A sketch of this follows below.
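A minimal sketch of that proposal against the decode method quoted above (the tombstone branch is the suggested addition, not current plugin code):

def decode(data)
  if data.nil? || data.length == 0
    # Kafka tombstone: surface an empty event flagged in @metadata so the
    # pipeline can route it (e.g. to an elasticsearch "delete" action).
    event = LogStash::Event.new
    event.set("[@metadata][kafka][tombstone]", true)
    yield event
  elsif data.length < 5
    @logger.error('message is too small to decode')
  else
    # ... existing schema-registry decode path ...
  end
end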
Steps to reproduce:
Here is a minimal pipeline showing that null events are not sent to the pipeline, and a sample of the use case we would like to achieve:
input {
  kafka {
    bootstrap_servers => ["${KAFKA_BROKER_URL}"]
    topics => ["${TOPIC}"]
    security_protocol => "SASL_SSL"
    sasl_mechanism => "PLAIN"
    sasl_jaas_config => "${SASL_JAAS_CONFIG}"
    ssl_endpoint_identification_algorithm => "https"
    group_id => "${GROUP_ID}"
    auto_offset_reset => "${AUTO_OFFSET_RESET}"
    isolation_level => "read_committed"
    codec => avro_schema_registry {
      endpoint => "${KAFKA_SCHEMA_REGISTRY_URL}"
      username => "${KAFKA_SCHEMA_REGISTRY_USERNAME}"
      password => "${KAFKA_SCHEMA_REGISTRY_PASSWORD}"
    }
    value_deserializer_class => "org.apache.kafka.common.serialization.ByteArrayDeserializer"
    decorate_events => "basic"
  }
}
# Here, it's already too late: the event is not processed, so there is nothing we can filter on...
# filter {
# }
output {
  # Here, we'd like to do this:
  # if [@metadata][kafka][tombstone] {
  #   elasticsearch {
  #     action => "delete"
  #     ...
  #   }
  # } else {
  #   elasticsearch {
  #     action => "index"
  #     ...
  #   }
  # }
  stdout {
    codec => rubydebug { metadata => true }
  }
}
The Confluent registry supports TLS authentication; the Logstash codec should support this (optionally).
This is my kafka input plugin configuration:
input {
  kafka {
    bootstrap_servers => "th-kafka-svc:9093" # example: dc-cdf-master.ad.interset.com:9092
    topics => ["avro-event"]
    ...
    decorate_events => true
    key_deserializer_class => "org.apache.kafka.common.serialization.ByteArrayDeserializer"
    value_deserializer_class => "org.apache.kafka.common.serialization.ByteArrayDeserializer"
    codec => avro_schema_registry {
      endpoint => "https://th-schema-svc:8081"
      client_key => "/vault-crt/logstash.key"
      client_certificate => "/vault-crt/logstash.crt"
      ca_certificate => "/vault-crt/trustedCAs/RE_ca.crt"
      verify_mode => "verify_peer"
    }
  }
}
So when decorate_events is set to true, I see the schema registry codec throw the exception below:
org.logstash.MissingConverterException: Missing Converter handling for full class name=[B, simple name=byte[]
at org.logstash.Valuefier.fallbackConvert(Valuefier.java:118)
at org.logstash.Valuefier.convert(Valuefier.java:96)
at org.logstash.Valuefier.lambda$static$3(Valuefier.java:72)
at org.logstash.Valuefier.convert(Valuefier.java:94)
at org.logstash.ext.JrubyEventExtLibrary$RubyEvent.safeValueifierConvert(JrubyEventExtLibrary.java:355)
at org.logstash.ext.JrubyEventExtLibrary$RubyEvent.ruby_set_field(JrubyEventExtLibrary.java:121)
at org.logstash.ext.JrubyEventExtLibrary$RubyEvent$INVOKER$i$2$0$ruby_set_field.call(JrubyEventExtLibrary$RubyEvent$INVOKER$i$2$0$ruby_set_field.gen)
at org.jruby.internal.runtime.methods.JavaMethod$JavaMethodN.call(JavaMethod.java:835)
at org.jruby.ir.targets.InvokeSite.invoke(InvokeSite.java:207)
at usr.share.logstash.vendor.bundle.jruby.$2_dot_5_dot_0.gems.logstash_minus_integration_minus_kafka_minus_10_dot_5_dot_3_minus_java.lib.logstash.inputs.kafka.RUBY$block$thread_runner$3(/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-integration-kafka-10.5.3-java/lib/logstash/inputs/kafka.rb:286)
at org.jruby.ir.targets.YieldSite.yield(YieldSite.java:110)
at usr.share.logstash.vendor.bundle.jruby.$2_dot_5_dot_0.gems.logstash_minus_codec_minus_avro_schema_registry_minus_1_dot_2_dot_0.lib.logstash.codecs.avro_schema_registry.RUBY$method$decode$0(/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-codec-avro_schema_registry-1.2.0/lib/logstash/codecs/avro_schema_registry.rb:233)
at usr.share.logstash.vendor.bundle.jruby.$2_dot_5_dot_0.gems.logstash_minus_codec_minus_avro_schema_registry_minus_1_dot_2_dot_0.lib.logstash.codecs.avro_schema_registry.RUBY$method$decode$0$VARARGS(/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-codec-avro_schema_registry-1.2.0/lib/logstash/codecs/avro_schema_registry.rb)
at org.jruby.internal.runtime.methods.CompiledIRMethod.call(CompiledIRMethod.java:80)
at org.jruby.internal.runtime.methods.MixedModeIRMethod.call(MixedModeIRMethod.java:70)
at usr.share.logstash.vendor.bundle.jruby.$2_dot_5_dot_0.gems.logstash_minus_integration_minus_kafka_minus_10_dot_5_dot_3_minus_java.lib.logstash.inputs.kafka.RUBY$block$thread_runner$2(/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-integration-kafka-10.5.3-java/lib/logstash/inputs/kafka.rb:279)
at org.jruby.runtime.CompiledIRBlockBody.yieldDirect(CompiledIRBlockBody.java:148)
at org.jruby.runtime.BlockBody.yield(BlockBody.java:106)
at org.jruby.runtime.Block.yield(Block.java:184)
at org.jruby.javasupport.ext.JavaLang$Iterable.each(JavaLang.java:98)
at org.jruby.javasupport.ext.JavaLang$Iterable$INVOKER$s$0$0$each.call(JavaLang$Iterable$INVOKER$s$0$0$each.gen)
at org.jruby.internal.runtime.methods.JavaMethod$JavaMethodZeroBlock.call(JavaMethod.java:555)
at org.jruby.ir.targets.InvokeSite.invoke(InvokeSite.java:197)
at usr.share.logstash.vendor.bundle.jruby.$2_dot_5_dot_0.gems.logstash_minus_integration_minus_kafka_minus_10_dot_5_dot_3_minus_java.lib.logstash.inputs.kafka.RUBY$block$thread_runner$1(/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-integration-kafka-10.5.3-java/lib/logstash/inputs/kafka.rb:278)
at org.jruby.runtime.CompiledIRBlockBody.callDirect(CompiledIRBlockBody.java:138)
at org.jruby.runtime.IRBlockBody.call(IRBlockBody.java:58)
at org.jruby.runtime.IRBlockBody.call(IRBlockBody.java:52)
at org.jruby.runtime.Block.call(Block.java:139)
at org.jruby.RubyProc.call(RubyProc.java:318)
at org.jruby.internal.runtime.RubyRunnable.run(RubyRunnable.java:105)
at java.lang.Thread.run(Thread.java:748)
Can you help fix this issue?
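For context, the MissingConverterException comes from handing a raw Java byte[] (here, the record key produced by the ByteArrayDeserializer) to event.set, which Logstash has no value converter for. A hypothetical Ruby-side workaround, wherever the byte array is placed on the event, is to turn it into a Ruby string first; String.from_java_bytes is JRuby's built-in for this:

# JRuby: convert a Java byte[] into a Ruby String before putting it on an event.
# record.key and the metadata field name here are assumptions for illustration.
key_bytes = record.key
event.set("[@metadata][kafka][key]", key_bytes.nil? ? nil : String.from_java_bytes(key_bytes))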
key_deserializer_class => "org.apache.kafka.common.serialization.ByteArrayDeserializer"
value_deserializer_class => "org.apache.kafka.common.serialization.ByteArrayDeserializer"
should be added to the kafka input options in the examples.
L.S.
I get this error:
OpenSSL::SSL::SSLError: Received fatal alert: handshake_failure
connect_nonblock at org/jruby/ext/openssl/SSLSocket.java:276
ssl_socket_connect at uri:classloader:/META-INF/jruby.home/lib/ruby/stdlib/net/protocol.rb:44
connect at uri:classloader:/META-INF/jruby.home/lib/ruby/stdlib/net/http.rb:985
do_start at uri:classloader:/META-INF/jruby.home/lib/ruby/stdlib/net/http.rb:924
start at uri:classloader:/META-INF/jruby.home/lib/ruby/stdlib/net/http.rb:913
start at uri:classloader:/META-INF/jruby.home/lib/ruby/stdlib/net/http.rb:609
request at /usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/schema_registry-0.1.0/lib/schema_registry/client.rb:101
schema at /usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/schema_registry-0.1.0/lib/schema_registry/client.rb:40
get_schema at /usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-codec-avro_schema_registry-1.2.0/lib/logstash/codecs/avro_schema_registry.rb:160
encode at /usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-codec-avro_schema_registry-1.2.0/lib/logstash/codecs/avro_schema_registry.rb:248
multi_receive at /usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-output-kafka-7.3.2/lib/logstash/outputs/kafka.rb:223
each at org/jruby/RubyArray.java:1792
multi_receive at /usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-output-kafka-7.3.2/lib/logstash/outputs/kafka.rb:221
multi_receive at org/logstash/config/ir/compiler/OutputStrategyExt.java:118
multi_receive at org/logstash/config/ir/compiler/AbstractOutputDelegatorExt.java:101
output_batch at /usr/share/logstash/logstash-core/lib/logstash/pipeline.rb:390
each at org/jruby/RubyHash.java:1419
output_batch at /usr/share/logstash/logstash-core/lib/logstash/pipeline.rb:389
worker_loop at /usr/share/logstash/logstash-core/lib/logstash/pipeline.rb:341
start_workers at /usr/share/logstash/logstash-core/lib/logstash/pipeline.rb:304
The config is:
kafka {
  codec => avro_schema_registry {
    endpoint => "https://XXXXXX.somewhere"
    subject_name => "subj-name"
    schema_id => 1121
    schema_uri => "https://XXXXXX.somewhere"
    base64_encoded => false
    verify_mode => "verify_none"
  }
  ssl_truststore_location => "/XXXX/XXXXX/xxxx.jks"
  ssl_truststore_password => "xxxxxxx"
  ssl_keystore_location => "/XXXX/XXXXXs/xxxxx.jks"
  ssl_key_password => "xxxxxxx"
  ssl_keystore_password => "xxxxxxx"
  security_protocol => "SSL"
  bootstrap_servers => "XXXXXX.somewhere:9092"
  topic_id => "scema_name"
  value_serializer => "org.apache.kafka.common.serialization.ByteArraySerializer"
}
What could cause this handshake failure?
Hello, I've installed the plugin on my Logstash installation (v5.2.1) and configured my kafka input section to use it. I started Logstash, the events started flowing from my Kafka queue towards a file (just for testing), and I was surprised to see that every field is correctly converted except the time fields.
{"eventId":"my_eventId","@timestamp":"2017-08-11T10:26:53.799Z","activity":{"activityTime":-111032204562303474814398456,"startTime":-52944280892516839416,"type":"my_type","watchType":"my_watch_type","uuid":"my_uuid"},"@Version":"1","type":"My_type","device":{"deviceId":"my_deviceId","version":"My_version"},"timestamp":-111032204562303474814398456}
I thought there might be a problem with the data, so I tried to consume it with a kafka consumer, and it is correctly interpreted and translated:
{"eventId":"my-eventId","timestamp":1502445430,"device":{"deviceId":"my_deviceId","version":"my_version"},"activity":{"activityTime":1502445430,"type":"my_type","watchType":"my_watch_type","startTime":1501848509,"uuid":"my_uuid"}}
As you can see, the events managed by Logstash are damaged: all the fields that contain a date are wrongly converted, while the kafka consumer converts them to the correct UNIX format.
In the Avro schema, the fields activityTime, activityStart and timestamp are defined as int.
Logstash does nothing apart from taking the input from the Kafka queue and sending it to a test file, adding its service fields (no filters are applied).
Regards.