
avrora's Introduction

Avrora


Getting Started

This Elixir library supports convenient encoding and decoding of Avro messages.

It can read the Avro schema from local files or the Confluent® Schema Registry, caching data in memory for performance.

It supports reading and writing data with the Kafka wire format prefix and in the Object Container Files format. Along with Confluent® Schema References, it has an Inter-Schema references feature for older Schema Registry versions.

Many thanks to the AvroTurf Ruby gem for the initial inspiration 💙


If you like the project and want to support me on my sleepless nights, you can

Support via PayPal ko-fi

Add Avrora to your project

Add Avrora to mix.exs as a dependency

def deps do
  [
    {:avrora, "~> 0.27"}
  ]
end

Configuration

🔰 It's recommended to configure a private Avrora client (v0.17) to avoid the risk of conflicts with other dependencies which might use the shared Avrora client.

Don't worry if you are already using the shared client, because migrating to a private one is a matter of copy-paste.

Private client

Create your private Avrora client module

defmodule MyClient do
  use Avrora.Client,
    otp_app: :my_application,
    schemas_path: "./priv/schemas",
    registry_url: "http://localhost:8081",
    registry_auth: {:basic, ["username", "password"]},
    registry_user_agent: "Avrora/0.25.0 Elixir",
    registry_ssl_cacerts: File.read!("./priv/trusted.der"),
    registry_ssl_cacert_path: "./priv/trusted.crt",
    registry_schemas_autoreg: false,
    convert_null_values: false,
    convert_map_to_proplist: false,
    names_cache_ttl: :timer.minutes(5),
    decoder_hook: &MyClient.decoder_hook/4
end

Please check the section below 👇 for a detailed explanation of each configuration option.

Shared client

Configure the Avrora shared client in config/config.exs

config :avrora,
  otp_app: :my_application, # optional, if you want to use it as a root folder for `schemas_path`
  schemas_path: "./priv/schemas",
  registry_url: "http://localhost:8081",
  registry_auth: {:basic, ["username", "password"]}, # optional
  registry_user_agent: "Avrora/0.24.2 Elixir", # optional: if you want to return previous behaviour, set it to `nil`
  registry_ssl_cacerts: File.read!("./priv/trusted.der"), # optional: if you have DER-encoded certificate
  registry_ssl_cacert_path: "./priv/trusted.crt", # optional: if you have PEM-encoded certificate file
  registry_schemas_autoreg: false, # optional: if you want manually register schemas
  convert_null_values: false, # optional: if you want to keep decoded `:null` values as is
  convert_map_to_proplist: false, # optional: if you want to restore the old behavior for decoding map-type
  names_cache_ttl: :timer.minutes(5), # optional: if you want periodic disk reads
  decoder_hook: &MyClient.decoder_hook/4 # optional: if you want to amend the data/result

  • otp_app (v0.22) - Name of the OTP application to use for runtime configuration via env, default nil
  • schemas_path - Base path for locally stored schema files, default ./priv/schemas
  • registry_url - URL for the Schema Registry, default nil
  • registry_auth - Credentials to authenticate in the Schema Registry, default nil
  • registry_user_agent (v0.25) - HTTP User-Agent header for Schema Registry requests, default Avrora/<version> Elixir
  • registry_ssl_cacerts (v0.28) - DER-encoded certificates, but without combined support, default nil
  • registry_ssl_cacert_path (v0.28) - Path to a file containing PEM-encoded CA certificates, default nil
  • registry_schemas_autoreg (v0.13) - Flag for automatic schemas registration in the Schema Registry, default true
  • convert_null_values (v0.14) - Flag for automatic conversion of decoded :null values into nil, default true
  • convert_map_to_proplist (v0.15) - Restore the old behaviour and configure decoding of map-type to proplist, default false
  • names_cache_ttl (v0.10) - Time in ms to cache schemas by name in memory, default :infinity
  • decoder_hook (v0.24) - Function with arity 4 to amend data or result, default fn _, _, data, fun -> fun.(data) end

Setting names_cache_ttl to :infinity caches schemas forever (no more disk reads will happen). This is safe when schemas are resolved in the Schema Registry by numeric id or versioned name, as these are unique. If you need to reload a schema from disk periodically, a TTL different from :infinity ensures that.
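Since names_cache_ttl is expressed in milliseconds, the Erlang :timer helpers used in the configuration examples are a readable way to build the value. A minimal sketch, using only the standard library:

```elixir
# :timer.minutes/1 and :timer.hours/1 return a duration in milliseconds,
# which is the unit names_cache_ttl expects.
five_minutes = :timer.minutes(5)
one_hour = :timer.hours(1)

IO.inspect(five_minutes, label: "5 minutes in ms")
IO.inspect(one_hour, label: "1 hour in ms")
```

Either value can be passed as names_cache_ttl; :infinity remains the documented default.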

If the schema is resolved by name, it will always be overwritten with the latest schema received from the Schema Registry (v0.10).

A custom decoder hook will be first in the call chain; after it's done, Avrora will use the result in its own decoder hook (v0.24).
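As an illustration of the hook shape, here is a minimal pass-through sketch with the same arity as the documented default (fn _, _, data, fun -> fun.(data) end). The module name HookSketch and the amend/1 helper are hypothetical; only the four-argument signature comes from the option description above.

```elixir
defmodule HookSketch do
  # Hypothetical hook module, not part of Avrora.
  # Arguments mirror the documented default: two leading arguments
  # (ignored here), the raw data, and the function that decodes it.
  def decoder_hook(_type, _sub_name_or_index, data, decode_fun)
      when is_function(decode_fun, 1) do
    data
    |> decode_fun.()
    |> amend()
  end

  # Place to post-process the decoded result; identity in this sketch.
  defp amend(result), do: result
end
```

It would be wired in with decoder_hook: &HookSketch.decoder_hook/4. Calling decode_fun exactly once and returning its (possibly amended) result keeps the rest of the decoding chain working as before.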

💡 Disable schemas auto-registration if you want to avoid storing schemas and control the registration process manually. It's also recommended to turn off auto-registration when schemas contain Confluent Schema References (v0.14).

💡 When you use releases, and especially Umbrella apps with different clients, it's recommended to set otp_app so that it points to your OTP application. This gives you per-client runtime resolution for all configuration options (e.g. schemas_path), with a fallback to the values statically defined in the client itself (v0.23).
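A possible runtime configuration for a private client could look like the sketch below. It assumes that the client reads its options from the application environment of otp_app under its own module name; that key layout, the file location and the SCHEMA_REGISTRY_URL variable are assumptions for illustration and should be checked against the version you use.

```elixir
# config/runtime.exs
import Config

# Assumption: a client defined with `otp_app: :my_application` resolves
# options at runtime from this entry, keyed by the client module.
config :my_application, MyClient,
  # Hypothetical environment variable name, with a local fallback.
  registry_url: System.get_env("SCHEMA_REGISTRY_URL", "http://localhost:8081"),
  schemas_path: "./priv/schemas"
```

Options missing from the runtime entry would then fall back to the values given to use Avrora.Client, as described in the tip above.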

💡 If both registry_ssl_cacerts and registry_ssl_cacert_path are given, registry_ssl_cacerts takes priority.

Start cache process

Avrora uses an in-memory cache to speed up schema lookup.

Private client

After you've created your private Avrora client, add it to your supervision tree

children = [
  MyClient
]

Supervisor.start_link(children, strategy: :one_for_one)

or start the process manually

{:ok, pid} = MyClient.start_link()

Shared client

Add shared Avrora module to your supervision tree

children = [
  Avrora
]

Supervisor.start_link(children, strategy: :one_for_one)

or start the process manually

{:ok, pid} = Avrora.start_link()

Usage

🔰 All the examples below (including Schemas registration) use the Avrora shared client; if you are using a private client, just replace Avrora with your client module name.

The primary way to use the library is via the Avrora.encode/2 and Avrora.decode/2 functions. These functions load the Avro schema for you.

If registry_url is defined, it enables Schema Registry storage. If the schema file is found locally but not in the registry, either function will register the schema.

These examples assume you have a Payment schema stored in the file priv/schemas/io/confluent/Payment.avsc

{
  "type": "record",
  "name": "Payment",
  "namespace": "io.confluent",
  "fields": [
    {
      "name": "id",
      "type": "string"
    },
    {
      "name": "amount",
      "type": "double"
    }
  ]
}

When running interactively, first make sure the cache is started

{:ok, pid} = Avrora.start_link()

encode/2

To encode a Payment message:

{:ok, pid} = Avrora.start_link()
message = %{"id" => "tx-1", "amount" => 15.99}

{:ok, encoded} = Avrora.encode(message, schema_name: "io.confluent.Payment")
<<79, 98, 106, 1, 3, 204, 2, 20, 97, 118, 114, 111, 46, 99, 111, 100, 101, 99,
  8, 110, 117, 108, 108, 22, 97, 118, 114, 111, 46, 115, 99, 104, 101, 109, 97,
  144, 2, 123, 34, 110, 97, 109, 101, 115, 112, 97, 99, 101, 34, 58, 34, 105,
  111, 46, 99, 111, 110, 102, 108, 117, 101, 110, 116, 34, 44, 34, 110, 97, 109,
  101, 34, 58, 34, 80, 97, 121, 109, 101, 110, 116, 34, 44, 34, 116, 121, 112,
  101, 34, 58, 34, 114, 101, 99, 111, 114, 100, 34, 44, 34, 102, 105, 101, 108,
  100, 115, 34, 58, 91, 123, 34, 110, 97, 109, 101, 34, 58, 34, 105, 100, 34,
  44, 34, 116, 121, 112, 101, 34, 58, 34, 115, 116, 114, 105, 110, 103, 34, 125,
  44, 123, 34, 110, 97, 109, 101, 34, 58, 34, 97, 109, 111, 117, 110, 116, 34,
  44, 34, 116, 121, 112, 101, 34, 58, 34, 100, 111, 117, 98, 108, 101, 34, 125,
  93, 125, 0, 138, 124, 66, 49, 157, 51, 242, 3, 33, 52, 161, 147, 221, 174,
  114, 48, 2, 26, 8, 116, 120, 45, 49, 123, 20, 174, 71, 225, 250, 47, 64, 138,
  124, 66, 49, 157, 51, 242, 3, 33, 52, 161, 147, 221, 174, 114, 48>>

The :format argument controls output format:

  • :plain (deprecated in v0.18) - Just return Avro binary data, with no header or embedded schema
  • :ocf - Use Object Container File format, embedding the full schema with the data
  • :registry - Write data with Confluent Schema Registry Wire Format, which prefixes the data with the schema id
  • :guess - Use :registry if possible, otherwise use :ocf (default)
{:ok, pid} = Avrora.start_link()
message = %{"id" => "tx-1", "amount" => 15.99}

{:ok, encoded} = Avrora.encode(message, schema_name: "io.confluent.Payment", format: :registry)
<<0, 0, 42, 0, 8, 116, 120, 45, 49, 123, 20, 174, 71, 225, 250, 47, 64>>

decode/2

Decode Payment message using the specified schema

{:ok, pid} = Avrora.start_link()
message = <<0, 0, 42, 0, 8, 116, 120, 45, 49, 123, 20, 174, 71, 225, 250, 47, 64>>

{:ok, decoded} = Avrora.decode(message, schema_name: "io.confluent.Payment")
%{"id" => "tx-1", "amount" => 15.99}

decode/1

Decode a message, auto-detecting the schema using magic bytes. It first tries resolving the schema using the integer id in the wire format header.

Next it tries reading using the Object Container Files embedded schema.

NOTE: Messages encoded with OCF are wrapped in a List.

{:ok, pid} = Avrora.start_link()
message =
  <<79, 98, 106, 1, 3, 204, 2, 20, 97, 118, 114, 111, 46, 99, 111, 100, 101, 99,
    8, 110, 117, 108, 108, 22, 97, 118, 114, 111, 46, 115, 99, 104, 101, 109, 97,
    144, 2, 123, 34, 110, 97, 109, 101, 115, 112, 97, 99, 101, 34, 58, 34, 105,
    111, 46, 99, 111, 110, 102, 108, 117, 101, 110, 116, 34, 44, 34, 110, 97, 109,
    101, 34, 58, 34, 80, 97, 121, 109, 101, 110, 116, 34, 44, 34, 116, 121, 112,
    101, 34, 58, 34, 114, 101, 99, 111, 114, 100, 34, 44, 34, 102, 105, 101, 108,
    100, 115, 34, 58, 91, 123, 34, 110, 97, 109, 101, 34, 58, 34, 105, 100, 34, 44,
    34, 116, 121, 112, 101, 34, 58, 34, 115, 116, 114, 105, 110, 103, 34, 125, 44,
    123, 34, 110, 97, 109, 101, 34, 58, 34, 97, 109, 111, 117, 110, 116, 34, 44,
    34, 116, 121, 112, 101, 34, 58, 34, 100, 111, 117, 98, 108, 101, 34, 125, 93,
    125, 0, 84, 229, 97, 195, 95, 74, 85, 204, 143, 132, 4, 241, 94, 197, 178, 106,
    2, 26, 8, 116, 120, 45, 49, 123, 20, 174, 71, 225, 250, 47, 64, 84, 229, 97,
    195, 95, 74, 85, 204, 143, 132, 4, 241, 94, 197, 178, 106>>

{:ok, decoded} = Avrora.decode(message)
[%{"id" => "tx-1", "amount" => 15.99}]
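The magic-byte detection described above can be sketched in plain Elixir. This is not Avrora's implementation, only an illustration of the two markers involved: the Confluent wire format starts with a zero byte followed by a 4-byte big-endian schema id, and an Object Container File starts with the ASCII bytes "Obj" followed by 1. The module name FormatSniffer and the sample bytes in the usage line are hypothetical.

```elixir
defmodule FormatSniffer do
  # Hypothetical helper, not part of Avrora.

  # Confluent wire format: magic byte 0, then a 32-bit big-endian schema id.
  def detect(<<0, schema_id::unsigned-big-integer-size(32), _body::binary>>),
    do: {:registry, schema_id}

  # Object Container File: the bytes "Obj" followed by the byte 1.
  def detect(<<"Obj", 1, _rest::binary>>), do: :ocf

  # Anything else carries no recognizable marker.
  def detect(payload) when is_binary(payload), do: :unknown
end

IO.inspect(FormatSniffer.detect(<<0, 0, 0, 0, 42, 8, 116, 120, 45, 49>>))
```

A payload without any header that happens to start with a zero byte would also match the first clause, which is why a sniffer like this cannot reliably tell such data apart from the registry format.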

💡 Due to a possible collision between the :plain format and the :registry format via a magic-like byte sequence, it's recommended (v0.18) to use Avrora.decode_plain/2 and Avrora.encode_plain/2 if you are working with the :plain format (see all available functions).

All available functions

decode_plain/2 (v0.18)

Decode a message encoded in a :plain format.

{:ok, pid} = Avrora.start_link()
message = <<8, 116, 120, 45, 49, 123, 20, 174, 71, 225, 250, 47, 64>>

{:ok, decoded} = Avrora.decode_plain(message, schema_name: "io.confluent.Payment")
%{"id" => "tx-1", "amount" => 15.99}
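To see why the :plain payload has no header, it can help to rebuild the bytes by hand. The sketch below follows the Avro binary encoding rules for this particular schema only: record fields are written in order, a string is a zigzag-encoded length followed by its bytes, and a double is 8 bytes in little-endian order. The module name PlainSketch is hypothetical, and the helper only handles ids shorter than 64 bytes so that the length fits in a single byte.

```elixir
defmodule PlainSketch do
  # Hypothetical helper, not part of Avrora; Payment schema only.
  import Bitwise

  # Zigzag encoding as used by Avro for int/long values.
  def zigzag(n) when is_integer(n), do: bxor(n <<< 1, n >>> 63)

  # "id" (string) then "amount" (double), in schema field order.
  def encode_payment(%{"id" => id, "amount" => amount})
      when is_binary(id) and byte_size(id) < 64 do
    <<zigzag(byte_size(id)), id::binary, amount::float-little-size(64)>>
  end
end

IO.inspect(PlainSketch.encode_payment(%{"id" => "tx-1", "amount" => 15.99}))
```

For the message used throughout this section, the result matches the :plain binary shown in the example above.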

encode_plain/2 (v0.18)

Encode a payload in a :plain format.

{:ok, pid} = Avrora.start_link()
message = %{"id" => "tx-1", "amount" => 15.99}

{:ok, encoded} = Avrora.encode_plain(message, schema_name: "io.confluent.Payment")
<<8, 116, 120, 45, 49, 123, 20, 174, 71, 225, 250, 47, 64>>

extract_schema/1

Extracts a schema from the encoded message, which is useful when you would like to have some metadata about the schema used to encode the message. All retrieved schemas will be cached according to the settings.

{:ok, pid} = Avrora.start_link()
message =
  <<79, 98, 106, 1, 3, 204, 2, 20, 97, 118, 114, 111, 46, 99, 111, 100, 101, 99,
    8, 110, 117, 108, 108, 22, 97, 118, 114, 111, 46, 115, 99, 104, 101, 109, 97,
    144, 2, 123, 34, 110, 97, 109, 101, 115, 112, 97, 99, 101, 34, 58, 34, 105,
    111, 46, 99, 111, 110, 102, 108, 117, 101, 110, 116, 34, 44, 34, 110, 97, 109,
    101, 34, 58, 34, 80, 97, 121, 109, 101, 110, 116, 34, 44, 34, 116, 121, 112,
    101, 34, 58, 34, 114, 101, 99, 111, 114, 100, 34, 44, 34, 102, 105, 101, 108,
    100, 115, 34, 58, 91, 123, 34, 110, 97, 109, 101, 34, 58, 34, 105, 100, 34, 44,
    34, 116, 121, 112, 101, 34, 58, 34, 115, 116, 114, 105, 110, 103, 34, 125, 44,
    123, 34, 110, 97, 109, 101, 34, 58, 34, 97, 109, 111, 117, 110, 116, 34, 44,
    34, 116, 121, 112, 101, 34, 58, 34, 100, 111, 117, 98, 108, 101, 34, 125, 93,
    125, 0, 84, 229, 97, 195, 95, 74, 85, 204, 143, 132, 4, 241, 94, 197, 178, 106,
    2, 26, 8, 116, 120, 45, 49, 123, 20, 174, 71, 225, 250, 47, 64, 84, 229, 97,
    195, 95, 74, 85, 204, 143, 132, 4, 241, 94, 197, 178, 106>>

{:ok, schema} = Avrora.extract_schema(message)
{:ok,
 %Avrora.Schema{
   full_name: "io.confluent.Payment",
   id: nil,
   json: "{\"namespace\":\"io.confluent\",\"name\":\"Payment\",\"type\":\"record\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"}]}",
   lookup_table: #Reference<0.146116641.3853647878.152744>,
   version: nil
 }}

Schemas registration

There are two ways to register AVRO schemas if you have disabled auto-registration.

If you want to make it a part of your code, but with better control, you can use the Avrora.Utils.Registrar module; if you want to embed it in the deployment, use the mix task avrora.reg.schema.

Avrora.Utils.Registrar (v0.16)

This module is cache-aware and thus it can be used inside intensive loops if needed. It provides two ways to register schema:

  • by name, then it will be resolved to a file with library conventions
  • by schema, then a given schema will be used without any disk reads

Keep in mind that either way performs an in-memory check to ensure that the schema was not registered before; to bypass this check, use the force: true flag

{:ok, schema} = Avrora.Utils.Registrar.register_schema_by_name("io.confluent.Payment", force: true)

In addition, any schema can be registered under a different subject via the as: "NewName" option

{:ok, schema} = Avrora.Storage.File.get("io.confluent.Payment")
{:ok, schema_with_id} = Avrora.Utils.Registrar.register_schema(schema, as: "NewName")

mix avrora.reg.schema (v0.12)

A separate mix task to register a specific schema or all schemas found in the schemas folder (see the configuration section).

For instance, if you configure Avrora schemas folder to be at ./priv/schemas and you want to register a schema io/confluent/Payment.avsc then you can use this command

$ mix avrora.reg.schema --name io.confluent.Payment
schema `io.confluent.Payment' will be registered

In addition, any schema can be registered under a different subject via the --as option (v0.16)

$ mix avrora.reg.schema --name io.confluent.Payment --as MyCustomName
schema `io.confluent.Payment' will be registered as `MyCustomName'

If you would like to register all schemas found under ./priv/schemas then you can simply execute this command

$ mix avrora.reg.schema --all
schema `io.confluent.Payment' will be registered
schema `io.confluent.Wrong' will be skipped due to an error `argument error'

An additional application config to load can be set via the --appconfig option (v0.26)

$ mix avrora.reg.schema --name io.confluent.Payment --appconfig runtime
schema `io.confluent.Payment' will be registered

avrora's People

Contributors

23skidoo, ananthakumaran, ankhers, apellizzn, azeemchauhan, emilianobovetti, francescopessina, fxn, gabrielgiordan, goozzik, lostkobrakai, michaldolata, mw23, rafaelcamarda, raphaklaus, reachfh, rewritten, strech, trbngr, woylie


avrora's Issues

Encoding with specific schema version

For better out-of-band support, it would be cool to encode a message with a specific version of the schema and not register an unknown schema on resolution. This matters when the producer and consumer are not allowed to register schemas and retrieve schemas only from CSR.

Support for schema evolution?

I just started using Avrora and I was trying to evolve a schema. It looks like when I encode a record and a schema isn't present, avrora will store the schema in my registry, but if I make a change to my schema and encode a new record, it will use the original schema. Same thing if you clear the memory store.

I found that if you create a new version manually, avrora will get the correct one.

I'm not sure if I'm misusing it, or if the feature doesn't exist. I'd love to help if it does not.

Doesn't work with schema registry as source for writer schema

Expected

When I encode data with Avrora.encode(data, schema_name: @schema, format: :registry), I successfully get back the encoded data.

Actual

Whenever I encode data, I get back {:error, :enoent}. this line tries to read a file that doesn't exist yet. This project seems to assume that the writing schema will always be available locally. For our projects we occasionally have multiple writers of the same schema, so our schema files live independently of the specific projects.

The 0.9.x line seems to behave as expected.

Schema registration tasks

It would be good to have mix tasks to register one specific schema or all schemas (depending on arguments). This might be needed for out-of-band message exchange, when a producer is not allowed to register schemas.

schema evolution support

Hi, this is a question around schema evolution support in avrora. I want to basically:

  • delete older fields in the reader schema
  • add new fields with default values in the reader schema

I tried the first one in erlavro, and it didn't seem to work there. Perhaps the schema evolution support isn't there in avrora as well given that erlavro doesn't have it? Is there a way to get around it?

I've added the issue in erlavro too:
klarna/erlavro#117

Allow auto registration of the schemas to be configured

An idea is to be able to stop registering schemas if configured so.

config :avrora,
  registry_schemas_autoreg: false # defaults to `true`

When auto registration is turned off it will read the file from the disc only if the registry is not configured.

Linked #41

Obtain message metadata

Hello, I am decoding a message using the auto-detecting schema

{:ok, decoded} = Avrora.decode(message)

the result I get is the following

{:ok,
 %{
   "eventTimestamp" => "2020-06-09T16:06:28.221315618Z",
   "userData" => %{
     "name" => "test",
     "lastName" => "test2" 
   }
 }}

I would also need the metadata for the message since I am using event-sourcing, so rather than having different topics user-created and user-updated I am using a single topic user-events where every message can be user.UserCreated or user.UserUpdated.

Where can I get this information from the message payload?

Any way to configure the registry_url at startup?

We are currently using

  use Avrora.Client,
    registry_url: Application.get_env(:the_app, TheApp.AvroClient)[:url],
    schemas_path: Application.get_env(:the_app,  TheApp.AvroClient)[:schemas_path]

to define our own client module.
But as you already know, those configurations must come from compile time environment.
Is there any way to configure it at runtime, when the application starts?
Tried looking at the supervisor tree, but didn't find anything.

Thank you in advance for your answer, and thank you for your awesome library!

Offload Avrora.Encoder tests

Encoder tests cover way too much; they should cover the main logic inside the module and stop worrying about codecs.

"Messages encoded with OCF are wrapped in a List" what for?

Is there any reason why messages encoded with Codec.ObjectContainerFile are wrapped in a list?

I propose to rewrite ObjectContainerFile.decode/1 as:

  def decode(payload) when is_binary(payload) do
    with {:ok, {_, _, [decoded]}} <- do_decode(payload), do: {:ok, decoded}
  end

Maintain a CHANGELOG?

A CHANGELOG with a summary of what matters to users in each release would be really useful when upgrading.

Getting a KeyError

I'm getting exceptions when trying to parse a message that is not encoded with avro.

[error] GenServer #PID<0.584.0> terminating
** (KeyError) key :original not found in: %MatchError{term: "\"uuid\":\"f6ad9736-11f1-4887-845b-aa4d778a1b79\",\"deleted_at\":1565371184722}"}
    (avrora) lib/avrora/encoder.ex:86: Avrora.Encoder.do_decode/2
    (portal) lib/portal/consumers/pipe_deleted.ex:10: anonymous fn/1 in Portal.Consumers.PipeDeleted.handle_messages/1
    (elixir) lib/enum.ex:783: Enum."-each/2-lists^foreach/1-0-"/2
    (elixir) lib/enum.ex:783: Enum.each/2
    (portal) lib/portal/consumers/pipe_deleted.ex:9: Portal.Consumers.PipeDeleted.handle_messages/1
    (portal) lib/portal/consumers/pipe_deleted.ex:2: Portal.Consumers.PipeDeleted.handle_messages/2
    (elsa) lib/elsa/group/worker.ex:93: Elsa.Group.Worker.handle_info/2
    (stdlib) gen_server.erl:637: :gen_server.try_dispatch/4
    (stdlib) gen_server.erl:711: :gen_server.handle_msg/6
    (stdlib) proc_lib.erl:249: :proc_lib.init_p_do_apply/3
Last message: {#PID<0.466.0>, {:kafka_message_set, "pipe_deleted", 0, 12, [{:kafka_message, 11, "", "{\"uuid\":\"f6ad9736-11f1-4887-845b-aa4d778a1b79\",\"deleted_at\":1565371184722}", :create, 1565371726633, []}]}}
State: %Elsa.Group.Worker.State{config: [], generation_id: 36, handler: Portal.Consumers.PipeDeleted, handler_init_args: %{}, handler_state: %{}, name: :pipe_deleted, offset: 11, partition: 0, subscriber_pid: #PID<0.466.0>, topic: "pipe_deleted"}

I solved temporarily by using Exception.message/1 thiamsantos@2cfd0bf.

But we may need to treat the root cause of this problem.

Option for decoding with tagged union values

erlavro has :avro_decoder_hooks.tag_unions/0 to tag unions (e.g. of multiple records). It would be great if that hook could be enabled via some option when decoding. It seems for encoding tagged and non tagged values are supported already.

Support for multiple registries

First off, thank you for the library! It's very convenient.

I'm currently encountering a challenge: I'm using a library that, too, uses Avrora. However, I'd like to use a different registry than what the library is using.

For example, the library is loading schemas from ./deps/library-name/priv/schemas, and I'd like to use my own schemas located in ./priv/schemas.

Is this something you would consider supporting?

Confluent Schema Registry references

Currently, any reference will be expanded, but we have to separate named and unnamed types. Any named type could be referenced in Schema Registry starting version 5.5.0.

We need configuration options that could expand unnamed types and keep named types as a reference when registering the schema in the registry.

Incorrect typespec for `Avrora.Encoder.encode/2`

The typespec states that the second argument of encode/2 is a keyword list with string values:

@spec encode(map(), keyword(String.t())) :: {:ok, binary()} | {:error, term()}

The two allowed keys in the keyword are schema_name (string) and format (atom). Calling this function with the format explicitly set to e.g. :registry causes Dialyzer to complain.

@spec encode(map(), keyword(String.t())) :: {:ok, binary()} | {:error, term()}

This is a low priority issue that is easily handled with an ignore list for Dialyzer. I'll see if I find time to file a PR to fix this (maybe also going over other type specs to see if there's any inconsistencies).

Reuse types

The README has examples for one type.

I have two types, priv/.../Event.avsc, and priv/.../EventType.avsc. The latter is an enum, and Event has a field of type EventType.

Encoding an event yields

[debug] reading schema `...Event` from the file /app/priv/.../Event.avsc
** (throw) {:ref_to_unknown_type, "...EventType"}

Is there a way to get that working?

Warnings on compilation

Hi! Compilation is issuing these warnings:

Compiling 22 files (.ex)
warning: redefining @doc attribute previously set at line 34.

Please remove the duplicate docs. If instead you want to override a previously defined @doc, attach the @doc attribute to a function head:

    @doc """
    new docs
    """
    def resolve(...)

  lib/avrora/resolver.ex:53: Avrora.Resolver.resolve/1

warning: redefining @doc attribute previously set at line 19.

Please remove the duplicate docs. If instead you want to override a previously defined @doc, attach the @doc attribute to a function head:

    @doc """
    new docs
    """
    def get(...)

  lib/avrora/storage/registry.ex:45: Avrora.Storage.Registry.get/1

They are harmless, but a clean compilation would be super.

Telemetry support

Would you be open to a contribution adding telemetry (https://github.com/beam-telemetry/telemetry) support to this library? It'd be nice to have telemetry data for the following events:

  • HTTP requests to the schema registry server
  • Message encode
  • Message decoding
  • Schema encoding
  • Schema decoding

I should be able to make a contribution if this is desired.

hook to skip certain fields.

I'm writing a hook to skip certain fields. Borrowing the structure from erlavro

Here's the code:


defmodule MyClient do
  use Avrora.Client,
    otp_app: :rudder,
    registry_url: "http://localhost:8081",
    registry_auth: {:basic, ["username", "password"]},
    schemas_path: "priv/schemas/",
    registry_schemas_autoreg: false,
    convert_null_values: false,
    convert_map_to_proplist: false,
    names_cache_ttl: :infinity,
    decoder_hook: &MyClient.decoder_hook/4

  def decoder_hook(type, subNameOrIndex, data, decodeFun) do
    IO.puts("\n----------------\n")

    IO.inspect(:avro.get_type_fullname(type), label: "type_fullname")
    IO.inspect(:avro.get_type_name(type), label: "type_name")
    IO.inspect(:avro.get_type_namespace(type), label: "namespace")
    IO.inspect(subNameOrIndex, label: "subNameOrIndex")

    case {:avro.get_type_fullname(type), subNameOrIndex} do
      {"io.Payment2", "amount"} ->
        # decodeFun.(data)
        nil;

      {_, _} ->
        decodeFun.(data)
    end
  end

  def test() do
    #Avrora.AvroSchemaStore.start_link()
    {:ok, oldschema2} =
      MyClient.Schema.Encoder.from_json(
        ~s({ "type": "record", "name": "Payment2", "namespace": "io", "fields": [{  "name": "id",  "type": "string"},{  "name": "amount",  "type": "double"} ]  }  )
      )

    oldschema = %{oldschema2 | id: nil, version: nil}
    payload = %{"id" => "123", "amount" => 123.45}

    {:ok, encoded} = MyClient.Codec.ObjectContainerFile.encode(payload, schema: oldschema)
    IO.inspect(encoded, label: "encoded value")

    {:ok, [decoded]} = MyClient.Codec.ObjectContainerFile.decode(encoded)
    IO.puts("\n----------------\n")

    IO.inspect(decoded, label: "decoded value")
  end
end

However on running:

iex(1)> MyClient.start_link()
{:ok, #PID<0.252.0>}
iex(2)> MyClient.test()
encoded value: <<79, 98, 106, 1, 3, 186, 2, 20, 97, 118, 114, 111, 46, 99, 111, 100, 101, 99,
  8, 110, 117, 108, 108, 22, 97, 118, 114, 111, 46, 115, 99, 104, 101, 109, 97,
  254, 1, 123, 34, 110, 97, 109, 101, 115, 112, 97, 99, 101, 34, 58, ...>>

----------------

type_fullname: "io.Payment2"
type_name: "Payment2"
namespace: "io"
subNameOrIndex: :none

----------------

type_fullname: "io.Payment2"
type_name: "Payment2"
namespace: "io"
subNameOrIndex: "id"

----------------

type_fullname: "string"
type_name: "string"
namespace: ""
subNameOrIndex: []

----------------

type_fullname: "io.Payment2"
type_name: "Payment2"
namespace: "io"
subNameOrIndex: "amount"
** (MatchError) no match of right hand side value: {:error, :schema_mismatch}
    (avrora 0.26.0) lib/avrora/myclient.ex:44: MyClient.test/0
    iex:2: (file)

Can someone advise me on how to think about the hook in general? I want to skip the io.Payment2.amount field.

unnamed_type error with primary type schema

Hello,

Is it possible to register a primary type schema with Avrora?

A primary type schema is, for example:

{
    "type": "string",
    "name": "myvalue"
}

When I try to save it with the register_schema_by_name function:

Avrora.Utils.Registrar.register_schema_by_name("myschema-value")

This error happens:

13:02:04.414 [debug] reading schema `myschema-value` from the file /app/priv/schemas/myproject/myschema-value.avsc
{:error, :unnamed_type}

Improve client errors

It makes sense to introduce the Avro Error module with the ability to add a few more things to the client errors. It should make development and debug easier.

For instance, :subject_not_found should be given with the subject name; otherwise it's very hard to grasp which subject is meant, especially with the nested references feature.

Implement support of Schema Registry's schema references

Description

When consuming an Avro message with a schema which has a schema reference, the Avrora.decode/1 function fails with {:error, {:not_found, "type"}}.

Schema references is a new feature introduced in Confluent Platform version 5.5.0 (see here https://www.confluent.io/blog/multiple-event-types-in-the-same-kafka-topic/#avro-unions-with-schema-references)

How to reproduce

Given a schema containing a schema reference registered into the schema registry, such as:

Nested schema:

{
  "namespace": "org.schema_registry_test_app.avro",
  "type": "record",
  "name": "Result",
  "fields": [
    {
      "name": "up",
      "type": "string"
    },
    {
      "name": "down",
      "type": "string"
    }
  ]
}

Schema with reference:

{
  "namespace": "org.schema_registry_test_app.avro",
  "type": "record",
  "name": "AvroTest",
  "fields": [
    {
      "name": "id",
      "type": {
        "name": "Uuid",
        "type": "fixed",
        "size": 16
      }
    },
    {
      "name": "results",
      "type": {
        "type": "array",
        "items": "org.schema_registry_test_app.avro.Result"
      }
    }
  ]
}

Try to consume and decode a message produced with AvroTest schema.

I tried with Avrora v0.13.0.

Extract Encoders to a separate modules

Technically we have 3 encoders

  1. ObjectContainerFile
  2. Registry
  3. Plain

and Avrora.Encoder knows too much about all the things. After we introduced Avrora.Encoder.extract_schema, responsibilities became duplicated and some functions started overlapping.

At the same time, it would be cool to have something like Avrora.Storage behavior.

Decoding an encoded record wraps the produced map in a list

Hi,

thanks for the library, looks really promising! However, I stumbled over the following, which seems like a bug to me.

Consider the following Avro schema:

{
  "namespace": "foo",
  "type": "record",
  "name": "Bar",
  "version": "1.0",
  "fields": [
    {
      "name": "baz",
      "type": "string"
    }
  ]
}

This schema can be used to encode %{"baz" => "test"}:

iex> {:ok, encoded} = Avrora.encode(%{"baz" => "test"}, schema_name: "foo.Bar")
{:ok,
 <<79, 98, 106, 1, 3, 144, 2, 20, 97, 118, 114, 111, 46, 99, 111, 100, 101, 99,
   8, 110, 117, 108, 108, 22, 97, 118, 114, 111, 46, 115, 99, 104, 101, 109, 97,
   212, 1, 123, 34, 110, 97, 109, 101, 115, 112, 97, 99, 101, ...>>}

However, when decoding this again, the result is not a map like I would've expected; instead, the map is wrapped in a list:

iex> Avrora.decode(encoded)
{:ok, [%{"baz" => "test"}]}

Sure enough, encoding this again fails:

iex> Avrora.encode([%{"baz" => "test"}], schema_name: "foo.Bar")
** (FunctionClauseError) no function clause matching in Avrora.Encoder.encode/2
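Until the behaviour changes, one way to cope with the list wrapping on the caller side is a small unwrapping helper. This is only a sketch under the assumption that the caller expects exactly one record per message; the module name OcfUnwrapSketch and the error tuple are hypothetical.

```elixir
defmodule OcfUnwrapSketch do
  # Hypothetical helper, not part of Avrora.

  # Exactly one record in the container: return it unwrapped.
  def unwrap({:ok, [single]}), do: {:ok, single}

  # Zero or several records: surface that instead of guessing.
  def unwrap({:ok, many}) when is_list(many),
    do: {:error, {:unexpected_record_count, length(many)}}

  # Already a single value (e.g. registry format) or an error: pass through.
  def unwrap(other), do: other
end
```

With it, message |> Avrora.decode() |> OcfUnwrapSketch.unwrap() would yield a map that can be passed back to Avrora.encode/2.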

Dialyzer issues with private clients

When creating a private client, Dialyzer returns a bunch of errors, such as a mismatch between Avrora.Schema and CustomClient.Schema, plus several more whose cause I'm not sure about.

Get schema name on decode

Hello,

Is it possible to get the schema name that was used when decoding? I can't find it in the documentation.

I'm thinking of something like this: {:ok, decoded, schema_name} = Avrora.decode(message)

logicalType: date

Hi, I have a schema like:

{"name": "birthday",
 "type": "int",
 "logicalType": "date"}

To pass a Date into it, the value has to be converted to "the number of days since unix epoch":

# Convert a Date struct to the number of days since the Unix epoch.
def date2int(d) when is_struct(d, Date) do
  Date.diff(d, ~D[1970-01-01])
end

# Convert an ISO 8601 string such as "2020-01-31"; raises on invalid input.
def date2int(d) when is_binary(d) and 10 == byte_size(d) do
  Date.diff(Date.from_iso8601!(d), ~D[1970-01-01])
end

I don't have the expertise to add handling of logicalType to the library, but leaving it here in case someone else needs this. :-)
ping @stretch
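
For the reverse direction, a decoded `date` field presumably arrives as the raw day count, so it needs converting back. The following is a minimal sketch, assuming the decoded value is a plain integer; the module name `DateCodec` is hypothetical and not part of Avrora.

```elixir
defmodule DateCodec do
  # Hypothetical helper module, not part of Avrora.
  @epoch ~D[1970-01-01]

  # Days since the Unix epoch -> Date.
  def int2date(days) when is_integer(days), do: Date.add(@epoch, days)

  # Date -> days since the Unix epoch (same idea as date2int above).
  def date2int(%Date{} = date), do: Date.diff(date, @epoch)
end
```

A round trip such as `~D[2000-01-01] |> DateCodec.date2int() |> DateCodec.int2date()` should give back the original date.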

Subject not found

Hi! I am a bit lost here, let me share with you this issue in case it happened to you too.

The library is trying to register a brand new schema, but it fails to do so. The result of http_client().post(payload, headers) in http_client_post/2 is

{:error, %{"error_code" => 40401, "message" => "Subject not found"}}

but I cannot understand how's that possible, since the API of the registry does not have 40401 among its error codes for that POST request.

Have you ever seen this? Perhaps there's a previous step in which you need to create the subject?

Support for registering schemas under custom subject

As I understand it, automatic schema registration always registers schemas under subjects named after the schema's full name (namespace + schema name). However, I've encountered a need to register a schema under a subject named after a Kafka topic, because of the expectations of the system we're integrating with. I think this might be a pretty common use case, as Confluent Schema Registry specifies TopicNameStrategy as the default: https://docs.confluent.io/platform/current/schema-registry/serdes-develop/index.html#subject-name-strategy.

I was able to implement a workaround in our code by manually registering the schema

  defp ensure_registered_schema() do
    with {:ok, nil} <- Avrora.Storage.Memory.get(@schema_full_name) do
      {:ok, schema} = Avrora.Storage.File.get(@schema_full_name)
      {:ok, registered_schema} = Avrora.Storage.Registry.put("#{@topic_name}-value", schema.json)
      {:ok, _} = Avrora.Storage.Memory.put(@schema_full_name, registered_schema)
    end
  end

but I was wondering whether there's a simpler way to do it.

avro_turf allows specifying the subject name when encoding a payload https://github.com/dasch/avro_turf/blob/master/lib/avro_turf/messaging.rb#L87. It would be very convenient if Avrora would support something like that.

PS: Thanks for your work on the library! :)

Strange {:error, %ArgumentError{message: "argument error"}}

I am not sure this is an issue with Avrora, but I have narrowed it down to a minimal case in which only Phoenix channels and Avrora are involved.

The summary of the problem is that a channel that encodes returns an error if you exercise it in more than one test. Weird, huh?

To demonstrate it, I have created this minimal Phoenix application. The application has one simple Avro schema, and a channel that just encodes a map when it receives a message.

Now, the surprising part is that the command

mix test test/avrora_issue_web/channels/avrora_channel_test.exs

runs two tests and fails with this output:

{:error, %ArgumentError{message: "argument error"}}
20:38:24.384 [error] GenServer #PID<0.282.0> terminating
** (MatchError) no match of right hand side value: {:error, %ArgumentError{message: "argument error"}}
    (avrora_issue) lib/avrora_issue_web/channels/avrora_channel.ex:9: AvroraIssueWeb.AvroraChannel.handle_in/3
    (phoenix) lib/phoenix/channel/server.ex:280: Phoenix.Channel.Server.handle_info/2
    (stdlib) gen_server.erl:637: :gen_server.try_dispatch/4
    (stdlib) gen_server.erl:711: :gen_server.handle_msg/6
    (stdlib) proc_lib.erl:249: :proc_lib.init_p_do_apply/3
Last message: %Phoenix.Socket.Message{event: "encode", join_ref: nil, payload: nil, ref: #Reference<0.2008719544.1328545796.244887>, topic: "avrora"}


  1) test second test (AvroraIssueWeb.AvroraChannelTest)
     test/avrora_issue_web/channels/avrora_channel_test.exs:16
     ** (EXIT from #PID<0.281.0>) an exception was raised:
         ** (MatchError) no match of right hand side value: {:error, %ArgumentError{message: "argument error"}}
             (avrora_issue) lib/avrora_issue_web/channels/avrora_channel.ex:9: AvroraIssueWeb.AvroraChannel.handle_in/3
             (phoenix) lib/phoenix/channel/server.ex:280: Phoenix.Channel.Server.handle_info/2
             (stdlib) gen_server.erl:637: :gen_server.try_dispatch/4
             (stdlib) gen_server.erl:711: :gen_server.handle_msg/6
             (stdlib) proc_lib.erl:249: :proc_lib.init_p_do_apply/3

But, if you run each test by itself, they pass:

% mix test test/avrora_issue_web/channels/avrora_channel_test.exs:16
Excluding tags: [:test]
Including tags: [line: "16"]

{:ok,
 <<79, 98, 106, 1, 3, 134, 2, 20, 97, 118, 114, 111, 46, 99, 111, 100, 101, 99,
   8, 110, 117, 108, 108, 22, 97, 118, 114, 111, 46, 115, 99, 104, 101, 109, 97,
   202, 1, 123, 34, 110, 97, 109, 101, 115, 112, 97, 99, 101, ...>>}
.

I can't for the life of me explain why this might happen. Do you have any idea?

Schema storage and reference lookup

:avro_schema_store has a few limitations: you can't delete or update schemas there. Avrora.Storage.Memory is used as a cache for schema resolution, but only by Avrora.Resolver.

The issue occurs when we try to resolve a schema with references. First of all, the behavior differs per storage. Avrora.Storage.File reads only the content of the referenced schemas and never "caches" them. Avrora.Storage.Registry, on the other hand, creates new Avrora.Schema objects, but uses only their JSON and passes it to the reference resolver, which parses it again. None of the referenced objects are "cached".

It feels like we need to be able to resolve references smartly without repetitive calls to the disk or registry API.
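
To illustrate the idea, here is a minimal sketch of reference resolution that fetches each schema at most once. None of it is Avrora's API: the module `RefCacheSketch`, the `fetch` callback, and its `{json, references}` return shape are all assumptions made for the example.

```elixir
defmodule RefCacheSketch do
  # Hypothetical sketch, not part of Avrora.
  # `fetch` is an assumed callback: name -> {json, [referenced_names]}.
  # Returns a map of name => json for the schema and all its references.
  def resolve(name, fetch, cache \\ %{}) do
    case cache do
      # Already resolved: no second trip to the disk or the registry.
      %{^name => _json} ->
        cache

      _ ->
        {json, refs} = fetch.(name)
        # Store before recursing so shared or cyclic references stop here.
        cache = Map.put(cache, name, json)
        Enum.reduce(refs, cache, fn ref, acc -> resolve(ref, fetch, acc) end)
    end
  end
end
```

With a dependency graph where two schemas reference the same third one, the shared schema is fetched once and reused from the accumulated map.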
