
atleon's Introduction

Atleon


Atleon is a lightweight reactive stream processing framework that scalably transforms data from any supported infrastructure and allows sending that data nearly anywhere, all while seamlessly maintaining at-least-once processing guarantees.

Atleon is based on Reactive Streams and backed by Project Reactor.

Documentation and Getting Started

Atleon documentation and instructions on how to get started are available in the Wiki.

An example message processing pipeline in Atleon looks like the following:

import io.atleon.core.AloStream;
import io.atleon.core.DefaultAloSenderResultSubscriber;
import io.atleon.kafka.AloKafkaSender;
import reactor.core.Disposable;

public class MyStream extends AloStream<MyStreamConfig> {

    @Override
    public Disposable startDisposable(MyStreamConfig config) {
        AloKafkaSender<String, String> sender = config.buildKafkaMessageSender();

        return config.buildKafkaMessageReceiver()
            .receiveAloRecords(config.getSourceTopic())
            .map(record -> config.getService().transform(record.value()))
            .filter(message -> !message.isEmpty())
            .transform(sender.sendAloValues(config.getDestinationTopic(), message -> message.substring(0, 1)))
            .resubscribeOnError(config.name())
            .doFinally(sender::close)
            .subscribeWith(new DefaultAloSenderResultSubscriber<>());
    }
}

In applications where it is possible for the stream to be self-configured (e.g., with Spring), the above stream definition can be simplified so that it does not require an instance of AloStreamConfig:

import io.atleon.core.DefaultAloSenderResultSubscriber;
import io.atleon.core.SelfConfigurableAloStream;
import io.atleon.kafka.AloKafkaReceiver;
import io.atleon.kafka.AloKafkaSender;
import io.atleon.kafka.KafkaConfigSource;
import reactor.core.Disposable;

public class MyStream extends SelfConfigurableAloStream {
    
    private final KafkaConfigSource configSource;
    
    private final MyService service;
    
    private final String sourceTopic;
    
    private final String destinationTopic;
    
    public MyStream(KafkaConfigSource configSource, MyService service, String sourceTopic, String destinationTopic) {
        this.configSource = configSource;
        this.service = service;
        this.sourceTopic = sourceTopic;
        this.destinationTopic = destinationTopic;
    }

    @Override
    public Disposable startDisposable() {
        AloKafkaSender<String, String> sender = AloKafkaSender.create(configSource);

        return AloKafkaReceiver.<String, String>create(configSource)
            .receiveAloRecords(sourceTopic)
            .map(record -> service.transform(record.value()))
            .filter(message -> !message.isEmpty())
            .transform(sender.sendAloValues(destinationTopic, message -> message.substring(0, 1)))
            .resubscribeOnError(name())
            .doFinally(sender::close)
            .subscribeWith(new DefaultAloSenderResultSubscriber<>());
    }
}

The examples module contains runnable classes showing Atleon in action and intended usage.

Building

Atleon is built using Maven. Installing Maven locally is optional as you can use the Maven Wrapper:

./mvnw clean verify

Docker

Atleon makes use of Testcontainers for some unit tests. Testcontainers is based on Docker, so successfully building Atleon requires Docker to be running locally.

Contributing

Please refer to CONTRIBUTING for information on how to contribute to Atleon.

Legal

This project is available under the Apache 2.0 License.


atleon's Issues

AloKafkaReceiver should not publish on its own Scheduler

Describe the improvement

AloKafkaReceiver currently publishes items on an internal Scheduler. This pattern is incongruent with all the other receivers which publish from the natively emitting threads.

Rationale

Scheduling behavior should be consistent between Receivers. The decision to publish on a Scheduler that is independent of the native publishing Scheduler should be a client decision.

Deduplication concurrency should default to Integer.MAX_VALUE

Describe the improvement

Default concurrency for deduplication should be Integer.MAX_VALUE

Rationale

Setting the concurrency for deduplication should be a micro-optimization left to clients. It should work fine out of the box for most use cases.

Move decorator parameters to configuration arguments

Describe the improvement

Remove unnecessary information from "received" messages that is only used for decoration, and move that information to configuration properties.

Also take the opportunity to standardize AloReceiver implementations, at least between SQS and RabbitMQ.

Rationale

In order to implement decoration, I hastily added more data to received messages for Alo decorators. It's looking like this isn't quite the best pattern, as I'm thinking we'll need to implement decorators that are not scoped to Alo, but rather scoped to resources, like Receivers and Senders.

Add OpenTracing integration

Describe the feature

Add Factories and Interceptors to enable end-to-end APM tracing via OpenTracing.

Rationale

I should be able to trace operations on processed items in AloStreams from reception to send. The trace context should be active during transformations.

Example Scenario

I want traces related to my stream processing to show up in my APM

Make APM tracing feasible on AloStreams

Describe the feature

Ensure that patterns and conventions are in place to allow feasibly activating APM tracing (OpenTracing, OpenTelemetry, etc.) on AloStreams

Note that this is not advocating for adding support for a particular tracing API (yet). This is just to ensure that it's feasible.

Rationale

It should be feasible to allow users to activate APM tracing on AloStreams. Since Alo items are transparently passed through AloStreams, those items may also transparently hold tracing context that can be propagated downstream.

Each received Alo may be treated similarly to an individual request. That item would have a tracing context associated with it. As that item is transformed, the associated context should be active during the transformations and propagated with the result of those transformations. Finally, when an Alo item is mapped to something that will be sent (e.g., in a Sender), it should be feasible to propagate the tracing context to the native infrastructure (if/when the infrastructure supports it).

Example Scenario

  • A user is building a stream that receives data from Kafka, processes it, and sends the data to another Kafka topic. The user would like to see traces that begin when the item is received and terminate once the item is acknowledged. There should be logs for each transforming operation
  • A user sends a message to Kafka with tracing active. Downstream consumers may activate further tracing that "follows from" the producing context

`mapNotNull` and `mapPresent`

Describe the feature

I've run into several cases where I need to map values in an AloFlux to possibly null or otherwise non-present results. Lacking a .mapNotNull such as the one offered by Project Reactor is an inconvenience that would be nice not to have to work around. That being said, Alo instances are not intended to contain null data items, so there is a little bit of nuance to this feature request. To map to Alo results that may have non-present values, we should map to Alo<Optional<V>> and then either a) acknowledge the original value if the result is not present, or b) emit the result of extracting the present value. This would essentially function as two .map calls where the intermediate result is Optional.

Rationale

I want to be able to write infinite streams that delegate to functions that may return null or Optional::empty and discard those non-present values, without having to duplicate the boilerplate code that does so.

Example Scenario

  1. I have a method that returns nullable values; I want to be able to call that method directly without having to hack my code to return Optional or Collection
  2. I have a method that returns Optional values; I want to be able to call that method directly without having to explicitly call .filter(Optional::isPresent) followed by .map(Optional::get)
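For illustration, here is a minimal sketch of the workaround described in scenario 2, assuming an AloFlux<String> and a hypothetical lookup method that returns Optional; the proposed mapPresent operator would collapse the last two steps into a single call:

import io.atleon.core.AloFlux;

import java.util.Optional;

public class MapPresentWorkaroundSketch {

    // Today's workaround: map to Optional, filter out empty results, then unwrap
    static AloFlux<String> transform(AloFlux<String> in) {
        return in.map(MapPresentWorkaroundSketch::lookup) // AloFlux<Optional<String>>
            .filter(Optional::isPresent)                  // non-present results are dropped here
            .map(Optional::get);                          // extract the present values
        // Proposed equivalent: in.mapPresent(MapPresentWorkaroundSketch::lookup)
    }

    // Hypothetical delegate that may return a non-present result
    private static Optional<String> lookup(String key) {
        return key.isEmpty() ? Optional.empty() : Optional.of(key.toUpperCase());
    }
}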

Environment variables not parsed as properties correctly

Describe the bug

Environment variables are not parsed as properties correctly

To Reproduce

Start with an environment variable like MY_ENV_VAR. If I load environment values with the prefix my.env, I should get back a property named var. Instead, I get nothing back.

Expected behavior

Environment variables should be converted to properties before being loaded

Additional context

We shouldn't force clients to do unconventional things with environment variables
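To make the expected behavior concrete, here is a minimal, illustrative sketch of the conversion described above (underscores become dots, names are lower-cased, then the prefix is stripped); this is not Atleon's actual loading code:

import java.util.HashMap;
import java.util.Map;

public class EnvToPropertySketch {

    public static void main(String[] args) {
        Map<String, String> env = Map.of("MY_ENV_VAR", "some-value");
        // Prints {var=some-value}: MY_ENV_VAR -> my.env.var, then the my.env prefix is stripped
        System.out.println(loadPrefixedProperties(env, "my.env"));
    }

    static Map<String, String> loadPrefixedProperties(Map<String, String> env, String prefix) {
        Map<String, String> properties = new HashMap<>();
        env.forEach((name, value) -> {
            String property = name.toLowerCase().replace('_', '.');
            if (property.startsWith(prefix + ".")) {
                properties.put(property.substring(prefix.length() + 1), value);
            }
        });
        return properties;
    }
}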

Enable management of Streams when used with Spring

Describe the feature

Enable the ability for streams to be managed (started, stopped, queried for state) via external interaction

Rationale

I have frequently needed the ability to stop (and usually restart) Streams in running Spring applications. Currently, this requires hand-rolling a solution that can access the streams and wiring up some externally accessible conduit through which those streams can be interacted with (e.g., a REST controller).

Using the current conventions, this is actually nearly impossible. The current convention is to just wire up an AloStreamApplicationListener that holds an instance of a Stream and a compatible configuration. The Listener itself exposes no methods for externally starting and stopping the stream. Clients would have to create their own Listener that exposes the start and stop functionality.

Example Scenario

I need to stop at least one stream in my application without taking the whole application down. I should then be able to restart the Stream. I would like to use a REST endpoint to do this.

Enable automatic binding of Configs and Streams to Spring lifecycle through annotation

Describe the feature

Allow binding of AloStreamConfig to AloStream using annotation of @AutoConfigureStream

Rationale

It is currently required that AloStream implementations are bound to applicable AloStreamConfigs by constructing AloStreamApplicationListeners with each compatible tuple. It should be possible to effectively remove this step and allow annotating configs with the stream that they should be applied to.

Example Scenario

I have written an AloStreamConfig compatible with an AloStream. I should just be able to annotate my config with @AutoConfigureStream, provide the compatible AloStream, and have Spring start the stream on startup.
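A sketch of what that usage might look like follows; the annotation's package and exact attributes are assumptions here, not a confirmed API:

import io.atleon.core.AloStreamConfig;
import io.atleon.spring.AutoConfigureStream;

// Hypothetical: binding MyStreamConfig to MyStream purely via annotation
@AutoConfigureStream(MyStream.class)
public class MyStreamConfig implements AloStreamConfig {

    @Override
    public String name() {
        return "my-stream";
    }
}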

Deprecate `AloExtendedFlux` in favor of new `GroupFlux` that is aware of its cardinality

Describe the improvement
Replace AloExtendedFlux with simplified GroupFlux that is aware of its cardinality

Rationale
There is currently a type called AloExtendedFlux. The only way to organically obtain an instance of an AloExtendedFlux is through AloFlux.groupBy. There are a few issues with this:

  1. AloExtendedFlux has a non-conventionally named mapping method (.mapExtended), which makes discovery difficult
  2. There are more methods on AloExtendedFlux than are really necessary. In practice, after applying a grouping on an AloFlux, users only want to ultimately apply a .flatMap, potentially with intermediate .map operations, such that they can obtain a resulting AloFlux to subscribe to. The extra methods increase the likelihood of users doing unintended things
  3. AloExtendedFlux is not currently aware of its cardinality, which requires users to explicitly (and redundantly) pass a concurrency value when calling AloExtendedFlux.flatMap

The real intention behind AloExtendedFlux was to make it easier for users to handle the results of applying grouping to AloFlux. The intended usage of that resultant type is for users to do any further necessary transformation on the inner groups and then flatMap back to an AloFlux. These two functionalities would be made simpler if they each required only a single argument, which can be made possible by the resultant type being aware of its cardinality.

AloExtendedFlux should not extend FluxOperator

AloExtendedFlux extending FluxOperator was a bad choice. It makes available many methods that are really not intended to ever be used by typical Atleon clients. The same pattern from AloFlux should be used: simply wrap a Flux and allow unwrapping it for advanced use cases.

BufferUnderflowException on LoadingAvroDeserializer not handled when `read.null.on.failure` is enabled

Describe the bug

LoadingAvroDeserializer throws a BufferUnderflowException if data contains the first valid byte but not enough schema ID bytes. This is expected, but the deserializer should be able to just return null if read.null.on.failure is enabled.

To Reproduce

Map<String, Object> configs = new HashMap<>();
configs.put(AvroSerDe.SCHEMA_REGISTRY_URL_CONFIG, "<NOT_USED>");
configs.put(LoadingAvroDeserializer.READ_NULL_ON_FAILURE_PROPERTY, true);

LoadingAvroDeserializer<Object> deserializer = new ReflectDecoderAvroDeserializer<>();
deserializer.configure(configs, false);

assertNull(deserializer.deserialize("topic", new byte[]{0, 1, 2, 3}));

Expected behavior

LoadingAvroDeserializer returns null when read.null.on.failure is enabled and Avro data that doesn't contain a valid schema ID is received.

Additional context

Returning null is a really nice feature when bad data is encountered. This particular corner case just wasn't covered

Add missing contextual invocation of third party libraries

Describe the improvement

Now that Contextual is a thing, there are certain places where it should be leveraged but currently is not. This will be necessary for certain contextual features to work, like tracing.

Rationale

Third party libraries may be instrumented in ways that Alo implementations may be able to take advantage of. In order for them to do so, we need to delegate to them when invoking third party libraries that may be aware of such context.

Refactor Alo propagation to differentiate between fan-in and fan-out

Describe the improvement

Add a method to Alo explicitly for fan-in operations. It can be defaulted to return the regular propagate.

Rationale

When considering support for tracing, it has become apparent that we're missing a mechanism to allow "fanning in". This is applicable to microbatching and reduction use cases. In such cases, when any given Alo may have its own context, it becomes necessary to allow the implementation to appropriately "merge" those contexts. In the case of tracing, it would make sense for a new context to be created that "links" to all the fanned-in contexts.

Remove unnecessary custom Schedulers

Describe the improvement

Remove any places where we're creating custom Schedulers

Rationale

Reduce complexity of the library and just use out-of-the-box Schedulers from Reactor

Upgrade Library Versions

Describe the feature

For some of the libraries we depend on, we're pointing at versions that are years old. At the very least, we should upgrade to the latest Reactor library versions and resolve dependency issues from there.

Rationale

Keep this toolkit up to date

groupByNumberHash

Describe the feature

It would be nice to have a groupByNumberHash on AloFlux

Rationale

We already have a groupByStringHash. Numbers are about as common as Strings, so we should have both.

Example Scenario

I have records with integral IDs. I want to group by those easily.

Rate limiting is blocking

Describe the bug

Rate limiting via AloFlux.limitPerSecond uses a blocking implementation under the hood. This is bad.

To Reproduce

Use limitPerSecond with BlockHound enabled

Expected behavior

Blocking should be isolated on its own Scheduler

Received SQS Messages should indicate their QueueUrl

Describe the improvement

ReceivedSqsMessage should have a queueUrl accessor to convey the URL of the queue that a message came from

Rationale

Allow greater specificity when creating observations, e.g. with tracing and metrics.

Default names for AloStreamConfigs

Describe the feature

It would be nice if StreamConfigs came with a default name() implementation.

Rationale

There is an established convention where if you have a given stream, say defined as "MyStream", then there is a Config named "MyStreamConfig". There is a further convention that the name of that stream is written in kebab case, i.e. "my-stream".

Currently, you have to manually follow this convention. This can get tedious in the early stages of writing the Stream, when you're constantly changing its name. You may find you wind up with a Stream whose configured name no longer matches (by convention) its class name.

Example Scenario

I create a Stream called "MyStream" with "MyStreamConfig". I later rename it to "MyBetterStream" and "MyBetterStreamConfig". The Config's name should automatically be "my-stream" in the first scenario, then change to "my-better-stream" without me needing to explicitly change it.
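As a minimal illustration of the convention described above, a default name could be derived from the config class's simple name, assuming a trailing "Config" is stripped and camel case becomes kebab case (this is a sketch, not Atleon's actual implementation):

public class DefaultStreamNameSketch {

    public static void main(String[] args) {
        System.out.println(defaultName("MyStreamConfig"));       // my-stream
        System.out.println(defaultName("MyBetterStreamConfig")); // my-better-stream
    }

    static String defaultName(String configClassName) {
        String base = configClassName.endsWith("Config")
            ? configClassName.substring(0, configClassName.length() - "Config".length())
            : configClassName;
        // Insert a dash before each interior capital letter, then lower-case the whole thing
        return base.replaceAll("([a-z0-9])([A-Z])", "$1-$2").toLowerCase();
    }
}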

SQS Message deletion schedule could be non-blocking

Scheduling the deletion of SQS messages is currently blocking. I've tried to make it as performant as possible, but haven't accomplished getting rid of either a synchronized block or a busy-wait emission failure handler.

However, it should be possible to use Reactor's drain-miss pattern to accomplish work-stealing that doesn't block any threads.
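For reference, here is a generic illustration of the drain-miss (work-stealing) idiom mentioned above, stripped of anything SQS-specific; whichever thread wins the increment-from-zero race drains the queue, and threads that "miss" simply enqueue and move on:

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

public class DrainMissSketch<T> {

    private final Queue<T> queue = new ConcurrentLinkedQueue<>();
    private final AtomicInteger wip = new AtomicInteger();

    public void submit(T item, Consumer<? super T> work) {
        queue.offer(item);
        if (wip.getAndIncrement() != 0) {
            return; // another thread currently holds drain rights; it will pick this item up
        }
        int missed = 1;
        do {
            T next;
            while ((next = queue.poll()) != null) {
                work.accept(next); // executed without holding any lock
            }
            missed = wip.addAndGet(-missed); // account for work submitted while we were draining
        } while (missed != 0);
    }
}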

Make config loading strongly typed at compile time

Describe the improvement
Refactor generic Config Loading to use strongly typed compile-time signatures

Rationale
ConfigLoading has several methods for loading properties based on runtime parsing. This is not very performant, as we convert everything to a string before parsing it, even though the property may already be the correct type.

This has an additional downside: clients may populate configs with properties that are objects they want to use; however, it is impossible to ensure those objects actually make it to the calling code, because they are serialized (as strings) before being re-parsed for usage.
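As a rough sketch of the direction described above (illustrative only, assuming configs are held as a Map<String, Object>): if a configured value already has the requested type, it can be used directly instead of being round-tripped through a String:

import java.util.Map;
import java.util.Optional;

public class TypedConfigLoadingSketch {

    // Returns the value only if it is already an instance of the requested type,
    // avoiding the stringify-then-parse round trip described above
    static <T> Optional<T> loadTyped(Map<String, Object> configs, String key, Class<T> type) {
        Object value = configs.get(key);
        return type.isInstance(value) ? Optional.of(type.cast(value)) : Optional.empty();
    }
}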

Remove SLF4J as a compile dependency of Micrometer module

Describe the improvement

Remove SLF4J as a dependency of the Micrometer module. Replace the one log message we generate with java.util.logging.

Rationale

In thinking about third party decoration abilities, it occurred to me that the third party modules we have should include as few transitive dependencies as possible. This will allow for easier usage by other libs and enable auto-decoration.

Remove AloMono

Describe the improvement

AloMono was originally added to support AloFlux reductions. However, this doesn't make sense, as AloFlux is meant to model an infinite stream, and as such, reductions would never complete.

Rationale

Remove unnecessary code

Move RabbitMQ `Configurable` to `util`

As more Senders and Receivers are implemented, the functionality of "loading" configured Configurable resources is becoming ubiquitous. Let's move that to the util package

Alo Micrometer (Auto-)Decoration

Describe the feature

Implement ability to decorate Alo items with Micrometer metrics. Include ability to auto-decorate

Rationale

It would be nice to be able to simply add a decoration and get Micrometer metrics exported from Alo streams. It would be doubly nice if all I have to do is add a dependency on io.atleon:atleon-micrometer-auto

Example Scenario

I want to monitor my stream processing application. I want it to be easy to add Micrometer metrics for throughput and processing latency

Implement base interface for all Sender Results

Describe the improvement

It would be great to have a base interface for Sender Results on which to build future cross-cutting functionalities. Currently, every time there is a new Sender implementation, we have to make sure to also include a "default Alo" Subscriber for its Results. If they all implemented the same interface instead, we could just reuse the same Subscriber implementation. At present, a Sender Result just needs to indicate whether or not it is a failure.

Rationale

Avoid repetitive and inconsistent reimplementation of Sender Result subscribers. Enable future implementation of cross-cutting features, like deadlettering.
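Based only on the requirement above, the base interface could be as small as the following sketch (the name and placement are assumptions):

// Hypothetical minimal contract for results emitted by any Alo Sender implementation
public interface SenderResult {

    // The only cross-cutting information currently needed: whether the send failed
    boolean isFailure();
}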

AloRabbitMQReceiver should allow setting a NacknowledgerFactory instead of NackStrategy

Describe the improvement

Deprecate RabbitMQ NackStrategy and replace with NacknowledgerFactory

Rationale

We allow clients fine-grained customization of nacknowledgement in SQS. This is better than what we offer for RabbitMQ. Without this, clients don't have the ability to:

  • Requeue if this is the first failure
  • Only discard if the message is over some age
  • In any other fashion, dynamically choose what to do with a message that has failed processing

All Alo Senders should have forwarding methods for sending one message at a time

Describe the feature

After implementing AloSqsSender and AloSnsSender, I realized it would be congruent for all Alo Sender implementations to provide single-message sending methods that return Mono.

Rationale

Make usage of Alo Senders consistently reusable

Example Scenario

I want to send a single Message to Kafka or RabbitMQ and I only have a reference to an Alo Sender. I'd like to be able to send that Message without having to wrap it as a reactive type myself

Add support for AWS SQS receiving and sending

Describe the feature

It would be nice to support SQS receiving, and though most SQS "sending" is conventionally done with SNS, it would also be nice to be able to send data directly to SQS queues. This should be implementable using AWS' V2 async APIs.

Rationale

AWS usage is ubiquitous, and the message queuing service of choice is SQS.

Example Scenario

I want to consume messages from an SQS queue and be able to send the results to any service or message queue in a non-blocking fashion with an at-least-once guarantee.

Kafka and RabbitMQ Senders should be Closeable

Describe the feature

AloKafkaSender and AloRabbitMQSender are not Closeable, even though they both cache references to underlying Clients. This is problematic if I ever want to fully dispose of a Sender, e.g. when a Stream is stopped.

Rationale

If a Stream is repetitively started and stopped, it may cause a memory leak if the underlying client resources are not closed.

Example Scenario

I want to stop a stream and restart it. If I do this today, multiple senders will be created without releasing the resources of those no longer in use.

Standardize AloFactory loading in Receivers

Describe the improvement

Centralize the code responsible for loading decorated AloFactory implementations

Rationale

The process of loading AloFactory in each of the Alo*Receiver implementations is duplicated. It would be best if they all delegated to the same code for loading an AloFactory so they don't drift over time.

Tracing decoration not appropriately linking active span

Describe the bug

After activating tracing for a DataDog-instrumented app, the spans from Atleon are not linked to the active span when the Atleon spans are created. This leads to two spans being generated for each consumed message, with no relationship between them.

To Reproduce

Activate Atleon tracing; activate DataDog tracing. Produce a Kafka record with tracing context on it. The consumed record will have two spans.

Expected behavior

The Atleon span should "follow from" the consumption span, if there is one active.

Care should be taken, though. It is plausible that the active span is not one that is related to the record span, due to the asynchronous nature of publishing. The Atleon span should only follow from the active span if the trace IDs are the same between the active span and the record span. Otherwise, the Atleon span should follow from the record span.

RabbitMQ Route Initializing Utilities

Describe the feature

Any application that uses RabbitMQ almost assuredly needs code that ensures the RabbitMQ exchanges and queues are configured and bound appropriately. It would be great if Atleon had utilities to implement this.

It would also be great if there were examples of intended usage.

Rationale

I'd like to maximize the convenience of using Atleon with RabbitMQ and would like to use it to initialize my RabbitMQ resources.

Remove unnecessary `Alo` implementations

Describe the improvement

Remove "default" implementations of Alo for Kafka, RabbitMQ, and SQS. Use ComposedAlo instead

Rationale

The "default" implementations of Alo for received messages are straight-up copies of ComposedAlo, just with their container types explicitly referenced. There's not much advantage to having their own types. Removing them will make it easier to refactor Alo, subsequently making implementation of DecoratedAlo easier.

Polling Stream

Describe the feature
We would like to have the ability to represent a generic polling model as a reactive stream. The end user could create their own specific implementation of a poller (including offsetting) and configure the polling interval used to access the resource.

Rationale
There are occasionally situations where a user may need to poll an external resource periodically for changes and would like to be able to process these changes as events in a stream.

Example Scenario
A service keeps track of the state of certain domain entities and exposes an endpoint where you can fetch all state changes for a provided start/end time. In this case, we would want to be able to represent periodic polling of that endpoint with a moving window as a bursty stream for processing.
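A conceptual sketch of that polling model, expressed with plain Reactor rather than a committed Atleon API (the poller, interval, and element type are placeholders):

import reactor.core.publisher.Flux;

import java.time.Duration;
import java.util.List;
import java.util.function.Supplier;

public class PollingStreamSketch {

    // Emits one bursty batch of results per polling interval; offsetting/windowing is left to the poller
    public static <T> Flux<T> poll(Supplier<List<T>> poller, Duration interval) {
        return Flux.interval(interval)
            .concatMap(tick -> Flux.fromIterable(poller.get()));
    }
}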

Implement Alo Decorators for Kafka, RabbitMQ, and SQS Receivers

Describe the feature

Add decorators for Alos of Kafka ConsumerRecords, RabbitMQ Messages, and SQS Messages. Proposed interface names:

  • AloKafkaConsumerRecordDecorator
  • AloRabbitMQMessageDecorator
  • AloSqsMessageDecorator

Rationale

Now that we have the ability to decorate Alo, the originators of Alo emissions should allow decorating them with source-aware decorations.

It would be nice if decorators on the classpath were loaded via ServiceLoader.
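A generic illustration of ServiceLoader-based discovery follows; the decorator interfaces named above are proposals, so this only shows the loading mechanism, not an Atleon API:

import java.util.List;
import java.util.ServiceLoader;
import java.util.stream.Collectors;

public class DecoratorLoadingSketch {

    // Loads every implementation of the given decorator type registered under META-INF/services
    public static <D> List<D> loadDecorators(Class<D> decoratorType) {
        return ServiceLoader.load(decoratorType).stream()
            .map(ServiceLoader.Provider::get)
            .collect(Collectors.toList());
    }
}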

Example Scenario

  • I want to be able to add decoration to the Alos emitted from my receiver. I should be able to add them to the config
  • I want tracing to be applied to my streams. I should just have to add the tracing dependency and the decoration will happen automatically

Remove unnecessary SendInterceptors

Describe the improvement
Remove KafkaSendInterceptor and RabbitMQSendInterceptor

Rationale
I thought we would need *SendInterceptors in order to propagate tracing context, but after doing some research into idiomatic tracing, what I had in mind isn't exactly the correct approach.

For tracing, the trace context should be active when instrumented objects/methods are invoked. Therefore, the responsibility is really on the invoking call sites to manage tracing context. Now that Alo is Contextual, we should take advantage of activating its context when invoking methods that pertain to Alo.

Introduce Alo Decoration and apply to filtering, mapNotNull, and mapPresent

Describe the feature

Add two new types:

  • DecoratedAlo: Indicates that an Alo decorates another Alo and allows access to its delegate
  • PresentAlo: When we map to an Alo<Optional<T>> where we know the data item is present, wrap it as such into an Alo<T>
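Based purely on the description above, the DecoratedAlo contract might look like the following sketch (the accessor name is an assumption):

import io.atleon.core.Alo;

// An Alo that wraps another Alo and exposes the wrapped instance
public interface DecoratedAlo<T> extends Alo<T> {

    Alo<T> getDelegate();
}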

Rationale

We want to make Atleon as extensible as possible, and allowing decoration of Alo provides a great tradeoff between complexity and functionality. This will make it much easier to decorate Alo items with functionality like tracing

Example Scenario

I want to add tracing decoration to my Alo items. I should be able to create the decoration and wrap my Alo in it.

Add support for AWS SNS sending

Describe the feature

It would be great if we can support sending data to SNS topics with at-least-once processing guarantees.

Rationale

AWS usage is ubiquitous, and the pub-sub service of choice is SNS.

Example Scenario

I want to consume messages from any message queue (Kafka topic, RabbitMQ queue, etc.) and send the resulting message to an SNS topic in a non-blocking fashion with an at-least-once processing guarantee.

SqsReceiver Poller should just be a regular Subscription

Describe the improvement

There's some unnecessary overhead in using a FluxSink in SqsReceiver::Poller, when we're effectively just implementing a Subscription. We should be able to simply transition to implementing that interface and remove the overhead caused by emitting into the Sink.

Rationale

Reduce overhead and improve efficiency
