Comments (2)
A potentially naïve fix (assuming I'm not missing how it's supposed to be used) would be to expose the GetNextPublishSeqNo() method from the underlying channel:
in channel_manager.go:

    func (chanManager *ChannelManager) GetNextPublishSeqNo() uint64 {
        return chanManager.channel.GetNextPublishSeqNo()
    }

in publish.go:

    func (publisher *Publisher) GetNextPublishSeqNo() uint64 {
        publisher.disablePublishDueToBlockedMux.RLock()
        defer publisher.disablePublishDueToBlockedMux.RUnlock()
        return publisher.chanManager.GetNextPublishSeqNo()
    }
However, I'm not 100% certain about the thread safety of the lib. Grabbing the next delivery tag sequence number and then publishing are two separate steps, which opens the possibility of a race condition if the publisher were shared between two goroutines (is that even possible / supported?).
I've tested this on a simple publisher / consumer app, and it now works when the producer is contained within a single goroutine (showing changes from the original code):

    // Publisher func
    go func() {
        publisher.NotifyPublish(func(p rabbitmq.Confirmation) {
            publisher_confirms <- p
        })
        publisher.NotifyReturn(func(r rabbitmq.Return) {
            publisher_returns <- &r
        })
        for msg := range messages {
            // Record the DeliveryTag the channel will assign to this message before publishing it
            deliveryTag := messagestore.DeliveryTag(publisher.GetNextPublishSeqNo())
            err := publisher.PublishWithContext(
                context.Background(),
                msg.Body,
                []string{msg.RoutingKey},
                rabbitmq.WithPublishOptionsExchange("ingress"),
                rabbitmq.WithPublishOptionsContentType(msg.ContentType),
                rabbitmq.WithPublishOptionsContentEncoding(msg.ContentEncoding),
                rabbitmq.WithPublishOptionsAppID(msg.AppId),
                rabbitmq.WithPublishOptionsPersistentDelivery,
            )
            if err != nil {
                logger.Errorf("error publishing message, %s", err)
                continue
            }
            store.AppendMessage(deliveryTag, msg)
        }
    }()

    // Confirms func
    go func() {
        for {
            select {
            case confirm := <-publisher_confirms:
                // Try to confirm
                if confirm.Ack {
                    logger.Infof("Attempting to ack DeliveryTag: %d and ReconnectionCount: %d", confirm.DeliveryTag, confirm.ReconnectionCount)
                    store.Ack(messagestore.DeliveryTag(confirm.DeliveryTag))
                } else {
                    logger.Infof("Attempting to nack DeliveryTag: %d and ReconnectionCount: %d", confirm.DeliveryTag, confirm.ReconnectionCount)
                    store.Nack(messagestore.DeliveryTag(confirm.DeliveryTag), true)
                }
            case <-publisher_returns:
                // Returns are drained here but not handled
            }
        }
    }()

from go-rabbitmq.
The reconnect count should let you know when a reconnect happens. Does that not help you know when the counter is reset?
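A rough sketch of what that suggestion could look like on the application side: key pending messages on the pair (ReconnectionCount, DeliveryTag) instead of the tag alone, so that a tag reused after a reconnect does not collide with an older one. The `confirmation` struct below is a local stand-in carrying only the fields the snippet above reads from `rabbitmq.Confirmation`; `messageKey` and `pendingStore` are hypothetical names. How the publishing side learns the current reconnection count is an assumption here and is not covered by the thread.

```go
package main

// confirmation is a local stand-in for the fields read from
// rabbitmq.Confirmation in the thread; it is not the library type.
type confirmation struct {
	DeliveryTag       uint64
	ReconnectionCount int
	Ack               bool
}

// messageKey identifies a published message across reconnects: delivery
// tags restart on a new channel, so the tag alone is not unique.
type messageKey struct {
	ReconnectionCount int
	DeliveryTag       uint64
}

// pendingStore holds messages awaiting a confirm (hypothetical helper).
// It is not safe for concurrent use without external locking.
type pendingStore struct {
	pending map[messageKey]string
}

func newPendingStore() *pendingStore {
	return &pendingStore{pending: map[messageKey]string{}}
}

// Add records a message under the reconnection count and tag it was sent with.
func (s *pendingStore) Add(reconnects int, tag uint64, body string) {
	s.pending[messageKey{ReconnectionCount: reconnects, DeliveryTag: tag}] = body
}

// Resolve removes and returns the message matching a confirmation.
// The boolean is false when no pending message matches.
func (s *pendingStore) Resolve(c confirmation) (string, bool) {
	key := messageKey{ReconnectionCount: c.ReconnectionCount, DeliveryTag: c.DeliveryTag}
	body, ok := s.pending[key]
	if ok {
		delete(s.pending, key)
	}
	return body, ok
}
```

The caller would still branch on `Ack` after `Resolve` to decide between acknowledging and re-queuing the message, as the confirms goroutine above does.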