Git Product home page Git Product logo

rskafka's Introduction

RSKafka

CircleCI Crates.io Documentation License

This crate aims to be a minimal Kafka implementation for simple workloads that wish to use Kafka as a distributed write-ahead log.

It is not a general-purpose Kafka implementation, instead it is heavily optimised for simplicity, both in terms of implementation and its emergent operational characteristics. In particular, it aims to meet the needs of IOx.

This crate has:

  • No support for offset tracking, consumer groups, transactions, etc...
  • No built-in buffering, aggregation, linger timeouts, etc...
  • Independent write streams per partition

It will be a good fit for workloads that:

  • Perform offset tracking independently of Kafka
  • Read/Write reasonably sized payloads per-partition
  • Have a low number of high-throughput partitions 1

Usage

# async fn test() {
use rskafka::{
    client::{
        ClientBuilder,
        partition::Compression,
    },
    record::Record,
};
use time::OffsetDateTime;
use std::collections::BTreeMap;

// setup client
let connection = "localhost:9093".to_owned();
let client = ClientBuilder::new(vec![connection]).build().await.unwrap();

// create a topic
let topic = "my_topic";
let controller_client = client.controller_client().await.unwrap();
controller_client.create_topic(
    topic,
    2,      // partitions
    1,      // replication factor
    5_000,  // timeout (ms)
).await.unwrap();

// get a partition-bound client
let partition_client = client
    .partition_client(
        topic.to_owned(),
        0,  // partition
     )
    .await
    .unwrap();

// produce some data
let record = Record {
    key: None,
    value: Some(b"hello kafka".to_vec()),
    headers: BTreeMap::from([
        ("foo".to_owned(), b"bar".to_vec()),
    ]),
    timestamp: OffsetDateTime::now_utc(),
};
partition_client.produce(vec![record], Compression::default()).await.unwrap();

// consume data
let (records, high_watermark) = partition_client
    .fetch_records(
        0,  // offset
        1..1_000_000,  // min..max bytes
        1_000,  // max wait time
    )
   .await
   .unwrap();
# }

For more advanced production and consumption, see [crate::client::producer] and [crate::client::consumer].

Features

  • compression-gzip (default): Support compression and decompression of messages using gzip.
  • compression-lz4 (default): Support compression and decompression of messages using LZ4.
  • compression-snappy (default): Support compression and decompression of messages using Snappy.
  • compression-zstd (default): Support compression and decompression of messages using zstd.
  • full: Includes all stable features (compression-gzip, compression-lz4, compression-snappy, compression-zstd, transport-socks5, transport-tls).
  • transport-socks5: Allow transport via SOCKS5 proxy.
  • transport-tls: Allows TLS transport via rustls.
  • unstable-fuzzing: Exposes some internal data structures so that they can be used by our fuzzers. This is NOT a stable feature / API!

Testing

Redpanda

To run integration tests against Redpanda, run:

$ docker-compose -f docker-compose-redpanda.yml up

in one session, and then run:

$ TEST_INTEGRATION=1 KAFKA_CONNECT=0.0.0.0:9093 cargo test

in another session.

Apache Kafka

To run integration tests against Apache Kafka, run:

$ docker-compose -f docker-compose-kafka.yml up

in one session, and then run:

$ TEST_INTEGRATION=1 TEST_DELETE_RECORDS=1 KAFKA_CONNECT=localhost:9094 cargo test

in another session. Note that Apache Kafka supports a different set of features then redpanda, so we pass other environment variables.

Fuzzing

RSKafka offers fuzz targets for certain protocol parsing steps. To build them make sure you have cargo-fuzz installed. Select one of the following fuzzers:

  • protocol_reader: Selects an API key and API version and then reads message frames and tries to decode the response object. The message frames are read w/o the length marker for more efficient fuzzing.
  • record_batch_body_reader: Reads the inner part of a record batch (w/o the prefix that contains length and CRC) and tries to decode it. In theory this is covered by protocol_reader as well but the length fields and CRC make it hard for the fuzzer to traverse this data structure.

Then run the fuzzer with:

$ cargo +nightly fuzz run protocol_reader
...

Let it running for how long you wish or until it finds a crash:

...
Failing input:

        fuzz/artifacts/protocol_reader/crash-369f9787d35767c47431161d455aa696a71c23e3

Output of `std::fmt::Debug`:

        [0, 18, 0, 3, 0, 0, 0, 0, 71, 88, 0, 0, 0, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 0, 0, 0, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 0, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 18, 18, 0, 164, 0, 164, 164, 164, 30, 164, 164, 0, 0, 0, 0, 63]

Reproduce with:

        cargo fuzz run protocol_reader fuzz/artifacts/protocol_reader/crash-369f9787d35767c47431161d455aa696a71c23e3

Minimize test case with:

        cargo fuzz tmin protocol_reader fuzz/artifacts/protocol_reader/crash-369f9787d35767c47431161d455aa696a71c23e3

Sadly the backtraces that you might get are not really helpful and you need a debugger to detect the exact source locations:

$ rust-lldb ./target/x86_64-unknown-linux-gnu/release/protocol_reader fuzz/artifacts/protocol_reader/crash-7b824dad6e26002e5488e8cc84ce16728222dcf5
...

(lldb) r
...
Process 177543 launched: '/home/mneumann/src/rskafka/target/x86_64-unknown-linux-gnu/release/protocol_reader' (x86_64)
INFO: Running with entropic power schedule (0xFF, 100).
INFO: Seed: 3549747846
...
==177543==ABORTING
(lldb) AddressSanitizer report breakpoint hit. Use 'thread info -s' to get extended information about the report.
Process 177543 stopped
...

(lldb) bt
* thread #1, name = 'protocol_reader', stop reason = AddressSanitizer detected: allocation-size-too-big
  * frame #0: 0x0000555556c04f20 protocol_reader`::AsanDie() at asan_rtl.cpp:45:7
    frame #1: 0x0000555556c1a33c protocol_reader`__sanitizer::Die() at sanitizer_termination.cpp:55:7
    frame #2: 0x0000555556c01471 protocol_reader`::~ScopedInErrorReport() at asan_report.cpp:190:7
    frame #3: 0x0000555556c021f4 protocol_reader`::ReportAllocationSizeTooBig() at asan_report.cpp:313:1
...

Then create a unit test and fix the bug.

For out-of-memory errors LLDB does not stop automatically. You can however set a breakpoint before starting the execution that hooks right into the place where it is about to exit:

(lldb) b fuzzer::PrintStackTrace()

Benchmarks

Install cargo-criterion, make sure you have some Kafka cluster running, and then you can run all benchmarks with:

$ TEST_INTEGRATION=1 KAFKA_CONNECT=localhost:9093 cargo criterion --all-features

If you find a benchmark that is too slow, you can may want to profile it. Get cargo-with, and perf, then run (here for the parallel/rskafka benchmark):

$ TEST_INTEGRATION=1 KAFKA_CONNECT=localhost:9093 cargo with 'perf record --call-graph dwarf -- {bin}' -- \
    bench --all-features --bench write_throughput -- \
    --bench --noplot parallel/rskafka

Have a look at the report:

$ perf report

License

Licensed under either of these:

Contributing

Unless you explicitly state otherwise, any contribution you intentionally submit for inclusion in the work, as defined in the Apache-2.0 license, shall be dual-licensed as above, without any additional terms or conditions.

Footnotes

  1. Kafka's design makes it hard for any client to support the converse, as ultimately each partition is an independent write stream within the broker. However, this crate makes no attempt to mitigate per-partition overheads e.g. by batching writes to multiple partitions in a single ProduceRequest โ†ฉ

rskafka's People

Contributors

carols10cents avatar crepererum avatar dependabot[bot] avatar domodwyer avatar kodiakhq[bot] avatar pierwill avatar shepmaster avatar tustvold avatar

Watchers

 avatar

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    ๐Ÿ–– Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. ๐Ÿ“Š๐Ÿ“ˆ๐ŸŽ‰

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google โค๏ธ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.