Comments (11)
Hey, @rohitrajwani 👋
Please have a look at the 3.5.0 release notes. Perhaps you're running into the behaviour change around Mono
becoming lazy in certain scenarios. Without any reproducer it will be difficult to tell if there's an error in the library or in your usage. Please try reproducing the problem i a minimal fashion and please provide a test case here or a link to a repository. Thanks!
from reactor-core.
Hi @chemicL
Thanks for the quick response.
While I check for the release notes to understand the behaviour change of Mono, please find the below controller class as this represents the flow I am trying to implement.
Please let me know if there is anything that can be improved in the below implementation, also providing a test case might not be feasible as this behaviour is only seen at high traffic.
@RestController
public class RouterController {
CompletableFuture<ResponseEntity<?>> finalRspCompFuture;
@RequestMapping("/**")
public CompletableFuture<ResponseEntity<?>>
handleDownStreamReq(RequestEntity<byte[]> reqEntity, @RequestHeader HttpHeaders reqHeaders) {
URI uri = null;
try {
uri = new URI(reqEntity.getUrl().toString());
}
catch (URISyntaxException ex) {
System.out.println("Exception in forming URI, " + ex.toString());
}
byte[] body = reqEntity.getBody();
finalRspCompFuture = new CompletableFuture<>();
buildAndSendRequest(uri, reqEntity, reqHeaders, body.length, body);
finalRspCompFuture = finalRspCompFuture.exceptionally(ex->{
ResponseEntity<byte[]> rsp = new ResponseEntity<>(HttpStatus.INTERNAL_SERVER_ERROR);
return rsp;
}).whenComplete((rsp, exe) -> {
System.out.println("going in the when complete");
if (exe != null) {
System.out.println("whenComplete failed " + exe.toString());
}
});
return finalRspCompFuture;
}
private Disposable buildAndSendRequest(URI uri, RequestEntity<byte[]> reqEntity, HttpHeaders reqHeaders, int bodyLen, Object body){
WebClient client = null;
WebClient.RequestHeadersSpec<?> requestHeadersSpec = null;
WebClient.RequestBodySpec requestBodySpec = client
.method(HttpMethod.resolve(reqEntity.getMethod().toString())).uri(uri)
.headers(hdrs -> hdrs.addAll(reqHeaders));
if(bodyLen > 0) {
System.out.println("Body is not Null, RequestBody: " + BodyInserters.fromValue(body));
requestHeadersSpec = requestBodySpec.body(BodyInserters.fromValue(body));
}
else {
System.out.println("Body is Null");
requestHeadersSpec = requestBodySpec;
}
return requestHeadersSpec.retrieve().toEntity(byte[].class)
.doFinally(signalType -> {
System.out.println("Finally");
})
.timeout(Duration.ofMillis(6000))
.onErrorResume(Throwable.class, throwable -> {
Mono<ResponseEntity<byte[]>> rsp = Mono.just(handleError(throwable));
return rsp;
}).doOnCancel(() -> {
System.out.println("doOnCancel");
}).publishOn(Schedulers.fromExecutor(getExecutor()))
.doFirst(() -> System.out.println("doFirst"))
.subscribe(rsp -> destCallback(rsp), throwable -> {});
}
private void destCallback(ResponseEntity<byte[]> rsp) {
System.out.println("Handling callback here");
// After response is received from the server and some additional processing here
finalRspCompFuture.complete(new ResponseEntity<>(HttpStatus.OK));
}
private ResponseEntity<byte[]> handleError(Throwable throwable) {
System.out.println("Error Occurred");
return new ResponseEntity<>(HttpStatus.INTERNAL_SERVER_ERROR);
}
private ThreadPoolTaskExecutor getExecutor() {
ThreadPoolTaskExecutor upstreamExecutor = new ThreadPoolTaskExecutor();
upstreamExecutor.setCorePoolSize(16);
upstreamExecutor.setMaxPoolSize(16);
upstreamExecutor.setQueueCapacity(5000);
upstreamExecutor.setThreadNamePrefix("relay-worker-");
upstreamExecutor.initialize();
return upstreamExecutor;
}
}
from reactor-core.
Thanks for the example. There are a lot of moving parts: a specific thread pool, CompletableFuture
, WebClient
, etc. So it is really hard to tell what's going on. But first perhaps please try to make your code idiomatic. Instead of calling CompletableFuture
you could just return the Mono
that is a result of a few manipulations (flatMap
ping, map
ping, using handle
operator). Also, note that your LambdaSubscriber
(the impl used when you provide lambdas to subscribe(...)
) is not doing anything with the Throwable
, not even logging it. Perhaps the issue lies there.
We can try to work with a minimal reproducer that is just limited to reactor-core primitives - otherwise it's really difficult to pinpoint where the problem is :(
from reactor-core.
I get your point, will try to make it more focused around reactor-core.
I had one more observation, I was narrowing down the versions to check for the working versions and this behaviour/issue is observed with version 3.4.23 and onwards.
Until 3.4.22, everything is working well.
Is there any behaviour change that got in between these two version that should be taken care specifically?
v3.4.22...v3.4.23
Thanks!
from reactor-core.
Potentially #3146 could be impacting as long as Spring Framework is wrapping the returned CompletableFuture with a Mono. I'm not certain if that's the case. Again, adding proper logging and enabling debug/trace logging would probably reveal a lot about the situations in which the problem occurs.
from reactor-core.
Actually, looking at the code in Spring Framework it does seem this might be the case.
from reactor-core.
Thanks for the further clarification.
What should be the approach then with this moving forward? Do you have any suggestions.
from reactor-core.
@rohitrajwani the same as I said before :)
Again, adding proper logging and enabling debug/trace logging would probably reveal a lot about the situations in which the problem occurs.
and
Also, note that your LambdaSubscriber (the impl used when you provide lambdas to subscribe(...)) is not doing anything with the Throwable, not even logging it. Perhaps the issue lies there.
This behaviour change is desired. It was requested by a member of the Spring Framework team. As I can imagine, previously it would have hidden some broken state. It is really vital to introduce proper logging and handle the errors and cancellations in your reactive pipeline. Once you take care of that, the problem should reveal itself.
I'll close the issue, as it doesn't seem to be a bug in reactor-core. Please feel free to comment if you find out something. Or if there's a genuine bug, we can either reopen this one or you can create a new one with a reproducer.
Best of luck with your investigation!
from reactor-core.
@chemicL
Upon further investigation, I found that the netty server is triggering the MonoCompletionStage.cancel, because the channel is marked as inactive/closed.
Just wanted to check if there can be a way to handle the future cancel gracefully,
as currently in #3146 the cancel is called without considering if the request or the future was completed.
Won't it be better if we add some additional check for this and make sure to close/complete the future gracefully instead.
If not, can you please suggest if there is a way to handle this at application level.
from reactor-core.
@rohitrajwani please reach out on StackOverflow if my previous answers are not sufficient. I recommended an approach that doesn't involve CompletableFuture
APIs. Using the Reactor API gives you all the necessary hooks for handling errors and cancellation.
from reactor-core.
As you also reported reactor/reactor-netty#3103 it feels that one thing needs to be said - despite all requests finishing with 2xx response codes, cancellation can happen on a connection/stream (HTTP2) level and that's where there is no request any more. So it means that the client has disconnected for some reason (perhaps timeout, perhaps the process was killed). You'd still like to be informed about that and perform cleanup. That's why there's no running away from cancellation. Also, we don't know whether something was successful or not - it can't be delivered. That's why CompletableFuture
is a disconnected concept in your use case that does not fit properly in your reactive pipeline as it doesn't connect the consumer with the producer and there is no clear communication. The cancellation is in fact propagated now, which is an improvement. It should not be worked-around.
from reactor-core.
Related Issues (20)
- Empty hot source hangs with 2nd late subscriber HOT 4
- Mono.share() allow a stream to be canceled HOT 5
- Flaky test - FluxBlackboxProcessorVerification HOT 6
- Flaky test - DefaultTestSubscriberTest HOT 5
- context lost when using Mono.create with threads HOT 2
- [test] Verify Initialization of Default Labels
- Too difficult to control how much Reactor buffers internally HOT 2
- Enabled Automatic Context Propagation and context propagation with lift causes ClassCastException HOT 10
- [Flaky test] FluxCreateTest.fluxCreateOnRequestMultipleThreadsSlowProducer
- BoundedElasticThreadPerTaskSchedulerTest > ensuresTasksScheduling() FAILED HOT 4
- SinksTest > OptimisticEmitFailureHandlerTest > shouldRetryOptimistically() FAILED
- Add bufferWeightedWithin operator.
- thenMany does not ignore all emissions of a concat due to incorrect optimization HOT 2
- FluxBufferWhenTest > timedOutBuffersDontLeak() FAILED
- Support Considering Individual Element Weight in Determining Buffer Boundary instead of Element Counts
- Javadoc for some versions is missing from the website HOT 3
- Flux.mergeSequential does not subscribe to last Producer in specific circumstances HOT 1
- Failing while building reactor-core version 3.4.18 using ./gradlew build (io.projectreactor:reactor-core:3.4.18) HOT 4
- Fatal exceptions not caught in onErrorDropped Hook HOT 2
- autoConnect(0) seems to be broken - late subscribers receive data
Recommend Projects
-
React
A declarative, efficient, and flexible JavaScript library for building user interfaces.
-
Vue.js
🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
-
Typescript
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
-
TensorFlow
An Open Source Machine Learning Framework for Everyone
-
Django
The Web framework for perfectionists with deadlines.
-
Laravel
A PHP framework for web artisans
-
D3
Bring data to life with SVG, Canvas and HTML. 📊📈🎉
-
Recommend Topics
-
javascript
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
-
web
Some thing interesting about web. New door for the world.
-
server
A server is a program made to process requests and deliver data to clients.
-
Machine learning
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
-
Visualization
Some thing interesting about visualization, use data art
-
Game
Some thing interesting about game, make everyone happy.
Recommend Org
-
Facebook
We are working to build community through open source technology. NB: members must have two-factor auth.
-
Microsoft
Open source projects and samples from Microsoft.
-
Google
Google ❤️ Open Source for everyone.
-
Alibaba
Alibaba Open Source for everyone
-
D3
Data-Driven Documents codes.
-
Tencent
China tencent open source team.
from reactor-core.