
Comments (9)

chemicL commented on May 27, 2024

@qnnn glad to hear you're learning Reactor 🚀 Hope the experience is pleasant so far.

Regarding the suggestion you made:
re 1. FAIL_ZERO_SUBSCRIBER would indicate failure, while the scenario you describe in the buffering cases means the items are buffered and succeed.
re 2. the FAIL_ZERO_SUBSCRIBER in its current meaning is exactly that as far as I can tell.
re 3. the idea is not bad per se, however it's not applicable for two reasons: 1) it would be a breaking change of behaviour; 2) I don't see what value it would provide. The idea for Sinks is essentially to decouple the producer from the consumer so both can make progress. The failure events tell the producer that it needs to worry and deal with backpressure, while the warmup phase is actually a successful scenario until the buffer fills up and requires no indication that nobody is yet receiving.

If you feel this falls short in some scenarios and that changing this behaviour would enable new flows, please open a new issue, including ideas about the migration path if a behaviour change is proposed. Let's not mix two concerns in the same issue. Thanks in advance and good luck on your learning journey!

from reactor-core.

qnnn commented on May 27, 2024

@chemicL I really appreciate your patient response, and it has been very helpful to me. I apologize for bringing it up in this issue. Best wishes!


qnnn commented on May 27, 2024

Hi, I'm lucky to come across this issue. I'm currently learning Reactor and have also encountered this problem, which has left me confused. Additionally, I have a question related to the EmitResult code, and perhaps it can be addressed together in this issue.

Is it necessary to use a new response code to represent "warm up" and distinguish whether subscribers exist when using tryEmitNext?

Issue #2338 mentions: do we really need a separate fail code (vs reusing FAIL_OVERFLOW)

The solution related to #2338 is to introduce the FAIL_ZERO_SUBSCRIBER status code. However, triggering the FAIL_ZERO_SUBSCRIBER response code requires meeting the following two conditions:

  1. The queue cache is full or there is no queue cache
  2. There are no subscribers
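The two conditions above can be sketched in plain Java, so the example runs without Reactor on the classpath. `ToyBufferingSink`, `Result`, and the capacity of 2 are hypothetical names and values chosen for illustration. This is a simplified model of the behaviour described in this thread, not Reactor's implementation, and the toy never drains its buffer to subscribers.

```java
import java.util.ArrayDeque;
import java.util.Queue;

class EmitResultModel {
    // Hypothetical subset of result codes, named after the ones discussed in this thread.
    enum Result { OK, FAIL_OVERFLOW, FAIL_ZERO_SUBSCRIBER }

    /** Toy model only: it buffers values and never delivers them to subscribers. */
    static final class ToyBufferingSink {
        private final Queue<Integer> buffer = new ArrayDeque<>();
        private final int capacity;
        private int subscriberCount;

        ToyBufferingSink(int capacity) {
            this.capacity = capacity;
        }

        void addSubscriber() {
            subscriberCount++;
        }

        Result tryEmitNext(int value) {
            if (buffer.size() < capacity) {
                // Warm-up: the value is buffered, which counts as success
                // even when nobody has subscribed yet.
                buffer.add(value);
                return Result.OK;
            }
            // Buffer full (or capacity 0): the failure code depends on
            // whether any subscriber is present.
            return subscriberCount == 0 ? Result.FAIL_ZERO_SUBSCRIBER : Result.FAIL_OVERFLOW;
        }
    }

    public static void main(String[] args) {
        ToyBufferingSink sink = new ToyBufferingSink(2);
        System.out.println(sink.tryEmitNext(1)); // OK
        System.out.println(sink.tryEmitNext(2)); // OK
        System.out.println(sink.tryEmitNext(3)); // FAIL_ZERO_SUBSCRIBER
        sink.addSubscriber();
        System.out.println(sink.tryEmitNext(4)); // FAIL_OVERFLOW
    }
}
```

In this model a capacity of 0 stands in for the "no queue cache" case: the first emission without a subscriber fails with FAIL_ZERO_SUBSCRIBER right away.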

Perhaps adjusting or extending the meaning of the EmitResult code can help us better understand the state of Sinks.Many when using tryEmitNext. I'm not sure if the following points are reasonable:

  1. The FAIL_ZERO_SUBSCRIBER response code only indicates that there are no subscribers and does not consider whether the queue cache is full.

  2. When the queue overflows, only the FAIL_OVERFLOW status code is triggered (or a new extended status code, such as "FAIL_OVERFLOW_WITHOUT_SUBSCRIBERS," which means that there are no subscribers when FAIL_OVERFLOW occurs).

  3. A new status code can be used to represent "warm up," as mentioned in the Javadoc of Sinks.MulticastSpec#onBackpressureBuffer:

    "Without Subscriber: warm up. Remembers up to Queues.SMALL_BUFFER_SIZE elements pushed via Sinks.Many.tryEmitNext(Object) before the first Subscriber is registered."


When the "warm up" response code appears, it can signal a risk of FAIL_OVERFLOW.


chemicL commented on May 27, 2024

@kaqqao thanks for the report! I reviewed the other Sinks implementations and do believe your expectations are backed by the fact that SinkManyUnicast and SinkManyUnicastNoBackpressure also reject emissions in case of cancellation. I'll have a look at the PR you opened as well and provide feedback.


kimec commented on May 27, 2024

Nice to see this discussion finally started


chemicL commented on May 27, 2024

This issue is still a valid one and is open for contributions. The attempt in #3725 had some concerns and was closed due to inactivity so there's a chance to build upon that feedback.


bajibalu commented on May 27, 2024

Hi @chemicL, I am also learning Reactor and am interested in working on this issue. I saw your concern in the other PR. I am not sure whether this is possible or feasible, but would it be a good idea to somehow freeze the queue once the sink is cancelled so that it does not accept any more items?


chemicL commented on May 27, 2024

Hey, @bajibalu! Happy to hear you're learning reactor and are interested in contributing 🎉

The queues we have do not have a concept of freezing/closing. However, there are other mechanisms to prevent inserting an item. On the other hand, it's also not an issue if the item is inserted, as long as something later notices that a cancellation has happened and takes care of clearing the queue.

After looking at the code one more time I think it is the case already that an item inserted after cancellation should be removed from the queue due to the drain operation that follows. Unfortunately, I couldn't get it to work because somehow the WIP (work-in-progress) marker is left in an unclean state. Debug the below code to see what's happening:

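	// Assumption: this runs from a test inside the reactor.core.publisher package,
	// since the class and its queue field are internal to Reactor.
	SinkManyEmitterProcessor<Integer> processor = new SinkManyEmitterProcessor<>(true, 1);
	processor.tryEmitNext(1); // emitted before any subscriber exists
	// Subscribe, then cancel right away
	processor.asFlux().doOnNext(i -> System.out.println("Received " + i)).subscribe().dispose();

	processor.tryEmitNext(2); // emitted after the cancellation

	// Would be 0 if the drain that follows the emission cleared the queue
	System.out.println(processor.queue.size());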

If nothing was emitted before, then no queue is created:

	// Same setup, but nothing is emitted before the subscriber comes and goes
	SinkManyEmitterProcessor<Integer> processor = new SinkManyEmitterProcessor<>(true, 1);

	processor.asFlux().subscribe().dispose();

	processor.tryEmitNext(1);

	System.out.println(processor.queue == null);

Solving this issue would be the first step to a comprehensive solution.

One potential issue is that in case of an emit/cancel race we wouldn't communicate the cancellation to the caller if the cancellation happens after the checks for cancellation but before the item is inserted into the queue. But perhaps that's not a big deal as long as something discards the item from the queue (so fixing the above nuance is necessary).

What I would love to see in a PR is:

  • unit tests validating the above scenarios provided as code samples
  • a JCStress test that validates racing cancellation with tryEmitNext
      • an allowed test outcome is an OK result for emission
      • an allowed test outcome is a CANCELLED result
      • in both cases the arbiter validates the queue to check it is indeed empty
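
The race described in the list above can be approximated with plain threads to show the shape of the check. This is only a rough sketch: it is not a JCStress test and does not replace one, and `ToySink`, `Result`, and `raceOnce` are hypothetical names that model a WIP-guarded drain in general, not Reactor's SinkManyEmitterProcessor.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

class EmitCancelRace {
    enum Result { OK, CANCELLED }

    /** Hypothetical toy sink: it only buffers values and cleans up on cancel. */
    static final class ToySink {
        final Queue<Integer> queue = new ConcurrentLinkedQueue<>();
        final AtomicInteger wip = new AtomicInteger();
        volatile boolean cancelled;

        Result tryEmit(int value) {
            if (cancelled) {
                return Result.CANCELLED;
            }
            // A concurrent cancel() may land here, after the check above.
            queue.offer(value);
            drain(); // the drain is what discards the item in that case
            return Result.OK;
        }

        void cancel() {
            cancelled = true;
            drain();
        }

        void drain() {
            // Only one thread drains at a time; others just record missed work.
            if (wip.getAndIncrement() != 0) {
                return;
            }
            int missed = 1;
            do {
                if (cancelled) {
                    queue.clear();
                }
                // A real sink would also deliver items to subscribers here.
                missed = wip.addAndGet(-missed);
            } while (missed != 0);
        }
    }

    static void await(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    /** Races one emission against one cancellation and checks the queue afterwards. */
    static Result raceOnce() {
        ToySink sink = new ToySink();
        CountDownLatch start = new CountDownLatch(1);
        AtomicReference<Result> result = new AtomicReference<>();
        Thread emitter = new Thread(() -> { await(start); result.set(sink.tryEmit(1)); });
        Thread canceller = new Thread(() -> { await(start); sink.cancel(); });
        emitter.start();
        canceller.start();
        start.countDown();
        try {
            emitter.join();
            canceller.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        // Plays the role of the arbiter: both outcomes must leave the queue empty.
        if (!sink.queue.isEmpty()) {
            throw new IllegalStateException("item left in queue after cancellation");
        }
        return result.get();
    }

    public static void main(String[] args) {
        int ok = 0;
        int cancelled = 0;
        for (int i = 0; i < 1000; i++) {
            if (raceOnce() == Result.OK) ok++; else cancelled++;
        }
        System.out.println("OK=" + ok + " CANCELLED=" + cancelled);
    }
}
```

Both OK and CANCELLED are accepted here, and the split between them varies from run to run. A JCStress test would explore the interleavings far more thoroughly than this loop can.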

@bajibalu in case you'd like to work on this, just leave a note here so if anyone else interested in this wants to take over we can re-assign in case you don't find the time. Thanks!


bajibalu commented on May 27, 2024

Hi @chemicL I pushed a PR to fix this issue. Please take a look at this PR and let me know if this works.

