跳过正文

目录

rayon
#

https://docs.rs/rayon/latest/rayon/index.html

Rayon is a data-parallelism library that makes it easy to convert sequential computations into parallel.

It is lightweight and convenient for introducing parallelism into existing code. It guarantees data-race free executions and takes advantage of parallelism when sensible, =based on work-load= at runtime.

rayon 是基于 crossbeam 实现了 work-stealing deques。

使用 rayon 的两种方式:

  1. 高层次 API:使用迭代器模式, 来并行处理集合元素; 对应的是 ParallelIterator 和 IndexedParallelIterator。
  2. 低层次 API:
  3. join(taskA, taskB) 提供两个任务 A 和 B 的并行处理, 可以调用多次, 向当前线程添加多个任务, 但是添加的有可能被其他线程 steal 执行.
  4. scope() 创建一个 Scope 对象, 并可以 spawn() 任意数量的并行任务, 这些任务可以共享父 thread 的 stack 变量;

这两种模式创建的并行任务, rayon 都是使用固定数量线程的 thread pool 和 workload 动态平衡技术(work stealing)来并行处理。

使用 ThreadPoolBuilder 来自定义全局的或自定义线程池。

迭代器并行:

use rayon::prelude::*;
fn sum_of_squares(input: &[i32]) -> i32 {
    input.par_iter()
         .map(|i| i * i)
         .sum()
}

fn increment_all(input: &mut [i32]) {
    input.par_iter_mut()
         .for_each(|p| *p += 1);
}

join() 并行: join() 与 spawn 两个 thread 来执行 closures 类似, 但是 rayon join() 的效率更高, 开销更低;

let mut v = vec![5, 1, 8, 22, 0, 44];
quick_sort(&mut v);
assert_eq!(v, vec![0, 1, 5, 8, 22, 44]);

fn quick_sort<T:PartialOrd+Send>(v: &mut [T]) {
   if v.len() > 1 {
       let mid = partition(v);
       let (lo, hi) = v.split_at_mut(mid);
       rayon::join(|| quick_sort(lo),
                   || quick_sort(hi));
   }
}

// Partition rearranges all items `<=` to the pivot
// item (arbitrary selected to be the last item in the slice)
// to the first half of the slice. It then returns the
// "dividing point" where the pivot is placed.
fn partition<T:PartialOrd+Send>(v: &mut [T]) -> usize {
    let pivot = v.len() - 1;
    let mut i = 0;
    for j in 0..pivot {
        if v[j] <= v[pivot] {
            v.swap(i, j);
            i += 1;
        }
    }
    v.swap(i, pivot);
    i
}

scope() 并行: 和标准库 thread scope 类似, 都可以通过 scope 对象 spawn 一系列任务, 并且可以共享父 thread stack 变量。但标准库 thread 的 scope.spawn() 不支持嵌套,而 rayon scope 的 spawn 支持嵌套, 而且产生的是在 thread pool 中执行的异步 task 而非 thread。

// point start
rayon::scope(|s| {
    s.spawn(|s| { // task s.1
        s.spawn(|s| { // task s.1.1
            rayon::scope(|t| {
                t.spawn(|_| ()); // task t.1
                t.spawn(|_| ()); // task t.2
            });
        });
    });
    s.spawn(|s| { // task s.2
    });
    // point mid
});
// point end

三方库 rayon 和 crossbeam 提供了更丰富的并发支持:

  1. rayon 提供了并行迭代器和 join()/scope() 并发机制, 底层使用 thread pool 和 workload-stealing 平衡机制来实现高性能并发;
  2. crossbeam 提供了丰富的线程安全原语, 如线程安全的 MSMR 的 Queue/Channel 等.

标准库 thread 可以实现 fork-join 模式, 比如:

  1. 将并行任务拆分为多批, 每一批用一个 thread 来执行, 形成一个多 thread 并行执行的模式(具体 thread 数量取决于批次), 然后主线程在 join 这些 thread 来获取结果;
  2. 将任务发送到 FIFO 的 queue 或 channel, 然后起固定数量的线程, 从 FIFO 中获取任务, 在将结果发送到结果 channel, 主线程从结果 channel 获得结果, 并等待所有线程执行结束.

标准库 thread 实现 fork-join 的问题:

  1. 起的线程数量来源于批次, 需要选择合适的线程数量.
  2. 不同线程执行任务的负载不一样, 不能动态在线程间平衡负载;

基于 rayon 提供的 scope 来实现 fork-join 模式, 可以很好的解决上面的标准库实现模式的问题, 它们采用了 thread pool(线程数量和 CPU 数量一致) 和 work-stealing parallelism 算法, 可以很好的实现数据并行处理.

crossbeam scope 只是提供了 spawn 多个 scope thread 的能力并不具有 thread pool 和 work-stealing 能力, 目前已经被标准库的 thread scope 所支持,而且标准库的 scope thread 性能更高, 所以不建议单纯使用 crossbeam scope 库.

  • rayon/crossbeam scope 的另一个优势是支持 spawn() 嵌套, 但标准库 scope thread 不支持;

scope spawn 的闭包没有返回值, 但是可以通过 stack 变量和 channel 等机制来收集并行处理结果:

// https://docs.rs/rayon/latest/rayon/struct.Scope.html
let mut value_a = None;
let mut value_b = None;
let mut value_c = None;
rayon::scope(|s| {
    s.spawn(|s1| {
          // ^ this is the same scope as `s`; this handle `s1`
          //   is intended for use by the spawned task,
          //   since scope handles cannot cross thread boundaries.

        value_a = Some(22);

        // the scope `s` will not end until all these tasks are done
        s1.spawn(|_| {
            value_b = Some(44);
        });
    });

    s.spawn(|_| {
        value_c = Some(66);
    });
});
assert_eq!(value_a, Some(22));
assert_eq!(value_b, Some(44));
assert_eq!(value_c, Some(66));

It depends on what you want to do. If you are doing io-bound work, Tokio would be what you want – you would use it as a runtime for the async capabilities in Rust.

If you have cpu-bound work, then rayon is what you want to use. Rayon is =a work-stealing parallelism crate= – it will schedule work to be done, and different threads will schedule portions of it as they become available to do work. It’s very easy to get 100% CPU utilization across all cores use Rayon if your work is naturally paralellizable, and the interface is dead simple: anywhere you do a .iter(), you can turn it into a .par_iter(), and Rayon will parallelize the work. Note there is some overhead to using Rayon – you normally will be better off doing your work on a single thread unless you have a large number of elements in your collection… I found for my application I needed more than 1e6 elements before I saw an appreciable performance benefit to using Rayon.

As others said, Crossbeam is for sending and receiving messages across threads. I use it alongside of tokio and rayon.


You are right, threads have a noticeable cost when spawning. Depending on what you are trying to do, it could be useful =a thread pool= or, even simpler, using rayon 122. The latter provides a good way of handling parallelization and it is =much more fine grained than a raw thread pool= (i.e. it uses a =work stealing algorithm=).


To give more background on this: https://news.ycombinator.com/item?id=31037971

  • crossbeam is =a building block used in Rayon=, which is Rust’s main =data parallelism library=, roughly similar to OpenMP.
  • crossbeam channels are very fast. They’re in the same league as golang, sometimes faster (depending how you measure of course).
  • crossbeam is an order of magnitude =faster than channels= in the Rust’s standard library, while being more powerful and simpler to use. This is an example where experimentation in a 3rd party crate paid off compared to the standard library where a basic implementation went to die.
  • it integrates with Rust’s borrow checker of course, so you can use complex multi-threaded constructs (e.g. =share stack-allocated objects= between threads) without fear of data races or use-after-free.

https://users.rust-lang.org/t/scoped-threads-std-or-crossbeam/89248/2

FWIW, I’ve had a much better experience with =rayon::scope= 34 (and more generally =rayon::ThreadPool::scope= 9) than with either crossbeam directly or the “new” std:🧵:scope.

It’s a larger dependency for sure.

参考
#

  1. https://morestina.net/blog/1432/parallel-stream-processing-with-rayon
  2. https://developers.redhat.com/blog/2021/04/30/how-rust-makes-rayons-data-parallelism-magical

相关文章

程序的编译和链接:gcc、clang、glibc、musl 和 rustc
·
Rust Cargo
系统总结了使用 gcc、clang、rustc 编译器进行程序的编译和链接过程,以及使用 musl 进行静态链接的方案。
gdb
·
Gnu Tool Gdb
gdb 个人速查手册。
gas X86_64 汇编 - 个人参考手册
·
Gnu Asm Gcc Manual
GCC 编译器 - 个人参考手册
·
Gnu Gcc Manual
gcc 编译器个人参考手册。