
avrora's Issues

Encoding with specific schema version

For better out-of-band support it would be useful to encode a message with a specific version of the schema, and to avoid registering unknown schemas during resolution — for example, when producers and consumers are not allowed to register schemas and may only retrieve them from the Confluent Schema Registry.
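A possible shape for such an API (the `version:` option here is hypothetical, not something Avrora supports today):

```elixir
# Hypothetical `version:` option: pin the writer schema to an already
# registered version instead of resolving (and possibly registering)
# the latest one.
{:ok, encoded} =
  Avrora.encode(%{"id" => "1"}, schema_name: "io.Payment", version: 3)
```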

Improve client errors

It makes sense to introduce an Avro Error module with the ability to attach a few more details to client errors. It should make development and debugging easier.

For instance, :subject_not_found should carry the subject name; otherwise it's very hard to tell which subject is missing, especially with the nested references feature.

Support for registering schemas under custom subject

As I understand it, automatic schema registration always registers schemas under subjects named after the schema's full name (namespace + schema name). However, I've encountered a need to register a schema under a subject named after a Kafka topic, because of the expectations of the system we're integrating with. I think this might be a pretty common use case, as Confluent Schema Registry specifies TopicNameStrategy as the default: https://docs.confluent.io/platform/current/schema-registry/serdes-develop/index.html#subject-name-strategy.

I was able to implement a workaround in our code by manually registering the schema:

  # Register the schema under "<topic>-value" in the registry unless it
  # is already cached in memory.
  defp ensure_registered_schema() do
    with {:ok, nil} <- Avrora.Storage.Memory.get(@schema_full_name) do
      {:ok, schema} = Avrora.Storage.File.get(@schema_full_name)
      {:ok, registered_schema} = Avrora.Storage.Registry.put("#{@topic_name}-value", schema.json)
      {:ok, _} = Avrora.Storage.Memory.put(@schema_full_name, registered_schema)
    end
  end

but I was wondering whether there's a simpler way to do it.

avro_turf allows specifying the subject name when encoding a payload: https://github.com/dasch/avro_turf/blob/master/lib/avro_turf/messaging.rb#L87. It would be very convenient if Avrora supported something like that.
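A sketch of how this could look in Avrora, mirroring avro_turf (the `subject:` option is hypothetical):

```elixir
# Hypothetical `subject:` option: register/look up the schema under the
# TopicNameStrategy subject "my-topic-value" instead of "foo.Bar".
{:ok, encoded} =
  Avrora.encode(%{"baz" => "test"},
    schema_name: "foo.Bar",
    subject: "my-topic-value"
  )
```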

PS: Thanks for your work on the library! :)

Telemetry support

Would you be open to a contribution adding telemetry (https://github.com/beam-telemetry/telemetry) support to this library? It'd be nice to have telemetry data for the following events:

  • HTTP requests to the schema registry server
  • Message encoding
  • Message decoding
  • Schema encoding
  • Schema decoding

I should be able to make a contribution if this is desired.
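For illustration, consuming such events could look like this (the event name is an assumption; Avrora does not emit any telemetry events yet):

```elixir
# Attach a handler for a hypothetical registry-request event.
:telemetry.attach(
  "avrora-registry-logger",
  [:avrora, :registry, :request, :stop],
  fn _event, measurements, metadata, _config ->
    IO.inspect({measurements, metadata}, label: "registry request")
  end,
  nil
)
```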

Allow auto registration of the schemas to be configured

The idea is to make automatic schema registration optional via configuration.

config :avrora,
  registry_schemas_autoreg: false # defaults to `true`

When auto-registration is turned off, Avrora will read the schema from disk only if the registry is not configured.

Linked #41

Decoding an encoded record wraps the produced map in a list

Hi,

thanks for the library, looks really promising! However, I stumbled over the following, which seems like a bug to me.

Consider the following Avro schema:

{
  "namespace": "foo",
  "type": "record",
  "name": "Bar",
  "version": "1.0",
  "fields": [
    {
      "name": "baz",
      "type": "string"
    }
  ]
}

This schema can be used to encode %{"baz" => "test"}:

iex> {:ok, encoded} = Avrora.encode(%{"baz" => "test"}, schema_name: "foo.Bar")
{:ok,
 <<79, 98, 106, 1, 3, 144, 2, 20, 97, 118, 114, 111, 46, 99, 111, 100, 101, 99,
   8, 110, 117, 108, 108, 22, 97, 118, 114, 111, 46, 115, 99, 104, 101, 109, 97,
   212, 1, 123, 34, 110, 97, 109, 101, 115, 112, 97, 99, 101, ...>>}

However, when decoding this again, the result is not a map like I would've expected; instead, the map is wrapped in a list:

iex> Avrora.decode(encoded)
{:ok, [%{"baz" => "test"}]}

Sure enough, encoding this again fails:

iex> Avrora.encode([%{"baz" => "test"}], schema_name: "foo.Bar")
** (FunctionClauseError) no function clause matching in Avrora.Encoder.encode/2
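Until the wrapping behaviour is settled, callers can unwrap the result defensively, e.g.:

```elixir
# Workaround: accept both the wrapped and the unwrapped shape.
case Avrora.decode(encoded) do
  {:ok, [decoded]} when is_map(decoded) -> {:ok, decoded}
  {:ok, decoded} when is_map(decoded) -> {:ok, decoded}
  {:error, reason} -> {:error, reason}
end
```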

Warnings on compilation

Hi! Compilation is issuing these warnings:

Compiling 22 files (.ex)
warning: redefining @doc attribute previously set at line 34.

Please remove the duplicate docs. If instead you want to override a previously defined @doc, attach the @doc attribute to a function head:

    @doc """
    new docs
    """
    def resolve(...)

  lib/avrora/resolver.ex:53: Avrora.Resolver.resolve/1

warning: redefining @doc attribute previously set at line 19.

Please remove the duplicate docs. If instead you want to override a previously defined @doc, attach the @doc attribute to a function head:

    @doc """
    new docs
    """
    def get(...)

  lib/avrora/storage/registry.ex:45: Avrora.Storage.Registry.get/1

They are harmless, but a clean compilation would be super.
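Following the compiler's suggestion, the fix is to keep a single @doc on a bodiless function head; sketched here with illustrative clause bodies (resolve_by_id/1 and resolve_by_name/1 are placeholders, not Avrora functions):

```elixir
@doc """
Resolves a schema by integer id or by full name.
"""
def resolve(id_or_name)

def resolve(id) when is_integer(id), do: resolve_by_id(id)
def resolve(name) when is_binary(name), do: resolve_by_name(name)
```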

Implement support of Schema Registry's schema references

Description

When consuming an Avro message with a schema which has a schema reference, the Avrora.decode/1 function fails with {:error, {:not_found, "type"}}.

Schema references are a new feature introduced in Confluent Platform version 5.5.0 (see https://www.confluent.io/blog/multiple-event-types-in-the-same-kafka-topic/#avro-unions-with-schema-references).

How to reproduce

Given a schema containing a schema reference registered into the schema registry, such as:

Nested schema:

{
  "namespace": "org.schema_registry_test_app.avro",
  "type": "record",
  "name": "Result",
  "fields": [
    {
      "name": "up",
      "type": "string"
    },
    {
      "name": "down",
      "type": "string"
    }
  ]
}

Schema with reference:

{
  "namespace": "org.schema_registry_test_app.avro",
  "type": "record",
  "name": "AvroTest",
  "fields": [
    {
      "name": "id",
      "type": {
        "name": "Uuid",
        "type": "fixed",
        "size": 16
      }
    },
    {
      "name": "results",
      "type": {
        "type": "array",
        "items": "org.schema_registry_test_app.avro.Result"
      }
    }
  ]
}

Try to consume and decode a message produced with the AvroTest schema.

I tried with Avrora v0.13.0.

Dialyzer issues with private clients

When creating a private client, Dialyzer returns a bunch of errors, such as mismatches between Avrora.Schema and CustomClient.Schema, plus a bunch more that I'm not exactly sure about.

logicalType: date

Hi, I have a schema like:

{"name": "birthday",
 "type": "int",
 "logicalType": "date"}

In order to pass a Date sigil into it, the value has to be converted to "the number of days since the Unix epoch":

def date2int(d) when is_struct(d, Date) do
  Date.diff(d, ~D[1970-01-01])
end

def date2int(d) when is_binary(d) and byte_size(d) == 10 do
  # this can obviously fail on malformed input
  Date.diff(Date.from_iso8601!(d), ~D[1970-01-01])
end

I don't have the expertise to add handling of logicalType to the library, but leaving it here in case someone else needs this. :-)
ping @stretch
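For completeness, the reverse conversion (plain Elixir stdlib) is:

```elixir
# Convert the Avro int (days since the Unix epoch) back to a Date.
def int2date(days) when is_integer(days) do
  Date.add(~D[1970-01-01], days)
end
```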

"Messages encoded with OCF are wrapped in a List" what for?

Is there any reason why messages encoded with Codec.ObjectContainerFile are wrapped in a list?

I propose to rewrite ObjectContainerFile.decode/1 as:

  def decode(payload) when is_binary(payload) do
    with {:ok, {_, _, [decoded]}} <- do_decode(payload), do: {:ok, decoded}
  end

Incorrect typespec for `Avrora.Encoder.encode/2`

The typespec states that the second argument of encode/2 is a keyword list with string values:

@spec encode(map(), keyword(String.t())) :: {:ok, binary()} | {:error, term()}

The two allowed keys in the keyword are schema_name (string) and format (atom). Calling this function with the format explicitly set to e.g. :registry causes Dialyzer to complain.

This is a low-priority issue that is easily handled with a Dialyzer ignore list. I'll see if I can find time to file a PR to fix this (maybe also going over the other typespecs to check for inconsistencies).
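A corrected spec could enumerate the allowed options explicitly, e.g. (sketched; the exact option types would need checking against the implementation):

```elixir
@typep encode_options :: [schema_name: String.t(), format: atom()]
@spec encode(map(), encode_options()) :: {:ok, binary()} | {:error, term()}
```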

schema evolution support

Hi, this is a question around schema evolution support in avrora. I want to basically:

  • delete older fields in the reader schema
  • add new fields with default values in the reader schema

I tried the first one with erlavro, and it didn't seem to work there. Perhaps schema evolution support is missing from avrora as well, given that erlavro doesn't have it? Is there a way to work around it?

I've added the issue in erlavro too:
klarna/erlavro#117

Maintain a CHANGELOG?

A CHANGELOG with a summary of what matters to users in each release would be really useful when upgrading.

Doesn't work with schema registry as source for writer schema

Expected

When I encode data with Avrora.encode(data, schema_name: @schema, format: :registry), I successfully get back the encoded data.

Actual

Whenever I encode data, I get back {:error, :enoent}. This line tries to read a file that doesn't exist yet. The project seems to assume that the writer schema will always be available locally. For our projects we occasionally have multiple writers of the same schema, so our schema files live independently of the specific projects.

The 0.9.x line seems to behave as expected.

Schema registration tasks

It would be good to have mix tasks to register one specific schema or all schemas (depending on arguments). This may be needed for out-of-band message exchange, when a producer is not allowed to register schemas.

Option for decoding with tagged union values

erlavro has :avro_decoder_hooks.tag_unions/0 to tag unions (e.g. of multiple records). It would be great if that hook could be enabled via some option when decoding. It seems that for encoding, both tagged and non-tagged values are supported already.
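A sketch of wiring that erlavro hook through a private client's decoder_hook option (an assumption that the hook composes with whatever default hook Avrora installs):

```elixir
defmodule TaggedClient do
  use Avrora.Client,
    registry_url: "http://localhost:8081",
    # :avro_decoder_hooks.tag_unions/0 returns a decoder hook fun that
    # tags decoded union values with the branch name.
    decoder_hook: :avro_decoder_hooks.tag_unions()
end
```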

Support for multiple registries

First off, thank you for the library! It's very convenient.

I'm currently encountering a challenge: I'm using a library that, too, uses Avrora. However, I'd like to use a different registry than what the library is using.

For example, the library is loading schemas from ./deps/library-name/priv/schemas, and I'd like to use my own schemas located in ./priv/schemas.

Is this something you would consider supporting?

Subject not found

Hi! I am a bit lost here; let me share this issue in case it has happened to you too.

The library is trying to register a brand new schema, but it fails to do so. The result of http_client().post(payload, headers) in http_client_post/2 is

{:error, %{"error_code" => 40401, "message" => "Subject not found"}}

but I cannot understand how's that possible, since the API of the registry does not have 40401 among its error codes for that POST request.

Have you ever seen this? Perhaps there's a previous step in which you need to create the subject?

Obtain message metadata

Hello, I am decoding a message using schema auto-detection:

{:ok, decoded} = Avrora.decode(message)

The result I get is the following:

{:ok,
 %{
   "eventTimestamp" => "2020-06-09T16:06:28.221315618Z",
   "userData" => %{
     "name" => "test",
     "lastName" => "test2" 
   }
 }}

I also need the metadata for the message: since I'm using event sourcing, rather than having different topics (user-created, user-updated) I use a single topic user-events where every message can be a user.UserCreated or a user.UserUpdated.

Where can I get this information from the message payload?
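One possible direction, assuming the Avrora.Encoder.extract_schema mentioned in another issue on this page is (or becomes) publicly exposed:

```elixir
# Assumption: extract_schema/1 reads the writer schema from the payload
# without decoding the body; the exact module and return shape may differ.
{:ok, schema} = Avrora.Encoder.extract_schema(message)
schema.full_name
# e.g. "user.UserCreated"
```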

Getting a KeyError

I'm getting exceptions when trying to parse a message that is not encoded with avro.

[error] GenServer #PID<0.584.0> terminating
** (KeyError) key :original not found in: %MatchError{term: "\"uuid\":\"f6ad9736-11f1-4887-845b-aa4d778a1b79\",\"deleted_at\":1565371184722}"}
    (avrora) lib/avrora/encoder.ex:86: Avrora.Encoder.do_decode/2
    (portal) lib/portal/consumers/pipe_deleted.ex:10: anonymous fn/1 in Portal.Consumers.PipeDeleted.handle_messages/1
    (elixir) lib/enum.ex:783: Enum."-each/2-lists^foreach/1-0-"/2
    (elixir) lib/enum.ex:783: Enum.each/2
    (portal) lib/portal/consumers/pipe_deleted.ex:9: Portal.Consumers.PipeDeleted.handle_messages/1
    (portal) lib/portal/consumers/pipe_deleted.ex:2: Portal.Consumers.PipeDeleted.handle_messages/2
    (elsa) lib/elsa/group/worker.ex:93: Elsa.Group.Worker.handle_info/2
    (stdlib) gen_server.erl:637: :gen_server.try_dispatch/4
    (stdlib) gen_server.erl:711: :gen_server.handle_msg/6
    (stdlib) proc_lib.erl:249: :proc_lib.init_p_do_apply/3
Last message: {#PID<0.466.0>, {:kafka_message_set, "pipe_deleted", 0, 12, [{:kafka_message, 11, "", "{\"uuid\":\"f6ad9736-11f1-4887-845b-aa4d778a1b79\",\"deleted_at\":1565371184722}", :create, 1565371726633, []}]}}
State: %Elsa.Group.Worker.State{config: [], generation_id: 36, handler: Portal.Consumers.PipeDeleted, handler_init_args: %{}, handler_state: %{}, name: :pipe_deleted, offset: 11, partition: 0, subscriber_pid: #PID<0.466.0>, topic: "pipe_deleted"}

I worked around it temporarily by using Exception.message/1 (thiamsantos@2cfd0bf).

But we may need to address the root cause of this problem.

Extract encoders into separate modules

Technically we have 3 encoders:

  1. ObjectContainerFile
  2. Registry
  3. Plain

and Avrora.Encoder knows too much about all of them. After introducing Avrora.Encoder.extract_schema, responsibilities are duplicated and some functions have started to overlap.

At the same time, it would be nice to have something like an Avrora.Storage behaviour.

Offload Avrora.Encoder tests

Encoder tests cover far too much; they should cover the main logic inside the module and stop worrying about codecs.

unnamed_type error with primitive type schema

Hello,

Is it possible to register a primitive type schema with Avrora?

A primitive type schema is, for example:

{
    "type": "string",
    "name": "myvalue"
}

When I try to save it with the register_schema_by_name function:

Avrora.Utils.Registrar.register_schema_by_name("myschema-value")

This error happens:

13:02:04.414 [debug] reading schema `myschema-value` from the file /app/priv/schemas/myproject/myschema-value.avsc
{:error, :unnamed_type}

Strange {:error, %ArgumentError{message: "argument error"}}

I am not sure this is an issue with Avrora, but I have nailed it down to something minimal in which only Phoenix channels and Avrora are involved.

The summary of the problem is that if a channel encodes, it errors when you exercise it in more than one test. Weird, huh?

To demonstrate it, I have created this minimal Phoenix application. The application has one simple Avro schema, and a channel that just encodes a map when it receives a message.

Now, the surprising part is that the command

mix test test/avrora_issue_web/channels/avrora_channel_test.exs

runs two tests and fails with this output:

{:error, %ArgumentError{message: "argument error"}}
20:38:24.384 [error] GenServer #PID<0.282.0> terminating
** (MatchError) no match of right hand side value: {:error, %ArgumentError{message: "argument error"}}
    (avrora_issue) lib/avrora_issue_web/channels/avrora_channel.ex:9: AvroraIssueWeb.AvroraChannel.handle_in/3
    (phoenix) lib/phoenix/channel/server.ex:280: Phoenix.Channel.Server.handle_info/2
    (stdlib) gen_server.erl:637: :gen_server.try_dispatch/4
    (stdlib) gen_server.erl:711: :gen_server.handle_msg/6
    (stdlib) proc_lib.erl:249: :proc_lib.init_p_do_apply/3
Last message: %Phoenix.Socket.Message{event: "encode", join_ref: nil, payload: nil, ref: #Reference<0.2008719544.1328545796.244887>, topic: "avrora"}


  1) test second test (AvroraIssueWeb.AvroraChannelTest)
     test/avrora_issue_web/channels/avrora_channel_test.exs:16
     ** (EXIT from #PID<0.281.0>) an exception was raised:
         ** (MatchError) no match of right hand side value: {:error, %ArgumentError{message: "argument error"}}
             (avrora_issue) lib/avrora_issue_web/channels/avrora_channel.ex:9: AvroraIssueWeb.AvroraChannel.handle_in/3
             (phoenix) lib/phoenix/channel/server.ex:280: Phoenix.Channel.Server.handle_info/2
             (stdlib) gen_server.erl:637: :gen_server.try_dispatch/4
             (stdlib) gen_server.erl:711: :gen_server.handle_msg/6
             (stdlib) proc_lib.erl:249: :proc_lib.init_p_do_apply/3

But, if you run each test by itself, they pass:

% mix test test/avrora_issue_web/channels/avrora_channel_test.exs:16
Excluding tags: [:test]
Including tags: [line: "16"]

{:ok,
 <<79, 98, 106, 1, 3, 134, 2, 20, 97, 118, 114, 111, 46, 99, 111, 100, 101, 99,
   8, 110, 117, 108, 108, 22, 97, 118, 114, 111, 46, 115, 99, 104, 101, 109, 97,
   202, 1, 123, 34, 110, 97, 109, 101, 115, 112, 97, 99, 101, ...>>}
.

I can't for the life of me explain why this might happen, do you have any idea?

Hook to skip certain fields

I'm writing a hook to skip certain fields, borrowing the structure from erlavro.

Here's the code:


defmodule MyClient do
  use Avrora.Client,
    otp_app: :rudder,
    registry_url: "http://localhost:8081",
    registry_auth: {:basic, ["username", "password"]},
    schemas_path: "priv/schemas/",
    registry_schemas_autoreg: false,
    convert_null_values: false,
    convert_map_to_proplist: false,
    names_cache_ttl: :infinity,
    decoder_hook: &MyClient.decoder_hook/4

  def decoder_hook(type, subNameOrIndex, data, decodeFun) do
    IO.puts("\n----------------\n")

    IO.inspect(:avro.get_type_fullname(type), label: "type_fullname")
    IO.inspect(:avro.get_type_name(type), label: "type_name")
    IO.inspect(:avro.get_type_namespace(type), label: "namespace")
    IO.inspect(subNameOrIndex, label: "subNameOrIndex")

    case {:avro.get_type_fullname(type), subNameOrIndex} do
      {"io.Payment2", "amount"} ->
        # decodeFun.(data)
        nil

      {_, _} ->
        decodeFun.(data)
    end
  end

  def test() do
    #Avrora.AvroSchemaStore.start_link()
    {:ok, oldschema2} =
      MyClient.Schema.Encoder.from_json(
        ~s({ "type": "record", "name": "Payment2", "namespace": "io", "fields": [{  "name": "id",  "type": "string"},{  "name": "amount",  "type": "double"} ]  }  )
      )

    oldschema = %{oldschema2 | id: nil, version: nil}
    payload = %{"id" => "123", "amount" => 123.45}

    {:ok, encoded} = MyClient.Codec.ObjectContainerFile.encode(payload, schema: oldschema)
    IO.inspect(encoded, label: "encoded value")

    {:ok, [decoded]} = MyClient.Codec.ObjectContainerFile.decode(encoded)
    IO.puts("\n----------------\n")

    IO.inspect(decoded, label: "decoded value")
  end
end

However on running:

iex(1)> MyClient.start_link()
{:ok, #PID<0.252.0>}
iex(2)> MyClient.test()
encoded value: <<79, 98, 106, 1, 3, 186, 2, 20, 97, 118, 114, 111, 46, 99, 111, 100, 101, 99,
  8, 110, 117, 108, 108, 22, 97, 118, 114, 111, 46, 115, 99, 104, 101, 109, 97,
  254, 1, 123, 34, 110, 97, 109, 101, 115, 112, 97, 99, 101, 34, 58, ...>>

----------------

type_fullname: "io.Payment2"
type_name: "Payment2"
namespace: "io"
subNameOrIndex: :none

----------------

type_fullname: "io.Payment2"
type_name: "Payment2"
namespace: "io"
subNameOrIndex: "id"

----------------

type_fullname: "string"
type_name: "string"
namespace: ""
subNameOrIndex: []

----------------

type_fullname: "io.Payment2"
type_name: "Payment2"
namespace: "io"
subNameOrIndex: "amount"
** (MatchError) no match of right hand side value: {:error, :schema_mismatch}
    (avrora 0.26.0) lib/avrora/myclient.ex:44: MyClient.test/0
    iex:2: (file)

Can someone advise me on how to think about the hook in general? I want to skip the io.Payment2.amount field.

Support for schema evolution?

I just started using Avrora and was trying to evolve a schema. It looks like when I encode a record and the schema isn't present, avrora will store the schema in my registry; but if I make a change to my schema and encode a new record, it still uses the original schema. The same happens if you clear the memory store.

I found that if you create a new version manually, avrora will get the correct one.

I'm not sure if I'm misusing it, or if the feature doesn't exist. I'd love to help if it does not.

Schema storage and reference lookup

:avro_schema_store has a few limitations: you can't delete or update schemas there. Avrora.Storage.Memory is used as a cache for schema resolution, but only by Avrora.Resolver.

The issue occurs when we try to resolve a schema with references. First of all, the behaviour differs per storage: Avrora.Storage.File will read only the content of the referenced schemas and never "cache" them. Avrora.Storage.Registry, on the other hand, will create new Avrora.Schema objects but use only their JSON, passing it to the reference resolver, which parses it again. None of the referenced objects is "cached".

It feels like we need to be able to resolve references smartly without repetitive calls to the disk or registry API.

Any way to configure the registry_url at startup?

We are currently using

  use Avrora.Client,
    registry_url: Application.get_env(:the_app, TheApp.AvroClient)[:url],
    schemas_path: Application.get_env(:the_app, TheApp.AvroClient)[:schemas_path]

to define our own client module.
But as you already know, those configuration values must come from the compile-time environment.
Is there any way to configure it at runtime, when the application starts?
I tried looking at the supervisor tree, but didn't find anything.

Thank you in advance for your answer, and thank you for your awesome library!
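A common workaround under Elixir releases (an assumption about the deployment setup, not an Avrora-specific feature) is to resolve such values in config/runtime.exs, which is evaluated at boot rather than at compile time. Note this configures the top-level :avrora app; whether a private Avrora.Client picks these values up needs verifying:

```elixir
# config/runtime.exs -- evaluated when the release boots, not at compile time.
import Config

config :avrora,
  registry_url: System.fetch_env!("SCHEMA_REGISTRY_URL"),
  schemas_path: System.get_env("SCHEMAS_PATH", "priv/schemas")
```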

Get schema name on decode

Hello,

Is it possible to get the schema name that was used when decoding? I can't find it in the documentation.

I'm thinking of something like this: {:ok, decoded, schema_name} = Avrora.decode(message)

Confluent Schema Registry references

Currently, any reference is expanded, but we have to distinguish named and unnamed types. Any named type can be referenced in the Schema Registry starting with version 5.5.0.

We need configuration options to expand unnamed types but keep named types as references when registering the schema in the registry.

Reuse types

The README has examples for one type.

I have two types, priv/.../Event.avsc, and priv/.../EventType.avsc. The latter is an enum, and Event has a field of type EventType.

Encoding an event yields

[debug] reading schema `...Event` from the file /app/priv/.../Event.avsc
** (throw) {:ref_to_unknown_type, "...EventType"}

Is there a way to get that working?
