logstash-codec-avro_schema_registry's People

Contributors

camerondavison, cgiraldo, fbaligand, johntdyer, jsvd, oruen, ph, rbkasat, ryananguiano, suyograo, talevy, ycombinator

logstash-codec-avro_schema_registry's Issues

Message generated with this codec different from the one produced by kafka-avro-console-producer

I'm trying to publish JSON input as a Kafka message from Logstash
(in essence, replicating the behavior of kafka-avro-console-producer).

The message is added to the queue, but it differs from the one produced by kafka-avro-console-producer.

Message produced by kafka-avro-console-producer
{
"topic": "BellData7",
"key": "�ée",
"value": "\u0000\u0000\u0000\u0000\u0001\nBMORR\fCELTRR",
"partition": 0,
"offset": 1
},

Message produced by logstash (with the configuration below):
{
"topic": "BellData7",
"key": "�ée",
"value": "\u0000\u0000\u0000\u0000\u0001\nBMORR\fCELTRR",
"partition": 0,
"offset": 1
},

I can see that the magic bytes are clearly missing and the data appears to be encoded differently. What am I doing wrong?
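For reference, kafka-avro-console-producer writes values in the Confluent wire format: one magic byte (0), a 4-byte big-endian schema id, then the Avro binary body. The Ruby sketch below splits a value into those parts so two messages can be compared byte by byte. `parse_confluent_frame` is a hypothetical helper name and is not part of the plugin.

```ruby
# Hypothetical helper (not part of the plugin): split a Confluent-framed
# Kafka value into its magic byte, schema id, and Avro payload.
def parse_confluent_frame(bytes)
  data = bytes.b # binary copy, so byte offsets are exact
  raise ArgumentError, "frame shorter than 5 bytes" if data.bytesize < 5

  # C: one unsigned byte, N: 32-bit unsigned big-endian integer
  magic, schema_id = data.unpack("CN")
  raise ArgumentError, "unexpected magic byte #{magic}" unless magic.zero?

  { magic: magic, schema_id: schema_id, payload: data.byteslice(5..-1) }
end

# The value shown above, written with explicit byte escapes.
frame = "\x00\x00\x00\x00\x01\nBMORR\fCELTRR".b
parsed = parse_confluent_frame(frame)

puts parsed[:schema_id]
puts parsed[:payload].inspect
```

If a value fails the magic-byte check, it was most likely not written in this framing, which is a useful first thing to rule out when two producers disagree.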

The logstash conf file:

input {
  stdin {
  }
}

filter {
  json {
    source => message
    remove_field => [ "path", "@timestamp", "@version", "host", "message" ]
    remove_tag => ["message"]
  }
}

output {
  kafka {
    bootstrap_servers => "172.17.0.3:9092"
    codec => avro_schema_registry {
      endpoint => "http://172.17.0.3:8081"
      subject_name => "MyTopic7-value"
      schema_uri => "/usr/share/logstash/pipeline/schema.avsc"
      register_schema => true
      binary_encoded => true
    }
    topic_id => "MyTopic7"
  }
}

I enter the following input in the stdin

{"Customer": "BMORR", "Device": "logstash"}


Schema.avsc is as below:

{
  "namespace": "hello.avro",
  "type": "record",
  "name": "Hello",
  "fields": [
    {"name": "Customer", "type": "string"},
    {"name": "Device", "type": "string"}
  ]
}

Logstash 8.12.* fails to start using the plugin

I'm running the Logstash 8.12.2 Docker image with the plugin installed.

This is my config, which always used to work:

output {
    kafka {
        codec => avro_schema_registry {
          endpoint            => "https://reg.my.net:8443"
          schema_id           => "761"
          register_schema     => "false"
          base64_encoded      => "false"
          client_key          => "/vault/kafka.key"
          client_certificate  => "/vault/kafka.pem"
          ca_certificate      => "/etc/pki/ca-trust/source/anchors/ca-bundle.crt"
        }
        bootstrap_servers       => "${KAFKA_BOOTSTRAP_SERVERS}"
        topic_id                => "${KAFKA_AUDIT_TOPIC_ID}"
        value_serializer        =>  "org.apache.kafka.common.serialization.ByteArraySerializer"
        ssl_truststore_location => "/vault/truststore.jks"
        ssl_truststore_password => "${LOGSTASH_KEYSTORE_PASS}"
        ssl_keystore_location   => "/vault/kafka-keystore.jks"
        ssl_keystore_password   => "${LOGSTASH_KEYSTORE_PASS}"
        security_protocol       => "${KAFKA_SECURITY_PROTOCOL}"
    }
}

Logstash fails with:

[ERROR] 2024-05-24 13:06:15.614 [Converge PipelineAction::Create<audit-log>] agent - Failed to execute action {:action=>LogStash::PipelineAction::Create/pipeline_id:audit-log, :exception=>"Java::JavaLang::IllegalStateException", :message=>"Unable to configure plugins: (ArgumentError) wrong number of arguments (given 4, expected 1..3)", :backtrace=>["org.logstash.config.ir.CompiledPipeline.<init>(CompiledPipeline.java:120)", "org.logstash.execution.AbstractPipelineExt.initialize(AbstractPipelineExt.java:186)", "org.logstash.execution.AbstractPipelineExt$INVOKER$i$initialize.call(AbstractPipelineExt$INVOKER$i$initialize.gen)", "org.jruby.internal.runtime.methods.JavaMethod$JavaMethodN.call(JavaMethod.java:847)", "org.jruby.ir.runtime.IRRuntimeHelpers.instanceSuper(IRRuntimeHelpers.java:1319)", "org.jruby.ir.instructions.InstanceSuperInstr.interpret(InstanceSuperInstr.java:139)", "org.jruby.ir.interpreter.InterpreterEngine.processCall(InterpreterEngine.java:367)", "org.jruby.ir.interpreter.StartupInterpreterEngine.interpret(StartupInterpreterEngine.java:66)", "org.jruby.internal.runtime.methods.MixedModeIRMethod.INTERPRET_METHOD(MixedModeIRMethod.java:128)", "org.jruby.internal.runtime.methods.MixedModeIRMethod.call(MixedModeIRMethod.java:115)", "org.jruby.runtime.callsite.CachingCallSite.cacheAndCall(CachingCallSite.java:446)", "org.jruby.runtime.callsite.CachingCallSite.call(CachingCallSite.java:92)", "org.jruby.RubyClass.newInstance(RubyClass.java:931)", "org.jruby.RubyClass$INVOKER$i$newInstance.call(RubyClass$INVOKER$i$newInstance.gen)", "org.jruby.runtime.callsite.CachingCallSite.cacheAndCall(CachingCallSite.java:446)", "org.jruby.runtime.callsite.CachingCallSite.call(CachingCallSite.java:92)", "org.jruby.ir.instructions.CallBase.interpret(CallBase.java:548)", "org.jruby.ir.interpreter.InterpreterEngine.processCall(InterpreterEngine.java:367)", "org.jruby.ir.interpreter.StartupInterpreterEngine.interpret(StartupInterpreterEngine.java:66)", 
"org.jruby.ir.interpreter.InterpreterEngine.interpret(InterpreterEngine.java:88)", "org.jruby.internal.runtime.methods.MixedModeIRMethod.INTERPRET_METHOD(MixedModeIRMethod.java:238)", "org.jruby.internal.runtime.methods.MixedModeIRMethod.call(MixedModeIRMethod.java:225)", "org.jruby.internal.runtime.methods.DynamicMethod.call(DynamicMethod.java:228)", "org.jruby.runtime.callsite.CachingCallSite.cacheAndCall(CachingCallSite.java:476)", "org.jruby.runtime.callsite.CachingCallSite.call(CachingCallSite.java:293)", "org.jruby.ir.interpreter.InterpreterEngine.processCall(InterpreterEngine.java:328)", "org.jruby.ir.interpreter.StartupInterpreterEngine.interpret(StartupInterpreterEngine.java:66)", "org.jruby.ir.interpreter.Interpreter.INTERPRET_BLOCK(Interpreter.java:116)", "org.jruby.runtime.MixedModeIRBlockBody.commonYieldPath(MixedModeIRBlockBody.java:136)", "org.jruby.runtime.IRBlockBody.call(IRBlockBody.java:66)", "org.jruby.runtime.IRBlockBody.call(IRBlockBody.java:58)", "org.jruby.runtime.Block.call(Block.java:144)", "org.jruby.RubyProc.call(RubyProc.java:352)", "org.jruby.internal.runtime.RubyRunnable.run(RubyRunnable.java:111)", "java.base/java.lang.Thread.run(Thread.java:840)"]}

If I remove the codec settings from the kafka output, Logstash starts again.
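One possible explanation, offered as an assumption rather than a confirmed diagnosis: if the bundled JRuby follows Ruby 3 semantics (the 8.14.3 trace on this page shows a jruby/3.1.0 gem path), a Hash passed positionally is no longer converted to keyword arguments. A constructor shaped like `initialize(endpoint, username = nil, password = nil, **http_options)`, the signature quoted in another report on this page, would then reject a fourth positional argument with this kind of message. The sketch below uses `DemoClient`, a hypothetical stand-in, to show the difference.

```ruby
# DemoClient is a hypothetical stand-in that only mirrors the constructor shape
# quoted elsewhere on this page; it is not the real schema_registry client.
class DemoClient
  attr_reader :endpoint, :http_options

  def initialize(endpoint, username = nil, password = nil, **http_options)
    @endpoint = endpoint
    @username = username
    @password = password
    @http_options = http_options
  end
end

options = { verify_mode: "verify_peer" }

# Hash passed as a 4th positional argument: rejected under Ruby 3 semantics.
positional_error =
  begin
    DemoClient.new("https://reg.my.net:8443", nil, nil, options)
    nil
  rescue ArgumentError => e
    e.message
  end

# Hash splatted into keywords: accepted under both Ruby 2 and Ruby 3.
client = DemoClient.new("https://reg.my.net:8443", nil, nil, **options)

puts positional_error.inspect
puts client.http_options.inspect
```

If this is indeed the cause, the fix would belong at the call site that builds the client (splatting the options hash), not in the pipeline configuration.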

Requires avsc or existing registered schema?

Does this require a schema to already exist in the Schema Registry, or for the user to provide an avsc?

I'm getting io.confluent.rest.exceptions.RestNotFoundException: Schema not found using this config:

    codec => avro_schema_registry {
      endpoint => "http://localhost:8081"
      schema_id => 1
      register_schema => true
      binary_encoded => true
    }
    value_serializer => "org.apache.kafka.common.serialization.ByteArraySerializer"
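Whether schema id 1 actually exists can be checked against the registry directly. The sketch below builds two REST paths that the Confluent Schema Registry exposes (`/schemas/ids/{id}` and `/subjects/{subject}/versions/{version}`) and fetches JSON with Net::HTTP. The helper names, the `localhost:8081` endpoint, and the example subject are illustrative assumptions.

```ruby
require "net/http"
require "uri"
require "json"
require "erb"

# Hypothetical helpers for inspecting a Confluent Schema Registry by hand.
def schema_by_id_uri(endpoint, id)
  URI.join(endpoint.chomp("/") + "/", "schemas/ids/#{Integer(id)}")
end

def subject_version_uri(endpoint, subject, version = "latest")
  escaped = ERB::Util.url_encode(subject) # percent-encode the path segment
  URI.join(endpoint.chomp("/") + "/", "subjects/#{escaped}/versions/#{version}")
end

# Returns the parsed JSON body, or nil when the registry answers 404.
def fetch_json(uri)
  response = Net::HTTP.get_response(uri)
  return nil if response.code == "404"
  raise "registry returned HTTP #{response.code}" unless response.is_a?(Net::HTTPSuccess)

  JSON.parse(response.body)
end

puts schema_by_id_uri("http://localhost:8081", 1)
puts subject_version_uri("http://localhost:8081", "MyTopic7-value")
# With a running registry:
#   fetch_json(schema_by_id_uri("http://localhost:8081", 1))
```

A nil result from the first lookup would mean the registry has no schema with that id, which matches the "Schema not found" response quoted above.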

How to use subject_string?

I want to use a value schema that exists in the schema registry.
If the endpoint is the URL of the schema registry, should subject string be the (namespace+)name of the schema (subject)?

Support for Logstash 5.1.1?

Do you support Logstash 5.1.1?

Bundler::VersionConflict: Bundler could not find compatible versions for gem "logstash-core":
  In snapshot (Gemfile.lock):
    logstash-core (= 5.1.1)

  In Gemfile:
    logstash-core-plugin-api (>= 0) java depends on
      logstash-core (= 5.1.1) java

    logstash-codec-avro_schema_registry (>= 0) java depends on
      logstash-core (< 3.0.0, >= 2.0.0.beta2) java

    logstash-core (>= 0) java

Add encode capability

Requested by @malonej7 and others.

We need to develop the encode capabilities using Confluent Avro Schema Registry. Luckily schema registration is already taken care of by https://github.com/wvanbergen/schema_registry so we just need to figure out the settings interface.

The options that need to be added:

  • subject_name
  • schema_uri
  • schema_string
  • check_compatibility
  • override_compatibility

If anyone can think of anything else they would need, feel free to add them here. I might have time to take a crack at this soon unless someone else wants to give it a try. I will be happy to help walk anyone through it.

Error to decode - avro_schema_registry

  • Logstash version: 8.14.3
  • Plugin versions:
    logstash-codec-avro (3.4.1)
    logstash-codec-avro_schema_registry (1.2.0)
  • Logstash configuration:
    input {
      kafka {
        bootstrap_servers => ["vppevkfbobpe001:9092,vppevkfbobpe002:9092,vppevkfbobpe003:9092"]
        group_id => "datahub_audience_event_tools_group"
        topics => ["Audience_Event"]
        #decorate_events => true
        codec => avro_schema_registry {
          endpoint => ""
          #tag_on_failure => true
        }
        consumer_threads => 1
        value_deserializer_class => "org.apache.kafka.common.serialization.ByteArrayDeserializer"
      }
    }

Error:
It should convert it directly.
[2024-07-31T06:25:33,133][ERROR][org.logstash.Logstash ][test][a9ed325a79c7c8b8081d2f9e256338696c246c834c93fa755e87ffe03ed66ff6] uncaught exception (in thread kafka-input-worker-logstash-0)
org.jruby.exceptions.RangeError: (RangeError) integer 1722256768896 too big to convert to `int'
at org.jruby.ext.stringio.StringIO.read(org/jruby/ext/stringio/StringIO.java:857) ~[stringio.jar:?]
at RUBY.read(/usr/share/logstash/vendor/bundle/jruby/3.1.0/gems/avro-1.10.2/lib/avro/io.rb:106) ~[?:?]
at RUBY.read_bytes(/usr/share/logstash/vendor/bundle/jruby/3.1.0/gems/avro-1.10.2/lib/avro/io.rb:93) ~[?:?]
at RUBY.read_string(/usr/share/logstash/vendor/bundle/jruby/3.1.0/gems/avro-1.10.2/lib/avro/io.rb:99) ~[?:?]
at RUBY.read_data(/usr/share/logstash/vendor/bundle/jruby/3.1.0/gems/avro-1.10.2/lib/avro/io.rb:276) ~[?:?]
at RUBY.read_record(/usr/share/logstash/vendor/bundle/jruby/3.1.0/gems/avro-1.10.2/lib/avro/io.rb:364) ~[?:?]
at org.jruby.RubyArray.each(org/jruby/RubyArray.java:1981) ~[jruby.jar:?]
at RUBY.read_record(/usr/share/logstash/vendor/bundle/jruby/3.1.0/gems/avro-1.10.2/lib/avro/io.rb:361) ~[?:?]
at RUBY.read_data(/usr/share/logstash/vendor/bundle/jruby/3.1.0/gems/avro-1.10.2/lib/avro/io.rb:287) ~[?:?]
at RUBY.read(/usr/share/logstash/vendor/bundle/jruby/3.1.0/gems/avro-1.10.2/lib/avro/io.rb:252) ~[?:?]
at RUBY.decode(/usr/share/logstash/vendor/bundle/jruby/3.1.0/gems/logstash-codec-avro_schema_registry-1.2.0/lib/logstash/codecs/avro_schema_registry.rb:233) ~[?:?]
at RUBY.handle_record(/usr/share/logstash/vendor/bundle/jruby/3.1.0/gems/logstash-integration-kafka-11.4.2-java/lib/logstash/inputs/kafka.rb:360) ~[?:?]
at RUBY.thread_runner(/usr/share/logstash/vendor/bundle/jruby/3.1.0/gems/logstash-integration-kafka-11.4.2-java/lib/logstash/inputs/kafka.rb:329) ~[?:?]
at RUBY.thread_runner(/usr/share/logstash/vendor/bundle/jruby/3.1.0/gems/logstash-integration-kafka-11.4.2-java/lib/logstash/inputs/kafka.rb:329) ~[?:?]
[2024-07-31T06:25:33,209][INFO ][logstash.javapipeline ][test] Pipeline terminated {"pipeline.id"=>"test"}
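The trace ends in Avro's `read_string` and `read_bytes`, which first read a variable-length, zig-zag encoded long as the string length. If the bytes being decoded do not line up with the writer schema (for example because the payload is not in the framing the codec expects, which is an assumption and not a confirmed cause here), arbitrary bytes can decode to an enormous length such as the 1722256768896 in the error. The following Ruby sketch shows that length decoding on its own, independent of the avro gem; `read_zigzag_long` is a hypothetical name.

```ruby
require "stringio"

# Decode one Avro long: base-128 varint, least significant group first,
# followed by zig-zag decoding to recover the sign.
def read_zigzag_long(io)
  byte = io.readbyte
  value = byte & 0x7F
  shift = 7
  while (byte & 0x80) != 0 # high bit set means another byte follows
    byte = io.readbyte
    value |= (byte & 0x7F) << shift
    shift += 7
  end
  (value >> 1) ^ -(value & 1)
end

# A well-formed prefix: "\n" (0x0A) is the length of a 5-byte string.
good = read_zigzag_long(StringIO.new("\nBMORR".b))

# Misaligned bytes can decode to a length far beyond a 32-bit int.
garbage = read_zigzag_long(StringIO.new("\x80\x80\x80\x80\x80\x64".b))

puts good
puts garbage > 2**31 - 1
```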

Install the plugin with logstash docker image

Is there a way to install this plugin when using the Logstash Docker image?
My compose file looks like this:

  logstash:
    image: docker.elastic.co/logstash/logstash:7.13.2
    restart: unless-stopped
    volumes:
      - ./logstash/logstash.conf:/usr/share/logstash/pipeline/logstash

Schema Registry - (ArgumentError) wrong number of arguments

Hello, I keep getting errors when trying to connect to my Schema Registry. I think the plugin does not handle the exception returned by the server well.

Logstash.javapipeline ][main] Pipeline worker error, the pipeline will be stopped {:pipeline_id=>"main", :error=>"(ArgumentError) wrong number of arguments (given 1, expected 2)", :exception=>Java::OrgJrubyExceptions::ArgumentError, :backtrace=>["org.jruby.RubyException.exception(org/jruby/RubyException.java:129)", "C_3a_.datos.logstash.vendor.bundle.jruby.$2_dot_5_dot_0.gems.schema_registry_minus_0_dot_1_dot_1.lib.schema_registry.client.request(C:/datos/logstash/vendor/bundle/jruby/2.5.0/gems/schema_registry-0.1.1/lib/schema_registry/client.rb:127)", "uri_3a_classloader_3a_.META_minus_INF.jruby_dot_home.lib.ruby.stdlib.net.http.start(uri:classloader:/META-INF/jruby.home/lib/ruby/stdlib/net/http.rb:914)", "uri_3a_classloader_3a_.META_minus_INF.jruby_dot_home.lib.ruby.stdlib.net.http.start(uri:classloader:/META-INF/jruby.home/lib/ruby/stdlib/net/http.rb:609)", "RUBY.request(C:/datos/logstash/vendor/bundle/jruby/2.5.0/gems/schema_registry-0.1.1/lib/schema_registry/client.rb:101)", "RUBY.version(C:/datos/logstash/vendor/bundle/jruby/2.5.0/gems/schema_registry-0.1.1/lib/schema_registry/subject.rb:30)", "RUBY.get_write_schema_id(C:/datos/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-codec-avro_schema_registry-1.2.0/lib/logstash/codecs/avro_schema_registry.rb:189)", ... ...

In my case I'm trying to output into kafka serializing with Avro:

output {
 file {
   path => "C:/datos/logstash/out/logfile.json"
   codec => "json_lines"
 }
 kafka {
   acks => "1"
   client_id => "My client"
   bootstrap_servers => "my servers"
   topic_id => "my topic"
   compression_type => "gzip"
   retries => 0
   codec => avro_schema_registry {
     endpoint=> "schema registry URI"
     subject_name => "the AVRO schema in SR"
     schema_version => "5"
   }
   value_serializer => "org.apache.kafka.common.serialization.ByteArraySerializer"
 }
}

You are likely vulnerable to Unicode overlong character sequences

Hello, I maintain a repository that I forked from this one (because I needed to add some local concerns that wouldn't be appropriate to feed back upstream). Recently I found that my pipeline would occasionally crash because of web access logs that contained path traversal attempts using overlong character sequences.

Because the input's encoder didn't sanitize for this condition (for example, by replacing illegal character sequences with a Unicode replacement character, which is what the standard codecs seem to do), it caused an unhandled exception when Logstash evaluated a conditional such as:

if [message] =~ /foo/ { ... }

This caused the pipeline to shut down. The error in the log (using Logstash 7.14.0) looks like this:

[2021-08-11T13:04:18,744][ERROR][logstash.javapipeline    ][main] Pipeline worker error, the pipeline will be stopped {:pipeline_id=>"main", :error=>"(ArgumentError) invalid byte sequence in UTF-8", :exception=>Java::OrgJrubyExceptions::ArgumentError, :backtrace=>["org.jruby.RubyRegexp.match?(org/jruby/RubyRegexp.java:1180)", "usr.share.logstash.logstash_minus_core.lib.logstash.java_pipeline.start_workers(/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:295)"], :thread=>"#<Thread:0x2230aabb sleep>"}
[2021-08-11T13:04:18,749][INFO ][filewatch.observingread  ][main] QUIT - closing all files and shutting down.
[2021-08-11T13:04:18,750][WARN ][logstash.javapipeline    ][main] Waiting for input plugin to close {:pipeline_id=>"main", :thread=>"#<Thread:0x2230aabb run>"}

I have a post at https://discuss.elastic.co/t/logstash-7-14-0-invalid-byte-sequence-in-utf-8-in-logstash-javapipeline/280976/5

The short version is that for each string field you should use a LogStash::Util::Charset object and .convert it; this uses String.valid_encoding? to detect if a string is dodgy and then does a hack to clean it up a bit.
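As a rough illustration of the same idea outside Logstash, the sketch below uses plain Ruby's `String#valid_encoding?` and `String#scrub` instead of `LogStash::Util::Charset`, which is only available inside Logstash. `sanitize_fields` is a hypothetical helper that handles a flat hash of fields; nested structures would need a recursive walk.

```ruby
# Hypothetical helper: replace invalid byte sequences in every String value of
# a flat field hash with U+FFFD, using plain Ruby's String#scrub.
def sanitize_fields(fields)
  fields.transform_values do |value|
    next value unless value.is_a?(String)

    text = value.dup.force_encoding(Encoding::UTF_8)
    text.valid_encoding? ? text : text.scrub("\uFFFD")
  end
end

# "\xC0\xAF" is an overlong encoding of "/", which is not valid UTF-8.
raw = { "message" => "GET /..\xC0\xAF../etc/passwd", "status" => 404 }
clean = sanitize_fields(raw)

puts raw["message"].valid_encoding?
puts clean["message"].valid_encoding?
puts clean["message"].match?(/passwd/)
```

After scrubbing, regular expression matches on the field no longer have to deal with a broken string, which is the condition that stopped the pipeline in the log above.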

expected type double, got BigDecimal with value 0.0

We want to use this library so that Logstash can work with schemas from our schema registry. But when deserializing the data, we get this error:

Avro::SchemaParseError: Error validating default for workday_hours: at . expected type double, got BigDecimal with value 0.0

The relevant part of the schema:

{
  "name" : "workday_hours",
  "type" : {
    "type" : "double",
    "connect.default" : 0.0
  },
  "default" : 0.0
}

The column in our database was float(10, 2) (imported using debezium).

The call stack:

[2019-08-14T10:59:25,376][ERROR][logstash.javapipeline ] A plugin had an unrecoverable error. Will restart this plugin.
Pipeline_id:main
Plugin: <LogStash::Inputs::Kafka value_deserializer_class=>"org.apache.kafka.common.serialization.ByteArrayDeserializer", codec=><LogStash::Codecs::AvroSchemaRegistry endpoint=>"http://kafka.local:8081", id=>"9ec6aef1-7f4a-4db3-98e9-d4273a5cebbc", enable_metric=>true, check_compatibility=>false, register_schema=>false, binary_encoded=>true, tag_on_failure=>false, verify_mode=>"verify_peer">, auto_offset_reset=>"earliest", topics_pattern=>"^paymodb\\.paymoapp\\..*", id=>"kafka_paymodb", bootstrap_servers=>"kafka.local:9092", client_id=>"logstash", enable_metric=>true, auto_commit_interval_ms=>"5000", consumer_threads=>1, enable_auto_commit=>"true", group_id=>"logstash", key_deserializer_class=>"org.apache.kafka.common.serialization.StringDeserializer", topics=>["logstash"], poll_timeout_ms=>100, ssl_endpoint_identification_algorithm=>"https", security_protocol=>"PLAINTEXT", sasl_mechanism=>"GSSAPI", decorate_events=>false>
Error: Error validating default for workday_hours: at . expected type double, got BigDecimal with value 0.0
Exception: Avro::SchemaParseError
Stack: /usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/avro-1.9.0/lib/avro/schema.rb:403:in `validate_default!'
/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/avro-1.9.0/lib/avro/schema.rb:377:in `initialize'
/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/avro-1.9.0/lib/avro/schema.rb:216:in `block in make_field_objects'
org/jruby/RubyArray.java:1792:in `each'
org/jruby/RubyEnumerable.java:1194:in `each_with_index'
/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/avro-1.9.0/lib/avro/schema.rb:209:in `make_field_objects'
/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/avro-1.9.0/lib/avro/schema.rb:240:in `initialize'
/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/avro-1.9.0/lib/avro/schema.rb:72:in `real_parse'
/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/avro-1.9.0/lib/avro/schema.rb:161:in `subparse'
/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/avro-1.9.0/lib/avro/schema.rb:295:in `block in initialize'
org/jruby/RubyArray.java:1792:in `each'
org/jruby/RubyEnumerable.java:1204:in `each_with_object'
/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/avro-1.9.0/lib/avro/schema.rb:294:in `initialize'
/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/avro-1.9.0/lib/avro/schema.rb:90:in `real_parse'
/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/avro-1.9.0/lib/avro/schema.rb:161:in `subparse'
/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/avro-1.9.0/lib/avro/schema.rb:372:in `initialize'
/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/avro-1.9.0/lib/avro/schema.rb:216:in `block in make_field_objects'
org/jruby/RubyArray.java:1792:in `each'
org/jruby/RubyEnumerable.java:1194:in `each_with_index'
/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/avro-1.9.0/lib/avro/schema.rb:209:in `make_field_objects'
/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/avro-1.9.0/lib/avro/schema.rb:240:in `initialize'
/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/avro-1.9.0/lib/avro/schema.rb:72:in `real_parse'
/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/avro-1.9.0/lib/avro/schema.rb:38:in `parse'
/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-codec-avro_schema_registry-1.1.1/lib/logstash/codecs/avro_schema_registry.rb:158:in `get_schema'
/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-codec-avro_schema_registry-1.1.1/lib/logstash/codecs/avro_schema_registry.rb:228:in `decode'
/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-input-kafka-9.0.0/lib/logstash/inputs/kafka.rb:256:in `block in thread_runner'
/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-input-kafka-9.0.0/lib/logstash/inputs/kafka.rb:255:in `block in thread_runner'
[2019-08-14T10:59:25,405][FATAL][logstash.runner ] An unexpected error occurred! {:error=>#<Avro::SchemaParseError: Error validating default for workday_hours: at . expected type double, got BigDecimal with value 0.0>, :backtrace=>["/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/avro-1.9.0/lib/avro/schema.rb:403:in `validate_default!'", "/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/avro-1.9.0/lib/avro/schema.rb:377:in `initialize'", "/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/avro-1.9.0/lib/avro/schema.rb:216:in `block in make_field_objects'", "org/jruby/RubyArray.java:1792:in `each'", "org/jruby/RubyEnumerable.java:1194:in `each_with_index'", "/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/avro-1.9.0/lib/avro/schema.rb:209:in `make_field_objects'", "/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/avro-1.9.0/lib/avro/schema.rb:240:in `initialize'", "/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/avro-1.9.0/lib/avro/schema.rb:72:in `real_parse'", "/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/avro-1.9.0/lib/avro/schema.rb:161:in `subparse'", "/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/avro-1.9.0/lib/avro/schema.rb:295:in `block in initialize'", "org/jruby/RubyArray.java:1792:in `each'", "org/jruby/RubyEnumerable.java:1204:in `each_with_object'", "/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/avro-1.9.0/lib/avro/schema.rb:294:in `initialize'", "/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/avro-1.9.0/lib/avro/schema.rb:90:in `real_parse'", "/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/avro-1.9.0/lib/avro/schema.rb:161:in `subparse'", "/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/avro-1.9.0/lib/avro/schema.rb:372:in `initialize'", "/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/avro-1.9.0/lib/avro/schema.rb:216:in `block in make_field_objects'", "org/jruby/RubyArray.java:1792:in `each'", "org/jruby/RubyEnumerable.java:1194:in `each_with_index'", 
"/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/avro-1.9.0/lib/avro/schema.rb:209:in `make_field_objects'", "/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/avro-1.9.0/lib/avro/schema.rb:240:in `initialize'", "/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/avro-1.9.0/lib/avro/schema.rb:72:in `real_parse'", "/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/avro-1.9.0/lib/avro/schema.rb:38:in `parse'", "/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-codec-avro_schema_registry-1.1.1/lib/logstash/codecs/avro_schema_registry.rb:158:in `get_schema'", "/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-codec-avro_schema_registry-1.1.1/lib/logstash/codecs/avro_schema_registry.rb:228:in `decode'", "/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-input-kafka-9.0.0/lib/logstash/inputs/kafka.rb:256:in `block in thread_runner'", "/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-input-kafka-9.0.0/lib/logstash/inputs/kafka.rb:255:in `block in thread_runner'"]}
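The message suggests that the schema JSON was parsed with its floating-point literals materialized as BigDecimal, which the avro gem's default validation does not accept for a `double`. Where exactly that happens is not established in this report. As a sketch under that assumption, a parsed schema could be normalized before it is handed to the Avro parser; `floatify` is a hypothetical helper and not part of the plugin.

```ruby
require "bigdecimal"

# Hypothetical helper: walk a parsed schema structure and turn every
# BigDecimal into a Float, leaving all other values untouched.
def floatify(node)
  case node
  when BigDecimal then node.to_f
  when Hash       then node.transform_values { |v| floatify(v) }
  when Array      then node.map { |v| floatify(v) }
  else node
  end
end

# The field from the report, as it might look after a BigDecimal-producing parse.
field = {
  "name" => "workday_hours",
  "type" => { "type" => "double", "connect.default" => BigDecimal("0.0") },
  "default" => BigDecimal("0.0")
}

normalized = floatify(field)
puts normalized["default"].class
puts normalized["type"]["connect.default"].class
```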

SyntaxError: /usr/share/logstash/vendor/bundle/jruby/1.9/gems/schema_registry-0.1.0/lib/schema_registry/client.rb:33: syntax error, unexpected tPOW

Hello,
I get this error on Logstash 5.6.4 with plugin version 1.1.1 installed:

Sending Logstash's logs to /var/log/logstash which is now configured via log4j2.properties
21:38:59.734 [main] INFO  logstash.modules.scaffold - Initializing module {:module_name=>"fb_apache", :directory=>"/usr/share/logstash/modules/fb_apache/configuration"}
21:38:59.737 [main] INFO  logstash.modules.scaffold - Initializing module {:module_name=>"netflow", :directory=>"/usr/share/logstash/modules/netflow/configuration"}
21:38:59.743 [main] INFO  logstash.setting.writabledirectory - Creating directory {:setting=>"path.queue", :path=>"/var/lib/logstash/queue"}
21:38:59.744 [main] INFO  logstash.setting.writabledirectory - Creating directory {:setting=>"path.dead_letter_queue", :path=>"/var/lib/logstash/dead_letter_queue"}
21:38:59.772 [LogStash::Runner] INFO  logstash.agent - No persistent UUID file found. Generating new UUID {:uuid=>"45ee9744-50b2-41bc-a468-39c68e0713f6", :path=>"/var/lib/logstash/uuid"}
SyntaxError: /usr/share/logstash/vendor/bundle/jruby/1.9/gems/schema_registry-0.1.0/lib/schema_registry/client.rb:33: syntax error, unexpected tPOW
    def initialize(endpoint, username = nil, password = nil, **http_options)
                                                               ^
                 require at org/jruby/RubyKernel.java:1040
                 require at /usr/share/logstash/vendor/bundle/jruby/1.9/gems/polyglot-0.3.5/lib/polyglot.rb:65
                  (root) at /usr/share/logstash/vendor/bundle/jruby/1.9/gems/schema_registry-0.1.0/lib/schema_registry.rb:16
                 require at org/jruby/RubyKernel.java:1040
                 require at /usr/share/logstash/vendor/bundle/jruby/1.9/gems/polyglot-0.3.5/lib/polyglot.rb:65
                  (root) at /usr/share/logstash/vendor/bundle/jruby/1.9/gems/logstash-codec-avro_schema_registry-1.1.1/lib/logstash/codecs/avro_schema_registry.rb:1
                 require at org/jruby/RubyKernel.java:1040
                 require at /usr/share/logstash/vendor/bundle/jruby/1.9/gems/polyglot-0.3.5/lib/polyglot.rb:65
                  (root) at /usr/share/logstash/vendor/bundle/jruby/1.9/gems/logstash-codec-avro_schema_registry-1.1.1/lib/logstash/codecs/avro_schema_registry.rb:4
                  (root) at /usr/share/logstash/logstash-core/lib/logstash/plugins/registry.rb:1
           legacy_lookup at /usr/share/logstash/logstash-core/lib/logstash/plugins/registry.rb:156
                  lookup at /usr/share/logstash/logstash-core/lib/logstash/plugins/registry.rb:138
  lookup_pipeline_plugin at /usr/share/logstash/logstash-core/lib/logstash/plugins/registry.rb:180
                  lookup at /usr/share/logstash/logstash-core/lib/logstash/plugin.rb:140
                    eval at org/jruby/RubyKernel.java:1079
                  plugin at /usr/share/logstash/logstash-core/lib/logstash/pipeline.rb:103
              initialize at (eval):8
              initialize at /usr/share/logstash/logstash-core/lib/logstash/pipeline.rb:75
              initialize at /usr/share/logstash/logstash-core/lib/logstash/pipeline.rb:165
         create_pipeline at /usr/share/logstash/logstash-core/lib/logstash/agent.rb:296
       register_pipeline at /usr/share/logstash/logstash-core/lib/logstash/agent.rb:95
                 execute at /usr/share/logstash/logstash-core/lib/logstash/runner.rb:313
                     run at /usr/share/logstash/vendor/bundle/jruby/1.9/gems/clamp-0.6.5/lib/clamp/command.rb:67
                  (root) at /usr/share/logstash/lib/bootstrap/environment.rb:71

This is my Dockerfile:

FROM logstash:5.6.4

# Install Logstash Codec Plugin for Avro Schema Registry
# https://github.com/revpoint/logstash-codec-avro_schema_registry#logstash-codec---avro-schema-registry
RUN logstash-plugin install --version 1.1.1 logstash-codec-avro_schema_registry

NoMethodError: undefined method `to_java_bytes'

Hello, I'm doing a bit of development with this codec (thank you for making it available, much appreciated). I hit an issue that may be due to a change in Logstash.

With Logstash 6.3 (I haven't tried other versions), I get the following stack trace:

Jun 18 12:47:02 node-1 logstash[11790]: [FATAL] 2018-06-18 00:47:02.731 [LogStash::Runner] runner - An unexpected error occurred! {:error=>#<NoMethodError: undefined method `to_java_bytes' for #<#<Class:0x427befaa>:0x3ad546be>
Jun 18 12:47:02 node-1 logstash[11790]: Did you mean?  to_java_object>, :backtrace=>[
"/usr/share/logstash/vendor/bundle/jruby/2.3.0/gems/logstash-output-kafka-7.0.10/lib/logstash/outputs/kafka.rb:199:in `block in register'", 
"/usr/share/logstash/vendor/bundle/jruby/2.3.0/gems/logstash-codec-avro_schema_registry-1.1.0/lib/logstash/codecs/avro_schema_registry.rb:251:in `encode'", 
"/usr/share/logstash/vendor/bundle/jruby/2.3.0/gems/logstash-output-kafka-7.0.10/lib/logstash/outputs/kafka.rb:221:in `block in multi_receive'", 
"org/jruby/RubyArray.java:1734:in `each'", 
"/usr/share/logstash/vendor/bundle/jruby/2.3.0/gems/logstash-output-kafka-7.0.10/lib/logstash/outputs/kafka.rb:219:in `multi_receive'", 
"org/logstash/config/ir/compiler/OutputStrategyExt.java:109:in `multi_receive'", 
"org/logstash/config/ir/compiler/OutputDelegatorExt.java:156:in `multi_receive'", "/usr/share/logstash/logstash-core/lib/logstash/pipeline.rb:475:in `block in output_batch'", 
"org/jruby/RubyHash.java:1343:in `each'", 
"/usr/share/logstash/logstash-core/lib/logstash/pipeline.rb:474:in `output_batch'", 
"/usr/share/logstash/logstash-core/lib/logstash/pipeline.rb:426:in `worker_loop'", 
"/usr/share/logstash/logstash-core/lib/logstash/pipeline.rb:384:in `block in start_workers'"]}
Jun 18 12:47:03 node-1 logstash[11790]: [ERROR] 2018-06-18 00:47:03.076 [LogStash::Runner] Logstash - java.lang.IllegalStateException: Logstash stopped processing because of an error: (SystemExit) exit

This appears to occur because of the following code in avro_schema_registry.rb

/usr/share/logstash/vendor/bundle/jruby/2.3.0/gems/logstash-codec-avro_schema_registry-1.1.0/lib/logstash/codecs/avro_schema_registry.rb

    if @binary_encoded
       @on_event.call(event, buffer.string.to_java_bytes)   # <--- line 251

This points to a to_java_bytes method, but this seems to cause issues with kafka.rb

(/usr/share/logstash/vendor/bundle/jruby/2.3.0/gems/logstash-output-kafka-7.0.10/lib/logstash/outputs/kafka.rb)

    elsif value_serializer == 'org.apache.kafka.common.serialization.ByteArraySerializer'
      @codec.on_event do |event, data|
        write_to_kafka(event, data.to_java_bytes)   # <--- line 199
      end

So data here is presumably already a Java byte array, which is why JRuby complains that it has no to_java_bytes method.

Changing avro_schema_registry.rb to pass in buffer.string instead of buffer.string.to_java_bytes seems to fix this problem, and I get the results I am expecting.

    if @binary_encoded
       #@on_event.call(event, buffer.string.to_java_bytes)
       @on_event.call(event, buffer.string)
    else
       @on_event.call(event, Base64.strict_encode64(buffer.string))
    end
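An alternative to removing the call on one side is to make the conversion tolerant of both inputs. The sketch below is hypothetical (neither plugin is known to ship it) and relies only on `respond_to?`, so it behaves the same whether `data` is still a Ruby String or has already been converted. `FakeRubyString` and the plain Array are stand-ins so the idea can be run outside JRuby.

```ruby
# Hypothetical guard: convert to Java bytes only when the object knows how.
def ensure_java_bytes(data)
  data.respond_to?(:to_java_bytes) ? data.to_java_bytes : data
end

# Stand-in for a JRuby String, which responds to to_java_bytes.
class FakeRubyString
  def initialize(text)
    @text = text
  end

  def to_java_bytes
    @text.bytes # JRuby would return a Java byte[] here
  end
end

already_converted = [104, 105] # stand-in for a Java byte[]

puts ensure_java_bytes(already_converted).inspect
puts ensure_java_bytes(FakeRubyString.new("hi")).inspect
```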

Disclaimer: I am doing some custom development on your plugin to support my own use case (it's not in any public repo at this time, too early). In my use case, I have an Avro schema that I use to encapsulate the event in a Logstash reception tier. I have various fields that are used in a backbone format, and the event goes into a 'message' field (and could be JSON, some binary payload, etc.). That said, the same behaviour occurs when I use avro_schema_registry.rb as per your current HEAD.

Possibly related to logstash-plugins/logstash-output-kafka#123

Here is my configuration and testing after fixing the issue.

My logstash output configuration:

input {
    tcp {
        host => "0.0.0.0"
        port => 5140
        mode => "server"
        codec => "json_lines"
    }
}

output {
  kafka {
    topic_id => "wapiti_backbone_sandpit"
    compression_type => "snappy"

    codec => avro_schema_registry {
      endpoint => "http://127.0.0.1:8081"
      subject_name => "wapiti_backbone_submitted-value"
      schema_version => "3"
      register_schema => false
      binary_encoded => true
    }

    value_serializer => "org.apache.kafka.common.serialization.ByteArraySerializer"
  }
}

Current version of schema (will change 'message' to bytes soon).

{
  "name": "wapiti_backbone",
  "namespace": "local.wapiti",
  "doc":"A log event on a Wapiti Backbone topic",
  "type": "record",

  "fields": [
    {
      "name":"submission_time",
      "type":"long",
      "logical-type":"timestamp-millis",
      "doc":"When first seen at log submission. ms since Unix epoch"
    },
    {
      "name":"submitted_from",
      "type":"string",
      "doc":"Source Hostname or IP at entry of log submission"
    },
    {
      "name":"originating_host",
      "type":"string",
      "doc":"Hostname of the host the log was first created on"
    },
    {
      "name":"vertical",
      "type":"string",
      "doc":"Business application, eg. 'corporate-website'"
    },
    {
      "name":"environment",
      "type":"string",
      "doc":"Business environment, eg. 'dev' or 'prod'"
    },
    {
      "name":"processing_key",
      "type":"string",
      "doc":"Processing key, eg. 'apache-httpd-access-combined'"
    },
    {
      "name":"message_format",
      "type":"string",
      "doc":"format of data contained in 'message' field, eg. 'json', 'syslog', ..."
    },
    {
      "name":"message",
      "type":"string",
      "doc":"The encapsulated message payload."
    }
  ]
}

Test input:

[vagrant@node-1 ~]$ echo "{\"breath_in\": \"$(date --iso-8601=ns)\"}" | nc 127.0.0.1 5140

Output when viewed using kafka-avro-console-consumer:

{"submission_time":0,"submitted_from":"SUBMITTED_FROM","originating_host":"ORIGINATING_HOST","vertical":"VERTICAL","environment":"ENVIRONMENT","processing_key":"PROCESSING_KEY","message_format":"json","message":"{\"host\":\"node-1\",\"@timestamp\":\"2018-06-18T01:27:59.221Z\",\"breath_in\":\"2018-06-18T13:27:59,081679157+1200\",\"@version\":\"1\",\"port\":48710}"}

Tombstones are swallowed in an Avro topic: please give them to the pipeline, with [@metadata][kafka][tombstone]=true

Using Logstash version 8.13.0

When tombstones are sent to the input topic, they are swallowed by the plugin and never surface in the pipeline.
Kafka tombstones are events with a null value.
They are used to signal the deletion of the entity behind the key of the message.
Here, we use an Avro schema for the topic: this may be the source of the problem.
See an example pipeline below.

This null handling is crucial because we use Logstash to ingest Kafka topics into Elasticsearch so that we can index the entities present in the events. Entities can be deleted (they are streamed from a database), and such deleted entities should be removed from the Elasticsearch index.

It seems like the problem lies in this file:
https://github.com/revpoint/logstash-codec-avro_schema_registry/blob/master/lib/logstash/codecs/avro_schema_registry.rb#L218

There we clearly see the error message that Logstash logs when it encounters a null value:

  public
  def decode(data)
    if data.length < 5
      @logger.error('message is too small to decode')
    else

The fix could be as simple as testing for data.length == 0 to trigger the tombstone handling,
yielding an otherwise empty event with one field: [@metadata][kafka][tombstone]=true
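
The suggestion above could look roughly like the following Ruby sketch. It is not the plugin's code: `decode_sketch` is a hypothetical name, a plain Hash stands in for a Logstash event, and the Avro decoding step is replaced by a placeholder. It also assumes the Kafka input hands the codec nil or an empty string for a null value.

```ruby
# 1 magic byte + 4-byte schema id
HEADER_SIZE = 5

# Hypothetical stand-in for the codec's decode method.
def decode_sketch(data)
  if data.nil? || data.empty?
    # Tombstone: yield an event carrying only the metadata flag.
    # In a real codec this would presumably be a LogStash::Event with
    # event.set('[@metadata][kafka][tombstone]', true).
    yield({ '@metadata' => { 'kafka' => { 'tombstone' => true } } })
  elsif data.bytesize < HEADER_SIZE
    warn 'message is too small to decode'
  else
    # Placeholder for the real Avro decoding step.
    yield({ 'payload_bytes' => data.bytesize })
  end
end

events = []
decode_sketch('') { |event| events << event }
decode_sketch("\x00\x00\x00\x00\x01\x02".b) { |event| events << event }
p events
```

With an event like this reaching the pipeline, the conditional on [@metadata][kafka][tombstone] in the output section would have something to match on.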

Steps to reproduce:

  1. Publish an event with a key, and a null value into a Kafka topic that is provided as the input of a Logstash pipeline
  2. Use stdout output and see that nothing is printed for such event
  3. Use a filter to add some metadata when a mandatory field is not present (likely a tombstone message) and see that nothing is printed for such event

Here is a minimal pipeline showing that null events are not sent to the pipeline, along with an outline of the use case we would like to achieve:

input {
  kafka {
    bootstrap_servers => ["${KAFKA_BROKER_URL}"]
    topics => ["${TOPIC}"]
    security_protocol => "SASL_SSL"
    sasl_mechanism => "PLAIN"
    sasl_jaas_config => "${SASL_JAAS_CONFIG}"
    ssl_endpoint_identification_algorithm => "https"
    group_id => "${GROUP_ID}"
    auto_offset_reset => "${AUTO_OFFSET_RESET}"
    isolation_level => "read_committed"
    codec => avro_schema_registry {
      endpoint => "${KAFKA_SCHEMA_REGISTRY_URL}"
      username => "${KAFKA_SCHEMA_REGISTRY_USERNAME}"
      password => "${KAFKA_SCHEMA_REGISTRY_PASSWORD}"
    }
    value_deserializer_class => "org.apache.kafka.common.serialization.ByteArrayDeserializer"
    decorate_events => "basic"
  }
}

# Here, it's already too late: the event is not processed, so there is nothing we can filter on...
# filter {
# }

output {
  # Here, we'd like to do this:
  # if [@metadata][kafka][tombstone] {
  #   elasticsearch {
  #     action => "delete"
  #     ...
  #   }
  # } else {
  #   elasticsearch {
  #     action => "index"
  #     ...
  #   }
  # }

  stdout {
    codec => rubydebug { metadata => true }
  }
}

exception when decorate_events is set to true for kafka input plugin

This is my Kafka input plugin configuration:

input {
    kafka {
        bootstrap_servers => "th-kafka-svc:9093" # example: dc-cdf-master.ad.interset.com:9092
        topics => ["avro-event"]
        ...
        decorate_events => true
        key_deserializer_class => "org.apache.kafka.common.serialization.ByteArrayDeserializer"
        value_deserializer_class => "org.apache.kafka.common.serialization.ByteArrayDeserializer"
        codec => avro_schema_registry {
            endpoint => "https://th-schema-svc:8081"
            client_key => "/vault-crt/logstash.key"
            client_certificate => "/vault-crt/logstash.crt"
            ca_certificate => "/vault-crt/trustedCAs/RE_ca.crt"
            verify_mode => "verify_peer"
        }
    }
}

When decorate_events is set to true, the schema registry codec throws the exception below:

org.logstash.MissingConverterException: Missing Converter handling for full class name=[B, simple name=byte[]
at org.logstash.Valuefier.fallbackConvert(Valuefier.java:118)
at org.logstash.Valuefier.convert(Valuefier.java:96)
at org.logstash.Valuefier.lambda$static$3(Valuefier.java:72)
at org.logstash.Valuefier.convert(Valuefier.java:94)
at org.logstash.ext.JrubyEventExtLibrary$RubyEvent.safeValueifierConvert(JrubyEventExtLibrary.java:355)
at org.logstash.ext.JrubyEventExtLibrary$RubyEvent.ruby_set_field(JrubyEventExtLibrary.java:121)
at org.logstash.ext.JrubyEventExtLibrary$RubyEvent$INVOKER$i$2$0$ruby_set_field.call(JrubyEventExtLibrary$RubyEvent$INVOKER$i$2$0$ruby_set_field.gen)
at org.jruby.internal.runtime.methods.JavaMethod$JavaMethodN.call(JavaMethod.java:835)
at org.jruby.ir.targets.InvokeSite.invoke(InvokeSite.java:207)
at usr.share.logstash.vendor.bundle.jruby.$2_dot_5_dot_0.gems.logstash_minus_integration_minus_kafka_minus_10_dot_5_dot_3_minus_java.lib.logstash.inputs.kafka.RUBY$block$thread_runner$3(/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-integration-kafka-10.5.3-java/lib/logstash/inputs/kafka.rb:286)
at org.jruby.ir.targets.YieldSite.yield(YieldSite.java:110)
at usr.share.logstash.vendor.bundle.jruby.$2_dot_5_dot_0.gems.logstash_minus_codec_minus_avro_schema_registry_minus_1_dot_2_dot_0.lib.logstash.codecs.avro_schema_registry.RUBY$method$decode$0(/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-codec-avro_schema_registry-1.2.0/lib/logstash/codecs/avro_schema_registry.rb:233)
at usr.share.logstash.vendor.bundle.jruby.$2_dot_5_dot_0.gems.logstash_minus_codec_minus_avro_schema_registry_minus_1_dot_2_dot_0.lib.logstash.codecs.avro_schema_registry.RUBY$method$decode$0$VARARGS(/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-codec-avro_schema_registry-1.2.0/lib/logstash/codecs/avro_schema_registry.rb)
at org.jruby.internal.runtime.methods.CompiledIRMethod.call(CompiledIRMethod.java:80)
at org.jruby.internal.runtime.methods.MixedModeIRMethod.call(MixedModeIRMethod.java:70)
er$2(/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-integration-kafka-10.5.3-java/lib/logstash/inputs/kafka.rb:279)
at org.jruby.runtime.CompiledIRBlockBody.yieldDirect(CompiledIRBlockBody.java:148)
at org.jruby.runtime.BlockBody.yield(BlockBody.java:106)
at org.jruby.runtime.Block.yield(Block.java:184)
at org.jruby.javasupport.ext.JavaLang$Iterable.each(JavaLang.java:98)
at org.jruby.javasupport.ext.JavaLang$Iterable$INVOKER$s$0$0$each.call(JavaLang$Iterable$INVOKER$s$0$0$each.gen)
at org.jruby.internal.runtime.methods.JavaMethod$JavaMethodZeroBlock.call(JavaMethod.java:555)
at org.jruby.ir.targets.InvokeSite.invoke(InvokeSite.java:197)
at usr.share.logstash.vendor.bundle.jruby.$2_dot_5_dot_0.gems.logstash_minus_integration_minus_kafka_minus_10_dot_5_dot_3_minus_java.lib.logstash.inputs.kafka.RUBY$block$thread_runner$1(/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-integration-kafka-10.5.3-java/lib/logstash/inputs/kafka.rb:278)
at org.jruby.runtime.CompiledIRBlockBody.callDirect(CompiledIRBlockBody.java:138)
at org.jruby.runtime.IRBlockBody.call(IRBlockBody.java:58)
at org.jruby.runtime.IRBlockBody.call(IRBlockBody.java:52)
at org.jruby.runtime.Block.call(Block.java:139)
at org.jruby.RubyProc.call(RubyProc.java:318)
at org.jruby.internal.runtime.RubyRunnable.run(RubyRunnable.java:105)
at java.lang.Thread.run(Thread.java:748)

Can you help fix this issue?

Add a notice about deserializers in logstash-5

key_deserializer_class => "org.apache.kafka.common.serialization.ByteArrayDeserializer"
value_deserializer_class => "org.apache.kafka.common.serialization.ByteArrayDeserializer"

should be added to the kafka input options in the examples.

Error to decode - avro_schema_registry

  • Logstash version 8.14.3
  • plugin versions:
    logstash-codec-avro (3.4.1)
    logstash-codec-avro_schema_registry (1.2.0)
  • Logstash configuration:
    input {
      kafka {
        bootstrap_servers => ["vppevkfbobpe001:9092,vppevkfbobpe002:9092,vppevkfbobpe003:9092"]
        group_id => "datahub_audience_event_tools_group"
        topics => ["Audience_Event"]
        #decorate_events => true
        codec => avro_schema_registry {
          endpoint => ""
          #tag_on_failure => true
        }
        consumer_threads => 1
        value_deserializer_class => "org.apache.kafka.common.serialization.ByteArrayDeserializer"
      }
    }

Error:
It should convert the value directly.
[2024-07-31T06:25:33,133][ERROR][org.logstash.Logstash ][test][a9ed325a79c7c8b8081d2f9e256338696c246c834c93fa755e87ffe03ed66ff6] uncaught exception (in thread kafka-input-worker-logstash-0)
org.jruby.exceptions.RangeError: (RangeError) integer 1722256768896 too big to convert to `int'
at org.jruby.ext.stringio.StringIO.read(org/jruby/ext/stringio/StringIO.java:857) ~[stringio.jar:?]
at RUBY.read(/usr/share/logstash/vendor/bundle/jruby/3.1.0/gems/avro-1.10.2/lib/avro/io.rb:106) ~[?:?]
at RUBY.read_bytes(/usr/share/logstash/vendor/bundle/jruby/3.1.0/gems/avro-1.10.2/lib/avro/io.rb:93) ~[?:?]
at RUBY.read_string(/usr/share/logstash/vendor/bundle/jruby/3.1.0/gems/avro-1.10.2/lib/avro/io.rb:99) ~[?:?]
at RUBY.read_data(/usr/share/logstash/vendor/bundle/jruby/3.1.0/gems/avro-1.10.2/lib/avro/io.rb:276) ~[?:?]
at RUBY.read_record(/usr/share/logstash/vendor/bundle/jruby/3.1.0/gems/avro-1.10.2/lib/avro/io.rb:364) ~[?:?]
at org.jruby.RubyArray.each(org/jruby/RubyArray.java:1981) ~[jruby.jar:?]
at RUBY.read_record(/usr/share/logstash/vendor/bundle/jruby/3.1.0/gems/avro-1.10.2/lib/avro/io.rb:361) ~[?:?]
at RUBY.read_data(/usr/share/logstash/vendor/bundle/jruby/3.1.0/gems/avro-1.10.2/lib/avro/io.rb:287) ~[?:?]
at RUBY.read(/usr/share/logstash/vendor/bundle/jruby/3.1.0/gems/avro-1.10.2/lib/avro/io.rb:252) ~[?:?]
at RUBY.decode(/usr/share/logstash/vendor/bundle/jruby/3.1.0/gems/logstash-codec-avro_schema_registry-1.2.0/lib/logstash/codecs/avro_schema_registry.rb:233) ~[?:?]
at RUBY.handle_record(/usr/share/logstash/vendor/bundle/jruby/3.1.0/gems/logstash-integration-kafka-11.4.2-java/lib/logstash/inputs/kafka.rb:360) ~[?:?]
at RUBY.thread_runner(/usr/share/logstash/vendor/bundle/jruby/3.1.0/gems/logstash-integration-kafka-11.4.2-java/lib/logstash/inputs/kafka.rb:329) ~[?:?]
at RUBY.thread_runner(/usr/share/logstash/vendor/bundle/jruby/3.1.0/gems/logstash-integration-kafka-11.4.2-java/lib/logstash/inputs/kafka.rb:329) ~[?:?]
[2024-07-31T06:25:33,209][INFO ][logstash.javapipeline ][test] Pipeline terminated {"pipeline.id"=>"test"}
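
The trace shows the failure inside read_string, where the decoder reads a length prefix before reading the string body. One way to narrow this down is to check whether the record value actually starts with the framing the codec expects: a zero magic byte followed by a 4-byte big-endian schema id. The sketch below is an assumption-laden debugging aid, not part of the plugin; `split_confluent_frame` is a hypothetical name and the sample bytes are hand-written.

```ruby
# Splits a Kafka record value into schema id and Avro payload.
# Returns nil when the value cannot be in the expected framing.
def split_confluent_frame(data)
  bytes = data.b
  return nil if bytes.bytesize < 5

  magic, schema_id = bytes.unpack('CN') # unsigned byte, 32-bit big-endian
  return nil unless magic.zero?

  { schema_id: schema_id, payload: bytes.byteslice(5..-1) }
end

framed = [0, 42].pack('CN') + "\x0aBMORR".b
p split_confluent_frame(framed)
p split_confluent_frame('{"plain":"json"}')
```

If the header looks valid, the next thing to compare would be the schema registered under that id against the schema the producer really used, since a mismatch between the two can also make a length prefix decode to an implausible number.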

OpenSSL::SSL::SSLError: Received fatal alert: handshake_failure

L.S.

I get this error:

OpenSSL::SSL::SSLError: Received fatal alert: handshake_failure
    connect_nonblock at org/jruby/ext/openssl/SSLSocket.java:276
  ssl_socket_connect at uri:classloader:/META-INF/jruby.home/lib/ruby/stdlib/net/protocol.rb:44
             connect at uri:classloader:/META-INF/jruby.home/lib/ruby/stdlib/net/http.rb:985
            do_start at uri:classloader:/META-INF/jruby.home/lib/ruby/stdlib/net/http.rb:924
               start at uri:classloader:/META-INF/jruby.home/lib/ruby/stdlib/net/http.rb:913
               start at uri:classloader:/META-INF/jruby.home/lib/ruby/stdlib/net/http.rb:609
             request at /usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/schema_registry-0.1.0/lib/schema_registry/client.rb:101
              schema at /usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/schema_registry-0.1.0/lib/schema_registry/client.rb:40
          get_schema at /usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-codec-avro_schema_registry-1.2.0/lib/logstash/codecs/avro_schema_registry.rb:160
              encode at /usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-codec-avro_schema_registry-1.2.0/lib/logstash/codecs/avro_schema_registry.rb:248
       multi_receive at /usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-output-kafka-7.3.2/lib/logstash/outputs/kafka.rb:223
                each at org/jruby/RubyArray.java:1792
       multi_receive at /usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-output-kafka-7.3.2/lib/logstash/outputs/kafka.rb:221
       multi_receive at org/logstash/config/ir/compiler/OutputStrategyExt.java:118
       multi_receive at org/logstash/config/ir/compiler/AbstractOutputDelegatorExt.java:101
        output_batch at /usr/share/logstash/logstash-core/lib/logstash/pipeline.rb:390
                each at org/jruby/RubyHash.java:1419
        output_batch at /usr/share/logstash/logstash-core/lib/logstash/pipeline.rb:389
         worker_loop at /usr/share/logstash/logstash-core/lib/logstash/pipeline.rb:341
       start_workers at /usr/share/logstash/logstash-core/lib/logstash/pipeline.rb:304

The config is:

kafka {
  codec => avro_schema_registry {
    endpoint => "https://XXXXXX.somewhere"
    subject_name => "subj-name"
    schema_id => 1121
    schema_uri => "https://XXXXXX.somewhere"
    base64_encoded => false
    verify_mode => "verify_none"
  }
  ssl_truststore_location => "/XXXX/XXXXX/xxxx.jks"
  ssl_truststore_password => "xxxxxxx"
  ssl_keystore_location   => "/XXXX/XXXXXs/xxxxx.jks"
  ssl_key_password        => "xxxxxxx"
  ssl_keystore_password   => "xxxxxxx"
  security_protocol       => "SSL"
  bootstrap_servers => "XXXXXX.somewhere:9092"
  topic_id => "scema_name"
  value_serializer => "org.apache.kafka.common.serialization.ByteArraySerializer"
}

What could cause this handshake failure?

Issue on decoding timestamps in UNIX format

Hello, I've installed the plugin on my Logstash installation (v5.2.1) and configured my Kafka input section to use it. After I started Logstash, events began flowing from my Kafka queue to a file (just for testing), and I was surprised to see that every field is converted correctly except the time fields.

{"eventId":"my_eventId","@timestamp":"2017-08-11T10:26:53.799Z","activity":{"activityTime":-111032204562303474814398456,"startTime":-52944280892516839416,"type":"my_type","watchType":"my_watch_type","uuid":"my_uuid"},"@Version":"1","type":"My_type","device":{"deviceId":"my_deviceId","version":"My_version"},"timestamp":-111032204562303474814398456}

I suspected a problem with the data, so I consumed the same events with a Kafka consumer, which interprets and translates them correctly:

{"eventId":"my-eventId","timestamp":1502445430,"device":{"deviceId":"my_deviceId","version":"my_version"},"activity":{"activityTime":1502445430,"type":"my_type","watchType":"my_watch_type","startTime":1501848509,"uuid":"my_uuid"}}

As you can see, the events managed by Logstash are damaged: all the fields that contain a date are wrongly converted while the kafka consumer converts them to the correct UNIX format.

In the Avro schema, the fields activityTime, activityStart and timestamp are defined as int.
Logstash does nothing apart from taking the input from the Kafka queue and writing it to a test file, adding its own service fields (no filters are applied).

Regards.
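
To check by hand what bytes a timestamp such as 1502445430 should produce, it helps to recall that Avro writes int and long values as zigzag-encoded variable-length integers. The following Ruby sketch implements that encoding for illustration only; `write_avro_long` and `read_avro_long` are hypothetical helper names, not functions taken from the plugin.

```ruby
# Zigzag + base-128 varint encoding, as used by Avro for int and long.
def write_avro_long(n)
  n = (n << 1) ^ (n >> 63) # zigzag: small magnitudes become small codes
  out = []
  while (n & ~0x7f) != 0
    out << ((n & 0x7f) | 0x80) # low 7 bits, continuation bit set
    n >>= 7
  end
  out << n
  out.pack('C*')
end

# Reads one varint from the start of a binary string and undoes the zigzag.
def read_avro_long(bytes)
  value = 0
  shift = 0
  bytes.each_byte do |b|
    value |= (b & 0x7f) << shift
    shift += 7
    break if (b & 0x80).zero? # last byte of this value
  end
  (value >> 1) ^ -(value & 1)
end

encoded = write_avro_long(1_502_445_430)
puts encoded.unpack1('H*')
puts read_avro_long(encoded)
```

Ruby integers are arbitrary precision, so a reader like this keeps accumulating for as long as continuation bits are set. A decoded value beyond the 64-bit range, like the ones in the Logstash output above, therefore means more bytes were consumed for one value than a 64-bit number needs. That would be consistent with the bytes having been altered before decoding (for example by a string deserializer instead of a byte array deserializer), although this is only a guess from the symptoms.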
