zalando / nakadi
A distributed event bus that implements a RESTful API abstraction on top of Kafka-like queues
Home Page: https://nakadi.io
License: MIT License
It should be introduced to avoid a full-disk situation and to avoid metrics performance loss because of files growing too big.
Currently the Kafka brokers are specified with IP addresses. If, over time, all Kafka brokers are recreated with different IPs, we will have problems.
A file should not be set as a parameter for per-environment configurations. All values should be passed as environment variables.
To avoid setting several development-specific parameters in build.gradle, a convenience for reading configurations from a configuration file and converting them directly to environment variables in Gradle would come in handy.
Currently the number of workers and the app port number are hard-coded in the Dockerfile. They should be taken from environment variables.
Hi
I've tried to get events from a Nakadi topic. Somehow I forgot to add an X-Nakadi-Cursors header to the HTTP request and expected to receive an HTTP 412 error. Instead I got a never-ending partition data stream with HTTP 200. Is that OK? It is quite confusing.
Curl output:
* Trying 54.246.212.161...
* Connected to nakadi-testing.aruha-test.zalan.do (54.246.212.161) port 443 (#0)
* TLS 1.2 connection using TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA256
* Server certificate: *.aruha-test.zalan.do
* Server certificate: Zalando Technology serviceCA
> GET /event-types/trial-product-availability-events-100/events?stream_limit=20 HTTP/1.1
> Host: nakadi-testing.aruha-test.zalan.do
> User-Agent: curl/7.43.0
> Accept: */*
> Content-Type: application/json
> Authorization: Bearer token
>
< HTTP/1.1 200 OK
< Cache-Control: no-cache, no-store, max-age=0, must-revalidate
< Content-Type: text/plain;charset=UTF-8
< Date: Thu, 31 Mar 2016 08:52:56 GMT
< Expires: 0
< Pragma: no-cache
< Server: Apache-Coyote/1.1
< Vary: Accept-Encoding
< X-Content-Type-Options: nosniff
< X-Flow-Id: cJMACbHUUpNeNNNPMvOuTqto
< X-Frame-Options: DENY
< X-XSS-Protection: 1; mode=block
< transfer-encoding: chunked
< Connection: keep-alive
<
{"cursor":{"partition":"0","offset":"102203"}}
{"cursor":{"partition":"1","offset":"BEGIN"}}
{"cursor":{"partition":"2","offset":"BEGIN"}}
{"cursor":{"partition":"3","offset":"BEGIN"}}
{"cursor":{"partition":"4","offset":"BEGIN"}}
{"cursor":{"partition":"5","offset":"BEGIN"}}
{"cursor":{"partition":"6","offset":"BEGIN"}}
{"cursor":{"partition":"7","offset":"BEGIN"}}
...
According to https://docs.travis-ci.com/user/docker/ it is possible to use docker-compose for the build in Travis CI.
Currently we use Docker builds of ZooKeeper and Kafka, but still use the pip3 installer directly and not as a docker run. Experimenting with docker-compose could be even more interesting, and would make it possible to build all the components of the system the same way as we would in production.
It should be possible to create topics via the API. Currently it is only possible with the use of some black magic.
Define field cardinality on the EventType definition.
The date "2016-03-23T12:52:06.8Z" is rejected by Nakadi, but it should be accepted, since a one-digit fractional second is valid according to RFC 3339 (https://www.ietf.org/rfc/rfc3339.txt).
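For what it's worth, java.time already handles variable-length fractional seconds; a small sketch (not Nakadi's actual validation code) showing that the rejected timestamp parses cleanly:

```java
import java.time.Instant;

public class Rfc3339Fraction {
    // RFC 3339 defines time-secfrac as "." 1*DIGIT, so a single fractional
    // digit is valid. Instant.parse (ISO_INSTANT) accepts 0 to 9 digits.
    public static Instant parseRfc3339(String timestamp) {
        return Instant.parse(timestamp);
    }

    public static void main(String[] args) {
        // ".8" and ".800" denote the same instant (800 ms past the second).
        System.out.println(parseRfc3339("2016-03-23T12:52:06.8Z")
                .equals(parseRfc3339("2016-03-23T12:52:06.800Z"))); // prints "true"
    }
}
```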
Implement a component for executing the enrichment of events based on the rules defined on the EventType. Besides the current state of the Event, it MUST have access to the incoming request, including headers.
This eventType was created in the sandboxed environment.
GET {{nakadi-server}}/event-types/test-client-integration-event-2136273392--1957566939-219830819
The field enrichment_strategies is empty. Should it not be removed from the response?
{
"name": "test-client-integration-event-2136273392--1957566939-219830819",
"owning_application": "???-???",
"category": "undefined",
"enrichment_strategies": [],
"partition_strategy": "random",
"partition_key_fields": [
"order_number"
],
"schema": {
"type": "json_schema",
"schema": "{ \"properties\": { \"order_number\": { \"type\": \"string\" } } }"
}
}
Kafka does not accept topic names with more than 255 characters. Since this is not validated, creating such a topic fails and returns a 500.
It would be better to validate the name length, informing users with a 422 and a proper detail message.
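A minimal sketch of the proposed up-front check; the class and method names are illustrative, not Nakadi's actual code:

```java
public class TopicNameValidator {
    // Kafka rejects topic names longer than 255 characters (per this issue);
    // validating up front lets the API answer 422 instead of a broker-side 500.
    public static final int KAFKA_MAX_TOPIC_NAME_LENGTH = 255;

    public static void validate(String name) {
        if (name == null || name.isEmpty()) {
            throw new IllegalArgumentException("Event type name must not be empty");
        }
        if (name.length() > KAFKA_MAX_TOPIC_NAME_LENGTH) {
            // The caller would map this to a 422 with a proper detail message.
            throw new IllegalArgumentException("Event type name must not exceed "
                    + KAFKA_MAX_TOPIC_NAME_LENGTH + " characters");
        }
    }

    public static void main(String[] args) {
        validate("order.ORDER_CANCELLED"); // valid name passes silently
        System.out.println("ok");
    }
}
```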
An invalid query (partition with newest_available_offset = BEGIN) on [...]/events leads to HTTP 412. The response does not contain a Content-Type header; the values, however, match the problem type (application/problem+json).
$ http --stream http://localhost:8080/event-types/order.ORDER_CANCELLED/events X-Nakadi-Cursors:"[{\"partition\": \"1\", \"offset\": \"0\"}]"
HTTP/1.1 412 Precondition Failed
Cache-Control: no-cache, no-store, max-age=0, must-revalidate
Content-Length: 111
Date: Sat, 05 Mar 2016 23:28:19 GMT
Expires: 0
Pragma: no-cache
Server: Apache-Coyote/1.1
X-Content-Type-Options: nosniff
X-Flow-Id: 1PoiUMR1oA34KtxvbJirPHhY
X-Frame-Options: DENY
X-XSS-Protection: 1; mode=block
{"type":"http://httpstatus.es/412","title":"Precondition Failed","status":412,"detail":"cursors are not valid"}
Since metadata is a field owned by Nakadi, it should not be allowed in schemas defined by users.
To correctly detect out-of-order events, producers should provide a transaction number (like Oracle's SCN, Postgres' WAL position, etc. A better term is welcome.) that is strictly monotonically increasing for each data_key. Providing this information should be mandatory for each event reporting a modification of persisted state, i.e. for Data Change Events / Resource Events. (Note: clock times are insufficiently reliable for strict ordering.) Typical CDC solutions are able to provide this information.
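Once such a number is present, detection itself is simple; a sketch assuming a per-data_key map of the last seen transaction number (all names illustrative):

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hedged sketch: flags out-of-order Data Change Events by requiring a
 * strictly monotonically increasing transaction number per data_key
 * (e.g. Oracle SCN or Postgres WAL position).
 */
public class OrderingDetector {
    private final Map<String, Long> lastTxnByKey = new HashMap<>();

    /** Returns true if the event is in order; false if it arrived late or twice. */
    public synchronized boolean accept(String dataKey, long txnNumber) {
        Long last = lastTxnByKey.get(dataKey);
        if (last != null && txnNumber <= last) {
            return false; // out of order: txn number did not strictly increase
        }
        lastTxnByKey.put(dataKey, txnNumber);
        return true;
    }

    public static void main(String[] args) {
        OrderingDetector d = new OrderingDetector();
        System.out.println(d.accept("order-1", 100)); // prints "true"
        System.out.println(d.accept("order-1", 99));  // prints "false"
    }
}
```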
There are some possibilities for the persistence layer for the EventType data.
Currently we just check that the token is valid. We should create the scopes and authorize according to the scopes the user service has.
Besides periodically refreshing the list of available brokers (https://github.com/zalando/nakadi/blob/nakadi-jvm/src/main/java/de/zalando/aruha/nakadi/repository/kafka/KafkaLocationManager.java#L90-L98), Nakadi does not refresh its producer instances with up-to-date properties (the list of brokers).
Since producers are created only during the initialisation phase (https://github.com/zalando/nakadi/blob/nakadi-jvm/src/main/java/de/zalando/aruha/nakadi/repository/kafka/KafkaFactory.java#L16), they will not use recently created brokers. This may cause inconsistent behaviour when publishing events.
Nakadi should recover from such scenarios by discovering recently created brokers and updating the producers with that information.
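One recovery direction can be sketched as holding the producer behind an AtomicReference and rebuilding it whenever the periodically refreshed broker list changes. This is a sketch under stated assumptions: the holder class and the generic stand-in for a real KafkaProducer are illustrative, not the actual KafkaFactory code.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;

/** Rebuilds the wrapped producer whenever the discovered broker list changes. */
public class RefreshingProducerHolder<P> {
    private final Function<List<String>, P> producerFactory;
    private final AtomicReference<List<String>> currentBrokers = new AtomicReference<>();
    private final AtomicReference<P> producer = new AtomicReference<>();

    public RefreshingProducerHolder(Function<List<String>, P> factory, List<String> brokers) {
        this.producerFactory = factory;
        this.currentBrokers.set(brokers);
        this.producer.set(factory.apply(brokers));
    }

    /** Called by the periodic broker-list refresh (cf. KafkaLocationManager). */
    public void onBrokersDiscovered(List<String> brokers) {
        if (!brokers.equals(currentBrokers.getAndSet(brokers))) {
            producer.set(producerFactory.apply(brokers)); // rebuild with fresh brokers
        }
    }

    public P get() {
        return producer.get();
    }

    public static void main(String[] args) {
        RefreshingProducerHolder<String> h = new RefreshingProducerHolder<>(
                b -> String.join(",", b), List.of("broker-a:9092"));
        h.onBrokersDiscovered(List.of("broker-b:9092"));
        System.out.println(h.get()); // prints "broker-b:9092"
    }
}
```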
For now, the streaming interface is not implemented.
We need to improve the logging of the push endpoint. Currently, if an error response is created at the connexion level, we don't have much log data about it. We just have the simple access log from gunicorn:
[02/Nov/2015:13:03:10 +0000] "POST /topics/eventstore.article.16/events HTTP/1.1" 400 152 "-" "Java/1.8.0_05"
But in this case we need to see at least the request body.
First we need to check whether it is possible to configure connexion to log the request body when an error response is created.
In https://github.com/zalando/nakadi/blob/nakadi-jvm/api/nakadi-event-bus-api.yaml#L879 it says regarding metadata/eid that "Clients are allowed to generate this", which sounds to me like it's not mandatory to create it yourself as a producer.
In https://github.com/zalando/nakadi/blob/nakadi-jvm/api/nakadi-event-bus-api.yaml#L922, though, it says that eid is required.
Which one is the correct interpretation?
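If the resolution turns out to be "eid is required in the payload, but producers may generate it themselves", the producer side is trivial; a sketch (class and method names are illustrative):

```java
import java.util.UUID;

public class EidExample {
    // A producer can fill the required metadata/eid field with a random UUID,
    // matching the "Clients are allowed to generate this" reading of the spec.
    public static String newEid() {
        return UUID.randomUUID().toString();
    }

    public static void main(String[] args) {
        System.out.println(newEid()); // e.g. "3f1c9a2e-..." (random each run)
    }
}
```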
It is inconvenient to provide the event schema definition as a string.
The suggestion is to support a schema_object field of type object alongside (or instead of) the schema string field.
Also, YAML is mentioned there, but JSON is expected:
https://github.com/zalando/nakadi/blob/nakadi-jvm/api/nakadi-event-bus-api.yaml#L1182
The EventTypeSchema spec suggests json-schema as a valid value for type:
EventTypeSchema:
properties:
type:
type: string
example: 'json-schema'
description: The type of schema definition (avro, json-schema, etc).
However, the application requires JSON_SCHEMA as the value. Neither json-schema nor json_schema is accepted.
$ http -v -a mytok: --verify no https://nakadi-testing.aruha-test.example.org/event-types < event-schema-2.json
POST /event-types HTTP/1.1
Accept: application/json
[...]
User-Agent: HTTPie/0.9.3
{
"category": "data",
"name": "order.ORDER_CANCELLED",
"schema": {
"schema": "{ \"properties\": { \"order_number\": { \"type\": \"string\" } } }",
"type": "json_schema"
}
}
HTTP/1.1 400 Bad Request
[...]
X-Flow-Id: vZW4XYZXO2W3tudzBdi0Jwmy
{
"detail": "Could not read document: Can not construct instance of de.zalando.aruha.nakadi.domain.EventTypeSchema$Type from String value 'json_schema': value not one of declared Enum instance names: [JSON_SCHEMA]\n at [Source: java.io.PushbackInputStream@27b876fd; line: 5, column: 3] (through reference chain: de.zalando.aruha.nakadi.domain.EventType[\"schema\"]->de.zalando.aruha.nakadi.domain.EventTypeSchema[\"type\"]); nested exception is com.fasterxml.jackson.databind.exc.InvalidFormatException: Can not construct instance of de.zalando.aruha.nakadi.domain.EventTypeSchema$Type from String value 'json_schema': value not one of declared Enum instance names: [JSON_SCHEMA]\n at [Source: java.io.PushbackInputStream@27b876fd; line: 5, column: 3] (through reference chain: de.zalando.aruha.nakadi.domain.EventType[\"schema\"]->de.zalando.aruha.nakadi.domain.EventTypeSchema[\"type\"])",
"status": 400,
"title": "Bad Request",
"type": "http://httpstatus.es/400"
}
Creation with JSON_SCHEMA works perfectly:
$ http -v -a mytok: --verify no https://nakadi-testing.aruha-test.example.org/event-types < event-schema-2.json
POST /event-types HTTP/1.1
[...]
{
"category": "data",
"name": "order.ORDER_CANCELLED",
"schema": {
"schema": "{ \"properties\": { \"order_number\": { \"type\": \"string\" } } }",
"type": "JSON_SCHEMA"
}
}
HTTP/1.1 201 Created
[...]
X-Flow-Id: p1nSPcqtl7PnmP8XoWAYcOE7
This must include underlying creation of topic (ticket #7).
java.lang.RuntimeException: Could not read url: /event-types/PRODUCT_AVAILABILITY_UPDATES/events, status code: 500, message: Internal Server Error, result: {"type":"http://httpstatus.es/500","title":"Internal Server Error","status":500,"detail":"An internal error happened. Please report it. (ETIF4fVmOryh8ZOUiC7hkrbx2it)"}
As a Nakadi user
I want to get a useful error response if I specify invalid enum constants in my json body
Such that I can debug my request easily
For example, if someone posts an event-type json using invalid enum constants, she ends up with an error response like this:
{
"detail": "Could not read document: No enum constant de.zalando.aruha.nakadi.domain.EventTypeSchema.Type.JSON_SCHEMX (through reference chain: de.zalando.aruha.nakadi.domain.EventType[\"schema\"]->de.zalando.aruha.nakadi.domain.EventTypeSchema[\"type\"]); nested exception is com.fasterxml.jackson.databind.JsonMappingException: No enum constant de.zalando.aruha.nakadi.domain.EventTypeSchema.Type.JSON_SCHEMX (through reference chain: de.zalando.aruha.nakadi.domain.EventType[\"schema\"]->de.zalando.aruha.nakadi.domain.EventTypeSchema[\"type\"])",
"status": 400,
"title": "Bad Request",
"type": "http://httpstatus.es/400"
}
Todo:
..to make it easier for people without Gradle (like me)...
The implementation could lead to data loss due to a race near the queue assignment:
https://github.com/zalando/nakadi/blob/nakadi-jvm/src/main/java/de/zalando/aruha/nakadi/repository/kafka/NakadiKafkaConsumer.java#L69
In general, internal state should not be changed (or changed and then checked) without proper synchronization.
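The general fix direction can be sketched as guarding every check-then-act on shared state with a single lock, so that an event cannot be lost between inspecting the queue and reassigning it. The class below is illustrative only, not the actual NakadiKafkaConsumer code:

```java
import java.util.ArrayDeque;
import java.util.Optional;
import java.util.Queue;

/** All mutations and reads of the shared queue go through one lock. */
public class SynchronizedEventBuffer<E> {
    private final Object lock = new Object();
    private Queue<E> events = new ArrayDeque<>();

    /** Swaps in a fresh queue without losing events added concurrently. */
    public void replaceQueue(Queue<E> fresh) {
        synchronized (lock) {
            fresh.addAll(events); // carry over unread events before the swap
            events = fresh;
        }
    }

    public void add(E event) {
        synchronized (lock) {
            events.add(event);
        }
    }

    public Optional<E> poll() {
        synchronized (lock) {
            return Optional.ofNullable(events.poll());
        }
    }

    public static void main(String[] args) {
        SynchronizedEventBuffer<String> buf = new SynchronizedEventBuffer<>();
        buf.add("e1");
        buf.replaceQueue(new ArrayDeque<>());
        System.out.println(buf.poll().orElse("LOST")); // prints "e1"
    }
}
```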
EventTypeSchema requires the type and schema attributes. In case the latter is missing, an HTTP 500 error is returned (including a suggestion to report this issue).
$ http -v -a mytok: --verify no https://nakadi-testing.aruha-test.example.org/event-types name=order.ORDER_ACCEPTED category=data schema:="{\"type\": \"JSON_SCHEMA\"}"
POST /event-types HTTP/1.1
[...]
{
"category": "data",
"name": "order.ORDER_ACCEPTED",
"schema": {
"type": "JSON_SCHEMA"
}
}
HTTP/1.1 500 Internal Server Error
[...]
X-Flow-Id: lefUl6RPZCBA5jklfhjkBG75
{
"detail": "An internal error happened. Please report it. (ETIWabzmuHk2uoob0Kq0w7mdaw7)",
"status": 500,
"title": "Internal Server Error",
"type": "http://httpstatus.es/500"
}
The API spec suggests using dots (.) in event names (i.e. gizig.price-change) to denote the owning application, see
nakadi/api/nakadi-event-bus-api.yaml
Line 967 in fefb5c7
However, using dots in event type names (i.e. order.ORDER_CREATED) reveals an inconsistency in the application's behavior:
- /event-types: "detail": "EventType \"order\" does not exist." (suggests the name is truncated somewhere in the process)
- 409 Conflict
- /event-types/order.ORDER_CREATED/partitions and .../partitions/{id}
Looking at the code, it seems that the DB access that fetches the EventType's schema has problems with the dot in the event's name. On the Kafka and ZooKeeper side, however, everything seems to be fine.
Event type list:
$ http -v -a mytok: --verify no GET https://nakadi-testing.aruha-test.example.org/event-types
(...)
[
[...]
{
"category": "data",
"name": "order.ORDER_CANCELLED",
"owning_application": null,
"schema": {
"schema": "{ \"properties\": { \"order_number\": { \"type\": \"string\" } } }",
"type": "JSON_SCHEMA"
}
}
]
Event type retrieval by name: ❌
$ http -v -a mytok: --verify no GET https://nakadi-testing.aruha-test.example.org/event-types/order.ORDER_CANCELLED
GET /event-types/order.ORDER_CANCELLED HTTP/1.1
[...]
HTTP/1.1 404 Not Found
[...]
X-Flow-Id: sWwDK8bRDFPcKoy4W0VCYEpa
{
"detail": "EventType \"order\" does not exist.",
"status": 404,
"title": "Not Found",
"type": "http://httpstatus.es/404"
}
Schema update: ❌
$ http -v -a mytok: --verify no PUT https://nakadi-testing.aruha-test.example.org/event-types/order.ORDER_CANCELLED < event-schema-2.json
PUT /event-types/order.ORDER_CANCELLED HTTP/1.1
[...]
{
"category": "data",
"name": "order.ORDER_CANCELLED",
"schema": {
"schema": "{ \"properties\": { \"order_number\": { \"type\": \"string\" }, \"cancellation_reason\": \"string\" } }",
"type": "JSON_SCHEMA"
}
}
HTTP/1.1 404 Not Found
[...]
X-Flow-Id: 4bDmpKzwgvVRpGNC0vTvDuvF
{
"detail": "EventType \"order\" does not exist.",
"status": 404,
"title": "Not Found",
"type": "http://httpstatus.es/404"
}
Posting events:
$ http -v -a mytok: --verify no https://nakadi-testing.aruha-test.example.org/event-types/order.ORDER_CANCELLED/events order_number=123
POST /event-types/order.ORDER_CANCELLED/events HTTP/1.1
[...]
{
"order_number": "123"
}
HTTP/1.1 201 Created
[...]
X-Flow-Id: 3HoZulX2BeUZvL4bzgtXYtEL
Partitions:
$ http -v -a mytok: --verify no https://nakadi-testing.aruha-test.example.org/event-types/order.ORDER_CANCELLED/partitions/1
GET /event-types/order.ORDER_CANCELLED/partitions/1 HTTP/1.1
[...]
HTTP/1.1 200 OK
[...]
X-Flow-Id: vX4xSoF0eVny2S36fzziWbOj
[...]
{
"newest_available_offset": "1",
"oldest_available_offset": "0",
"partition": "1"
}
Create the possibility to stream events from several partitions at the same time. We should also think about how that can be specified in the request.
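One request-level option, consistent with the X-Nakadi-Cursors header shown in other issues here, would be to allow one cursor per partition in a single header; a sketch, not a decided API:

```
GET /event-types/order.ORDER_CANCELLED/events HTTP/1.1
X-Nakadi-Cursors: [{"partition": "0", "offset": "102203"},
                   {"partition": "1", "offset": "BEGIN"},
                   {"partition": "2", "offset": "BEGIN"}]
```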
The correct spelling is "occurred_at" (double c, double r). Please see: http://www.merriam-webster.com/dictionary/occur
The link in the README.md that should refer to the Swagger API is broken and leads to a PNG.
@hjacobs proposed migrating to py.test instead of unittest. I think that makes sense: unittest is quite old, and the output of py.test is cleaner.
Each incoming event must have a unique id generated for it. This id must be stored and a lookup must be possible.
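A minimal sketch of that requirement, with an in-memory map standing in for whatever store the real implementation would use (all names illustrative):

```java
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

/** Assigns each incoming event a unique id and supports lookup by that id. */
public class EventIdRegistry {
    private final Map<String, String> eventsById = new ConcurrentHashMap<>();

    /** Stores the event payload under a freshly generated id and returns the id. */
    public String register(String eventPayload) {
        String id = UUID.randomUUID().toString();
        eventsById.put(id, eventPayload);
        return id;
    }

    public Optional<String> lookup(String id) {
        return Optional.ofNullable(eventsById.get(id));
    }

    public static void main(String[] args) {
        EventIdRegistry registry = new EventIdRegistry();
        String id = registry.register("{\"order_number\": \"123\"}");
        System.out.println(registry.lookup(id).orElse("NOT FOUND"));
    }
}
```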
There are some slight differences between Swagger's (now OpenAPI's) model schema and JSON Schema. One of them is that the additionalProperties key allows only object values (which means schemas), not the boolean true (which would be equivalent to an empty object) nor false (which is the default if the key is omitted).
This makes the Nakadi API in its current form unusable with Swagger Codegen (these keys are simply ignored).
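If compatibility with Swagger Codegen is desired, one spec-side workaround is to replace boolean additionalProperties values with the equivalent empty schema object, which Swagger's model schema does accept. A sketch against a hypothetical excerpt of the API yaml, not the actual file:

```yaml
Event:
  type: object
  # Instead of the JSON-Schema-only boolean form "additionalProperties: true",
  # use the equivalent empty schema object: any extra properties are allowed.
  additionalProperties: {}
```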
I tried out Nakadi on the staging env (nakadi-testing, nakadi-staging). I was able to create an event type, post events and see updated partition offsets. However, event retrieval via Nakadi (/event-types/{event-name}/events) is accepted with HTTP/1.1 200 OK, but afterwards it's stuck and times out on the client side.
$ http -v -a mytok: --verify no https://nakadi-testing.aruha-test.example.org/event-types/order.ORDER_CANCELLED/events X-Nakadi-Cursors:"[{\"partition\": \"1\", \"offset\": \"0\"}]"
GET /event-types/order.ORDER_CANCELLED/events HTTP/1.1
[...]
X-Nakadi-Cursors: [{"partition": "1", "offset": "0"}]
HTTP/1.1 200 OK
[...]
X-Flow-Id: cVKWzOoUKzlnD0pzcJzvO4LO
I was able to reproduce this behavior locally as well. I ran ./gradlew startDockerContainer on nakadi-jvm (a67bf5d) to start Nakadi.
After checking the Kafka topics directly, I can confirm that the events have been written correctly to the topic and can be read from Kafka using ./kafka-console-consumer.sh --zookeeper localhost:2181 --topic order.ORDER_RECEIVED --from-beginning.
When running the command curl --request GET http://localhost:8080/event-types, the following error occurs:
{"type":"http://httpstatus.es/500","title":"Internal Server Error","status":500,"detail":"An internal error happened. Please report it. (ETIalRl2lus7Zr8HtclEe51U51j)"}
Currently we log too much: more than 100 MB of logs per hour. We should reduce the amount of debug information logged.
The slides (probably accessible only to Zalando users) explain on page 11 that this should work:
$ gradle startDockerContainer
It doesn't, for me (OS X, Gradle 2.12, Docker 1.11):
$ gradle startDockerContainer
:compileJava
Note: /Users/akauppi/Git/nakadi/src/main/java/de/zalando/aruha/nakadi/config/JsonConfig.java uses unchecked or unsafe operations.
Note: Recompile with -Xlint:unchecked for details.
:compileGroovy UP-TO-DATE
:processResources
:classes
:jar
:findMainClass
:startScripts
:distTar
:distZip
:bootRepackage
:generateScmSourceJson UP-TO-DATE
:buildDockerImage
Building docker image with tag: aruha/nakadi:AUTOBUILD
Running command: docker build -t aruha/nakadi:AUTOBUILD .
:buildDockerImage FAILED
FAILURE: Build failed with an exception.
* Where:
Build file '/Users/akauppi/Git/nakadi/build.gradle' line: 309
* What went wrong:
Execution failed for task ':buildDockerImage'.
> docker build failed.
* Try:
Run with --stacktrace option to get the stack trace. Run with --info or --debug option to get more log output.
BUILD FAILED
Total time: 6.584 secs
This is from commit 713f9ca of the nakadi-jvm branch.
Creation of an event type without owning_application fails with an error:
{
"type": "http://httpstatus.es/422",
"title": "Unprocessable Entity",
"detail": "Field \"owning_application\" may not be null\n",
"status": 422
}
Is owning_application a required field for EventType now? The API spec states otherwise.
{
"name": "test",
"category": "data",
"schema": {
"type": "json_schema",
"schema": "{ \"properties\": { \"order_number\": { \"type\": \"string\" } } }"
}
}