crossbeam
https://docs.rs/crossbeam/latest/crossbeam/index.html
crossbeam provides a set of tools for concurrent programming:
Atomics
- AtomicCell, a thread-safe mutable memory location.
- AtomicConsume, for reading from primitive atomic types with “consume” ordering.
Data structures
- deque, work-stealing deques for building task schedulers.
  - Used by rayon, which provides the higher-level join()/scope() API.
- ArrayQueue, a bounded MPMC queue that allocates a fixed-capacity buffer on construction.
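  - A fixed-capacity MPMC queue. Because the buffer is bounded, push fails when the queue is full, which lets the sending side apply backpressure.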
- SegQueue, an unbounded MPMC queue that allocates small buffers, segments, on demand.
  - An MPMC queue with no fixed capacity.
Memory management
- epoch, an epoch-based garbage collector.
Thread synchronization
- channel, multi-producer multi-consumer channels for message passing.
- Parker, a thread parking primitive.
- ShardedLock, a sharded reader-writer lock with fast concurrent reads.
- WaitGroup, for synchronizing the beginning or end of some computation.
Utilities
- Backoff, for exponential backoff in spin loops.
- CachePadded, for padding and aligning a value to the length of a cache line.
- scope, for spawning threads that borrow local variables from the stack.
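Notes:
- crossbeam's channel is MPMC (both senders and receivers can be cloned) and has historically outperformed the standard library's std::sync::mpsc. Since Rust 1.67 the std implementation is itself based on crossbeam-channel, so the performance gap is likely smaller on recent toolchains.
- Parker and scope have counterparts in the standard library's thread module: thread::park()/Thread::unpark() and, since Rust 1.63, thread::scope().
- ShardedLock makes reads faster than the standard library's RwLock at the cost of slower writes, so it is best suited to read-heavy, write-light workloads.
- WaitGroup is similar to Go's sync.WaitGroup.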
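
As a rough illustration of the standard-library counterparts of scope and Parker mentioned in the notes, the sketch below uses only `std::thread`. The function names `parallel_sum` and `park_until_flag` are made up for this example, and it assumes Rust 1.63 or later for `thread::scope`.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;

/// Sums the two halves of a slice on scoped threads that borrow `data` directly.
fn parallel_sum(data: &[u64]) -> u64 {
    let (left, right) = data.split_at(data.len() / 2);
    // All threads spawned inside the scope are joined before `scope` returns,
    // which is why they may borrow from the caller's stack.
    thread::scope(|s| {
        let l = s.spawn(|| left.iter().sum::<u64>());
        let r = s.spawn(|| right.iter().sum::<u64>());
        l.join().unwrap() + r.join().unwrap()
    })
}

/// Parks a worker thread until the spawning thread sets a flag and unparks it.
fn park_until_flag() -> bool {
    let flag = Arc::new(AtomicBool::new(false));
    let worker_flag = Arc::clone(&flag);
    let worker = thread::spawn(move || {
        // `park` may wake spuriously, so re-check the flag in a loop.
        while !worker_flag.load(Ordering::Acquire) {
            thread::park();
        }
        true
    });
    flag.store(true, Ordering::Release);
    // If `unpark` runs before `park`, the next `park` returns immediately.
    worker.thread().unpark();
    worker.join().unwrap()
}

fn main() {
    println!("sum: {}", parallel_sum(&[1, 2, 3, 4, 5]));
    println!("worker woke up: {}", park_until_flag());
}
```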
Examples, adapted from: https://blog.logrocket.com/concurrent-programming-rust-crossbeam/
- AtomicCell
    use crossbeam::atomic::AtomicCell;
    use std::sync::Arc;
    use std::thread;

    fn main() {
        // AtomicCell Example
        println!("Starting AtomicCell example...");
        let atomic_value: AtomicCell<u32> = AtomicCell::new(12);
        let arc = Arc::new(atomic_value);
        let mut thread_handles_ac: Vec<thread::JoinHandle<()>> = Vec::new();
        // Threads 1..=9; only the even-numbered ones increment the value.
        for i in 1..10 {
            thread_handles_ac.push(run_thread(arc.clone(), i, i % 2 == 0));
        }
        thread_handles_ac
            .into_iter()
            .for_each(|th| th.join().expect("can't join thread"));
        // Four threads incremented, so the final value is 12 + 4 = 16.
        println!("value after threads finished: {}", arc.load());
        println!("AtomicCell example finished!");
    }

    fn run_thread(val: Arc<AtomicCell<u32>>, num: u32, store: bool) -> thread::JoinHandle<()> {
        thread::spawn(move || {
            if store {
                val.fetch_add(1);
            }
            // The value printed here depends on how the threads interleave.
            println!("Hello from thread {}! value: {}", num, val.load());
        })
    }
- ArrayQueue
    use crossbeam::queue::ArrayQueue;
    use std::sync::Arc;
    use std::thread;

    fn run_producer(q: Arc<ArrayQueue<u32>>, num: u32) -> thread::JoinHandle<()> {
        thread::spawn(move || {
            println!("Hello from producer thread {} - pushing...!", num);
            for _ in 0..20 {
                // push returns Err when the queue is full; 4 producers x 20 items
                // fit in the capacity of 100, so this never fails here.
                q.push(num).expect("pushing failed");
            }
        })
    }

    fn run_consumer(q: Arc<ArrayQueue<u32>>, num: u32) -> thread::JoinHandle<()> {
        thread::spawn(move || {
            println!("Hello from consumer thread {} - popping!", num);
            for _ in 0..20 {
                // pop does not block; it yields nothing if the queue is empty right now.
                let _ = q.pop();
            }
        })
    }

    fn main() {
        // ArrayQueue Example
        println!("---------------------------------------");
        println!("Starting ArrayQueue example...");
        let q: ArrayQueue<u32> = ArrayQueue::new(100);
        let arc_q = Arc::new(q);
        let mut thread_handles_aq: Vec<thread::JoinHandle<()>> = Vec::new();
        for i in 1..5 {
            thread_handles_aq.push(run_producer(arc_q.clone(), i));
        }
        for i in 1..5 {
            thread_handles_aq.push(run_consumer(arc_q.clone(), i));
        }
        thread_handles_aq
            .into_iter()
            .for_each(|th| th.join().expect("can't join thread"));
        // The remaining count varies between runs: a consumer that finds the
        // queue empty does not retry.
        println!("values in q after threads finished: {}", arc_q.len());
        println!("ArrayQueue example finished!");
    }
- channel
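    use crossbeam::channel::{unbounded, Receiver, Sender};
    use std::thread;

    fn run_producer_chan(s: Sender<u32>, num: u32) -> thread::JoinHandle<()> {
        thread::spawn(move || {
            println!("Hello from producer thread {} - pushing...!", num);
            for _ in 0..1000 {
                s.send(num).expect("send failed");
            }
            // `s` is dropped here; once every sender is gone, recv returns Err.
        })
    }

    fn run_consumer_chan(r: Receiver<u32>, num: u32) -> thread::JoinHandle<()> {
        thread::spawn(move || {
            let mut i = 0;
            println!("Hello from consumer thread {} - popping!", num);
            loop {
                if r.recv().is_err() {
                    println!(
                        "last sender dropped - stopping consumer thread, messages received: {}",
                        i
                    );
                    break;
                }
                i += 1;
            }
        })
    }

    fn main() {
        // channel Example
        println!("---------------------------------------");
        println!("Starting channel example...");
        let (s, r) = unbounded();
        let mut handles: Vec<thread::JoinHandle<()>> = Vec::new();
        for i in 1..5 {
            handles.push(run_producer_chan(s.clone(), i));
        }
        // Drop the original sender so the channel disconnects once the producers finish.
        drop(s);
        for i in 1..5 {
            handles.push(run_consumer_chan(r.clone(), i));
        }
        // Join every thread; otherwise main may exit before the consumers finish.
        handles
            .into_iter()
            .for_each(|th| th.join().expect("can't join thread"));
        println!("channel example finished!");
    }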
- WaitGroup
    use crossbeam::sync::WaitGroup;
    use rand::Rng; // provides gen_range (rand 0.8 API)
    use std::thread;

    fn do_work(thread_num: i32) {
        // Random delay in milliseconds, also used as input to the dummy calculation.
        let num = rand::thread_rng().gen_range(100..500);
        thread::sleep(std::time::Duration::from_millis(num));
        let mut sum = 0;
        for i in 0..10 {
            sum += sum + num * i;
        }
        println!(
            "thread {} calculated sum: {}, num: {}",
            thread_num, sum, num
        );
        thread::sleep(std::time::Duration::from_millis(num));
    }

    fn main() {
        // WaitGroup Example
        println!("---------------------------------------");
        println!("Starting WaitGroup example...");
        let wg = WaitGroup::new();
        for i in 0..50 {
            // Each clone registers one more participant in the group.
            let wg_clone = wg.clone();
            thread::spawn(move || {
                do_work(i);
                // Dropping the clone marks this thread as done.
                drop(wg_clone);
            });
        }
        println!("waiting for all threads to finish...!");
        // Blocks until every clone has been dropped.
        wg.wait();
        println!("all threads finished!");
        println!("WaitGroup example finished!");
    }