Comments (7)

akarnokd commented on May 13, 2024

How about having JMS.queue() return something that lets you communicate back an acknowledgement or rejection?

JMS.queue("in").concatMap(manualMsg -> {
    try {
        Flux<R> result = Flux.just(processMessage(manualMsg));
        manualMsg.acknowledge();
        return result;
    } catch (Throwable ex) {
        manualMsg.reject();
        Exceptions.throwIfFatal(ex);
        return Flux.error(ex);
    }
});

from reactor-core.

dfeist commented on May 13, 2024

@akarnokd That's option 3), but I don't think it's quite that simple:

  • You wouldn't want to return Flux.error(ex), as that is terminal. I'm not sure what you would return instead, though.
  • It's OK to nack/reject here, but IMO this is the wrong place to ack, given that you may have further processing steps that can fail, e.g. `JMS.queue("in").map(msg -> processMsg(msg)).map(msg -> convertMsg(msg)).consume(System.out::println)`. You only really want to ack at the end, so you'd have to put a subscriber at the end to do it yourself.

This approach is doable, but ideally there would be some spec/framework support for this, if it makes sense and is common.
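
To make the two points above concrete, here is a minimal sketch in plain Java (no Reactor), under stated assumptions: `ManualMessage`, `AckAtEnd` and `drain` are hypothetical names invented for illustration and are not part of reactor-core or JMS. It acks only after every processing step has succeeded, and on failure it rejects that one message and keeps consuming instead of terminating.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical stand-in for a broker message with manual acknowledgement.
final class ManualMessage {
    enum State { PENDING, ACKED, REJECTED }

    private final String body;
    private State state = State.PENDING;

    ManualMessage(String body) { this.body = body; }

    String body() { return body; }
    State state() { return state; }
    void acknowledge() { state = State.ACKED; }
    void reject() { state = State.REJECTED; }
}

final class AckAtEnd {
    // Applies all steps to each message. A message is acked only after the
    // last step succeeds; a failure rejects that message and the loop goes on.
    static <R> List<R> drain(List<ManualMessage> queue, Function<String, R> steps) {
        List<R> results = new ArrayList<>();
        for (ManualMessage msg : queue) {
            try {
                results.add(steps.apply(msg.body()));
                msg.acknowledge();
            } catch (RuntimeException ex) {
                // Non-terminal: only this message is rejected.
                msg.reject();
            }
        }
        return results;
    }

    public static void main(String[] args) {
        Function<String, Integer> process = Integer::parseInt; // fails on non-numeric bodies
        Function<Integer, String> convert = n -> "value=" + n;
        List<ManualMessage> queue = List.of(
            new ManualMessage("1"), new ManualMessage("oops"), new ManualMessage("3"));

        System.out.println(drain(queue, process.andThen(convert)));
        queue.forEach(m -> System.out.println(m.body() + " -> " + m.state()));
    }
}
```

This is synchronous on purpose; it only shows where the ack and reject calls sit relative to the processing steps, not how a Reactive Streams operator would do it.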

akarnokd commented on May 13, 2024

The problem is that you have a per-value lifecycle, and Reactive-Streams can't deal with it. The same issue appears with pooled ByteBuffers in I/O libraries, where someone has to release the buffer once all stages have processed it and it goes "out of scope".

The closest thing is what I call umap in Reactive-IO, where you provide a transform for some upstream sequence and the lifecycle of every participating object ends when the transformation terminates. In this context, you'd have:

Flux<Void> JMS.queue(String topic, Function<Mono<Message>, Publisher<?>> umap);

(Note, however, that fatal exceptions break this too.)
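
A rough sketch of that idea using only the JDK, with `CompletionStage` standing in for `Mono`/`Publisher`. All names here (`ScopedMessage`, `ScopedQueue`, `consume`) are hypothetical and exist only for this illustration; this is not the Reactive-IO or reactor-core API. The source owns the message lifecycle: it hands each message to the caller's transformation and settles the message when that transformation terminates.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Function;

// Hypothetical message whose lifecycle is owned by the source, not the pipeline.
final class ScopedMessage {
    private final String body;
    private String outcome = "pending";

    ScopedMessage(String body) { this.body = body; }

    String body() { return body; }
    String outcome() { return outcome; }
    void acknowledge() { outcome = "acked"; }
    void reject() { outcome = "rejected"; }
}

final class ScopedQueue {
    // Applies the caller's transformation to every message and completes
    // once each message has been acked or rejected.
    static CompletableFuture<Void> consume(
            List<ScopedMessage> messages,
            Function<ScopedMessage, CompletionStage<?>> umap) {
        CompletableFuture<?>[] settled = messages.stream()
            .map(msg -> settle(msg, umap))
            .toArray(CompletableFuture[]::new);
        return CompletableFuture.allOf(settled);
    }

    private static CompletableFuture<Void> settle(
            ScopedMessage msg,
            Function<ScopedMessage, CompletionStage<?>> umap) {
        CompletionStage<?> work;
        try {
            work = umap.apply(msg);
        } catch (RuntimeException ex) {
            // Only ordinary exceptions are handled; fatal Errors still propagate.
            work = CompletableFuture.failedFuture(ex);
        }
        // The message's lifecycle ends when the transformation terminates.
        return work.handle((ignored, error) -> {
            if (error == null) {
                msg.acknowledge();
            } else {
                msg.reject();
            }
            return (Void) null;
        }).toCompletableFuture();
    }

    public static void main(String[] args) {
        List<ScopedMessage> messages =
            List.of(new ScopedMessage("ok"), new ScopedMessage("bad"));
        consume(messages, msg -> {
            if (msg.body().equals("bad")) {
                return CompletableFuture.failedFuture(new IllegalStateException("processing failed"));
            }
            return CompletableFuture.completedFuture(msg.body().toUpperCase());
        }).join();
        messages.forEach(m -> System.out.println(m.body() + " -> " + m.outcome()));
    }
}
```

Because the caller never sees the ack/reject calls, any number of steps can be chained inside the transformation without deciding where the acknowledgement belongs.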

smaldini commented on May 13, 2024

That's the kind of pattern we are trying to implement in https://github.com/reactor/reactor-io/blob/master/reactor-ipc/src/main/java/reactor/io/ipc/Outbound.java#L44 where Netty (or Aeron) might receive a notification of failure. Implementing such a contract in memory should be fairly easy.

smaldini commented on May 13, 2024

@dfeist Should we consider the latest Mono#create addition enough for now and close this issue?

dfeist commented on May 13, 2024

@smaldini No, this is completely unrelated to Mono#create. The solution to this issue is to take a different approach to this scenario: don't think of sources (at least the ones that need to know how the stream terminates) as Publishers, but as something that creates a stream, is passed a transformation, and applies that transformation to the stream (as explained by @akarnokd).

The only thing potentially outstanding here is the discussion we had with @sskrla about an alternative way to handle errors without i) requiring a Tuple everywhere and having to skip certain processing steps if an error is present, or ii) causing a terminal error, which requires re-subscription. This might simply not be possible, though, given that it conflicts with other things in the spec. Is this already captured in #49?

dfeist commented on May 13, 2024

@smaldini We can close this out. The issue was with the way I was attempting to use a stream for request/response.
