awslabs / aws-glue-schema-registry

AWS Glue Schema Registry Client library provides serializers / de-serializers for applications to integrate with AWS Glue Schema Registry Service. The library currently supports Avro, JSON and Protobuf data formats. See https://docs.aws.amazon.com/glue/latest/dg/schema-registry.html to get started.

License: Apache License 2.0


AWS Glue Schema Registry Library


AWS Glue Schema Registry provides a solution for customers to centrally discover, control, and evolve schemas while ensuring that produced data is validated by a registered schema. The AWS Glue Schema Registry Library offers Serializers and Deserializers that plug in to Glue Schema Registry.

Getting Started

  1. Sign up for AWS — Before you begin, you need an AWS account. For more information about creating an AWS account and retrieving your AWS credentials, see AWS Account and Credentials in the AWS SDK for Java Developer Guide.
  2. Sign up for AWS Glue Schema Registry — Go to the AWS Glue Schema Registry console to sign up for the service and create an AWS Glue Schema Registry. For more information, see Getting Started with Glue Schema Registry in the AWS Glue Developer Guide.
  3. Minimum requirements — To use the AWS Glue Schema Registry, you'll need Java 8 or later, but earlier than Java 15.

Features

  1. Messages/records are serialized on the producer side and deserialized on the consumer side using schema-registry-serde.
  2. Support for three data formats: AVRO, JSON (with JSON Schema Draft04, Draft06, Draft07), and Protocol Buffers (Protobuf syntax versions 2 and 3).
  3. Kafka Streams support for AWS Glue Schema Registry.
  4. Records can be compressed to reduce message size.
  5. A built-in local in-memory cache to save calls to AWS Glue Schema Registry: the schema version ID for a schema definition is cached on the producer side, and the schema for a schema version ID is cached on the consumer side.
  6. Auto-registration can be enabled so that any new schema is registered automatically.
  7. Schema evolution (compatibility) checks are performed while registering.
  8. Migration from a third-party schema registry.
  9. Flink support for AWS Glue Schema Registry.
  10. Kafka Connect support for AWS Glue Schema Registry.

Building from Source

After you've downloaded the code from GitHub, you can build it using Maven.

The following Maven command cleans the target directory, compiles the project, runs the tests, and packages the build into a JAR:

cd build-tools/ && mvn clean install && cd .. && mvn clean install

Alternatively, one could git clone this repo and run mvn clean install.

Testing

To run the tests, execute the following Maven command:

mvn test

Using the AWS Glue Schema Registry Library Serializer / Deserializer

The recommended way to use the AWS Glue Schema Registry Library for Java is to consume it from Maven.

Using AWS Glue Schema Registry with Amazon MSK — To set up Amazon Managed Streaming for Apache Kafka, see Getting started with Amazon MSK.

Maven Dependency

<dependency>
    <groupId>software.amazon.glue</groupId>
    <artifactId>schema-registry-serde</artifactId>
    <version>1.1.20</version>
</dependency>

Code Example
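The producer and consumer snippets below are fragments: they assume a Kafka client Properties object and a topic name have already been created, and the Avro examples additionally assume an Avro Schema.Parser. A minimal sketch of that shared setup (the bootstrap server address, consumer group id, and topic name are placeholders, not values from this library):

        // Shared setup assumed by the snippets that follow; all values are placeholders.
        String topic = "my-topic";

        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Consumers use the same bootstrap servers plus a consumer group id, e.g.:
        // properties.put(ConsumerConfig.GROUP_ID_CONFIG, "gsr-demo-group");

        // The Avro examples also assume an Avro schema parser:
        Schema.Parser parser = new Schema.Parser();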

Producer for Kafka with AVRO format

        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, GlueSchemaRegistryKafkaSerializer.class.getName());
        properties.put(AWSSchemaRegistryConstants.DATA_FORMAT, DataFormat.AVRO.name());
        properties.put(AWSSchemaRegistryConstants.AWS_REGION, "us-east-1");
        properties.put(AWSSchemaRegistryConstants.REGISTRY_NAME, "my-registry");
        properties.put(AWSSchemaRegistryConstants.SCHEMA_NAME, "my-schema");

        Schema schema_payment = null;
        try {
            schema_payment = parser.parse(new File("src/main/resources/avro/com/tutorial/Payment.avsc"));
        } catch (IOException e) {
            e.printStackTrace();
        }
        
        GenericRecord musical = new GenericData.Record(schema_payment);
        musical.put("id", "entertainment_2");
        musical.put("amount", 105.0);

        List<GenericRecord> misc = new ArrayList<>();
        misc.add(musical);

        try (KafkaProducer<String, GenericRecord> producer = new KafkaProducer<String, GenericRecord>(properties)) {
            for (int i = 0; i < misc.size(); i++) {
                GenericRecord r = misc.get(i);

                final ProducerRecord<String, GenericRecord> record;
                record = new ProducerRecord<String, GenericRecord>(topic, r.get("id").toString(), r);

                producer.send(record);
                System.out.println("Sent message " + i);
                Thread.sleep(1000L);
            }
            producer.flush();
            System.out.println("Successfully produced " + misc.size() + " messages to a topic called " + topic);

        } catch (final InterruptedException | SerializationException e) {
            e.printStackTrace();
        }

Consumer for Kafka with AVRO format

        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, GlueSchemaRegistryKafkaDeserializer.class.getName());
        properties.put(AWSSchemaRegistryConstants.AWS_REGION, "us-east-1");
        properties.put(AWSSchemaRegistryConstants.AVRO_RECORD_TYPE, AvroRecordType.GENERIC_RECORD.getName());
        
        try (final KafkaConsumer<String, GenericRecord> consumer = new KafkaConsumer<String, GenericRecord>(properties)) {
            consumer.subscribe(Collections.singletonList(topic));

            while (true) {
                final ConsumerRecords<String, GenericRecord> records = consumer.poll(100);
                for (final ConsumerRecord<String, GenericRecord> record : records) {
                    final String key = record.key();
                    final GenericRecord value = record.value();
                    System.out.println("Received message: key = " + key + ", value = " + value);
                }
            }
        }

Producer for Kafka with JSON format

        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, GlueSchemaRegistryKafkaSerializer.class.getName());
        properties.put(AWSSchemaRegistryConstants.DATA_FORMAT, DataFormat.JSON.name());
        properties.put(AWSSchemaRegistryConstants.AWS_REGION, "us-east-1");
        properties.put(AWSSchemaRegistryConstants.REGISTRY_NAME, "my-registry");
        properties.put(AWSSchemaRegistryConstants.SCHEMA_NAME, "my-schema");
        
        String jsonSchema = "{\n" + "        \"$schema\": \"http://json-schema.org/draft-04/schema#\",\n"
                                                + "        \"type\": \"object\",\n" + "        \"properties\": {\n" + "          \"employee\": {\n"
                                                + "            \"type\": \"object\",\n" + "            \"properties\": {\n"
                                                + "              \"name\": {\n" + "                \"type\": \"string\"\n" + "              },\n"
                                                + "              \"age\": {\n" + "                \"type\": \"integer\"\n" + "              },\n"
                                                + "              \"city\": {\n" + "                \"type\": \"string\"\n" + "              }\n"
                                                + "            },\n" + "            \"required\": [\n" + "              \"name\",\n"
                                                + "              \"age\",\n" + "              \"city\"\n" + "            ]\n" + "          }\n"
                                                + "        },\n" + "        \"required\": [\n" + "          \"employee\"\n" + "        ]\n"
                                                + "      }";
        String jsonPayload = "{\n" + "        \"employee\": {\n" + "          \"name\": \"John\",\n" + "          \"age\": 30,\n"
                                                 + "          \"city\": \"New York\"\n" + "        }\n" + "      }";
        
        JsonDataWithSchema jsonSchemaWithData = JsonDataWithSchema.builder(jsonSchema, jsonPayload).build();

        List<JsonDataWithSchema> genericJsonRecords = new ArrayList<>();
        genericJsonRecords.add(jsonSchemaWithData);
        
        try (KafkaProducer<String, JsonDataWithSchema> producer = new KafkaProducer<String, JsonDataWithSchema>(properties)) {
            for (int i = 0; i < genericJsonRecords.size(); i++) {
                JsonDataWithSchema r = genericJsonRecords.get(i);

                final ProducerRecord<String, JsonDataWithSchema> record;
                record = new ProducerRecord<String, JsonDataWithSchema>(topic, "message-" + i, r);

                producer.send(record);
                System.out.println("Sent message " + i);
                Thread.sleep(1000L);
            }
            producer.flush();
            System.out.println("Successfully produced " + genericJsonRecords.size() + " messages to a topic called " + topic);

        } catch (final InterruptedException | SerializationException e) {
            e.printStackTrace();
        }

Consumer for Kafka with JSON format

        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, GlueSchemaRegistryKafkaDeserializer.class.getName());
        properties.put(AWSSchemaRegistryConstants.AWS_REGION, "us-east-1");
        
        try (final KafkaConsumer<String, JsonDataWithSchema> consumer = new KafkaConsumer<String, JsonDataWithSchema>(properties)) {
            consumer.subscribe(Collections.singletonList(topic));

            while (true) {
                final ConsumerRecords<String, JsonDataWithSchema> records = consumer.poll(100);
                for (final ConsumerRecord<String, JsonDataWithSchema> record : records) {
                    final String key = record.key();
                    final JsonDataWithSchema value = record.value();
                    System.out.println("Received message: key = " + key + ", value = " + value);
                }
            }
        }

Producer for Kafka with PROTOBUF format

        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, GlueSchemaRegistryKafkaSerializer.class.getName());
        properties.put(AWSSchemaRegistryConstants.DATA_FORMAT, DataFormat.PROTOBUF.name());
        properties.put(AWSSchemaRegistryConstants.AWS_REGION, "us-east-1");
        properties.put(AWSSchemaRegistryConstants.REGISTRY_NAME, "my-registry");
        properties.put(AWSSchemaRegistryConstants.SCHEMA_NAME, "protobuf-file-name.proto");

        // POJO production

        // CustomerAddress is the generated Protocol Buffers class based on the given Protobuf schema definition
        CustomerAddress customerAddress = CustomerAddress.newBuilder().build();
        
        KafkaProducer<String, CustomerAddress> producer =
             new KafkaProducer<String, CustomerAddress>(properties);

        // Wrap the message in a ProducerRecord before sending
        producer.send(new ProducerRecord<String, CustomerAddress>(topic, customerAddress));

        // DynamicMessage production

        DynamicMessage customerDynamicMessage =
             DynamicMessage.newBuilder(CustomerAddress.getDescriptor()).build();

        KafkaProducer<String, DynamicMessage> producer =
             new KafkaProducer<String, DynamicMessage>(properties);

        producer.send(new ProducerRecord<String, DynamicMessage>(topic, customerDynamicMessage));

Consumer for Kafka with PROTOBUF format

        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, GlueSchemaRegistryKafkaDeserializer.class.getName());
        properties.put(AWSSchemaRegistryConstants.AWS_REGION, "us-east-1");

        // POJO consumption
        
        properties.put(AWSSchemaRegistryConstants.PROTOBUF_MESSAGE_TYPE, ProtobufMessageType.POJO.getName());
        
        KafkaConsumer<String, CustomerAddress> consumer =
             new KafkaConsumer<String, CustomerAddress>(properties);

        consumer.subscribe(Collections.singletonList(topic));

        final ConsumerRecords<String, CustomerAddress> records = consumer.poll(10);
        records.forEach(record -> processRecord(record));
            
        // DynamicMessage consumption

        // This is optional. By default AWSSchemaRegistryConstants.PROTOBUF_MESSAGE_TYPE is set as ProtobufMessageType.DYNAMIC_MESSAGE.getName()
        properties.put(AWSSchemaRegistryConstants.PROTOBUF_MESSAGE_TYPE, ProtobufMessageType.DYNAMIC_MESSAGE.getName());
        
        KafkaConsumer<String, DynamicMessage> consumer =
             new KafkaConsumer<String, DynamicMessage>(properties);

        consumer.subscribe(Collections.singletonList(topic));

        final ConsumerRecords<String, DynamicMessage> records = consumer.poll(10);
        records.forEach(record -> processRecord(record));

Dealing with Specific Record (JAVA POJO) for JSON

You could use a Java POJO and pass the object as a record. We use mbknor-jackson-jsonschema to generate a JSON Schema for the POJO passed. This library can also inject additional information into the JSON Schema.

The GSR library uses the "className" property, which holds the fully qualified class name, to deserialize the payload back into an object of that POJO class.

Example class :

@JsonSchemaDescription("This is a car")
@JsonSchemaTitle("Simple Car Schema")
@Builder
@AllArgsConstructor
@EqualsAndHashCode
// Fully qualified class name to be added to an additionally injected property
// called className for deserializer to determine which class to deserialize
// the bytes into
@JsonSchemaInject(
        strings = {@JsonSchemaString(path = "className",
                value = "com.amazonaws.services.schemaregistry.integrationtests.generators.Car")}
)
// List of annotations to help infer JSON Schema are defined by https://github.com/mbknor/mbknor-jackson-jsonSchema
public class Car {
    @JsonProperty(required = true)
    private String make;

    @JsonProperty(required = true)
    private String model;

    @JsonSchemaDefault("true")
    @JsonProperty
    public boolean used;

    @JsonSchemaInject(ints = {@JsonSchemaInt(path = "multipleOf", value = 1000)})
    @Max(200000)
    @JsonProperty
    private int miles;

    @Min(2000)
    @JsonProperty
    private int year;

    @JsonProperty
    private Date purchaseDate;

    @JsonProperty
    @JsonFormat(shape = JsonFormat.Shape.NUMBER)
    private Date listedDate;

    @JsonProperty
    private String[] owners;

    @JsonProperty
    private Collection<Float> serviceChecks;

    // Empty constructor is required by Jackson to deserialize bytes
    // into an Object of this class
    public Car() {}
}
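
For illustration, here is a minimal sketch of producing this POJO with the JSON data format. It reuses the property names from the JSON producer example above; the topic name, registry/schema names, key, and field values are placeholders:

        // Hedged sketch: configuration mirrors the earlier JSON producer example; values are placeholders.
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, GlueSchemaRegistryKafkaSerializer.class.getName());
        properties.put(AWSSchemaRegistryConstants.DATA_FORMAT, DataFormat.JSON.name());
        properties.put(AWSSchemaRegistryConstants.AWS_REGION, "us-east-1");
        properties.put(AWSSchemaRegistryConstants.REGISTRY_NAME, "my-registry");
        properties.put(AWSSchemaRegistryConstants.SCHEMA_NAME, "my-car-schema");

        // The Lombok @Builder annotation on Car generates this builder.
        Car car = Car.builder()
                .make("Honda")
                .model("Civic")
                .used(true)
                .miles(10000)
                .year(2017)
                .build();

        try (KafkaProducer<String, Car> producer = new KafkaProducer<String, Car>(properties)) {
            // The serializer generates the JSON Schema from the annotations on Car,
            // including the injected className property used on the consumer side.
            producer.send(new ProducerRecord<String, Car>(topic, "car-1", car));
            producer.flush();
        }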

Using AWS Glue Schema Registry with Kinesis Data Streams

Kinesis Client library (KCL) / Kinesis Producer Library (KPL): Getting started with AWS Glue Schema Registry with AWS Kinesis Data Streams

If you cannot use the KCL / KPL libraries for Kinesis Data Streams integration, see the examples and integration-tests modules for working examples with the Kinesis SDK, KPL, and KCL.

Using Auto-Registration

Auto-registration allows any record produced with a new schema to be registered automatically with AWS Glue Schema Registry: the schema is registered, a new schema version is created, and evolution checks are performed.

If the schema already exists but the schema version is new, the new schema version is created and evolution checks are performed.

Auto-registration is disabled by default. To enable it, pass the configuration to the producer as below:

    properties.put(AWSSchemaRegistryConstants.SCHEMA_AUTO_REGISTRATION_SETTING, true); // If not passed, defaults to false

Providing Registry Name

Registry Name can be provided by setting this property -

    properties.put(AWSSchemaRegistryConstants.REGISTRY_NAME, "my-registry"); // If not passed, uses "default-registry"

Providing Schema Name

Schema Name can be provided by setting this property -

    properties.put(AWSSchemaRegistryConstants.SCHEMA_NAME, "my-schema"); // If not passed, uses transport name (topic name in case of Kafka)

Alternatively, a schema registry naming strategy implementation can be provided.

    properties.put(AWSSchemaRegistryConstants.SCHEMA_NAMING_GENERATION_CLASS,
                    "com.amazonaws.services.schemaregistry.serializers.avro.CustomerProvidedSchemaNamingStrategy");

An example test implementation class is here.
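
As a rough illustration only — the interface and method names below are assumptions inferred from the property above, not verified here; consult the linked example class for the authoritative contract — a custom naming strategy might look roughly like this:

    // Hypothetical sketch: the interface name and method signature are assumptions;
    // see the example implementation linked above for the real contract.
    public class CustomerProvidedSchemaNamingStrategy implements AWSSchemaNamingStrategy {

        // Derive the schema name from the transport name (the Kafka topic),
        // e.g. by stripping an environment prefix such as "dev-".
        @Override
        public String getSchemaName(String transportName) {
            return transportName.replaceFirst("^dev-", "");
        }
    }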

Providing Registry Description

Registry Description can be provided by setting this property -

    properties.put(AWSSchemaRegistryConstants.DESCRIPTION, "This registry is used for several purposes."); // If not passed, constructs a description

Providing Compatibility Setting for Schema

The compatibility setting can be provided by setting this property -

    properties.put(AWSSchemaRegistryConstants.COMPATIBILITY_SETTING, Compatibility.FULL); // Pass a compatibility mode. If not passed, uses Compatibility.BACKWARD

Using Compression

The serialized byte array can be compressed to save on data usage over the network and storage on the topic/stream. The consumer side, using the AWS Glue Schema Registry deserializer, decompresses and then deserializes the byte array. By default, compression is disabled. Customers can choose ZLIB as the compressionType by setting the property below.

    // If not passed, defaults to no compression
    properties.put(AWSSchemaRegistryConstants.COMPRESSION_TYPE, AWSSchemaRegistryConstants.COMPRESSION.ZLIB.name());

In-Memory Cache settings

The in-memory cache is used by the producer to store the schema-to-schema-version-ID mapping and by the consumer to store the schema-version-ID-to-schema mapping. This cache allows producers and consumers to save time and avoid I/O calls to the Schema Registry.

The cache is enabled by default. However, it can be fine-tuned by providing cache-specific properties.

    properties.put(AWSSchemaRegistryConstants.CACHE_TIME_TO_LIVE_MILLIS, "60000"); // If not passed, defaults to 24 Hours
    properties.put(AWSSchemaRegistryConstants.CACHE_SIZE, "100"); // Maximum number of elements in a cache - If not passed, defaults to 200

Migrating from a third party Schema Registry

To migrate to AWS Glue Schema Registry from a third-party schema registry for AVRO data types with Kafka, add this property for the value deserializer class along with the third-party deserializer jar.

    properties.put(AWSSchemaRegistryConstants.SECONDARY_DESERAILIZER, <ThirdPartyKafkaDeserializer>);

Using Kafka Connect with AWS Glue Schema Registry

  • Clone this repo, build and copy dependencies
git clone git@github.com:awslabs/aws-glue-schema-registry.git
cd aws-glue-schema-registry
cd build-tools
mvn clean install
cd ..
mvn clean install
mvn dependency:copy-dependencies
  • Configure Kafka connectors with the following properties

When configuring Kafka Connect workers or connectors, use the value of the string constant properties in the AWSSchemaRegistryConstants class to configure the AWSKafkaAvroConverter.

    key.converter=com.amazonaws.services.schemaregistry.kafkaconnect.AWSKafkaAvroConverter
    value.converter=com.amazonaws.services.schemaregistry.kafkaconnect.AWSKafkaAvroConverter
    key.converter.region=ca-central-1
    value.converter.region=ca-central-1
    key.converter.schemaAutoRegistrationEnabled=true
    value.converter.schemaAutoRegistrationEnabled=true
    key.converter.avroRecordType=GENERIC_RECORD
    value.converter.avroRecordType=GENERIC_RECORD
    key.converter.schemaName=KeySchema
    value.converter.schemaName=ValueSchema

As Glue Schema Registry is a fully managed AWS service, there is no notion of schema registry URLs. The name of the registry (within the same AWS account) can optionally be configured using the following options. If not specified, default-registry is used.

    key.converter.registry.name=my-registry
    value.converter.registry.name=my-registry
  • Add the command below to the Launch mode section under kafka-run-class.sh
-cp $CLASSPATH:"<your aws glue schema registry base directory>/target/dependency/*" 

It should look like this

    # Launch mode
    if [ "x$DAEMON_MODE" = "xtrue" ]; then
      nohup "$JAVA" $KAFKA_HEAP_OPTS $KAFKA_JVM_PERFORMANCE_OPTS $KAFKA_GC_LOG_OPTS $KAFKA_JMX_OPTS $KAFKA_LOG4J_OPTS -cp $CLASSPATH:"/Users/johndoe/aws-glue-schema-registry/target/dependency/*" $KAFKA_OPTS "$@" > "$CONSOLE_OUTPUT_FILE" 2>&1 < /dev/null &
    else
      exec "$JAVA" $KAFKA_HEAP_OPTS $KAFKA_JVM_PERFORMANCE_OPTS $KAFKA_GC_LOG_OPTS $KAFKA_JMX_OPTS $KAFKA_LOG4J_OPTS -cp $CLASSPATH:"/Users/johndoe/aws-glue-schema-registry/target/dependency/*" $KAFKA_OPTS "$@"
    fi
  • If using bash, run the commands below to set up your CLASSPATH in your bash_profile. (For any other shell, update the environment accordingly.)

        echo 'export GSR_LIB_BASE_DIR=<>' >>~/.bash_profile
        echo 'export GSR_LIB_VERSION=1.1.20' >>~/.bash_profile
        echo 'export KAFKA_HOME=<your kafka installation directory>' >>~/.bash_profile
        echo 'export CLASSPATH=$CLASSPATH:$GSR_LIB_BASE_DIR/avro-kafkaconnect-converter/target/schema-registry-kafkaconnect-converter-$GSR_LIB_VERSION.jar:$GSR_LIB_BASE_DIR/common/target/schema-registry-common-$GSR_LIB_VERSION.jar:$GSR_LIB_BASE_DIR/avro-serializer-deserializer/target/schema-registry-serde-$GSR_LIB_VERSION.jar' >>~/.bash_profile
        source ~/.bash_profile
  • (Optional) If you wish to test with a simple file source then clone the file source connector.

        git clone https://github.com/mmolimar/kafka-connect-fs.git
        cd kafka-connect-fs/

    Under the source connector configuration (config/kafka-connect-fs.properties), set the data format to Avro, the file reader to AvroFileReader, and point fs.uris to the path of a sample Avro object you are reading from. For example:

        fs.uris=<path to a sample avro object>
        policy.regexp=^.*\.avro$
        file_reader.class=com.github.mmolimar.kafka.connect.fs.file.reader.AvroFileReader
    

    Install source connector

        mvn clean package
        echo "export CLASSPATH=\$CLASSPATH:\"\$(find target/ -type f -name '*.jar'| grep '\-package' | tr '\n' ':')\"" >>~/.bash_profile
        source ~/.bash_profile
    

    Update the sink properties under /config/connect-file-sink.properties

    file=<output file full path>
    topics=<my topic>
    

    Start the source connector (in this example, the file source connector)

    $KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties config/kafka-connect-fs.properties
    

    Run the sink connector (in this example, the file sink connector)

    $KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties $KAFKA_HOME/config/connect-file-sink.properties
    
  • For more examples of running Kafka Connect with Avro, JSON, and Protobuf formats, refer to the script run-local-tests.sh under the integration-tests module.

Using Kafka Streams with AWS Glue Schema Registry

Maven Dependency

<dependency>
      <groupId>software.amazon.glue</groupId>
      <artifactId>schema-registry-kafkastreams-serde</artifactId>
      <version>1.1.20</version>
</dependency>
    final Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "avro-streams");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
    props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, AWSKafkaAvroSerDe.class.getName());
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    
    props.put(AWSSchemaRegistryConstants.AWS_REGION, "us-east-1");
    props.put(AWSSchemaRegistryConstants.SCHEMA_AUTO_REGISTRATION_SETTING, true);
    props.put(AWSSchemaRegistryConstants.AVRO_RECORD_TYPE, AvroRecordType.GENERIC_RECORD.getName());

    StreamsBuilder builder = new StreamsBuilder();
    final KStream<String, GenericRecord> source = builder.stream("avro-input");
    final KStream<String, GenericRecord> result = source
        .filter((key, value) -> !"pink".equals(String.valueOf(value.get("favorite_color"))))
        .filter((key, value) -> !"15.0".equals(String.valueOf(value.get("amount"))));
    result.to("avro-output");

    KafkaStreams streams = new KafkaStreams(builder.build(), props);
    streams.start();

Using the AWS Glue Schema Registry Flink Connector

The AWS Glue Schema Registry Flink Connector for Java in this repository is not recommended. Please check out the Apache Flink repository for the latest support: Avro SerializationSchema and DeserializationSchema, and JSON SerializationSchema and DeserializationSchema. Protobuf integration will follow soon.

Maven Dependency

<dependency>
     <groupId>software.amazon.glue</groupId>
     <artifactId>schema-registry-flink-serde</artifactId>
     <version>1.1.20</version>
</dependency>

Code Example

Flink Kafka Producer with AVRO format

    String topic = "topic";
    Properties properties = new Properties();
    properties.setProperty("bootstrap.servers", "localhost:9092");
    properties.setProperty("group.id", "test");

    Map<String, Object> configs = new HashMap<>();
    configs.put(AWSSchemaRegistryConstants.AWS_REGION, "us-east-1");
    configs.put(AWSSchemaRegistryConstants.SCHEMA_AUTO_REGISTRATION_SETTING, true);

    Schema.Parser parser = new Schema.Parser();
    Schema schema = parser.parse(new File("path/to/avro/file"));

    FlinkKafkaProducer<GenericRecord> producer = new FlinkKafkaProducer<>(
            topic,
            GlueSchemaRegistryAvroSerializationSchema.forGeneric(schema, topic, configs),
            properties);
    stream.addSink(producer);

Flink Kafka Consumer with AVRO format

    String topic = "topic";
    Properties properties = new Properties();
    properties.setProperty("bootstrap.servers", "localhost:9092");
    properties.setProperty("group.id", "test");

    Map<String, Object> configs = new HashMap<>();
    configs.put(AWSSchemaRegistryConstants.AWS_REGION, "us-east-1");
    configs.put(AWSSchemaRegistryConstants.AVRO_RECORD_TYPE, AvroRecordType.GENERIC_RECORD.getName());

    Schema.Parser parser = new Schema.Parser();
    Schema schema = parser.parse(new File("path/to/avro/file"));

    FlinkKafkaConsumer<GenericRecord> consumer = new FlinkKafkaConsumer<>(
            topic,
            GlueSchemaRegistryAvroDeserializationSchema.forGeneric(schema, configs),
            properties);
    DataStream<GenericRecord> stream = env.addSource(consumer);

Security issue notifications

If you discover a potential security issue in this project, we ask that you notify AWS/Amazon Security via our vulnerability reporting page. Please do not create a public GitHub issue.

aws-glue-schema-registry's People

Contributors

abhilash47, amazon-auto, antontreushchenko, aravinthsamysekar, awsalialem, blacktooth, burakkose, dependabot[bot], dude0001, hhkkxxx133, junyuche, linyuyao1021, mkr, mohitpali, nersesam, pwhittlesea, rushinnaik, sg-bmw, singhbaljit, strokyl, svzdvd, tveon, vanessapinto257, wlaw868, yangsan0622


aws-glue-schema-registry's Issues

schema-registry-serde: everit dependency not available on maven central

Hi,

I am trying to use the glue schema registry client provided by this github repository.

When loading the newest version of schema-registry-serde, it can't find the everit dependency, since newer versions do not seem to be pushed to Maven Central.

Is this supposed to work with maven central only?

Thanks in advance :)

MetaData nullable or to provide more options in case of non topic based registration?

Hi, thank you. Yes it does! Do you know when a new release will be available in maven?
Maybe one more thing: When auto register schemas, I see that constructSchemaVersionMetadata(transportName) in GlueSchemaRegistrySerializationFacade is also expecting binding topic <-> schema.

With record-based registration, the metadata entry metadata.put("x-amz-meta-transport", transportName) with default value default-stream is not really suitable.

Might be an idea to make metaData nullable or to provide more options in case of non topic based registration?

Originally posted by @fabianf92 in #17 (comment)

Multi-lang support

Is anyone here familiar with the roadmaps for https://docs.aws.amazon.com/glue/latest/dg/schema-registry.html ?

It seems to me that Java is the only supported language at the moment? Are there any plans to introduce support for .NET and Python, and other data formats such as Protobuf and JSON Schema? Until this support is official, is there any recommended way to use .NET with the schema registry (direct API calls, etc.)?

Provide ability to specify the specific implementation of AwsCredentialsProvider

We are using Kubernetes for running our application, and the IAM roles of these applications are OIDC federated. The AwsCredentialsProvider that we use is WebIdentityTokenFileCredentialsProvider, and the token is mounted inside the Docker container. Now I know that in the GlueSchemaRegistryKafkaSerializer we use the DefaultCredentialsProvider, and the behaviour is documented here. This is not working in our case, as the DefaultCredentialsProvider is not picking up the token. Instead it's just picking up some other credentials (from the host), and those credentials don't have permission to access the GlueSchemaRegistry. I got around this issue by extending the GlueSchemaRegistryKafkaSerializer as

// Extend the serializer to supply a specific credentials provider
public class CustomSerializer extends GlueSchemaRegistryKafkaSerializer {
    public CustomSerializer() {
        super(WebIdentityTokenFileCredentialsProvider.create(), null, null);
    }
}

And it works. I am not sure if there's a better way to do it. But it will be really good to have the ability to specify the credential provider to the serializer. Just like other aws service client libraries allow us to do. Example

AmazonSQSClientBuilder.standard()
                .withRegion(properties.getSigningRegion())
                .withCredentials(awsCredentialsProvider)
                .build();

AWS Glue schema registry with python

Hi All,
Since glue schema registry is restricted to java for now, what are my options if I want to use it with python kafka producer and consumers?

My initial assumption was that glue SR and confluent SR may have the same design and importantly the same interface, so I looked more into the confluent Kafka client for python, which has a schema registry client here. This client requires a url property. This is not the case with the java SerDe libraries link, which take in properties such as region name, registry name, etc. The glue schema registry does not have any URL associated with it (I can't find it neither on the console nor in glue SR documentation). I looked into the AWS SDK implementation for making the API call to schema registry, but it seems complex to be able to reconstruct the URL from the input properties, and with no guarantees it will work with kafka schema registry client. Even the AWS CLI uses simple parameters like region name, ARN of glue registry, with no mention of any url.

On a side note, this link (under property KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG) asks for a third-party SR url for migration purposes. This leads me to believe that a URL can indeed be constructed for Glue SR as well, which would imply the Confluent Kafka client can probably be used with it.

What do you think is the best way to approach to integrate glue SR and use with python clients? If using kafka SR client is not possible, I can consider implementing my own SerDe on python using the registry APIs (boto3). I will appreciate your thoughts on this.

Thanks

Would specific checks for null values and appropriate error messages be helpful?

I was just in the process of building another consumer application and it's been a while since my last one, so I'm a little foggy on the configuration requirements. I ran into a couple of NPEs for which I thought it would be helpful to have a check and exception to speed up the development and troubleshooting process if nothing else.

Case 1:
I didn't specify the avroRecordType in my Spring Kafka configuration, and an NPE was thrown at

Case 2:
My custom class for a SPECIFIC_RECORD wasn't in the classpath (I missed adding that dependency to my pom.xml), and an NPE was thrown at

If you think it would be helpful to check for null values on these and throw an exception and/or log an error message, I'd be happy to open a pull request. E.g.,

if (avroRecordType == null)
    throw new AWSSchemaRegistryException("avroRecordType must be defined");
...
if (readerClass == null)
    throw new AWSSchemaRegistryException(String.format("Unable to find class for schema: %s. It must be present in the classpath.", writerSchema.getFullName()));

AWSSchemaRegistryClient can't be reused for different schemas

Hi,

just a minor thing, that is not blocking us, but might be useful to implement:

AWSSchemaRegistryClient takes the compatibility setting and description from private field glueSchemaRegistryConfiguration:

private CreateSchemaRequest getCreateSchemaRequestObject(String schemaName, String dataFormat, String schemaDefinition) {
        return CreateSchemaRequest
                .builder()
                .dataFormat(DataFormat.valueOf(dataFormat))
                .description(glueSchemaRegistryConfiguration.getDescription())
                .registryId(RegistryId.builder().registryName(glueSchemaRegistryConfiguration.getRegistryName()).build())
                .schemaName(schemaName)
                .schemaDefinition(schemaDefinition)
                .compatibility(glueSchemaRegistryConfiguration.getCompatibilitySetting())
                .tags(glueSchemaRegistryConfiguration.getTags())
                .build();
    }

This means that we can't use the same client instance for different schemas, because every schema may have a different description or possibly a different compatibility level.
Might it make sense for description and compatibility to also be method parameters?
All in all, I am wondering if it makes sense to have compatibility and description as part of GlueSchemaRegistryConfiguration, because they really depend on the schema. All other fields are more "global" settings that have the same value for every schema. What do you think?

Just as background: we do not register our schemas via the serializer, but have our own pipeline that loops through all of our schemas and registers them with a client instance. With the current implementation, I need a separate client instance per schema, each with a new config, to pass an individual description.
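
Purely as an illustration of the suggestion above (this is not part of the library), the request builder from the snippet could take description and compatibility as parameters instead of reading them from the shared configuration:

// Illustrative sketch only: same builder calls as the snippet above, but with
// description and compatibility supplied per call rather than taken from the
// shared GlueSchemaRegistryConfiguration.
private CreateSchemaRequest getCreateSchemaRequestObject(String schemaName,
                                                         String dataFormat,
                                                         String schemaDefinition,
                                                         String description,
                                                         Compatibility compatibility) {
    return CreateSchemaRequest
            .builder()
            .dataFormat(DataFormat.valueOf(dataFormat))
            .description(description)
            .registryId(RegistryId.builder().registryName(glueSchemaRegistryConfiguration.getRegistryName()).build())
            .schemaName(schemaName)
            .schemaDefinition(schemaDefinition)
            .compatibility(compatibility)
            .tags(glueSchemaRegistryConfiguration.getTags())
            .build();
}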

Support for nullable fields in JsonSchema generation from POJO's

When sending POJOs using the JSON data format, the schema that gets generated does not allow for nullable fields. There is also no way to pass this intent as part of the configuration.

In particular, I'm referring to the following:
com.amazonaws.services.schemaregistry.serializers.json.JsonSerializer

serializes using the default configuration

this.jsonSchemaGenerator = new JsonSchemaGenerator(this.objectMapper);

However, the mbknor-jackson-jsonSchema library used has a configuration that, when generating a schema, allows for nullable fields; see the Nullable types section here

JsonSchemaConfig config = JsonSchemaConfig.nullableJsonSchemaDraft4();
JsonSchemaGenerator generator = new JsonSchemaGenerator(objectMapper, config);

This library needs to be extended with a configuration to allow for the generation of a schema that supports nullable types as described above.

without nullable fields

"properties": {
        "addressId": {
          "type": "string"
        },

vs

with nullable fields

"properties": {
        "addressId": {
          "oneOf": [
            {
              "type": "null",
              "title": "Not included"
            },
            {
              "type": "string"
            }
          ]
        },

Integration with MSK Connect & Debezium question

We are trying to set up MSK Connect with Glue Schema Registry.

Steps that we followed -

  1. Created an MSK Connect custom plugin with the Debezium MySQL connector & schema registry kafkaconnect converter dependency
  2. Created a VPC endpoint for Glue, and in the inbound rule we opened HTTPS port 443 for the full VPC CIDR block. Verified that it is accessible via the CLI with --endpoint https://glue.us-east-1.amazonaws.com appended
  3. Gave glue:* permission to the AWS Kafka Connect service role
  4. Configured a source connector, using the integration test config file as our reference worker.properties:
connector.class=io.debezium.connector.mysql.MySqlConnector
tasks.max=1
database.hostname=<mysql-database-url>
database.port=3306
database.user=<username>
database.password=<password>
database.server.id=123456
database.server.name=<unique-server-name>
database.include.list=<single-database-name>
table.include.list=<single-table-name>
database.history.kafka.bootstrap.servers=<bootstrap-urls>
database.history.kafka.topic=<history-topic-name>
include.schema.changes=true
database.history.consumer.security.protocol=SASL_SSL
database.history.consumer.sasl.mechanism=AWS_MSK_IAM
database.history.consumer.sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
database.history.consumer.sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
database.history.producer.security.protocol=SASL_SSL
database.history.producer.sasl.mechanism=AWS_MSK_IAM
database.history.producer.sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
database.history.producer.sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
transforms=unwrap
transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState

# Added Glue config
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter.schemas.enable=false
key.converter=com.amazonaws.services.schemaregistry.kafkaconnect.AWSKafkaAvroConverter
value.converter=com.amazonaws.services.schemaregistry.kafkaconnect.AWSKafkaAvroConverter
key.converter.schemas.enable=false # tried both true & false
value.converter.schemas.enable=true
key.converter.region=us-east-1
value.converter.region=us-east-1
key.converter.schemaAutoRegistrationEnabled=true
value.converter.schemaAutoRegistrationEnabled=true
key.converter.avroRecordType=GENERIC_RECORD
value.converter.avroRecordType=GENERIC_RECORD
key.converter.schemaName=KeySchemaAvro
value.converter.schemaName=ValueSchemaAvro
key.converter.registry.name=<registry-that-we-created>
value.converter.registry.name=<registry-that-we-created>
key.converter.endpoint=https://glue.us-east-1.amazonaws.com
value.converter.endpoint=https://glue.us-east-1.amazonaws.com

Now the error that we are getting:

[Worker-0d863e8b10266fd6f] Caused by: com.amazonaws.services.schemaregistry.exception.AWSSchemaRegistryException: Exception occurred while fetching or registering schema definition = 
{
    "type": "record",
    "name": "SchemaChangeKey",
    "namespace": "io.debezium.connector.mysql",
    "fields": [
        {
            "name": "databaseName",
            "type": "string"
        }
    ],
    "connect.name": "io.debezium.connector.mysql.SchemaChangeKey"
}
, schema name = KeySchemaAvro

And at the end -

java.net.SocketTimeoutException: connect timed out

We can see database.history.kafka.topic being created as it's using JsonConverter.

Refer this for full error

JSON schema converter throws NPEs with Amazon MSK Connect

Issue

When using MSK Connect with the Confluent S3 Sink connector and the GSR converter for JSON Schema, the following error is seen in the logs, causing the connector deployment to fail:

[Worker-] [2022-02-01 07:16:17,218] INFO [confluent-s3-sink|task-2] [Consumer clientId=connector-consumer-confluent-s3-sink-2, groupId=connect-confluent-s3-sink] Notifying assignor about the new Assignment(partitions=[]) (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:276)
[Worker-] [2022-02-01 07:16:17,218] INFO [confluent-s3-sink|task-2] [Consumer clientId=connector-consumer-confluent-s3-sink-2, groupId=connect-confluent-s3-sink] Adding newly assigned partitions:  (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:288)
[Worker-] [2022-02-01 07:16:17,218] INFO [confluent-s3-sink|task-3] [Consumer clientId=connector-consumer-confluent-s3-sink-3, groupId=connect-confluent-s3-sink] Notifying assignor about the new Assignment(partitions=[]) (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:276)
[Worker-] [2022-02-01 07:16:17,219] INFO [confluent-s3-sink|task-3] [Consumer clientId=connector-consumer-confluent-s3-sink-3, groupId=connect-confluent-s3-sink] Adding newly assigned partitions:  (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:288)
[Worker-] [2022-02-01 07:16:17,219] INFO [confluent-s3-sink|task-1] [Consumer clientId=connector-consumer-confluent-s3-sink-1, groupId=connect-confluent-s3-sink] Successfully synced group in generation Generation{generationId=5, memberId='connector-consumer-confluent-s3-sink-1-a8e3fd8a-962c-468f-ac78-1293e23677e8', protocol='range'} (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:754)
[Worker-] [2022-02-01 07:16:17,219] INFO [confluent-s3-sink|task-1] [Consumer clientId=connector-consumer-confluent-s3-sink-1, groupId=connect-confluent-s3-sink] Notifying assignor about the new Assignment(partitions=[]) (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:276)
[Worker-] [2022-02-01 07:16:17,219] INFO [confluent-s3-sink|task-1] [Consumer clientId=connector-consumer-confluent-s3-sink-1, groupId=connect-confluent-s3-sink] Adding newly assigned partitions:  (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:288)
[Worker-] [2022-02-01 07:16:17,219] INFO [confluent-s3-sink|task-0] [Consumer clientId=connector-consumer-confluent-s3-sink-0, groupId=connect-confluent-s3-sink] Notifying assignor about the new Assignment(partitions=[friends-0]) (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:276)
[Worker-] [2022-02-01 07:16:17,220] INFO [confluent-s3-sink|task-0] [Consumer clientId=connector-consumer-confluent-s3-sink-0, groupId=connect-confluent-s3-sink] Adding newly assigned partitions: friends-0 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:288)
[Worker-] [2022-02-01 07:16:17,317] INFO [confluent-s3-sink|task-0] [Consumer clientId=connector-consumer-confluent-s3-sink-0, groupId=connect-confluent-s3-sink] Found no committed offset for partition friends-0 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:1354)
[Worker-] [2022-02-01 07:16:17,423] INFO [confluent-s3-sink|task-0] [Consumer clientId=connector-consumer-confluent-s3-sink-0, groupId=connect-confluent-s3-sink] Resetting offset for partition friends-0 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[(id: 1 rack: apse2-az2)], epoch=0}}. (org.apache.kafka.clients.consumer.internals.SubscriptionState:396)
[Worker-] [2022-02-01 07:16:18,056] ERROR [confluent-s3-sink|task-0] WorkerSinkTask{id=confluent-s3-sink-0} Error converting message value in topic 'friends' partition 0 at offset 0 and timestamp 1643695005904: null (org.apache.kafka.connect.runtime.WorkerSinkTask:547)
[Worker-] java.lang.NullPointerException
[Worker-] 	at com.amazonaws.services.schemaregistry.kafkaconnect.jsonschema.typeconverters.StructTypeConverter.lambda$getOrderedFields$0(StructTypeConverter.java:339)
[Worker-] 	at java.base/java.util.TimSort.countRunAndMakeAscending(TimSort.java:355)
[Worker-] 	at java.base/java.util.TimSort.sort(TimSort.java:220)
[Worker-] 	at java.base/java.util.Arrays.sort(Arrays.java:1515)
[Worker-] 	at java.base/java.util.stream.SortedOps$SizedRefSortingSink.end(SortedOps.java:353)
[Worker-] 	at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:485)
[Worker-] 	at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
[Worker-] 	at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:913)
[Worker-] 	at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
[Worker-] 	at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:578)
[Worker-] 	at com.amazonaws.services.schemaregistry.kafkaconnect.jsonschema.typeconverters.StructTypeConverter.getOrderedFields(StructTypeConverter.java:345)
[Worker-] 	at com.amazonaws.services.schemaregistry.kafkaconnect.jsonschema.typeconverters.StructTypeConverter.toConnectSchema(StructTypeConverter.java:315)
[Worker-] 	at com.amazonaws.services.schemaregistry.kafkaconnect.jsonschema.JsonSchemaToConnectSchemaConverter.toConnectSchema(JsonSchemaToConnectSchemaConverter.java:86)
[Worker-] 	at com.amazonaws.services.schemaregistry.kafkaconnect.jsonschema.JsonSchemaToConnectSchemaConverter.toConnectSchema(JsonSchemaToConnectSchemaConverter.java:63)
[Worker-] 	at com.amazonaws.services.schemaregistry.kafkaconnect.jsonschema.JsonSchemaConverter.toConnectData(JsonSchemaConverter.java:202)
[Worker-] 	at org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:87)
[Worker-] 	at org.apache.kafka.connect.runtime.WorkerSinkTask.convertValue(WorkerSinkTask.java:545)
[Worker-] 	at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:501)
[Worker-] 	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:156)
[Worker-] 	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:190)
[Worker-] 	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:132)
[Worker-] 	at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:501)
[Worker-] 	at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:478)
[Worker-] 	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:328)
[Worker-] 	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
[Worker-] 	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
[Worker-] 	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:189)
[Worker-] 	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:238)
[Worker-] 	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
[Worker-] 	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
[Worker-] 	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
[Worker-] 	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
[Worker-] 	at java.base/java.lang.Thread.run(Thread.java:829)
[Worker-] [2022-02-01 07:16:18,059] ERROR [confluent-s3-sink|task-0] WorkerSinkTask{id=confluent-s3-sink-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:191)
[Worker-] org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
[Worker-] 	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:206)
[Worker-] 	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:132)
[Worker-] 	at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:501)
[Worker-] 	at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:478)
[Worker-] 	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:328)
[Worker-] 	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
[Worker-] 	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
[Worker-] 	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:189)
[Worker-] 	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:238)
[Worker-] 	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
[Worker-] 	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
[Worker-] 	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
[Worker-] 	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
[Worker-] 	at java.base/java.lang.Thread.run(Thread.java:829)
[Worker-] Caused by: java.lang.NullPointerException
[Worker-] 	at com.amazonaws.services.schemaregistry.kafkaconnect.jsonschema.typeconverters.StructTypeConverter.lambda$getOrderedFields$0(StructTypeConverter.java:339)
[Worker-] 	at java.base/java.util.TimSort.countRunAndMakeAscending(TimSort.java:355)
[Worker-] 	at java.base/java.util.TimSort.sort(TimSort.java:220)
[Worker-] 	at java.base/java.util.Arrays.sort(Arrays.java:1515)
[Worker-] 	at java.base/java.util.stream.SortedOps$SizedRefSortingSink.end(SortedOps.java:353)
[Worker-] 	at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:485)
[Worker-] 	at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
[Worker-] 	at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:913)
[Worker-] 	at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
[Worker-] 	at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:578)
[Worker-] 	at com.amazonaws.services.schemaregistry.kafkaconnect.jsonschema.typeconverters.StructTypeConverter.getOrderedFields(StructTypeConverter.java:345)
[Worker-] 	at com.amazonaws.services.schemaregistry.kafkaconnect.jsonschema.typeconverters.StructTypeConverter.toConnectSchema(StructTypeConverter.java:315)
[Worker-] 	at com.amazonaws.services.schemaregistry.kafkaconnect.jsonschema.JsonSchemaToConnectSchemaConverter.toConnectSchema(JsonSchemaToConnectSchemaConverter.java:86)
[Worker-] 	at com.amazonaws.services.schemaregistry.kafkaconnect.jsonschema.JsonSchemaToConnectSchemaConverter.toConnectSchema(JsonSchemaToConnectSchemaConverter.java:63)
[Worker-] 	at com.amazonaws.services.schemaregistry.kafkaconnect.jsonschema.JsonSchemaConverter.toConnectData(JsonSchemaConverter.java:202)
[Worker-] 	at org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:87)
[Worker-] 	at org.apache.kafka.connect.runtime.WorkerSinkTask.convertValue(WorkerSinkTask.java:545)
[Worker-] 	at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:501)
[Worker-] 	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:156)
[Worker-] 	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:190)
[Worker-] 	... 13 more
[Worker-] [2022-02-01 07:16:18,061] INFO [confluent-s3-sink|task-0] [Consumer clientId=connector-consumer-confluent-s3-sink-0, groupId=connect-confluent-s3-sink] Revoke previously assigned partitions friends-0 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:307)
[Worker-] [2022-02-01 07:16:18,061] INFO [confluent-s3-sink|task-0] [Consumer clientId=connector-consumer-confluent-s3-sink-0, groupId=connect-confluent-s3-sink] Member connector-consumer-confluent-s3-sink-0-7693a472-d7ba-4458-8599-f37acb0cef3e sending LeaveGroup request to coordinator  (id: 2147483646 rack: null) due to the consumer is being closed (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:1038)

Setup -

a) Amazon MSK 2.7.1 running with IAM Auth enabled in a private VPC.
b) Glue schema registry setup using CloudFormation template - https://github.com/jeevanullas/aws-glue-schema-registry-examples/blob/main/template.json
c) Amazon MSK Connect configured with Confluent S3 Sink connector and Glue Schema Registry libraries (including dependencies) for JSON schema converter. Required IAM policies / role assigned to the Connector to access S3, Glue and MSK.

connector.class=io.confluent.connect.s3.S3SinkConnector
value.converter.schemaAutoRegistrationEnabled=true
s3.region=ap-southeast-2
schema.compatibility=BACKWARD
flush.size=2
tasks.max=1
key.converter.region=ap-southeast-2
value.converter.dataFormat=JSON
value.converter.registry.name=testregistry
value.converter.endpoint=https://glue.ap-southeast-2.amazonaws.com
topics.regex=friends*
value.converter.avroRecordType=GENERIC_RECORD
value.converter.region=ap-southeast-2
format.class=io.confluent.connect.s3.format.json.JsonFormat
value.converter.schemaName=friends
name=confluent-s3-sink
value.converter.schemas.enable=true
value.converter=com.amazonaws.services.schemaregistry.kafkaconnect.jsonschema.JsonSchemaConverter
storage.class=io.confluent.connect.s3.storage.S3Storage
s3.bucket.name=msk-cluster-backup-deepens
key.converter=org.apache.kafka.connect.storage.StringConverter

d) Code for working producer and consumer available at below links

Producer - https://github.com/jeevanullas/aws-glue-schema-registry-examples/blob/main/src/main/java/com/amazonaws/services/gsr/samples/json/kafka/RunKafkaProducer.java
Consumer - https://github.com/jeevanullas/aws-glue-schema-registry-examples/blob/main/src/main/java/com/amazonaws/services/gsr/samples/json/kafka/RunKafkaConsumer.java

e) Schema and data payload

Schema - https://github.com/jeevanullas/aws-glue-schema-registry-examples/blob/main/schema.json
Payload -

Received message: key = message-0, value = JsonDataWithSchema(schema={"$id":"https://example.com/person.schema.json","$schema":"http://json-schema.org/draft-07/schema#","title":"Person","type":"object","properties":{"firstName":{"type":"string","description":"The person's first name."},"lastName":{"type":"string","description":"The person's last name."},"age":{"description":"Age in years which must be equal to or greater than zero.","type":"integer","minimum":0}}}, payload={"firstName":"John","lastName":"Doe","age":12})

f) Steps followed to package the Glue Schema Registry converter jar and dependencies along with the Confluent S3 Sink connector jars:
   1. Downloaded version 1.1.8 from https://github.com/awslabs/aws-glue-schema-registry/archive/refs/tags/v1.1.8.zip
   2. Followed the steps at https://docs.aws.amazon.com/glue/latest/dg/schema-registry-integrations.html#schema-registry-integrations-apache-kafka-connect and copied dependencies from jsonschema-kafkaconnect-converter/target/dependency along with jsonschema-kafkaconnect-converter-1.1.8.jar into confluentinc-kafka-connect-s3-10.0.5/lib.
   3. Zipped and uploaded the connector code as a custom plugin to Amazon MSK.
   4. Used the custom plugin ARN in the MSK Connect connector setup.

Too much logging for AWSKafkaAvroSerializer (part two)

Hi, we're getting a "Schema Version Id is null. Trying to register the schema" log for every message serialized with the AWSKafkaAvroSerializer.

It seems that this was changed from info to debug in #9, but changed back to info as part of #49 (commit a7017fb). Re-changing the log level to debug would remove the spam, but I'm wondering whether there could be some underlying issue?

I haven't fully grokked the code, but in AWSKafkaAvroSerializer#serialize the null-check is against the final schemaVersionId... is there any feasible way to get this configured/initialized to be non-null?

My hunch is that either there's a bit of an architectural issue, or there's some configuration issue we've totally missed – it doesn't seem right that the lookup fails for each and every message. Fortunately the caching in GlueSchemaRegistrySerializationFacade seems to be working, though! 😅

Kafka Connect dependencies question

Hi guys,

I want to integrate AWS Glue SR with Kafka Connect. I've been following your documentation steps and I got stuck.
After cloning the repo and running mvn clean install and mvn dependency:copy-dependencies, there is no <AWS GlueSchema Registry base directory>/target/dependency/* path to reference in the CLASSPATH env variable. All the jars created are stored in folders such as /<AWS GlueSchema Registry base directory>/aws-glue-schema-registry/avro-flink-serde/target/dependency or <AWS GlueSchema Registry base directory>/jsonschema-kafkaconnect-converter/target/dependency/.

I don't know what I'm doing wrong or which path I have to reference in CLASSPATH.

Thanks in advance.

Deserialise records that were serialised in another account

Is it possible to deserialise data that were serialised in another account (given that proper IAM permissions are in place)?
The serialiser stores a schema version UUID with every message. Is this UUID globally unique, or unique only within an account?

One use case would be if data are replicated using MirrorMaker as blobs between two MSK clusters located in separate accounts.

Cheers,
Alex.

Unable to get the glue registry working with Kafka connect in a docker setup.

I get this error message, and as I understand it, it's not able to find the schema name property.

java.lang.IllegalArgumentException: The validated map is empty
2022-02-05T05:22:08.438891384Z 	at org.apache.commons.lang3.Validate.notEmpty(Validate.java:352)
2022-02-05T05:22:08.438893742Z 	at org.apache.commons.lang3.Validate.notEmpty(Validate.java:374)
2022-02-05T05:22:08.438896020Z 	at com.amazonaws.services.schemaregistry.utils.GlueSchemaRegistryUtils.checkIfPresentInMap(GlueSchemaRegistryUtils.java:59)
2022-02-05T05:22:08.438908559Z 	at com.amazonaws.services.schemaregistry.utils.GlueSchemaRegistryUtils.getSchemaName(GlueSchemaRegistryUtils.java:75)
2022-02-05T05:22:08.438912642Z 	at com.amazonaws.services.schemaregistry.serializers.avro.AWSKafkaAvroSerializer.configure(AWSKafkaAvroSerializer.java:83)
2022-02-05T05:22:08.438915327Z 	at com.amazonaws.services.schemaregistry.kafkaconnect.AWSKafkaAvroConverter.configure(AWSKafkaAvroConverter.java:79)
2022-02-05T05:22:08.438917523Z 	at org.apache.kafka.connect.runtime.isolation.Plugins.newConverter(Plugins.java:277)
2022-02-05T05:22:08.438919644Z 	at org.apache.kafka.connect.runtime.Worker.startTask(Worker.java:526)
2022-02-05T05:22:08.438921750Z 	at org.apache.kafka.connect.runtime.distributed.DistributedHerder.startTask(DistributedHerder.java:1421)
2022-02-05T05:22:08.438924023Z 	at org.apache.kafka.connect.runtime.distributed.DistributedHerder.lambda$getTaskStartingCallable$22(DistributedHerder.java:1434)
2022-02-05T05:22:08.438926335Z 	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
2022-02-05T05:22:08.438928502Z 	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
2022-02-05T05:22:08.438930685Z 	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
2022-02-05T05:22:08.438932852Z 	at java.base/java.lang.Thread.run(Thread.java:829)

Since I am using the connect docker image I use environment variables to add properties like this:

CONNECT_KEY_CONVERTER: com.amazonaws.services.schemaregistry.kafkaconnect.AWSKafkaAvroConverter
CONNECT_VALUE_CONVERTER: com.amazonaws.services.schemaregistry.kafkaconnect.AWSKafkaAvroConverter
CONNECT_KEY_COVERTER_REGION: us-east-1
CONNECT_VALUE_CONVERTER_REGION: us-east-1
CONNECT_KEY_COVERTER_SCHEMAAUTOREGISTRATIONENABLED: true
CONNECT_VALUE_CONVERTER_SCHEMAAUTOREGISTRATIONENABLED: true
CONNECT_KEY_COVERTER_AVRORECORDTYPE: GENERIC_RECORD
CONNECT_VALUE_CONVERTER_AVRORECORDTYPE: GENERIC_RECORD
CONNECT_KEY_COVERTER_SCHEMANAME: KeySchemaAvro
CONNECT_VALUE_CONVERTER_SCHEMANAME: ValueSchemaAvro
CONNECT_KEY_COVERTER_REGISTRY_NAME: kafka_registry
CONNECT_VALUE_CONVERTER_REGISTRY_NAME: kafka_registry

I think it fails because the docker image parses the env variables and adds them to the property files.
But since they are uppercase in the env variables and lowercase in the resulting properties, while this repo requires camelCase, it easily gets messy.

In the readme of this repo there is this example to be used with the connector:

key.converter=com.amazonaws.services.schemaregistry.kafkaconnect.AWSKafkaAvroConverter
value.converter=com.amazonaws.services.schemaregistry.kafkaconnect.AWSKafkaAvroConverter
key.converter.region=ca-central-1
value.converter.region=ca-central-1
key.converter.schemaAutoRegistrationEnabled=true
value.converter.schemaAutoRegistrationEnabled=true
key.converter.avroRecordType=GENERIC_RECORD
value.converter.avroRecordType=GENERIC_RECORD
key.converter.schemaName=KeySchema
value.converter.schemaName=ValueSchema

but it actually ends up like this:

key.converter=com.amazonaws.services.schemaregistry.kafkaconnect.AWSKafkaAvroConverter
value.converter=com.amazonaws.services.schemaregistry.kafkaconnect.AWSKafkaAvroConverter
key.converter.region=ca-central-1
value.converter.region=ca-central-1
key.converter.schemaautoregistrationenabled=true
value.converter.schemaautoregistrationenabled=true
key.converter.avrorecordtype=GENERIC_RECORD
value.converter.avrorecordtype=GENERIC_RECORD
key.converter.schemaname=KeySchema
value.converter.schemaname=ValueSchema

Just wondering if there is anything else I can do to make this work?
Could a possible solution be to ignore the casing of the camelCased properties?

Use case of AWS Glue Schema Registry with MSK, and can it be used in PHP?

My customer wants to introduce the AWS Glue Schema Registry with MSK.

They have some questions:
1. Are there cases where this has already been applied at other companies, and what are the details?
2. We know AWS Glue SR is free, but we are concerned about additional costs related to it. Are there any additional costs?
3. We want to know the SLA policy.
4. We mainly use PHP and plan to use Golang and Python in the future.
Can we use AWS Glue SR in PHP, Golang and Python? If not, is there any workaround?

AWSKafkaAvroConverter throws "java.lang.NullPointerException" in absence of "avroRecordType" in connector config

While working with Kafka Connect and GSR, "AWSKafkaAvroConverter" throws the exception below when the config values "key.converter.avroRecordType" and "value.converter.avroRecordType" are absent. Below are the detailed stack trace of the error message and a sample connector configuration.

[2022-01-07 20:59:09,204] ERROR WorkerSinkTask{id=s3-sink-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:179)
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:485)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:465)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:321)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error: 
    at com.amazonaws.services.schemaregistry.kafkaconnect.AWSKafkaAvroConverter.toConnectData(AWSKafkaAvroConverter.java:119)
    at org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:87)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:485)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
    ... 13 more
Caused by: com.amazonaws.services.schemaregistry.exception.AWSSchemaRegistryException: Exception occurred while de-serializing Avro message
    at com.amazonaws.services.schemaregistry.deserializers.avro.AvroDeserializer.deserialize(AvroDeserializer.java:101)
    at com.amazonaws.services.schemaregistry.deserializers.GlueSchemaRegistryDeserializationFacade.deserialize(GlueSchemaRegistryDeserializationFacade.java:162)
    at com.amazonaws.services.schemaregistry.deserializers.avro.AWSKafkaAvroDeserializer.deserializeByHeaderVersionByte(AWSKafkaAvroDeserializer.java:149)
    at com.amazonaws.services.schemaregistry.deserializers.avro.AWSKafkaAvroDeserializer.deserialize(AWSKafkaAvroDeserializer.java:114)
    at com.amazonaws.services.schemaregistry.kafkaconnect.AWSKafkaAvroConverter.toConnectData(AWSKafkaAvroConverter.java:117)
    ... 17 more
Caused by: com.google.common.util.concurrent.UncheckedExecutionException: java.lang.NullPointerException
    at com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2051)
    at com.google.common.cache.LocalCache.get(LocalCache.java:3951)
    at com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3974)
    at com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4935)
    at com.amazonaws.services.schemaregistry.deserializers.avro.AvroDeserializer.deserialize(AvroDeserializer.java:91)
    ... 21 more
Caused by: java.lang.NullPointerException
    at com.amazonaws.services.schemaregistry.deserializers.avro.DatumReaderInstance.from(DatumReaderInstance.java:37)
    at com.amazonaws.services.schemaregistry.deserializers.avro.AvroDeserializer$DatumReaderCache.load(AvroDeserializer.java:112)
    at com.amazonaws.services.schemaregistry.deserializers.avro.AvroDeserializer$DatumReaderCache.load(AvroDeserializer.java:109)
    at com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3529)
    at com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2278)
    at com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2155)
    at com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2045)

Sample connector configuration:

name=s3-sink
connector.class=io.confluent.connect.s3.S3SinkConnector
tasks.max=1
topics=db1.sampleavro.movies
s3.region=us-east-1
s3.bucket.name=<BUCKET NAME>
storage.class=io.confluent.connect.s3.storage.S3Storage
format.class=io.confluent.connect.s3.format.parquet.ParquetFormat
partitioner.class=io.confluent.connect.storage.partitioner.DefaultPartitioner
schema.compatibility=NONE
flush.size=10
store.kafka.keys=false
store.kafka.headers=false
key.converter=com.amazonaws.services.schemaregistry.kafkaconnect.AWSKafkaAvroConverter
value.converter=com.amazonaws.services.schemaregistry.kafkaconnect.AWSKafkaAvroConverter
key.converter.compressionType=NONE
value.converter.compressionType=NONE
key.converter.endpoint=https://glue.us-east-1.amazonaws.com
value.converter.endpoint=https://glue.us-east-1.amazonaws.com
key.converter.region=us-east-1
value.converter.region=us-east-1
key.converter.timeToLiveMillis=3600000
value.converter.timeToLiveMillis=3600000
key.converter.cacheSize=100
value.converter.cacheSize=100
key.converter.registry.name=msk-cdc-avro-keys
value.converter.registry.name=msk-cdc-avro-values
key.converter.compatibility=NONE
value.converter.compatibility=NONE
key.converter.description=none
value.converter.description=none
key.converter.schemaAutoRegistrationEnabled=true
value.converter.schemaAutoRegistrationEnabled=true

Produce Kafka Avro records using glue schema registry

Failing to get a Kafka producer working with AWSSchemaRegistry; it is not able to connect to the Schema Registry.

The configuration for the AWSKafkaAvroSerializer is listed below, with the stack trace further down.
INFO com.amazonaws.services.schemaregistry.serializers.avro.AWSKafkaAvroSerializer:72 - Configuring Amazon Schema Registry Service using these properties:
{schemaAutoRegistrationEnabled=true, value.serializer=com.amazonaws.services.schemaregistry.serializers.avro.AWSKafkaAvroSerializer, bootstrap.servers=b-2.kafka-cluster-dtc-priv.5xvot0.c6.kafka.us-west-2.amazonaws.com:9092,b-3.kafka-cluster-dtc-priv.5xvot0.c6.kafka.us-west-2.amazonaws.com:9092,b-1.kafka-cluster-dtc-priv.5xvot0.c6.kafka.us-west-2.amazonaws.com:9092, registry.name=dtc_avro_registry, region=us-west-2, compression=ZLIB, key.serializer=org.apache.kafka.common.serialization.StringSerializer, client.id=dmp_rawevent_processor_avro_0}

But it fails with the following stack trace:

	at com.amazonaws.services.schemaregistry.serializers.GlueSchemaRegistrySerializationFacade.getOrRegisterSchemaVersion(GlueSchemaRegistrySerializationFacade.java:84)
	at com.amazonaws.services.schemaregistry.serializers.avro.AWSAvroSerializer.registerSchema(AWSAvroSerializer.java:59)
	at com.amazonaws.services.schemaregistry.serializers.avro.AWSKafkaAvroSerializer.serialize(AWSKafkaAvroSerializer.java:93)
	at org.apache.kafka.common.serialization.Serializer.serialize(Serializer.java:60)
	at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:884)
	at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:846)
	at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:733)
	at com.nanigans.kafka.producer.Producer.sendMessage(Producer.java:256)
	at com.nanigans.dmp.rawevents.attributedevents.AttributedEventCallback.setSequenceAndSend(AttributedEventCallback.java:367)
	at com.nanigans.dmp.rawevents.attributedevents.AttributedEventCallback.sendToKafka(AttributedEventCallback.java:353)
	at com.nanigans.dmp.rawevents.attributedevents.AttributedEventCallback.resolveRawPixelEvents(AttributedEventCallback.java:293)
	at com.nanigans.dmp.rawevents.attributedevents.AttributedEventCallback.processEvents(AttributedEventCallback.java:213)
	at com.nanigans.dmp.rawevents.attributedevents.AttributedEventCallback.processRecordBatch(AttributedEventCallback.java:149)
	at com.nanigans.kafka.consumer.pool.ConsumerPoolPartition$CascadePartitionedConsumerThread.dispatchRecordsToCallbacks(ConsumerPoolPartition.java:227)
	at com.nanigans.kafka.consumer.pool.ConsumerPoolPartition$CascadePartitionedConsumerThread.run(ConsumerPoolPartition.java:159)
	at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: com.amazonaws.services.schemaregistry.exception.AWSSchemaRegistryException: Failed to get schemaVersionId by schema definition for schema name = pixel_events_organic 
	at com.amazonaws.services.schemaregistry.common.AWSSchemaRegistryClient.getSchemaVersionIdByDefinition(AWSSchemaRegistryClient.java:136)
	at com.amazonaws.services.schemaregistry.common.AWSSchemaRegistryClient.getORRegisterSchemaVersionId(AWSSchemaRegistryClient.java:167)
	... 16 common frames omitted
Caused by: software.amazon.awssdk.core.exception.SdkClientException: Unable to execute HTTP request: readHandshakeRecord
	at software.amazon.awssdk.core.exception.SdkClientException$BuilderImpl.build(SdkClientException.java:98)
	at software.amazon.awssdk.core.exception.SdkClientException.create(SdkClientException.java:43)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.utils.RetryableStageHelper.setLastException(RetryableStageHelper.java:205)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.RetryableStage.execute(RetryableStage.java:66)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.RetryableStage.execute(RetryableStage.java:34)
	at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
	at software.amazon.awssdk.core.internal.http.StreamManagingStage.execute(StreamManagingStage.java:56)
	at software.amazon.awssdk.core.internal.http.StreamManagingStage.execute(StreamManagingStage.java:36)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallTimeoutTrackingStage.executeWithTimer(ApiCallTimeoutTrackingStage.java:80)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallTimeoutTrackingStage.execute(ApiCallTimeoutTrackingStage.java:60)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallTimeoutTrackingStage.execute(ApiCallTimeoutTrackingStage.java:42)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallMetricCollectionStage.execute(ApiCallMetricCollectionStage.java:48)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallMetricCollectionStage.execute(ApiCallMetricCollectionStage.java:31)
	at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
	at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.ExecutionFailureExceptionReportingStage.execute(ExecutionFailureExceptionReportingStage.java:37)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.ExecutionFailureExceptionReportingStage.execute(ExecutionFailureExceptionReportingStage.java:26)
	at software.amazon.awssdk.core.internal.http.AmazonSyncHttpClient$RequestExecutionBuilderImpl.execute(AmazonSyncHttpClient.java:193)
	at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.invoke(BaseSyncClientHandler.java:133)
	at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.doExecute(BaseSyncClientHandler.java:159)
	at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.lambda$execute$1(BaseSyncClientHandler.java:112)
	at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.measureApiCallSuccess(BaseSyncClientHandler.java:167)
	at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.execute(BaseSyncClientHandler.java:94)
	at software.amazon.awssdk.core.client.handler.SdkSyncClientHandler.execute(SdkSyncClientHandler.java:45)
	at software.amazon.awssdk.awscore.client.handler.AwsSyncClientHandler.execute(AwsSyncClientHandler.java:55)
	at software.amazon.awssdk.services.glue.DefaultGlueClient.getSchemaByDefinition(DefaultGlueClient.java:6710)
	at com.amazonaws.services.schemaregistry.common.AWSSchemaRegistryClient.getSchemaVersionIdByDefinition(AWSSchemaRegistryClient.java:132)
	... 17 common frames omitted
Caused by: javax.net.ssl.SSLException: readHandshakeRecord
	at java.base/sun.security.ssl.SSLSocketImpl.readHandshakeRecord(SSLSocketImpl.java:1325)
	at java.base/sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:440)
	at java.base/sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:411)
	at org.apache.http.conn.ssl.SSLConnectionSocketFactory.createLayeredSocket(SSLConnectionSocketFactory.java:394)
	at org.apache.http.conn.ssl.SSLConnectionSocketFactory.connectSocket(SSLConnectionSocketFactory.java:353)
	at software.amazon.awssdk.http.apache.internal.conn.SdkTlsSocketFactory.connectSocket(SdkTlsSocketFactory.java:113)
	at org.apache.http.impl.conn.DefaultHttpClientConnectionOperator.connect(DefaultHttpClientConnectionOperator.java:141)
	at org.apache.http.impl.conn.PoolingHttpClientConnectionManager.connect(PoolingHttpClientConnectionManager.java:353)
	at jdk.internal.reflect.GeneratedMethodAccessor13.invoke(Unknown Source)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at software.amazon.awssdk.http.apache.internal.conn.ClientConnectionManagerFactory$Handler.invoke(ClientConnectionManagerFactory.java:80)
	at com.sun.proxy.$Proxy16.connect(Unknown Source)
	at org.apache.http.impl.execchain.MainClientExec.establishRoute(MainClientExec.java:380)
	at org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:236)
	at org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:184)
	at org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:184)
	at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:82)
	at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:55)
	at software.amazon.awssdk.http.apache.internal.impl.ApacheSdkHttpClient.execute(ApacheSdkHttpClient.java:72)
	at software.amazon.awssdk.http.apache.ApacheHttpClient.execute(ApacheHttpClient.java:253)
	at software.amazon.awssdk.http.apache.ApacheHttpClient.access$500(ApacheHttpClient.java:106)
	at software.amazon.awssdk.http.apache.ApacheHttpClient$1.call(ApacheHttpClient.java:232)
	at software.amazon.awssdk.http.apache.ApacheHttpClient$1.call(ApacheHttpClient.java:229)
	at software.amazon.awssdk.core.internal.util.MetricUtils.measureDurationUnsafe(MetricUtils.java:64)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.MakeHttpRequestStage.executeHttpRequest(MakeHttpRequestStage.java:76)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.MakeHttpRequestStage.execute(MakeHttpRequestStage.java:55)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.MakeHttpRequestStage.execute(MakeHttpRequestStage.java:39)
	at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
	at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
	at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
	at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptTimeoutTrackingStage.execute(ApiCallAttemptTimeoutTrackingStage.java:73)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptTimeoutTrackingStage.execute(ApiCallAttemptTimeoutTrackingStage.java:42)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.TimeoutExceptionHandlingStage.execute(TimeoutExceptionHandlingStage.java:77)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.TimeoutExceptionHandlingStage.execute(TimeoutExceptionHandlingStage.java:39)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptMetricCollectionStage.execute(ApiCallAttemptMetricCollectionStage.java:50)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptMetricCollectionStage.execute(ApiCallAttemptMetricCollectionStage.java:36)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.RetryableStage.execute(RetryableStage.java:64)
	... 40 common frames omitted
	Suppressed: java.net.SocketException: Broken pipe (Write failed)
		at java.base/java.net.SocketOutputStream.socketWrite0(Native Method)
		at java.base/java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:110)
		at java.base/java.net.SocketOutputStream.write(SocketOutputStream.java:150)
		at java.base/sun.security.ssl.SSLSocketOutputRecord.encodeAlert(SSLSocketOutputRecord.java:81)
		at java.base/sun.security.ssl.TransportContext.fatal(TransportContext.java:380)
		at java.base/sun.security.ssl.TransportContext.fatal(TransportContext.java:292)
		at java.base/sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:450)
		... 77 common frames omitted
Caused by: java.net.SocketException: Broken pipe (Write failed)
	at java.base/java.net.SocketOutputStream.socketWrite0(Native Method)
	at java.base/java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:110)
	at java.base/java.net.SocketOutputStream.write(SocketOutputStream.java:150)
	at java.base/sun.security.ssl.SSLSocketOutputRecord.encodeChangeCipherSpec(SSLSocketOutputRecord.java:221)
	at java.base/sun.security.ssl.OutputRecord.changeWriteCiphers(OutputRecord.java:162)
	at java.base/sun.security.ssl.ChangeCipherSpec$T10ChangeCipherSpecProducer.produce(ChangeCipherSpec.java:118)
	at java.base/sun.security.ssl.Finished$T12FinishedProducer.onProduceFinished(Finished.java:395)
	at java.base/sun.security.ssl.Finished$T12FinishedProducer.produce(Finished.java:379)
	at java.base/sun.security.ssl.SSLHandshake.produce(SSLHandshake.java:436)
	at java.base/sun.security.ssl.ServerHelloDone$ServerHelloDoneConsumer.consume(ServerHelloDone.java:182)
	at java.base/sun.security.ssl.SSLHandshake.consume(SSLHandshake.java:392)
	at java.base/sun.security.ssl.HandshakeContext.dispatch(HandshakeContext.java:443)
	at java.base/sun.security.ssl.HandshakeContext.dispatch(HandshakeContext.java:421)
	at java.base/sun.security.ssl.TransportContext.dispatch(TransportContext.java:182)
	at java.base/sun.security.ssl.SSLTransport.decode(SSLTransport.java:171)
	at java.base/sun.security.ssl.SSLSocketImpl.decode(SSLSocketImpl.java:1408)
	at java.base/sun.security.ssl.SSLSocketImpl.readHandshakeRecord(SSLSocketImpl.java:1314)
	... 78 common frames omitted

JSON Converter is not working with Kafka connect

I'm using Kafka Connect (Debezium) with the Glue Schema Registry.

I built it with Maven, and the Avro format worked well. Now I want to use the JSON format with the registry.

connector config:

{
	"name": "mysql-connector-01",
	"config": {
		"name": "mysql-connector-01",
		"connector.class": "io.debezium.connector.mysql.MySqlConnector",
		"database.server.id": "1",
		"tasks.max": "1",
		"database.history.kafka.bootstrap.servers": "XXXXX",
		"database.history.kafka.topic": "mysql-db01.schema-changes.mysql",
		"database.server.name": "mysql-db01",
		"database.hostname": "XXXX",
		"database.port": "3306",
		"database.user": "root",
		"database.password": "XXXX",
		"database.whitelist": "bhuvi",
		"transforms": "unwrap",
		"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
		"transforms.unwrap.add.source.fields": "ts_ms",
		"tombstones.on.delete": false,
		"key.converter": "com.amazonaws.services.schemaregistry.kafkaconnect.jsonschema.JsonSchemaConverter",
		"key.converter.schemas.enable": "false",
		"internal.key.converter": "com.amazonaws.services.schemaregistry.kafkaconnect.jsonschema.JsonSchemaConverter",
		"internal.key.converter.schemas.enable": "false",
		"internal.value.converter": "com.amazonaws.services.schemaregistry.kafkaconnect.jsonschema.JsonSchemaConverter",
		"internal.value.converter.schemas.enable": "false",
		"value.converter": "com.amazonaws.services.schemaregistry.kafkaconnect.jsonschema.JsonSchemaConverter",
		"value.converter.schemas.enable": "true",
		"value.converter.region": "ap-south-1",
		"key.converter.region": "ap-south-1",
		"key.converter.schemaAutoRegistrationEnabled": "true",
		"value.converter.schemaAutoRegistrationEnabled": "true",
		"value.converter.avroRecordType": "GENERIC_RECORD",
		"key.converter.registry.name": "bhuvi-debezium",
		"value.converter.registry.name": "bhuvi-debezium",
		"key.converter.dataFormat": "JSON",
		"value.converter.dataFormat": "JSON",
		"snapshot.mode": "initial"
	}
}

Error message:

Aug  3 09:13:39 ip-172-30-32-13 connect-distributed: [2021-08-03 09:13:39,342] ERROR WorkerSourceTask{id=mysql-connector-01-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:184)
Aug  3 09:13:39 ip-172-30-32-13 connect-distributed: java.lang.NoSuchMethodError: scala.collection.immutable.Map$.apply(Lscala/collection/Seq;)Lscala/collection/GenMap;
Aug  3 09:13:39 ip-172-30-32-13 connect-distributed: at com.kjetland.jackson.jsonSchema.JsonSchemaConfig$.<init>(JsonSchemaGenerator.scala:38)
Aug  3 09:13:39 ip-172-30-32-13 connect-distributed: at com.kjetland.jackson.jsonSchema.JsonSchemaConfig$.<clinit>(JsonSchemaGenerator.scala)
Aug  3 09:13:39 ip-172-30-32-13 connect-distributed: at com.kjetland.jackson.jsonSchema.JsonSchemaGenerator.<init>(JsonSchemaGenerator.scala:283)
Aug  3 09:13:39 ip-172-30-32-13 connect-distributed: at com.amazonaws.services.schemaregistry.serializers.json.JsonSerializer.<init>(JsonSerializer.java:67)
Aug  3 09:13:39 ip-172-30-32-13 connect-distributed: at com.amazonaws.services.schemaregistry.serializers.GlueSchemaRegistrySerializerFactory.lambda$getInstance$1(GlueSchemaRegistrySerializerFactory.java:52)
Aug  3 09:13:39 ip-172-30-32-13 connect-distributed: at java.util.concurrent.ConcurrentHashMap.computeIfAbsent(ConcurrentHashMap.java:1660)
Aug  3 09:13:39 ip-172-30-32-13 connect-distributed: at com.amazonaws.services.schemaregistry.serializers.GlueSchemaRegistrySerializerFactory.getInstance(GlueSchemaRegistrySerializerFactory.java:52)
Aug  3 09:13:39 ip-172-30-32-13 connect-distributed: at com.amazonaws.services.schemaregistry.serializers.GlueSchemaRegistrySerializationFacade.getSchemaDefinition(GlueSchemaRegistrySerializationFacade.java:154)
Aug  3 09:13:39 ip-172-30-32-13 connect-distributed: at com.amazonaws.services.schemaregistry.serializers.GlueSchemaRegistryKafkaSerializer.prepareInput(GlueSchemaRegistryKafkaSerializer.java:147)
Aug  3 09:13:39 ip-172-30-32-13 connect-distributed: at com.amazonaws.services.schemaregistry.serializers.GlueSchemaRegistryKafkaSerializer.serialize(GlueSchemaRegistryKafkaSerializer.java:103)
Aug  3 09:13:39 ip-172-30-32-13 connect-distributed: at com.amazonaws.services.schemaregistry.kafkaconnect.jsonschema.JsonSchemaConverter.fromConnectData(JsonSchemaConverter.java:140)
Aug  3 09:13:39 ip-172-30-32-13 connect-distributed: at org.apache.kafka.connect.storage.Converter.fromConnectData(Converter.java:63)
Aug  3 09:13:39 ip-172-30-32-13 connect-distributed: at org.apache.kafka.connect.runtime.WorkerSourceTask.lambda$convertTransformedRecord$1(WorkerSourceTask.java:295)
Aug  3 09:13:39 ip-172-30-32-13 connect-distributed: at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:156)
Aug  3 09:13:39 ip-172-30-32-13 connect-distributed: at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:190)
Aug  3 09:13:39 ip-172-30-32-13 connect-distributed: at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:132)
Aug  3 09:13:39 ip-172-30-32-13 connect-distributed: at org.apache.kafka.connect.runtime.WorkerSourceTask.convertTransformedRecord(WorkerSourceTask.java:295)
Aug  3 09:13:39 ip-172-30-32-13 connect-distributed: at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:324)
Aug  3 09:13:39 ip-172-30-32-13 connect-distributed: at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:248)
Aug  3 09:13:39 ip-172-30-32-13 connect-distributed: at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:182)
Aug  3 09:13:39 ip-172-30-32-13 connect-distributed: at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:231)
Aug  3 09:13:39 ip-172-30-32-13 connect-distributed: at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
Aug  3 09:13:39 ip-172-30-32-13 connect-distributed: at java.util.concurrent.FutureTask.run(FutureTask.java:266)
Aug  3 09:13:39 ip-172-30-32-13 connect-distributed: at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
Aug  3 09:13:39 ip-172-30-32-13 connect-distributed: at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
Aug  3 09:13:39 ip-172-30-32-13 connect-distributed: at java.lang.Thread.run(Thread.java:748)
Aug  3 09:13:39 ip-172-30-32-13 connect-distributed: [2021-08-03 09:13:39,352] INFO Stopping down connector (io.debezium.connector.common.BaseSourceTask:238)
Aug  3 09:13:39 ip-172-30-32-13 connect-distributed: [2021-08-03 09:13:39,402] INFO Stopped reading binlog after 0 events, no new offset was recorded (io.debezium.connector.mysql.MySqlStreamingChangeEventSource:1173)

Then I tried replacing the key.converter.dataFormat and value.converter.dataFormat values with lowercase json; after that, I'm getting this error.

Aug  3 09:22:11 ip-172-30-32-13 connect-distributed: [2021-08-03 09:22:11,124] ERROR WorkerSourceTask{id=mysql-connector-01-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:184)
Aug  3 09:22:11 ip-172-30-32-13 connect-distributed: java.lang.NoClassDefFoundError: Could not initialize class com.kjetland.jackson.jsonSchema.JsonSchemaConfig$
Aug  3 09:22:11 ip-172-30-32-13 connect-distributed: at com.kjetland.jackson.jsonSchema.JsonSchemaGenerator.<init>(JsonSchemaGenerator.scala:283)
Aug  3 09:22:11 ip-172-30-32-13 connect-distributed: at com.amazonaws.services.schemaregistry.serializers.json.JsonSerializer.<init>(JsonSerializer.java:67)
Aug  3 09:22:11 ip-172-30-32-13 connect-distributed: at com.amazonaws.services.schemaregistry.serializers.GlueSchemaRegistrySerializerFactory.lambda$getInstance$1(GlueSchemaRegistrySerializerFactory.java:52)
Aug  3 09:22:11 ip-172-30-32-13 connect-distributed: at java.util.concurrent.ConcurrentHashMap.computeIfAbsent(ConcurrentHashMap.java:1660)
Aug  3 09:22:11 ip-172-30-32-13 connect-distributed: at com.amazonaws.services.schemaregistry.serializers.GlueSchemaRegistrySerializerFactory.getInstance(GlueSchemaRegistrySerializerFactory.java:52)
Aug  3 09:22:11 ip-172-30-32-13 connect-distributed: at com.amazonaws.services.schemaregistry.serializers.GlueSchemaRegistrySerializationFacade.getSchemaDefinition(GlueSchemaRegistrySerializationFacade.java:154)
Aug  3 09:22:11 ip-172-30-32-13 connect-distributed: at com.amazonaws.services.schemaregistry.serializers.GlueSchemaRegistryKafkaSerializer.prepareInput(GlueSchemaRegistryKafkaSerializer.java:147)
Aug  3 09:22:11 ip-172-30-32-13 connect-distributed: at com.amazonaws.services.schemaregistry.serializers.GlueSchemaRegistryKafkaSerializer.serialize(GlueSchemaRegistryKafkaSerializer.java:103)
Aug  3 09:22:11 ip-172-30-32-13 connect-distributed: at com.amazonaws.services.schemaregistry.kafkaconnect.jsonschema.JsonSchemaConverter.fromConnectData(JsonSchemaConverter.java:140)
Aug  3 09:22:11 ip-172-30-32-13 connect-distributed: at org.apache.kafka.connect.storage.Converter.fromConnectData(Converter.java:63)
Aug  3 09:22:11 ip-172-30-32-13 connect-distributed: at org.apache.kafka.connect.runtime.WorkerSourceTask.lambda$convertTransformedRecord$1(WorkerSourceTask.java:295)
Aug  3 09:22:11 ip-172-30-32-13 connect-distributed: at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:156)
Aug  3 09:22:11 ip-172-30-32-13 connect-distributed: at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:190)
Aug  3 09:22:11 ip-172-30-32-13 connect-distributed: at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:132)
Aug  3 09:22:11 ip-172-30-32-13 connect-distributed: at org.apache.kafka.connect.runtime.WorkerSourceTask.convertTransformedRecord(WorkerSourceTask.java:295)
Aug  3 09:22:11 ip-172-30-32-13 connect-distributed: at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:324)
Aug  3 09:22:11 ip-172-30-32-13 connect-distributed: at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:248)
Aug  3 09:22:11 ip-172-30-32-13 connect-distributed: at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:182)
Aug  3 09:22:11 ip-172-30-32-13 connect-distributed: at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:231)
Aug  3 09:22:11 ip-172-30-32-13 connect-distributed: at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
Aug  3 09:22:11 ip-172-30-32-13 connect-distributed: at java.util.concurrent.FutureTask.run(FutureTask.java:266)
Aug  3 09:22:11 ip-172-30-32-13 connect-distributed: at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
Aug  3 09:22:11 ip-172-30-32-13 connect-distributed: at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
Aug  3 09:22:11 ip-172-30-32-13 connect-distributed: at java.lang.Thread.run(Thread.java:748)
Aug  3 09:22:11 ip-172-30-32-13 connect-distributed: [2021-08-03 09:22:11,126] INFO Stopping down connector (io.debezium.connector.common.BaseSourceTask:238)
Aug  3 09:22:11 ip-172-30-32-13 connect-distributed: [2021-08-03 09:22:11,138] INFO Stopped reading binlog after 0 events, no new offset was recorded (io.debezium.connector.mysql.MySqlStreamingChangeEventSource:1173)

Allow setting reader schema in GenericDatumReader

The current implementation of AWSKafkaAvroDeserializer.deserialize(String topic, byte[] data) does not allow setting a specific reader schema for the generic record case. In the Confluent KafkaAvroDeserializer we have used this method:

public Object deserialize(String s, byte[] bytes, Schema readerSchema) {
        return this.deserialize(bytes, readerSchema);
    }

In AWSAvroDeserializer the Avro Reader is constructed this way:

public DatumReader<Object> createDatumReader(Schema writerSchema, UUID schemaVersionId)
            throws InstantiationException, IllegalAccessException {

        switch (this.avroRecordType) {
            case SPECIFIC_RECORD:
                @SuppressWarnings("unchecked")
                Class<SpecificRecord> readerClass = SpecificData.get().getClass(writerSchema);

                Schema readerSchema = readerClass.newInstance().getSchema();
                log.debug("Using SpecificDatumReader for de-serializing Avro message, schema version id: {}, schema: {})",
                        schemaVersionId, readerSchema.toString());
                return new SpecificDatumReader<>(writerSchema, readerSchema);

            case GENERIC_RECORD:
                log.debug("Using GenericDatumReader for de-serializing Avro message, schema version id: {}, schema: {})",
                        schemaVersionId, writerSchema.toString());
                return new GenericDatumReader<>(writerSchema);

The GenericDatumReader is constructed with only the writerSchema. Would it be possible to add another parameter to the deserialize method and use the constructor new GenericDatumReader(writerSchema, readerSchema)?

The background to the question is that we always want to read some generic data with version 1 of the schema.
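
As a rough illustration of the requested change (a sketch only, assuming a hypothetical readerSchema parameter were threaded through the deserialize path; this is not the library's current API):

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.io.DatumReader;

public class ReaderSchemaSketch {
    // Hypothetical helper: build a GenericDatumReader that resolves records written with
    // writerSchema into the caller-supplied readerSchema (e.g. pinned to schema version 1).
    public static DatumReader<Object> createGenericDatumReader(Schema writerSchema, Schema readerSchema) {
        return new GenericDatumReader<>(writerSchema, readerSchema);
    }
}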

Support environment variables for Kafka Connect config

Hi all,

We're using the latest Glue Schema Registry source with our dockerized Kafka Connect setup, which uses environment variables for its configuration: https://docs.confluent.io/platform/current/installation/docker/config-reference.html#kconnect-long-configuration

This in theory means properties such as value.converter.avroRecordType=GENERIC_RECORD would be represented as CONNECT_VALUE_CONVERTER_AVRORECORDTYPE: GENERIC_RECORD in our environment variables for the container, following the Confluent docs above. However, this runs into:
[2021-05-24 12:00:31,125] INFO avroRecordType key is not present in the configs {avrorecordtype=GENERIC_RECORD, schemaautoregistrationenabled=true, region=eu-west-1} (com.amazonaws.services.schemaregistry.common.configs.GlueSchemaRegistryConfiguration)

I've tried:

  • CONNECT_VALUE_CONVERTER_avroRecordType
  • CONNECT_VALUE_CONVERTER_AVRORECORDTYPE
  • CONNECT_VALUE_CONVERTER_AVRO_RECORD_TYPE
  • CONNECT_VALUE_CONVERTER_AVRO__RECORD__TYPE
  • CONNECT_VALUE_CONVERTER_AVRO___RECORD___TYPE

All of which result in the same lowercase avrorecordtype in the configs, e.g. {avrorecordtype=GENERIC_RECORD}

Is it possible to support setting these properties via environment variables? Perhaps by ignoring case in the presence check?
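
A minimal sketch of the suggested workaround, assuming a case-insensitive lookup were added to the presence check (the class and method names here are hypothetical, not the library's current code):

import java.util.Collections;
import java.util.Map;

public class CaseInsensitiveConfigLookup {
    // Hypothetical helper: look up a config key ignoring case, so avroRecordType and
    // avrorecordtype (as produced from Docker env vars) resolve to the same value.
    public static Object getIgnoreCase(Map<String, ?> configs, String key) {
        for (Map.Entry<String, ?> entry : configs.entrySet()) {
            if (entry.getKey().equalsIgnoreCase(key)) {
                return entry.getValue();
            }
        }
        return null;
    }

    public static void main(String[] args) {
        Map<String, Object> configs = Collections.singletonMap("avrorecordtype", "GENERIC_RECORD");
        System.out.println(getIgnoreCase(configs, "avroRecordType")); // prints GENERIC_RECORD
    }
}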

SerDe class for Apache Avro with Kafka Streams

Hello! We're using MSK to stream messages here in the project, but I can't seem to find the SerDe class mentioned in the documentation. I have a simple producer app that uses the Client API to produce messages, and that one uses the regular Kafka libraries. The simple serializer and deserializer classes for use with the Client APIs are there and seem to work properly, but the class AWSKafkaAvroSerDe mentioned specifically for the Kafka Streams API is proving hard to find.

When I type AWSKafkaAvro and make the IDE autocomplete the entry, only AWSKafkaAvroSerializer and AWSKafkaAvroDeserializer are shown... AWSKafkaAvroSerDe isn't there.

Is there a specific lib for that class or am I doing something wrong here?

Doc I'm referring to here. The class mentioned can be found under the "Kafka Streams Application Example Code" section.
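
One possible interim workaround, sketched under the assumption that only the plain serializer and deserializer are available in your artifact, is to wrap them in a Kafka Streams Serde yourself via Serdes.serdeFrom. This is not the documented AWSKafkaAvroSerDe class, and the region, registry name and other config values below are placeholders:

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;

import com.amazonaws.services.schemaregistry.deserializers.avro.AWSKafkaAvroDeserializer;
import com.amazonaws.services.schemaregistry.serializers.avro.AWSKafkaAvroSerializer;

public class GlueAvroSerdeWorkaround {
    // Wrap the serializer/deserializer that do ship in the artifact into a Streams Serde.
    public static Serde<Object> glueAvroSerde(boolean isKey) {
        Map<String, Object> configs = new HashMap<>();
        configs.put("region", "us-east-1");                // placeholder
        configs.put("registry.name", "default-registry");  // placeholder
        configs.put("avroRecordType", "GENERIC_RECORD");
        configs.put("schemaAutoRegistrationEnabled", true);

        AWSKafkaAvroSerializer serializer = new AWSKafkaAvroSerializer();
        AWSKafkaAvroDeserializer deserializer = new AWSKafkaAvroDeserializer();
        serializer.configure(configs, isKey);
        deserializer.configure(configs, isKey);

        return Serdes.serdeFrom(serializer, deserializer);
    }
}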

Key and value serializers for the same topic use the same Glue schema

Note: I'm coming from a world where we use the Confluent schema registry, so I apologize if I'm misunderstanding something.

The Confluent schema registry registers two schemas per topic, one for values and one for keys; these schemas are suffixed with -value and -key respectively. It looks like the serializers support this by accepting an isKey argument. However, when I run my application, both seem to use the same schema in Glue, and the second schema shows up as Failed. Am I doing something wrong or is this a bug?

AWSSchemaNamingStrategy.java#L31 seems to be the culprit: one function takes in isKey and then drops it.

Http client conflicts

Hi,

I'm getting the error below when integrating schema registry in our project.

Caused by: software.amazon.awssdk.core.exception.SdkClientException: Multiple HTTP implementations 
were found on the classpath. To avoid non-deterministic loading implementations, please explicitly provide 
an HTTP client via the client builders, set the software.amazon.awssdk.http.service.impl system property 
with the FQCN of the HTTP service to use as the default, or remove all but one HTTP implementation 
from the classpath

I cannot resolve this with the directions above (with our current httpClient), and I believe the issue is that UrlConnectionHttpClient is used explicitly in the builder below.

GlueClientBuilder glueClientBuilder = GlueClient
                .builder()
                .credentialsProvider(credentialsProvider)
                .overrideConfiguration(overrideConfiguration)
                .httpClient(UrlConnectionHttpClient.builder().build())
                .region(Region.of(glueSchemaRegistryConfiguration.getRegion()));

source
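
For what it's worth, the error text itself points at one possible workaround: pinning the SDK's default HTTP implementation via the software.amazon.awssdk.http.service.impl system property. A hedged sketch (the chosen implementation class is just an example and must be on your classpath; setting it does not remove the explicit UrlConnectionHttpClient shown above):

public class PinSdkHttpClient {
    public static void main(String[] args) {
        // Tell the AWS SDK v2 which HTTP implementation to load when several are on the classpath.
        // UrlConnectionSdkHttpService comes from the url-connection-client module; use
        // software.amazon.awssdk.http.apache.ApacheSdkHttpService to standardize on the Apache client instead.
        System.setProperty(
                "software.amazon.awssdk.http.service.impl",
                "software.amazon.awssdk.http.urlconnection.UrlConnectionSdkHttpService");
    }
}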

Apache Druid client for AWS MSK + AWS glue schema registry @question

Hi,
Currently we use Apache Druid as a Kafka consumer/producer with Confluent Kafka + the Confluent Schema Registry. We are assessing a move to AWS MSK; however, we are not sure whether Apache Druid will continue to work as a Kafka consumer/producer with the AWS Glue Schema Registry. Is this supported? If so, is there any reference/link on how to configure the AWS Glue Schema Registry with Apache Druid?

Thanks, Murali

POJO for AVRO schema

Is there any support for generating a Java class from an AVRO schema?
Also, we use Spring Boot heavily in our projects. Is there any example of using Spring-Kafka with the AWS Glue Schema Registry, given that the AWS Glue Schema Registry does not expose any HTTP endpoint?

GetSchemaVersionRequest in AWSSchemaRegistryClient does not specify a registry name

Hi.

I've been trying to figure out the minimum permissions required to set up Glue schema registry based serialization/deserialization (avro) as we consume/produce events to kafka.

I realized through line-by-line debugging that the GetSchemaVersionRequest does not specify a registry name (https://github.com/awslabs/aws-glue-schema-registry/blob/master/common/src/main/java/com/amazonaws/services/schemaregistry/common/AWSSchemaRegistryClient.java#L222). This requires me to add the following broad permissions for my application to work.

        {
            "Sid": "VisualEditor1",
            "Effect": "Allow",
            "Action": [
                "glue:GetSchemaVersion"
            ],
            "Resource": [
                "*"
            ]
        }

I would like to instead just give such permission to the specific registry I am using right now.

Is it possible to specify a registry name in this request?

Using AWS signature for authentication

Hi.

I use IAM service accounts in k8s to give the correct permissions to each pod talking to AWS systems. I could not find documentation on how to do this for the Glue Schema Registry. Is there a plan to add this functionality?

Currently, I need to add the permission to the worker node's IAM role, which is not ideal.

Support for using temporary credentials

Hi,

We have a use case where devs are developing locally using an IAM user in account A, but the Glue registry is in account B. We want to be able to get temporary credentials and then use this library to read/update the Avro registry. Looking through the examples and the docs, there seems to be no documentation regarding that.

Is there a way to pass AWS temporary credentials before interacting with the AWS API, a la https://docs.aws.amazon.com/AmazonS3/latest/userguide/AuthUsingTempSessionToken.html? Or is there an alternative here?
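
A hedged sketch of one way the credentials side of this might look with the AWS SDK v2 STS module; the role ARN, session name and region are placeholders, and whether the resulting provider can be handed directly to the serializer/deserializer depends on which constructors your release exposes:

import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.sts.StsClient;
import software.amazon.awssdk.services.sts.auth.StsAssumeRoleCredentialsProvider;
import software.amazon.awssdk.services.sts.model.AssumeRoleRequest;

public class CrossAccountCredentialsSketch {
    // Dev credentials in account A assume a role in account B that grants Glue Schema Registry access.
    public static StsAssumeRoleCredentialsProvider assumeRegistryRole() {
        StsClient sts = StsClient.builder().region(Region.US_EAST_1).build();  // placeholder region
        AssumeRoleRequest request = AssumeRoleRequest.builder()
                .roleArn("arn:aws:iam::222222222222:role/glue-schema-registry-access")  // placeholder
                .roleSessionName("local-dev")
                .build();
        // The provider refreshes the temporary credentials automatically before they expire.
        return StsAssumeRoleCredentialsProvider.builder()
                .stsClient(sts)
                .refreshRequest(request)
                .build();
    }
}

The provider could then be passed to a serializer/deserializer constructor that accepts an AwsCredentialsProvider, if your version has one; otherwise the temporary credentials can be exported as environment variables so the default provider chain picks them up.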

AWSKafkaAvroConverter throwing because "Didn't find secondary deserializer" when not trying to use a secondary deserializer

Context

Using MSK and MSK Connect, with the Confluent S3 sink connector and the avro-kafkaconnect-converter from this repository.
The producer sending messages to the topic is a Lambda function written in .NET that uses the Confluent Schema Registry serdes class to serialize the data into an Avro generic record.

Issue

The converter is throwing an error when it is trying to deserialize messages. It looks like it is trying to find the secondary deserializer but I don't want it to use a secondary deserializer and haven't provided config for one.

The stack trace

[Worker-] org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error:
--
[Worker-] 	at com.amazonaws.services.schemaregistry.kafkaconnect.AWSKafkaAvroConverter.toConnectData(AWSKafkaAvroConverter.java:119)
[Worker-] 	at org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:87)
[Worker-] 	at org.apache.kafka.connect.runtime.WorkerSinkTask.convertValue(WorkerSinkTask.java:545)
[Worker-] 	at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:501)
[Worker-] 	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:156)
[Worker-] 	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:190)
[Worker-] 	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:132)
[Worker-] 	at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:501)
[Worker-] 	at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:478)
[Worker-] 	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:328)
[Worker-] 	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
[Worker-] 	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
[Worker-] 	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:189)
[Worker-] 	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:238)
[Worker-] 	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
[Worker-] 	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
[Worker-] 	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
[Worker-] 	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
[Worker-] 	at java.base/java.lang.Thread.run(Thread.java:829)
[Worker-] Caused by: com.amazonaws.services.schemaregistry.exception.AWSSchemaRegistryException: Didn't find secondary deserializer.
[Worker-] 	at com.amazonaws.services.schemaregistry.deserializers.SecondaryDeserializer.deserialize(SecondaryDeserializer.java:65)
[Worker-] 	at com.amazonaws.services.schemaregistry.deserializers.avro.AWSKafkaAvroDeserializer.deserializeByHeaderVersionByte(AWSKafkaAvroDeserializer.java:150)
[Worker-] 	at com.amazonaws.services.schemaregistry.deserializers.avro.AWSKafkaAvroDeserializer.deserialize(AWSKafkaAvroDeserializer.java:114)
[Worker-] 	at com.amazonaws.services.schemaregistry.kafkaconnect.AWSKafkaAvroConverter.toConnectData(AWSKafkaAvroConverter.java:117)
[Worker-] 	... 18 more
[Worker-] [2022-02-03 09:26:58,641] ERROR [emma-tenure-changes-v1|task-1] WorkerSinkTask{id=emma-tenure-changes-v1-1} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:191)

This is the config supplied to the connector

connector.class=io.confluent.connect.s3.S3SinkConnector
s3.region=eu-west-2
flush.size=1
schema.compatibility=BACKWARD
tasks.max=2
topics=tenure-api
storage.class=io.confluent.connect.s3.storage.S3Storage
s3.bucket.name=<bucket-name>
s3.sse.kms.key.id=<key-id>
format.class=io.confluent.connect.s3.format.parquet.ParquetFormat
partitioner.class=io.confluent.connect.storage.partitioner.DefaultPartitioner

value.converter.schemaAutoRegistrationEnabled=true
value.converter.registry.name=schema-registry
value.convertor.schemaName=tenure-api
value.converter.avroRecordType=GENERIC_RECORD
value.converter.region=eu-west-2
value.converter.schemas.enable=true
value.converter=com.amazonaws.services.schemaregistry.kafkaconnect.AWSKafkaAvroConverter

key.converter=org.apache.kafka.connect.storage.StringConverter

Uploaded the plugin by adding the Glue Schema Registry jar file and all the jar files in the target/dependency folder for avro-kafkaconnect-converter into lib, zipped it up and uploaded it to S3.

aws-glue-schema-registry v1.8.1
Kafka v2.8.1
MSK v2.7.1

Kafka 2.5.0 vulnerability - please update to 2.8.1 or newer

We have a security scanner that scans all direct and transitive dependencies of our software.

Kafka 2.5.0 came up with this https://sca.analysiscenter.veracode.com/vulnerability-database/security/timing-attack/java/sid-32216

Apache Kafka is vulnerable to timing attack. The library validates password or key using Arrays.equals, allowing an attacker to leak credentials via brute force attacks.

Our top-level dependency is:

    <groupId>com.amazonaws</groupId>
    <artifactId>amazon-kinesis-producer</artifactId>
    <version>0.14.8</version>

Please could you update to 2.8.1 as it suggests? We also would then need the new release to be bumped in amazon-kinesis-producer.

Strange schema issue with Kafka Connect

In my Kafka Connect connector I have 2 different schemas for key and value data with different names:

			// Schema init - keySchema is immutable and always 1
			final SchemaBuilder keySchemaBuilder = SchemaBuilder
						.struct()
						.required()
						.name(tableFqn + ".Key")
						.version(1);
			final SchemaBuilder valueSchemaBuilder = SchemaBuilder
						.struct()
						.optional()
						.name(tableFqn + ".Value")
						.version(version);

SourceRecord is created using both schemas:

				keySchema = keySchemaBuilder.build();
				valueSchema = valueSchemaBuilder.build();
				//
				// Populations of Struct for key and value...
				//
				sourceRecord = new SourceRecord(
					sourcePartition,
					offset,
					kafkaTopic,
					keySchema,
					keyStruct,
					valueSchema,
					valueStruct);

My connect standalone.properties contains:

bootstrap.servers=localhost:9092

internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter.schemas.enable=false

offset.storage.file.filename=/tmp/connect.offsets

offset.flush.interval.ms=10000
poll.interval.ms=5000

# Glue Schema Registry Specific Converters
key.converter=com.amazonaws.services.schemaregistry.kafkaconnect.AWSKafkaAvroConverter
key.converter.schemas.enable=true
value.converter=com.amazonaws.services.schemaregistry.kafkaconnect.AWSKafkaAvroConverter
value.converter.schemas.enable=true

# Converter-specific settings can be passed in by prefixing the Converter's setting
# with the converter we want to apply it to
key.converter.region=eu-north-1
key.converter.schemaAutoRegistrationEnabled=true
key.converter.avroRecordType=GENERIC_RECORD
# Default value: default-registry
key.converter.registry.name=TestSchemaRegistry001
value.converter.region=eu-north-1
value.converter.schemaAutoRegistrationEnabled=true
value.converter.avroRecordType=GENERIC_RECORD
# Default value: default-registry
value.converter.registry.name=TestSchemaRegistry001

This produces very strange schemas: two versions with the same name in the Glue Schema Registry (the Kafka Connect schema names are different). Under version 1 is the key schema:

{
  "type": "record",
  "name": "Key",
  "namespace": "SCOTT.SOURCE_CLONE",
  "fields": [
    {
      "name": "OWNER",
      "type": "string"
    },
    {
      "name": "NAME",
      "type": "string"
    },
    {
      "name": "TYPE",
      "type": "string"
    },
    {
      "name": "LINE",
      "type": "double"
    },
    {
      "name": "ORIGIN_CON_ID",
      "type": "double"
    }
  ],
  "connect.version": 1,
  "connect.name": "SCOTT.SOURCE_CLONE.Key"
}

version 2 is ... Value schema:

{
  "type": "record",
  "name": "Value",
  "namespace": "SCOTT.SOURCE_CLONE",
  "fields": [
    {
      "name": "TEXT",
      "type": [
        "null",
        "string"
      ],
      "default": null
    }
  ],
  "connect.version": 1,
  "connect.name": "SCOTT.SOURCE_CLONE.Value"
}

The schemas have different connect.name and name values; only the namespace is the same.

Why did schemas with different connect.name and name become one schema with different versions?

Could you please help me resolve this?

consume records with glue schema registry on cli

I used to use a command like the one below:

docker exec -it schema-registry /usr/bin/kafka-avro-console-consumer \
--bootstrap-server kafka:29092 \
--property  \
--from-beginning \
--topic some-topic

but when testing with the Glue Schema Registry, I got stuck because the Glue Schema Registry doesn't provide an HTTP endpoint.
Do you guys have any plan to support this feature or to provide an HTTP endpoint for Glue?

We need more examples with regular kafka tools

I was able to use kafka-console-consumer with this library to Avro-deserialize MSK topic data using the Glue Schema Registry. The topic data was published via this library's Kafka Connect support and the Glue Schema Registry; Kafka Connect works like a charm with GSR.

I am wondering how I can use "kafka-console-producer" to Avro-serialize the data and register its defined schema in GSR. Is there any serializer property I can pass to define the Avro schema?

Thank you.

Too much logging for AWSKafkaAvroSerializer

The AWSKafkaAvroSerializer logs at INFO level for every message it serializes.

INFO avro.AWSKafkaAvroSerializer: Schema Version Id is null. Trying to register the schema.
INFO avro.AWSKafkaAvroSerializer: Schema Version Id received from the from schema registry: xxxxx

There is also a typo in the log message:

from the from

Is protobuf support planned in the future?

Hi all.

After digging a bit into the codebase I didn't find serializers/deserializers for protocol buffers. I would like to know whether there is a plan to support this format, or whether there are no plans for it in the foreseeable future.

Thanks guys.

Extend customization of schema naming strategy

Hello,

We would like to publish schemas from Kafka Connect to Glue Schema Registry and to name the created schemas <topicname>-key and <topicname>-value.
So we implemented the AWSSchemaNamingStrategy interface, using CustomerSchemaNamingStrategy as an example. However, the overridden method public String getSchemaName(String transportName, Object data) does not indicate whether the schema is for the key or the value, so we cannot append "-key" or "-value" to the topic name to derive the schema name. Since AWSKafkaAvroSerializer::configure() already receives isKey as a parameter, would you be able to extend AWSSchemaNamingStrategy with a method getSchemaName(String transportName, Object data, boolean isKey)? Or is there any other way to determine whether the schema is for the key or the value? Thanks!
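
For illustration only, the requested extension might look roughly like the sketch below. The three-argument overload does not exist in the interface as described in this issue; both the default method and the example implementation are hypothetical.

// Proposal sketch only; not the interface as it exists today.
public interface AWSSchemaNamingStrategy {

    String getSchemaName(String transportName);

    String getSchemaName(String transportName, Object data);

    // Proposed overload: lets an implementation distinguish key and value schemas.
    default String getSchemaName(String transportName, Object data, boolean isKey) {
        return getSchemaName(transportName, data);
    }
}

// Hypothetical implementation producing <topicname>-key / <topicname>-value names.
class TopicKeyValueNamingStrategy implements AWSSchemaNamingStrategy {

    @Override
    public String getSchemaName(String transportName) {
        return transportName;
    }

    @Override
    public String getSchemaName(String transportName, Object data) {
        return transportName;
    }

    @Override
    public String getSchemaName(String transportName, Object data, boolean isKey) {
        return transportName + (isKey ? "-key" : "-value");
    }
}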

SecondaryDeserializer should not be a singleton

Hi,

We are migrating our deserializers to Glue using the fallback option with SecondaryDeserializer.
We have both Avro SpecificDatumReader and GenericDatumReader in our project, so we need different types of SecondaryDeserializer.
However, SecondaryDeserializer is a singleton, so we can't have two instances of it.
It would be better if we could have one SecondaryDeserializer instance per AWSKafkaAvroDeserializer.

Allow record based naming strategies

Hi,

We are currently migrating our registry to AWS Glue.
Some Kafka topics in our project do not have a specific Avro schema attached; they can receive messages serialized with different schemas.
Therefore, we would like to register those schemas using something other than a topic-based naming strategy.
However, the AWSSchemaNamingStrategy interface only provides a method getSchemaName(String transportName), which generates the schema name based on the topic.

It would be great if the interface method could be extended with a schema parameter. This would allow implementing record-based strategies.
As far as I can see, prepareInput in AWSKafkaAvroSerializer already receives the data as input, so one could use AVROUtils to extract the schema and pass it to getSchemaName.

Support for testing Schema Registry locally

Hi,
We are looking to integrate our Kafka Streams app with Glue Schema Registry.
One obstacle for us is how to stub out Glue Schema Registry in our integration tests, which use TopologyTestDriver.

Would you mind adding some guidance docs on how best to approach this?
The Confluent Schema Registry library offers a class that makes this easy; something similar for AWS Glue would be much appreciated: https://github.com/confluentinc/schema-registry/blob/master/client/src/main/java/io/confluent/kafka/schemaregistry/testutil/MockSchemaRegistry.java

Many thanks
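
Until official guidance exists, one workaround for TopologyTestDriver tests is to keep the registry out of the test entirely and swap in a plain Avro serde that encodes against a fixed schema. The sketch below uses only the Kafka and Avro public APIs (and assumes a kafka-clients version where Serializer/Deserializer have default configure/close methods, so lambdas compile); how the serde is injected into the topology under test, for example via a constructor parameter or Consumed/Produced overrides, is left to your own code.

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;

// Test-only serde: plain Avro binary with a fixed schema and no schema registry calls.
public final class InMemoryAvroSerde {

    public static Serde<GenericRecord> forSchema(Schema schema) {
        Serializer<GenericRecord> serializer = (topic, record) -> {
            if (record == null) {
                return null; // preserve tombstones
            }
            try {
                ByteArrayOutputStream out = new ByteArrayOutputStream();
                BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
                new GenericDatumWriter<GenericRecord>(schema).write(record, encoder);
                encoder.flush();
                return out.toByteArray();
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        };

        Deserializer<GenericRecord> deserializer = (topic, bytes) -> {
            if (bytes == null) {
                return null;
            }
            try {
                return new GenericDatumReader<GenericRecord>(schema)
                        .read(null, DecoderFactory.get().binaryDecoder(bytes, null));
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        };

        return Serdes.serdeFrom(serializer, deserializer);
    }
}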
