Comments (9)
@qnnn glad to hear you're learning Reactor 🚀 Hope the experience is pleasant so far.
Regarding the suggestion you made:
re 1. FAIL_ZERO_SUBSCRIBER
would indicate failure, while the scenario you describe in the buffering cases means the items are buffered and succeed.
re 2. the FAIL_ZERO_SUBSCRIBER
in its current meaning is exactly that as far as I can tell.
re 3. the idea is not bad per se, however it's not applicable for two reasons: 1) it would be a breaking change of behaviour; 2) I don't see what value it would provide. The idea for Sinks
is essentially to decouple the producer from the consumer so both can make progress. The failure events tell the producer that it needs to worry and deal with backpressure, while the warmup phase is actually a successful scenario until the buffer fills up and requires no indication that nobody is yet receiving.
If you feel this is falling short in some scenarios and there are situations that changing this behaviour would enable new flows, please open a new issue with ideas about the migration path if there's a behaviour change proposed. Let's not mix two concerns in the same issue. Thanks in advance and good luck on your learning journey!
from reactor-core.
@chemicL I really appreciate your patient response, and it has been very helpful to me. I apologize for bringing it up in this issue. Best wishes!
from reactor-core.
Hi, I'm lucky to come across this issue. I'm currently learning Reactor and have also encountered this problem, which has left me confused. Additionally, I have a question related to the EmitResult code, and perhaps it can be addressed together in this issue.
Is it necessary to use a new response code to represent "warm up" and distinguish whether subscribers exist when using tryEmitNext?
Issue #2338 mentions: do we really need a separate fail code (vs reusing FAIL_OVERFLOW
)
The solution related to #2338 is to introduce the FAIL_ZERO_SUBSCRIBER status code. However, triggering the FAIL_ZERO_SUBSCRIBER response code requires meeting the following two conditions:
- The queue cache is full or there is no queue cache
- There are no subscribers
Perhaps adjusting or extending the meaning of the EmitResult code can help us better understand the state of Sinks.Many when using tryEmitNext. I'm not sure if the following points are reasonable:
-
The FAIL_ZERO_SUBSCRIBER response code only indicates that there are no subscribers and does not consider whether the queue cache is full.
-
When the queue overflows, only the FAIL_OVERFLOW status code is triggered (or a new extended status code, such as "FAIL_OVERFLOW_WITHOUT_SUBSCRIBERS," which means that there are no subscribers when FAIL_OVERFLOW occurs).
-
A new status code can be used to represent "warm up," as mentioned in the Javadoc of Sinks.MulticastSpec#onBackpressureBuffer:
"Without Subscriber: warm up. Remembers up to Queues.SMALL_BUFFER_SIZE elements pushed via Sinks.Many.tryEmitNext(Object) before the first Subscriber is registered."
When the "warm up" response code appears, it can indicate the possibility of FAIL_OVERFLOW risk.
from reactor-core.
@kaqqao thanks for the report! I reviewed the other Sinks
implementations and do believe your expectations are backed by the fact that SinkManyUnicast
and SinkManyUnicastNoBackpressure
also reject emissions in case of cancellation. I'll have a look at the PR you opened as well and provide feedback.
from reactor-core.
Nice to see this discussion finally started
from reactor-core.
This issue is still a valid one and is open for contributions. The attempt in #3725 had some concerns and was closed due to inactivity so there's a chance to build upon that feedback.
from reactor-core.
Hi @chemicL I am also learning reactor. I am interested to work on this issue. I saw your concern in the other PR. I am not sure whether it is possible/feasible. But is it a good idea to somehow freeze the queue once the sink is canceled so that it does not accept any more items?
from reactor-core.
Hey, @bajibalu! Happy to hear you're learning reactor and are interested in contributing 🎉
The queues we have do not have a concept of freezing/closing. However, there are other mechanisms to prevent inserting an item. On the other hand, it's also not an issue if the item is injected as long as somthing later notices that a cancellation has happened and takes care of clearing the queue.
After looking at the code one more time I think it is the case already that an item inserted after cancellation should be removed from the queue due to the drain operation that follows. Unfortunately, I couldn't get it to work because somehow the WIP (work-in-progress) marker is left in an unclean state. Debug the below code to see what's happening:
SinkManyEmitterProcessor<Integer> processor = new SinkManyEmitterProcessor<>(true, 1);
processor.tryEmitNext(1);
processor.asFlux().doOnNext(i -> System.out.println("Received " + i)).subscribe().dispose();
processor.tryEmitNext(2);
System.out.println(processor.queue.size());
If nothing was emitted before, then no queue is created:
SinkManyEmitterProcessor<Integer> processor = new SinkManyEmitterProcessor<>(true, 1);
processor.asFlux().subscribe().dispose();
processor.tryEmitNext(1);
System.out.println(processor.queue == null);
Solving this issue would be the first step to a comprehensive solution.
One potential issue is that in case of a emit/cancel race we wouldn't communicate the cancellation to the caller if cancellation happens after checks for cancellation but before the item is inserted into the queue. But perhaps that's not a big deal as long as something discards the item from the queue (so fixing the above nuance is necessary).
What I would love to see in a PR is:
- unit tests validating the above scenarios provided as code samples
- a JCStress test that validates racing cancellation with tryEmitNext
- an allowed test outcome is an OK result for emission
- an allowed test outcome is a CANCELLED result
- in both cases the arbiter validates the queue to check it is indeed empty
@bajibalu in case you'd like to work on this, just leave a note here so if anyone else interested in this wants to take over we can re-assign in case you don't find the time. Thanks!
from reactor-core.
Hi @chemicL I pushed a PR to fix this issue. Please take a look at this PR and let me know if this works.
from reactor-core.
Related Issues (20)
- i m having an error "r.core.Exceptions","stack_trace":"java.lang.StackOverflowError: null HOT 2
- Flux share method feature HOT 2
- CompletableFuture.whenComplete is not invoked for some requests HOT 11
- BufferTimeoutSubscriber is not thread safe HOT 2
- Empty hot source hangs with 2nd late subscriber HOT 4
- Mono.share() allow a stream to be canceled HOT 5
- Flaky test - FluxBlackboxProcessorVerification HOT 6
- Flaky test - DefaultTestSubscriberTest HOT 5
- context lost when using Mono.create with threads HOT 2
- [test] Verify Initialization of Default Labels
- Too difficult to control how much Reactor buffers internally HOT 2
- Enabled Automatic Context Propagation and context propagation with lift causes ClassCastException HOT 10
- [Flaky test] FluxCreateTest.fluxCreateOnRequestMultipleThreadsSlowProducer
- BoundedElasticThreadPerTaskSchedulerTest > ensuresTasksScheduling() FAILED HOT 4
- SinksTest > OptimisticEmitFailureHandlerTest > shouldRetryOptimistically() FAILED
- Add bufferWeightedWithin operator.
- thenMany does not ignore all emissions of a concat due to incorrect optimization HOT 2
- FluxBufferWhenTest > timedOutBuffersDontLeak() FAILED
- Support Considering Individual Element Weight in Determining Buffer Boundary instead of Element Counts
- Javadoc for some versions is missing from the website HOT 3
Recommend Projects
-
React
A declarative, efficient, and flexible JavaScript library for building user interfaces.
-
Vue.js
🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
-
Typescript
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
-
TensorFlow
An Open Source Machine Learning Framework for Everyone
-
Django
The Web framework for perfectionists with deadlines.
-
Laravel
A PHP framework for web artisans
-
D3
Bring data to life with SVG, Canvas and HTML. 📊📈🎉
-
Recommend Topics
-
javascript
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
-
web
Some thing interesting about web. New door for the world.
-
server
A server is a program made to process requests and deliver data to clients.
-
Machine learning
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
-
Visualization
Some thing interesting about visualization, use data art
-
Game
Some thing interesting about game, make everyone happy.
Recommend Org
-
Facebook
We are working to build community through open source technology. NB: members must have two-factor auth.
-
Microsoft
Open source projects and samples from Microsoft.
-
Google
Google ❤️ Open Source for everyone.
-
Alibaba
Alibaba Open Source for everyone
-
D3
Data-Driven Documents codes.
-
Tencent
China tencent open source team.
from reactor-core.