
Comments (9)

mtth commented on July 17, 2024

This is typically caused by invalid data fed to avsc (encoding mismatch, data prefix, ...). It's hard to say more without data to reproduce it.

from avsc.

tommy38hk commented on July 17, 2024

I created a sample schema and message that produced the same error.

{
  "fields": [
    {
      "name": "id",
      "type": [
        "null",
        "long"
      ]
    },
    {
      "name": "name",
      "type": [
        "null",
        {
          "avro.java.string": "String",
          "type": "string"
        }
      ]
    },
    {
      "name": "email",
      "type": [
        "null",
        {
          "avro.java.string": "String",
          "type": "string"
        }
      ]
    },
    {
      "name": "timestamp",
      "type": [
        "null",
        "long"
      ]
    },
    {
      "name": "items",
      "type": [
        "null",
        {
          "items": {
            "fields": [
              {
                "name": "name",
                "type": [
                  "null",
                  {
                    "avro.java.string": "String",
                    "type": "string"
                  }
                ]
              },
              {
                "name": "price",
                "type": [
                  "null",
                  "double"
                ]
              }
            ],
            "name": "Item",
            "type": "record"
          },
          "type": "array"
        }
      ]
    }
  ],
  "name": "User",
  "namespace": "org.example.test.testkafka",
  "type": "record"
}

And a Java Kafka producer

package org.example.test.testkafka;

import io.confluent.kafka.serializers.KafkaAvroSerializer;
import io.confluent.kafka.serializers.KafkaAvroSerializerConfig;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
@Service
public class KafkaProducerExample {
    @Value("${kafka.bootstrap-servers}")
    private String bootstrapServers;
    @Value("${kafka.client-id}")
    private String clientId;

    @Value("${kafka.username}")
    private String username;

    @Value("${kafka.password}")
    private String password;

    @Value("${kafka.schema-registry-url}")
    String schemaRegistryUrl;
    @Value("${kafka.schema-registry-user-info}")
    String schemaRegistryUserInfo;


    public void test(){
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
        props.put("basic.auth.user.info", schemaRegistryUserInfo);
        props.put("basic.auth.credentials.source", "USER_INFO");
        props.put("schema.registry.url", schemaRegistryUrl);
        props.put(KafkaAvroSerializerConfig.AUTO_REGISTER_SCHEMAS, false);
        props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
        props.put(SaslConfigs.SASL_JAAS_CONFIG, "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"" + username + "\" password=\"" + password + "\";");

        try (Producer<String, GenericRecord> producer = new org.apache.kafka.clients.producer.KafkaProducer<>(props)) {
            // Create a User record
            User user = new User();
            user.setId(1L);
            user.setName("John Doe");
            user.setEmail("john.doe@example.com");
            user.setTimestamp(System.currentTimeMillis());

            // Create an Item record
            Item item = new Item();
            item.setName("Product A");
            item.setPrice(19.99);

            user.setItems(Collections.singletonList(item));

            // Produce the User record to Kafka
            ProducerRecord<String, GenericRecord> record = new ProducerRecord<>("example", null, user);
            producer.send(record).get();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

{
  "id": 1,
  "name": "John Doe",
  "email": "john.doe@example.com",
  "timestamp": 1709667230821,
  "items": [
    {
      "name": "Product A",
      "price": 19.99
    }
  ]
}


mtth commented on July 17, 2024

The producer code doesn't have enough information; we need to see what data avsc tries to decode. Can you share the value of `message` in your example?


tommy38hk commented on July 17, 2024

I already shared it; reposting here:

{
  "id": 1,
  "name": "John Doe",
  "email": "john.doe@example.com",
  "timestamp": 1709667230821,
  "items": [
    {
      "name": "Product A",
      "price": 19.99
    }
  ]
}


mtth commented on July 17, 2024

That doesn't look like `message`; it doesn't have the right fields:

async function handleMessage(message) { // We need this message
  try {
    const messageAsString = JSON.stringify(message);
    const messageObject = JSON.parse(messageAsString);
    const avroMessage = messageObject.message.value;
    const buffer = Buffer.from(avroMessage.data); // So we can see the buffer avsc sees
    let type = avro.Type.forSchema(yourAvroSchema);
    let decoded = type.fromBuffer(buffer);
// ...


tommy38hk commented on July 17, 2024

I added some console.log calls in between, as follows:

		const messageAsString = JSON.stringify(message);
		const messageObject = JSON.parse(messageAsString);

		console.log('messageObject:', messageObject);
		const bufferData = Buffer.from(messageObject.message.value.data);
		
		const hexString = bufferData.toString('hex');
		const formattedString = hexString.match(/../g).join(' ');
		const decodedString = formattedString.toString("utf-8");
		const buffer2 = Buffer.from(decodedString)
		const decodedString2 = buffer2.toString("utf-8");
		console.log(decodedString2); 
		let decoded = type.fromBuffer(bufferData);

And the output is:

node consume-schema-topic.js
messageObject: {
  topic: 'example',
  partition: 0,
  message: {
    magicByte: 2,
    attributes: 0,
    timestamp: '1711541805856',
    offset: '0',
    key: null,
    value: { type: 'Buffer', data: [Array] },
    headers: {},
    isControlRecord: false,
    batchContext: {
      firstOffset: '0',
      firstTimestamp: '1711541805856',
      partitionLeaderEpoch: 0,
      inTransaction: false,
      isControlBatch: false,
      lastOffsetDelta: 0,
      producerId: '95946902',
      producerEpoch: 0,
      firstSequence: 0,
      maxTimestamp: '1711541805856',
      timestampType: 0,
      magicByte: 2
    }
  }
}
00 00 01 87 1b 02 02 02 10 4a 6f 68 6e 20 44 6f 65 02 28 6a 6f 68 6e 2e 64 6f 65 40 65 78 61 6d 70 6c 65 2e 63 6f 6d 02 d6 fa ba fd cf 63 02 02 02 12 50 72 6f 64 75 63 74 20 41 02 3d 0a d7 a3 70 fd 33 40 00
{"level":"ERROR","timestamp":"2024-03-27T12:19:23.782Z","logger":"kafkajs","message":"[Runner] Error when calling eachMessage","topic":"example","partition":0,"offset":"0","stack":"TypeError: Right-hand side of 'instanceof' is not an object\n    at Runner.handleMessage [as eachMessage] (C:\\Users\\myuserid\\testkafka\\consume-schema-topic.js:166:13)\n    at Runner.processEachMessage (C:\\Users\\myuserid\\testkafka\\node_modules\\kafkajs\\src\\consumer\\runner.js:231:20)\n    at onBatch (C:\\Users\\myuserid\\testkafka\\node_modules\\kafkajs\\src\\consumer\\runner.js:447:20)\n    at Runner.handleBatch (C:\\Users\\myuserid\\testkafka\\node_modules\\kafkajs\\src\\consumer\\runner.js:461:11)\n    at handler (C:\\Users\\myuserid\\testkafka\\node_modules\\kafkajs\\src\\consumer\\runner.js:58:30)\n    at C:\\Users\\myuserid\\testkafka\\node_modules\\kafkajs\\src\\consumer\\worker.js:29:15\n    at Object.run (C:\\Users\\myuserid\\testkafka\\node_modules\\kafkajs\\src\\utils\\sharedPromiseTo.js:14:17)\n    at C:\\Users\\myuserid\\testkafka\\node_modules\\kafkajs\\src\\consumer\\workerQueue.js:27:38\n    at Array.forEach (<anonymous>)\n    at Object.push (C:\\Users\\myuserid\\testkafka\\node_modules\\kafkajs\\src\\consumer\\workerQueue.js:27:13)","error":{}}
{"level":"ERROR","timestamp":"2024-03-27T12:19:28.065Z","logger":"kafkajs","message":"[Consumer] Crash: KafkaJSNonRetriableError: Right-hand side of 'instanceof' is not an object","groupId":"microservice1-consumer-group","stack":"KafkaJSNonRetriableError: Right-hand side of 'instanceof' is not an object\n    at C:\\Users\\myuserid\\testkafka\\node_modules\\kafkajs\\src\\retry\\index.js:55:18\n    at process.processTicksAndRejections (node:internal/process/task_queues:95:5)"}
{"level":"INFO","timestamp":"2024-03-27T12:19:28.135Z","logger":"kafkajs","message":"[Consumer] Stopped","groupId":"microservice1-consumer-group"}
C:\Users\myuserid\testkafka>


mtth commented on July 17, 2024

Thanks, that's helpful. It looks like the system you are using is adding a prefix to the data; you'll need to strip it before decoding. In this example, it's 5 bytes:

> b = Buffer.from(/* above data: 000001871b... */, 'hex')
> t = avro.Type.forSchema(/* above schema */)
> t.fromBuffer(b.subarray(5))
User {
  id: 1,
  name: 'John Doe',
  email: 'john.doe@example.com',
  timestamp: 1711541804715,
  items: [ Item { name: 'Product A', price: 19.99 } ]
}


tommy38hk commented on July 17, 2024

Thanks @mtth. This works as suggested. I'm just not sure whether the payload always starts at byte 5.


mtth commented on July 17, 2024

I'm afraid this is out of scope for avsc. You'll need to dig into the enclosing system's setup to figure out the prefix's properties.

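For what it's worth, the offset is predictable here: the producer above uses Confluent's KafkaAvroSerializer, whose wire format is a single 0x00 magic byte followed by a 4-byte big-endian schema registry ID, so the Avro binary payload always starts at byte 5. A minimal sketch of splitting such a frame (the `splitConfluentFrame` helper name is made up for illustration):

```javascript
// Split a Confluent-framed Kafka message into its schema ID and Avro payload.
// Assumes the Confluent wire format: 0x00 magic byte, then a 4-byte
// big-endian schema registry ID, then the Avro binary-encoded record.
function splitConfluentFrame(buffer) {
  if (buffer.length < 5 || buffer[0] !== 0) {
    throw new Error('not in Confluent wire format');
  }
  return {
    schemaId: buffer.readInt32BE(1), // ID of the writer's schema in the registry
    payload: buffer.subarray(5),     // raw Avro bytes, safe for type.fromBuffer(...)
  };
}

// Using the first bytes of the hex dump above:
const frame = splitConfluentFrame(Buffer.from('000001871b02020202', 'hex'));
// frame.schemaId is 100123 (0x0001871b); frame.payload is the Avro data
```

The schema ID could also be used to fetch the writer's schema from the registry at runtime, instead of hard-coding it in the consumer.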

