
dmqproto's People

Contributors

andrew-stevenson-sociomantic, david-eckardt-sociomantic, don-clugston-sociomantic, geod24, jens-mueller-sociomantic, joseph-wakeling-frequenz, mathias-baumann-sociomantic, mathias-lang-sociomantic, matthias-wende-sociomantic, mihails-strasuns-sociomantic, nemanja-boric-sociomantic, scott-gibson-sociomantic, tiyash-basu-sociomantic

dmqproto's Issues

Consume records in order

Some applications need records to be consumed in the order in which they were pushed. The current disk overflow push strategy, which favours piping records through the memory queue rather than the disk overflow where possible, can change the order dramatically. However, other applications depend on the current strategy.

sociomantic-tsunami/dmqnode#30 discusses how to implement a DMQ channel where a Consumer pops the records strictly in the order in which they were pushed. The important point for the node/client protocol is that the Consume request needs to be extended as follows.
We add a new preserve_order parameter to the Consume request. This parameter specifies whether

  • the memory queue has priority (the current strategy) (false) or
  • all records in this channel should stay in order (true).

The first Consume request that connects to a certain channel with a certain subscriber name sets preserve_order. From that point on, the option is permanently set for this channel/subscriber combination. All subsequent Consume requests with the same channel/subscriber combination are expected to specify the same value for preserve_order as the first one did; otherwise the Consume request fails. This seems feasible because multiple consumers for the same channel and subscriber are expected to be instances of the same application.

Should there be a need to change the preserve_order option for an existing channel, it would be possible to allow it as long as there is no disk overflow for the channel.

As there is no need for it, the Pop request will not be extended. Once a channel has been consumed from, Pop requests are not allowed for it anyway.

So the new protocol features would be

  • dmqproto.client.request.Consume
    • Args: additional bool preserve_order field
    • Notification: additional NodeInfo field to report a preserve_order conflict error
  • dmqproto.common.Consume
    • New message type to report a preserve_order conflict error
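The "first consumer fixes the option" rule described above can be sketched as follows. This is an illustrative Python model of the proposed node-side check, not dmqproto code; all names (`Node`, `Channel`, `start_consume`, `PreserveOrderConflict`) are hypothetical.

```python
class PreserveOrderConflict(Exception):
    """Raised when a Consume request disagrees with the stored setting."""

class Channel:
    def __init__(self):
        # preserve_order is unset until the first Consume arrives.
        self.preserve_order = None

class Node:
    def __init__(self):
        self.channels = {}  # (channel, subscriber) -> Channel

    def start_consume(self, channel, subscriber, preserve_order):
        key = (channel, subscriber)
        ch = self.channels.setdefault(key, Channel())
        if ch.preserve_order is None:
            # The first Consume for this channel/subscriber fixes the option.
            ch.preserve_order = preserve_order
        elif ch.preserve_order != preserve_order:
            # Subsequent Consumes must match, otherwise the request fails.
            raise PreserveOrderConflict(key)
        return ch
```

A second Consume with the same value succeeds; one with a conflicting value fails with the new error, which the client would surface via the notifier.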

Redesign Consume with batches of records for reliable flow control

The current design of Consume throttling with suspend/resume control messages doesn't work well if the application has more swarm activity than just the Consume request. This was observed when testing with a real consuming application and with a reduced test case which spawns, per received message, a task that pushes the record to another DMQ node. Between sending the suspend message and receiving the ACK from the node, the application receives many thousands of records. Yielding the receive and send loops in the node and client, or disabling socket output buffering, doesn't help. Reducing the client socket input buffer size via setsockopt(SOL_SOCKET, SO_RCVBUF) does help, but would slow down the whole socket connection.
Flow control for Consume needs to be done differently. We came up with the following idea:

  • The node sends a batch of records with a certain maximum size, say, tens to thousands of kilobytes, and waits for the client to request the next batch.
  • The client-side Consume moves the received batch into a queue, then pops all records from it, calling the user notifier for each record. When the queue is empty -- that is, the application has processed the batch -- the client requests the next batch from the node.

The user-notifying iteration over the received batch happens under the hood and is suspendable, so it stays compatible with the existing throttling infrastructure. The advantage of this design is that the client never receives more records than it can process in time.
A disadvantage is that some efficiency is sacrificed: continuous feedback is involved, and there is a delay of at least one round-trip time between batches. Overall, this Consume design is no longer streaming; it is a sequence of "pop batch" requests instead. However, the efficiency of streaming Consume is so high that it overloads even simple test programs, and with multiple nodes the throughput is multiplied further. Multiple nodes also partly hide the round-trip delay: when a batch from each of two nodes A and B is waiting to be processed, the application first processes batch A, then the client requests the next batch from node A, and while that batch is in transit the application still has batch B to process.
Another thing to watch out for is avoiding long delays in the node when records arrive at a low rate, as happens when a DMQ channel is mostly empty and records are pushed to it infrequently. In this situation, aggregating a batch of records may take a long time, so there should be a timeout after which the node sends an incomplete batch. For production a one-second timeout should be enough; for testing, much lower values may be required.
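The node-side batch aggregation with a size limit and an incomplete-batch timeout could look roughly like this. This is an illustrative Python sketch of the idea only; `pop_record` and all other names are assumptions, not dmqproto API.

```python
import time

def aggregate_batch(pop_record, max_bytes, timeout_s):
    """Collect records into a batch until it reaches max_bytes or until
    timeout_s elapses, whichever comes first. pop_record() returns the
    next record (bytes) or None if the channel is currently empty."""
    batch, size = [], 0
    deadline = time.monotonic() + timeout_s
    while size < max_bytes and time.monotonic() < deadline:
        record = pop_record()
        if record is None:
            time.sleep(0.001)  # channel empty; wait briefly for a push
            continue
        batch.append(record)
        size += len(record)
    return batch  # may be incomplete if the timeout fired
```

With a busy channel the size limit cuts the batch; with an empty channel the timeout fires and an incomplete (possibly empty) batch is sent, keeping the client's latency bounded.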

Consider key-based channels

Many applications are structured as follows:

  1. Consume a DMQ channel.
  2. For each record, make a DHT request (using some field of the DMQ record as the DHT key).

It's currently very difficult for such applications to support partial DHT handshaking, as the app doesn't know what to do with a record received from the DMQ that would trigger a request to a DHT node whose hash range is not yet known. The best such an app can do right now is store "orphaned" records in a local queue (in memory / on disk) for processing once the hash range of the missing DHT node is known. This is just a workaround, though, effectively duplicating the behaviour of the DMQ locally in an application instance.

What if we added a feature to DMQ Consume to allow subscription to a subsection of a channel, based on a hash range? That way, an app could start consuming only those DMQ records destined for DHT nodes that have already been handshaked.
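As a rough model of the idea, the node (or, as a stopgap, the client) would filter the consumed stream against the set of already-handshaked hash ranges. Illustrative Python only; `key_of`, `handshaked_ranges`, and all other names are assumptions, not dmqproto API.

```python
def in_hash_range(key_hash, range_min, range_max):
    """True if key_hash falls within a node's hash range (inclusive)."""
    return range_min <= key_hash <= range_max

def consume_filtered(records, key_of, handshaked_ranges):
    """Yield only records whose DHT key hashes into an already
    handshaked node's range; the rest stay in the DMQ channel."""
    for record in records:
        h = key_of(record)
        if any(in_hash_range(h, lo, hi) for lo, hi in handshaked_ranges):
            yield record
```

As more DHT nodes complete their handshake, the app would widen its subscription (or issue additional Consume requests) to cover the newly known ranges.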

Max channel size feature for Consume request

Some applications don't need to read all the data all the time, e.g. testing applications or applications collecting data for short-term analysis. For those it would be simpler if the channel did not overflow but simply kept the newest entries up to a certain size.

Another possible feature is "remove channel on stop request", which would be of particular interest for application tests on production data, in combination with the channel subscriber feature.
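The "keep only the newest entries" behaviour can be modelled with a bounded queue that drops the oldest record when full. This is a minimal sketch of the semantics, not the dmqnode storage engine; all names are illustrative.

```python
from collections import deque

class BoundedChannel:
    """A channel with a maximum record count that discards the oldest
    entries instead of overflowing to disk."""

    def __init__(self, max_records):
        # deque with maxlen silently drops the oldest item on append.
        self.records = deque(maxlen=max_records)

    def push(self, record):
        self.records.append(record)

    def pop(self):
        return self.records.popleft() if self.records else None
```

A real implementation would presumably bound the channel by bytes rather than record count, but the drop-oldest policy is the same.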

Consume: prevent loss of data in write buffers / pending batches

The neo Consume request has the facility for the client to initiate a clean shutdown, ensuring that all records that have already been popped from the channel are flushed to the client before the request ends.

There are, however, other situations where records can be lost:

  • When the node is shut down. Currently there is no support for waiting for active requests to end cleanly.
  • When a connection error occurs. Currently, this just ends all requests on that connection.

We should consider ways to fix these cases.

(Created here, not in dmqnode, as the fix will likely require both changes to the storage engine and accompanying protocol changes.)

Clarify description of notifications related to Consume control changes

Statements like:

All known nodes have either stopped the request (as requested by the
user, via the controller) or are not currently connected. The request is
now finished

can be interpreted to mean that the request is finished because either one of the following conditions occurred:

  1. The user used the controller to stop the request
  2. All nodes are not currently connected

It could be made clearer that this is not the meaning: the controller must have been used.

Remove de facto deprecated `GetSize`, `GetChannels` and `GetChannelSize`

These requests are already unsupported by the protocol, but they can't be marked deprecated, because in D1 a call-chain analysis bug makes the deprecated struct trigger its own deprecation. Since they have done nothing for the last two major versions, we can simply remove them.

Add neo-only client constructors

There are applications that need completely separate instantiation logic for their neo vs legacy DMQ modes. We already have a way to instantiate a legacy-only client, but no way to instantiate a neo-only client.

Support "fire-and-forget" messages

For some use-cases it is not vital that every message gets through, but it is important to avoid queue overflow in the case that there is not an active channel reader. Some example use-cases:

  • an app wants to start writing a new DMQ channel, but doesn't want to block on the reader app being implemented

    • this allows writers to get their work done (and test potential performance issues) without worrying about the reader
    • can switch to persistent messages if needed only once the reader is ready
  • we have a very high-throughput channel where if the reader goes down, we will overspill very quickly due to the quantity of data

    • if we can tolerate the data loss we might want to just drop messages rather than create a big backlog and overspill onto disk
  • we have a channel where the reader needs to focus on the most current data, and where old messages can be discarded

    • in this case, processing a DMQ backlog when the reader revives might delay restoration of a well functioning system (even if the size of the backlog is tolerable)

In all of these cases it would be convenient to have either a request type (or a flag for Put requests) that indicates that, in the absence of a channel reader, the DMQ node can just drop the message.
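The intended node-side behaviour of such a flag can be sketched as below. Illustrative Python with hypothetical names; whether this becomes a new request type or a Put flag is exactly the open question above.

```python
class FireAndForgetChannel:
    """With fire_and_forget set and no active consumer, the node drops
    the record instead of queueing it (and eventually overspilling)."""

    def __init__(self):
        self.consumers = 0  # number of active channel readers
        self.queue = []

    def push(self, record, fire_and_forget=False):
        if fire_and_forget and self.consumers == 0:
            return False  # dropped: no reader, caller accepts the loss
        self.queue.append(record)
        return True
```

Persistent pushes behave as today; fire-and-forget pushes only ever occupy queue space when someone is actually reading.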

200ms delay receiving Consume batches

Testing indicates that there is a ~200ms delay between the client sending the continue signal and receiving the next batch of records. This is unexpectedly slow and suggests that a flush call is missing somewhere.
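Besides a missing flush, a delay of roughly 200ms on small messages is also the classic signature of Nagle's algorithm interacting with delayed ACKs, so it may be worth verifying that TCP_NODELAY is set on the connection sockets. A minimal check, in Python for illustration:

```python
import socket

# Disable Nagle's algorithm so small control messages (such as the
# continue signal) are sent immediately rather than being held back
# while the stack waits for an ACK on previously sent data.
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
assert sock.getsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY) != 0
sock.close()
```

This is a debugging hypothesis, not a confirmed diagnosis; an explicit flush after writing the batch would still be the first thing to look for.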

Deprecations in 13.1.0

11:45:14 ./submodules/dmqproto/src/fakedmq/neo/request/Push.d(62): Deprecation: alias swarm.neo.request.Command.GlobalStatusCode is deprecated - Use the SupportedStatus instead of GlobalStatusCode
11:45:16 ./submodules/dmqproto/src/fakedmq/neo/request/Pop.d(63): Deprecation: alias swarm.neo.request.Command.GlobalStatusCode is deprecated - Use the SupportedStatus instead of GlobalStatusCode
11:45:16 ./submodules/dmqproto/src/dmqproto/node/neo/request/Consume.d(294): Deprecation: function swarm.neo.connection.RequestOnConnBase.RequestOnConnBase.EventDispatcher.yieldReceiveAndHandleEvents is deprecated - Use nextEvent instead
11:45:16 ./submodules/dmqproto/src/dmqproto/node/neo/request/Consume.d(339): Deprecation: function swarm.neo.connection.RequestOnConnBase.RequestOnConnBase.EventDispatcher.sendReceive is deprecated - Use nextEvent instead
11:45:16 ./submodules/dmqproto/src/dmqproto/node/neo/request/Consume.d(409): Deprecation: function swarm.neo.connection.RequestOnConnBase.RequestOnConnBase.EventDispatcher.receiveAndHandleEvents is deprecated - Use nextEvent instead
11:45:16 ./submodules/dmqproto/src/dmqproto/node/neo/request/Consume.d(509): Deprecation: function swarm.neo.connection.RequestOnConnBase.RequestOnConnBase.EventDispatcher.sendReceive is deprecated - Use nextEvent instead
11:45:16 ./submodules/dmqproto/src/dmqproto/node/neo/request/Consume.d(800): Deprecation: function swarm.neo.connection.RequestOnConnBase.RequestOnConnBase.EventDispatcher.sendReceive is deprecated - Use nextEvent instead
11:45:16 ./submodules/dmqproto/src/dmqproto/node/neo/request/Consume.d(837): Deprecation: function swarm.neo.connection.RequestOnConnBase.RequestOnConnBase.EventDispatcher.yieldReceiveAndHandleEvents is deprecated - Use nextEvent instead
11:45:16 ./submodules/dmqproto/src/dmqproto/node/neo/request/Consume.d(895): Deprecation: function swarm.neo.connection.RequestOnConnBase.RequestOnConnBase.EventDispatcher.receiveAndHandleEvents is deprecated - Use nextEvent instead
11:45:16 ./submodules/dmqproto/src/dmqproto/node/neo/request/Consume.d(993): Deprecation: function swarm.neo.connection.RequestOnConnBase.RequestOnConnBase.EventDispatcher.sendReceive is deprecated - Use nextEvent instead
11:45:21 ./submodules/dmqproto/src/fakedmq/neo/RequestHandlers.d(35): Deprecation: function swarm.neo.node.ConnectionHandler.ConnectionHandler.RequestMap.opIndexAssign is deprecated
11:45:21 ./submodules/dmqproto/src/fakedmq/neo/RequestHandlers.d(36): Deprecation: function swarm.neo.node.ConnectionHandler.ConnectionHandler.RequestMap.opIndexAssign is deprecated
11:45:21 ./submodules/dmqproto/src/fakedmq/neo/RequestHandlers.d(37): Deprecation: function swarm.neo.node.ConnectionHandler.ConnectionHandler.RequestMap.opIndexAssign is deprecated
11:45:21 ./submodules/dmqproto/src/fakedmq/neo/request/Consume.d(68): Deprecation: alias swarm.neo.request.Command.GlobalStatusCode is deprecated - Use the SupportedStatus instead of GlobalStatusCode

Poor error message for Connection Refused

If a DMQ node is down, the error available in the ConnNotification object passed to the connection notifier callback is `ConnectProtocol: finalized`. I would have hoped for something along the lines of `Connection refused`, to enable debugging of the problem; I'm not sure `finalized` will mean much to most users.

Prohibit RemoveChannel from deleting a channel being listened to

sociomantic-tsunami/dhtproto#10 discusses problems around ending a DHT Mirror request when the channel being mirrored is deleted. The same argument applies to the DMQ Consume request.

Discussion led to the idea that it should actually be illegal to remove a channel that is being listened to. RemoveChannel is used in two cases:

  1. By accident.
  2. Deliberately, to remove a channel which is no longer in use.

In both cases, the request should definitely not work on a channel that's currently in use.

D2: Automatically convert tags and push them

To move forward with D2 we need to provide automatically converted libraries for projects that are built only for D2.

Every time a new tag is pushed, we have to convert it to D2, tag the converted version with `+d2` build metadata appended (so tag `v2.3.4` becomes `v2.3.4+d2` after conversion), and push it back (making sure the new tag is not converted again!).
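The tag-naming part of this is just the SemVer build-metadata convention; a tiny sketch of the naming rule and its re-conversion guard (the actual source conversion and `git push` would be done by the CI job and the D1-to-D2 conversion tool, which are not shown here):

```python
def d2_tag(tag):
    """Append +d2 build metadata to a release tag, refusing to convert
    a tag that has already been converted."""
    if "+d2" in tag:
        raise ValueError("tag already converted: " + tag)
    return tag + "+d2"
```

For example, `d2_tag("v2.3.4")` yields `v2.3.4+d2`, and feeding that result back in raises, which is the "not converted again" safeguard.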
