跳过正文

目录

crossbeam
#

https://docs.rs/crossbeam/latest/crossbeam/index.html

提供一些并发编程的工具:

Atomics

  • AtomicCell, a thread-safe mutable memory location.
  • AtomicConsume, for reading from primitive atomic types with “consume” ordering.

Data structures

  • deque, work-stealing deques for building task schedulers.
    • 被 rayon 使用, 提供高层次的 join()/scope() API;
  • ArrayQueue, a bounded MPMC queue that allocates a fixed-capacity buffer on construction.
    • 固定大小的 MPMC 队列, 有 buffer 意味着可以实现按发送端反压;
  • SegQueue, an unbounded MPMC queue that allocates small buffers, segments, on demand.
    • 无固定大小的 MPMC 队列;

Memory management

  • epoch, an epoch-based garbage collector.

Thread synchronization

  • channel, multi-producer multi-consumer channels for message passing.
  • Parker, a thread parking primitive.
  • ShardedLock, a sharded reader-writer lock with fast concurrent reads.
  • WaitGroup, for synchronizing the beginning or end of some computation.

Utilities

  • Backoff, for exponential backoff in spin loops.
  • CachePadded, for padding and aligning a value to the length of a cache line.
  • scope, for spawning threads that borrow local variables from the stack.

说明:

  • crossbeam 的 channel 支持 MPMC, 而且性能比标准库的高.
  • Parker 和 scope 已经在标准库 thread 中实现了.
  • ShardedLock 相比标准库的 RwLock 性能更高, 特别适用于读多写少的场景;
  • WaitGroup 类似于 golang 的 waitgroup;

示例参考: https://blog.logrocket.com/concurrent-programming-rust-crossbeam/

  1. AtomicCell
fn main() {
    // AtomicCell Example
    println!("Starting AtomicCell example...");
    let atomic_value: AtomicCell<u32> = AtomicCell::new(12);
    let arc = Arc::new(atomic_value);

    let mut thread_handles_ac: Vec<thread::JoinHandle<()>> = Vec::new();
    for i in 1..10 {
        thread_handles_ac.push(run_thread(arc.clone(), i, i % 2 == 0));
    }

    thread_handles_ac
        .into_iter()
        .for_each(|th| th.join().expect("can't join thread"));

    println!("value after threads finished: {}", arc.load());
    println!("AtomicCell example finished!");
}

fn run_thread(val: Arc<AtomicCell<u32>>, num: u32, store: bool) -> thread::JoinHandle<()> {
    thread::spawn(move || {
        if store {
            val.fetch_add(1);
        }
        println!("Hello from thread {}! value: {}", num, val.load());
    })
}
  1. ArrayQueue
fn run_producer(q: Arc<ArrayQueue<u32>>, num: u32) -> thread::JoinHandle<()> {
    thread::spawn(move || {
        println!("Hello from producer thread {} - pushing...!", num);
        for _ in 0..20 {
            q.push(num).expect("pushing failed");
        }
    })
}

fn run_consumer(q: Arc<ArrayQueue<u32>>, num: u32) -> thread::JoinHandle<()> {
    thread::spawn(move || {
        println!("Hello from producer thread {} - popping!", num);
        for _ in 0..20 {
            q.pop();
        }
    })
}

fn main() {
    // ArrayQueue Example
    println!("---------------------------------------");
    println!("Starting ArrayQueue example...");
    let q: ArrayQueue<u32> = ArrayQueue::new(100);
    let arc_q = Arc::new(q);

    let mut thread_handles_aq: Vec<thread::JoinHandle<()>> = Vec::new();

    for i in 1..5 {
        thread_handles_aq.push(run_producer(arc_q.clone(), i));
    }

    for i in 1..5 {
        thread_handles_aq.push(run_consumer(arc_q.clone(), i));
    }

    thread_handles_aq
        .into_iter()
        .for_each(|th| th.join().expect("can't join thread"));
    println!("values in q after threads finished: {}", arc_q.len());
    println!("ArrayQueue example finished!");
}
  1. channel
fn run_producer_chan(s: Sender<u32>, num: u32) -> thread::JoinHandle<()> {
    thread::spawn(move || {
        println!("Hello from producer thread {} - pushing...!", num);
        for _ in 0..1000 {
            s.send(num).expect("send failed");
        }
    })
}

fn run_consumer_chan(r: Receiver<u32>, num: u32) -> thread::JoinHandle<()> {
    thread::spawn(move || {
        let mut i = 0;
        println!("Hello from producer thread {} - popping!", num);
        loop {
            if let Err(_) = r.recv() {
                println!(
                    "last sender dropped - stopping consumer thread, messages received: {}",
                    i
                );
                break;
            }
            i += 1;
        }
    })
}

fn main() {
        // channel Example
    println!("---------------------------------------");
    println!("Starting channel example...");
    let (s, r) = unbounded();

    for i in 1..5 {
        run_producer_chan(s.clone(), i);
    }
    drop(s);

    for i in 1..5 {
        run_consumer_chan(r.clone(), i);
    }

    println!("channel example finished!");
}
  1. waitgroup
fn do_work(thread_num: i32) {
    let num = rand::thread_rng().gen_range(100..500);
    thread::sleep(std::time::Duration::from_millis(num));
    let mut sum = 0;
    for i in 0..10 {
        sum += sum + num * i;
    }
    println!(
        "thread {} calculated sum: {}, num: {}",
        thread_num, sum, num
    );
    thread::sleep(std::time::Duration::from_millis(num));
}

fn main() {
    // WaitGroup Example
    println!("---------------------------------------");
    println!("Starting WaitGroup example...");
    let wg = WaitGroup::new();

    for i in 0..50 {
        let wg_clone = wg.clone();
        thread::spawn(move || {
            do_work(i);
            drop(wg_clone);
        });
    }

    println!("waiting for all threads to finish...!");
    wg.wait();
    println!("all threads finished!");
    println!("WaitGroup example finished!");
}