crossbeam-rs / crossbeam
Tools for concurrent programming in Rust
License: Apache License 2.0
The online documentation linked from the main repo page seems to be out of date, as it's missing relatively new features such as ArcCell.
FWIW, servo seems to host up-to-date crossbeam documentation.
To my surprise, the following code doesn't compile:
extern crate crossbeam;

fn main() {
    crossbeam::scope(|scope| {
        scope.spawn(move || {
            crossbeam::scope(|scope| {});
        });
    });
}
It gives the following error:
main.rs:5:15: 7:11 error: the trait `core::marker::Sync` is not implemented for the type `core::cell::UnsafeCell<core::option::Option<crossbeam::scoped::DtorChain<'_>>>` [E0277]
main.rs:5 scope.spawn(move || {
main.rs:6 crossbeam::scope(|scope| {});
main.rs:7 });
main.rs:5:15: 7:11 help: run `rustc --explain E0277` to see a detailed explanation
main.rs:5:15: 7:11 note: `core::cell::UnsafeCell<core::option::Option<crossbeam::scoped::DtorChain<'_>>>` cannot be shared between threads safely
main.rs:5:15: 7:11 note: required because it appears within the type `core::cell::RefCell<core::option::Option<crossbeam::scoped::DtorChain<'_>>>`
main.rs:5:15: 7:11 note: required because it appears within the type `crossbeam::scoped::Scope<'_>`
main.rs:5:15: 7:11 note: required because it appears within the type `[closure@main.rs:5:21: 7:10 scope:&crossbeam::scoped::Scope<'_>]`
main.rs:5:15: 7:11 error: the trait `core::marker::Sync` is not implemented for the type `core::cell::UnsafeCell<usize>` [E0277]
main.rs:5 scope.spawn(move || {
main.rs:6 crossbeam::scope(|scope| {});
main.rs:7 });
main.rs:5:15: 7:11 help: run `rustc --explain E0277` to see a detailed explanation
main.rs:5:15: 7:11 note: `core::cell::UnsafeCell<usize>` cannot be shared between threads safely
main.rs:5:15: 7:11 note: required because it appears within the type `core::cell::Cell<usize>`
main.rs:5:15: 7:11 note: required because it appears within the type `core::cell::RefCell<core::option::Option<crossbeam::scoped::DtorChain<'_>>>`
main.rs:5:15: 7:11 note: required because it appears within the type `crossbeam::scoped::Scope<'_>`
main.rs:5:15: 7:11 note: required because it appears within the type `[closure@main.rs:5:21: 7:10 scope:&crossbeam::scoped::Scope<'_>]`
error: aborting due to 2 previous errors
Should this code be valid or am I missing something?
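For comparison, here is a minimal sketch of the same nesting written against the standard library's scoped threads (std::thread::scope, available since Rust 1.63), where opening a scope inside a scoped thread is accepted. This is std's API, not crossbeam's, and the function name nested_scopes is made up for illustration.

```rust
use std::thread;

/// Spawns a scoped thread that itself opens a second scope, and returns the
/// sum computed by the innermost thread. (Hypothetical helper name.)
fn nested_scopes(data: &[u32]) -> u32 {
    thread::scope(|outer| {
        outer
            .spawn(|| {
                // A fresh scope inside a scoped thread: nothing from `outer`
                // is captured here, only the borrowed `data`.
                thread::scope(|inner| {
                    inner.spawn(|| data.iter().sum::<u32>()).join().unwrap()
                })
            })
            .join()
            .unwrap()
    })
}
```

Both scopes join their threads before returning, so the borrow of data never outlives the caller's slice.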
Collections like queues work correctly under multiple memory orderings -- both Acquire/Release and SeqCst. Right now, the queues provide acquire/release semantics, but some applications may need stronger guarantees, e.g. for relating multiple queues.
We should provide some way of selecting the level of guarantee.
Maybe with an API like
let pool = Pool::new(8); // 8 threads in the pool.
pool.scoped(|scope| {
    scope.spawn(...);
});
I'm trying to write some concurrent data structures for a real-time computing project (specifically, a game library). I'm looking at using Atomic for it, but I can't figure out whether the garbage collection is tolerable. (I took a couple of dives into the code, but I didn't figure out whether a thread could ever have to do a lot of garbage collection at once).
What guarantees does crossbeam have for this? Obviously, dropping an Atomic can't be guaranteed to have a bounded worst-case cost, because the contained type could have a costly Drop implementation itself. But it could give guarantees conditional on the Drop implementations of the contained types. Does it? If not, could it be made to do so?
For AtomicOption in particular.
As the title says, this function seems to work fine on linux but panics on windows.
Project code for reference:
https://github.com/carbidegames/sapphire-hail/blob/745f4d9725dd519bd473a45676c205e575fe73eb/clockwork/src/clockwork.rs#L70
It can be useful to have channels that can block when they're empty (or full, if they're size limited).
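As a point of reference for the requested behaviour, the standard library's bounded channel already blocks on both ends. The sketch below uses std::sync::mpsc::sync_channel, not any crossbeam type, and the helper name bounded_roundtrip is an assumption for illustration.

```rust
use std::sync::mpsc::sync_channel;
use std::thread;

/// Sends `items` integers through a channel holding at most `capacity`
/// buffered values and returns them in arrival order.
fn bounded_roundtrip(capacity: usize, items: u32) -> Vec<u32> {
    let (tx, rx) = sync_channel::<u32>(capacity);
    let producer = thread::spawn(move || {
        for i in 0..items {
            // Blocks while the buffer is full.
            tx.send(i).unwrap();
        }
        // `tx` is dropped here, which ends the receiver's iteration.
    });
    // Each step blocks while the channel is empty and a sender still exists.
    let received: Vec<u32> = rx.iter().collect();
    producer.join().unwrap();
    received
}
```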
steal returns an enum which signifies whether the steal operation was successful, aborted, or the queue was empty. try_pop as it stands only covers the success and failure cases.
Both @msullivan and I have gotten assertion errors in the stress-msq.rs test - for me, it takes a while, so this isn't an easy condition to hit, but this shouldn't ever happen. The assertion that we have both seen fail is the new > cur == true assertion.
Add a simple fn or option to name a thread (Builder does this in base lib).
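For reference, this is how the standard library's Builder names a thread; a crossbeam equivalent would presumably expose something similar. The helper name spawn_named is hypothetical.

```rust
use std::thread;

/// Spawns a thread with the given name and returns the name the thread
/// observes for itself.
fn spawn_named(name: &str) -> Option<String> {
    let handle = thread::Builder::new()
        .name(name.to_string())
        .spawn(|| thread::current().name().map(|n| n.to_string()))
        .expect("failed to spawn thread");
    handle.join().unwrap()
}
```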
It seems to me that since it uses Arc underneath, Sized is not required and was added implicitly.
I just tried grabbing crossbeam (I wanted to try to replicate the experimental results from https://aturon.github.io/blog/2015/08/27/epoch/#benchmarks as preparation for an upcoming presentation) and I noticed that out of the box, cargo run --release --bin bench just prints a sequence of key value pairs where every value is 0.
From a quick skim of the code, it appears I was supposed to do cargo run --features nightly --release --bin bench; I would recommend that the bench program be changed if possible to fail in the scenario where the nightly feature is missing, with a message to the user suggesting that they include that feature in the cargo invocation.
In the following example, the node containing 1 is never freed:
extern crate crossbeam;

fn main() {
    let stack = crossbeam::sync::TreiberStack::new();
    stack.push(1);
}
The simplest solution would be to add a Drop impl to each of these types that calls try_pop in a loop, but atomic operations aren't strictly necessary, as we're guaranteed at the time drop is called that only one thread is accessing the structure.
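A minimal sketch of the second option, assuming a push-only stack built on std's AtomicPtr rather than crossbeam's epoch types. The Stack and Node types here are illustrative stand-ins, not TreiberStack's actual internals; pop is left out because it would need a reclamation scheme.

```rust
use std::ptr;
use std::sync::atomic::{AtomicPtr, Ordering};

struct Node<T> {
    value: T,
    next: *mut Node<T>,
}

/// Push-only lock-free stack, reduced to what the Drop sketch needs.
pub struct Stack<T> {
    head: AtomicPtr<Node<T>>,
}

// Values move between threads through the stack, so require `T: Send`.
unsafe impl<T: Send> Send for Stack<T> {}
unsafe impl<T: Send> Sync for Stack<T> {}

impl<T> Stack<T> {
    pub fn new() -> Self {
        Stack { head: AtomicPtr::new(ptr::null_mut()) }
    }

    pub fn push(&self, value: T) {
        let node = Box::into_raw(Box::new(Node { value, next: ptr::null_mut() }));
        let mut head = self.head.load(Ordering::Relaxed);
        loop {
            // SAFETY: `node` is not yet published, so we are its only owner.
            unsafe { (*node).next = head };
            match self
                .head
                .compare_exchange_weak(head, node, Ordering::Release, Ordering::Relaxed)
            {
                Ok(_) => return,
                Err(current) => head = current,
            }
        }
    }
}

impl<T> Drop for Stack<T> {
    fn drop(&mut self) {
        // `&mut self` proves exclusive access, so a plain read of the head
        // is enough; no atomic read-modify-write is needed.
        let mut cur = *self.head.get_mut();
        while !cur.is_null() {
            // SAFETY: every node was created by `Box::into_raw` in `push`
            // and is reachable exactly once from the list.
            let node = unsafe { Box::from_raw(cur) };
            let Node { value, next } = *node;
            drop(value); // runs T's destructor
            cur = next;
        }
    }
}
```

Pushing clones of an Arc and then dropping the stack is a quick way to confirm that every node's payload is released.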
I can't see how the signal access is safe:
thread::park permits spurious wake ups, so it can't be used to guard memory.
Producer: https://github.com/aturon/crossbeam/blob/master/src/sync/ms_queue.rs#L176-L177
Consumer: https://github.com/aturon/crossbeam/blob/master/src/sync/ms_queue.rs#L298-L301
I just don't see how that is safe. I think it happens to work most of the time because x86 basically provides implicit acq / rel semantics (though the compiler could reorder). Also, the current implementation of thread::park() / unpark() happens to be strong today and there will rarely be a random thread that issues an unpark (though it is possible).
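The usual way to make parking robust against spurious wake-ups is to re-check an atomic flag in a loop and let the flag, not park itself, carry the synchronization. The sketch below shows that pattern with std only; it is not the MsQueue code, and the function name park_with_flag is made up.

```rust
use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
use std::sync::Arc;
use std::thread;

/// The consumer parks until `ready` is set, then reads the payload.
fn park_with_flag() -> u32 {
    let ready = Arc::new(AtomicBool::new(false));
    let payload = Arc::new(AtomicU32::new(0));
    let (r, p) = (Arc::clone(&ready), Arc::clone(&payload));

    let consumer = thread::spawn(move || {
        // Loop: `park` may return spuriously, so only the flag decides.
        while !r.load(Ordering::Acquire) {
            thread::park();
        }
        // The Acquire load above synchronizes with the Release store below,
        // so this Relaxed read is guaranteed to see the payload.
        p.load(Ordering::Relaxed)
    });

    payload.store(7, Ordering::Relaxed);
    ready.store(true, Ordering::Release);
    consumer.thread().unpark();
    consumer.join().unwrap()
}
```

If unpark runs before the consumer parks, the park token makes the next park return immediately, and the loop then observes the flag.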
I'm looking over the MsQueue code to practice reasoning about memory barriers, and I see a couple things that appear odd:
Acquire only does something when paired with Release and conversely, but MsQueue only uses Release and Relaxed on the next pointers and Acquire and Relaxed on the head and tail pointers. So I think the implementation is allowed to replace all of the orderings with Relaxed?
Even if Acquire can synchronize with Relaxed, pop() is still doing a Relaxed fetch of head.next and then reading from the next record's data without additional memory barriers, which could get uninitialized memory on Alpha? (It looks like this is the kind of thing memory_order_consume is for, as this is technically stronger than Relaxed but still requires no barriers on common hardware. Wonder when LLVM/Rust will get it.)
CachePadded seems to massively overestimate the size of a cache line on 64-bit; it's also not clear why the size was chosen.
It seems that the previous estimate of 128 bytes was chosen to avoid interference between adjacent cache lines, but it now assumes a ridiculous 256 bytes due to it being 32 * sizeof(usize) in the actual structure. Furthermore, the reason for picking the sizes isn't stated anywhere, which makes understanding this code frustrating given that almost all CPUs have 64-byte cache lines.
Also, the Padding struct only aligns to 32 bytes (4 x 8 (u64) being 32), not the standard 64, which is also confusing.
Hey there,
quick question here: I'm researching what crossbeam has to offer. On TreiberStack I'm finding that Wikipedia says that there is an issue with correctness here, quoting Link:
The Treiber stack is a classical example of an incorrect solution. It suffers from the ABA problem. When a process is about to remove an element from the stack (just before the compare and set in the pop routine below) another process can change the stack such that the head is the same, but the second element is different. The compare and swap will set the head of the stack to the old second element in the stack mixing up the complete data structure.
Could someone shed some light on this (potential) issue? Is the Wikipedia entry simply incomplete for not mentioning ways in which actual implementations of the algorithm successfully work around this and thus do in fact produce correct results?
I did find a bit more info on this in Aaron's blog here: https://aturon.github.io/blog/2015/08/27/epoch/#lock-free-data-structures
To me the bit about not freeing Nodes (immediately) seems to coincide with what WP says about the correctness issue (especially on the page about the ABA problem), and as such I might reason that maybe this is the missing piece and why the algorithm is correct in this implementation after all. But I'd be guessing at this point and would have to actually read the source code and do the critical thinking before forming an actual understanding of what's going on.
Lazy (or otherwise-busy) as I am I thought I'd ask about this, seeing as the same question might pop up for other somewhat-diligent crossbeam users and so having a piece of public info on this would be great :)
Cheers & thanks!
ken@ken-XPS-13-9360:~/kdtree$ cargo run --release 10000
Compiling scoped_threadpool v0.1.7
Compiling glob v0.2.11
Compiling lzw v0.10.0
Compiling crossbeam v0.1.6
error[E0512]: transmute called with differently sized types: [usize; 32] (2048 bits) to [std::sync::atomic::AtomicBool; 32] (256 bits)
--> /home/ken/.cargo/registry/src/github.com-1ecc6299db9ec823/crossbeam-0.1.6/src/sync/seg_queue.rs:34:29
|
34 | ready: unsafe { mem::transmute([0usize; SEG_SIZE]) },
| ^^^^^^^^^^^^^^ transmuting between 2048 bits and 256 bits
error: aborting due to previous error
Build failed, waiting for other jobs to finish...
error: Could not compile `crossbeam`.
Debug not being implemented means that types that use crossbeam internally can't derive Debug.
Hi,
I know I'm most likely shooting myself in the foot, but I wanted to let you know. Most likely the string created inside the spawn goes out of scope or something, but this isn't caught by the compiler. I'm not sure whether it's the library or the compiler.
I guess there isn't a way out of this without using boxes.
Thanks
extern crate crossbeam;

use crossbeam::sync::SegQueue;
use std::thread;
use std::time::Duration;

struct Task {
    text: String,
}

impl Task {
    pub fn new(text: String) -> Task {
        Task { text: text }
    }
}

fn main() {
    let seg_queue: SegQueue<Task> = SegQueue::new();
    seg_queue.push(Task::new("banana".to_string()));
    crossbeam::scope(|scope| {
        for i in 0..10 {
            let qr = &seg_queue;
            scope.spawn(move || {
                println!("Working 1");
                &qr.push(Task::new("Thread {}".to_string()));
                println!("Working 2");
            });
        }
    });
    loop {
        thread::sleep(Duration::new(0, 3));
        match seg_queue.try_pop() {
            Some(a) => println!("{}", a.text),
            None => break,
        }
    }
    println!("Exiting the program.");
}
Would love to use this in stable
I am playing around with an experimental new channel design (https://github.com/google/fchan-go), and have a draft implementation in Rust (see rust_impl branch). I'm looking to evaluate its performance against the crossbeam blocking MsQueue data structure.
Running the benchmark program (https://github.com/google/fchan-go/blob/rust_impl/fchan-rs/src/bin/bench.rs) causes the MsQueue to segfault. I get this output:
fchan-n2 6820574.330741031 ops/s
fchan-n4 10748308.94253941 ops/s
fchan-n8 15876137.253150392 ops/s
fchan-n12 17872916.32695152 ops/s
fchan-n16 17865720.769311864 ops/s
fchan-n24 12006629.789970772 ops/s
fchan-n32 13828055.450307708 ops/s
MsQueue-n2 3623537.4289297326 ops/s
MsQueue-n4 7983964.701022765 ops/s
MsQueue-n8 9037691.379822284 ops/s
MsQueue-n12 8658681.51229544 ops/s
MsQueue-n16 8171181.839549608 ops/s
Segmentation fault (core dumped)
The benchmark doesn't include any unsafe code, so I think this means there is a bug somewhere. I get this error if I run the MsQueue benchmarks before fchan as well.
This is on a 16-core/32-thread dual-socket x86 machine. I ran this on a windows machine within the WSL linux userland (https://msdn.microsoft.com/en-us/commandline/wsl/about). For what it's worth I am on a developer branch of this distribution, so there is a possibility of bugs there too.
Let me know if I can provide any more info on the issue.
I think a Relaxed load from self.tail into tail in ms_queue.rs should be Acquire: https://github.com/aturon/crossbeam/blob/master/src/sync/ms_queue.rs#L280
Because later the loaded value (effectively tail.next) is used as a pointer (https://github.com/aturon/crossbeam/blob/master/src/sync/ms_queue.rs#L296 https://github.com/aturon/crossbeam/blob/master/src/sync/ms_queue.rs#L87), and with only the Relaxed ordering, it is not guaranteed that a load from tail.next sees an initialized value.
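The general rule behind this report can be sketched with std atomics alone: a pointer published with a Release store must be read with an Acquire load before it is dereferenced, otherwise the pointee's initialization is not guaranteed to be visible. This is an isolated illustration, not the MsQueue code, and publish_and_read is a hypothetical name.

```rust
use std::ptr;
use std::sync::atomic::{AtomicPtr, Ordering};
use std::sync::Arc;
use std::thread;

/// One thread publishes a heap value through an atomic pointer; the caller
/// waits for it, reads it, and frees it.
pub fn publish_and_read() -> u64 {
    let slot = Arc::new(AtomicPtr::<u64>::new(ptr::null_mut()));
    let writer_slot = Arc::clone(&slot);

    let writer = thread::spawn(move || {
        let node = Box::into_raw(Box::new(42u64));
        // Release: the initialization of `*node` happens-before any Acquire
        // load that observes this pointer.
        writer_slot.store(node, Ordering::Release);
    });

    let node = loop {
        // Acquire pairs with the Release store above. With Relaxed here,
        // the read of `*node` below would not be ordered after the write.
        let p = slot.load(Ordering::Acquire);
        if !p.is_null() {
            break p;
        }
        thread::yield_now();
    };
    writer.join().unwrap();

    // SAFETY: `node` came from `Box::into_raw` and is consumed exactly once.
    unsafe { *Box::from_raw(node) }
}
```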
I noticed that the Treiber stack pop method is returning an Option. For consistency reasons it would be preferable for that to be named try_pop.
The downside here is that it would be a breaking change, but since it's still a WIP, it seems like a reasonable change (likely with some kind of temporary deprecation warning).
The ready field of seg_queue::Segment is horribly wasteful:
ready: [AtomicBool; SEG_SIZE],
AtomicBool uses usize internally, so on 64-bit platforms and SEG_SIZE = 32, that's 256 bytes, 2048 bits, for only 32 bits' worth of information. In each segment.
I think we can do much better here, using AtomicUsize as the storage for an atomically accessed bit-set. Since we only need 32 bits of information (true or false for each cell), we can get away with a single AtomicUsize, a whopping 248 bytes' reduction in size per segment for little additional cost elsewhere. For very long queues, this could mean a significant savings in memory usage, which also means more cache-friendliness.
There might very well be some nuances I'm overlooking, but it seems as though the semantics should remain almost exactly the same. The one thing I can see is increased contention across threads, but I don't know if that'd be an issue or not. Clearly some benchmarks are needed here.
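A rough sketch of the bit-set idea, assuming SEG_SIZE = 32 and a target where usize has at least 32 bits. The ReadyBits type and its method names are invented for illustration and are not part of seg_queue.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

const SEG_SIZE: usize = 32;

/// One bit per cell, packed into a single atomic word.
pub struct ReadyBits(AtomicUsize);

impl ReadyBits {
    pub fn new() -> Self {
        ReadyBits(AtomicUsize::new(0))
    }

    /// Marks cell `index` as ready. Release publishes the cell's contents
    /// written before this call.
    pub fn set_ready(&self, index: usize) {
        assert!(index < SEG_SIZE);
        self.0.fetch_or(1 << index, Ordering::Release);
    }

    /// Returns whether cell `index` has been marked ready.
    pub fn is_ready(&self, index: usize) -> bool {
        assert!(index < SEG_SIZE);
        (self.0.load(Ordering::Acquire) & (1 << index)) != 0
    }
}
```

The trade-off mentioned above is visible here: every set_ready is a read-modify-write on the same word, so all 32 cells contend on one location, whereas separate flags only share a cache line.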
We need to figure out a safe and useful interface for concurrent maps that all implementations in crossbeam would then provide. This is my proposal of how it could look.
First, the basic functions mirroring the ones in the standard HashMap:
fn insert(&self, k: K, v: V) -> Option<V>;
fn contains_key<Q: ?Sized>(&self, k: &Q) -> bool where K: Borrow<Q>, Q: Hash + Eq;
fn remove<Q: ?Sized>(&self, k: &Q) -> Option<V> where K: Borrow<Q>, Q: Hash + Eq;
fn clear(&self);
fn len(&self) -> usize;
fn is_empty(&self) -> bool;
All methods obviously take an immutable &self.
Notice that there is no element access. The standard get returns a borrow Option<&V> and we cannot ever safely provide that since other threads may remove it in the meantime. Now, it is possible to provide a function like:
fn borrow<Q: ?Sized>(&self, k: &Q) -> Option<Guard<V>> where K: Borrow<Q>, Q: Hash + Eq;
where Guard<V> would provide immutable borrows of the value and maybe a possibility to mutate it (a somewhat similar idea to Entry). I can think of two ways to do this:
The first would be to make Guard pin the epoch on creation and unpin it on destruction. However, since leaking destructors is safe, leaking a single Guard would disallow recollection of any memory from the pinning thread since the pin would stay on forever. (The same is true of mem::epoch::Guard, but we expect to only use it inside algorithms which are highly scrutinized due to the unsafe unlinked calls.) This is therefore most likely a terrible idea.
The second way would be to use reference counting (probably Arc) internally. This is pointless, since instead we can provide the following interface:
fn find<Q: ?Sized>(&self, k: &Q) -> Option<V> where K: Borrow<Q>, Q: Hash + Eq, V: Clone;
I used a different function name to signify the difference from single-threaded maps.
The user can then just set V to Arc<T> and have shared access to T. Hence, returning Guards only makes sense if it can be done without pinning the epoch or reference counting and in a faster way.
One thing that's neat and easily doable here is (@DemiMarie's idea) a closure that operates on the borrow:
fn with<Q: ?Sized, F>(&self, k: &Q, f: F)
    where K: Borrow<Q>,
          Q: Hash + Eq,
          F: FnOnce(Option<&V>);
...
map.with("key", |opt| println!("is key present? {}", opt.is_some()));
Internally this would pin the epoch, avoiding the problem I outlined above by not allowing arbitrary epoch guard leakage.
Then there are the various iterator APIs: Values, Keys, Iter, Drain. Providing them would give the concurrent map great expressive power. However, there is the small problem of everything being able to change concurrently, invalidating iterators. I think the smartest move here is to gather information on what would be actually useful and viable in this context instead of mirroring the standard library.
Finally, there is the important problem of atomic transactions. In a single-threaded map a user might do something like:
// Guarantees that 0 maps to "zero" iff it didn't already map to something else.
if !map.contains_key(&0) {
    map.insert(0, "zero");
}
For concurrent maps, doing this would require wrapping the structure in something like Mutex to guarantee that two threads don't both modify the value. This of course nullifies the whole gain from lock-free algorithms.
Now, we might provide functions for often-used operations like the above one, maybe
/// Inserts a key-value pair iff the key wasn't observed in the structure.
/// Returns `true` if it succeeds, `false` otherwise.
fn insert_new(&self, key: K, val: V) -> bool;
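To try out the shape of find, with and insert_new before any lock-free implementation exists, a lock-based stand-in is enough. The sketch below wraps std's HashMap in an RwLock; LockedMap is a hypothetical name, the locking is exactly what a real crossbeam map would avoid, and the return value R on with is an addition not present in the proposal.

```rust
use std::borrow::Borrow;
use std::collections::HashMap;
use std::hash::Hash;
use std::sync::RwLock;

/// Lock-based stand-in used only to exercise the proposed method shapes.
pub struct LockedMap<K, V> {
    inner: RwLock<HashMap<K, V>>,
}

impl<K: Hash + Eq, V> LockedMap<K, V> {
    pub fn new() -> Self {
        LockedMap { inner: RwLock::new(HashMap::new()) }
    }

    /// Clones the value out, so no borrow outlives the call.
    pub fn find<Q: ?Sized>(&self, k: &Q) -> Option<V>
    where
        K: Borrow<Q>,
        Q: Hash + Eq,
        V: Clone,
    {
        self.inner.read().unwrap().get(k).cloned()
    }

    /// Runs `f` on a borrow that cannot escape the closure.
    pub fn with<Q: ?Sized, F, R>(&self, k: &Q, f: F) -> R
    where
        K: Borrow<Q>,
        Q: Hash + Eq,
        F: FnOnce(Option<&V>) -> R,
    {
        let guard = self.inner.read().unwrap();
        f(guard.get(k))
    }

    /// Inserts only if the key is absent. The check and the insert happen
    /// under one write lock, so they form a single atomic step.
    pub fn insert_new(&self, key: K, val: V) -> bool {
        let mut guard = self.inner.write().unwrap();
        if guard.contains_key(&key) {
            false
        } else {
            guard.insert(key, val);
            true
        }
    }
}
```

With this, the contains_key-then-insert race from the single-threaded snippet disappears for callers, since a second insert_new for the same key simply returns false.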
In general though, it's not viable to implement every single thing people might need. There is, however, a promising approach (unfortunately paywalled) to this problem that works for list-based implementations.
It would be great to then provide a functional Combinator API that would aggregate arbitrary operations like Iterators do and execute them in a single atomic transaction on destruction, maybe like so:
map.combine().contains_or(0, |access| { access.insert(0, "zero"); } );
This would still be somewhat limited to what can be called on the Combinator and complex boolean conditions might or might not be expressible, but it's a beginning. It's definitely possible to think of something much nicer, but in the end it will always be a bit awkward to use, since to execute the transaction we need knowledge of every one of its invariants. Procedural macros might help here, but internally will still expand to long function chains. I would be happy to be proven wrong here.
Lastly, it may be worth it to have a generic trait, something like ConcurrentMap<K, V>, expressing such an interface. Notice, however, that the atomic transaction implementation I linked above only applies to linked structures. Hash maps could use STM or such, but I'm unsure whether it's a good idea to force them to provide the transaction API (depending on whether it impacts the performance of the whole structure, not just the relevant methods). We could also have similar interfaces for other structures such as queues, as @schets suggests in #56.
I would be happy to hear any feedback you might have on this.
If you call scope.defer after threads are spawned, the code that you pass to scope.defer is sometimes run before the threads are finished. The problem goes away if the call to scope.defer happens before all calls to scope.spawn.
extern crate crossbeam;

fn main() {
    crossbeam::scope(|scope| {
        for _ in 0..4 {
            scope.spawn(|| {
                println!("Hello from a thread!");
            });
        }
        scope.defer(|| {
            println!("This should be at the end.");
        });
    });
}
Example output (this varies due to race conditions):
Hello from a thread!
Hello from a thread!
Hello from a thread!
This should be at the end.
Hello from a thread!
Is flat combining something that would fit well in crossbeam?
The gist of flat combining is that a single thread acquires ownership of a given data structure, and contending threads send messages to the owning thread, which carries out the requisite tasks.
In a sense, flat combining can be thought of as:
This can also transform an spmc queue into an 'mp'mc queue, single-writer into multi-writer, etc. However, it is blocking so I'm not really sure how well this fits in with the rest of the library.
If we want to get fancy, threads can send asynchronous messages as well where they don't block.
Also, this kinda brings up a more detailed (and honestly irrelevant at this point) question of how should threads yield? In something like mio-co, an evented system, or somewhere where 'blocking' time could really just do cleanup work, spinning on a lock and calling thread::yield() is really suboptimal.
I've been thinking about how I might use crossbeam::mem to implement a concurrent hash table. The current examples of stacks and queues are actually somewhat special, because values can be moved into and out of them, but are never borrowed. In contrast, with a hash table I want to be able to read it without modifying, which requires borrows; but if a value is borrowed from a hash table and then deleted from it in another thread, the drop() call needs to be deferred until garbage collection.
Currently crossbeam only supports freeing memory after the grace period. It seems like it could probably be modified to call destructors, but there might be some interesting caveats / possible unsafeties. One obvious thing is that a drop call can free an arbitrary amount of memory, so intelligently batching them becomes more difficult (if an object is holding on to 20MiB of memory, we want to free it ASAP, not after 64 epochs).
Compiling crossbeam v0.1.6
.cargo/registry/src/github.com-1ecc6299db9ec823/crossbeam-0.1.6/src/sync/seg_queue.rs:34:29: 34:43 error: transmute called with differently sized types: [usize; 32] (1024 bits) to [std::sync::atomic::AtomicBool; 32] (256 bits) [E0512]
.cargo/registry/src/github.com-1ecc6299db9ec823/crossbeam-0.1.6/src/sync/seg_queue.rs:34 ready: unsafe { mem::transmute([0usize; SEG_SIZE]) },
^~~~~~~~~~~~~~
.cargo/registry/src/github.com-1ecc6299db9ec823/crossbeam-0.1.6/src/sync/seg_queue.rs:34:29: 34:43 help: run `rustc --explain E0512` to see a detailed explanation
It would be pretty neat if users could specify strict limits on GC - I have an exploratory implementation in this branch that lets users disable/enable the GC in a given scope, and I think there could be more added, mainly two things -
Since collections in one thread don't block collections in another, there are also some really cool things that could be done here in combination with the above. An idea I had is a system where GC work is offloaded to separate worker threads, or at least non real-time threads - This lets crossbeam behave like a pauseless GC that quickly reclaims memory and doesn't impose big throughput penalties on the real-time threads. I'm sure there are other things that could be done.
I'm not sure what a good, or even idiomatic rust api for something like this is. I doubt what I tried is. Maybe the scope api for scoped threads can also handle other scope-based attributes of crossbeam like real-time regions or limiting GC in a given scope.
Of course, crossbeam and the associated datastructures probably won't be suitable for hard real-time work, simply because jemalloc isn't hard real-time afaik. But it would be nice to get something close, at the very least something effective for soft real-time and latency sensitive programs.
Currently, CachePadded only supports datatypes of size <= CACHE_SIZE, as far as I understand. Is it possible to generalize CachePadded to datatypes of arbitrary size?
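One way this could be generalized on newer compilers is the repr(align) attribute (stable since Rust 1.25), which rounds the size up to a multiple of the alignment for any T. The sketch below assumes a 64-byte cache line and is not crossbeam's CachePadded; only the name is borrowed.

```rust
use std::ops::{Deref, DerefMut};

/// Pads and aligns `T` to an assumed 64-byte cache line. Values larger than
/// one line simply occupy a whole number of lines.
#[repr(align(64))]
pub struct CachePadded<T> {
    value: T,
}

impl<T> CachePadded<T> {
    pub fn new(value: T) -> Self {
        CachePadded { value }
    }

    pub fn into_inner(self) -> T {
        self.value
    }
}

impl<T> Deref for CachePadded<T> {
    type Target = T;
    fn deref(&self) -> &T {
        &self.value
    }
}

impl<T> DerefMut for CachePadded<T> {
    fn deref_mut(&mut self) -> &mut T {
        &mut self.value
    }
}
```

Because the compiler computes the padding, no manual size arithmetic or transmute is involved, and a 100-byte payload ends up in 128 bytes.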
Is there a way to get data from a queue without actually removing it from the queue?
Currently Atomic::null() exists, but it'd also be nice to have Atomic::new(value) when it exists.
I think it would be a good idea to have a benchmark suite set up so that it's possible to get good throughput and latency measurements for various operations/datastructures. My understanding is that the standard bench attribute only obtains mean execution time and variance, which doesn't really get at interesting statistics like latency distribution, worst time, 90-99th percentile data, etc. I'm not experienced with rust microbenchmarking, but Criterion seems like a good start.
Also, I'll try and write some things that use the datastructures in a nontrivial fashion and get good performance measurements. I don't want to fall into the trap of optimizing for the microbenchmarks at the cost of real-world performance.
I'll look into both of these before delving into GC customizability. It seems silly to embark on a mission to control aspects of the GC without getting good data on how the controls affect actual performance.
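As a small illustration of the kind of statistic meant here, the sketch below computes a nearest-rank percentile over recorded latencies using only std. It is not tied to Criterion or to any existing crossbeam benchmark, and the function name percentile is an assumption.

```rust
use std::time::Duration;

/// Nearest-rank percentile of an unsorted sample; `p` must be in (0, 100].
/// Returns `None` for an empty sample or an out-of-range `p`.
fn percentile(samples: &[Duration], p: f64) -> Option<Duration> {
    if samples.is_empty() || !(p > 0.0 && p <= 100.0) {
        return None;
    }
    let mut sorted = samples.to_vec();
    sorted.sort();
    // Nearest-rank: the smallest value with at least p% of samples at or
    // below it. Clamp guards against floating-point rounding at the edges.
    let rank = ((p / 100.0) * sorted.len() as f64).ceil() as usize;
    let rank = rank.clamp(1, sorted.len());
    Some(sorted[rank - 1])
}
```

Collecting one Duration per operation (for example with std::time::Instant) and passing the slice to this function gives the median with p = 50.0 and the worst case with p = 100.0.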
The MsQueue::pop currently starts with a call to epoch::pin. This I believe will prevent collection until the method returns, which could potentially take quite a long time (due to the blocking behavior).
I think it may be possible to just drop the epoch in the blocking case, but that may be naive, so I'd want to think through this some more.
Self explanatory. I want to be able to name scoped threads.
I was thinking about the reason why using the local bags can cause segfaults - In short, a thread can move two epochs ahead of one with a stale epoch counter if the timing is right - since global garbage two epochs old is collected, this leads to premature freeing of data (local garbage two visible epoch advancements old is collected, not just two epochs so I don't think the local collection itself is vulnerable to the issue you described).
This suggestion may seem a bit silly, but would using an epoch delay of 3 instead of two (4 different bags) solve this with local bags? I don't see a way that a leading thread can move more than two epochs ahead of recently generated garbage, meaning a delay of 3 avoids this issue.
Something else that may work is to have global GC rotate bags on visible epoch advancements as well and not on epoch advancement count. That seems trickier though, maybe not possible in a lock free setting.
I think it is worth trying to get local bags to work, since using only the global GC in its current (and in my opinion, any) incarnation seems pretty bad for performance -
If you can send me the test code you used to reproduce this, I'll give this a go - it seems theoretically sound, and tests will tell even more.
How possible would it be to peek at the front item of the queue without removing it?
There is no gitter set-up so I decided to open an issue.
Jeff Preshing (people might know him from his blog Preshing on Programming or his CppCon presentation) has announced a new lock-free concurrent HashMap library called Junction.
His benchmarks indicate that it outperforms the other C++ concurrent HashMaps.
Are there lessons to be learned from it?
Crossbeam doesn't yet have a HashMap implementation, so I wanted to post to provide some inspiration.
src/bin/bench.rs has the executable bit set, which makes it look like a script with no #! line at the top. This confuses the Debian packaging lint tool "lintian", which complains about a file with the executable bit set but no ELF header or #! line. Can you please remove the executable bit from that file?
Would a generalized version of the Queue abstraction in bench.rs be a good idea - maybe one based on producers/consumers? This would make defining functions which generically operate on a queue type easier and standardized. To take it further, a set of traits based on (mp/sp) + (sc/mc) and maybe other properties would make the queue type needed by a function self-documenting and give better error messages when the wrong queue type is passed - for instance, the error on an spsc queue with too many consumers is:
the trait core::marker::Sync is not implemented for the type *mut i64
with SpscBufferQueue mentioned briefly in the explanation. In a large codebase, I would probably have little idea what was going on.
Crossbeam includes two different queues, but the only information on which one to pick for a purpose is the short bit in SegQueue, which isn't enough to make a decision. It would be useful to have a section similar to the rust docs' collections documentation.
https://doc.rust-lang.org/std/collections/#when-should-you-use-which-collection
I'm not aware whether the lack of is_empty() on the queues is a technical limitation, but it would be nice to have. I am aware that between .is_empty() and .pop() the state of the queue can change, but in some cases it's irrelevant or can't happen (e.g. just one receiver).
The following code appears to leak memory - topping out at 430MB after the loop finishes. If Vec::with_capacity(0) is used instead, no memory is leaked. Of course it's entirely possible that I'm using crossbeam incorrectly - if so and you could point me in the right direction I'd be most grateful :-).
extern crate crossbeam;

use std::sync::atomic::Ordering::{Acquire, Release};
use crossbeam::mem::epoch::{self, Atomic, Owned};

fn main() {
    for _ in 0..2000000000 {
        let guard = epoch::pin();
        let vec: Atomic<Vec<u32>> = Atomic::null();
        vec.store(Some(Owned::new(Vec::with_capacity(1))), Release);
        if let Some(tab_shr) = vec.load(Acquire, &guard) {
            vec.store(None, Release);
            unsafe {
                guard.unlinked(tab_shr);
            }
        }
    }
    println!("done");
    let _ = std::io::stdin().read_line(&mut String::new());
}
As this example demonstrates, the thread-local epoch can be advanced, even though we have the current epoch pinned:
extern crate crossbeam;

use crossbeam::mem::epoch::{self, Atomic};
use std::sync::{Arc, Barrier};
use std::sync::atomic::Ordering::Relaxed;
use std::thread;

const VALUE1: u32 = 42;
const VALUE2: u32 = 1337;

fn reset_ptr(ptr: Arc<Atomic<u32>>) {
    let g = epoch::pin();
    if let Some(shared) = ptr.swap(None, Relaxed, &g) {
        unsafe { g.unlinked(shared) };
    }
}

fn advance_epoch() {
    // This function uses unsafe, however in this case it is safe to do so,
    // or rather it should be if not for the unsoundness bug. We are simply
    // allocating a new atomic pointer, setting it to null and asserting that
    // the old value is unreachable (which it is).
    let g = epoch::pin();
    for _ in 0..32 {
        let ptr = Atomic::new(VALUE2);
        let shared = ptr.swap(None, Relaxed, &g).unwrap();
        unsafe { g.unlinked(shared) };
    }
}

fn main() {
    let barrier = Arc::new(Barrier::new(2));
    let ptr = Arc::new(Atomic::new(VALUE1));
    {
        let barrier = barrier.clone();
        let ptr = ptr.clone();
        // The barriers are here to sync the threads so the use-after-free
        // happens deterministically. In most cases it would happen as a result
        // of a race condition.
        thread::spawn(move || {
            barrier.wait();
            barrier.wait();
            reset_ptr(ptr);
            barrier.wait();
        });
    }
    barrier.wait();
    let g = epoch::pin();
    let shared = ptr.load(Relaxed, &g).unwrap();
    barrier.wait();
    barrier.wait();
    // The thread has now finished and our shared pointer still contains the
    // original atomic value.
    assert!(**shared == VALUE1);
    // Run some unrelated code after loading the shared pointer. This code
    // should not be able to advance the epoch, since we have it pinned, however
    // it is still possible to do so.
    advance_epoch();
    advance_epoch();
    advance_epoch();
    // However, our shared pointer has now been freed and it contains a
    // new value instead.
    assert!(**shared != VALUE1);
}
As of #42, participant data is leaked on thread exit. We need to figure out a workable algorithm for reclaiming inactive participants.
/Users/mX/.cargo/registry/src/github.com-1ecc6299db9ec823/crossbeam-0.1.6/src/sync/seg_queue.rs:34:29: 34:43 error: transmute called with differently sized types: [usize; 32] (2048 bits) to [std::sync::atomic::AtomicBool; 32] (256 bits) [E0512]
/Users/mX/.cargo/registry/src/github.com-1ecc6299db9ec823/crossbeam-0.1.6/src/sync/seg_queue.rs:34 ready: unsafe { mem::transmute([0usize; SEG_SIZE]) },
^~~~~~~~~~~~~~
/Users/mX/.cargo/registry/src/github.com-1ecc6299db9ec823/crossbeam-0.1.6/src/sync/seg_queue.rs:34:29: 34:43 help: run `rustc --explain E0512` to see a detailed explanation
error: aborting due to previous error
Build failed, waiting for other jobs to finish...
error: Could not compile `crossbeam`.
$ rustc --version
rustc 1.10.0-nightly (267cde259 2016-05-25)
The spawn_unsafe method in Crossbeam should include a bound on the function f: F that it is also Send. Otherwise, you may end up calling functions that access non-thread-safe things like UnsafeCell. This also brings the definition more in line with that of std::thread::spawn. Specifically, the signature should be:
pub unsafe fn spawn_unsafe<'a, F>(f: F) -> JoinHandle<()>
    where F: FnOnce() + Send + 'a