sociomantic-tsunami / dmqproto
Distributed Message Queue protocol definition, client, fake node, and tests
License: Boost Software License 1.0
Some applications need the records to be in order. The current disk overflow push strategy, which favours piping records through the memory queue over the disk overflow if possible, can change the order dramatically. However, there are applications that need the current strategy.
sociomantic-tsunami/dmqnode#30 discusses how to implement a DMQ channel where a Consumer pops the records strictly in the order in which they were pushed. The important point for the node/client protocol is that the Consume request needs to be extended as follows.
We add a new `preserve_order` parameter to the Consume request. This parameter specifies whether record order must be preserved (`true`) or not (`false`).

The first Consume request that connects to a certain channel with a certain subscriber name sets the `preserve_order` option. From this point on the option is permanently set for this channel/subscriber combination. All subsequent Consume requests with the same channel/subscriber combination are expected to specify the same value for `preserve_order` as the first one did; otherwise the Consume request will fail. This seems feasible because multiple consumers for the same channel and subscriber are expected to be instances of the same application.

Should there be a need to change the `preserve_order` option for an existing channel, it would be possible to allow this as long as there is no disk overflow for the channel.
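The "first request fixes the option" rule could be modelled as follows. This is an illustrative sketch in Python, not the actual node implementation; the names `SubscriberState` and `start_consume` are hypothetical:

```python
class SubscriberState:
    """Tracks the preserve_order option per channel/subscriber combination."""

    def __init__(self):
        # Maps (channel, subscriber) -> the preserve_order value set by the
        # first Consume request for that combination.
        self.preserve_order = {}

    def start_consume(self, channel, subscriber, preserve_order):
        """Returns True if the Consume request may start, False on a
        preserve_order conflict with an earlier request."""
        key = (channel, subscriber)
        if key not in self.preserve_order:
            # First Consume request for this combination: fixes the option.
            self.preserve_order[key] = preserve_order
            return True
        # Subsequent requests must specify the same value as the first one.
        return self.preserve_order[key] == preserve_order
```

A conflicting request is rejected rather than silently changing the option, matching the "permanently set" semantics described above.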
As there is no need for it, the Pop request will not be extended. Once a channel has been consumed from, Pop requests are not allowed for it anyway.
So the new protocol features would be:
- `dmqproto.client.request.Consume`
  - `Args`: additional `bool preserve_order` field
  - `Notification`: additional `NodeInfo` field to report a `preserve_order` conflict error
- `dmqproto.common.Consume`: a `preserve_order` conflict error

The templates Const, Immut and Inout were used for transitioning from D1 to D2. Now that swarm has been converted, they should be replaced by plain D2 keywords.
The current design of Consume throttling with suspend/resume control messages doesn't work well if the application has more swarm activity than just the Consume request. This was observed while testing with a real consuming application and with a reduced test case which, per received message, spawns a task that pushes the record to another DMQ node. Between sending the suspend message and receiving the ACK from the node, the application receives umpteen thousand records. Yielding receive and send loops in the node and client, or disabling socket output buffering, doesn't help. Reducing the client socket input buffer size via `setsockopt(SOL_SOCKET, SO_RCVBUF)` does help, but would slow down the whole socket connection.
Flow control for Consume needs to be done differently. We came up with the following idea:
The iteration over the received batch, which notifies the user of each record, happens under the hood and is suspendable, so it stays compatible with the existing throttling infrastructure. The advantage of this design is that the client never receives more records than it can process in time.

A disadvantage is that efficiency is sacrificed: continuous feedback is involved, and there is a delay of at least one round-trip time between the batches. Overall this Consume design is not streaming any more; it is a sequence of "pop batch" requests instead. However, the efficiency of streaming Consume is so high that it overloads even simple test programs, and with multiple nodes the throughput is multiplied. When a batch from each of two nodes A and B is waiting to be processed by the application, the application first processes batch A, then the client requests the next batch from node A; while that batch is in transit, the application still has batch B to process.
Another thing to watch out for is avoiding long delays in the node if records arrive at a low rate, as happens when a DMQ channel is empty and records are pushed to it infrequently. In this situation aggregating a batch of records may take a long time, so there should be a timeout after which the node sends an incomplete batch. For production one second should be enough; for testing much lower values may be required.
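A node-side batching loop with such a timeout might look like the following sketch (Python for illustration; `pop_record`, `max_batch_size` and `timeout_s` are hypothetical names, not the actual node API):

```python
import time

def collect_batch(pop_record, max_batch_size, timeout_s):
    """Aggregate records into a batch for the consumer. If the channel
    fills slowly, give up after timeout_s and send an incomplete batch
    rather than delaying the records that are already collected.
    pop_record() returns the next record, or None if the channel is empty."""
    batch = []
    deadline = time.monotonic() + timeout_s
    while len(batch) < max_batch_size and time.monotonic() < deadline:
        record = pop_record()
        if record is None:
            time.sleep(0.001)  # channel empty: wait briefly for a push
            continue
        batch.append(record)
    return batch
```

In production the timeout would be around one second, as suggested above; for tests a value of a few milliseconds keeps the suite fast.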
Like sociomantic-tsunami/dlsproto#18. (`ExtensibleDmqClient` appears to have such a constructor, but `SchedulingDmqClient` doesn't.)
Many applications are structured as follows:
It's currently very difficult for such applications to implement support for partial DHT handshaking, as the app doesn't know how to handle any record it receives from the DMQ that would trigger a request to a node in the DHT whose hash range is not yet known. The best thing such an app can do, right now, is to store "orphaned" records in a local queue (in memory / on disk) for processing once the hash range of the missing DHT node is known. This is just a workaround, though, effectively duplicating the behaviour of the DMQ locally in an application instance.
What if we added a feature to DMQ Consume to allow subscription to a subsection of a channel, based on a hash range? That way, an app could start consuming only those DMQ records destined for DHT nodes that have already been handshaked.
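Node-side, such a range-restricted Consume would simply skip records whose key hash falls outside the subscribed subsection. A minimal sketch under that assumption (Python for illustration; the function names are hypothetical):

```python
def in_hash_range(key_hash, range_min, range_max):
    """True if a record's key hash falls within the consumer's subscribed
    hash range (both bounds inclusive, as with node hash ranges)."""
    return range_min <= key_hash <= range_max

def filter_batch(records, key_hash, range_min, range_max):
    """Node-side filtering: only records destined for already-handshaked
    DHT nodes (i.e. within the subscribed range) are sent to the consumer.
    key_hash is a callable mapping a record to its hash."""
    return [r for r in records
            if in_hash_range(key_hash(r), range_min, range_max)]
```

The open question is what happens to the filtered-out records: they would have to stay in the channel (or in another subscriber's queue) until a consumer subscribes to their range.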
`prepareChannels` must guarantee that `pushToStorage` cannot fail.
Some applications don't need to read all data all the time, e.g. testing applications or applications collecting data for short-term analysis. For those it would be simpler if the channel didn't overflow and instead just kept the newest entries up to a certain size.

Another feature could be "remove channel on stop request", which would be of particular interest for application tests on production data, in combination with the channel subscriber feature.
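The "keep only the newest entries" behaviour is essentially a bounded ring buffer. A minimal sketch (Python for illustration; the class name is hypothetical):

```python
from collections import deque

class BoundedChannel:
    """A channel that never overflows: once max_entries is reached,
    pushing a new record silently drops the oldest one."""

    def __init__(self, max_entries):
        self.records = deque(maxlen=max_entries)

    def push(self, record):
        self.records.append(record)  # oldest entry is evicted if full

    def pop(self):
        return self.records.popleft() if self.records else None
```

Compared to the current disk overflow, this trades completeness for boundedness, which is exactly what the short-term-analysis use case wants.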
The neo Consume request has the facility for the client to initiate a clean shutdown, ensuring that all records that have already been popped from the channel are flushed to the client before the request ends.
There are, however, other situations where records can be lost:
We should consider ways to fix these cases.
(Created here, not in dmqnode, as the fix will likely require both changes to the storage engine and accompanying protocol changes.)
Similarly to what we already have with Ocean, we should add all other open source projects to dlang Jenkins to make sure new DMDs don't break them.
Statements like:
All known nodes have either stopped the request (as requested by the
user, via the controller) or are not currently connected. The request is
now finished
Can be interpreted to mean that the request is finished because either one of the following conditions occurred:
It could be made clearer that this is not the meaning: the controller must have been used.
The only notifications that indicate the request has finished are `channel_deleted` or `stopped`. There's another case, though: when all nodes return a fatal error to the client (e.g. internal error / unsupported). It's not clear how the client handles this.
Like https://github.com/sociomantic-tsunami/dhtproto/blob/neo/src/dhtproto/client/UsageExamples.d.
This makes the examples simpler for end-users to follow.
https://github.com/sociomantic-tsunami/dmqproto/blob/v13.x.x/src/dmqproto/node/neo/request/Push.d#L57-L64 appears to be dancing in undefined behaviour territory. (See https://github.com/gavin-norman-sociomantic/swarm-tsunami/blob/3428d7cca0171ce4f802da6fbed52606a071295c/src/swarm/neo/util/AcquiredResources.d#L71-L100.)
It's newly added in this release. It would be worth including a link to the client README (https://github.com/sociomantic-tsunami/dmqproto/tree/v14.x.x/src/dmqproto/client).
These requests are already unsupported by the protocol, but they can't be marked deprecated: in D1 we can't get rid of the deprecation warnings, due to the call chain analysis bug where the deprecated struct triggers the deprecation itself. Since they have done nothing for the last two major releases, we can just remove them.
There are applications that need completely separate instantiation logic for their neo vs legacy DMQ modes. We already have a way to instantiate a legacy-only client, but no way to instantiate a neo-only client.
For some use-cases it is not vital that every message gets through, but it is important to avoid queue overflow in the case that there is not an active channel reader. Some example use-cases:
- an app wants to start writing a new DMQ channel, but doesn't want to block on the reader app being implemented
- we have a very high-throughput channel where, if the reader goes down, we will overspill very quickly due to the quantity of data
- we have a channel where the reader needs to focus on the most current data, and where old messages can be discarded
In all of these cases it would be convenient to have a request type (or a flag for `Put` requests) indicating that, in the absence of a channel reader, the DMQ node can just drop the message.
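The node-side handling of such a flag could be sketched as follows. The flag name `drop_if_unread` is hypothetical, and this is a conceptual Python model, not the node's actual storage code:

```python
class Channel:
    """Minimal stand-in for a DMQ channel with a consumer count."""
    def __init__(self):
        self.records = []
        self.consumers = 0

def handle_push(channel, record, drop_if_unread=False):
    """Sketch of a Push carrying the hypothetical drop_if_unread flag:
    the record is discarded when the channel has no active consumer,
    so an unread channel can never overflow."""
    if drop_if_unread and channel.consumers == 0:
        return "dropped"  # no reader: don't let the queue grow
    channel.records.append(record)
    return "stored"
```

A per-request flag (rather than a per-channel setting) would let mixed producers share a channel, at the cost of deciding the semantics push by push.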
Testing indicates that there is a ~200ms delay between the client sending the continue signal and it receiving the next batch of records. This is unexpectedly slow, and indicates that a flush call is missing somewhere.
11:45:14 ./submodules/dmqproto/src/fakedmq/neo/request/Push.d(62): Deprecation: alias swarm.neo.request.Command.GlobalStatusCode is deprecated - Use the SupportedStatus instead of GlobalStatusCode
11:45:16 ./submodules/dmqproto/src/fakedmq/neo/request/Pop.d(63): Deprecation: alias swarm.neo.request.Command.GlobalStatusCode is deprecated - Use the SupportedStatus instead of GlobalStatusCode
11:45:16 ./submodules/dmqproto/src/dmqproto/node/neo/request/Consume.d(294): Deprecation: function swarm.neo.connection.RequestOnConnBase.RequestOnConnBase.EventDispatcher.yieldReceiveAndHandleEvents is deprecated - Use nextEvent instead
11:45:16 ./submodules/dmqproto/src/dmqproto/node/neo/request/Consume.d(339): Deprecation: function swarm.neo.connection.RequestOnConnBase.RequestOnConnBase.EventDispatcher.sendReceive is deprecated - Use nextEvent instead
11:45:16 ./submodules/dmqproto/src/dmqproto/node/neo/request/Consume.d(409): Deprecation: function swarm.neo.connection.RequestOnConnBase.RequestOnConnBase.EventDispatcher.receiveAndHandleEvents is deprecated - Use nextEvent instead
11:45:16 ./submodules/dmqproto/src/dmqproto/node/neo/request/Consume.d(509): Deprecation: function swarm.neo.connection.RequestOnConnBase.RequestOnConnBase.EventDispatcher.sendReceive is deprecated - Use nextEvent instead
11:45:16 ./submodules/dmqproto/src/dmqproto/node/neo/request/Consume.d(800): Deprecation: function swarm.neo.connection.RequestOnConnBase.RequestOnConnBase.EventDispatcher.sendReceive is deprecated - Use nextEvent instead
11:45:16 ./submodules/dmqproto/src/dmqproto/node/neo/request/Consume.d(837): Deprecation: function swarm.neo.connection.RequestOnConnBase.RequestOnConnBase.EventDispatcher.yieldReceiveAndHandleEvents is deprecated - Use nextEvent instead
11:45:16 ./submodules/dmqproto/src/dmqproto/node/neo/request/Consume.d(895): Deprecation: function swarm.neo.connection.RequestOnConnBase.RequestOnConnBase.EventDispatcher.receiveAndHandleEvents is deprecated - Use nextEvent instead
11:45:16 ./submodules/dmqproto/src/dmqproto/node/neo/request/Consume.d(993): Deprecation: function swarm.neo.connection.RequestOnConnBase.RequestOnConnBase.EventDispatcher.sendReceive is deprecated - Use nextEvent instead
11:45:21 ./submodules/dmqproto/src/fakedmq/neo/RequestHandlers.d(35): Deprecation: function swarm.neo.node.ConnectionHandler.ConnectionHandler.RequestMap.opIndexAssign is deprecated
11:45:21 ./submodules/dmqproto/src/fakedmq/neo/RequestHandlers.d(36): Deprecation: function swarm.neo.node.ConnectionHandler.ConnectionHandler.RequestMap.opIndexAssign is deprecated
11:45:21 ./submodules/dmqproto/src/fakedmq/neo/RequestHandlers.d(37): Deprecation: function swarm.neo.node.ConnectionHandler.ConnectionHandler.RequestMap.opIndexAssign is deprecated
11:45:21 ./submodules/dmqproto/src/fakedmq/neo/request/Consume.d(68): Deprecation: alias swarm.neo.request.Command.GlobalStatusCode is deprecated - Use the SupportedStatus instead of GlobalStatusCode
In neo Consume, it looks like receiving `MessageType.ChannelRemoved` will just hang the client (e.g. see sociomantic-tsunami/dhtproto#40). Generally, trying it again will have the same result.
If a DMQ node is down, the error available in the `ConnNotification` object passed to the connection notifier callback is `ConnectProtocol: finalized`. I would have hoped for something along the lines of `Connection refused`, to enable debugging of the problem. I'm not sure `finalized` will mean much to most users.
sociomantic-tsunami/dhtproto#10 discusses problems around ending a DHT Mirror request when the channel being mirrored is deleted. The same argument applies to the DMQ Consume request.
Discussion led to the idea that it should actually be illegal to remove a channel that is being listened to. RemoveChannel is used in two cases:
In both cases, the request should definitely not work on a channel that's currently in use.
https://github.com/sociomantic-tsunami/swarm/releases/tag/v4.4.0
To move forward with D2 we need to provide automatically converted libraries for projects that are built only for D2.
Every time a new tag is pushed, we have to convert it to D2, tag the converted version with `+d2` build metadata appended (so tag `v2.3.4` becomes `v2.3.4+d2` after the D2 conversion) and push it back (making sure the new tag is not converted again!).
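The tag transformation itself is trivial; the important part is guarding against re-converting an already converted tag. A sketch (Python; the function name is hypothetical):

```python
def d2_tag(tag):
    """Given a release tag, return the tag for the auto-converted D2 build,
    with '+d2' appended as SemVer build metadata. Returns None for tags
    that already carry the metadata, so a converted tag pushed back to the
    repository is never picked up for conversion again."""
    if tag.endswith("+d2"):
        return None  # already a converted tag: skip it
    return tag + "+d2"
```

The CI job would run the conversion only when `d2_tag` returns a new tag name, then push that tag back.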