
Karapace

karapace. Your Apache Kafka® essentials in one tool.

An open-source implementation of Kafka REST and Schema Registry.

Overview

Karapace supports the storing of schemas in a central repository, which clients can access to serialize and deserialize messages. The schemas also maintain their own version histories and can be checked for compatibility across versions.

Karapace REST provides a RESTful interface to your Apache Kafka cluster, allowing you to produce and consume messages and perform administrative cluster work, all while using the language of the web.

Features

  • Drop-in replacement for both pre-existing Schema Registry / Kafka REST Proxy clients and servers
  • Moderate memory consumption
  • Asynchronous architecture based on aiohttp
  • Supports Avro, JSON Schema, and Protobuf
  • Leader/Replica architecture for HA and load balancing

Compatibility details

Karapace is compatible with Schema Registry 6.1.1 at the API level. When a new version of Schema Registry is released, the goal is to support it in a reasonable time. Karapace supports all operations in the API.

There are some caveats regarding schema normalization and error messages: these cannot always be fully guaranteed to be identical to those of Schema Registry.

Setup

Using Docker

To get up and running with the latest build of Karapace, a Docker image is available:

# Fetch the latest build from main branch
docker pull ghcr.io/aiven-open/karapace:develop

# Fetch the latest release
docker pull ghcr.io/aiven-open/karapace:latest

Versions 3.7.1 and earlier are available from the ghcr.io/aiven registry:

docker pull ghcr.io/aiven/karapace:3.7.1

An example setup, including configuration and a Kafka connection, is available as a Compose example:

docker compose -f ./container/compose.yml up -d

Then you should be able to reach two sets of endpoints: the Schema Registry API (by default at http://localhost:8081) and the REST proxy API (by default at http://localhost:8082).
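
A quick way to check that both services are reachable, assuming the default ports used in the quickstart below:

# Schema Registry API
$ curl http://localhost:8081/subjects
# REST proxy API
$ curl http://localhost:8082/topics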

Configuration

Each configuration key can be overridden with an environment variable prefixed with KARAPACE_, the exception being configuration keys that actually start with the karapace string. For example, to override the bootstrap_uri config value, one would use the environment variable KARAPACE_BOOTSTRAP_URI. Here you can find an example configuration file to give you an idea of what you need to change.
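
As a minimal sketch, a couple of keys can be overridden with environment variables when starting Karapace (the broker address and configuration file name below are illustrative):

$ KARAPACE_BOOTSTRAP_URI=kafka.example.com:9092 \
  KARAPACE_PORT=8081 \
  karapace karapace.config.json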

Source install

Alternatively you can do a source install using:

python setup.py install

Quickstart

To register the first version of a schema under the subject "test" using Avro schema:

$ curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  --data '{"schema": "{\"type\": \"record\", \"name\": \"Obj\", \"fields\":[{\"name\": \"age\", \"type\": \"int\"}]}"}' \
  http://localhost:8081/subjects/test-key/versions
{"id":1}

To register a version of a schema using JSON Schema, one needs to use the schemaType property:

$ curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  --data '{"schemaType": "JSON", "schema": "{\"type\": \"object\",\"properties\":{\"age\":{\"type\": \"number\"}},\"additionalProperties\":true}"}' \
  http://localhost:8081/subjects/test-key-json-schema/versions
{"id":2}

To list all subjects (including the ones created just above):

$ curl -X GET http://localhost:8081/subjects
["test-key","test-key-json-schema"]

To list all the versions of a given schema (including the one just created above):

$ curl -X GET http://localhost:8081/subjects/test-key/versions
[1]

To fetch back the schema whose global id is 1 (i.e. the one registered above):

$ curl -X GET http://localhost:8081/schemas/ids/1
{"schema":"{\"fields\":[{\"name\":\"age\",\"type\":\"int\"}],\"name\":\"Obj\",\"type\":\"record\"}"}

To get the specific version 1 of the schema just registered run:

$ curl -X GET http://localhost:8081/subjects/test-key/versions/1
{"subject":"test-key","version":1,"id":1,"schema":"{\"fields\":[{\"name\":\"age\",\"type\":\"int\"}],\"name\":\"Obj\",\"type\":\"record\"}"}

To get the latest version of the schema under subject test-key run:

$ curl -X GET http://localhost:8081/subjects/test-key/versions/latest
{"subject":"test-key","version":1,"id":1,"schema":"{\"fields\":[{\"name\":\"age\",\"type\":\"int\"}],\"name\":\"Obj\",\"type\":\"record\"}"}

In order to delete version 10 of the schema registered under subject "test-key" (if it exists):

$ curl -X DELETE http://localhost:8081/subjects/test-key/versions/10
 10

To delete all versions of the schema registered under subject "test-key":

$ curl -X DELETE http://localhost:8081/subjects/test-key
[1]

Test the compatibility of a schema with the latest schema under subject "test-key":

$ curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  --data '{"schema": "{\"type\": \"int\"}"}' \
  http://localhost:8081/compatibility/subjects/test-key/versions/latest
{"is_compatible":true}

Get current global backwards compatibility setting value:

$ curl -X GET http://localhost:8081/config
{"compatibilityLevel":"BACKWARD"}

Change compatibility requirements for all subjects where it's not specifically defined otherwise:

$ curl -X PUT -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  --data '{"compatibility": "NONE"}' http://localhost:8081/config
{"compatibility":"NONE"}

Change compatibility requirement to FULL for the test-key subject:

$ curl -X PUT -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  --data '{"compatibility": "FULL"}' http://localhost:8081/config/test-key
{"compatibility":"FULL"}

List topics:

$ curl "http://localhost:8082/topics"

Get info for one particular topic:

$ curl "http://localhost:8082/topics/my_topic"

Produce a message backed by the schema registry:

$ curl -H "Content-Type: application/vnd.kafka.avro.v2+json" -X POST -d \
  '{"value_schema": "{\"namespace\": \"example.avro\", \"type\": \"record\", \"name\": \"simple\", \"fields\": \
  [{\"name\": \"name\", \"type\": \"string\"}]}", "records": [{"value": {"name": "name0"}}]}' http://localhost:8082/topics/my_topic

Create a consumer:

$ curl -X POST -H "Content-Type: application/vnd.kafka.v2+json" -H "Accept: application/vnd.kafka.v2+json" \
  --data '{"name": "my_consumer", "format": "avro", "auto.offset.reset": "earliest"}' \
  http://localhost:8082/consumers/avro_consumers

Subscribe to the topic we previously published to:

$ curl -X POST -H "Content-Type: application/vnd.kafka.v2+json" --data '{"topics":["my_topic"]}' \
  http://localhost:8082/consumers/avro_consumers/instances/my_consumer/subscription

Consume previously published message:

$ curl -X GET -H "Accept: application/vnd.kafka.avro.v2+json" \
  http://localhost:8082/consumers/avro_consumers/instances/my_consumer/records?timeout=1000

Commit offsets for a particular topic partition:

$ curl -X POST -H "Content-Type: application/vnd.kafka.v2+json" --data '{}' \
  http://localhost:8082/consumers/avro_consumers/instances/my_consumer/offsets

Delete consumer:

$ curl -X DELETE -H "Accept: application/vnd.kafka.v2+json" \
  http://localhost:8082/consumers/avro_consumers/instances/my_consumer

Backing up your Karapace

Karapace natively stores its data in a Kafka topic, the name of which is freely configurable and defaults to _schemas.

Karapace includes a tool for backing up and restoring data. To back up, run:

karapace_schema_backup get --config karapace.config.json --location schemas.log

You can also back up the data by using Kafka's Java console consumer:

./kafka-console-consumer.sh --bootstrap-server brokerhostname:9092 --topic _schemas --from-beginning --property print.key=true --timeout-ms 1000 1> schemas.log

Restoring Karapace from backup

Your backup can be restored with Karapace by running:

karapace_schema_backup restore --config karapace.config.json --location schemas.log

Or Kafka's Java console producer can be used to restore the data to a new Kafka cluster.

You can restore the data from the previous step by running:

./kafka-console-producer.sh --broker-list brokerhostname:9092 --topic _schemas --property parse.key=true < schemas.log

Performance comparison to Confluent stack

Latency

  • 50 concurrent connections, 50,000 requests

    Format   Karapace   Confluent
    Avro     80.95      7.22
    Binary   66.32      46.99
    JSON     60.36      53.7

  • 15 concurrent connections, 50,000 requests

    Format   Karapace   Confluent
    Avro     25.05      18.14
    Binary   21.35      15.85
    JSON     21.38      14.83

  • 4 concurrent connections, 50,000 requests

    Format   Karapace   Confluent
    Avro     6.54       5.67
    Binary   6.51       4.56
    JSON     6.86       5.32

There is quite a bit of variation between subsequent runs, especially for the lower numbers, so when more exact measurements are required it is advisable to increase the total request count to something like 500K.

We'll focus on Avro serialization only after this round, as it is the more expensive one and it exercises the entire stack.

Consuming RAM

A basic push-pull test, with 12 connections on the publisher process and 3 connections on the subscriber process, over a 10 minute duration. The publisher has the 100 ms timeout and 100 max_bytes parameters set on each request so both processes have work to do. The heap size limit is set to 256M on the REST proxy.

RAM consumption, by consumer count, over 300 s:

Consumers   Karapace combined   Confluent REST
1           47                  200
10          55                  400
20          83                  530

Commands

Once installed, the karapace program should be in your path. It is the main daemon process that should be run under a service manager such as systemd to serve clients.
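
The daemon takes the path to its configuration file as its argument; a minimal sketch, assuming the configuration file is named karapace.config.json:

$ karapace karapace.config.json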

Configuration keys

The keys that need special care are those required to configure Kafka and advertised_hostname.

Parameter Default Value Description
http_request_max_size 1048576 The maximum client HTTP request size. This value controls how large (POST) payloads are allowed. When configuration of karapace_rest is set to true and http_request_max_size is not set, Karapace configuration adapts the allowed client max size from the producer_max_request_size. In cases where automatically selected size is not enough the configuration can be overridden by setting a value in configuration. For schema registry operation set the client max size according to expected size of schema payloads if default size is not enough.
advertised_protocol http The protocol being advertised to other instances of Karapace that are attached to the same Kafka group.
advertised_hostname socket.gethostname() The hostname being advertised to other instances of Karapace that are attached to the same Kafka group. All nodes within the cluster need to have their advertised_hostname's set so that they can all reach each other.
advertised_port None The port being advertised to other instances of Karapace that are attached to the same Kafka group. Falls back to port if not set.
bootstrap_uri localhost:9092 The URI to the Kafka service where to store the schemas and to run coordination among the Karapace instances.
sasl_bootstrap_uri None The URI to the Kafka service to use with the Kafka REST API when SASL authorization with REST is used.
client_id sr-1 The client_id Karapace will use when coordinating with other Karapace instances. The instance with the ID that sorts first alphabetically is chosen as master from the services with master_eligibility set to true.
consumer_enable_autocommit True Enable auto commit on rest proxy consumers
consumer_request_timeout_ms 11000 Rest proxy consumers timeout for reads that do not limit the max bytes or provide their own timeout
consumer_request_max_bytes 67108864 Rest proxy consumers maximum bytes to be fetched per request
consumer_idle_disconnect_timeout 0 Disconnect idle consumers after timeout seconds if not used. Inactivity leads to consumer leaving consumer group and consumer state. 0 (default) means no auto-disconnect.
fetch_min_bytes 1 Rest proxy consumers minimum bytes to be fetched per request.
group_id schema-registry The Kafka group name used for selecting a master service to coordinate the storing of Schemas.
master_eligibility true Should the service instance be considered for promotion to the master service. One reason to turn this off would be to have an instance of Karapace running somewhere else for HA purposes but which you wouldn't want to automatically promote to master if the primary instances become unavailable.
producer_compression_type None Type of compression to be used by rest proxy producers
producer_acks 1 Level of consistency desired by each producer message sent on the rest proxy. More on Kafka Producer
producer_linger_ms 0 Time to wait for grouping together requests. More on Kafka Producer
producer_max_request_size 1048576 The maximum size of a request in bytes. More on Kafka Producer configs
security_protocol PLAINTEXT Default Kafka security protocol needed to communicate with the Kafka cluster. Another option is SSL, for SSL client certificate authentication.
sentry None Used to configure parameters for sentry integration (dsn, tags, ...). Setting the environment variable SENTRY_DSN will also enable sentry integration.
ssl_cafile /path/to/cafile Used when security_protocol is set to SSL, the path to the SSL CA certificate.
ssl_certfile /path/to/certfile Used when security_protocol is set to SSL, the path to the SSL certfile.
ssl_keyfile /path/to/keyfile Used when security_protocol is set to SSL, the path to the SSL keyfile.
topic_name _schemas The name of the Kafka topic where to store the schemas.
replication_factor 1 The replication factor to be used with the schema topic.
host 127.0.0.1 Listening host for the Karapace server. Use an empty string to listen to all available networks.
port 8081 Listening port for the Karapace server.
server_tls_certfile /path/to/certfile Filename to a certificate chain for the Karapace server in HTTPS mode.
server_tls_keyfile /path/to/keyfile Filename to a private key for the Karapace server in HTTPS mode.
registry_host 127.0.0.1 Schema Registry host, used by Kafka Rest for schema related requests. If running both in the same process, it should be left to its default value
registry_port 8081 Schema Registry port, used by Kafka Rest for schema related requests. If running both in the same process, it should be left to its default value
registry_user None Schema Registry user for authentication, used by Kafka Rest for schema related requests.
registry_password None Schema Registry password for authentication, used by Kafka Rest for schema related requests.
registry_ca /path/to/cafile Kafka Registry CA certificate, used by Kafka Rest for Avro related requests. If this is set, Kafka Rest will use HTTPS to connect to the registry. If running both in the same process, it should be left to its default value
registry_authfile /path/to/authfile.json Filename to specify users and access control rules for Karapace Schema Registry. If this is set, Schema Registry requires authentication for most of the endpoints and applies per-endpoint authorization rules.
rest_authorization false Use REST API's calling authorization credentials to invoke Kafka operations over SASL authentication of sasl_bootstrap_uri to delegate REST proxy authorization to Kafka. If false, then use configured common credentials for all Kafka connections of REST proxy operations.
rest_base_uri None Publicly available URI of this instance advertised to the clients using stateful operations such as creating consumers. If not set, then construct URI using advertised_protocol, advertised_hostname, and advertised_port.
metadata_max_age_ms 60000 Period of time in milliseconds after Kafka metadata is force refreshed.
karapace_rest true If the REST part of the app should be included in the starting process. At least one of karapace_rest and karapace_registry needs to be enabled for the service to start
karapace_registry true If the registry part of the app should be included in the starting process. At least one of karapace_rest and karapace_registry needs to be enabled for the service to start
protobuf_runtime_directory runtime Runtime directory for the protoc protobuf schema parser and code generator
name_strategy topic_name Name strategy to use when storing schemas from the Kafka REST proxy service. You can opt between topic_name, record_name and topic_record_name
name_strategy_validation true If enabled, validate that the given schema is registered under the name strategy in use when producing messages from Kafka REST
master_election_strategy lowest Decides on what basis the Karapace cluster master is chosen (only relevant in a multi node setup)

Authentication and authorization of Karapace Schema Registry REST API

To enable HTTP Basic Authentication and user authorization, set the path to the authorization configuration file in the registry_authfile key of the main Karapace configuration.

The Karapace Schema Registry authorization file is an optional JSON configuration that contains a list of authorized users under users and a list of access control rules under permissions.

Each user entry contains the following attributes:

Parameter Description
username A string
algorithm One of supported hashing algorithms, scrypt, sha1, sha256, or sha512
salt Salt used for hashing the password
password_hash Hash string of the password calculated using given algorithm and salt.

Password hashing can be done using the karapace_mkpasswd tool, if installed, or by invoking it directly with python -m karapace.auth. The tool generates a JSON entry with these fields.

$ karapace_mkpasswd -u user -a sha512 secret
{
    "username": "user",
    "algorithm": "sha512",
    "salt": "iuLouaExTeg9ypqTxqP-dw",
    "password_hash": "R6ghYSXdLGsq6hkQcg8wT4xkD4QToxBhlp7NerTnyB077M+mD2qiN7ZxXCDb4aE+5lExu5P11UpMPYAcVYxSQA=="
}

Each access control rule contains the following attributes:

Parameter Description
username A string to match against the authenticated user
operation Exact value of Read or Write. Write also implies read permission. Write covers all mutable operations, e.g. deleting schema versions
resource A regular expression used to match against the accessed resource.

Supported resource authorization:

Resource Description
Config: Controls authorization to global schema registry configuration.
Subject:<subject_name> Controls authorization to subject. The <subject_name> is a regular expression to match against the accessed subject.

Example of complete authorization file

{
    "users": [
        {
            "username": "admin",
            "algorithm": "scrypt",
            "salt": "<put salt for randomized hashing here>",
            "password_hash": "<put hashed password here>"
        },
        {
            "username": "plainuser",
            "algorithm": "sha256",
            "salt": "<put salt for randomized hashing here>",
            "password_hash": "<put hashed password here>"
        }
    ],
    "permissions": [
        {
            "username": "admin",
            "operation": "Write",
            "resource": ".*"
        },
        {
            "username": "plainuser",
            "operation": "Read",
            "resource": "Subject:general.*"
        },
        {
            "username": "plainuser",
            "operation": "Read",
            "resource": "Config:"
        }
    ]
}
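
With an authorization file like the one above set via registry_authfile, requests must carry HTTP Basic credentials. A sketch, assuming plainuser's password is "secret":

# plainuser holds Read on Config: and may read the global configuration
$ curl -u plainuser:secret http://localhost:8081/config
# plainuser holds only Read on Subject:general.*, so registering a schema is denied
$ curl -u plainuser:secret -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  --data '{"schema": "{\"type\": \"string\"}"}' \
  http://localhost:8081/subjects/general.test/versions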

Karapace Schema Registry access to the schemas topic

The principal used by the Karapace Schema Registry has to have adequate access to the schemas topic (see the topic_name configuration option above). In addition to what is required to access the topic, as described in the Confluent Schema Registry documentation, the unique, single-member consumer group used by consumers in the schema registry needs Describe and Read permissions on the group. These unique (per instance of the schema registry) consumer group names are prefixed by karapace-autogenerated-, followed by a random string.
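
A sketch of granting such permissions with Kafka's CLI tooling, assuming the schema registry connects as the principal User:karapace and uses the default _schemas topic (adjust names to your setup):

./kafka-acls.sh --bootstrap-server brokerhostname:9092 --add \
  --allow-principal User:karapace \
  --operation Describe --operation Read --operation Write \
  --topic _schemas
./kafka-acls.sh --bootstrap-server brokerhostname:9092 --add \
  --allow-principal User:karapace \
  --operation Describe --operation Read \
  --group karapace-autogenerated- --resource-pattern-type prefixed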

OAuth2 authentication and authorization of Karapace REST proxy

The Karapace REST proxy supports passing OAuth2 credentials to the underlying Kafka service (defined in the sasl_bootstrap_uri configuration parameter). The JSON Web Token (JWT) is extracted from the Authorization HTTP header if the authorization scheme is Bearer, eg. Authorization: Bearer $JWT. If a Bearer token is present, the Kafka clients managed by Karapace will be created to use the SASL OAUTHBEARER mechanism and the JWT will be passed along. The Karapace REST proxy does not verify the token, that is done by the underlying Kafka service itself, if it's configured accordingly.

Authorization is also done by Kafka itself, typically using the sub claim (although it's configurable) from the JWT as the username, checked against the configured ACLs.

OAuth2 and Bearer token usage is dependent on the rest_authorization configuration parameter being true.
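
As a sketch, with rest_authorization enabled, a client simply sends its token with each REST proxy request (the token value and topic name below are placeholders):

$ curl -X POST -H "Authorization: Bearer $JWT" \
  -H "Content-Type: application/vnd.kafka.json.v2+json" \
  --data '{"records": [{"value": {"name": "name0"}}]}' \
  http://localhost:8082/topics/my_topic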

Token expiry

The REST proxy process manages a set of producer and consumer clients, which are identified by the OAuth2 JWT token. These are periodically cleaned up if they are idle, as well as before the JWT token expires (the clean up currently runs every 5 minutes).

Before a client refreshes its OAuth2 JWT token, it is expected to remove currently running consumers (eg. after committing their offsets) and producers using the current token.

Schema Normalization

If the normalize=true query parameter is given on the POST /subjects/{subject}/versions or the POST /subjects/{subject} endpoint, Karapace uses a schema normalization algorithm to ensure that the schema is stored in a canonical form.

This normalization is done so that semantically equivalent schemas are stored in the same way and are considered equal.

Normalization is currently only supported for Protobuf schemas, and Karapace does not support all of the normalization features implemented by Confluent Schema Registry; at the moment only the ordering of optional fields in the schema is normalized. Use the feature with the expectation that it will be extended in the future, so two schemas that are semantically equivalent may still be considered different by the normalization process in future versions of Karapace. When normalizing, the safe failure mode is to treat two semantically equivalent schemas as different; the real problem would be treating two semantically different schemas as equivalent. In that light, future extensions of the normalization process are not considered breaking changes but rather refinements.
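
A sketch of registering a Protobuf schema with normalization enabled (the subject name and schema are illustrative):

$ curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  --data '{"schemaType": "PROTOBUF", "schema": "syntax = \"proto3\"; message Obj { int32 age = 1; }"}' \
  "http://localhost:8081/subjects/test-proto/versions?normalize=true"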

Uninstall

To uninstall Karapace from your system, follow the instructions below. We would love to hear your reasons for uninstalling, though. Please file an issue if you experience any problems, or email us with feedback.

Installed via Docker

If you installed Karapace via Docker, you first need to stop and remove the containers as described below:

First obtain the container IDs related to Karapace; you should have one for the registry itself and another for the REST interface:

docker ps | grep karapace

After this, you can stop each of the containers with:

docker stop <CONTAINER_ID>

If you don't need or want to keep the Karapace containers around, you can now remove them using:

docker rm <CONTAINER_ID>

Installed from Sources

If you installed Karapace from the sources via python setup.py install, it can be uninstalled with the following pip command:

pip uninstall karapace

Development

Execute make (GNU, usually gmake on BSD and Mac) to set up a venv and install the required software for development. Use make unit-tests and make integration-tests to execute the respective test suite, or simply make test to execute both. You can set PYTEST_ARGS to customize the execution (e.g. PYTEST_ARGS=--maxfail=1 make test).

By default pyenv is expected to be installed and in PATH. This ensures on all platforms that arbitrary Python versions can be used for development. It is possible to overwrite this by setting PYENV to something else (e.g. PYENV=python3 make venv to simply use the global Python executable). The default Python version is defined in .python-version.

Karapace currently depends on various system software to be installed. The installation of these is automated for some operating systems, but not all. At the time of writing Java, the Protobuf Compiler, and the Snappy shared library are required to work with Karapace. You need to install them manually if your operating system is not supported by the automatic installation scripts. Note that the scripts are going to ask before installing any of these on your system.

Note that Karapace requires a Protobuf Compiler older than 3.20.0, because 3.20.0 introduces various breaking changes. The tests are going to fail if the Protobuf Compiler is newer than that. However, you can work around this locally by running pip install --upgrade protobuf in your venv. We are going to fix this soon.

Note that the integration tests are currently not working on Mac. You can use Docker, just be sure to set VENV_DIR to a directory outside the working directory so that the container is not overwriting files from the host (e.g. docker run --env VENV_DIR=/tmp/venv ...).

Note that the runtime directory MUST exist and that Karapace is going to fail if it does not. The runtime directory is also not cleaned between test runs, and left over data might result in failing tests. Use the make test targets that correctly clean the runtime directory without deleting it, but keep this in mind whenever you are not using make (e.g. running tests from your IDE).

Note that the pre-commit checks are currently not working with the default Python version, because isort dropped Python 3.7 support. You have to use at least Python 3.8 for the pre-commit checks. Use pipx, brew, or a similar tool to install pre-commit and use that global installation; the project has no dependency on it.

License

Karapace is licensed under the Apache license, version 2.0. Full license text is available in the LICENSE file.

Please note that the project explicitly does not require a CLA (Contributor License Agreement) from its contributors.

Contact

Bug reports and patches are very welcome, please post them as GitHub issues and pull requests at https://github.com/aiven/karapace . Any possible vulnerabilities or other serious issues should be reported directly to the maintainers <[email protected]>.

Trademark

Apache Kafka is either a registered trademark or trademark of the Apache Software Foundation in the United States and/or other countries. Kafka Rest and Schema Registry are trademarks and property of their respective owners. All product and service names used in this page are for identification purposes only and do not imply endorsement.

Credits

Karapace was created by, and is maintained by, Aiven cloud data hub developers.

The schema storing part of Karapace loans heavily from the ideas of the earlier Schema Registry implementation by Confluent and thanks are in order to them for pioneering the concept.

Recent contributors are listed on the GitHub project page, https://github.com/aiven/karapace/graphs/contributors

Copyright ⓒ 2021 Aiven Ltd.

karapace's Issues

karapace/version.py missing when building with pip

In order to be able to create production Dockerfile for karapace, it would be great if it could be distributed in pip.

The pip build fails as the generation of karapace/version.py depends on the git context (?), which is not available there.

To reproduce:
git archive --format zip --output /tmp/karapace.zip HEAD && pip install /tmp/karapace.zip

Processing /tmp/karapace.zip                                                                                                                                                                                                                  
    ERROR: Command errored out with exit status 1:                                                                                                                                                                                            
     command: /tmp/what/bin/python -c 'import sys, setuptools, tokenize; sys.argv[0] = '"'"'/tmp/pip-req-build-2l54z3xz/setup.py'"'"'; __file__='"'"'/tmp/pip-req-build-2l54z3xz/setup.py'"'"';f=getattr(tokenize, '"'"'open'"'"', open)(__fil
e__);code=f.read().replace('"'"'\r\n'"'"', '"'"'\n'"'"');f.close();exec(compile(code, __file__, '"'"'exec'"'"'))' egg_info --egg-base /tmp/pip-pip-egg-info-pgo_5_bd                                                                          
         cwd: /tmp/pip-req-build-2l54z3xz/                                                                                                                                                                                                    
    Complete output (7 lines):                                                                                                                                                                                                                
    Traceback (most recent call last):                                                                                                                                                                                                        
      File "<string>", line 1, in <module>                                                                                                                                                                                                    
      File "/tmp/pip-req-build-2l54z3xz/setup.py", line 16, in <module>                                                                                                                                                                       
        version_for_setup_py = version.get_project_version("karapace/version.py")                                                                                                                                                             
      File "/tmp/pip-req-build-2l54z3xz/version.py", line 45, in get_project_version                                                                                                                                                          
        raise Exception("version not available from git or from file {!r}".format(version_file))                                                                                                                                              
    Exception: version not available from git or from file '/tmp/pip-req-build-2l54z3xz/karapace/version.py'                                                                                                                                  
    ----------------------------------------                                                                                                                                                                                                  
ERROR: Command errored out with exit status 1: python setup.py egg_info Check the logs for full command output.             

As far as I see this version.py is generated by the Makefile when python3 setup.py sdist bdist_wheel is run.

I have no idea where to put the logic to overcome this situation.

Implement the `/clusters` API for the REST Proxy

What is currently missing?

The /clusters APIs are not implemented for the REST Proxy

How could this be improved?

Implement these APIs:

Verb URL
GET /clusters
GET /clusters/<cluster_id>
GET /clusters/<cluster_id>/acls
POST /clusters/<cluster_id>/acls
DELETE /clusters/<cluster_id>/acls
GET /clusters/<cluster_id>/broker-configs
POST /clusters/<cluster_id>/broker-configs:alter
GET /clusters/<cluster_id>/broker-configs/<name>
PUT /clusters/<cluster_id>/broker-configs/<name>
DELETE /clusters/<cluster_id>/broker-configs/<name>
GET /clusters/<cluster_id>/broker/<broker_id>/configs
POST /clusters/<cluster_id>/broker/<broker_id>/configs:alter
GET /clusters/<cluster_id>/broker/<broker_id>/configs/<name>
PUT /clusters/<cluster_id>/broker/<broker_id>/configs/<name>
DELETE /clusters/<cluster_id>/broker/<broker_id>/configs/<name>
GET /clusters/<cluster_id>/topics/<topic_name>/configs
POST /clusters/<cluster_id>/topics/<topic_name>/configs:alter
GET /clusters/<cluster_id>/topics/<topic_name>/configs/<name>
PUT /clusters/<cluster_id>/topics/<topic_name>/configs/<name>
DELETE /clusters/<cluster_id>/topics/<topic_name>/configs/<name>
GET /clusters/<cluster_id>/brokers
GET /clusters/<cluster_id>/brokers/<broker_id>
DELETE /clusters/<cluster_id>/brokers/<broker_id>
GET /clusters/<cluster_id>/brokers/<broker_id>/partition-replicas
GET /clusters/<cluster_id>/consumer-groups
GET /clusters/<cluster_id>/consumer-groups/<consumer_group_id>
GET /clusters/<cluster_id>/consumer-groups/<consumer_group_id>/consumers
GET /clusters/<cluster_id>/consumer-groups/<consumer_group_id>/consumers/<consumer_id>
GET /clusters/<cluster_id>/consumer-groups/<consumer_group_id>/consumers/<consumer_id>/assignments/
GET /clusters/<cluster_id>/consumer-groups/<consumer_group_id>/consumers/<consumer_id>/assignments/<topic_name>/partitions/<partition_id>
GET /clusters/<cluster_id>/topics
POST /clusters/<cluster_id>/topics
GET /clusters/<cluster_id>/topics/<topic_name>
DELETE /clusters/<cluster_id>/topics/<topic_name>
GET /clusters/<cluster_id>/topics/<topic_name>/partitions
GET /clusters/<cluster_id>/topics/<topic_name>/partitions/<partition_id>
GET /clusters/<cluster_id>/topics/-/partitions/-/reassignment
GET /clusters/<cluster_id>/topics/<topic_name>/partitions/-/reassignment
GET /clusters/<cluster_id>/topics/<topic_name>/partitions/<partition_id>/reassignment
GET /clusters/<cluster_id>/topics/<topic_name>/partitions/<partition_id>/replicas
GET /clusters/<cluster_id>/topics/<topic_name>/partitions/<partition_id>/replicas/<broker_id>
GET /clusters/<cluster_id>/brokers/-/tasks
GET /clusters/<cluster_id>/brokers/<broker_id>/tasks
GET /clusters/<cluster_id>/brokers/-/tasks/<task_type>
GET /clusters/<cluster_id>/brokers/<broker_id>/tasks/<task_type>
GET /clusters/<cluster_id>/remove_broker-tasks
GET /clusters/<cluster_id>/remove_broker-tasks/<broker_id>

Undocumented features

Verb URL
POST "/v3/clusters/{clusterId}/topics/{topicName}/records"

Provide instructions for container usage

Documentation on how to use the container images for deployment of the registry and REST proxy is missing.

Information missing:

  • How to access the container image (name, repository, etc)
  • How to configure the registry and proxy
  • How to connect to external Kafka

Schema not found (40403) for existing schema using POST /subjects/<subject>

Hi,

we are in the process of migrating a Kafka Streams application to another cluster. In this process, we have copied the '_schemas' from our old (Confluent) schema registry to the new Aiven/Karapace one. However, the app throws the following error:

org.apache.kafka.common.errors.SerializationException: Error retrieving Avro schema: {\"type\":\"record\",\"name\":\"MyKey\",\"namespace\":\"my.example.key\",\"fields\":[{\"name\":\"did\",\"type\":\"string\"},{\"name\":\"company\",\"type\":\"string\"},{\"name\":\"country\",\"type\":\"string\"}]}

Upon further debugging, I found that the failure happens when the schema registry client tries to look up the schema id by the schema string using POST /subjects/ with the following body:

{"schema":"{\"type\":\"record\",\"name\":\"MyKey\",\"namespace\":\"my.example.key\",\"fields\":[{\"name\":\"did\",\"type\":\"string\"},{\"name\":\"company\",\"type\":\"string\"},{\"name\":\"country\",\"type\":\"string\"}]}"}

This fails with an error code of 40403, as I can reproduce using curl.

However, the subject clearly exists with the exact same schema string in the registry:

GET /subjects/<mytopic>-key/versions/latest | jq '.schema'
{"id":1155,"schema":"{\"type\":\"record\",\"name\":\"MyKey\",\"namespace\":\"my.example.key\",\"fields\":[{\"name\":\"did\",\"type\":\"string\"},{\"name\":\"company\",\"type\":\"string\"},{\"name\":\"country\",\"type\":\"string\"}]}","subject":"<mytopic>-key","version":1}

What is going on here, does the POST-endpoint not work as expected?

Contribute to HUE

When working with Cloudera / Hortonworks products, I typically setup HUE for the data-science folk, and while I really like it, even as a standalone product by itself (it makes a great JDBC tool and Jupyter notebook and filebrowser replacement), the Kafka lib however, is relatively lacking https://github.com/cloudera/hue/tree/master/desktop/libs/kafka

What are your thoughts on contributing some work into making a Kafka dashboard in Mako templates?

cc @romainr

How to use Karapace with JSON

Hi, right now when following the steps from https://github.com/aiven/karapace, we serialize in avro format. Is it possible to just serialize in JSON or even skip the schema?
We're using Clickhouse DB hosted in Altinity Cloud and setting up the schema registry URL turns out tricky. Thanks!

SASL_SSL / SCRAM support

Hi,

as far as I understand, SASL_SSL is not supported (and consequently SCRAM-SHA-512 is not supported either). Any plans for this, or perhaps a hint on where I should start to hack?

add registry client api for both services running

When both the registry and REST APIs are running under a single process, REST should be able to speak to the registry without going below L7.
Implementing this by adhering to the existing registry client API is desirable.

Keyboard interrupt is not terminating the process

Traceback (most recent call last):
  File "/home/hack/work/karapace-venv-3.6/lib64/python3.6/site-packages/kafka/conn.py", line 1536, in dns_lookup
    socket.SOCK_STREAM)))
  File "/usr/lib64/python3.6/socket.py", line 745, in getaddrinfo
    for res in _socket.getaddrinfo(host, port, family, type, proto, flags):
socket.gaierror: [Errno -2] Name or service not known

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/hack/work/karapace/karapace/kafka_rest_apis/__init__.py", line 302, in init_admin_client
    client_factory=KarapaceKafkaClient,
  File "/home/hack/work/karapace/karapace/kafka_rest_apis/admin.py", line 16, in __init__
    super().__init__(**configs)
  File "/home/hack/work/karapace-venv-3.6/lib64/python3.6/site-packages/kafka/admin/client.py", line 218, in __init__
    self._client.check_version(timeout=(self.config['api_version_auto_timeout_ms'] / 1000))
  File "/home/hack/work/karapace-venv-3.6/lib64/python3.6/site-packages/kafka/client_async.py", line 899, in check_version
    self._maybe_connect(try_node)
  File "/home/hack/work/karapace-venv-3.6/lib64/python3.6/site-packages/kafka/client_async.py", line 390, in _maybe_connect
    conn.connect()
  File "/home/hack/work/karapace-venv-3.6/lib64/python3.6/site-packages/kafka/conn.py", line 367, in connect
    next_lookup = self._next_afi_sockaddr()
  File "/home/hack/work/karapace-venv-3.6/lib64/python3.6/site-packages/kafka/conn.py", line 326, in _next_afi_sockaddr
    if not self._dns_lookup():
  File "/home/hack/work/karapace-venv-3.6/lib64/python3.6/site-packages/kafka/conn.py", line 317, in _dns_lookup
    self._gai = dns_lookup(self.host, self.port, self.afi)
  File "/home/hack/work/karapace-venv-3.6/lib64/python3.6/site-packages/kafka/conn.py", line 1536, in dns_lookup
    socket.SOCK_STREAM)))
KeyboardInterrupt

This can be experienced when the keyboard interrupt is sent during initialization. For example because of a misconfigured advertised listener #247 (comment)

Implement schema-registry support

Either in the underlying python client, or as an add-on in karapace through inheritance / composition / etc.
Study the most extensible / least time-consuming solution.

KafkaConfigurationError: Unrecognized configs: {'kafka_client'}

I am trying to package karapace in Docker for use in the NAIS platform. I have used the work being done for #136, because I need to base my image on our own base images.

I have managed to create an image with all dependencies as far as I can tell, but when I start the container, it fails with a KafkaConfigurationError. I'm not sure if this is a failure in my packaging, or an actual error in the current master branch?

The log:

Karapace            	MainThread	INFO    	Karapace initialized
KarapaceRest        	MainThread	ERROR   	Unable to start admin client, retrying
Traceback (most recent call last):
  File "/app/.local/lib/python3.9/site-packages/karapace/kafka_rest_apis/__init__.py", line 302, in init_admin_client
    self.admin_client = KafkaRestAdminClient(
  File "/app/.local/lib/python3.9/site-packages/karapace/kafka_rest_apis/admin.py", line 15, in __init__
    super().__init__(**configs)
  File "/app/.local/lib/python3.9/site-packages/kafka/admin/client.py", line 195, in __init__
    raise KafkaConfigurationError("Unrecognized configs: {}".format(extra_configs))
kafka.errors.KafkaConfigurationError: KafkaConfigurationError: Unrecognized configs: {'kafka_client'}
^CTraceback (most recent call last):
  File "/app/.local/lib/python3.9/site-packages/karapace/kafka_rest_apis/__init__.py", line 302, in init_admin_client
    self.admin_client = KafkaRestAdminClient(
  File "/app/.local/lib/python3.9/site-packages/karapace/kafka_rest_apis/admin.py", line 15, in __init__
    super().__init__(**configs)
  File "/app/.local/lib/python3.9/site-packages/kafka/admin/client.py", line 195, in __init__
    raise KafkaConfigurationError("Unrecognized configs: {}".format(extra_configs))
kafka.errors.KafkaConfigurationError: KafkaConfigurationError: Unrecognized configs: {'kafka_client'}

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/app/.local/bin/karapace", line 8, in <module>
    sys.exit(main())
  File "/app/.local/lib/python3.9/site-packages/karapace/karapace_all.py", line 55, in main
    kc = KafkaRest(config_file_path=config_file_path, config=config)
  File "/app/.local/lib/python3.9/site-packages/karapace/kafka_rest_apis/__init__.py", line 43, in __init__
    self._init(config=config)
  File "/app/.local/lib/python3.9/site-packages/karapace/kafka_rest_apis/__init__.py", line 57, in _init
    self.init_admin_client()
  File "/app/.local/lib/python3.9/site-packages/karapace/kafka_rest_apis/__init__.py", line 316, in init_admin_client
    time.sleep(1)
KeyboardInterrupt
asyncio             	MainThread	ERROR   	Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x7f7875bb3d90>
asyncio             	MainThread	ERROR   	Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x7f7875983ac0>

I'm running with this configuration:

{
  "bootstrap_uri": "redacted.aivencloud.com:26484",
  "client_id": "a09a129aa418",
  "group_id": "a09a129aa418",
  "registry_host": "redacted.aivencloud.com",
  "registry_port": "26487",
  "host": "",
  "log_level": "INFO",
  "security_protocol": "SSL",
  "ssl_cafile": "/var/run/config/nais/ca.pem",
  "ssl_certfile": "/var/run/config/nais/service.cert",
  "ssl_keyfile": "/var/run/config/nais/service.key",
  "karapace_rest": true,
  "karapace_registry": false
}

Add observability

What is currently missing?

Observability into the performance and operation of Karapace.

How could this be improved?

It would be very useful to add some observability, like number of requests, number of messages produced/consumed, schemas etc.

For my particular use case, it would be great to have a prometheus endpoint with these metrics.

Is this a feature you would work on yourself?

Probably not at this time, but if my backlog clears up and nobody has done anything here, I might take a look at it.

Would welcome some design guidance beforehand.

Registry container is failing on startup via docker-compose

I am trying to start Karapace via a docker-compose file. I built the image, and when docker-compose launches the components I get the following error:

registry_1  | Traceback (most recent call last):
registry_1  |   File "/usr/lib64/python3.7/runpy.py", line 193, in _run_module_as_main
registry_1  |     "__main__", mod_spec)
registry_1  |   File "/usr/lib64/python3.7/runpy.py", line 85, in _run_code
registry_1  |     exec(code, run_globals)
registry_1  |   File "/app/karapace/schema_registry_apis.py", line 1, in <module>
registry_1  |     from karapace import version as karapace_version
registry_1  | ImportError: cannot import name 'version' from 'karapace' (/app/karapace/__init__.py)

Then container_registry_1 exits with code 0

Kafka and Zookeeper starts and operates successfully.

What could be causing this issue?

Trying to install, getting Exception: version not available from git or from file '/root/karapace-master/karapace/version.py'

Hi,

I have downloaded the zip file from this GitHub repository and am following the instructions to install. I need to create a schema registry for my Kafka topic.

(airflow_virtualenv) [root@rhes75 karapace-master]# python3 setup.py bdist_egg
Traceback (most recent call last):
  File "setup.py", line 16, in <module>
    version_for_setup_py = version.get_project_version("karapace/version.py")
  File "/root/karapace-master/version.py", line 43, in get_project_version
    raise Exception("version not available from git or from file {!r}".format(version_file))
Exception: version not available from git or from file '/root/karapace-master/karapace/version.py'

Kindly advise a workaround.

Thanks,

Mich

test_subscription leaves the rest api coordinator with unclean state

After running test_subscription the rest api will regularly log:

karapace-rest_1 | kafka.coordinator sub_group-heartbeat WARNING Heartbeat failed for group sub_group because it is rebalancing

The test is executed successful but the logging will persist, this is a bit confusing if a test executed afterwards fails, because at first sight it is unclear if the warning is serious or not. Fixing this would make debugging failures a tad-bit easier.

Edit: This is causing flakiness when the services are started outside of the fixtures; the fix was to generate a new consumer group id for every run.

Handle schema references

I ran into an error using https://github.com/riferrei/srclient which is a Go client for Kafka with Confluent Schema Registry support. That (since v5.5 I believe) has support for schema references, however Karapace sees this as an unknown field and gives an error:

422 Unprocessable Entity: Unrecognized field: references

Looking at what the client is sending, it's an empty [] field named references at the same level as schema and schemaType.

What I expected: Karapace should ignore/tolerate this field, even if it doesn't have support for schema references quite yet.

Compatibility check for union types doesn't work

Having a schema like this in the registry

{
  "type": "record",
  "name": "order",
  "namespace": "example",
  "fields": [
    {
      "name": "someField",
      "type": [
        "null",
        {
          "type": "record",
          "name": "someEmbeddedRecord",
          "namespace": "example",
          "fields": [
            {
              "name": "name",
              "type": "string"
            }
          ]
        }
      ],
      "default": null
    }
  ]
}

and trying to update to this

{
  "type": "record",
  "name": "order",
  "namespace": "example",
  "fields": [
    {
      "name": "someField",
      "type": [
        "null",
        {
          "type": "record",
          "name": "someEmbeddedRecord",
          "namespace": "example",
          "fields": [
            {
              "name": "name",
              "type": "string"
            },
            {
              "name": "price",
              "type": "int"
            }
          ]
        }
      ],
      "default": null
    }
  ]
}

The Karapace REST call for compatibility claims that it is compatible.
For full_transitive, only this version should be allowed:

            {
              "name": "price",
              "type": [
                "null",
                "int"
              ],
              "default": null
            }

Curl call to register the schema:

curl --location --request POST 'localhost:9081/subjects/test/versions' \
--header 'Accept: application/vnd.schemaregistry.v1+json' \
--header 'Content-Type: application/vnd.schemaregistry.v1+json' \
--data-raw '{"schema":"{\"type\":\"record\",\"name\":\"order\",\"namespace\":\"example\",\"fields\":[{\"name\":\"someField\",\"type\":[\"null\",{\"type\":\"record\",\"name\":\"someEmbeddedRecord\",\"namespace\":\"example\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"}]}],\"default\":null}]}"}'

Curl call to check compatibility:

curl --location --request POST 'localhost:9081/compatibility/subjects/test/versions/latest' \
--header 'Accept: application/vnd.schemaregistry.v1+json' \
--header 'Content-Type: application/json' \
--data-raw '{"schema":"{\"type\":\"record\",\"name\":\"order\",\"namespace\":\"example\",\"fields\":[{\"name\":\"someField\",\"type\":[\"null\",{\"type\":\"record\",\"name\":\"someEmbeddedRecord\",\"namespace\":\"example\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"price\",\"type\":\"int\"}]}],\"default\":null}]}"}'

Karapace claims:

{
    "is_compatible": true
}

safely handle compatibility update calls

Currently updating a compatibility setting, on either the subject or global level, will send the message to the Kafka topic without forwarding it to the master, or, for that matter, checking whether the instance is in a running state. This can lead to a timeout when a new message arrives but the offset is not written to the queue because the cluster is not ready yet. As a fix we should either adopt the same logic used for schema messages or ignore the node state when processing config-type messages.

Evolving schemas not working properly

Hey!

I can't get the following schema to evolve with Karapace but it works as expected with Schema Registry:

Initial schema:

{
  "name": "bar",
  "namespace": "foo",
  "type": "record",
  "fields": [
    {
      "name": "foobar",
      "type": [
        {
          "type": "array",
          "name": "foobar_items",
          "items": {
            "type": "record",
            "name": "foobar_fields",
            "fields": [
              {
                "name": "foo",
                "type": "string"
              }
            ]
          }
        }
      ]
    }
  ]
}

Initial registration:

〉 http http://localhost:8082/subjects/test/versions schema=@schemas/foo.bar.events-value.avsc 
HTTP/1.1 200 OK
Access-Control-Allow-Headers: Authorization, Content-Type
Access-Control-Allow-Methods: DELETE, GET, OPTIONS, POST, PUT
Access-Control-Allow-Origin: *
Content-Length: 8
Content-Type: application/json
Date: Tue, 12 May 2020 15:17:55 GMT
Server: Karapace/1.1.0-42-gfd4a04c
access-control-expose-headers: etag
etag: "848ec8509f030dd4e57263de2ccbb6f5"

{
    "id": 1
}

Evolved schema:

{
  "name": "bar",
  "namespace": "foo",
  "type": "record",
  "fields": [
    {
      "name": "foobar",
      "type": [
        {
          "type": "array",
          "name": "foobar_items",
          "items": {
            "type": "record",
            "name": "foobar_fields",
            "fields": [
              {
                "name": "foo",
                "type": "string"
              },
              {
                "name": "bar",
                "type": [
                  "null",
                  "string"
                ],
                "default": null
              }
            ]
          }
        }
      ]
    }
  ]
}

And this is the response I get when trying to register it again:

〉 http http://localhost:8082/subjects/test/versions schema=@schemas/foo.bar.events-value.avsc
HTTP/1.1 409 Conflict
Access-Control-Allow-Headers: Authorization, Content-Type
Access-Control-Allow-Methods: DELETE, GET, OPTIONS, POST, PUT
Access-Control-Allow-Origin: *
Content-Length: 93
Content-Type: application/json
Date: Tue, 12 May 2020 15:19:49 GMT
Server: Karapace/1.1.0-42-gfd4a04c

{
    "error_code": 409,
    "message": "Schema being registered is incompatible with an earlier schema"
}

It seems the issue only appears when using a record inside an array.

Karapace config used:

{
    "advertised_hostname": "localhost",
    "bootstrap_uri": "kafka-cp-kafka-headless.kafka.svc.cluster.local:9092",
    "client_id": "sr-1",
    "compatibility": "BACKWARD",
    "group_id": "karapace",
    "host": "127.0.0.1",
    "log_level": "INFO",
    "port": 8082,
    "master_eligibility": true,
    "replication_factor": 1,
    "security_protocol": "PLAINTEXT",
    "ssl_cafile": null,
    "ssl_certfile": null,
    "ssl_keyfile": null,
    "topic_name": "_schemas2"
}

And I have verified that we are actually using the BACKWARD compatibility level:

〉 http http://localhost:8082/config 
HTTP/1.1 200 OK
Access-Control-Allow-Headers: Authorization, Content-Type
Access-Control-Allow-Methods: DELETE, GET, OPTIONS, POST, PUT
Access-Control-Allow-Origin: *
Content-Length: 33
Content-Type: application/vnd.schemaregistry.v1+json
Date: Tue, 12 May 2020 15:22:32 GMT
Server: Karapace/1.1.0-42-gfd4a04c
access-control-expose-headers: etag
etag: "8e0ae0809e433605a5df7927a1b207ac"

{
    "compatibilityLevel": "BACKWARD"
}

Karapace version: Karapace/1.1.0-42-gfd4a04c

We also have the same issues with your managed version of Karapace and it's blocking us at the moment from using Karapace in production.

Do you know what's going on?

Performance improvements for rest endpoints

Try any combination of the below in an effort to improve performance, as measured by RAM usage and response time. Should any of these offer only net benefits, incorporate them into the code base. Leave a useful paper trail in either case (what was tried, how, and how it performed). Due to the nature of the V2 API endpoints, most, if not all, focus will be on the produce and consume endpoints.

  • Cache schema str to id mappings on the producer code
  • Commit to either the sync or the async functionality and refactor accordingly
  • Add metrics throughout the load tested endpoint and figure out the bottlenecks
  • Switch avro with fastavro
  • Related to the second item on the list, but as a low hanging fruit, one can just switch the producer with an async producer for the purpose of load testing the publish endpoints
  • Use a single extended consumer instead of 1 per consumer group
  • Have a producer pool

Record Headers on produce message

What is currently missing?

I can't set record headers to add metadata when I produce a message.

How could this be improved?

Can you change the contract on the produce endpoint to have a key/value map on a record for these headers?

Is this a feature you would work on yourself?

[ ] I plan to open a pull request for this feature

Got an unexpected keyword argument 'loop' starting Karapace

Hello,

I'm trying to test Karapace but I got the following error:

karapace config_file
Karapace                MainThread      INFO            Karapace initialized
Exception ignored in: <function ClientSession.__del__ at 0x7fd02144b0e0>
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/aiohttp-4.0.0a1-py3.7-linux-x86_64.egg/aiohttp/client.py", line 269, in __del__
    if not self.closed:
  File "/usr/local/lib/python3.7/site-packages/aiohttp-4.0.0a1-py3.7-linux-x86_64.egg/aiohttp/client.py", line 894, in closed
    return self._connector is None or self._connector.closed
AttributeError: _connector
Traceback (most recent call last):
  File "/usr/local/bin/karapace", line 12, in <module>
    load_entry_point('karapace==2.0.1.dev97', 'console_scripts', 'karapace')()
  File "/usr/local/lib/python3.7/site-packages/karapace-2.0.1.dev97-py3.7.egg/karapace/karapace_all.py", line 56, in main
    kc = KarapaceAll(arg.config_file)
  File "/usr/local/lib/python3.7/site-packages/karapace-2.0.1.dev97-py3.7.egg/karapace/karapace_all.py", line 17, in __init__
    KafkaRest._init(self, config)
  File "/usr/local/lib/python3.7/site-packages/karapace-2.0.1.dev97-py3.7.egg/karapace/kafka_rest_apis/__init__.py", line 44, in _init
    self.serializer = SchemaRegistrySerializer(config_path=config_path)
  File "/usr/local/lib/python3.7/site-packages/karapace-2.0.1.dev97-py3.7.egg/karapace/serialization.py", line 122, in __init__
    registry_client = SchemaRegistryClient(registry_url)
  File "/usr/local/lib/python3.7/site-packages/karapace-2.0.1.dev97-py3.7.egg/karapace/serialization.py", line 71, in __init__
    self.client = Client(server_uri=schema_registry_url, client=aiohttp.ClientSession(loop=loop))
TypeError: __init__() got an unexpected keyword argument 'loop'

OS: CentOS Linux release 7.7.1908 (Core)
Python Version: Python 3.7.9 (default, Jan 4 2021, 17:06:19)

Config File options:

{
    "advertised_hostname": "localhost",
    "bootstrap_uri": "ip-node-1:9092,ip-node-2:9092,ip-node-3:9092",
    "client_id": "kp-1",
    "compatibility": "FULL",
    "group_id": "schema-registry",
    "host": "",
    "log_level": "DEBUG",
    "port": 8081,
    "master_eligibility": true,
    "replication_factor": 1,
    "security_protocol": "PLAINTEXT",
    "ssl_cafile": null,
    "ssl_certfile": null,
    "ssl_keyfile": null,
    "karapace_rest": true,
    "karapace_registry": true,
    "topic_name": "_schemas",
    "session_timeout_ms": 10000
}

Can someone give me a tip on where I can investigate?

Thanks!

handle os signals (gracefully)

At the moment we do not really do anything on SIGHUP or SIGTERM, and the process only goes away on SIGKILL.
Investigate the feasibility of implementing logic for config reload and graceful shutdown.
The former is iffy, but the latter should be pretty straightforward. The entry point would be karapace/karapace_all.py.
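A minimal sketch of what the graceful-shutdown half could look like with asyncio signal handlers; app.start() and app.close() are placeholders for the real Karapace startup and cleanup:

import asyncio
import signal


async def run(app) -> None:
    loop = asyncio.get_running_loop()
    stop = asyncio.Event()

    # SIGTERM/SIGINT trigger a graceful shutdown; SIGHUP could later be wired
    # to a config reload instead of a stop.
    for sig in (signal.SIGTERM, signal.SIGINT):
        loop.add_signal_handler(sig, stop.set)

    await app.start()
    await stop.wait()   # block until a signal arrives
    await app.close()   # flush producers/consumers, close Kafka clients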

Refactor unit tests

There's a lot of repetition in the unit tests, especially when it comes to the registry / rest fixtures. In most cases, those particular objects are not needed, but rather an HTTP client using the built-in client fixture.

Try to have an async fixture for those clients only, hiding the registry / rest fixtures from most tests.
Once that is done, have said fixture be aware of the remote test env vars, and update most check_* tests to be actual tests so they can easily be run separately.
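A rough sketch of such a client-only fixture, assuming pytest-asyncio (in auto mode) and aiohttp; the KARAPACE_TEST_URI env var stands in for the remote test env vars mentioned above:

import os

import aiohttp
import pytest


@pytest.fixture
async def http_client():
    # Tests only ever see this thin client, never the registry / rest fixtures.
    base_uri = os.environ.get("KARAPACE_TEST_URI", "http://localhost:8081")
    async with aiohttp.ClientSession() as session:
        yield session, base_uri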

Retry metadata requests in the admin client

Sending metadata requests with a seldom-used client will cause the request to fail due to stale connections. Either retrying failed requests a couple of times before giving up, or having the KafkaRestAdminClient update its metadata in a background thread, would be an acceptable solution.
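A simple sketch of the retry approach; admin.cluster_metadata() is a stand-in for the real KafkaRestAdminClient call, and the retry count and backoff are arbitrary:

import time


def get_metadata_with_retry(admin, topics=None, retries: int = 3, backoff: float = 0.5):
    last_error = None
    for attempt in range(retries):
        try:
            return admin.cluster_metadata(topics)
        except Exception as exc:  # stale connection, broker unavailable, etc.
            last_error = exc
            time.sleep(backoff * (attempt + 1))
    raise last_error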

Add support for hard delete of schemas through the REST API

When the user sends a DELETE request with the query parameter ?permanent=true, the schema should be hard deleted by sending null as the value (a tombstone), instead of setting the deleted flag to true. Support for handling these messages is already in place:

https://github.com/aiven/karapace/blob/61b700e92fbf459e84a57995b95a230bfb80bb26/karapace/schema_reader.py#L287
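A sketch of how ?permanent=true could translate into a tombstone on the schemas topic, assuming kafka-python and a simplified key layout; only the value=None part is the point, since a null value is what log compaction treats as a delete:

import json

from kafka import KafkaProducer  # assumption: kafka-python is used for the example


def hard_delete_schema(producer: KafkaProducer, topic: str, subject: str, version: int) -> None:
    key = json.dumps({"keytype": "SCHEMA", "subject": subject, "version": version, "magic": 1})
    # null value == tombstone; compaction will eventually drop the record entirely
    producer.send(topic, key=key.encode("utf-8"), value=None)
    producer.flush()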

Write regression test for soft deletes bug

PR fixing the issue: #169

Previously Karapace would ignore soft-deleted schemas (schemas with the deleted flag set) if the schema had not been seen before. This was a problem since the _schemas topic has log.cleanup.policy=compact, so Karapace might only ever see the deleted record. That meant the schema would not be available through schemas/ids/:id, which broke some use cases.

add support for jsonschema

[ ] differentiate between the 2 schema types
[ ] some basic validation rules to get started with
[ ] support for schema references
[ ] support for proper same type promotion based on restrictions

Registry container is failing again on startup via docker-compose

Currently facing the following error during registry startup in docker-compose:

registry_1  | Traceback (most recent call last):
registry_1  |   File "/usr/lib64/python3.7/runpy.py", line 193, in _run_module_as_main
registry_1  |     "__main__", mod_spec)
registry_1  |   File "/usr/lib64/python3.7/runpy.py", line 85, in _run_code
registry_1  |     exec(code, run_globals)
registry_1  |   File "/app/karapace/schema_registry_apis.py", line 2, in <module>
registry_1  |     from karapace.compatibility import Compatibility, IncompatibleSchema
registry_1  |   File "/app/karapace/compatibility.py", line 7, in <module>
registry_1  |     from karapace.schema_reader import SchemaType, TypedSchema
registry_1  |   File "/app/karapace/schema_reader.py", line 15, in <module>
registry_1  |     from karapace.utils import json_encode
registry_1  |   File "/app/karapace/utils.py", line 13, in <module>
registry_1  |     import karapace.rapu
registry_1  |   File "/app/karapace/rapu.py", line 10, in <module>
registry_1  |     from karapace.utils import json_encode
registry_1  | ImportError: cannot import name 'json_encode' from 'karapace.utils' (/app/karapace/utils.py)

Only Kafka and ZooKeeper start and operate fine.

I used the codebase from the 'master' branch.

Any thoughts on this?

Add more config options for REST producers

We now create the REST producers with a minimum of configuration. In order of importance, at least these config options need to be available in the JSON config file (see the sketch after this list):

  • acks
  • compression_type
  • linger_ms
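A sketch of how the new keys could be forwarded to the producer, assuming kafka-python; the config key names (producer_acks, producer_compression_type, producer_linger_ms) are illustrative, not existing Karapace options:

from kafka import KafkaProducer  # assumption: kafka-python is the producer in use


def create_rest_producer(config: dict) -> KafkaProducer:
    return KafkaProducer(
        bootstrap_servers=config["bootstrap_uri"],
        acks=config.get("producer_acks", 1),
        compression_type=config.get("producer_compression_type", None),
        linger_ms=config.get("producer_linger_ms", 0),
    )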

Implement endpoint for health check

Bugs in dependency libraries can cause the Karapace APIs to become unresponsive; restarting the process is usually a valid and quick fix for the problem. To properly implement the restart, a /status endpoint is necessary. This endpoint has to perform some request through the Kafka client to make sure communication is working properly.
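A minimal sketch of such a /status handler with aiohttp; admin.cluster_metadata() is a placeholder for any cheap round trip that proves the Kafka connection still works:

from aiohttp import web


async def status_handler(request: web.Request) -> web.Response:
    admin = request.app["kafka_admin"]
    try:
        # any lightweight request to the brokers is enough for a liveness check
        admin.cluster_metadata()
    except Exception as exc:
        return web.json_response({"status": "down", "error": str(exc)}, status=503)
    return web.json_response({"status": "ok"})


def register_status_route(app: web.Application) -> None:
    app.router.add_get("/status", status_handler)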
