
pulsar-client-reactive's Introduction

Reactive client for Apache Pulsar

Reactive client for Apache Pulsar which is compatible with the Reactive Streams specification. This uses Project Reactor as the Reactive Streams implementation.

Getting it

This library requires Java 8 or later to run.

With Gradle:

dependencies {
    implementation "org.apache.pulsar:pulsar-client-reactive-adapter:0.5.6"
}

With Maven:

<dependencies>
    <dependency>
        <groupId>org.apache.pulsar</groupId>
        <artifactId>pulsar-client-reactive-adapter</artifactId>
        <version>0.5.6</version>
    </dependency>
</dependencies>

Usage

Initializing the library

In standalone application

Using an existing PulsarClient instance:

ReactivePulsarClient reactivePulsarClient = AdaptedReactivePulsarClientFactory.create(pulsarClient);

Sending messages

ReactiveMessageSender<String> messageSender = reactivePulsarClient
        .messageSender(Schema.STRING)
        .topic(topicName)
        .maxInflight(100)
        .build();
Mono<MessageId> messageId = messageSender
        .sendOne(MessageSpec.of("Hello world!"));
// for demonstration
messageId.subscribe(System.out::println);

Sending messages with cached producer

By default, a ConcurrentHashMap based cache is used. It’s recommended to use a more advanced cache based on Caffeine. The Caffeine based cache is used as the default implementation when it is on the classpath.

Adding Caffeine based producer cache with Gradle:

dependencies {
    implementation "org.apache.pulsar:pulsar-client-reactive-adapter:0.5.6"
    implementation "org.apache.pulsar:pulsar-client-reactive-producer-cache-caffeine:0.5.6"
}

Adding Caffeine based producer cache with Maven:

<dependencies>
    <dependency>
        <groupId>org.apache.pulsar</groupId>
        <artifactId>pulsar-client-reactive-adapter</artifactId>
        <version>0.5.6</version>
    </dependency>
    <dependency>
        <groupId>org.apache.pulsar</groupId>
        <artifactId>pulsar-client-reactive-producer-cache-caffeine</artifactId>
        <version>0.5.6</version>
    </dependency>
</dependencies>

Usage example of cache

ReactiveMessageSender<String> messageSender = reactivePulsarClient
        .messageSender(Schema.STRING)
        .cache(AdaptedReactivePulsarClientFactory.createCache())
        .topic(topicName)
        .maxInflight(100)
        .build();
Mono<MessageId> messageId = messageSender
        .sendOne(MessageSpec.of("Hello world!"));
// for demonstration
messageId.subscribe(System.out::println);

It is recommended to use a cached producer in most cases. The cache enables reusing the Pulsar Producer instance and related resources across multiple message sending calls. This improves performance since a producer won’t have to be created and closed before and after sending a message.

The adapter library implementation together with the cache implementation will also enable reactive backpressure for sending messages. The maxInflight setting will limit the number of messages that are pending from the client to the broker. The solution will limit reactive streams subscription requests to keep the number of pending messages under the defined limit. This limit is per-topic and impacts the local JVM only.
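To make the effect of a limit such as maxInflight more concrete, the following is a minimal, dependency-free sketch of the general idea: at most N asynchronous sends are started, and a queued send starts only when an earlier one is acknowledged. It is not the library's implementation. The class name InflightLimitSketch, its methods, and the use of CompletableFuture as a stand-in for a send are all assumptions made for illustration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical sketch; this is not the library's InflightLimiter.
final class InflightLimitSketch {
    private final int maxInflight;
    private final Queue<Runnable> waiting = new ArrayDeque<>();
    private int inflight;
    private int maxObserved;

    InflightLimitSketch(int maxInflight) {
        this.maxInflight = maxInflight;
    }

    // Starts the send immediately if a slot is free, otherwise queues it.
    <T> CompletableFuture<T> submit(Supplier<CompletableFuture<T>> send) {
        CompletableFuture<T> result = new CompletableFuture<>();
        Runnable start = () -> send.get().whenComplete((value, error) -> {
            if (error != null) {
                result.completeExceptionally(error);
            } else {
                result.complete(value);
            }
            release();
        });
        synchronized (this) {
            if (inflight >= maxInflight) {
                waiting.add(start);
                return result;
            }
            inflight++;
            maxObserved = Math.max(maxObserved, inflight);
        }
        start.run();
        return result;
    }

    // Hands the freed slot to the next queued send, or frees it if none waits.
    private void release() {
        Runnable next;
        synchronized (this) {
            next = waiting.poll();
            if (next == null) {
                inflight--;
            }
        }
        if (next != null) {
            next.run();
        }
    }

    synchronized int maxObserved() {
        return maxObserved;
    }

    // Returns {sends started before any ack, max in flight, sends completed}.
    static int[] demo() {
        InflightLimitSketch limiter = new InflightLimitSketch(3);
        AtomicInteger started = new AtomicInteger();
        List<CompletableFuture<String>> acks = new ArrayList<>();
        List<CompletableFuture<String>> results = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            CompletableFuture<String> ack = new CompletableFuture<>();
            acks.add(ack);
            results.add(limiter.submit(() -> {
                started.incrementAndGet();
                return ack;
            }));
        }
        int startedBeforeAcks = started.get();
        for (int i = 0; i < acks.size(); i++) {
            acks.get(i).complete("id-" + i); // simulated broker acknowledgement
        }
        int completed = 0;
        for (CompletableFuture<String> result : results) {
            if (result.isDone()) {
                completed++;
            }
        }
        return new int[] {startedBeforeAcks, limiter.maxObserved(), completed};
    }

    public static void main(String[] args) {
        int[] r = demo();
        System.out.println("started before acks=" + r[0] + ", max in flight=" + r[1] + ", completed=" + r[2]);
    }
}
```

With a limit of 3 and 10 submitted sends, only 3 are started until acknowledgements arrive. The sketch queues work explicitly, whereas the library is described above as limiting Reactive Streams subscription requests instead.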

Shaded version of Caffeine

A version of the provider is available that shades its usage of Caffeine. This is useful when your application requires another version of Caffeine or when you do not want Caffeine on the classpath.

Adding shaded Caffeine based producer cache with Gradle:

dependencies {
    implementation "org.apache.pulsar:pulsar-client-reactive-adapter:0.5.6"
    implementation "org.apache.pulsar:pulsar-client-reactive-producer-cache-caffeine-shaded:0.5.6"
}

Adding shaded Caffeine based producer cache with Maven:

<dependencies>
    <dependency>
        <groupId>org.apache.pulsar</groupId>
        <artifactId>pulsar-client-reactive-adapter</artifactId>
        <version>0.5.6</version>
    </dependency>
    <dependency>
        <groupId>org.apache.pulsar</groupId>
        <artifactId>pulsar-client-reactive-producer-cache-caffeine-shaded</artifactId>
        <version>0.5.6</version>
    </dependency>
</dependencies>

Reading messages

Reading all messages for a topic:

    ReactiveMessageReader<String> messageReader =
            reactivePulsarClient.messageReader(Schema.STRING)
                    .topic(topicName)
                    .build();
    messageReader.readMany()
            .map(Message::getValue)
            // for demonstration
            .subscribe(System.out::println);

By default, the stream will complete when the tail of the topic is reached.

Example: poll for up to 5 new messages and stop polling when a timeout occurs

With .endOfStreamAction(EndOfStreamAction.POLL) the Reader will poll for new messages when the reader reaches the end of the topic.

    ReactiveMessageReader<String> messageReader =
            reactivePulsarClient.messageReader(Schema.STRING)
                    .topic(topicName)
                    .startAtSpec(StartAtSpec.ofLatest())
                    .endOfStreamAction(EndOfStreamAction.POLL)
                    .build();
    messageReader.readMany()
            .take(Duration.ofSeconds(5))
            .take(5)
            // for demonstration
            .subscribe(System.out::println);

Consuming messages

    ReactiveMessageConsumer<String> messageConsumer =
            reactivePulsarClient.messageConsumer(Schema.STRING)
                    .topic(topicName)
                    .subscriptionName("sub")
                    .build();
    messageConsumer.consumeMany(messageFlux ->
                    messageFlux.map(message ->
                            MessageResult.acknowledge(message.getMessageId(), message.getValue())))
            .take(Duration.ofSeconds(2))
            // for demonstration
            .subscribe(System.out::println);

Consuming messages using a message handler component with auto-acknowledgements

ReactiveMessagePipeline reactiveMessagePipeline =
    reactivePulsarClient
        .messageConsumer(Schema.STRING)
        .subscriptionName("sub")
        .topic(topicName)
        .build()
        .messagePipeline()
        .messageHandler(message -> Mono.fromRunnable(()->{
            System.out.println(message.getValue());
        }))
        .build()
        .start();
// for demonstration
// the reactive message handler is running in the background, delay for 10 seconds
Thread.sleep(10000L);
// now stop the message handler component
reactiveMessagePipeline.stop();

License

Reactive client for Apache Pulsar is Open Source Software released under the Apache Software License 2.0.

How to Contribute

The library is Apache 2.0 licensed.

Contributions are welcome. Please discuss larger changes on the Apache Pulsar dev mailing list. There’s a contributing guide with more details.

Bugs and Feature Requests

If you detect a bug or have a feature request or a good idea for Reactive client for Apache Pulsar, please open a GitHub issue.

Questions

Please use the [reactive-pulsar] tag on Stack Overflow.

pulsar-client-reactive's People

Contributors

cbornet, lhotari, nicoloboschi, onobc, tisonkun

pulsar-client-reactive's Issues

Add Javadoc to configuration options in builder and spec interfaces / classes

As mentioned in #3, the configuration model in Reactive Java client is different from the ordinary Java client.

It is necessary to have clear Javadoc on all configuration options. There are multiple ways to configure the same option so it should be decided where to document the option and use links to refer to the primary place of documentation.

The proposal is to document the options primarily in the builder interface methods, since that is the primary way of using the configuration API in code. The "spec" classes could contain the copy-pasted title for the option and a link to the builder interface method for setting the option. The builder could contain more extensive explanations.

Create test plan with plan for unit tests and more

This library is in its initial phase and the main focus has been on finding a model for the Reactive API for Apache Pulsar.

There are only a few end-to-end tests at the moment.
To add tests, it makes sense to first do a high level test plan and describe the test strategy.

Each "unit" should be listed in the plan, along with the main use cases to test. The units aren't necessarily single classes. Unit tests should execute quickly.

InflightLimiterTest.shouldSpreadRequestsEvenlyAcrossUpstream is flaky

  repeatShouldNotSubscribeMoreThanMaxInflightForMonos()

    Test shouldSpreadRequestsEvenlyAcrossUpstream() FAILED

    java.lang.AssertionError: 
    Expecting actual:
      [1, 2, 101]
    to contain exactly (and in same order):
      [1, 101, 201]
    but some elements were not found:
      [201]
    and others were not expected:
      [2]
        at org.apache.pulsar.reactive.client.internal.api.InflightLimiterTest.shouldSpreadRequestsEvenlyAcrossUpstream(InflightLimiterTest.java:183)

Improve ReactiveMessageSender.sendMany so that it's possible to associate the input MessageSpec to output MessageId

Currently there's a challenge in using Flux<MessageId> sendMany(Publisher<MessageSpec<T>> messageSpecs); since there's no way to associate the input MessageSpec instance to the MessageId that it produces.

One possibility would be to allow passing a key with the message spec, which would then be returned together with the MessageId.

Perhaps adding an additional method to support the key would be useful: <K> Flux<Tuple2<K, MessageId>> sendMany(Publisher<Tuple2<K, MessageSpec<T>>> messageSpecs);
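The idea of carrying a caller-supplied key through the send can be sketched without any reactive types. The example below is only an illustration under stated assumptions: Map.Entry stands in for Tuple2, a Long stands in for MessageId, a String stands in for MessageSpec, and the names KeyedSendSketch, sendManyKeyed and fakeSend are hypothetical.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;

// Hypothetical sketch of the idea only; names and types are stand-ins.
final class KeyedSendSketch {

    // Sends every payload and pairs each resulting id with the caller's key.
    static <K> List<Map.Entry<K, Long>> sendManyKeyed(
            List<Map.Entry<K, String>> keyedPayloads,
            Function<String, CompletableFuture<Long>> sendOne) {
        List<CompletableFuture<Map.Entry<K, Long>>> pending = new ArrayList<>();
        for (Map.Entry<K, String> keyed : keyedPayloads) {
            K key = keyed.getKey();
            CompletableFuture<Map.Entry<K, Long>> paired = sendOne.apply(keyed.getValue())
                    .thenApply(id -> new AbstractMap.SimpleImmutableEntry<K, Long>(key, id));
            pending.add(paired);
        }
        List<Map.Entry<K, Long>> results = new ArrayList<>();
        for (CompletableFuture<Map.Entry<K, Long>> future : pending) {
            results.add(future.join()); // results keep the input order
        }
        return results;
    }

    static List<Map.Entry<String, Long>> demo() {
        AtomicLong nextId = new AtomicLong(100);
        // Fake send: completes immediately with an increasing id.
        Function<String, CompletableFuture<Long>> fakeSend =
                payload -> CompletableFuture.completedFuture(nextId.getAndIncrement());
        List<Map.Entry<String, String>> input = Arrays.asList(
                new AbstractMap.SimpleImmutableEntry<String, String>("order-1", "Hello"),
                new AbstractMap.SimpleImmutableEntry<String, String>("order-2", "World"));
        return sendManyKeyed(input, fakeSend);
    }

    public static void main(String[] args) {
        for (Map.Entry<String, Long> entry : demo()) {
            System.out.println(entry.getKey() + " -> " + entry.getValue());
        }
    }
}
```

Because the key travels with the result, the caller can correlate each id with its input even if completions were to arrive in a different order.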

Add module for configuring consumers, senders (producer) and readers

The configuration model in the Apache Pulsar Java client is cumbersome. It supports properties-based configuration, but there isn't consistent support for all configuration options.

The configuration model in the Reactive Java client is different. The builder approach is used, but there is a configuration object referred to as a "spec" that is intended to hold the configuration value state. The reason for this is that it makes it more convenient to support different ways of configuring the various options in externalized configuration.
There could be modules that provide good integration with Spring Boot's configuration model and also implement ways of providing the metadata / schema for the different configuration options.

The scope of this issue is to add a Jackson based configuration module that can read json and possibly other formats that jackson supports (flat properties, yaml, etc.) and map that to the "spec" class instances which are used to configure options for consumers, senders (producer) and readers.
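As a rough illustration of mapping externalized configuration onto a "spec" object, here is a small sketch that uses plain java.util.Properties in place of Jackson so that it stays dependency-free. The class SenderSpecSketch, its two fields, the property keys, and the fallback value are all hypothetical and do not reflect the library's actual spec classes or defaults.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical stand-in for a "spec" class; field names are illustrative only.
final class SenderSpecSketch {
    final String topicName;
    final int maxInflight;

    SenderSpecSketch(String topicName, int maxInflight) {
        this.topicName = topicName;
        this.maxInflight = maxInflight;
    }

    // Maps flat properties to the spec; "100" is this sketch's own fallback.
    static SenderSpecSketch fromProperties(Properties props) {
        String topic = props.getProperty("topicName");
        if (topic == null) {
            throw new IllegalArgumentException("topicName is required");
        }
        int maxInflight = Integer.parseInt(props.getProperty("maxInflight", "100"));
        return new SenderSpecSketch(topic, maxInflight);
    }

    // Parses text in java.util.Properties format and maps it to the spec.
    static SenderSpecSketch parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return fromProperties(props);
    }

    public static void main(String[] args) {
        SenderSpecSketch spec = parse("topicName=my-topic\nmaxInflight=25\n");
        System.out.println(spec.topicName + " " + spec.maxInflight);
    }
}
```

A Jackson based module would presumably replace the hand-written mapping with data binding, which is what would make JSON, YAML and flat properties interchangeable.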

Investigate why concurrency seems to go over limits in ReactiveMessagePipelineTest.concurrency()

In this test

@Test
void concurrency() throws Exception {
    int numMessages = 1000;
    TestConsumer testConsumer = new TestConsumer(numMessages);
    // Test that non-concurrent fails to process all messages in time
    InflightCounter inflightCounterNoConcurrency = new InflightCounter();
    CountDownLatch latch1 = new CountDownLatch(numMessages);
    Function<Message<String>, Publisher<Void>> messageHandler = (message) -> Mono.delay(Duration.ofMillis(100))
            .transform(inflightCounterNoConcurrency::transform).then().doFinally((__) -> latch1.countDown());
    try (ReactiveMessagePipeline pipeline = testConsumer.messagePipeline().messageHandler(messageHandler).build()) {
        pipeline.start();
        assertThat(latch1.await(150, TimeUnit.MILLISECONDS)).isFalse();
    }
    assertThat(inflightCounterNoConcurrency.getMax()).isEqualTo(1);
    // Test that concurrent succeeds to process all messages in time
    InflightCounter inflightCounterConcurrency = new InflightCounter();
    CountDownLatch latch2 = new CountDownLatch(numMessages);
    Function<Message<String>, Publisher<Void>> messageHandler2 = (message) -> Mono.delay(Duration.ofMillis(100))
            .transform(inflightCounterConcurrency::transform).then().doFinally((__) -> latch2.countDown());
    try (ReactiveMessagePipeline pipeline = testConsumer.messagePipeline().messageHandler(messageHandler2)
            .concurrency(1000).build()) {
        pipeline.start();
        assertThat(latch2.await(1, TimeUnit.SECONDS)).isTrue();
    }
    assertThat(inflightCounterConcurrency.getMax()).isGreaterThan(100);
}

If the assertion on line 259 (in the test source) is changed to isEqualTo(1000), the test fails because the observed maximum is greater than 1000.
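For readers unfamiliar with the helper used in the test, a counter of this kind can be sketched as follows. This is a hypothetical stand-in and not the test suite's actual InflightCounter: it counts how many pieces of asynchronous work are in progress and records the highest value seen. CompletableFuture replaces the Reactor types so that the sketch runs without dependencies.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical sketch; not the InflightCounter class used by the test above.
final class InflightCounterSketch {
    private final AtomicInteger current = new AtomicInteger();
    private final AtomicInteger max = new AtomicInteger();

    // Counts the work as in flight from start until its future completes.
    <T> CompletableFuture<T> track(Supplier<CompletableFuture<T>> work) {
        int now = current.incrementAndGet();
        max.accumulateAndGet(now, Math::max);
        return work.get().whenComplete((value, error) -> current.decrementAndGet());
    }

    int getMax() {
        return max.get();
    }

    // Each piece of work finishes before the next one starts.
    static int maxWhenSequential(int n) {
        InflightCounterSketch counter = new InflightCounterSketch();
        for (int i = 0; i < n; i++) {
            CompletableFuture<Integer> work = new CompletableFuture<>();
            counter.track(() -> work);
            work.complete(i);
        }
        return counter.getMax();
    }

    // All pieces of work are started before any of them finishes.
    static int maxWhenOverlapping(int n) {
        InflightCounterSketch counter = new InflightCounterSketch();
        List<CompletableFuture<Integer>> all = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            CompletableFuture<Integer> work = new CompletableFuture<>();
            all.add(work);
            counter.track(() -> work);
        }
        for (CompletableFuture<Integer> work : all) {
            work.complete(0);
        }
        return counter.getMax();
    }

    public static void main(String[] args) {
        System.out.println("sequential max=" + maxWhenSequential(5));
        System.out.println("overlapping max=" + maxWhenOverlapping(5));
    }
}
```

Note that in this sketch the count is incremented when work starts and decremented in a completion callback, so the exact moment at which each side is observed determines the reported maximum.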

Split InflightLimiter to separate backpressure module

InflightLimiter is a generic Reactive operator for limiting the number of concurrent elements in progress. It is used in the current adapter based implementation to achieve asynchronous non-blocking backpressure. The InflightLimiter implementation depends on JCTools library which provides a high performance queue implementation that is used in the solution.
It would be useful to move InflightLimiter to an independent module so that it could be used on its own. Contributing InflightLimiter to resilience4j-reactor would be useful too.

Document ReactiveMessageSenderBuilder#maxInflight details

Related to https://github.com/apache/pulsar-client-reactive/pull/28/files#r1033276692

The way the backpressure solution works is that InflightLimiter will never create more demand with ReactiveStreams subscription requests than the configured limit.

This is a bit hard to understand in practice because of how the Reactive Streams model works. It's a pull-push model where the "pull" is created with subscription requests. The backpressure is handled by limiting the number of upstream requests. Subscribing to upstream publishers that are creating messages to send will also be postponed when limits are exceeded.

Adding unit tests and documentation to the underlying InflightLimiter would clarify the details and ensure that there aren't uncovered gaps in the solution.
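The "never request more than the limit" behaviour can be illustrated with the JDK's java.util.concurrent.Flow interfaces (Java 9 or later), which mirror the Reactive Streams interfaces. The sketch below is single-threaded and deliberately simplified; it is not the library's InflightLimiter, and the names BoundedDemandSketch, RangePublisher, LimitedSubscriber and ackOne are hypothetical. The publisher skips some Reactive Streams rules, such as rejecting non-positive requests.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.Flow;

// Hypothetical, single-threaded sketch; not the library's InflightLimiter.
final class BoundedDemandSketch {

    // Synchronous publisher of 1..count that emits only what was requested.
    static final class RangePublisher implements Flow.Publisher<Integer> {
        private final int count;

        RangePublisher(int count) {
            this.count = count;
        }

        @Override
        public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
            subscriber.onSubscribe(new Flow.Subscription() {
                private int next = 1;
                private long demand;
                private boolean draining;
                private boolean done;

                @Override
                public void request(long n) {
                    demand += n;
                    if (draining || done) {
                        return; // a re-entrant call is served by the outer loop
                    }
                    draining = true;
                    while (demand > 0 && next <= count && !done) {
                        demand--;
                        subscriber.onNext(next++);
                    }
                    draining = false;
                    if (next > count && !done) {
                        done = true;
                        subscriber.onComplete();
                    }
                }

                @Override
                public void cancel() {
                    done = true;
                }
            });
        }
    }

    // Keeps outstanding demand plus unacknowledged items at maxInflight or less.
    static final class LimitedSubscriber implements Flow.Subscriber<Integer> {
        private final int maxInflight;
        private final Queue<Integer> unacked = new ArrayDeque<>();
        private Flow.Subscription subscription;
        int received;
        int maxUnacked;

        LimitedSubscriber(int maxInflight) {
            this.maxInflight = maxInflight;
        }

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            subscription.request(maxInflight); // initial demand equals the limit
        }

        @Override
        public void onNext(Integer item) {
            unacked.add(item);
            received++;
            maxUnacked = Math.max(maxUnacked, unacked.size());
        }

        @Override
        public void onError(Throwable error) { }

        @Override
        public void onComplete() { }

        // Simulates an acknowledgement: frees one slot and requests one more.
        boolean ackOne() {
            if (unacked.poll() == null) {
                return false;
            }
            subscription.request(1);
            return true;
        }
    }

    // Returns {items received before any ack, max unacked, total received}.
    static int[] demo() {
        LimitedSubscriber subscriber = new LimitedSubscriber(3);
        new RangePublisher(10).subscribe(subscriber);
        int receivedBeforeAcks = subscriber.received;
        while (subscriber.ackOne()) {
            // keep acknowledging until nothing is pending
        }
        return new int[] {receivedBeforeAcks, subscriber.maxUnacked, subscriber.received};
    }
}
```

The upstream is never asked for more than the limit at once: new demand is created one item at a time, and only after an earlier item has been acknowledged.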

Add documentation to use with other Reactive Stream implementations

See #10 (comment)

One possible solution to the problem is to use Project Reactor's .as method (<P> P as(Function<? super Flux<T>, P> transformer)) to convert a Flux or Mono to another type.
For example, converting to RxJava 3 would be as simple as .as(Flowable::fromPublisher) or .as(Single::fromPublisher).
Similar chaining with .as method could be done for Akka Streams and any other Reactive Streams implementation.

We should add doc for this in a tips section of the README

Accept Reactive Streams Publisher as input

Adopt a strategy similar to the one that Spring Data R2DBC uses, described at https://docs.spring.io/spring-data/r2dbc/docs/current/reference/html/#get-started:first-steps:reactive-api .

As a general rule, a Spring Data R2DBC repository accepts a plain Publisher as input, adapts it to a Reactor type internally, uses that, and returns either a Mono or a Flux as output. So, you can pass any Publisher as input and apply operations on the output, but you need to adapt the output for use with another reactive library.

Design API for Reactive client exceptions and possible translation

The current adapter implementation wraps the Java client and propagates the same errors that the Java client throws.

Since the goal is to design a stable API which could be implemented with a native Reactive client later, it is necessary to document and standardize on the error handling and exception model which could be used both in the current adapter approach and later with the native client.
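One way such a translation layer could look is sketched below. Everything here is an assumption made for illustration: the exception type names are hypothetical, and java.util.concurrent.TimeoutException stands in for whatever the underlying client would throw. The point is only the shape: unwrap async wrapper exceptions, then map the cause to a small, stable hierarchy.

```java
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;

// Hypothetical sketch of an exception translation layer.
final class ExceptionTranslationSketch {

    // Hypothetical base type of a stable, client-independent exception model.
    static class ReactiveClientException extends RuntimeException {
        ReactiveClientException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    // Hypothetical subtype for timeouts.
    static final class ReactiveClientTimeoutException extends ReactiveClientException {
        ReactiveClientTimeoutException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    // Unwraps async wrapper exceptions and maps the cause to the stable model.
    static ReactiveClientException translate(Throwable error) {
        Throwable cause = error;
        while ((cause instanceof CompletionException || cause instanceof ExecutionException)
                && cause.getCause() != null) {
            cause = cause.getCause();
        }
        if (cause instanceof ReactiveClientException) {
            return (ReactiveClientException) cause; // already translated
        }
        if (cause instanceof TimeoutException) {
            return new ReactiveClientTimeoutException(cause.getMessage(), cause);
        }
        return new ReactiveClientException(cause.getMessage(), cause);
    }

    // Translates a wrapped timeout and returns the resulting type name.
    static String demo() {
        Throwable wrapped = new CompletionException(new TimeoutException("slow broker"));
        return translate(wrapped).getClass().getSimpleName();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Keeping the original exception as the cause preserves diagnostic detail while callers depend only on the translated types.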

ReactiveMessagePipelineTest.handlingTimeout is flaky

https://github.com/apache/pulsar-client-reactive/actions/runs/3641666977/jobs/6147871691#step:4:565

  Test handlingTimeout() FAILED

  java.lang.AssertionError: 
  Expected size: 9 but was: 8 in:
  [123456:4128:-1,
      123456:4129:-1,
      123456:4130:-1,
      123456:4131:-1,
      123456:4133:-1,
      123456:4134:-1,
      123456:4135:-1,
      123456:4136:-1]
      at org.apache.pulsar.reactive.client.api.ReactiveMessagePipelineTest.handlingTimeout(ReactiveMessagePipelineTest.java:208)


    [2022-12-07 18:20:31,485] [parallel-2] [org.apache.pulsar.reactive.client.internal.api.DefaultReactiveMessagePipeline] ERROR Message handling for message id 123456:4127:-1 failed.
    java.util.concurrent.TimeoutException: Did not observe any item or terminal signal within 5ms in 'source(MonoDefer)' (and no fallback has been configured)
    	at reactor.core.publisher.FluxTimeout$TimeoutMainSubscriber.handleTimeout(FluxTimeout.java:295) ~[reactor-core-3.4.22.jar:3.4.22]
    	at reactor.core.publisher.FluxTimeout$TimeoutMainSubscriber.doTimeout(FluxTimeout.java:280) ~[reactor-core-3.4.22.jar:3.4.22]
    	at reactor.core.publisher.FluxTimeout$TimeoutTimeoutSubscriber.onNext(FluxTimeout.java:419) ~[reactor-core-3.4.22.jar:3.4.22]
    	at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79) ~[reactor-core-3.4.22.jar:3.4.22]
    	at reactor.core.publisher.MonoDelay$MonoDelayRunnable.propagateDelay(MonoDelay.java:271) ~[reactor-core-3.4.22.jar:3.4.22]
    	at reactor.core.publisher.MonoDelay$MonoDelayRunnable.run(MonoDelay.java:286) ~[reactor-core-3.4.22.jar:3.4.22]
    	at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:68) ~[reactor-core-3.4.22.jar:3.4.22]
    	at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:28) ~[reactor-core-3.4.22.jar:3.4.22]
    	at java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[?:?]
    	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) ~[?:?]
    	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[?:?]
    	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[?:?]
    	at java.lang.Thread.run(Thread.java:829) ~[?:?]
    [2022-12-07 18:20:31,500] [parallel-2] [org.apache.pulsar.reactive.client.internal.api.DefaultReactiveMessagePipeline] ERROR Message handling for message id 123456:4132:-1 failed.
    java.util.concurrent.TimeoutException: Did not observe any item or terminal signal within 5ms in 'source(MonoDefer)' (and no fallback has been configured)
    	at reactor.core.publisher.FluxTimeout$TimeoutMainSubscriber.handleTimeout(FluxTimeout.java:295) ~[reactor-core-3.4.22.jar:3.4.22]
    	at reactor.core.publisher.FluxTimeout$TimeoutMainSubscriber.doTimeout(FluxTimeout.java:280) ~[reactor-core-3.4.22.jar:3.4.22]
    	at reactor.core.publisher.FluxTimeout$TimeoutTimeoutSubscriber.onNext(FluxTimeout.java:419) ~[reactor-core-3.4.22.jar:3.4.22]
    	at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79) ~[reactor-core-3.4.22.jar:3.4.22]
    	at reactor.core.publisher.MonoDelay$MonoDelayRunnable.propagateDelay(MonoDelay.java:271) ~[reactor-core-3.4.22.jar:3.4.22]
    	at reactor.core.publisher.MonoDelay$MonoDelayRunnable.run(MonoDelay.java:286) ~[reactor-core-3.4.22.jar:3.4.22]
    	at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:68) ~[reactor-core-3.4.22.jar:3.4.22]
    	at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:28) ~[reactor-core-3.4.22.jar:3.4.22]
    	at java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[?:?]
    	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) ~[?:?]
    	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[?:?]
    	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[?:?]
    	at java.lang.Thread.run(Thread.java:829) ~[?:?]
    [2022-12-07 18:20:31,509] [parallel-1] [org.apache.pulsar.reactive.client.internal.api.DefaultReactiveMessagePipeline] ERROR ReactiveMessageHandler was unexpectedly completed.

Add support for Pulsar Transactions

There's currently no support for Pulsar transactions in the Reactive Java client for Apache Pulsar. This is a gap. We should ensure that we have a good way to extend the interfaces for transactions.

Project Reactor has support for passing a context along the Reactive subscription chain. This is used in Spring to pass transaction context in Reactive flows.
Reactive Transactions with Spring were introduced in this blog post: https://spring.io/blog/2019/05/16/reactive-transactions-with-spring .

Update to Project Reactor 3.6.0

The client is using 3.5.4 of the current GA version line 3.5.x. The next GA line (3.6.x) is coming soon w/ an already available 3.6.0-M1.

Let's prepare update to 3.6.0 when it goes GA.

Version      ETA
3.6.0-M1     now
3.6.0-M2     2023/08/18
3.6.0-M3     2023/09/12
3.6.0-RC1    2023/10/10
3.6.0        ~2023/11/14

Update Reactor to 3.6.3

Reactor 3.6.3 releases on 2024-02-13, update once available.

As part of this, also run ./gradlew versionCatalogUpdate to update any other relevant dependencies.

Consider providing Reactive Streams library implementation specific interfaces

In Micronaut, there's a good example of this.

HttpClient
https://docs.micronaut.io/latest/api/io/micronaut/http/client/HttpClient.html

ReactorHttpClient extends HttpClient:
https://micronaut-projects.github.io/micronaut-reactor/latest/api/io/micronaut/reactor/http/client/ReactorHttpClient.html

With pulsar-client-reactive, the challenge is about the nested interface layers and how to make that seamlessly work.

One possibility would be to have an interface method that adapts the current instance to some other Reactive Streams implementation library type.

One option would be to decouple the core interface completely from the Reactive Streams library implementation type:

public interface ReactiveMessageSender<T> {

    Publisher<MessageId> sendMessages(Publisher<MessageSpec<T>> messageSpecs);

    <R> R adapt(Class<R> adaptedSenderType);

}

public interface ReactorMessageSender<T> {

    Mono<MessageId> sendMessage(Mono<MessageSpec<T>> messageSpec);

    Flux<MessageId> sendMessages(Flux<MessageSpec<T>> messageSpecs);

}

Internally there would be some SPI (Service Provider Interface) which Reactive Streams library support modules could implement. This way it would be easy to add support for various Reactive Streams implementation libraries where the types of the implementation library would be directly supported.
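A provider-based adapt method could be sketched roughly as follows. All names are hypothetical, CompletableFuture stands in for the core reactive type, and the adapted type is a simple blocking interface rather than a real Reactive Streams library type. A real module would likely discover providers with java.util.ServiceLoader; that wiring is omitted here and an explicit provider list is used instead.

```java
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch of an SPI-backed adapt(Class) method.
final class AdaptSpiSketch {

    // Stand-in for the library-neutral core interface.
    interface CoreSender {
        CompletableFuture<String> send(String payload);
    }

    // Stand-in for a library-specific interface that callers want to use.
    interface BlockingSender {
        String sendAndWait(String payload);
    }

    // The SPI: each support module would contribute one provider.
    interface AdapterProvider {
        // Returns the adapted sender, or null if targetType is unsupported.
        <R> R tryAdapt(CoreSender sender, Class<R> targetType);
    }

    static final class BlockingAdapterProvider implements AdapterProvider {
        @Override
        public <R> R tryAdapt(CoreSender sender, Class<R> targetType) {
            if (!BlockingSender.class.equals(targetType)) {
                return null;
            }
            BlockingSender adapted = payload -> sender.send(payload).join();
            return targetType.cast(adapted);
        }
    }

    // Asks each provider in turn and returns the first successful adaptation.
    static <R> R adapt(CoreSender sender, Class<R> targetType, List<AdapterProvider> providers) {
        for (AdapterProvider provider : providers) {
            R adapted = provider.tryAdapt(sender, targetType);
            if (adapted != null) {
                return adapted;
            }
        }
        throw new IllegalArgumentException("No adapter for " + targetType.getName());
    }

    static String demo() {
        CoreSender core = payload -> CompletableFuture.completedFuture("id-" + payload);
        List<AdapterProvider> providers =
                Collections.<AdapterProvider>singletonList(new BlockingAdapterProvider());
        BlockingSender blocking = adapt(core, BlockingSender.class, providers);
        return blocking.sendAndWait("hi");
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The core interface stays free of library-specific types, and support for a new library becomes a matter of adding another provider.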
