artembilan commented on April 27, 2024

I have reproduced the issue with this Spring Boot configuration:

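import static org.assertj.core.api.Assertions.assertThat;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.listener.AsyncConsumerStartedEvent;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.testcontainers.service.connection.ServiceConnection;
import org.springframework.context.annotation.Bean;
import org.springframework.context.event.EventListener;
import org.testcontainers.containers.RabbitMQContainer;
import org.testcontainers.utility.DockerImageName;

@SpringBootTest(classes = Gh2550ApplicationTests.TestGh2550Application.class,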
		properties = {
				"spring.rabbitmq.cache.channel.checkout-timeout=2000",
				"spring.rabbitmq.cache.channel.size=1",
				"logging.level.org.springframework.amqp=debug"})
class Gh2550ApplicationTests {

	@Test
	void verifyThatChannelPermitsAreReleaseOnReconnect(@Autowired TestGh2550Application application)
			throws InterruptedException {

		assertThat(application.consumerStarted1.await(20, TimeUnit.SECONDS)).isTrue();

		application.rabbitContainer().stop();
		application.rabbitContainer().start();

		assertThat(application.consumerStarted2.await(30, TimeUnit.SECONDS)).isTrue();
	}

	@SpringBootApplication
	public static class TestGh2550Application {

		@Bean
		@ServiceConnection
		RabbitMQContainer rabbitContainer() {
			return new RabbitMQContainer(DockerImageName.parse("rabbitmq:latest"));
		}

		@RabbitListener(queuesToDeclare = @Queue("testQueue"))
		void consume(Object payload) {

		}

		CountDownLatch consumerStarted1 = new CountDownLatch(1);

		CountDownLatch consumerStarted2 = new CountDownLatch(2);

		@EventListener
		void consumerStarted(AsyncConsumerStartedEvent consumerStartedEvent) {
			this.consumerStarted1.countDown();
			this.consumerStarted2.countDown();
		}

	}

}

The logs are like this after restarting RabbitMQ:

2023-12-13T16:05:55.634-05:00 DEBUG 22516 --- [pool-2-thread-4] o.s.a.r.listener.BlockingQueueConsumer   : Received shutdown signal for consumer tag=amq.ctag-UuGCf7zmHyBQUvJ20Su6KQ

com.rabbitmq.client.ShutdownSignalException: connection error
	at com.rabbitmq.client.impl.AMQConnection.startShutdown(AMQConnection.java:1007) ~[amqp-client-5.19.0.jar:5.19.0]
	at com.rabbitmq.client.impl.AMQConnection.shutdown(AMQConnection.java:997) ~[amqp-client-5.19.0.jar:5.19.0]
	at com.rabbitmq.client.impl.AMQConnection.handleFailure(AMQConnection.java:797) ~[amqp-client-5.19.0.jar:5.19.0]
	at com.rabbitmq.client.impl.AMQConnection.access$500(AMQConnection.java:48) ~[amqp-client-5.19.0.jar:5.19.0]
	at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:696) ~[amqp-client-5.19.0.jar:5.19.0]
	at java.base/java.lang.Thread.run(Thread.java:833) ~[na:na]
Caused by: java.io.EOFException: null
	at java.base/java.io.DataInputStream.readUnsignedByte(DataInputStream.java:296) ~[na:na]
	at com.rabbitmq.client.impl.Frame.readFrom(Frame.java:91) ~[amqp-client-5.19.0.jar:5.19.0]
	at com.rabbitmq.client.impl.SocketFrameHandler.readFrame(SocketFrameHandler.java:199) ~[amqp-client-5.19.0.jar:5.19.0]
	at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:687) ~[amqp-client-5.19.0.jar:5.19.0]
	... 1 common frames omitted

2023-12-13T16:05:55.943-05:00 DEBUG 22516 --- [ntContainer#0-1] o.s.a.r.l.SimpleMessageListenerContainer : Consumer raised exception, processing can restart if the connection factory supports it

com.rabbitmq.client.ShutdownSignalException: connection error
	at com.rabbitmq.client.impl.AMQConnection.startShutdown(AMQConnection.java:1007) ~[amqp-client-5.19.0.jar:5.19.0]
	at com.rabbitmq.client.impl.AMQConnection.shutdown(AMQConnection.java:997) ~[amqp-client-5.19.0.jar:5.19.0]
	at com.rabbitmq.client.impl.AMQConnection.handleFailure(AMQConnection.java:797) ~[amqp-client-5.19.0.jar:5.19.0]
	at com.rabbitmq.client.impl.AMQConnection.access$500(AMQConnection.java:48) ~[amqp-client-5.19.0.jar:5.19.0]
	at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:696) ~[amqp-client-5.19.0.jar:5.19.0]
	at java.base/java.lang.Thread.run(Thread.java:833) ~[na:na]
Caused by: java.io.EOFException: null
	at java.base/java.io.DataInputStream.readUnsignedByte(DataInputStream.java:296) ~[na:na]
	at com.rabbitmq.client.impl.Frame.readFrom(Frame.java:91) ~[amqp-client-5.19.0.jar:5.19.0]
	at com.rabbitmq.client.impl.SocketFrameHandler.readFrame(SocketFrameHandler.java:199) ~[amqp-client-5.19.0.jar:5.19.0]
	at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:687) ~[amqp-client-5.19.0.jar:5.19.0]
	... 1 common frames omitted

2023-12-13T16:05:55.947-05:00  INFO 22516 --- [ntContainer#0-1] o.s.a.r.l.SimpleMessageListenerContainer : Restarting Consumer@5a899811: tags=[[amq.ctag-UuGCf7zmHyBQUvJ20Su6KQ]], channel=Cached Rabbit Channel: AMQChannel(amqp://[email protected]:61828/,1), conn: Proxy@60a01cb Shared Rabbit Connection: SimpleConnection@6aad919c [delegate=amqp://[email protected]:61828/, localPort=61842], acknowledgeMode=AUTO local queue size=0
2023-12-13T16:05:55.947-05:00 DEBUG 22516 --- [ntContainer#0-1] o.s.a.r.listener.BlockingQueueConsumer   : Closing Rabbit Channel: Cached Rabbit Channel: AMQChannel(amqp://[email protected]:61828/,1), conn: Proxy@60a01cb Shared Rabbit Connection: SimpleConnection@6aad919c [delegate=amqp://[email protected]:61828/, localPort=61842]
2023-12-13T16:05:56.054-05:00  INFO 22516 --- [    Test worker] tc.rabbitmq:latest                       : Creating container for image: rabbitmq:latest
2023-12-13T16:05:56.119-05:00  INFO 22516 --- [    Test worker] tc.rabbitmq:latest                       : Container rabbitmq:latest is starting: 391985cb76e9404512f10d5e0d2709b423b99b7a7daf3ce64f9979f9369249a0
2023-12-13T16:05:57.964-05:00 ERROR 22516 --- [ntContainer#0-2] o.s.a.r.l.SimpleMessageListenerContainer : Failed to check/redeclare auto-delete queue(s).

org.springframework.amqp.AmqpTimeoutException: No available channels
	at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.obtainPermits(CachingConnectionFactory.java:548) ~[spring-rabbit-3.1.0.jar:3.1.0]
	at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.getChannel(CachingConnectionFactory.java:514) ~[spring-rabbit-3.1.0.jar:3.1.0]
	at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$ChannelCachingConnectionProxy.createChannel(CachingConnectionFactory.java:1423) ~[spring-rabbit-3.1.0.jar:3.1.0]
	at org.springframework.amqp.rabbit.core.RabbitTemplate.doExecute(RabbitTemplate.java:2231) ~[spring-rabbit-3.1.0.jar:3.1.0]
	at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:2198) ~[spring-rabbit-3.1.0.jar:3.1.0]
	at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:2178) ~[spring-rabbit-3.1.0.jar:3.1.0]
	at org.springframework.amqp.rabbit.core.RabbitAdmin.getQueueInfo(RabbitAdmin.java:465) ~[spring-rabbit-3.1.0.jar:3.1.0]
	at org.springframework.amqp.rabbit.core.RabbitAdmin.getQueueProperties(RabbitAdmin.java:449) ~[spring-rabbit-3.1.0.jar:3.1.0]
	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.attemptDeclarations(AbstractMessageListenerContainer.java:1936) ~[spring-rabbit-3.1.0.jar:3.1.0]
	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.redeclareElementsIfNecessary(AbstractMessageListenerContainer.java:1904) ~[spring-rabbit-3.1.0.jar:3.1.0]
	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.initialize(SimpleMessageListenerContainer.java:1383) ~[spring-rabbit-3.1.0.jar:3.1.0]
	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1225) ~[spring-rabbit-3.1.0.jar:3.1.0]
	at java.base/java.lang.Thread.run(Thread.java:833) ~[na:na]

2023-12-13T16:05:57.965-05:00 DEBUG 22516 --- [ntContainer#0-2] o.s.a.r.listener.BlockingQueueConsumer   : Starting consumer Consumer@25a8d81f: tags=[[]], channel=null, acknowledgeMode=AUTO local queue size=0
2023-12-13T16:05:59.982-05:00 DEBUG 22516 --- [ntContainer#0-2] o.s.a.r.l.SimpleMessageListenerContainer : Recovering consumer in 5000 ms.
2023-12-13T16:06:02.459-05:00  INFO 22516 --- [    Test worker] tc.rabbitmq:latest                       : Container rabbitmq:latest started in PT6.4052848S
2023-12-13T16:06:05.151-05:00 DEBUG 22516 --- [ntContainer#0-2] o.s.a.r.l.SimpleMessageListenerContainer : Consumer raised exception, processing can restart if the connection factory supports it

org.springframework.amqp.AmqpTimeoutException: No available channels
	at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.obtainPermits(CachingConnectionFactory.java:548) ~[spring-rabbit-3.1.0.jar:3.1.0]
	at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.getChannel(CachingConnectionFactory.java:514) ~[spring-rabbit-3.1.0.jar:3.1.0]
	at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$ChannelCachingConnectionProxy.createChannel(CachingConnectionFactory.java:1423) ~[spring-rabbit-3.1.0.jar:3.1.0]
	at org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils$RabbitResourceFactory.createChannel(ConnectionFactoryUtils.java:351) ~[spring-rabbit-3.1.0.jar:3.1.0]
	at org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils.doGetTransactionalResourceHolder(ConnectionFactoryUtils.java:156) ~[spring-rabbit-3.1.0.jar:3.1.0]
	at org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils.getTransactionalResourceHolder(ConnectionFactoryUtils.java:102) ~[spring-rabbit-3.1.0.jar:3.1.0]
	at org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils.getTransactionalResourceHolder(ConnectionFactoryUtils.java:85) ~[spring-rabbit-3.1.0.jar:3.1.0]
	at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.start(BlockingQueueConsumer.java:613) ~[spring-rabbit-3.1.0.jar:3.1.0]
	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.initialize(SimpleMessageListenerContainer.java:1384) ~[spring-rabbit-3.1.0.jar:3.1.0]
	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1225) ~[spring-rabbit-3.1.0.jar:3.1.0]
	at java.base/java.lang.Thread.run(Thread.java:833) ~[na:na]

2023-12-13T16:06:05.151-05:00  INFO 22516 --- [ntContainer#0-2] o.s.a.r.l.SimpleMessageListenerContainer : Restarting Consumer@25a8d81f: tags=[[]], channel=null, acknowledgeMode=AUTO local queue size=0
2023-12-13T16:06:05.151-05:00 DEBUG 22516 --- [ntContainer#0-2] o.s.a.r.listener.BlockingQueueConsumer   : Closing Rabbit Channel: null
2023-12-13T16:06:07.161-05:00 ERROR 22516 --- [ntContainer#0-3] o.s.a.r.l.SimpleMessageListenerContainer : Failed to check/redeclare auto-delete queue(s).
2023-12-13T16:06:07.161-05:00 DEBUG 22516 --- [ntContainer#0-3] o.s.a.r.listener.BlockingQueueConsumer   : Starting consumer Consumer@207d8c3d: tags=[[]], channel=null, acknowledgeMode=AUTO local queue size=0
2023-12-13T16:06:09.171-05:00 DEBUG 22516 --- [ntContainer#0-3] o.s.a.r.l.SimpleMessageListenerContainer : Recovering consumer in 5000 ms.

So, yeah... Indeed, the channel permit is not released when we try to recover the container after the connection is restored.
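
To make the failure mode concrete, here is a minimal, self-contained sketch of a permit leak of this kind. It is not spring-amqp code: `ToyChannel`, `ToyChannelCache` and `NoChannelException` are hypothetical names, and a plain `java.util.concurrent.Semaphore` stands in for whatever enforces the checkout timeout. The sketch assumes a cache size of 1, as in the test configuration, and a shortened timeout of 50 ms.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

class PermitLeakSketch {

    /** Hypothetical unchecked exception, the toy equivalent of "No available channels". */
    static final class NoChannelException extends RuntimeException {
        NoChannelException(String message) {
            super(message);
        }
    }

    /** Hypothetical stand-in for a channel; not a spring-amqp type. */
    static final class ToyChannel {
        boolean open = true;
    }

    /** Hypothetical cache that hands out at most `size` channels at a time. */
    static final class ToyChannelCache {
        private final Semaphore permits;
        private final long checkoutTimeoutMs;

        ToyChannelCache(int size, long checkoutTimeoutMs) {
            this.permits = new Semaphore(size);
            this.checkoutTimeoutMs = checkoutTimeoutMs;
        }

        /** Waits up to the checkout timeout for a permit, then gives up. */
        ToyChannel checkout() {
            try {
                if (!permits.tryAcquire(checkoutTimeoutMs, TimeUnit.MILLISECONDS)) {
                    throw new NoChannelException("No available channels");
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new NoChannelException("Interrupted while waiting for a channel");
            }
            return new ToyChannel();
        }

        /** Guarded cleanup: the permit is only returned if the channel is still open. */
        void closeIfOpen(ToyChannel channel) {
            if (channel != null && channel.open) {
                channel.open = false;
                permits.release();
            }
        }
    }

    /** Returns true when the second checkout times out because the permit leaked. */
    static boolean secondCheckoutTimesOut() {
        ToyChannelCache cache = new ToyChannelCache(1, 50);
        ToyChannel first = cache.checkout();
        first.open = false;          // simulate the broker restart closing the channel
        cache.closeIfOpen(first);    // skipped: the channel is no longer open
        try {
            cache.checkout();
            return false;
        } catch (NoChannelException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("second checkout timed out: " + secondCheckoutTimesOut());
    }
}
```

With a single permit, one skipped release is enough to make every later checkout time out, which matches the repeated "No available channels" entries in the logs.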

I think the problem is here in the BlockingQueueConsumer:

	public void forceCloseAndClearQueue() {
		if (this.channel != null && this.channel.isOpen()) {
			RabbitUtils.setPhysicalCloseRequired(this.channel, true);
			ConnectionFactoryUtils.releaseResources(this.resourceHolder);
			this.deliveryTags.clear();
			this.consumers.clear();
			this.queue.clear(); // in case we still have a client thread blocked
		}
	}

Unfortunately, the channel in question is no longer open at that point, so the rest of the logic is skipped.

I'll try to fix it by ignoring the open state: the channel will be recreated in the cache anyway, and at the same time the permit will be released.
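
As a rough illustration of that idea, and not the actual spring-amqp change, the sketch below releases the permit regardless of whether the channel is still open. `PermitReleaseSketch`, `ToyChannel`, `permitHeld` and `forceClose` are hypothetical names, and a `java.util.concurrent.Semaphore` with one permit stands in for a channel cache of size 1. The `permitHeld` flag is an assumption added here so that calling the cleanup twice cannot release the permit twice.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

class PermitReleaseSketch {

    /** Hypothetical stand-in for a consumer's channel; not a spring-amqp type. */
    static final class ToyChannel {
        boolean open = true;
        boolean permitHeld = true;
    }

    // One permit, mirroring a channel cache size of 1.
    private final Semaphore permits = new Semaphore(1);

    /** Returns a channel, or null if no permit became free within the timeout. */
    ToyChannel checkout(long timeoutMs) {
        try {
            if (!permits.tryAcquire(timeoutMs, TimeUnit.MILLISECONDS)) {
                return null; // toy equivalent of "No available channels"
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
        return new ToyChannel();
    }

    /** Cleanup that does not look at channel.open before giving the permit back. */
    void forceClose(ToyChannel channel) {
        if (channel != null && channel.permitHeld) {
            channel.open = false;
            channel.permitHeld = false; // guards against a double release
            permits.release();
        }
    }

    /** Returns true when a new channel can be checked out after a simulated restart. */
    static boolean recoversAfterBrokerRestart() {
        PermitReleaseSketch cache = new PermitReleaseSketch();
        ToyChannel first = cache.checkout(50);
        first.open = false;       // simulate the broker restart closing the channel
        cache.forceClose(first);  // the permit is returned even though the channel is closed
        cache.forceClose(first);  // second call is a no-op
        ToyChannel second = cache.checkout(50);
        return second != null && cache.permits.availablePermits() == 0;
    }

    public static void main(String[] args) {
        System.out.println("recovered: " + recoversAfterBrokerRestart());
    }
}
```

The design point is only that the permit's lifetime is tied to the checkout, not to the channel's open state, so a channel closed by the broker still gives its permit back.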

from spring-amqp.
