Comments (6)
Thank you for heads up, @Monofraps !
Looks like I see what we are missing there.
So, we have this code so far:
try {
channel = connection.createChannel(isChannelTransacted());
...
}
catch (AmqpApplicationContextClosedException e) {
throw new AmqpConnectException(e);
}
catch (IOException e) {
RabbitUtils.closeChannel(channel);
RabbitUtils.closeConnection(connection);
...
this.consumersToRestart.add(consumer);
consumer = null;
}
But according your StackTrace we have there:
org.springframework.amqp.AmqpConnectException: java.net.ConnectException: Connection refused: connect
That's very close to what we would like, unless we have missed the fact that java.net.ConnectException
(IOException
) is wrapped to the AmqpConnectException
by the RabbitExceptionTranslator
:
protected final Connection createBareConnection() {
try {
...
}
catch (IOException | TimeoutException e) {
throw RabbitExceptionTranslator.convertRabbitAccessException(e);
}
}
So, this is a good catch and I think there is no need in the application to reproduce.
Although we will be appreciate, if you are able to test the solution from your side when it comes to this issue as a BUILD-SNAPSHOT.
from spring-amqp.
@Monofraps ,
as you see the fix has been just pushed.
Give it 10-15 mins and BUILD-SNAPSHOT will be available for testing from your side.
from spring-amqp.
Hi @artembilan and thank you for the quick reply. :)
I now see another issue. The consumer which should be added back to the list of consumers to restart here is initialized here. However, the connect exception is thrown two lines earlier which leaves consumer
at null
. I suppose you could do the same as in DirectMessageListenerContainer.java#L556.
So maybe something like
Channel channel = null;
SimpleConsumer consumer = null;
try {
channel = connection.createChannel(isChannelTransacted());
channel.basicQos(getPrefetchCount());
consumer = new SimpleConsumer(connection, channel, queue);
channel.queueDeclarePassive(queue);
consumer.consumerTag = channel.basicConsume(queue, getAcknowledgeMode().isAutoAck(),
(getConsumerTagStrategy() != null
? getConsumerTagStrategy().createConsumerTag(queue) : ""),
false, isExclusive(), getConsumerArguments(), consumer);
}
catch (AmqpApplicationContextClosedException e) {
throw new AmqpConnectException(e);
}
catch (IOException | AmqpConnectException e) {
RabbitUtils.closeChannel(channel);
RabbitUtils.closeConnection(connection);
if (e.getCause() instanceof ShutdownSignalException
&& e.getCause().getMessage().contains("in exclusive use")) {
getExclusiveConsumerExceptionLogger().log(logger,
"Exclusive consumer failure", e.getCause());
publishConsumerFailedEvent("Consumer raised exception, attempting restart", false, e);
}
else if (e.getCause() instanceof ShutdownSignalException
&& RabbitUtils.isPassiveDeclarationChannelClose((ShutdownSignalException) e.getCause())) {
this.logger.error("Queue not present, scheduling consumer " + consumer + " for restart", e);
}
else if (this.logger.isWarnEnabled()) {
this.logger.warn("basicConsume failed, scheduling consumer " + consumer + " for restart", e);
}
if(consumer == null) {
this.consumersToRestart.add(new SimpleConsumer(null, null, queue));
} else {
this.consumersToRestart.add(consumer);
}
consumer = null;
}
from spring-amqp.
I submitted my suggested change (which indeed fixes the consumer restart issues) as a PR
from spring-amqp.
@artembilan
We have been facing the same issue currently. Only difference that we see in our case is that due to some network failure we are getting java.net.SocketTimeoutException
which being converted into org.springframework.amqp.AmqpIOException by RabbitExceptionTranslator.convertRabbitAccessException(e);
DirectMessageListenerContainer#doConsumeFromQueue is only handling either of IOException | AmqpConnectException
DirectMessageListenerContainer.java#L661 as result of which AmqpIOException is preventing consumer from getting added back to list of consumersToRestart
Stacktrace:
[ERROR] 2018-11-26 08:19:52.501 [listenerContainer-consumerMonitor-1] [] DirectMessageListenerContainer:397 - Consumer canceled - channel closed SimpleConsumer [queue=task_result_queue, consumerTag=amq.ctag-3mWZ5wgYDuQJY2C1wn2aZA identity=5df5c0b2] [INFO ] 2018-11-26 08:19:52.507 [listenerContainer-consumerMonitor-1] [] CachingConnectionFactory:447 - Attempting to connect to: [xxx:5672] [ERROR] 2018-11-26 08:22:52.658 [listenerContainer-consumerMonitor-1] [] DirectMessageListenerContainer:416 - Cannot connect to server org.springframework.amqp.AmqpIOException: java.net.SocketTimeoutException: connect timed out at org.springframework.amqp.rabbit.support.RabbitExceptionTranslator.convertRabbitAccessException(RabbitExceptionTranslator.java:71) ~[spring-rabbit-2.0.3.RELEASE.jar!/:2.0.3.RELEASE]
from spring-amqp.
Please open a new issue referencing this one, rather than commenting on an old, closed, issue.
from spring-amqp.
Related Issues (20)
- Get rid of `synchronized (this.consumersMonitor)` in the `SimpleMessageListenerContainer`
- CachingConnectionFactory leaks channels during connection resets when used in a SimpleMessageContainer HOT 1
- RabbitTemplate with SIMPLE correlation causes uncached channels
- Get rid of `synchronized` blocks in the `CachingConnectionFactory`
- Get rid of `synchronized` in `RabbitAdmin`
- Get rid of `synchronized` in `RabbitTemplate` HOT 1
- Get rid of `synchronized` in the `BlockingQueueConsumer`
- Get rid of `synchronized` in `spring-rabbit-test` module
- Get rid of the rest of `synchronized` in the ListenerContainer infrastructure
- Improve Extensibility of RepublishMessageRecovererWithConfirms by supporting SpEL expressions HOT 1
- AmqpRejectAndDontRequeueException completely restarts Consumer HOT 2
- The `SimpleMessageListenerContainer` does not shutdown properly
- The `SimpleMessageListenerContainer` does not shutdown properly HOT 1
- Swallowed exception 'AmqpTimeoutException: No available channels' HOT 4
- Add batchReceiveTimeout in SimpleMessageListenerContainer HOT 3
- MessageProperties setDelay maximum value problem HOT 3
- ImmediateAcknowledgeAmqpException keeps the message in the queue HOT 10
- TraceId propagation to the new thread local HOT 4
- Wrong ClassLoader is used for message deserialization when devtools are active
- Wrong ClassLoader is used for message deserialization when devtools are active HOT 1
Recommend Projects
-
React
A declarative, efficient, and flexible JavaScript library for building user interfaces.
-
Vue.js
🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
-
Typescript
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
-
TensorFlow
An Open Source Machine Learning Framework for Everyone
-
Django
The Web framework for perfectionists with deadlines.
-
Laravel
A PHP framework for web artisans
-
D3
Bring data to life with SVG, Canvas and HTML. 📊📈🎉
-
Recommend Topics
-
javascript
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
-
web
Some thing interesting about web. New door for the world.
-
server
A server is a program made to process requests and deliver data to clients.
-
Machine learning
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
-
Visualization
Some thing interesting about visualization, use data art
-
Game
Some thing interesting about game, make everyone happy.
Recommend Org
-
Facebook
We are working to build community through open source technology. NB: members must have two-factor auth.
-
Microsoft
Open source projects and samples from Microsoft.
-
Google
Google ❤️ Open Source for everyone.
-
Alibaba
Alibaba Open Source for everyone
-
D3
Data-Driven Documents codes.
-
Tencent
China tencent open source team.
from spring-amqp.