I have reproduced the issue with this Spring Boot configuration:
@SpringBootTest(classes = Gh2550ApplicationTests.TestGh2550Application.class,
        properties = {
                "spring.rabbitmq.cache.channel.checkout-timeout=2000",
                "spring.rabbitmq.cache.channel.size=1",
                "logging.level.org.springframework.amqp=debug"})
class Gh2550ApplicationTests {

    @Test
    void verifyThatChannelPermitsAreReleaseOnReconnect(@Autowired TestGh2550Application application)
            throws InterruptedException {

        assertThat(application.consumerStarted1.await(20, TimeUnit.SECONDS)).isTrue();
        application.rabbitContainer().stop();
        application.rabbitContainer().start();
        assertThat(application.consumerStarted2.await(30, TimeUnit.SECONDS)).isTrue();
    }

    @SpringBootApplication
    public static class TestGh2550Application {

        @Bean
        @ServiceConnection
        RabbitMQContainer rabbitContainer() {
            return new RabbitMQContainer(DockerImageName.parse("rabbitmq:latest"));
        }

        @RabbitListener(queuesToDeclare = @Queue("testQueue"))
        void consume(Object payload) {
        }

        CountDownLatch consumerStarted1 = new CountDownLatch(1);

        CountDownLatch consumerStarted2 = new CountDownLatch(2);

        @EventListener
        void consumerStarted(AsyncConsumerStartedEvent consumerStartedEvent) {
            this.consumerStarted1.countDown();
            this.consumerStarted2.countDown();
        }

    }

}
The logs are like this after restarting RabbitMQ:
2023-12-13T16:05:55.634-05:00 DEBUG 22516 --- [pool-2-thread-4] o.s.a.r.listener.BlockingQueueConsumer : Received shutdown signal for consumer tag=amq.ctag-UuGCf7zmHyBQUvJ20Su6KQ
com.rabbitmq.client.ShutdownSignalException: connection error
at com.rabbitmq.client.impl.AMQConnection.startShutdown(AMQConnection.java:1007) ~[amqp-client-5.19.0.jar:5.19.0]
at com.rabbitmq.client.impl.AMQConnection.shutdown(AMQConnection.java:997) ~[amqp-client-5.19.0.jar:5.19.0]
at com.rabbitmq.client.impl.AMQConnection.handleFailure(AMQConnection.java:797) ~[amqp-client-5.19.0.jar:5.19.0]
at com.rabbitmq.client.impl.AMQConnection.access$500(AMQConnection.java:48) ~[amqp-client-5.19.0.jar:5.19.0]
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:696) ~[amqp-client-5.19.0.jar:5.19.0]
at java.base/java.lang.Thread.run(Thread.java:833) ~[na:na]
Caused by: java.io.EOFException: null
at java.base/java.io.DataInputStream.readUnsignedByte(DataInputStream.java:296) ~[na:na]
at com.rabbitmq.client.impl.Frame.readFrom(Frame.java:91) ~[amqp-client-5.19.0.jar:5.19.0]
at com.rabbitmq.client.impl.SocketFrameHandler.readFrame(SocketFrameHandler.java:199) ~[amqp-client-5.19.0.jar:5.19.0]
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:687) ~[amqp-client-5.19.0.jar:5.19.0]
... 1 common frames omitted
2023-12-13T16:05:55.943-05:00 DEBUG 22516 --- [ntContainer#0-1] o.s.a.r.l.SimpleMessageListenerContainer : Consumer raised exception, processing can restart if the connection factory supports it
com.rabbitmq.client.ShutdownSignalException: connection error
at com.rabbitmq.client.impl.AMQConnection.startShutdown(AMQConnection.java:1007) ~[amqp-client-5.19.0.jar:5.19.0]
at com.rabbitmq.client.impl.AMQConnection.shutdown(AMQConnection.java:997) ~[amqp-client-5.19.0.jar:5.19.0]
at com.rabbitmq.client.impl.AMQConnection.handleFailure(AMQConnection.java:797) ~[amqp-client-5.19.0.jar:5.19.0]
at com.rabbitmq.client.impl.AMQConnection.access$500(AMQConnection.java:48) ~[amqp-client-5.19.0.jar:5.19.0]
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:696) ~[amqp-client-5.19.0.jar:5.19.0]
at java.base/java.lang.Thread.run(Thread.java:833) ~[na:na]
Caused by: java.io.EOFException: null
at java.base/java.io.DataInputStream.readUnsignedByte(DataInputStream.java:296) ~[na:na]
at com.rabbitmq.client.impl.Frame.readFrom(Frame.java:91) ~[amqp-client-5.19.0.jar:5.19.0]
at com.rabbitmq.client.impl.SocketFrameHandler.readFrame(SocketFrameHandler.java:199) ~[amqp-client-5.19.0.jar:5.19.0]
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:687) ~[amqp-client-5.19.0.jar:5.19.0]
... 1 common frames omitted
2023-12-13T16:05:55.947-05:00 INFO 22516 --- [ntContainer#0-1] o.s.a.r.l.SimpleMessageListenerContainer : Restarting Consumer@5a899811: tags=[[amq.ctag-UuGCf7zmHyBQUvJ20Su6KQ]], channel=Cached Rabbit Channel: AMQChannel(amqp://[email protected]:61828/,1), conn: Proxy@60a01cb Shared Rabbit Connection: SimpleConnection@6aad919c [delegate=amqp://[email protected]:61828/, localPort=61842], acknowledgeMode=AUTO local queue size=0
2023-12-13T16:05:55.947-05:00 DEBUG 22516 --- [ntContainer#0-1] o.s.a.r.listener.BlockingQueueConsumer : Closing Rabbit Channel: Cached Rabbit Channel: AMQChannel(amqp://[email protected]:61828/,1), conn: Proxy@60a01cb Shared Rabbit Connection: SimpleConnection@6aad919c [delegate=amqp://[email protected]:61828/, localPort=61842]
2023-12-13T16:05:56.054-05:00 INFO 22516 --- [ Test worker] tc.rabbitmq:latest : Creating container for image: rabbitmq:latest
2023-12-13T16:05:56.119-05:00 INFO 22516 --- [ Test worker] tc.rabbitmq:latest : Container rabbitmq:latest is starting: 391985cb76e9404512f10d5e0d2709b423b99b7a7daf3ce64f9979f9369249a0
2023-12-13T16:05:57.964-05:00 ERROR 22516 --- [ntContainer#0-2] o.s.a.r.l.SimpleMessageListenerContainer : Failed to check/redeclare auto-delete queue(s).
org.springframework.amqp.AmqpTimeoutException: No available channels
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.obtainPermits(CachingConnectionFactory.java:548) ~[spring-rabbit-3.1.0.jar:3.1.0]
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.getChannel(CachingConnectionFactory.java:514) ~[spring-rabbit-3.1.0.jar:3.1.0]
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$ChannelCachingConnectionProxy.createChannel(CachingConnectionFactory.java:1423) ~[spring-rabbit-3.1.0.jar:3.1.0]
at org.springframework.amqp.rabbit.core.RabbitTemplate.doExecute(RabbitTemplate.java:2231) ~[spring-rabbit-3.1.0.jar:3.1.0]
at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:2198) ~[spring-rabbit-3.1.0.jar:3.1.0]
at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:2178) ~[spring-rabbit-3.1.0.jar:3.1.0]
at org.springframework.amqp.rabbit.core.RabbitAdmin.getQueueInfo(RabbitAdmin.java:465) ~[spring-rabbit-3.1.0.jar:3.1.0]
at org.springframework.amqp.rabbit.core.RabbitAdmin.getQueueProperties(RabbitAdmin.java:449) ~[spring-rabbit-3.1.0.jar:3.1.0]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.attemptDeclarations(AbstractMessageListenerContainer.java:1936) ~[spring-rabbit-3.1.0.jar:3.1.0]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.redeclareElementsIfNecessary(AbstractMessageListenerContainer.java:1904) ~[spring-rabbit-3.1.0.jar:3.1.0]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.initialize(SimpleMessageListenerContainer.java:1383) ~[spring-rabbit-3.1.0.jar:3.1.0]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1225) ~[spring-rabbit-3.1.0.jar:3.1.0]
at java.base/java.lang.Thread.run(Thread.java:833) ~[na:na]
2023-12-13T16:05:57.965-05:00 DEBUG 22516 --- [ntContainer#0-2] o.s.a.r.listener.BlockingQueueConsumer : Starting consumer Consumer@25a8d81f: tags=[[]], channel=null, acknowledgeMode=AUTO local queue size=0
2023-12-13T16:05:59.982-05:00 DEBUG 22516 --- [ntContainer#0-2] o.s.a.r.l.SimpleMessageListenerContainer : Recovering consumer in 5000 ms.
2023-12-13T16:06:02.459-05:00 INFO 22516 --- [ Test worker] tc.rabbitmq:latest : Container rabbitmq:latest started in PT6.4052848S
2023-12-13T16:06:05.151-05:00 DEBUG 22516 --- [ntContainer#0-2] o.s.a.r.l.SimpleMessageListenerContainer : Consumer raised exception, processing can restart if the connection factory supports it
org.springframework.amqp.AmqpTimeoutException: No available channels
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.obtainPermits(CachingConnectionFactory.java:548) ~[spring-rabbit-3.1.0.jar:3.1.0]
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.getChannel(CachingConnectionFactory.java:514) ~[spring-rabbit-3.1.0.jar:3.1.0]
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$ChannelCachingConnectionProxy.createChannel(CachingConnectionFactory.java:1423) ~[spring-rabbit-3.1.0.jar:3.1.0]
at org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils$RabbitResourceFactory.createChannel(ConnectionFactoryUtils.java:351) ~[spring-rabbit-3.1.0.jar:3.1.0]
at org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils.doGetTransactionalResourceHolder(ConnectionFactoryUtils.java:156) ~[spring-rabbit-3.1.0.jar:3.1.0]
at org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils.getTransactionalResourceHolder(ConnectionFactoryUtils.java:102) ~[spring-rabbit-3.1.0.jar:3.1.0]
at org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils.getTransactionalResourceHolder(ConnectionFactoryUtils.java:85) ~[spring-rabbit-3.1.0.jar:3.1.0]
at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.start(BlockingQueueConsumer.java:613) ~[spring-rabbit-3.1.0.jar:3.1.0]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.initialize(SimpleMessageListenerContainer.java:1384) ~[spring-rabbit-3.1.0.jar:3.1.0]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1225) ~[spring-rabbit-3.1.0.jar:3.1.0]
at java.base/java.lang.Thread.run(Thread.java:833) ~[na:na]
2023-12-13T16:06:05.151-05:00 INFO 22516 --- [ntContainer#0-2] o.s.a.r.l.SimpleMessageListenerContainer : Restarting Consumer@25a8d81f: tags=[[]], channel=null, acknowledgeMode=AUTO local queue size=0
2023-12-13T16:06:05.151-05:00 DEBUG 22516 --- [ntContainer#0-2] o.s.a.r.listener.BlockingQueueConsumer : Closing Rabbit Channel: null
2023-12-13T16:06:07.161-05:00 ERROR 22516 --- [ntContainer#0-3] o.s.a.r.l.SimpleMessageListenerContainer : Failed to check/redeclare auto-delete queue(s).
2023-12-13T16:06:07.161-05:00 DEBUG 22516 --- [ntContainer#0-3] o.s.a.r.listener.BlockingQueueConsumer : Starting consumer Consumer@207d8c3d: tags=[[]], channel=null, acknowledgeMode=AUTO local queue size=0
2023-12-13T16:06:09.171-05:00 DEBUG 22516 --- [ntContainer#0-3] o.s.a.r.l.SimpleMessageListenerContainer : Recovering consumer in 5000 ms.
So, yeah... Indeed, the channel permit is not released when the container tries to recover after the connection is restored.
I think the problem is here, in the BlockingQueueConsumer:
public void forceCloseAndClearQueue() {
    if (this.channel != null && this.channel.isOpen()) {
        RabbitUtils.setPhysicalCloseRequired(this.channel, true);
        ConnectionFactoryUtils.releaseResources(this.resourceHolder);
        this.deliveryTags.clear();
        this.consumers.clear();
        this.queue.clear(); // in case we still have a client thread blocked
    }
}
Unfortunately, by the time this method is called the channel is no longer open, so the rest of the logic, including the resource release, is skipped.
I'll try to fix it by ignoring the open state: the channel will be recreated in the cache anyway, and this way the permit will still be released.
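To illustrate the idea, here is a minimal, self-contained sketch of the permit leak using only the JDK. It is not the Spring AMQP code: PermitReleaseSketch, FakeChannel, ChannelPool and their methods are hypothetical stand-ins, and the single-permit Semaphore is only an assumed model of what cache.channel.size=1 with a checkout timeout behaves like. The actual fix in the library may look different.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

/** Hypothetical model of the permit leak; none of these types are Spring AMQP classes. */
public class PermitReleaseSketch {

    /** Minimal fake channel that only tracks whether it is open. */
    static final class FakeChannel {
        private volatile boolean open = true;

        boolean isOpen() {
            return this.open;
        }

        void connectionLost() {
            this.open = false;
        }
    }

    /** One permit, as an assumed model of channel.size=1 plus a checkout timeout. */
    static final class ChannelPool {
        private final Semaphore permits = new Semaphore(1);
        private final long checkoutTimeoutMs;

        ChannelPool(long checkoutTimeoutMs) {
            this.checkoutTimeoutMs = checkoutTimeoutMs;
        }

        FakeChannel checkout() {
            try {
                if (!this.permits.tryAcquire(this.checkoutTimeoutMs, TimeUnit.MILLISECONDS)) {
                    throw new IllegalStateException("No available channels");
                }
            }
            catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while waiting for a channel", ex);
            }
            return new FakeChannel();
        }

        void release() {
            this.permits.release();
        }
    }

    /** Guarded cleanup: skips the release when the channel is already closed. */
    static void closeGuarded(ChannelPool pool, FakeChannel channel) {
        if (channel != null && channel.isOpen()) {
            pool.release();
        }
    }

    /** Unguarded cleanup: returns the permit regardless of the open state. */
    static void closeUnguarded(ChannelPool pool, FakeChannel channel) {
        if (channel != null) {
            pool.release();
        }
    }

    /** Returns true if a new channel can be checked out after a connection loss. */
    static boolean recovers(boolean guarded) {
        ChannelPool pool = new ChannelPool(50);
        FakeChannel channel = pool.checkout();
        channel.connectionLost(); // the broker went away, so the channel is closed
        if (guarded) {
            closeGuarded(pool, channel);
        }
        else {
            closeUnguarded(pool, channel);
        }
        try {
            pool.checkout();
            return true;
        }
        catch (IllegalStateException ex) {
            return false; // the permit was never returned
        }
    }

    public static void main(String[] args) {
        System.out.println("guarded recovers:   " + recovers(true));   // false
        System.out.println("unguarded recovers: " + recovers(false));  // true
    }
}
```

In this model the guarded variant never returns the permit once the channel has been closed by the connection loss, so every later checkout times out, which is the same shape as the repeated "No available channels" entries in the logs above.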