Right now Obvs provides an abstraction over the transport and gives you streams of data types via IObservable<IMessage> and all the other IMessage subtypes. This is nice because the framework lets apps work with just their own data types, and there is little "pollution" of those types from Obvs. But because all of the messaging semantics have been stripped away, the application loses any responsibility for, or control over, message acknowledgement/completion, which is a big problem in the world of competing consumers and fault tolerance.
Suppose a scenario where there is a subscriber for processing CreateUser commands which fails halfway through because of an intermittent connectivity issue with its backend data store, so it just throws an exception. That command shouldn't be lost; it should show back up on the queue after a certain amount of time so it can be retried later.
Also consider a scenario where you have long-running processes as the result of a command. You don't want to use message receipts that are too long-lived in case the instance that was handling the command crashes, so maybe you have a 5-minute receipt that you renew at each step along the way to retain ownership of the message so it's not relisted on the queue. How can you do this when all you're working with is IMessage, though?
Or even just consider the very simple scenario of a message being received, but the power going out on the node halfway through processing it. Obviously you don't want that message lost forever just because Joe from IT tripped over a plug in your data center. :)
In the Obvs.ActiveMQ transport implementation, for example, I notice you auto-acknowledge the message right after you deserialize it. That seems super early, as you haven't even delivered it to the subscriber(s) yet. In my implementation of Obvs.AzureServiceBus right now, I call OnNext to deliver the message to any subscribers first and then, only if that doesn't result in an exception, do I auto-complete the message.
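To make the ordering difference concrete, here is a minimal sketch of the deliver-then-complete approach. None of these names come from Obvs or the Azure Service Bus SDK: IReceivedMessage, FakeReceivedMessage, ActionObserver, and Delivery.DeliverThenComplete are hypothetical, and abandoning on failure is just one possible policy. It also assumes subscribers run synchronously inside OnNext, so that a handler exception propagates back to the caller.

```csharp
using System;

// Hypothetical stand-in for a transport message that holds a receipt/lock.
public interface IReceivedMessage<out T>
{
    T Body { get; }
    void Complete(); // settle: remove the message from the queue
    void Abandon();  // release: let the broker redeliver it later
}

// In-memory fake, used only to show which path was taken.
public sealed class FakeReceivedMessage<T> : IReceivedMessage<T>
{
    public FakeReceivedMessage(T body) { Body = body; }
    public T Body { get; }
    public string State { get; private set; } = "Pending";
    public void Complete() { State = "Completed"; }
    public void Abandon() { State = "Abandoned"; }
}

// Minimal observer so the sketch does not depend on Rx.
public sealed class ActionObserver<T> : IObserver<T>
{
    private readonly Action<T> _onNext;
    public ActionObserver(Action<T> onNext) { _onNext = onNext; }
    public void OnNext(T value) { _onNext(value); }
    public void OnError(Exception error) { }
    public void OnCompleted() { }
}

public static class Delivery
{
    // Deliver to the subscriber first; settle only once we know the outcome.
    public static bool DeliverThenComplete<T>(IReceivedMessage<T> received, IObserver<T> observer)
    {
        try
        {
            observer.OnNext(received.Body);
        }
        catch (Exception)
        {
            // The handler failed, so hand the message back for a retry.
            received.Abandon();
            return false;
        }

        received.Complete();
        return true;
    }
}
```

With an auto-acknowledge right after deserialization, the failing case would already have removed the message from the queue before the handler ever ran.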
Have you given this any thought previously? I ran into the very same design issue in my own SB that I had been working on before I abandoned it in favor of Obvs. I'm not sure what the best solution is. Spitballing...
- You wouldn't want to put these control methods on IMessage, because then you're putting behavior onto your data types and you're going to end up forcing everyone to inherit from a base class.
- One approach I had thought of was introducing another interface called IMessageControl, like:
    public interface IMessageControl<out TMessage>
        where TMessage : IMessage
    {
        TMessage Message { get; }

        void Abandon();
        void Complete();
        void Renew(TimeSpan renewTime);
        void Reject(string reason);
    }
Then all the observable streams become IObservable<IMessageControl<TMessage>>-based instead. The problem with this is that it forces message control on everyone using the framework, even if they wanted to use a non-safe/enterprisey approach to messaging that didn't have it. So I sort of reject this on that basis.
- The last approach would be to have the message control methods on some other class (the IServiceBus/IServiceBusClient maybe?) so people would still receive just MyCommand instances, but could acknowledge them with something like:
    serviceBus.Commands.Subscribe(c =>
    {
        // whatever logic
        serviceBus.Complete(c);
    });
Maybe hanging these directly off the bus interfaces isn't a great idea either, though; maybe you get a message controller off of the bus somehow instead?
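That last option raises the question of how a controller would map a plain MyCommand instance back to its transport receipt. One possible sketch, and only a sketch, is to attach the receipt to the message instance by reference using ConditionalWeakTable, so the data type itself stays untouched. IMessageReceipt, RecordingReceipt, MessageController, and Track are all hypothetical names, not part of Obvs; the assumption is that the transport calls Track just before it publishes each message.

```csharp
using System;
using System.Runtime.CompilerServices;

// Hypothetical transport-side handle for one received message.
public interface IMessageReceipt
{
    void Complete();
    void Abandon();
    void Renew(TimeSpan renewTime);
}

// In-memory fake that records the calls it receives, for illustration only.
public sealed class RecordingReceipt : IMessageReceipt
{
    public string Log { get; private set; } = "";
    public void Complete() { Append("Complete"); }
    public void Abandon() { Append("Abandon"); }
    public void Renew(TimeSpan renewTime) { Append("Renew"); }
    private void Append(string entry) { Log = Log.Length == 0 ? entry : Log + "," + entry; }
}

// Hypothetical controller obtained from the bus; keyed by message reference.
public sealed class MessageController
{
    // Entries do not keep the message alive; they vanish when it is collected.
    private readonly ConditionalWeakTable<object, IMessageReceipt> _receipts =
        new ConditionalWeakTable<object, IMessageReceipt>();

    // Assumed to be called by the transport before the message is published.
    public void Track(object message, IMessageReceipt receipt)
    {
        _receipts.Add(message, receipt);
    }

    public void Renew(object message, TimeSpan renewTime)
    {
        Find(message).Renew(renewTime);
    }

    public void Complete(object message)
    {
        var receipt = Find(message);
        _receipts.Remove(message); // a message can only be settled once
        receipt.Complete();
    }

    public void Abandon(object message)
    {
        var receipt = Find(message);
        _receipts.Remove(message);
        receipt.Abandon();
    }

    private IMessageReceipt Find(object message)
    {
        IMessageReceipt receipt;
        if (!_receipts.TryGetValue(message, out receipt))
            throw new InvalidOperationException("Message is not tracked or was already settled.");
        return receipt;
    }
}
```

This only works if messages are reference types and the subscriber sees the very same instance the transport tracked; any operator that copies or projects the message along the way would break the lookup. Whether that trade-off is acceptable is part of the open question.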
Any thoughts? Am I missing something obvious that would make this way easier to solve?