
Comments (2)

artembilan commented on April 27, 2024

Thank you for the sample!

Looking at your code, I see the trap:

new Thread(() -> {

So you deliberately hand off processing to a different thread.
This lets the AsyncMessageProcessingConsumer move on in its mainLoop() to the next Message message = consumer.nextMessage(this.receiveTimeout); call.
It is therefore no surprise that the second message is rejected before the first one has reached its channel.basicAck(tag, false);.
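
To illustrate the ordering, here is a minimal sketch in plain Java. It uses no Spring AMQP or RabbitMQ classes: HandOffSketch, the event strings and the latch are hypothetical stand-ins for the listener, the container loop and the slow processing, so it only demonstrates the hand-off effect, not the container's actual code.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;

final class HandOffSketch {

    /** Returns the simulated events in the order they happened. */
    static List<String> run() {
        List<String> events = new CopyOnWriteArrayList<>();
        // Stands in for the slow work done on the listener's own thread.
        CountDownLatch slowWork = new CountDownLatch(1);

        // "Listener" for message 1: hands off to a new thread and returns at once.
        Thread worker = new Thread(() -> {
            try {
                slowWork.await();
                events.add("worker: ack tag 1");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        worker.start();
        events.add("listener: returned for tag 1");

        // The calling loop is free to pick up message 2 while tag 1 is still unacked.
        events.add("loop: handling tag 2");

        slowWork.countDown(); // the slow work finishes only now
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return events;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

The ack for tag 1 is always the last event here, because the calling thread has nothing to wait for once the listener method returns.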

There is this logic:

    long tagToRollback = isAsyncReplies()
            ? message.getMessageProperties().getDeliveryTag()
            : -1;

The result is passed to consumer.rollbackOnExceptionIfNecessary(ex, tagToRollback);.
And that method does this:

    boolean ackRequired = !this.acknowledgeMode.isAutoAck()
            && (!this.acknowledgeMode.isManual() || ContainerUtils.isRejectManual(ex));
    ...
    if (ackRequired) {
        if (tag < 0) {
            OptionalLong deliveryTag = this.deliveryTags.stream().mapToLong(l -> l).max();
            if (deliveryTag.isPresent()) {
                this.channel.basicNack(deliveryTag.getAsLong(), true,
                        ContainerUtils.shouldRequeue(this.defaultRequeueRejected, ex, logger));
            }

Pay attention to the true argument passed as multiple to basicNack:

    /**
     * Reject one or several received messages.
     *
     * Supply the <code>deliveryTag</code> from the {@link com.rabbitmq.client.AMQP.Basic.GetOk}
     * or {@link com.rabbitmq.client.AMQP.Basic.Deliver} method containing the message to be rejected.
     * @see com.rabbitmq.client.AMQP.Basic.Nack
     * @param deliveryTag the tag from the received {@link com.rabbitmq.client.AMQP.Basic.GetOk} or {@link com.rabbitmq.client.AMQP.Basic.Deliver}
     * @param multiple true to reject all messages up to and including
     * the supplied delivery tag; false to reject just the supplied
     * delivery tag.
     * @param requeue true if the rejected message(s) should be requeued rather
     * than discarded/dead-lettered
     * @throws java.io.IOException if an error is encountered
     */
    void basicNack(long deliveryTag, boolean multiple, boolean requeue)

Therefore the first message is also nack'ed, since its tag is among the tags being rejected.
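
The effect of the multiple flag can be sketched with a small in-memory model. This is not the RabbitMQ client or broker: MultiNackSketch and its deliver, nack and ack methods are hypothetical names that only mimic the "up to and including the supplied delivery tag" rule from the Javadoc above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

final class MultiNackSketch {

    // Delivery tags that have been delivered but not yet acked or nacked.
    private final TreeSet<Long> unacked = new TreeSet<>();

    void deliver(long tag) {
        unacked.add(tag);
    }

    /** Settles one tag, or every outstanding tag up to and including it. */
    List<Long> nack(long tag, boolean multiple) {
        List<Long> settled = new ArrayList<>();
        if (multiple) {
            settled.addAll(unacked.headSet(tag, true));
        } else if (unacked.contains(tag)) {
            settled.add(tag);
        }
        unacked.removeAll(settled);
        return settled;
    }

    /** Returns false when the tag is no longer outstanding. */
    boolean ack(long tag) {
        return unacked.remove(tag);
    }

    public static void main(String[] args) {
        MultiNackSketch channel = new MultiNackSketch();
        channel.deliver(1L); // first message, still being processed elsewhere
        channel.deliver(2L); // second message, about to be rejected
        System.out.println("nacked: " + channel.nack(2L, true));
        System.out.println("late ack of tag 1 accepted: " + channel.ack(1L));
    }
}
```

With multiple set to false in the same scenario, only tag 2 would be settled and the later ack of tag 1 would still find its tag outstanding.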

This doesn't look like it has anything to do with restarting.
And even if a restart happens, it is just a consequence of the multi-nack described above: the respective tag is no longer available for a manual ack when the thread in your listener resumes.

Consider changing your demoListener() method to return Mono<Void> or CompletableFuture<Void>, so that the container treats it as an async listener.
That way only the second message is going to be nack'ed.
When I changed your listener to this:

    @RabbitListener(queues = "demo-queue", containerFactory = "demoRabbitConnectionFactory")
    public CompletableFuture<Void> demoListener(String payload, @Header(AmqpHeaders.DELIVERY_TAG) final long tag) {
        // Returning the future lets the container handle this listener in async mode.
        return CompletableFuture.runAsync(() -> {
            log.info("Received message with tag: {}", tag);

            try {
                log.info("Started processing message with tag: {}", tag);
                Thread.sleep(2000L); // simulate some off-line processing
                log.info("Finished processing message with tag: {}", tag);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // restore the interrupt flag before failing
                throw new RuntimeException(e);
            }

            log.info("Accepted message {} with tag: {}", payload, tag);
        });
    }

it works as expected.

Your test must be adjusted as well: there is no Channel.close() anymore, so your DemoShutdownListener is not called.

See more info in the docs: https://docs.spring.io/spring-amqp/reference/amqp/receiving-messages/async-returns.html

Closing this as Works as Designed.

from spring-amqp.

alexey-anufriev commented on April 27, 2024

Thank you for the detailed explanation @artembilan!

