
Comments (11)

chemicL commented on May 23, 2024

Hey, @rohitrajwani 👋
Please have a look at the 3.5.0 release notes. Perhaps you're running into the behaviour change around Mono becoming lazy in certain scenarios. Without a reproducer it will be difficult to tell whether there's an error in the library or in your usage. Please try reproducing the problem in a minimal fashion and provide a test case here or a link to a repository. Thanks!

from reactor-core.

rohitrajwani commented on May 23, 2024

Hi @chemicL
Thanks for the quick response.

While I check the release notes to understand the behaviour change in Mono, please find the controller class below, which represents the flow I am trying to implement.

Please let me know if anything in the implementation below can be improved. Providing a test case might not be feasible, as this behaviour is only seen under high traffic.

@RestController
public class RouterController {

    CompletableFuture<ResponseEntity<?>> finalRspCompFuture;

    @RequestMapping("/**")
    public CompletableFuture<ResponseEntity<?>>
    handleDownStreamReq(RequestEntity<byte[]> reqEntity, @RequestHeader HttpHeaders reqHeaders) {

        URI uri = null;
        try {
            uri = new URI(reqEntity.getUrl().toString());
        }
        catch (URISyntaxException ex) {
            System.out.println("Exception in forming URI, " + ex.toString());
        }
        byte[] body = reqEntity.getBody();

        finalRspCompFuture = new CompletableFuture<>();

        buildAndSendRequest(uri, reqEntity, reqHeaders, body.length, body);

        finalRspCompFuture = finalRspCompFuture.exceptionally(ex->{

            ResponseEntity<byte[]> rsp = new ResponseEntity<>(HttpStatus.INTERNAL_SERVER_ERROR);

            return rsp;
        }).whenComplete((rsp, exe) -> {
            System.out.println("going in the when complete");
            if (exe != null) {
                System.out.println("whenComplete failed " + exe.toString());
            }
        });

        return finalRspCompFuture;
    }

    private Disposable buildAndSendRequest(URI uri, RequestEntity<byte[]> reqEntity, HttpHeaders reqHeaders, int bodyLen, Object body){
        WebClient client = null;
        WebClient.RequestHeadersSpec<?> requestHeadersSpec = null;
        WebClient.RequestBodySpec requestBodySpec  = client
                .method(HttpMethod.resolve(reqEntity.getMethod().toString())).uri(uri)
                .headers(hdrs -> hdrs.addAll(reqHeaders));

        if(bodyLen > 0) {
            System.out.println("Body is not Null, RequestBody: " + BodyInserters.fromValue(body));
            requestHeadersSpec = requestBodySpec.body(BodyInserters.fromValue(body));
        }
        else {
            System.out.println("Body is Null");
            requestHeadersSpec = requestBodySpec;
        }

        return requestHeadersSpec.retrieve().toEntity(byte[].class)
                .doFinally(signalType -> {
                    System.out.println("Finally");
                })
                .timeout(Duration.ofMillis(6000))
                .onErrorResume(Throwable.class, throwable -> {
                    Mono<ResponseEntity<byte[]>> rsp = Mono.just(handleError(throwable));
                    return rsp;
                }).doOnCancel(() -> {
                    System.out.println("doOnCancel");
                }).publishOn(Schedulers.fromExecutor(getExecutor()))
                .doFirst(() -> System.out.println("doFirst"))
                .subscribe(rsp -> destCallback(rsp), throwable -> {});
    }

    private void destCallback(ResponseEntity<byte[]> rsp) {
        System.out.println("Handling callback here");
        // After response is received from the server and some additional processing here
        finalRspCompFuture.complete(new ResponseEntity<>(HttpStatus.OK));
    }

    private ResponseEntity<byte[]> handleError(Throwable throwable) {
        System.out.println("Error Occurred");
        return new ResponseEntity<>(HttpStatus.INTERNAL_SERVER_ERROR);
    }

    private ThreadPoolTaskExecutor getExecutor() {
        ThreadPoolTaskExecutor upstreamExecutor = new ThreadPoolTaskExecutor();
        upstreamExecutor.setCorePoolSize(16);
        upstreamExecutor.setMaxPoolSize(16);
        upstreamExecutor.setQueueCapacity(5000);
        upstreamExecutor.setThreadNamePrefix("relay-worker-");
        upstreamExecutor.initialize();

        return upstreamExecutor;
    }

}

chemicL commented on May 23, 2024

Thanks for the example. There are a lot of moving parts: a specific thread pool, CompletableFuture, WebClient, etc. So it is really hard to tell what's going on. But first perhaps please try to make your code idiomatic. Instead of calling CompletableFuture you could just return the Mono that is a result of a few manipulations (flatMapping, mapping, using handle operator). Also, note that your LambdaSubscriber (the impl used when you provide lambdas to subscribe(...)) is not doing anything with the Throwable, not even logging it. Perhaps the issue lies there.

We can try to work with a minimal reproducer that is just limited to reactor-core primitives - otherwise it's really difficult to pinpoint where the problem is :(
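
To illustrate the suggestion above, here is a minimal sketch limited to reactor-core primitives. It is not the poster's actual code: the class name `IdiomaticRelay`, the `relay` method, the `LAST_ERROR` holder, and the `"200:"`/`"500"` strings are all hypothetical, and the `upstream` parameter merely stands in for the WebClient call. The pipeline is returned to the caller rather than subscribed inside the method, and the error is recorded before being replaced with a fallback.

```java
import java.time.Duration;
import java.util.concurrent.atomic.AtomicReference;
import reactor.core.publisher.Mono;

public class IdiomaticRelay {

    // Hypothetical holder for the last failure; real code would log it instead.
    static final AtomicReference<Throwable> LAST_ERROR = new AtomicReference<>();

    /**
     * Hypothetical relay step. 'upstream' stands in for the WebClient call.
     * The Mono is returned so that the caller (for example the web framework)
     * subscribes, and so that errors and cancellation travel through one pipeline.
     */
    static Mono<String> relay(Mono<String> upstream) {
        return upstream
                .timeout(Duration.ofMillis(6000))
                .map(body -> "200:" + body)
                // Record the failure before swapping it for a fallback value.
                .doOnError(LAST_ERROR::set)
                .onErrorResume(error -> Mono.just("500"))
                // Cancellation from downstream is visible here and can trigger cleanup.
                .doOnCancel(() -> System.out.println("cancelled by downstream"));
    }

    public static void main(String[] args) {
        // block() is used only to drive the demo from a plain main method.
        System.out.println(relay(Mono.just("ok")).block());
        System.out.println(relay(Mono.error(new IllegalStateException("boom"))).block());
        System.out.println("last error: " + LAST_ERROR.get());
    }
}
```

In a controller, the returned Mono would be handed back to the framework instead of calling `block()`.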

rohitrajwani commented on May 23, 2024

I get your point, will try to make it more focused around reactor-core.
I have one more observation: while narrowing down the working versions, I found that this behaviour/issue is observed with version 3.4.23 and onwards.
Until 3.4.22, everything works well.

Is there any behaviour change between these two versions that should be handled specifically?
v3.4.22...v3.4.23

Thanks!

chemicL commented on May 23, 2024

Potentially #3146 could be impacting as long as Spring Framework is wrapping the returned CompletableFuture with a Mono. I'm not certain if that's the case. Again, adding proper logging and enabling debug/trace logging would probably reveal a lot about the situations in which the problem occurs.

chemicL commented on May 23, 2024

Actually, looking at the code in Spring Framework it does seem this might be the case.

rohitrajwani commented on May 23, 2024

Thanks for the further clarification.

What should the approach be moving forward? Do you have any suggestions?

chemicL commented on May 23, 2024

@rohitrajwani the same as I said before :)

"Again, adding proper logging and enabling debug/trace logging would probably reveal a lot about the situations in which the problem occurs."

and

"Also, note that your LambdaSubscriber (the impl used when you provide lambdas to subscribe(...)) is not doing anything with the Throwable, not even logging it. Perhaps the issue lies there."

This behaviour change is desired. It was requested by a member of the Spring Framework team. As I can imagine, previously it would have hidden some broken state. It is really vital to introduce proper logging and handle the errors and cancellations in your reactive pipeline. Once you take care of that, the problem should reveal itself.

I'll close the issue, as it doesn't seem to be a bug in reactor-core. Please feel free to comment if you find out something. Or if there's a genuine bug, we can either reopen this one or you can create a new one with a reproducer.

Best of luck with your investigation!

rohitrajwani commented on May 23, 2024

@chemicL
Upon further investigation, I found that the Netty server is triggering MonoCompletionStage.cancel because the channel is marked as inactive/closed.

Just wanted to check whether there is a way to handle the future's cancellation gracefully,
as currently in #3146 cancel is called without considering whether the request or the future has completed.

Wouldn't it be better to add an additional check for this and make sure the future is closed/completed gracefully instead?
If not, can you please suggest a way to handle this at the application level?
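
For reference, a minimal JDK-only sketch of how such a cancellation looks from the application's side. It does not involve Reactor or Netty: the class name `CancelledFutureDemo` and the `simulate` method are hypothetical, and the direct `cancel(true)` call only stands in for whatever the framework does when the channel closes. A callback registered on the future itself sees a `CancellationException`, and a later `complete(...)` is rejected.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

public class CancelledFutureDemo {

    /** Runs one simulated request and reports what the application observed. */
    static String simulate() {
        CompletableFuture<String> response = new CompletableFuture<>();
        AtomicReference<String> observed = new AtomicReference<>("nothing");

        // Callback on the future itself, distinguishing cancellation from other outcomes.
        response.whenComplete((value, error) -> {
            if (error instanceof CancellationException) {
                observed.set("cancelled");
            } else if (error != null) {
                observed.set("failed");
            } else {
                observed.set("completed:" + value);
            }
        });

        // Stand-in (assumption) for the framework cancelling the returned future.
        response.cancel(true);

        // Stand-in for the late upstream callback; rejected because the future is already done.
        boolean accepted = response.complete("200 OK");

        return observed.get() + ", lateCompleteAccepted=" + accepted;
    }

    public static void main(String[] args) {
        System.out.println(simulate()); // prints "cancelled, lateCompleteAccepted=false"
    }
}
```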

chemicL commented on May 23, 2024

@rohitrajwani please reach out on Stack Overflow if my previous answers are not sufficient. I recommended an approach that doesn't involve the CompletableFuture APIs. Using the Reactor API gives you all the necessary hooks for handling errors and cancellation.

chemicL commented on May 23, 2024

As you also reported reactor/reactor-netty#3103, one thing needs to be said: even when all requests finish with 2xx response codes, cancellation can happen at the connection/stream (HTTP/2) level, at which point there is no request any more. It means that the client has disconnected for some reason (perhaps a timeout, perhaps the process was killed). You'd still like to be informed about that and perform cleanup. That's why there's no running away from cancellation. Also, we don't know whether something was successful or not, because the result can't be delivered. That's why CompletableFuture is a disconnected concept in your use case: it does not fit properly in your reactive pipeline, as it doesn't connect the consumer with the producer and there is no clear communication. The cancellation is in fact propagated now, which is an improvement. It should not be worked around.
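
The "disconnected" point can be sketched with the JDK alone. In this hypothetical example (the class name `DisconnectedFutureDemo` and its variables are illustrative, not taken from the thread's code), the future handed to the consumer is derived from the producer's future, as happens after chaining `exceptionally(...)`. Cancelling the derived future does not reach the producer, which keeps working and completes its own future without ever learning that nobody is listening.

```java
import java.util.concurrent.CompletableFuture;

public class DisconnectedFutureDemo {

    /** Cancels the consumer-facing future and reports what each side sees. */
    static String simulate() {
        // Future the producer will complete once the upstream call returns.
        CompletableFuture<String> producerSide = new CompletableFuture<>();

        // Derived future handed to the consumer (for example, returned from a handler).
        CompletableFuture<String> returned = producerSide.exceptionally(error -> "500");

        // The consumer goes away and cancels what it was given.
        returned.cancel(true);

        // The producer is not informed: its future is neither cancelled nor done.
        boolean producerCancelled = producerSide.isCancelled();

        // The producer still completes successfully, but the result has nowhere to go.
        boolean producerAccepted = producerSide.complete("200");

        return "returnedCancelled=" + returned.isCancelled()
                + ", producerCancelled=" + producerCancelled
                + ", producerAccepted=" + producerAccepted;
    }

    public static void main(String[] args) {
        // prints "returnedCancelled=true, producerCancelled=false, producerAccepted=true"
        System.out.println(simulate());
    }
}
```

A returned Mono avoids this gap because the cancel signal travels upstream through the same chain that carries the data.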
