strech / avrora
A convenient Elixir library to work with Avro schemas and Confluent® Schema Registry
Home Page: https://hexdocs.pm/avrora
License: MIT License
For better out-of-band support, it would be nice to encode a message with a specific version of the schema and not register unknown schemas during resolution. This matters when producers and consumers are not allowed to register schemas and can only retrieve them from the CSR.
It makes sense to introduce an Avro Error module with the ability to attach a few more things to client errors. It should make development and debugging easier.
For instance, :subject_not_found should be given together with the subject name; otherwise it's very hard to grasp which subject it was, especially with the nested references feature.
As I understand, automatic schema registration always registers schemas under subjects named after the schema's full name (namespace + schema name). However, I've encountered a need to register a schema under a subject named after a Kafka topic, because of the expectations of the system we're integrating with. I think this might be a pretty common use case, as Confluent Schema Registry specifies TopicNameStrategy as the default: https://docs.confluent.io/platform/current/schema-registry/serdes-develop/index.html#subject-name-strategy.
I was able to implement a workaround in our code by manually registering the schema:
defp ensure_registered_schema() do
with {:ok, nil} <- Avrora.Storage.Memory.get(@schema_full_name) do
{:ok, schema} = Avrora.Storage.File.get(@schema_full_name)
{:ok, registered_schema} = Avrora.Storage.Registry.put("#{@topic_name}-value", schema.json)
{:ok, _} = Avrora.Storage.Memory.put(@schema_full_name, registered_schema)
end
end
but I was wondering whether there's a simpler way to do it.
avro_turf allows specifying the subject name when encoding a payload: https://github.com/dasch/avro_turf/blob/master/lib/avro_turf/messaging.rb#L87. It would be very convenient if Avrora supported something like that.
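Purely as an illustration, the desired API could look something like the sketch below. Note that the subject_name option does not exist in Avrora today; both the option name and its value here are assumptions.

```elixir
# Hypothetical API sketch: `subject_name:` is NOT an existing Avrora option.
# It would let the caller pick the registry subject (e.g. TopicNameStrategy's
# "<topic>-value") instead of defaulting to the schema's full name.
{:ok, encoded} =
  Avrora.encode(%{"baz" => "test"},
    schema_name: "foo.Bar",
    subject_name: "my-topic-value"
  )
```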
PS: Thanks for your work on the library! :)
Would you be open to a contribution adding telemetry (https://github.com/beam-telemetry/telemetry) support to this library? It'd be nice to have telemetry data for the following events:
I should be able to make a contribution if this is desired.
An idea is to be able to stop registering schemas if configured so.
config :avrora,
registry_schemas_autoreg: false # defaults to `true`
When auto-registration is turned off, it will read the schema file from disk only if the registry is not configured.
Linked: #41
#63 allows rewriting the mix task and adding more available arguments. It would also be good to rename the paragraph from Mix tasks to Schema registration or similar, i.e. rewrite the section with a pivot to schema registration rather than the mix task itself.
Hi,
thanks for the library, looks really promising! However, I stumbled over the following, which seems like a bug to me.
Consider the following Avro schema:
{
"namespace": "foo",
"type": "record",
"name": "Bar",
"version": "1.0",
"fields": [
{
"name": "baz",
"type": "string"
}
]
}
This schema can be used to encode %{"baz" => "test"}:
iex> {:ok, encoded} = Avrora.encode(%{"baz" => "test"}, schema_name: "foo.Bar")
{:ok,
<<79, 98, 106, 1, 3, 144, 2, 20, 97, 118, 114, 111, 46, 99, 111, 100, 101, 99,
8, 110, 117, 108, 108, 22, 97, 118, 114, 111, 46, 115, 99, 104, 101, 109, 97,
212, 1, 123, 34, 110, 97, 109, 101, 115, 112, 97, 99, 101, ...>>}
However, when decoding this again, the result is not a map like I would've expected; instead, the map is wrapped in a list:
iex> Avrora.decode(encoded)
{:ok, [%{"baz" => "test"}]}
Sure enough, encoding this again fails:
iex> Avrora.encode([%{"baz" => "test"}], schema_name: "foo.Bar")
** (FunctionClauseError) no function clause matching in Avrora.Encoder.encode/2
https://avro.apache.org/docs/current/spec.html#names
Record, enums and fixed are named types. Each has a fullname that is composed of two parts; a name and a namespace. Equality of names is defined on the fullname.
Currently, Encoder (and Codec.<...>) have guards requiring the value being encoded to be just map(), which is wrong for Enum and Fixed.
Background
AWS WAF's core managed rule group contains a rule that rejects requests missing a User-Agent header. If the schema registry is hosted on AWS behind a WAF with this rule group, all requests are blocked. To avoid this, we should set a user agent in the HTTP headers.
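As a sketch of the fix (the module name, the UA string, and the header shape here are my assumptions, not Avrora's actual internals), all the registry request needs is a non-empty User-Agent header alongside the usual content type:

```elixir
defmodule RegistryHeaders do
  # Assumed UA value; any non-empty string satisfies the WAF rule.
  @user_agent ~c"avrora/0.0.0"

  # Headers for an :httpc-style request to the schema registry,
  # including the User-Agent that AWS WAF's core rule group requires.
  def headers do
    [
      {~c"user-agent", @user_agent},
      {~c"content-type", ~c"application/vnd.schemaregistry.v1+json"}
    ]
  end
end
```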
Reference:
Hi! Compilation is issuing these warnings:
Compiling 22 files (.ex)
warning: redefining @doc attribute previously set at line 34.
Please remove the duplicate docs. If instead you want to override a previously defined @doc, attach the @doc attribute to a function head:
@doc """
new docs
"""
def resolve(...)
lib/avrora/resolver.ex:53: Avrora.Resolver.resolve/1
warning: redefining @doc attribute previously set at line 19.
Please remove the duplicate docs. If instead you want to override a previously defined @doc, attach the @doc attribute to a function head:
@doc """
new docs
"""
def get(...)
lib/avrora/storage/registry.ex:45: Avrora.Storage.Registry.get/1
They are harmless, but a clean compilation would be super.
Due to #70, completely remove decode/2 and encode/2 and keep them separate as decode_plain/2 and encode_plain/2.
When consuming an Avro message with a schema which has a schema reference, the Avrora.decode/1 function fails with {:error, {:not_found, "type"}}.
Schema references are a new feature introduced in Confluent Platform version 5.5.0 (see https://www.confluent.io/blog/multiple-event-types-in-the-same-kafka-topic/#avro-unions-with-schema-references).
Given a schema containing a schema reference registered into the schema registry, such as:
Nested schema:
{
"namespace": "org.schema_registry_test_app.avro",
"type": "record",
"name": "Result",
"fields": [
{
"name": "up",
"type": "string"
},
{
"name": "down",
"type": "string"
}
]
}
Schema with reference:
{
"namespace": "org.schema_registry_test_app.avro",
"type": "record",
"name": "AvroTest",
"fields": [
{
"name": "id",
"type": {
"name": "Uuid",
"type": "fixed",
"size": 16
}
},
{
"name": "results",
"type": {
"type": "array",
"items": "org.schema_registry_test_app.avro.Result"
}
}
]
}
Try to consume and decode a message produced with the AvroTest schema.
I tried with Avrora v0.13.0.
When creating a private client, Dialyzer returns a bunch of errors, such as a mismatch between Avrora.Schema and CustomClient.Schema, but also a bunch more I'm not exactly sure what they're related to.
Hi, I have a schema like:
{"name": "birthday",
"type": "int",
"logicalType": "date"}
In order to pass a Date sigil into it, it has to be converted to "the number of days since the Unix epoch":
def date2int(d) when is_struct(d, Date) do
  Date.diff(d, ~D[1970-01-01])
end
def date2int(d) when is_binary(d) and 10 == byte_size(d) do
  Date.diff(Date.from_iso8601!(d), ~D[1970-01-01]) # parsing can obviously fail
end
I don't have the expertise to add handling of logicalType to the library, but I'm leaving it here in case someone else needs this. :-)
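To make the conversion reusable in both directions, here's a small self-contained sketch (the module name is mine) of the Avro date logical type mapping, i.e. days since the Unix epoch:

```elixir
defmodule DateLogical do
  @epoch ~D[1970-01-01]

  # Avro `date` logical type: an int counting days since 1970-01-01.
  def to_days(%Date{} = d), do: Date.diff(d, @epoch)
  def to_days(s) when is_binary(s), do: s |> Date.from_iso8601!() |> to_days()

  # Decode back from the on-wire int to a Date.
  def from_days(days) when is_integer(days), do: Date.add(@epoch, days)
end

DateLogical.to_days(~D[2000-01-01]) # => 10957
DateLogical.from_days(10957)        # => ~D[2000-01-01]
```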
ping @stretch
Is there any reason why messages encoded with Codec.ObjectContainerFile are wrapped in a list?
I propose to rewrite ObjectContainerFile.decode/1 as:
def decode(payload) when is_binary(payload) do
with {:ok, {_, _, [decoded]}} <- do_decode(payload), do: {:ok, decoded}
end
Following up on the discussion at #86:
I've made a repo producing the issue: https://github.com/LostKobrakai/avrora_repro
I wasn't sure if our use of avrora as a dependency was the reason, so I started with an umbrella, but that should actually be irrelevant. The important part is removing _build, as the path was hardcoded at compile time. The steps are documented in the README of the repo.
The typespec states that the second argument of encode/2 is a keyword list with string values:
@spec encode(map(), keyword(String.t())) :: {:ok, binary()} | {:error, term()}
The two allowed keys in the keyword list are schema_name (a string) and format (an atom). Calling this function with the format explicitly set to e.g. :registry causes Dialyzer to complain.
Line 103 in c12c314
This is a low priority issue that is easily handled with an ignore list for Dialyzer. I'll see if I find time to file a PR to fix this (maybe also going over other type specs to see if there's any inconsistencies).
Hi, this is a question around schema evolution support in avrora. I want to basically:
I tried the first one in erlavro, and it didn't seem to work there. Perhaps the schema evolution support isn't there in avrora as well given that erlavro doesn't have it? Is there a way to get around it?
I've added the issue in erlavro too:
klarna/erlavro#117
Remove configuration options:
convert_map_to_proplist
convert_null_values
This is needed for the Confluent Cloud
A CHANGELOG with a summary of what matters to users in each release would be really useful when upgrading.
When I encode data with Avrora.encode(data, schema_name: @schema, format: :registry), I successfully get back the encoded data.
Whenever I decode data, I get back {:error, :enoent}. This line tries to read a file that doesn't exist yet. The project seems to assume that the writing schema will always be available locally. For our projects we occasionally have multiple writers of the same schema, so our schema files live independently of the specific projects.
The 0.9.x line seems to behave as expected.
It would be good to have mix tasks to register one specific schema or all schemas (depending on arguments). This may be needed for out-of-band message exchange, when a producer is not allowed to register schemas.
erlavro has :avro_decoder_hooks.tag_unions/0 to tag unions (e.g. of multiple records). It would be great if that hook could be enabled via some option when decoding. It seems that for encoding, both tagged and non-tagged values are supported already.
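Avrora clients already accept a decoder_hook option (see the use Avrora.Client options elsewhere in these issues), so as a sketch one could try delegating to erlavro's hook directly. The exact compatibility of the two hook signatures is an assumption on my part:

```elixir
defmodule TaggedClient do
  use Avrora.Client,
    registry_url: "http://localhost:8081",
    # Assumption: Avrora passes the hook through to erlavro unchanged, so
    # erlavro's own union-tagging hook (a 4-arity fun) can be delegated to.
    decoder_hook: fn type, sub_name_or_index, data, decode_fun ->
      :avro_decoder_hooks.tag_unions().(type, sub_name_or_index, data, decode_fun)
    end
end
```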
First off, thank you for the library! It's very convenient.
I'm currently encountering a challenge: I'm using a library that, too, uses Avrora. However, I'd like to use a different registry than what the library is using.
For example, the library is loading schemas from ./deps/library-name/priv/schemas, and I'd like to use my own schemas located in ./priv/schemas.
Is this something you would consider supporting?
This is possible starting with v0.17.
Hi! I am a bit lost here, let me share with you this issue in case it happened to you too.
The library is trying to register a brand new schema, but it fails to do so. The result of http_client().post(payload, headers) in http_client_post/2 is
{:error, %{"error_code" => 40401, "message" => "Subject not found"}}
but I cannot understand how that's possible, since the registry API does not list 40401 among its error codes for that POST request.
Have you ever seen this? Perhaps there's a previous step in which you need to create the subject?
Hello, I am decoding a message using the auto-detected schema:
{:ok, decoded} = Avrora.decode(message)
The result I get is the following:
{:ok,
%{
"eventTimestamp" => "2020-06-09T16:06:28.221315618Z",
"userData" => %{
"name" => "test",
"lastName" => "test2"
}
}}
I would also need the metadata for the message. Since I'm using event-sourcing, rather than having different topics user-created and user-updated, I am using a single topic user-events where every message can be user.UserCreated or user.UserUpdated.
Where can I get this information from the message payload?
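For registry-encoded (format: :registry) messages, one way to recover this yourself is to read the schema id from the Confluent wire format (a zero magic byte followed by a 4-byte big-endian schema id) and then ask the registry which schema that id maps to. A minimal sketch, with a module name of my choosing:

```elixir
defmodule WireFormat do
  # Confluent wire format: <<0, schema_id::32-big, avro_body::binary>>.
  # The id can then be looked up in the schema registry to learn the
  # schema's full name (e.g. user.UserCreated vs user.UserUpdated).
  def schema_id(<<0, id::big-unsigned-32, _body::binary>>), do: {:ok, id}
  def schema_id(_other), do: {:error, :unknown_format}
end

WireFormat.schema_id(<<0, 0, 0, 0, 42, 1, 2>>) # => {:ok, 42}
```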
Some tags that are not supported currently appear in the README.
If a call is made without starting the Avrora module, this error occurs:
iex(1)> Avrora.Storage.File.get("io.confluent.Payment")
16:10:24.719 [debug] reading schema `io.confluent.Payment` from the file .../avrora/test/fixtures/schemas/io/confluent/Payment.avsc
** (ArgumentError) argument error
I'm getting exceptions when trying to parse a message that is not encoded with avro.
[error] GenServer #PID<0.584.0> terminating
** (KeyError) key :original not found in: %MatchError{term: "\"uuid\":\"f6ad9736-11f1-4887-845b-aa4d778a1b79\",\"deleted_at\":1565371184722}"}
(avrora) lib/avrora/encoder.ex:86: Avrora.Encoder.do_decode/2
(portal) lib/portal/consumers/pipe_deleted.ex:10: anonymous fn/1 in Portal.Consumers.PipeDeleted.handle_messages/1
(elixir) lib/enum.ex:783: Enum."-each/2-lists^foreach/1-0-"/2
(elixir) lib/enum.ex:783: Enum.each/2
(portal) lib/portal/consumers/pipe_deleted.ex:9: Portal.Consumers.PipeDeleted.handle_messages/1
(portal) lib/portal/consumers/pipe_deleted.ex:2: Portal.Consumers.PipeDeleted.handle_messages/2
(elsa) lib/elsa/group/worker.ex:93: Elsa.Group.Worker.handle_info/2
(stdlib) gen_server.erl:637: :gen_server.try_dispatch/4
(stdlib) gen_server.erl:711: :gen_server.handle_msg/6
(stdlib) proc_lib.erl:249: :proc_lib.init_p_do_apply/3
Last message: {#PID<0.466.0>, {:kafka_message_set, "pipe_deleted", 0, 12, [{:kafka_message, 11, "", "{\"uuid\":\"f6ad9736-11f1-4887-845b-aa4d778a1b79\",\"deleted_at\":1565371184722}", :create, 1565371726633, []}]}}
State: %Elsa.Group.Worker.State{config: [], generation_id: 36, handler: Portal.Consumers.PipeDeleted, handler_init_args: %{}, handler_state: %{}, name: :pipe_deleted, offset: 11, partition: 0, subscriber_pid: #PID<0.466.0>, topic: "pipe_deleted"}
I solved it temporarily by using Exception.message/1: thiamsantos@2cfd0bf.
But we may need to treat the root cause of this problem.
Technically we have 3 encoders, and Avrora.Encoder knows too much about all the things. After we introduced Avrora.Encoder.extract_schema, responsibilities got duplicated and some functions started overlapping.
At the same time, it would be cool to have something like an Avrora.Storage behaviour.
Encoder tests cover way too much; they should cover the main logic inside the module and stop worrying about codecs.
Hello,
Is it possible to register a primitive type schema with Avrora?
A primitive type schema is, for example:
{
"type": "string",
"name": "myvalue"
}
When I try to save it with the register_schema_by_name function:
Avrora.Utils.Registrar.register_schema_by_name("myschema-value")
this error happens:
13:02:04.414 [debug] reading schema `myschema-value` from the file /app/priv/schemas/myproject/myschema-value.avsc
{:error, :unnamed_type}
I am not sure this is an issue with Avrora, but have nailed down to something minimal in which only Phoenix channels and Avrora are involved.
The summary of the problem is that if a channel encodes, it fails when you exercise it in more than one test. Weird, huh?
To demonstrate it, I have created this minimal Phoenix application. The application has one simple Avro schema, and a channel that just encodes a map when it receives a message.
Now, the surprising part is that the command
mix test test/avrora_issue_web/channels/avrora_channel_test.exs
runs two tests and fails with this output:
{:error, %ArgumentError{message: "argument error"}}
20:38:24.384 [error] GenServer #PID<0.282.0> terminating
** (MatchError) no match of right hand side value: {:error, %ArgumentError{message: "argument error"}}
(avrora_issue) lib/avrora_issue_web/channels/avrora_channel.ex:9: AvroraIssueWeb.AvroraChannel.handle_in/3
(phoenix) lib/phoenix/channel/server.ex:280: Phoenix.Channel.Server.handle_info/2
(stdlib) gen_server.erl:637: :gen_server.try_dispatch/4
(stdlib) gen_server.erl:711: :gen_server.handle_msg/6
(stdlib) proc_lib.erl:249: :proc_lib.init_p_do_apply/3
Last message: %Phoenix.Socket.Message{event: "encode", join_ref: nil, payload: nil, ref: #Reference<0.2008719544.1328545796.244887>, topic: "avrora"}
1) test second test (AvroraIssueWeb.AvroraChannelTest)
test/avrora_issue_web/channels/avrora_channel_test.exs:16
** (EXIT from #PID<0.281.0>) an exception was raised:
** (MatchError) no match of right hand side value: {:error, %ArgumentError{message: "argument error"}}
(avrora_issue) lib/avrora_issue_web/channels/avrora_channel.ex:9: AvroraIssueWeb.AvroraChannel.handle_in/3
(phoenix) lib/phoenix/channel/server.ex:280: Phoenix.Channel.Server.handle_info/2
(stdlib) gen_server.erl:637: :gen_server.try_dispatch/4
(stdlib) gen_server.erl:711: :gen_server.handle_msg/6
(stdlib) proc_lib.erl:249: :proc_lib.init_p_do_apply/3
But, if you run each test by itself, they pass:
% mix test test/avrora_issue_web/channels/avrora_channel_test.exs:16
Excluding tags: [:test]
Including tags: [line: "16"]
{:ok,
<<79, 98, 106, 1, 3, 134, 2, 20, 97, 118, 114, 111, 46, 99, 111, 100, 101, 99,
8, 110, 117, 108, 108, 22, 97, 118, 114, 111, 46, 115, 99, 104, 101, 109, 97,
202, 1, 123, 34, 110, 97, 109, 101, 115, 112, 97, 99, 101, ...>>}
.
I can't for the life of me explain why this might happen, do you have any idea?
I'm writing a hook to skip certain fields, borrowing the structure from erlavro. Here's the code:
defmodule MyClient do
use Avrora.Client,
otp_app: :rudder,
registry_url: "http://localhost:8081",
registry_auth: {:basic, ["username", "password"]},
schemas_path: "priv/schemas/",
registry_schemas_autoreg: false,
convert_null_values: false,
convert_map_to_proplist: false,
names_cache_ttl: :infinity,
decoder_hook: &MyClient.decoder_hook/4
def decoder_hook(type, subNameOrIndex, data, decodeFun) do
IO.puts("\n----------------\n")
IO.inspect(:avro.get_type_fullname(type), label: "type_fullname")
IO.inspect(:avro.get_type_name(type), label: "type_name")
IO.inspect(:avro.get_type_namespace(type), label: "namespace")
IO.inspect(subNameOrIndex, label: "subNameOrIndex")
case {:avro.get_type_fullname(type), subNameOrIndex} do
{"io.Payment2", "amount"} ->
# decodeFun.(data)
nil
{_, _} ->
decodeFun.(data)
end
end
def test() do
#Avrora.AvroSchemaStore.start_link()
{:ok, oldschema2} =
MyClient.Schema.Encoder.from_json(
~s({ "type": "record", "name": "Payment2", "namespace": "io", "fields": [{ "name": "id", "type": "string"},{ "name": "amount", "type": "double"} ] } )
)
oldschema = %{oldschema2 | id: nil, version: nil}
payload = %{"id" => "123", "amount" => 123.45}
{:ok, encoded} = MyClient.Codec.ObjectContainerFile.encode(payload, schema: oldschema)
IO.inspect(encoded, label: "encoded value")
{:ok, [decoded]} = MyClient.Codec.ObjectContainerFile.decode(encoded)
IO.puts("\n----------------\n")
IO.inspect(decoded, label: "decoded value")
end
end
However on running:
iex(1)> MyClient.start_link()
{:ok, #PID<0.252.0>}
iex(2)> MyClient.test()
encoded value: <<79, 98, 106, 1, 3, 186, 2, 20, 97, 118, 114, 111, 46, 99, 111, 100, 101, 99,
8, 110, 117, 108, 108, 22, 97, 118, 114, 111, 46, 115, 99, 104, 101, 109, 97,
254, 1, 123, 34, 110, 97, 109, 101, 115, 112, 97, 99, 101, 34, 58, ...>>
----------------
type_fullname: "io.Payment2"
type_name: "Payment2"
namespace: "io"
subNameOrIndex: :none
----------------
type_fullname: "io.Payment2"
type_name: "Payment2"
namespace: "io"
subNameOrIndex: "id"
----------------
type_fullname: "string"
type_name: "string"
namespace: ""
subNameOrIndex: []
----------------
type_fullname: "io.Payment2"
type_name: "Payment2"
namespace: "io"
subNameOrIndex: "amount"
** (MatchError) no match of right hand side value: {:error, :schema_mismatch}
(avrora 0.26.0) lib/avrora/myclient.ex:44: MyClient.test/0
iex:2: (file)
Can someone advise me on how to think about the hook in general? I want to skip the io.Payment2.amount field.
I just started using Avrora and I was trying to evolve a schema. It looks like when I encode a record and a schema isn't present, avrora will store the schema in my registry, but if I make a change to my schema and encode a new record, it will use the original schema. Same thing if you clear the memory store.
I found that if you create a new version manually, avrora will get the correct one.
I'm not sure if I'm misusing it, or if the feature doesn't exist. I'd love to help if it does not.
:avro_schema_store has a few limitations: you can't delete or update schemas there. Avrora.Storage.Memory is used as a cache for schema resolution, but it's used only by the Avrora.Resolver.
The issue occurs when we try to resolve a schema with references. First of all, the behavior differs per storage: Avrora.Storage.File will read only the content of the referenced schemas and never "cache" them. Avrora.Storage.Registry, on the other hand, is going to create new Avrora.Schema objects, but it uses only the JSON and passes it to the reference resolver, which will parse it again. None of the referenced objects will be "cached".
It feels like we need to be able to resolve references smartly, without repetitive calls to the disk or the registry API.
We are currently using
use Avrora.Client,
registry_url: Application.get_env(:the_app, TheApp.AvroClient)[:url],
schemas_path: Application.get_env(:the_app, TheApp.AvroClient)[:schemas_path]
to define our own client module.
But as you already know, those configurations must come from the compile-time environment.
Is there any way to configure it at runtime, when the application starts?
Tried looking at the supervisor tree, but didn't find anything.
Thank you in advance for your answer, and thank you for your awesome library!
Hello,
Is it possible to get the schema name that was used when decoding? I can't find it in the documentation.
I'm thinking of something like this: {:ok, decoded, schema_name} = Avrora.decode(message)
What would it take to also support JSON Schema?
Currently, any reference will be expanded, but we have to separate named and unnamed types. Any named type can be referenced in Schema Registry starting with version 5.5.0.
We need configuration options to expand unnamed types and keep named types as references when registering the schema in the registry.
The README has examples for one type.
I have two types, priv/.../Event.avsc and priv/.../EventType.avsc. The latter is an enum, and Event has a field of type EventType.
Encoding an event yields
[debug] reading schema `...Event` from the file /app/priv/.../Event.avsc
** (throw) {:ref_to_unknown_type, "...EventType"}
Is there a way to get that working?
In OTP 25 the default SSL verification type was changed from :verify_none to :verify_peer, and we need to expose it as a setting to avoid request failures.
Context: #97
When upgrading to Elixir 1.15, I got a bunch of warnings: warning: Logger.warn/1 is deprecated. Use Logger.warning/2 instead.
I think this is because Elixir has now switched to the Erlang logger in 1.15.
Some areas that display this issue: https://github.com/search?q=repo%3AStrech%2Favrora%20Logger.warn&type=code
Since TravisCI has moved OSS builds to a credit-based model, it's better to move to the GitHub ecosystem.