bus-queue's Introduction

Lock-free Bounded Non-Blocking Pub-Sub Queue

This is a publish-subscribe queue in which the publisher is never blocked by slow subscribers. The side effect is that slow subscribers will miss messages. The intended use case is high-throughput streams where receiving the latest message is prioritized over receiving the entire stream, such as market data feeds and live streams.

The underlying data-structure is a vector of Arc(s) eliminating the use of copies.

Features

  • Lock-Free Write/Read - Lock-Free for Publisher and Lock-Free for Subscribers.
  • Bounded - Constant size of memory used; the maximum is sizeof(MsgObject)*(queue_size + sub_cnt + 1). This is the edge case where each subscriber is holding a reference to an object while the publisher has published a full queue length in the meantime.
  • Non-Blocking - The queue never blocks the publisher; slow subscribers miss data in proportion to how far they fall behind.
  • Pub-Sub - Every Subscriber that can keep up with the Publisher will receive all the data the Publisher publishes.
  • channel - a raw Pub/Sub channel implementation without the thread synchronisation and futures logic.
  • bus - an async Pub/Sub queue with futures::sink::Sink and futures::stream::Stream traits.

bus::Publisher and channel::Sender are used to broadcast data to pools of bus::Subscriber and channel::Receiver respectively. Subscribers are cloneable, so many threads or futures can receive data simultaneously. The only limitation is that Subscribers have to keep up with the frequency of the Publisher. If a Subscriber is slow, it will drop data.

Disconnection

The broadcast and receive operations on channels will all return a Result indicating whether the operation succeeded or not. An unsuccessful operation is normally indicative of the other half of a channel having "hung up" by being dropped in its corresponding thread.

Once half of a channel has been deallocated, most operations can no longer continue to make progress, so Err will be returned. Many applications will continue to unwrap the results returned from this module, instigating a propagation of failure among threads if one unexpectedly dies.
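
As an alternative to unwrapping, a receiver loop can match on the error and stop cleanly when the other half hangs up. The sketch below uses std::sync::mpsc purely as a stand-in with the same Result-based shape; it does not use bus-queue's own types, and the function name drain_until_disconnected is made up for illustration.

```rust
use std::sync::mpsc::{channel, TryRecvError};

/// Drains a receiver until the sender hangs up and returns what was received.
/// std::sync::mpsc is a stand-in here, not the bus-queue API.
fn drain_until_disconnected() -> Vec<i32> {
    let (tx, rx) = channel();
    for i in 0..3 {
        tx.send(i).expect("receiver is still alive");
    }
    drop(tx); // the sending half "hangs up"

    let mut out = Vec::new();
    loop {
        match rx.try_recv() {
            Ok(v) => out.push(v),
            // Nothing available yet; a real loop would yield or park here.
            Err(TryRecvError::Empty) => continue,
            // The other half was dropped: stop instead of panicking.
            Err(TryRecvError::Disconnected) => break,
        }
    }
    out
}
```

Items sent before the sender was dropped are still delivered; Disconnected is only reported once the buffer is drained.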

Examples

Simple raw usage

extern crate bus_queue;
use bus_queue::flavors::arc_swap::bounded;

let (tx, rx) = bounded(10);
(1..15).for_each(|x| tx.broadcast(x).unwrap());

let received: Vec<i32> = rx.map(|x| *x).collect();
// Test that only the last 10 elements are in the received list.
let expected: Vec<i32> = (5..15).collect();

assert_eq!(expected, received);

Simple async usage

use bus_queue::flavors::arc_swap::async_bounded;
use futures::executor::block_on;
use futures::stream;
use futures::StreamExt;

let (publisher, subscriber1) = async_bounded(10);
let subscriber2 = subscriber1.clone();

block_on(async move {
    stream::iter(1..15)
        .map(|i| Ok(i))
        .forward(publisher)
        .await
        .unwrap();
});

let received1: Vec<u32> = block_on(async { subscriber1.map(|x| *x).collect().await });
let received2: Vec<u32> = block_on(async { subscriber2.map(|x| *x).collect().await });
// Test that only the last 10 elements are in the received list.
let expected = (5..15).collect::<Vec<u32>>();
assert_eq!(received1, expected);
assert_eq!(received2, expected);

bus-queue's People

Contributors

alpha-60, dyxushuai, filipdulic, katyo, sdbondi, vladan

bus-queue's Issues

Add Unbuffered flavor for Channel

Depends on #36
This flavor will use a single arc-swap slot for the transmission of items and should be used only in extreme cases where the highest importance is given to the reception of the latest data. This flavor will be optimized for this use case, but care should be taken in use as frequent data drops are to be expected.
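
A minimal sketch of the single-slot idea, assuming a Mutex in place of the arc-swap primitive so that it runs with the standard library only. LatestSlot and its methods are hypothetical names, not part of the crate.

```rust
use std::sync::{Arc, Mutex};

/// Hypothetical single-slot channel: every broadcast overwrites the slot,
/// so readers only ever see the most recent item.
struct LatestSlot<T> {
    slot: Mutex<Option<Arc<T>>>,
}

impl<T> LatestSlot<T> {
    fn new() -> Self {
        LatestSlot { slot: Mutex::new(None) }
    }

    /// Overwrites whatever was stored before; older items are dropped.
    fn broadcast(&self, item: T) {
        *self.slot.lock().unwrap() = Some(Arc::new(item));
    }

    /// Returns a shared reference to the latest item, if any was broadcast.
    fn latest(&self) -> Option<Arc<T>> {
        self.slot.lock().unwrap().clone()
    }
}
```

Two broadcasts in a row without a read in between leave only the second value observable, which is the frequent data drop the description warns about.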

Add SwapSlot Trait

The SwapSlot Trait will be used to implement flavors.
This is a synchronization primitive that the Channel will use to atomically store reference-counted items into the buffer and load them from it.

SwapSlot requires three methods:

  • none - used during buffer initialization for placeholders before any values are broadcast.
  • store - used during Channel::broadcast method to insert atomically counted items into the buffer.
    Stored items have a reference count of one. The store method overwrites the previous item in
    the buffer, so the expected result is that the previous item's reference count drops by one,
    thus ensuring that the object the references refer to gets dropped once the reference
    count drops to 0.
  • load - used during Channel::try_recv method to retrieve atomically counted references from the
    buffer. The load method must ensure that the reference count is incremented by one
    atomically.
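
The three methods can be sketched as a trait with a simple lock-based implementation. The signatures below are assumptions inferred from the description, and MutexSlot is a hypothetical, non-lock-free implementation used only to make the reference-count behaviour visible.

```rust
use std::sync::{Arc, Mutex};

/// Sketch of the trait as described above; the exact signatures are assumed.
trait SwapSlot<T> {
    /// Placeholder used when the buffer is initialized.
    fn none() -> Self;
    /// Replaces the slot's content with `item`.
    fn store(&self, item: Arc<T>);
    /// Returns a new counted reference to the slot's content, if any.
    fn load(&self) -> Option<Arc<T>>;
}

/// Illustrative lock-based slot (not lock-free).
struct MutexSlot<T>(Mutex<Option<Arc<T>>>);

impl<T> SwapSlot<T> for MutexSlot<T> {
    fn none() -> Self {
        MutexSlot(Mutex::new(None))
    }

    fn store(&self, item: Arc<T>) {
        // Assigning drops the previous Arc, lowering its reference count by one.
        *self.0.lock().unwrap() = Some(item);
    }

    fn load(&self) -> Option<Arc<T>> {
        // Cloning the Arc raises the reference count by one.
        self.0.lock().unwrap().clone()
    }
}
```

Observing Arc::strong_count before and after store and load is a cheap way to check that an implementation honours the counting rules listed above.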

Rename `async` submodule

I have failed to use this module with rust 2018:

error: expected identifier, found reserved keyword `async`
 --> engine/src/engine.rs:7:16
  |
7 | use bus_queue::async::{channel, Publisher, Subscriber};
  |                ^^^^^ expected identifier, found reserved keyword

Because async is now a reserved keyword in Rust, the async submodule should be renamed.
As a quick solution I suggest just adding a single underscore at the end (i.e. async_).

Example of blocking usage pattern with new API

I previously (0.4.1) used the sync API, where I could just do a let msg = rx.try_recv().unwrap(); and the unwrap would only fail if the other end disconnected.

My use case is that I have a producer thread and several consumer threads that are just waiting for messages.

How is the new API intended to be used? Currently I do something like the following:

loop {
            let msg = match self.rx.try_recv() {
                Ok(msg) => msg,
                Err(channel::TryRecvError::Empty) => continue,
                Err(channel::TryRecvError::Disconnected) => panic!("Sender disconnected"),
            };
}

This runs on the receiver. It seems to work, but uses a full CPU core, so I should probably add a sleep in the Empty branch:

loop {
            let msg = match self.rx.try_recv() {
                Ok(msg) => msg,
                Err(channel::TryRecvError::Empty) => {
                    sleep(Duration::from_millis(1));
                    continue;
                },
                Err(channel::TryRecvError::Disconnected) => panic!("Sender disconnected"),
            };
}

Is this how the API should be used? Or should I use a different crate for my use case?

is_empty on Receiver and Subscriber.

The current is_empty implementation on Receiver and Subscriber only checks whether wi == 0.

It should be implemented as ri == wi for Receiver and Subscriber. Perhaps add is_receiver_empty on Channel.
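
The difference between the two checks shows up as soon as a receiver has caught up with a non-zero writer index. A small sketch, with a hypothetical Indices struct standing in for the real fields:

```rust
/// Hypothetical index pair: `wi` is the shared writer index,
/// `ri` is this receiver's read index.
struct Indices {
    wi: usize,
    ri: usize,
}

impl Indices {
    /// Check described as current: only true before the first write.
    fn is_empty_current(&self) -> bool {
        self.wi == 0
    }

    /// Proposed check: true whenever this receiver has read everything written.
    fn is_empty_proposed(&self) -> bool {
        self.ri == self.wi
    }
}
```

With wi = 5 and ri = 5 the receiver has nothing left to read, yet only the proposed check reports it as empty.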

data-race when publisher catches up with the subscriber

Publisher operations during broadcast

  1. buffer[wi % size] = item;
  2. wi++;

Subscriber operations during try_recv

  1. if ri == wi => empty
  2. item = buffer[ri % size];
  3. loop
    3.a. if wi > ri + size => ri = wi - size;
    3.b. else => ri++, return item

Test Case

wi = 10, ri = 0, size = 10, item = y, buffer[0] = x.

  1. pub.1. buffer[10 % 10] = item; <==> buffer[0] = y;
  2. sub.1. if 0 == 10 ==> not empty
  3. sub.2. item = buffer[0] <==> item = y;
  4. sub.3.a. if 10 > 0 + 10 => false, so continue to sub.3.b
  5. sub.3.b ri++ => 1, return y.

y is returned even though x should have been, or sub should have moved to next.
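
The interleaving can be replayed deterministically on a single thread. The sketch below follows the pseudocode step by step; it is a model of the described logic, not the crate's code, and replay_race is a made-up name.

```rust
/// Single-threaded replay of the interleaving above.
/// Returns the item the subscriber ends up with and its new read index.
fn replay_race() -> (char, usize) {
    let size = 10usize;
    let mut buffer = vec!['x'; size]; // buffer[0] = 'x' is the oldest unread item
    let wi = 10usize; // the writer has filled the buffer exactly once
    let mut ri = 0usize;

    // pub.1: the writer stores the new item but has NOT yet incremented wi.
    buffer[wi % size] = 'y';

    // sub.1: ri != wi, so the queue is not considered empty.
    assert_ne!(ri, wi);
    // sub.2: the reader loads the slot the writer has just overwritten.
    let item = buffer[ri % size];
    // sub.3.a: wi > ri + size is false (10 > 10), so no skip happens.
    if wi > ri + size {
        ri = wi - size;
    } else {
        // sub.3.b: the reader advances and returns the item.
        ri += 1;
    }
    (item, ri)
}
```

The function returns 'y' with ri advanced to 1, which matches the trace: the overwritten slot is handed out because the overtake check still sees the old wi.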

Rename structs and modules to make more sense.

Modules

  • bus --> async_bus
  • channel --> bus

Structs

  • Sender --> Publisher
  • Receiver --> Subscriber
  • Publisher --> AsyncPublisher
  • Subscriber --> AsyncSubscriber
  • Channel --> RingBuffer

Functions

raw_bounded --> bounded
bounded --> async_bounded

Remove export features, leave only arcswap pub use export.

Currently you can change the default exports by selecting *-export features. This is unnecessary, cumbersome and does not allow for calling the --all-features cargo/ci flag as exports are exclusive.

Since all flavors are tested in their SwapSlot implementations, there is no need to run the full test suite on each one.

Fix up the GitHub workflows CI to use --all-features instead of separate iterations for different flavors.

Index overflow not handled.

The writer and reader indices are of the type usize, which is a 32-bit or 64-bit unsigned integer depending on the target's address space. The edge case where the writer index overflows past usize::MAX is not covered by the internal queue logic.
For this to be an issue on 64-bit targets, more than 18,446,744,073,709,551,615 items need to have been written to the queue.

It would take more than 500 years for this to happen if a value is written to the queue every 1ns.
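
The estimate can be checked with a few lines of arithmetic. The second function shows one common way to keep index comparisons correct across wraparound (comparing the wrapping difference); it is offered as an illustration only and is not the project's fix. Both function names are made up.

```rust
/// Years needed to overflow a 64-bit index at one write per nanosecond.
fn years_to_overflow_u64() -> f64 {
    let writes = u64::MAX as f64; // about 1.8e19 writes
    let seconds = writes / 1e9; // one write per nanosecond
    seconds / (365.25 * 24.0 * 60.0 * 60.0) // years of 365.25 days
}

/// Number of items the writer is ahead of the reader, valid even if `wi`
/// has wrapped past usize::MAX while `ri` has not yet.
fn items_behind(wi: usize, ri: usize) -> usize {
    wi.wrapping_sub(ri)
}
```

The first function evaluates to roughly 584 years. On a 32-bit target the picture is very different: at the same rate, 2^32 writes take only about 4.3 seconds.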

Add special version of channel where only one SwapSlot is used.

Implement a special version of Channel that uses a single SwapSlot for its buffer. This is a special case where the Receivers always want the latest data when an item is dropped.

While there would be a performance increase for this specialized use case, as the try_recv method would be much simpler, there would be a penalty in that more items would be dropped, because the Sender will overwrite the old one on every broadcast call.

This should only be used in systems where even the smallest performance benefits outweigh potential loss of data. For all other use cases, a much more stable option would be to use a buffer of a given size and set the skip_items Receiver field to size - 1.

Fix up export paths

Some of the exported structs, traits and functions require long export paths making the library and documentation usage cumbersome.

Add RwLock flavor

This flavor will implement Channel using RwLocks instead of ArcSwap. The intention is to provide a non-lock-free version that focuses on stability and data-race avoidance. While the lock-free version is implemented to the best of our abilities, the sheer complexity of the lock-free domain calls for formal verification where critical systems depend on this library. This flavor will provide an interim solution, using the same interfaces, until a formal verification of the lock-free version is available.

Fix Atomic Ordering

Every ordering used is Ordering::Relaxed, which means sending a message from one thread and receiving it from another is a data race. At the very least, Ordering::Acquire and Ordering::Release should be used somewhere. - reported by stjepang
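
A minimal sketch of the Release/Acquire pairing, assuming a single slot and a writer index modelled with std atomics. The names slot and wi are illustrative and do not refer to the crate's fields.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;

/// Writes a payload, publishes it by bumping `wi` with Release,
/// and reads it back from another thread after an Acquire load of `wi`.
fn publish_and_observe() -> usize {
    let slot = Arc::new(AtomicUsize::new(0));
    let wi = Arc::new(AtomicUsize::new(0));

    let (slot_w, wi_w) = (Arc::clone(&slot), Arc::clone(&wi));
    let writer = thread::spawn(move || {
        slot_w.store(42, Ordering::Relaxed); // payload write
        // Release: everything written before this store becomes visible to
        // a thread whose Acquire load observes the new index.
        wi_w.store(1, Ordering::Release);
    });

    // Acquire: once the new index is seen, the payload write is visible too.
    while wi.load(Ordering::Acquire) == 0 {
        std::hint::spin_loop();
    }
    let seen = slot.load(Ordering::Relaxed);
    writer.join().unwrap();
    seen
}
```

If both operations on wi were Relaxed, nothing would order the payload write before the index update from the reader's point of view, so the reader could observe the new index and still read the old slot content.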

Refactor channel (Sender, Receiver) to use Channel<T, SwapSlot<T>> with flavors

An internal Channel struct is used in crossbeam-channel and async-std. Refactor the code to provide a similar implementation. Also use the flavors logic to enable multiple versions of the inner Channel.
etc...
Major refactor:

  • Refactor folder structure to have individual files for Channel, Sender, Receiver, Publisher and Subscriber.
  • Refactor Sender and Receiver to use an internal Channel which provides most of the logic.
  • Integrate Channel with flavors using the SwapSlot Trait.
  • Copy [piper::Event] into the project until event is exposed on crates.io by its author.
  • Add Tarpaulin GitHub workflow
  • Increase code coverage.

Data race where the reader reads the latest value in the queue instead of the oldest one.

A data race results in the reader reading the most recent value added to the queue instead of the oldest one, and can also cause the same value to be read twice.

The example below will demonstrate how the datarace will happen.

The scenario begins with the writer overtaking the reader index by the size of the queue. In this example the queue size is 3, and the writer has written 3 values (0,1,2) into the queue and incremented its writer index to 3.

+--------+-----------+
| size   |     3     |
+--------+---+---+---+
| index  | 0 | 1 | 2 |
+--------+---+---+---+
| values | 0 | 1 | 2 |
+--------+---+---+---+
| wi     | 3 |   |   |
+--------+---+---+---+
| ri     | 0 |   |   |
+--------+---+---+---+

Suppose the Receiver's try_recv method is called after the writer has inserted the 4th value (3) into the queue, but before it has incremented its index (wi). The state at that moment is:

+--------+-----------+
| size   |     3     |
+--------+---+---+---+
| index  | 0 | 1 | 2 |
+--------+---+---+---+
| values | 3 | 1 | 2 |
+--------+---+---+---+
| wi     | 3 |   |   |
+--------+---+---+---+
| ri     | 0 |   |   |
+--------+---+---+---+

All of the try_recv conditional statements will pass, and the value 3 would be returned. This will result in the reader reading a 3, while expecting 0. If the reader continues to read it will eventually read the 3 value again.

Add property tests for the bare bus implementation

The bare example can be transformed into a property test where an arbitrary vector will be passed on input, along with the length of the buffer. The result should be a function that validates if the consumed vector is equal to the last N elements of the input vector, where N is the length of the bus.
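
A sketch of such a property using only the standard library. The ring_consume function is a toy model standing in for the bus (an assumption, not the crate's code), and inputs come from a small linear congruential generator instead of a property-testing crate. All function names are hypothetical.

```rust
/// Reference result: the last `n` elements of the input.
fn last_n(input: &[u32], n: usize) -> Vec<u32> {
    input[input.len().saturating_sub(n)..].to_vec()
}

/// Toy ring-buffer model: write everything, then read what is still retained.
fn ring_consume(input: &[u32], n: usize) -> Vec<u32> {
    let mut buf = vec![0u32; n];
    for (wi, &x) in input.iter().enumerate() {
        buf[wi % n] = x; // overwrite the oldest slot
    }
    let wi = input.len();
    let start = wi.saturating_sub(n); // oldest index still in the buffer
    (start..wi).map(|i| buf[i % n]).collect()
}

/// Checks the property over `cases` pseudo-random inputs.
fn property_holds(cases: usize) -> bool {
    let mut seed: u64 = 0x9E37_79B9_7F4A_7C15;
    let mut next = move || {
        seed = seed
            .wrapping_mul(6364136223846793005)
            .wrapping_add(1442695040888963407);
        (seed >> 33) as u32
    };
    (0..cases).all(|_| {
        let len = (next() % 64) as usize;
        let n = (next() % 16) as usize + 1; // buffer length must be at least 1
        let input: Vec<u32> = (0..len).map(|_| next()).collect();
        ring_consume(&input, n) == last_n(&input, n)
    })
}
```

In a real test suite the toy model would be replaced by the bare bus, and the generator by a property-testing library that can shrink failing inputs.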

Add AtomicArc flavor

AtomicArc is an alternative to the ArcSwap library that bus-queue uses. Provide an alternative flavor that can be tested and benchmarked.

AtomicArc resides in the atomic repo, but is currently unpublished on crates.io. Copy the content of the files, with appropriate attribution to the author.

Optional Arc nesting

I've been playing around with the library and it's really nice.

The motivation for this issue is that primary structs that I'm passing through the bus are Bytes (https://docs.rs/bytes/0.4.12/bytes/struct.Bytes.html) and tuples of Enums and bytes. Bytes already have the main body of memory enclosed in an Arc.

This is a little uncomfortable because I essentially have an Arc<Arc<..>> for no real reason now.

I tried to switch out Send for Clone but came across the fact that the buffer is updated by use of ArcSwap. Replacing ArcSwap for a RwLock hurts performance a bit (predictably).

The other alternative here for me would be to swap the Arc for ArcSwap in the bytes library, but this is a bit of a momentous task.

Is there any advice for some sort of work around here? Thanks

Implement a version of GenericSubscriber and GenericAsyncSubscriber that returns a vector of data instead of a single item.

The current implementation of the Stream Trait for Subscriber returns only one item from the buffer. Make an alternative version, perhaps called VectoredSubscriber, whose stream would always return a vector filled with all the items from the channel until the try_recv method returns TryRecvError::Empty.

This would optimize Task generation, and Task wake-ups in async operations.
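
The draining step at the heart of this idea can be sketched as a helper that collects items until try_recv reports Empty. std::sync::mpsc::Receiver is used as a stand-in for the crate's receiver, and recv_batch is a hypothetical name.

```rust
use std::sync::mpsc::{Receiver, TryRecvError};

/// Collects everything currently available into one Vec.
/// Returns Err only if the sender is gone and nothing was left to deliver.
fn recv_batch<T>(rx: &Receiver<T>) -> Result<Vec<T>, TryRecvError> {
    let mut batch = Vec::new();
    loop {
        match rx.try_recv() {
            Ok(item) => batch.push(item),
            // Nothing more right now: hand over what was collected.
            Err(TryRecvError::Empty) => return Ok(batch),
            // Report disconnection only when there is nothing to hand over.
            Err(TryRecvError::Disconnected) if batch.is_empty() => {
                return Err(TryRecvError::Disconnected)
            }
            Err(TryRecvError::Disconnected) => return Ok(batch),
        }
    }
}
```

A stream built on top of this would yield one Vec per wake-up instead of one item, which is where the reduction in task wake-ups would come from.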

Implement Adapter pattern for `Slot` flavors

Currently we have ambiguity when calling a SwapSlot implementation, e.g. ArcSwapOption already has load and store methods.

Implement a wrapper struct for each flavor, something in terms of:

pub struct Slot<T>(ArcSwapOption<T>);
...
 
let item = Slot::<i32>::none();
item.store(1);
let arc = item.load();
