
rskafka's Introduction

RSKafka


This crate aims to be a minimal Kafka implementation for simple workloads that wish to use Kafka as a distributed write-ahead log.

It is not a general-purpose Kafka implementation, instead it is heavily optimised for simplicity, both in terms of implementation and its emergent operational characteristics. In particular, it aims to meet the needs of IOx.

This crate has:

  • No support for offset tracking, consumer groups, transactions, etc...
  • No built-in buffering, aggregation, linger timeouts, etc...
  • Independent write streams per partition

It will be a good fit for workloads that:

  • Perform offset tracking independently of Kafka
  • Read/Write reasonably sized payloads per-partition
  • Have a low number of high-throughput partitions 1

Usage

# async fn test() {
use rskafka::{
    client::{
        ClientBuilder,
        partition::{Compression, UnknownTopicHandling},
    },
    record::Record,
};
use chrono::{TimeZone, Utc};
use std::collections::BTreeMap;

// setup client
let connection = "localhost:9093".to_owned();
let client = ClientBuilder::new(vec![connection]).build().await.unwrap();

// create a topic
let topic = "my_topic";
let controller_client = client.controller_client().unwrap();
controller_client.create_topic(
    topic,
    2,      // partitions
    1,      // replication factor
    5_000,  // timeout (ms)
).await.unwrap();

// get a partition-bound client
let partition_client = client
    .partition_client(
        topic.to_owned(),
        0,  // partition
        UnknownTopicHandling::Retry,
    )
    .await
    .unwrap();

// produce some data
let record = Record {
    key: None,
    value: Some(b"hello kafka".to_vec()),
    headers: BTreeMap::from([
        ("foo".to_owned(), b"bar".to_vec()),
    ]),
    timestamp: Utc.timestamp_millis(42),
};
partition_client.produce(vec![record], Compression::default()).await.unwrap();

// consume data
let (records, high_watermark) = partition_client
    .fetch_records(
        0,  // offset
        1..1_000_000,  // min..max bytes
        1_000,  // max wait time
    )
    .await
    .unwrap();
# }

For more advanced production and consumption, see [crate::client::producer] and [crate::client::consumer].
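For a batching producer layered on top of a PartitionClient, BatchProducerBuilder and RecordAggregator can be combined roughly as follows. This is a minimal sketch based on usage reported in the issues further below: the import paths are assumed to live under crate::client::producer, and the broker address, linger time, and batch size are placeholders.

# async fn batched() {
use rskafka::{
    client::{
        partition::UnknownTopicHandling,
        producer::{aggregator::RecordAggregator, BatchProducerBuilder},
        ClientBuilder,
    },
    record::Record,
};
use std::{sync::Arc, time::Duration};

let client = ClientBuilder::new(vec!["localhost:9093".to_owned()])
    .build()
    .await
    .unwrap();
let partition_client = Arc::new(
    client
        .partition_client("my_topic", 0, UnknownTopicHandling::Retry)
        .await
        .unwrap(),
);

let record = Record {
    key: None,
    value: Some(b"hello kafka".to_vec()),
    headers: Default::default(),
    timestamp: chrono::Utc::now(),
};

// Linger for up to 100ms and flush once roughly two records are aggregated.
let producer = BatchProducerBuilder::new(Arc::clone(&partition_client))
    .with_linger(Duration::from_millis(100))
    .build(RecordAggregator::new(record.approximate_size() * 2 + 1));
producer.produce(record).await.unwrap();
# }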

Features

  • compression-gzip (default): Support compression and decompression of messages using gzip.
  • compression-lz4 (default): Support compression and decompression of messages using LZ4.
  • compression-snappy (default): Support compression and decompression of messages using Snappy.
  • compression-zstd (default): Support compression and decompression of messages using zstd.
  • full: Includes all stable features (compression-gzip, compression-lz4, compression-snappy, compression-zstd, transport-socks5, transport-tls).
  • transport-socks5: Allows transport via a SOCKS5 proxy.
  • transport-tls: Allows TLS transport via rustls.
  • unstable-fuzzing: Exposes some internal data structures so that they can be used by our fuzzers. This is NOT a stable feature / API!
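If you only need a subset of these, the default compression codecs can be disabled and individual features enabled explicitly, for example via cargo's add subcommand (a sketch; pick the features you actually need):

$ cargo add rskafka --no-default-features --features transport-tls,compression-lz4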

Testing

Redpanda

To run integration tests against Redpanda, run:

$ docker-compose -f docker-compose-redpanda.yml up

in one session, and then run:

$ TEST_INTEGRATION=1 TEST_BROKER_IMPL=redpanda KAFKA_CONNECT=0.0.0.0:9011 cargo test

in another session.

Apache Kafka

To run integration tests against Apache Kafka, run:

$ docker-compose -f docker-compose-kafka.yml up

in one session, and then run:

$ TEST_INTEGRATION=1 TEST_BROKER_IMPL=kafka KAFKA_CONNECT=localhost:9011 KAFKA_SASL_CONNECT=localhost:9097 cargo test

in another session. Note that Apache Kafka supports a different set of features than Redpanda, so we pass different environment variables.

Using a SOCKS5 Proxy

To run the integration tests via a SOCKS5 proxy, you need to set the SOCKS_PROXY environment variable. The following command requires a proxy running on the local machine.

$ KAFKA_CONNECT=0.0.0.0:9011,kafka-1:9021,redpanda-1:9021 SOCKS_PROXY=localhost:1080 cargo test --features full

The SOCKS5 proxy will automatically be started by the docker compose files. Note that KAFKA_CONNECT was extended with addresses that are reachable via the proxy.

Java Interop

To test if RSKafka can produce/consume records to/from the official Java client, you need to have Java installed and the TEST_JAVA_INTEROPT=1 environment variable set.

Fuzzing

RSKafka offers fuzz targets for certain protocol parsing steps. To build them, make sure you have cargo-fuzz installed. Select one of the following fuzzers:

  • protocol_reader: Selects an API key and API version and then reads message frames and tries to decode the response object. The message frames are read w/o the length marker for more efficient fuzzing.
  • record_batch_body_reader: Reads the inner part of a record batch (w/o the prefix that contains length and CRC) and tries to decode it. In theory this is covered by protocol_reader as well but the length fields and CRC make it hard for the fuzzer to traverse this data structure.

Then run the fuzzer with:

$ cargo +nightly fuzz run protocol_reader
...

Let it run for as long as you wish, or until it finds a crash:

...
Failing input:

        fuzz/artifacts/protocol_reader/crash-369f9787d35767c47431161d455aa696a71c23e3

Output of `std::fmt::Debug`:

        [0, 18, 0, 3, 0, 0, 0, 0, 71, 88, 0, 0, 0, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 0, 0, 0, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 0, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 18, 18, 0, 164, 0, 164, 164, 164, 30, 164, 164, 0, 0, 0, 0, 63]

Reproduce with:

        cargo fuzz run protocol_reader fuzz/artifacts/protocol_reader/crash-369f9787d35767c47431161d455aa696a71c23e3

Minimize test case with:

        cargo fuzz tmin protocol_reader fuzz/artifacts/protocol_reader/crash-369f9787d35767c47431161d455aa696a71c23e3

Sadly, the backtraces you get are not very helpful, and you need a debugger to find the exact source locations:

$ rust-lldb ./target/x86_64-unknown-linux-gnu/release/protocol_reader fuzz/artifacts/protocol_reader/crash-7b824dad6e26002e5488e8cc84ce16728222dcf5
...

(lldb) r
...
Process 177543 launched: '/home/mneumann/src/rskafka/target/x86_64-unknown-linux-gnu/release/protocol_reader' (x86_64)
INFO: Running with entropic power schedule (0xFF, 100).
INFO: Seed: 3549747846
...
==177543==ABORTING
(lldb) AddressSanitizer report breakpoint hit. Use 'thread info -s' to get extended information about the report.
Process 177543 stopped
...

(lldb) bt
* thread #1, name = 'protocol_reader', stop reason = AddressSanitizer detected: allocation-size-too-big
  * frame #0: 0x0000555556c04f20 protocol_reader`::AsanDie() at asan_rtl.cpp:45:7
    frame #1: 0x0000555556c1a33c protocol_reader`__sanitizer::Die() at sanitizer_termination.cpp:55:7
    frame #2: 0x0000555556c01471 protocol_reader`::~ScopedInErrorReport() at asan_report.cpp:190:7
    frame #3: 0x0000555556c021f4 protocol_reader`::ReportAllocationSizeTooBig() at asan_report.cpp:313:1
...

Then create a unit test and fix the bug.

For out-of-memory errors LLDB does not stop automatically. You can however set a breakpoint before starting the execution that hooks right into the place where it is about to exit:

(lldb) b fuzzer::PrintStackTrace()

Benchmarks

Install cargo-criterion, make sure you have some Kafka cluster running, and then you can run all benchmarks with:

$ TEST_INTEGRATION=1 TEST_BROKER_IMPL=kafka KAFKA_CONNECT=localhost:9011 cargo criterion --all-features

If you find a benchmark that is too slow, you may want to profile it. Get cargo-with and perf, then run (here for the parallel/rskafka benchmark):

$ TEST_INTEGRATION=1 TEST_BROKER_IMPL=kafka KAFKA_CONNECT=localhost:9011 cargo with 'perf record --call-graph dwarf -- {bin}' -- \
    bench --all-features --bench write_throughput -- \
    --bench --noplot parallel/rskafka

Have a look at the report:

$ perf report

License

Licensed under either of these:

  • Apache License, Version 2.0
  • MIT License

Contributing

Unless you explicitly state otherwise, any contribution you intentionally submit for inclusion in the work, as defined in the Apache-2.0 license, shall be dual-licensed as above, without any additional terms or conditions.

Footnotes

  1. Kafka's design makes it hard for any client to support the converse, as ultimately each partition is an independent write stream within the broker. However, this crate makes no attempt to mitigate per-partition overheads, e.g. by batching writes to multiple partitions in a single ProduceRequest.


rskafka's Issues

Enable TLS integration test

We already have a TLS integration test here

rskafka/tests/client.rs

Lines 73 to 75 in 3af1939

// Disabled as currently no TLS integration tests
#[ignore]
#[tokio::test]

but it is disabled because our CI doesn't set up any form of TLS. Would be nice if we could get this to work. Ideally the test would be gated by some environment variable like TLS_ENDPOINT so that developers can still run a rather simple cargo test locally.

Support throttling

Properly react to throttle_time_ms in protocol responses. Currently we ignore that field, so if a fetch request is throttled this leads to some bizarre errors:

invalid response: Expected a single topic in response, got 0
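As a rough sketch of the intended behaviour, the client could sleep for the advertised duration before issuing the next request. The helper below is illustrative only; the field and function names are not the crate's actual types:

use std::time::Duration;

// Illustrative helper: back off for the duration requested by the broker
// before sending the next request on this connection.
async fn respect_throttle(throttle_time_ms: i32) {
    if let Ok(ms) = u64::try_from(throttle_time_ms) {
        if ms > 0 {
            tokio::time::sleep(Duration::from_millis(ms)).await;
        }
    }
}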

Run fuzzers regularly

Get some CI (either our own or some external infrastructure like oss-fuzz) to run our fuzzers. At the moment CI only builds the fuzzers but doesn't run them.

Convert `as` conversions to `From`/`TryFrom`

The as conversions in our protocol code are quite dangerous, and I think we might have a few integer overflow bugs in there. In general I think we should replace all as conversions with From and TryFrom to make it clear whether the conversion can fail or not. There is also a clippy lint, as_conversions, which should be added to our code base to prevent regressions.
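As a sketch of what the proposed change looks like in practice (the function below is illustrative, not code from this crate): an unchecked cast silently turns a negative length into a huge usize, whereas TryFrom surfaces the failure.

use std::convert::TryFrom;

// Before: `raw as usize` turns e.g. -1 into a gigantic length on 64-bit targets.
// After: the conversion fails loudly and the caller has to handle it.
fn decode_len(raw: i32) -> Result<usize, std::num::TryFromIntError> {
    usize::try_from(raw)
}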

`PartitionClient` may silently read from replica

When a cluster re-balances, the existing partition leader may become a replica, but will not actively report that to the PartitionClient. Even worse, the read lag may trigger this Redpanda quirk even under Kafka:

// Redpanda never sends OffsetOutOfRange even when it should. "Luckily" it does not support deletions so we can
// implement a simple heuristic.
if partition.high_watermark.0 < offset {
    warn!(
        "This message looks like Redpanda wants to report a OffsetOutOfRange but doesn't."
    );
    return Err(Error::ServerError(
        ProtocolError::OffsetOutOfRange,
        String::from("Offset out of range"),
    ));
}

Fuzz target tries to run when not instrumented

Steps to reproduce

  1. cargo test --workspace --bins

I have this as part of a pre-commit hook, and it always fails because of the fuzzer.

Expected behavior

Tests to pass on master.

Actual behavior

Fuzz target explodes:

$ cargo test --workspace --bins
   Compiling rskafka v0.3.0 (/Users/dom/Documents/rust/rskafka)
   Compiling rskafka-fuzz v0.0.0 (/Users/dom/Documents/rust/rskafka/fuzz)
    Finished test [unoptimized + debuginfo] target(s) in 2.09s
     Running unittests fuzz_targets/protocol_reader.rs (target/debug/deps/protocol_reader-af583cadbc4544d6)
WARNING: Failed to find function "__sanitizer_acquire_crash_state". Reason dlsym(RTLD_DEFAULT, __sanitizer_acquire_crash_state): symbol not found.
WARNING: Failed to find function "__sanitizer_print_stack_trace". Reason dlsym(RTLD_DEFAULT, __sanitizer_print_stack_trace): symbol not found.
WARNING: Failed to find function "__sanitizer_set_death_callback". Reason dlsym(RTLD_DEFAULT, __sanitizer_set_death_callback): symbol not found.
INFO: Running with entropic power schedule (0xFF, 100).
INFO: Seed: 1873570374
INFO: -max_len is not provided; libFuzzer will not generate inputs larger than 4096 bytes
INFO: A corpus is not provided, starting from an empty corpus
#2	INITED exec/s: 0 rss: 28Mb
ERROR: no interesting inputs were found. Is the code instrumented for coverage? Exiting.
error: test failed, to rerun pass '-p rskafka-fuzz --bin protocol_reader'

Code coverage

It would be nice if our CI reported code coverage. This should ideally use the new LLVM-based code coverage that rustc now supports and should combine the coverage of both test runs (Kafka and Redpanda).

New release ?

Hello 👋

The current release, rskafka v0.3.0, relies on zstd = "^0.10",

which in my case leads to a conflict ("Only one package in the dependency graph may specify the same links value.") with the deltalake crate (which relies on zstd = 0.12).

However, main relies on 0.12.

Is it planned to push a new rskafka release soon?

Production message error: Client(Request(NoVersionMatch { api_key: Produce }))

Hello, I have had some problems using this crate recently and would like to ask you to help take a look. Thank you very much!

Error details log

thread 'tokio-runtime-worker' panicked at 'produce error:: Client(Request(NoVersionMatch { api_key: Produce }))', src\main.rs:110:44

Environment

  • rskafka version: 0.5.0
  • kafka version: 0.10.2
  • cargo version: 1.70.0 (ec8a8a0ca 2023-04-25)

Execute the command

 cargo run

Source

async fn test_batch_producer() {
   let connection = "127.0.0.1:9092".to_owned();
   let client = ClientBuilder::new(vec![connection]).build().await.unwrap();
   let topic = "test";
   // produce some data
   let record = Record {
       key: Option::from(b"".to_vec()),
       value: Option::from(b"hello kafka".to_vec()),
       headers: Default::default(),
       timestamp: Default::default(),
   };

   let partition_client = Arc::new(
       client
           .partition_client(&*topic, 0, UnknownTopicHandling::Retry)
           .await
           .unwrap(),
   );
   let producer = BatchProducerBuilder::new(partition_client)
       .with_linger(Duration::from_secs(5))
       .build(RecordAggregator::new(record.approximate_size() * 2 + 1));
   producer.produce(record.clone()).await.expect("produce error: ");
}

Recreate producer aggregator instead of flushing it

In #113 @tustvold fixed some panic and cancelation safety issues in our producer pipeline. One issue however remains: what is supposed to happen when Aggregator::flush panics? At the moment we assume that we can reuse the aggregator afterwards and that the aggregator is kinda panic safe. However it would be nicer if we wouldn't need to make this assumption and instead re-create the aggregator instead of modifying it. This would also result in a somewhat clearer interface, since currently most aggregator implementations use a static config part and a "state" part that is reset on flush, see

#[derive(Debug)]
pub struct RecordAggregator {
    max_batch_size: usize,
    state: AggregatorState,
}

and

https://github.com/influxdata/influxdb_iox/blob/3f547c58c9b986c82a71d21ff4976e0c1b9c9a90/write_buffer/src/kafka/aggregator.rs#L236-L256

Using some constructor/factory (fn create(&self) -> Aggregator) + a throw-away aggregator (fn flush(self)) would result in a somewhat clearer interface and would likely also make the code in rskafka easier to understand.

Note that w/ user-provided code there is always some form of panic-safety involved. In the proposed solution the "aggregator factory" could panic when creating new aggregators, but I still think this would be easier to understand than a &mut self function that panics.
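A rough sketch of the proposed shape; the trait and method names here are illustrative and not the crate's current API:

/// Illustrative factory that hands out a fresh aggregator for every batch,
/// so a panicking flush never leaves a half-reset aggregator behind.
trait AggregatorFactory {
    type Aggregator: ThrowAwayAggregator;

    fn create(&self) -> Self::Aggregator;
}

/// Illustrative aggregator that is consumed on flush.
trait ThrowAwayAggregator {
    type Record;
    type Output;

    /// Try to add a record; hand it back if the batch is full.
    fn try_push(&mut self, record: Self::Record) -> Result<(), Self::Record>;

    /// Consume the aggregator and produce the batch output.
    fn flush(self) -> Self::Output;
}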

Test against java kafka client

We currently have a test matrix that produces and consumes data using rdkafka and rskafka in all supported compression formats:

#[tokio::test]
async fn test_produce_rdkafka_consume_rdkafka_nocompression() {
    assert_produce_consume(produce_rdkafka, consume_rdkafka, Compression::NoCompression).await;
}

#[tokio::test]
async fn test_produce_rskafka_consume_rdkafka_nocompression() {
    assert_produce_consume(produce_rskafka, consume_rdkafka, Compression::NoCompression).await;
}

#[tokio::test]
async fn test_produce_rdkafka_consume_rskafka_nocompression() {
    assert_produce_consume(produce_rdkafka, consume_rskafka, Compression::NoCompression).await;
}

#[tokio::test]
async fn test_produce_rskafka_consume_rskafka_nocompression() {
    assert_produce_consume(produce_rskafka, consume_rskafka, Compression::NoCompression).await;
}

#[cfg(feature = "compression-gzip")]
#[tokio::test]
async fn test_produce_rdkafka_consume_rdkafka_gzip() {
    assert_produce_consume(produce_rdkafka, consume_rdkafka, Compression::Gzip).await;
}

#[cfg(feature = "compression-gzip")]
#[tokio::test]
async fn test_produce_rskafka_consume_rdkafka_gzip() {
    assert_produce_consume(produce_rskafka, consume_rdkafka, Compression::Gzip).await;
}

#[cfg(feature = "compression-gzip")]
#[tokio::test]
async fn test_produce_rdkafka_consume_rskafka_gzip() {
    assert_produce_consume(produce_rdkafka, consume_rskafka, Compression::Gzip).await;
}

#[cfg(feature = "compression-gzip")]
#[tokio::test]
async fn test_produce_rskafka_consume_rskafka_gzip() {
    assert_produce_consume(produce_rskafka, consume_rskafka, Compression::Gzip).await;
}

#[cfg(feature = "compression-lz4")]
#[tokio::test]
async fn test_produce_rdkafka_consume_rdkafka_lz4() {
    assert_produce_consume(produce_rdkafka, consume_rdkafka, Compression::Lz4).await;
}

#[cfg(feature = "compression-lz4")]
#[tokio::test]
async fn test_produce_rskafka_consume_rdkafka_lz4() {
    assert_produce_consume(produce_rskafka, consume_rdkafka, Compression::Lz4).await;
}

#[cfg(feature = "compression-lz4")]
#[tokio::test]
async fn test_produce_rdkafka_consume_rskafka_lz4() {
    assert_produce_consume(produce_rdkafka, consume_rskafka, Compression::Lz4).await;
}

#[cfg(feature = "compression-lz4")]
#[tokio::test]
async fn test_produce_rskafka_consume_rskafka_lz4() {
    assert_produce_consume(produce_rskafka, consume_rskafka, Compression::Lz4).await;
}

#[cfg(feature = "compression-snappy")]
#[tokio::test]
async fn test_produce_rdkafka_consume_rdkafka_snappy() {
    assert_produce_consume(produce_rdkafka, consume_rdkafka, Compression::Snappy).await;
}

#[cfg(feature = "compression-snappy")]
#[tokio::test]
async fn test_produce_rskafka_consume_rdkafka_snappy() {
    assert_produce_consume(produce_rskafka, consume_rdkafka, Compression::Snappy).await;
}

#[cfg(feature = "compression-snappy")]
#[tokio::test]
async fn test_produce_rdkafka_consume_rskafka_snappy() {
    assert_produce_consume(produce_rdkafka, consume_rskafka, Compression::Snappy).await;
}

#[cfg(feature = "compression-snappy")]
#[tokio::test]
async fn test_produce_rskafka_consume_rskafka_snappy() {
    assert_produce_consume(produce_rskafka, consume_rskafka, Compression::Snappy).await;
}

Since record batches are mostly only processed by the clients, it would be nice to add an official Java/Scala client to the test matrix via some bridge / CLI call, just to make sure we can work with most of the ecosystem.

Frequent connection losses

On the producer side, we see loads of connection losses (~once per minute) in prod, like:

level=info msg="request encountered non-fatal error - backing off" e="Request error: Connection is poisened: Cannot read framed message: Cannot read data: early eof" request_name=produce backoff_secs=0 target="rskafka::backoff"

They are automatically retried, but it would be nice if we could prevent this from happening THAT often.

Panic: just flushed

Under certain rare circumstances it seems that the BatchProducer panics with "just flushed". This is the piece of code here:

// Flush data
Self::flush_impl(&mut inner, self.client.as_ref(), self.compression).await;
extract(&result_slot.now_or_never().expect("just flushed"), tag)

Until now I've seen that only once during benchmarking and once in prod. I'm wondering if now_or_never() is too aggressive.

More helpful `OffsetOutOfRange` error

OffsetOutOfRange may happen on two sides:

  1. The offset that we are trying to read was deleted (either via the API or via the retention policy)
  2. The offset does not exist yet (or is gone for another reason, e.g. because a topic was re-created).

Both cases are reported using the very same error code, but the FETCH response should have enough information (log start, high watermark) to determine the difference.
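As a sketch, a FETCH response that carries the log start offset would allow a classification along these lines (names are illustrative):

/// Illustrative reasons an offset can be out of range.
enum OffsetOutOfRangeKind {
    /// The offset lies before the log start offset, i.e. it was deleted via
    /// the API or the retention policy.
    Deleted,
    /// The offset lies at or beyond the high watermark, i.e. it does not
    /// exist (yet), e.g. because the topic was re-created.
    NotYetWritten,
}

fn classify(offset: i64, log_start_offset: i64) -> OffsetOutOfRangeKind {
    if offset < log_start_offset {
        OffsetOutOfRangeKind::Deleted
    } else {
        // Anything else that still triggered OffsetOutOfRange must sit at or
        // beyond the high watermark.
        OffsetOutOfRangeKind::NotYetWritten
    }
}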

Support for authorization methods.

Hi there,

I am attempting to use rskafka to connect to a Kafka cluster that requires authorization, following this tutorial (for NodeJS).
Is authz supported at all? If so, how do I provide such data to ClientBuilder? I cannot find any example of this in the code.

thanks in advance!
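For reference, a later report in this document ("Connection is poisoned: ...") configures SASL/PLAIN via ClientBuilder::sasl_config. A minimal sketch based on that snippet; the broker address and credentials are placeholders, and the String field types are an assumption:

# async fn sasl_example() {
use rskafka::client::{ClientBuilder, SaslConfig};

let client = ClientBuilder::new(vec!["broker-1.example:9096".to_owned()])
    .sasl_config(SaslConfig::Plain {
        username: "my-user".to_owned(),
        password: "my-password".to_owned(),
    })
    .build()
    .await
    .unwrap();
# }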

Chaos testing

Our tests currently use a rather static Kafka/Redpanda cluster and test the happy path. We should also have a test that deals with a messy cluster.

For that I think we can have the following long-running test job:

  • have cluster with multiple nodes (e.g. 3)
  • have one producer and one consumer and a continuous flow of data
  • have a chaos monkey killing nodes or re-scaling the cluster (incl. re-balancing)

This is a bit related to #49, which captures the unit-test aspects of our retry logic. Also relates to #147, which would probably have been found by proper chaos testing.

proper logging

Replace our println! statements with proper structured log messages (mostly debug! and a few warn!) from tracing. Also add a helper or use an existing one so that logs are printed to stdout during tests (might be a function like these).
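A minimal sketch of the intended style using the tracing macros (the function and fields are illustrative):

use tracing::{debug, warn};

fn log_request_outcome(attempt: u32, ok: bool) {
    // Before: println!("request attempt {} ok={}", attempt, ok);
    // After: structured fields instead of formatted strings.
    if ok {
        debug!(attempt, "request succeeded");
    } else {
        warn!(attempt, "request failed, will retry");
    }
}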

redpanda frame read errors with early eof

rskafka fails to read frames when talking to the latest redpanda release (v22.2.1). It is unclear if this is a redpanda bug, or rskafka - I can't see any reports in the redpanda issue tracker, and rskafka doesn't progress at all, so I suspect it is us as redpanda would quickly have issues filed if it didn't work at all!

Log snippet from the IOx tests where I discovered the problem:

2022-08-19T15:49:53.188690Z  INFO rskafka::connection: Establishing new connection url="127.0.0.1:52575"
2022-08-19T15:49:53.194054Z  WARN rskafka::connection: Failed to connect to broker e=cannot sync versions: Request error: Connection is poisoned: Cannot read framed message: Cannot read data: early eof
2022-08-19T15:49:53.194124Z  INFO rskafka::backoff: request encountered non-fatal error - backing off e=Failed to connect to any broker, backing off request_name="broker_connect" backoff_secs=0
2022-08-19T15:49:53.296358Z  INFO rskafka::connection: Establishing new connection url="127.0.0.1:52575"
2022-08-19T15:49:53.300379Z  WARN rskafka::connection: Failed to connect to broker e=cannot sync versions: Request error: Connection is poisoned: Cannot read framed message: Cannot read data: early eof
2022-08-19T15:49:53.300421Z  INFO rskafka::backoff: request encountered non-fatal error - backing off e=Failed to connect to any broker, backing off request_name="broker_connect" backoff_secs=0
2022-08-19T15:49:53.489981Z  INFO rskafka::connection: Establishing new connection url="127.0.0.1:52575"
2022-08-19T15:49:53.499777Z  WARN rskafka::connection: Failed to connect to broker e=cannot sync versions: Request error: Connection is poisoned: Cannot read framed message: Cannot read data: early eof
2022-08-19T15:49:53.499963Z  INFO rskafka::backoff: request encountered non-fatal error - backing off e=Failed to connect to any broker, backing off request_name="broker_connect" backoff_secs=0
2022-08-19T15:49:53.631317Z  INFO rskafka::connection: Establishing new connection url="127.0.0.1:52575"
2022-08-19T15:49:53.642401Z  WARN rskafka::connection: Failed to connect to broker e=cannot sync versions: Request error: Connection is poisoned: Cannot read framed message: Cannot read data: early eof
2022-08-19T15:49:53.642451Z  INFO rskafka::backoff: request encountered non-fatal error - backing off e=Failed to connect to any broker, backing off request_name="broker_connect" backoff_secs=0
2022-08-19T15:49:54.026461Z  INFO rskafka::connection: Establishing new connection url="127.0.0.1:52575"
2022-08-19T15:49:54.037168Z  WARN rskafka::connection: Failed to connect to broker e=cannot sync versions: Request error: Connection is poisoned: Cannot read framed message: Cannot read data: early eof
2022-08-19T15:49:54.037317Z  INFO rskafka::backoff: request encountered non-fatal error - backing off e=Failed to connect to any broker, backing off request_name="broker_connect" backoff_secs=0
2022-08-19T15:49:54.372955Z  INFO rskafka::connection: Establishing new connection url="127.0.0.1:52575"
2022-08-19T15:49:54.383209Z  WARN rskafka::connection: Failed to connect to broker e=cannot sync versions: Request error: Connection is poisoned: Cannot read framed message: Cannot read data: early eof
2022-08-19T15:49:54.383369Z  INFO rskafka::backoff: request encountered non-fatal error - backing off e=Failed to connect to any broker, backing off request_name="broker_connect" backoff_secs=0
2022-08-19T15:49:54.570851Z  INFO rskafka::connection: Establishing new connection url="127.0.0.1:52575"
2022-08-19T15:49:54.584100Z  WARN rskafka::connection: Failed to connect to broker e=cannot sync versions: Request error: Connection is poisoned: Cannot read framed message: Cannot read data: early eof
2022-08-19T15:49:54.584243Z  INFO rskafka::backoff: request encountered non-fatal error - backing off e=Failed to connect to any broker, backing off request_name="broker_connect" backoff_secs=0
2022-08-19T15:49:54.726602Z  INFO rskafka::connection: Establishing new connection url="127.0.0.1:52575"
2022-08-19T15:49:54.740463Z  WARN rskafka::connection: Failed to connect to broker e=cannot sync versions: Request error: Connection is poisoned: Cannot read framed message: Cannot read data: early eof
2022-08-19T15:49:54.740640Z  INFO rskafka::backoff: request encountered non-fatal error - backing off e=Failed to connect to any broker, backing off request_name="broker_connect" backoff_secs=0
2022-08-19T15:49:55.092203Z  INFO rskafka::connection: Establishing new connection url="127.0.0.1:52575"
2022-08-19T15:49:55.113559Z  WARN rskafka::connection: Failed to connect to broker e=cannot sync versions: Request error: Connection is poisoned: Cannot read framed message: Cannot read data: early eof
2022-08-19T15:49:55.113719Z  INFO rskafka::backoff: request encountered non-fatal error - backing off e=Failed to connect to any broker, backing off request_name="broker_connect" backoff_secs=0
2022-08-19T15:49:55.822818Z  INFO rskafka::connection: Establishing new connection url="127.0.0.1:52575"
2022-08-19T15:49:55.836358Z  WARN rskafka::connection: Failed to connect to broker e=cannot sync versions: Request error: Connection is poisoned: Cannot read framed message: Cannot read data: early eof
2022-08-19T15:49:55.836520Z  INFO rskafka::backoff: request encountered non-fatal error - backing off e=Failed to connect to any broker, backing off request_name="broker_connect" backoff_secs=0
2022-08-19T15:49:56.760850Z  INFO rskafka::connection: Establishing new connection url="127.0.0.1:52575"
2022-08-19T15:49:56.770216Z  WARN rskafka::connection: Failed to connect to broker e=cannot sync versions: Request error: Connection is poisoned: Cannot read framed message: Cannot read data: early eof
2022-08-19T15:49:56.770301Z  INFO rskafka::backoff: request encountered non-fatal error - backing off e=Failed to connect to any broker, backing off request_name="broker_connect" backoff_secs=2

See #168.

Metadata from controller

For certain read actions like getting the partition leader or getting a topic list we currently use an arbitrary broker. This is technically allowed and will eventually lead to the correct results (at least as far as my understanding goes). However this creates some delays, esp. when acting on a topic that was just created by the controller and our arbitrary broker is not the same.

We might instead use the controller connection in more places to reduce this delay.

I'm implementing a pure Rust client, let's merge our efforts

Great job!

I was frustrated using the C-based lib with bindings, so I decided to write a pure Rust, tokio v1 based, general-purpose Kafka client.
The client supports Kafka 0.11 and above.

The client is not complete yet, but I've made some progress.

Let’s contribute and merge our efforts.

Enable SOCKS5 integration test

Add some SOCKS5 proxy to our CI and enable

rskafka/tests/client.rs

Lines 118 to 122 in 3af1939

// Disabled as currently no SOCKS5 integration tests
#[cfg(feature = "transport-socks5")]
#[ignore]
#[tokio::test]
async fn test_socks5() {

This should be gated by some environment variable like SOCKS5_ENDPOINT so that developers don't need to have a SOCKS5 proxy running locally if they don't work on that particular feature.

Cache Invalidation Race

Currently, in various places, we cache connections and invalidate them on error. This is racy, and may end up invalidating a connection different from the one that errored, if something else concurrently invalidated the cache and then fetched a new connection.

This is unlikely to be a problem in practice, but the fix is simple: pass the BrokerConnection to the invalidate call and only invalidate if Arc::ptr_eq is true.
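A minimal sketch of that fix, assuming the cache stores an Arc-wrapped connection (the type names are illustrative):

use std::sync::{Arc, Mutex};

// Illustrative stand-in for the cached broker connection.
struct BrokerConnection;

struct ConnectionCache {
    current: Mutex<Option<Arc<BrokerConnection>>>,
}

impl ConnectionCache {
    /// Only drop the cached connection if it is the exact instance that
    /// errored, so a concurrently refreshed connection is not thrown away.
    fn invalidate(&self, errored: &Arc<BrokerConnection>) {
        let mut guard = self.current.lock().unwrap();
        if let Some(cached) = guard.as_ref() {
            if Arc::ptr_eq(cached, errored) {
                *guard = None;
            }
        }
    }
}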

Protocol fuzzing

Write a fuzzer for our binary read protocol. This should randomly select an ApiKey and ApiVersion and provide a byte vector from which we'll try to read a response header followed by the response body. This must not panic.

I'm pretty sure this is going to panic btw (I have seen this happen while fiddling with the protocol), because while we ensure that the message size is capped, we don't have any size constraints when reading variable-sized collections (like strings and arrays) from the message stream. E.g. this is going to panic for large array/string sizes even though the message size is small:

let mut res = Vec::with_capacity(len);

let mut buf = vec![0; len as usize];

We should probably cap our allocations.
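A sketch of one way to cap such allocations once the declared length is known; the limit value is illustrative:

use std::io::{Error, ErrorKind, Read, Result};

/// Illustrative cap; a real limit would likely be derived from the frame size.
const MAX_COLLECTION_BYTES: usize = 1024 * 1024;

fn read_bytes_capped<R: Read>(reader: &mut R, len: usize) -> Result<Vec<u8>> {
    if len > MAX_COLLECTION_BYTES {
        return Err(Error::new(
            ErrorKind::InvalidData,
            format!("declared length {len} exceeds cap {MAX_COLLECTION_BYTES}"),
        ));
    }
    // Only allocate once the length is known to be sane.
    let mut buf = vec![0; len];
    reader.read_exact(&mut buf)?;
    Ok(buf)
}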

Topic deletion support?

It would be great if ControllerClient also supported topic deletion, for example, for cases where rskafka is used as part of an integration test harness.

Ignore `InvalidReplicationFactor`

From our test bench:

2022-01-25T14:20:09.541956Z ERROR rskafka::client::partition: request encountered fatal error e=Server error InvalidReplicationFactor with message "error getting metadata for topic "myorg_mybucket"" request_name="fetch_records"

According to the protocol docs this happens when:

Replication factor is below 1 or larger than the number of available brokers.

This can happen when you set a low replication factor (like 1) and the cluster is in flux (e.g. because Strimzi decided to rollover the whole cluster).

I don't want to import time::OffsetDateTime

let record = Record {
    key: None,
    value: Some(b"hello kafka".to_vec()),
    headers: BTreeMap::from([]),
    timestamp: OffsetDateTime::now_utc(),
};

I already import chrono.
I don't want to import time.
😃

Connection is poisoned: Cannot read framed message: Cannot read data: early eof

Thank you for the great crate!

I got the following error. Can I fix this error? I would appreciate it if you could share any good ideas for fixing it.

Errors:

Failed to connect to broker","e":"cannot sync versions: Request error: Connection is poisoned: Cannot read framed message: Cannot read data: early eof

rskafka version: 0.5.0
Apache Kafka version: 2.6.2

I use Amazon MSK and have three brokers.

source code:

use rskafka::{
    client::{ClientBuilder, SaslConfig},
};

let client = ClientBuilder::new(connection)
    .sasl_config(SaslConfig::Plain {
        username: sasl_username,
        password: sasl_password,
    });

debug!("start building client");
let client = client.build().await?; // This line causes errors.

Next release of rskafka

Just wondering what needs to be done for the next release to be published, and when it could be expected?

Proper "ignore test depending on env" handling

Abstract

We want to skip tests if certain environment variables are NOT set (e.g. SOCKS_PROXY) so that developers do not need to provide a full-blown integration test infrastructure if they only work on a subset of the features.

ACs

  • tests are skipped when environment variables are NOT set
  • tests are skipped at runtime, NOT at compile time
  • skipped tests are marked as "ignored"
  • test skipping requires little code (ideally a single line in the form of a decorator, a function call, or a macro call)

Current Situation

We currently use macros like this:

/// Get the Socks Proxy environment variable.
///
/// If `SOCKS_PROXY` is not set, fail the tests and provide
/// guidance for setting `SOCKS_PROXY`.
#[macro_export]
macro_rules! maybe_skip_SOCKS_PROXY {
    () => {{
        use std::env;
        dotenv::dotenv().ok();
        match (env::var("SOCKS_PROXY").ok()) {
            Some(proxy) => proxy,
            _ => {
                eprintln!("skipping integration tests with Proxy - set SOCKS_PROXY to run");
                return;
            }
        }
    }};
}

This fulfills all of the ACs except for "skipped tests are marked as ignored".

References

Could async be removed from these functions?

There are functions that do not contain async code. Currently, I need to use futures::executor::block_on to execute these two functions.

  • rskafka/src/client/mod.rs

    Lines 113 to 115 in bd61405

    pub async fn controller_client(&self) -> Result<ControllerClient> {
        Ok(ControllerClient::new(Arc::clone(&self.brokers)))
    }

  • impl ControllerClient {
        pub(super) fn new(brokers: Arc<BrokerConnector>) -> Self {
            Self {
                brokers,
                backoff_config: Default::default(),
                current_broker: Mutex::new(None),
            }
        }

  • rskafka/src/client/mod.rs

    Lines 118 to 128 in bd61405

    pub async fn partition_client(
        &self,
        topic: impl Into<String> + Send,
        partition: i32,
    ) -> Result<PartitionClient> {
        Ok(PartitionClient::new(
            topic.into(),
            partition,
            Arc::clone(&self.brokers),
        ))
    }

Could async be removed for these two or is there a reason it is used?

Should there be a maximum retry/backoff amount?

It looks to me like everywhere there's a loop that uses Backoff, such as in request_metadata, the loop could continue forever if it keeps getting non-fatal errors.

Should it? I would expect operations to eventually time out, but that's just my general expectation; I'm not sure what Kafka clients are expected to do.

I see that Backoff has a max_backoff duration, but that seems to be the maximum time any one retry will wait, not the total time to continue retrying.

In the librdkafka configuration properties list, I see settings like socket.timeout.ms and message.timeout.ms that sound like they might be overall timeouts in various situations but I'm not very confident I'm understanding that right...

Detect an unavailable connection

I'm trying to create a situation where the user doesn't have a Kafka instance or provides the wrong broker connection, using the sample test code from the readme. However, each time I run the code, it gets stuck on this line: let client = ClientBuilder::new(vec![connection]).build().await.unwrap();. I'm not sure if this has something to do with the backoff configuration in the BrokerConnector::new method.
I know this may seem like a silly question, but I'm still trying to figure out my way around Rust too, so I'd really appreciate some assistance with this.
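Until the library offers an overall retry limit (see "Should there be a maximum retry/backoff amount?" above), one workaround is to impose a deadline on the caller side, e.g. with tokio::time::timeout. A sketch, with the deadline chosen arbitrarily:

# async fn connect_with_deadline() {
use rskafka::client::ClientBuilder;
use std::time::Duration;

let connection = "localhost:9093".to_owned();
// Give up after 10 seconds instead of letting the builder retry forever.
match tokio::time::timeout(
    Duration::from_secs(10),
    ClientBuilder::new(vec![connection]).build(),
)
.await
{
    Ok(Ok(client)) => {
        // Connected; use `client` here.
        drop(client);
    }
    Ok(Err(e)) => eprintln!("broker returned an error: {e}"),
    Err(_) => eprintln!("could not reach any broker within the deadline"),
}
# }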

Test Retry Logic

Currently there is no real testing of the retry logic, in particular leader failover, connection errors, etc... This should be addressed
