
bus's Introduction

bus


Bus provides a lock-free, bounded, single-producer, multi-consumer, broadcast channel.

NOTE: bus sometimes busy-waits in the current implementation, which may cause increased CPU usage; see #23.

It uses a circular buffer and atomic instructions to implement a lock-free single-producer, multi-consumer channel. The interface is similar to that of the std::sync::mpsc channels, except that multiple consumers (readers of the channel) can be produced, whereas only a single sender can exist. Furthermore, in contrast to most multi-consumer FIFO queues, bus is broadcast; every send goes to every consumer.

I haven't seen this particular implementation in literature (some extra bookkeeping is necessary to allow multiple consumers), but a lot of related reading can be found in Ross Bencina's blog post "Some notes on lock-free and wait-free algorithms".

See the documentation for usage examples.

License

Licensed under either of

  • Apache License, Version 2.0
  • MIT license

at your option.

Contribution

Unless you explicitly state otherwise, any contribution intentionally submitted for inclusion in the work by you, as defined in the Apache-2.0 license, shall be dual licensed as above, without any additional terms or conditions.

bus's People

Contributors

carado, chapeupreto, dependabot[bot], jmchacon, jonhoo, ralfjung, remexre, serprex, simenb, taiki-e, tudyx, utkarshgupta137, wasabi375


bus's Issues

Separate add_rx from Bus

This might be related to, or a duplicate of, #19, but it's not clear based on the wording of the original issue.

The problem with add_rx being tied to Bus occurs in my use case where I want to:

  1. Send the Bus to a separate "dispatch" thread.
  2. Dynamically add/remove receivers over time on the main thread.

Once the Bus has been moved to the dispatch thread, it can't be used on the main thread to create more receivers, thus capping the number of receivers I can create. It is technically possible to send messages asking the dispatch thread to create more and send them back via channels, but I have another idea I'd like to pursue to see if it's any better.

What I propose is creating a secondary interface (let's call it ReadHandle for now), which could be created by a method on Bus and would also implement add_rx.

Internally, it would only involve taking clones of Bus.state and the senders Bus.leaving.0, Bus.waiting.0 when constructed, so it would be mostly zero-cost. The one major change would be making Bus.readers atomic and moving it to BusInner, but we can at least try it and see how it affects benchmarks.

No support for detecting a closed bus

When a bus is dropped, all receivers should be made aware of this once they have read all current broadcasts. The behavior should be similar to that of regular Rust channels: try_recv should return Err(mpsc::TryRecvError::Disconnected), and recv should unblock and return Err(mpsc::RecvError). Adding this feature would also mean we could make receivers iterable, like Rust channels, which would be nice.

This shouldn't be too tricky to implement. One fairly straightforward way of doing it would be to have drop do another write to the bus with a "done" value, and have readers return as described above when they see this value. This could be done by having broadcast_inner accept the value as an Option, and write all values as Options. A None then naturally means that the bus is closed, and that no further broadcasts will be sent. The biggest downsides of this are: a) we have to have Options everywhere, even though they are pretty much always Some, and b) drop may block.

A different approach would be to have a special marker on BusInner that readers check when they encounter an empty bus. Closing the bus (i.e., dropping it) sets it to true, indicating to readers that no more broadcasts will be made. Furthermore, there has to be some mechanism for the readers to check that they didn't miss a broadcast between when they saw that the bus was empty and when they observed the closed flag. This might be possible to do by having the closed flag be an index instead of a boolean, but I haven't thought it through completely. The biggest drawback of this is that it adds some overhead to reads, because they now also have to do an atomic read when the bus is empty. It does have the advantage of not making drop blocking, though.

Feedback and PRs welcome.
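The Option-wrapping idea from the first approach can be illustrated with a plain std channel (a sketch of the pattern, not bus's internals): every payload is Some(v), and one final None means "closed".

```rust
use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::sync_channel::<Option<i32>>(4);

    let sender = thread::spawn(move || {
        for i in 0..3 {
            tx.send(Some(i)).unwrap(); // a normal broadcast
        }
        tx.send(None).unwrap(); // the "bus closed" marker drop would write
    });

    let mut seen = Vec::new();
    // A None (or a hung-up channel) ends the loop.
    while let Ok(Some(v)) = rx.recv() {
        seen.push(v);
    }
    assert_eq!(seen, vec![0, 1, 2]);
    sender.join().unwrap();
}
```

This mirrors the downside noted above: the sender must write the marker, so the close itself may block if the buffer is full.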

Implement "rendez-vous" mode

(This is related to #7; however, that issue treats a buffer size of 0 as an invalid value, whereas this one discusses its usefulness.)

The stdlib mpsc module has a [sync_channel(bound: usize)](https://doc.rust-lang.org/std/sync/mpsc/fn.sync_channel.html) function.

The case where bound == 0 is actually made explicit:

Note that a buffer size of 0 is valid, in which case this becomes a "rendezvous" channel where each send will not return until a recv is paired with it.

I think that this should not be discarded as "not useful". The reason is that, with a buffer size of 0, this library can be used effectively (for example) to sync threads. By waiting for all receivers to read, a sender can ensure that all the receivers are in the expected state. mpsc can do that, but it's significantly slower.
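What that rendezvous behavior looks like with std's mpsc, for reference: with a bound of 0, send() does not return until a recv() is paired with it, so the sender knows the receiver has reached that point.

```rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
    let (tx, rx) = mpsc::sync_channel::<&str>(0); // rendezvous channel

    let receiver = thread::spawn(move || {
        thread::sleep(Duration::from_millis(50));
        rx.recv().unwrap() // the pairing recv: only now does send() return
    });

    tx.send("sync point").unwrap(); // blocks until the recv above runs
    assert_eq!(receiver.join().unwrap(), "sync point");
}
```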

How do I use a bus with two threads?

I'm new to Rust, so this could be more about using Rust than about bus.

Is it possible with Arc? Or would bus need to implement the Sync trait? (I don't really know much about that trait.)

Note that I don't want to do bus.add_rx() on the line after Bus::new(10) since in my case it would be done at runtime (when a network connection is opened).

extern crate bus;
use std::{thread, time};
use bus::Bus;

fn main() {
    let mut bus = Bus::new(10);

    thread::spawn(move || {
        let mut i = 0;
        loop {
            bus.broadcast(format!("{}", i));
            i += 1;
            thread::sleep(time::Duration::from_secs(1));
        }        
    });

    loop {
        let mut rx1 = bus.add_rx();
        let r = rx1.recv();
        println!("<- {}", r.unwrap());
    }
}

Memory leak when idle

The following program will slowly leak memory until it exhausts my system's resources.

extern crate bus;
use bus::Bus;

use std::thread;
use std::time::Duration;

fn main() {
    let mut b: Bus<usize> = Bus::new(10);

    let mut rx1 = b.add_rx();
    let mut rx2 = b.add_rx();
    let mut rx3 = b.add_rx();
    let mut rx4 = b.add_rx();
    let mut rx5 = b.add_rx();
    let mut rx6 = b.add_rx();

    thread::spawn(move || for _ in rx1.iter() {});
    thread::spawn(move || for _ in rx2.iter() {});
    thread::spawn(move || for _ in rx3.iter() {});
    thread::spawn(move || for _ in rx4.iter() {});
    thread::spawn(move || for _ in rx5.iter() {});
    thread::spawn(move || for _ in rx6.iter() {});
    
    // UNCOMMENT FOLLOWING LINES TO SEE HOW USING
    // THE CHANNEL FREQUENTLY CAN MITIGATE THE PROBLEM
    
    //thread::sleep(Duration::from_millis(30_000));
    //b.broadcast(0);
    loop {
        thread::sleep(Duration::from_millis(100));
        //b.broadcast(1);
    }
}

There seems to be a leak on the receiving end (rx) that occurs when the bus is idle. Frequent usage of the bus helps mitigate the issue, but, in my tests, did not resolve it completely. On my system, the example above is allocating memory at a rate of about 2 MB/s.

System is running Ubuntu, kernel 4.8.0, x86_64. I was also able to reproduce the problem on my dev environment (macOS 10.12.6).

Receivers don't always get the broadcasted item

It took me all day, but I finally got a simple test case. If you cargo run this a few times, it will eventually hang (on my machine, anyway).

extern crate bus;

use std::thread;

use bus::Bus;

fn main() {
    println!("Starting");

    let strings = vec!["hi", "bye", "one", "two"];
    let mut bus = Bus::new(2);

    {
        let recv = bus.add_rx();

        thread::spawn(move || {
            println!("Looping 1");
            for rec in recv {
                println!("Got 1 :: {}", rec);
            }
        });
    };

    {
        let recv = bus.add_rx();

        thread::spawn(move || {
            println!("Looping 2");
            for rec in recv {
                println!("Got a 2 :: {}", rec);
            }
        });
    };

    for f in strings {
        println!("bcast :: {}", f);
        bus.broadcast(0);
    }
    println!("Done");
}

Example output

Finishes:

    Finished dev [unoptimized + debuginfo] target(s) in 0.08s
     Running `target/debug/fse_dump`
Starting
Looping 1
bcast :: hi
bcast :: bye
Looping 2
Got 1 :: 0
bcast :: one
Got 1 :: 0
Got 1 :: 0
Got a 2 :: 0
Got a 2 :: 0
Got a 2 :: 0
bcast :: two
Got 1 :: 0
Got a 2 :: 0
Done

Stuck:

    Finished dev [unoptimized + debuginfo] target(s) in 0.06s
     Running `target/debug/fse_dump`
Starting
Looping 1
bcast :: hi
Looping 2
Got a 2 :: 0
bcast :: bye
bcast :: one
Got 1 :: 0
Got 1 :: 0
Got 1 :: 0
bcast :: two

Machine info:

$ uname -a
Darwin 34363bc7dc9c 18.2.0 Darwin Kernel Version 18.2.0: Thu Dec 20 20:46:53 PST 2018; root:xnu-4903.241.1~1/RELEASE_X86_64 x86_64 i386 MacBookPro11,3 Darwin

$ rustc --version
rustc 1.33.0 (2aa4c46cf 2019-02-28)

Non-blocking destructive bus

Hi!

I have a need for a non-blocking sender. When the bus is full, I'd like to overwrite oldest non-read elements (so readers might skip items if they're too slow) rather than blocking. Bus::try_broadcast is not working for me since I still need to send the new message when the bus is full.

Is it something this library might add?
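The overwrite-oldest semantics being requested can be sketched with a plain VecDeque (single-threaded and not lock-free, so this is only the desired behavior, not a drop-in solution): when the buffer is full, the oldest unread element is evicted instead of blocking the sender.

```rust
use std::collections::VecDeque;

/// Push `item` into a bounded buffer, evicting the oldest entry when full.
fn push_leaky<T>(buf: &mut VecDeque<T>, cap: usize, item: T) {
    if buf.len() == cap {
        buf.pop_front(); // drop the oldest message rather than block
    }
    buf.push_back(item);
}

fn main() {
    let mut buf = VecDeque::new();
    for i in 0..6 {
        push_leaky(&mut buf, 4, i);
    }
    // Only the last 4 of the 6 messages survive.
    assert_eq!(buf.iter().copied().collect::<Vec<_>>(), vec![2, 3, 4, 5]);
}
```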

Problem with rand-core

The crate doesn't build anymore; the errors:

the trait rand_core::SeedableRng is not implemented for rand_hc::Hc128Rng
the trait rand_core::SeedableRng is not implemented for rand_xorshift::XorShiftRng

It was fine before, and no change has been made since.

Test it_iterates panics if run in a loop

I'm investigating a (possibly related?) issue where none of the receivers get any broadcast items; unfortunately, it only happens every so often and has been very frustrating to track down. While trying to reproduce it in the tests, I changed it_iterates() to this:

#[test]
fn it_iterates() {
    for _ in 0..10_000 {
        use std::sync::mpsc;
        use std::thread;

        let mut tx = bus::Bus::new(2);
        let mut rx = tx.add_rx();
        let j = thread::spawn(move || {
            for i in 0..1000 {
                tx.broadcast(i);
            }
        });

        let mut ii = 0;
        for i in rx.iter() {
            assert_eq!(i, ii);
            ii += 1;
        }

        j.join().unwrap();
        assert_eq!(ii, 1000);
        assert_eq!(rx.try_recv(), Err(mpsc::TryRecvError::Disconnected));
    }
}

and it will panic most times (not every time) with this:

running 1 test
test it_iterates ... FAILED

failures:

---- it_iterates stdout ----
thread 'it_iterates' panicked at 'not yet implemented', src/lib.rs:642:21
note: Some details are omitted, run with `RUST_BACKTRACE=full` for a verbose backtrace.
stack backtrace:
   0: std::sys::unix::backtrace::tracing::imp::unwind_backtrace
   1: std::sys_common::backtrace::_print
   2: std::panicking::default_hook::{{closure}}
   3: std::panicking::default_hook
   4: std::panicking::rust_panic_with_hook
   5: std::panicking::begin_panic
   6: <bus::BusReader<T>>::recv_inner
   7: core::ops::function::FnOnce::call_once
   8: <F as alloc::boxed::FnBox<A>>::call_box
   9: __rust_maybe_catch_panic
  10: test::run_test::run_test_inner::{{closure}}


failures:
    it_iterates

This happens on a mac running Mojave, if that matters. I still can't reliably reproduce the issue where the receivers never get any broadcast, but if I can, I'll open a separate issue. Just figured I should mention it in case it's relevant.

Using a 0 sized buffer causes broadcasts to never be received

When a bus is created with a buffer of 0, messages are successfully broadcast but are never received. I'm not sure if you care about such a use case (I want to receive only messages being sent now, not messages that may have been sitting in the queue waiting for a reader), but if not, it would be good to get a message/panic when creating such a bus. Code:

extern crate bus;

use bus::Bus;
use std::thread;
use std::time::Duration;
fn main() {
    let mut bus = Bus::new(0);
    let mut rx = bus.add_rx();
    let t = thread::spawn(move || {
        rx.recv().unwrap();
    });
    thread::sleep(Duration::from_millis(500));
    println!("Sending");
    bus.broadcast(());
    println!("Sent, waiting on receive");
    t.join().unwrap();
    println!("Received and joined");
}

In this minimal example, the sent message is always printed, but the final received/joined message is not. Increasing the buffer to 1 makes it complete successfully.

Timeout used by park_timeout seems way too short, causes heavy CPU use

I have a small test program (implementing a component of a real application) using Bus that was idling at about 100-110% CPU utilization (1+ cores of 8), which is excessive.

When I changed the timeouts used by the two calls to park_timeout to be 1ms (Duration::from_millis(1)) rather than 1us currently in the official release, the CPU utilization under a pretty realistic but on the heavy side real-world load (not just idle, and including some computation) fell to about 3.5%.

I think this 3.5% CPU utilization is pretty impressive. Nice library you've got there :-)
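The kind of change being described, in isolation (the exact call sites are inside bus's recv path; this only illustrates the parking interval): parking for 1ms instead of 1us means an idle reader wakes roughly 1000x less often, at the cost of slightly higher wakeup latency.

```rust
use std::thread;
use std::time::{Duration, Instant};

/// Idle-wait loop: park in 1ms slices until `total` has elapsed.
/// (The old behavior parked in 1us slices, spinning much hotter.)
fn idle_wait(total: Duration) {
    let start = Instant::now();
    while start.elapsed() < total {
        thread::park_timeout(Duration::from_millis(1));
    }
}

fn main() {
    idle_wait(Duration::from_millis(5));
}
```

Note that park_timeout may also return early when the thread is unparked (e.g. by a broadcast), so a longer timeout mostly affects idle CPU use, not responsiveness.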

BusReader stops receiving if another BusReader is dropped

I have the following scenario: I have a Bus and two BusReaders. The readers are sent to two different threads, and each only recvs a fixed number of times: the first recvs 5 times and the second recvs 10 times. The main thread then broadcasts with a 500ms interval between each broadcast. After the 5th broadcast, the first reader is dropped and its thread stops. The second one receives the 6th and 7th broadcast but then also stops, with no error or panic; it's like recv never returns.

I might be misunderstanding how this crate is supposed to work; I'm not very experienced with multithreading. This is the code that fails:

#![allow(deprecated)]
extern crate bus;

use std::thread;

fn main() {
    let mut bus = bus::Bus::new(1);

    let mut rx1 = bus.add_rx();
    let t1 = thread::spawn(move ||{
        for i in 0..5 {
            match rx1.recv() {
                Ok(v) => println!("T1: {} - {}", i, v),
                Err(e) => {
                    println!("{}", e);
                    break;
                }
            }
        }
        drop(rx1);
    });

    let mut rx2 = bus.add_rx();
    let t2 = thread::spawn(move ||{
        for i in 0..10 {
            match rx2.recv() {
                Ok(v) => println!("T2: {} - {}", i, v),
                Err(e) => {
                    println!("{}", e);
                    break;
                }
            }
        }
        drop(rx2);
    });

    std::thread::sleep_ms(500);

    for i in 0..25 {
        std::thread::sleep_ms(500);
        match bus.try_broadcast(i) {
            Ok(_) => println!("Broadcast successful"),
            Err(e) => println!("Broadcast failed {}", e)
        }
    }

    t1.join().unwrap();
    t2.join().unwrap();
}

Some observations:

  • Increasing the Bus size only makes it take longer for rx2 to stop receiving. If the size is 1, it stops at the 7th broadcast; if it is 2, at the 9th; if it is 3, at the 11th; and so on.
  • I don't mind losing some of the broadcasts due to the Bus being full, as long as rx2 eventually receives the message.

This is the result of running the code on my machine:

T1: 0 - 0
T2: 0 - 0
Broadcast successful
T1: 1 - 1
T2: 1 - 1
Broadcast successful
T1: 2 - 2
T2: 2 - 2
Broadcast successful
T1: 3 - 3
T2: 3 - 3
Broadcast successful
T1: 4 - 4
T2: 4 - 4
Broadcast successful
T2: 5 - 5
Broadcast successful
T2: 6 - 6
Broadcast successful
Broadcast failed 7
Broadcast failed 8
Broadcast failed 9
Broadcast failed 10
[...]
Broadcast failed 24

Leaky version?

I have a use case that would benefit from a leaky version of the Bus, such that when the number of messages reaches the upper limit, older messages are removed. I basically want to send messages from one place that may at some point be read in another place, but only the last N really matter. A lock-free implementation would be really useful here.

Blocking Read

Great library!!!! Simple yet efficient! I am just curious, for the sake of completeness: why not implement the blocking read the same way you demonstrate it in your example?

I know it may not be ideal, but at least it will be feature-complete, and you can optimize in the future. Plus, it will make the example more concise and easier to follow for new users.

Just a suggestion =)

Async review

Hi! Since I need this crate for a project I'm working on, I decided to have a go at implementing futures' Stream for a BusReader (Bus sending isn't async in this patch). However, I'm curious whether this implementation is correct, since I'm not too experienced with lock-free code.

lib.rs

Basically, if a value is not ready, it sends the current Waker (kind of equivalent of thread handle) to the unparking channel, which will wake it up when it's ready.

async feature's Stream implementation violates contract

Note that BusReader's futures::Stream implementation does not satisfy the Stream contract.

If NotReady is returned then this stream's next value is not ready yet and implementations will ensure that the current task will be notified when the next value may be ready. - futures::Stream.poll

Currently NotReady is returned without notifying the current task. This causes receivers to hang forever even after new messages are broadcasted.

Perhaps this implementation should be removed until it is fixed/replaced or a warning should be put in the README.

Thanks for your work on this library.

recv() cpu abuse

Hello! I'm new to Rust. Twenty recv() loops use 40% CPU on my Linux box. Is this a known issue?

extern crate bus;
use bus::Bus;
use std::sync::{Arc, Mutex};
use std::thread;
use std::io;


fn main() {
    let bus: std::sync::Arc<std::sync::Mutex<bus::Bus<usize>>>
        = Arc::new(Mutex::new(Bus::new(10)));

    for _ in 0..20 {
        let mut rxb = bus.clone().lock().unwrap().add_rx();
        thread::spawn(move || loop {
            let msg = rxb.recv().unwrap();
        });
    }

    io::stdin().read_line(&mut String::new()).unwrap();
}

total

11909 root      20   0 1029148   1980   1700 S  39.2   0.1   1:01.07 bus_possible_bu

per thread

11926 root      20   0 1029148   1980   1700 R   2.7   0.1   0:00.19 bus_possible_bu
11927 root      20   0 1029148   1980   1700 R   2.7   0.1   0:00.19 bus_possible_bu
11931 root      20   0 1029148   1980   1700 R   2.7   0.1   0:00.19 bus_possible_bu

cpu

model name      : Intel(R) Core(TM) i3 CPU         540  @ 3.07GHz

Option for a resizing bus

I'm using bus to broadcast events from a business-logic model, which later get consumed by views. This happens in a synchronous application loop. Currently, if the bus's buffer is exceeded, this results in either a crash or a hang, depending on whether I use try_broadcast().unwrap() or broadcast(). If possible, I would rather have an option for performance and memory to degrade instead, avoiding a crash.

Can I cancel a recv after a timeout?

Hi. Thanks for the polished library!

I have threads dynamically creating BusReaders and recving on them, but I need to be able to stop these threads after a timeout. Is there a way to prevent bus from waiting forever if nothing is ever broadcast?
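The shape of the timeout-receive pattern being asked for, shown with std's mpsc (bus's BusReader gained a similar recv_timeout in later releases; worth checking the docs for the version you're on): the reader wakes up after the timeout even if nothing was ever broadcast.

```rust
use std::sync::mpsc::{self, RecvTimeoutError};
use std::time::Duration;

fn main() {
    // Sender kept alive but never used, so the recv below times out.
    let (_tx, rx) = mpsc::channel::<u32>();

    match rx.recv_timeout(Duration::from_millis(100)) {
        Ok(v) => println!("got {}", v),
        Err(RecvTimeoutError::Timeout) => println!("timed out, shutting thread down"),
        Err(RecvTimeoutError::Disconnected) => println!("sender gone"),
    }
}
```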

Should BusReader require `T: Sync`?

I could be misunderstanding something, but it looks like multiple BusReaders could be attempting to receive a message and cloning the same element at the same time from different threads. Therefore, we must require T: Sync, but the bound is not present in the API.

Here's an example that demonstrates undefined behavior due to the lack of T: Sync bound:

extern crate bus;

use std::cell::Cell;
use std::thread;

use bus::Bus;

struct Foo(Cell<i32>);

impl Clone for Foo {
    fn clone(&self) -> Foo {
        println!("Cloning");
        thread::sleep_ms(1000);
        println!("Cloned");
        Foo(self.0.clone())
    }
}

fn main() {
    let mut bus = Bus::new(100);

    let mut rx1 = bus.add_rx();
    let mut rx2 = bus.add_rx();
    bus.broadcast(Foo(Cell::new(0)));

    let t1 = thread::spawn(move || {
        rx1.recv().unwrap();
    });

    let t2 = thread::spawn(move || {
        rx2.recv().unwrap();
    });

    t1.join().unwrap();
    t2.join().unwrap();
}

Output:

Cloning
Cloning
Cloned
Cloned

Looks like Foo is concurrently accessed from two threads even though it contains a Cell<i32>.
