bufferUntilChanged - emit last element of a hot publisher in its own buffer after a timeout

Motivation

Hi, I am seeing somewhat unexpected behavior when using bufferUntilChanged with a hot publisher.
When an element arrives, it is compared to the previous element. If the keyComparator returns false, the buffer holding the previous elements is emitted.
A new buffer is then created containing the current element, and it stays open until another element arrives or the publisher completes.
If no new element arrives, this new buffer is never emitted.

More specifically, I am trying to implement a session window as defined by Kafka Streams and Flink, so the comparator actually compares the event times of the elements.
A session window is defined by the time between two consecutive messages.
If that time is less than the specified session gap, the messages are considered to belong to the same session.
If it is larger than the session gap, the current window is emitted and a new window is started.
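
To make the grouping rule concrete, here is a minimal offline sketch in plain Java, without Reactor. The class name SessionWindows, the method group, and the choice to treat a gap exactly equal to the session gap as a new session are my own assumptions for illustration; none of this is an API of Reactor, Kafka Streams, or Flink.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, for illustration only.
final class SessionWindows {

    // Splits timestamps (assumed sorted ascending, in milliseconds) into sessions.
    // A difference of at least gapMillis between neighbours starts a new session.
    static List<List<Long>> group(List<Long> timestamps, long gapMillis) {
        List<List<Long>> sessions = new ArrayList<>();
        List<Long> current = new ArrayList<>();
        for (long ts : timestamps) {
            if (!current.isEmpty() && ts - current.get(current.size() - 1) >= gapMillis) {
                sessions.add(current);          // gap reached: close the open session
                current = new ArrayList<>();
            }
            current.add(ts);
        }
        if (!current.isEmpty()) {
            sessions.add(current);              // the last session closes only at end of input
        }
        return sessions;
    }
}
```

With timestamps 0, 400, 800, 1400 and a 500 ms gap, group returns [[0, 400, 800], [1400]]. On a finite list the final session is closed by the end of the input; a hot publisher has no such signal until it completes.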

I created a test that shows this behavior: the last element is emitted in its own buffer only when the publisher completes.

    // Pair is assumed to be org.apache.commons.lang3.tuple.Pair;
    // assertThat and containsInAnyOrder are the Hamcrest matchers.
    void testBufferUntilChanged() {
        var testPublisher = TestPublisher.<Pair<Integer, Long>>create();
        // Two consecutive elements stay in the same buffer while their
        // timestamps are less than 500 ms apart.
        var flux = testPublisher.flux().bufferUntilChanged(Function.identity(), (pair1, pair2) -> {
            var diff = pair2.getRight() - pair1.getRight();
            System.out.println("Diff " + diff);
            return diff < 500L;
        });

        StepVerifier
                .create(flux)
                .then(() -> testPublisher.next(Pair.of(1, System.currentTimeMillis())))
                .thenAwait(Duration.ofMillis(400L))
                .then(() -> testPublisher.next(Pair.of(2, System.currentTimeMillis())))
                .thenAwait(Duration.ofMillis(400L))
                .then(() -> testPublisher.next(Pair.of(3, System.currentTimeMillis())))
                .thenAwait(Duration.ofMillis(600L))
                // Element 4 arrives after the gap, which closes the buffer [1, 2, 3].
                .then(() -> testPublisher.next(Pair.of(4, System.currentTimeMillis())))
                .assertNext(buffer -> assertThat(buffer.stream().map(Pair::getKey).collect(Collectors.toList()), containsInAnyOrder(1, 2, 3)))
                .thenAwait(Duration.ofMillis(1000L))
                // The buffer [4] is emitted only once the publisher completes.
                .then(testPublisher::complete)
                .assertNext(buffer -> assertThat(buffer.stream().map(Pair::getKey).collect(Collectors.toList()), containsInAnyOrder(4)))
                .verifyComplete();
    }

Desired solution

As an enhancement to the bufferUntilChanged methods, do you think it would be useful to add a new overload that takes a Duration parameter?
It would be similar to the other buffer methods that accept a Duration.
If no new element arrives within the specified Duration, the open buffer would be emitted regardless.
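
A rough sketch of the semantics I have in mind, written in plain Java and independent of Reactor. TimeoutBuffer, onNext, and onTick are hypothetical names, not an existing or proposed Reactor API, and time is passed in explicitly so the behavior is deterministic; a real operator would presumably schedule the timeout on a Scheduler instead.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical model of "buffer until changed, or until idle for too long".
final class TimeoutBuffer<T> {
    private final long gapMillis;        // a gap this large between elements splits buffers
    private final long timeoutMillis;    // idle time after which the open buffer is flushed
    private final Consumer<List<T>> downstream;
    private List<T> buffer = new ArrayList<>();
    private long lastSeenMillis;

    TimeoutBuffer(long gapMillis, long timeoutMillis, Consumer<List<T>> downstream) {
        this.gapMillis = gapMillis;
        this.timeoutMillis = timeoutMillis;
        this.downstream = downstream;
    }

    // Existing behavior: a new element closes the open buffer if the gap was reached.
    void onNext(T value, long nowMillis) {
        if (!buffer.isEmpty() && nowMillis - lastSeenMillis >= gapMillis) {
            flush();
        }
        buffer.add(value);
        lastSeenMillis = nowMillis;
    }

    // Requested behavior: with no new element, the timeout alone closes the buffer.
    void onTick(long nowMillis) {
        if (!buffer.isEmpty() && nowMillis - lastSeenMillis >= timeoutMillis) {
            flush();
        }
    }

    private void flush() {
        downstream.accept(buffer);
        buffer = new ArrayList<>();
    }
}
```

With a 500 ms gap and a 1000 ms timeout, elements 1, 2, 3, 4 arriving at 0, 400, 800, and 1400 ms produce [1, 2, 3] when element 4 arrives, and [4] on the first tick at or after 2400 ms, without waiting for completion.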

Considered alternatives

As a workaround, I am thinking of adding some kind of scheduled heartbeat elements to the publisher.
I am using Flux.create, so I can emit items from a scheduled service.
But it is not ideal; I believe it will complicate the processing pipeline unnecessarily.
Thanks.

Additional context
