rayon
https://docs.rs/rayon/latest/rayon/index.html
Rayon is a data-parallelism library that makes it easy to convert sequential computations into parallel ones.
It is lightweight and convenient for introducing parallelism into existing code. It guarantees data-race free executions and takes advantage of parallelism when sensible, =based on work-load= at runtime.
rayon's work-stealing deques are implemented on top of crossbeam (the crossbeam-deque crate).
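There are two ways to use rayon:
- High-level API: the iterator style, which processes the elements of a collection in parallel. This corresponds to ParallelIterator and IndexedParallelIterator.
- Low-level API:
  - join(taskA, taskB) runs two tasks A and B, potentially in parallel. A runs on the current thread, and B is placed on the current thread's local work queue, where other threads may steal it. join() can be called repeatedly (e.g. recursively), adding more tasks to the current thread's queue, and any of them may be stolen and executed by other threads.
  - scope() creates a Scope object on which you can spawn() any number of parallel tasks. These tasks can borrow stack variables of the parent thread.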
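In both styles, rayon executes the resulting tasks on a thread pool with a fixed number of threads. It balances the workload dynamically across those threads using work stealing.
Use ThreadPoolBuilder to configure the global thread pool or to build a custom one.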
Parallel iterators:
```rust
use rayon::prelude::*;

fn sum_of_squares(input: &[i32]) -> i32 {
    input.par_iter()
         .map(|i| i * i)
         .sum()
}

fn increment_all(input: &mut [i32]) {
    input.par_iter_mut()
         .for_each(|p| *p += 1);
}
```
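Parallelism with join(): join() is similar to spawning two threads to run two closures. However, rayon's join() is more efficient and has lower overhead, because it schedules the closures on the existing thread pool instead of creating new threads.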
```rust
fn main() {
    let mut v = vec![5, 1, 8, 22, 0, 44];
    quick_sort(&mut v);
    assert_eq!(v, vec![0, 1, 5, 8, 22, 44]);
}

fn quick_sort<T: PartialOrd + Send>(v: &mut [T]) {
    if v.len() > 1 {
        let mid = partition(v);
        let (lo, hi) = v.split_at_mut(mid);
        // Sort the two halves, potentially in parallel.
        rayon::join(|| quick_sort(lo), || quick_sort(hi));
    }
}

// Partition rearranges all items `<=` to the pivot
// item (arbitrarily selected to be the last item in the slice)
// to the first half of the slice. It then returns the
// "dividing point" where the pivot is placed.
fn partition<T: PartialOrd + Send>(v: &mut [T]) -> usize {
    let pivot = v.len() - 1;
    let mut i = 0;
    for j in 0..pivot {
        if v[j] <= v[pivot] {
            v.swap(i, j);
            i += 1;
        }
    }
    v.swap(i, pivot);
    i
}
```
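For comparison with the "spawn two threads" description of join() above, here is a std-only sketch of a thread-based join. naive_join and par_sum are hypothetical helpers written for this illustration; they are not rayon APIs. Every naive_join call starts a new OS thread through std::thread::scope. That per-call cost is what rayon::join avoids by queuing work on its existing pool threads. The threshold parameter is also an assumption of the sketch: it stops the recursion from spawning a thread for every tiny slice.

```rust
use std::thread;

// Hypothetical helper (not part of rayon): run `a` on a new scoped OS thread
// and `b` on the current thread, then wait for both results.
fn naive_join<A, B, RA, RB>(a: A, b: B) -> (RA, RB)
where
    A: FnOnce() -> RA + Send,
    B: FnOnce() -> RB,
    RA: Send,
{
    thread::scope(|s| {
        let handle = s.spawn(a); // one OS thread per call: the cost rayon::join avoids
        let rb = b();
        let ra = handle.join().expect("closure `a` panicked");
        (ra, rb)
    })
}

// Hypothetical divide-and-conquer sum built on naive_join.
// Slices at or below `threshold` (minimum 1) are summed sequentially.
fn par_sum(v: &[u64], threshold: usize) -> u64 {
    if v.len() <= threshold.max(1) {
        return v.iter().sum();
    }
    let (lo, hi) = v.split_at(v.len() / 2);
    let (a, b) = naive_join(|| par_sum(lo, threshold), || par_sum(hi, threshold));
    a + b
}

fn main() {
    let data: Vec<u64> = (1..=10_000).collect();
    let total = par_sum(&data, 1_000);
    assert_eq!(total, 50_005_000);
    println!("sum = {total}");
}
```

Replacing naive_join with rayon::join keeps the same call shape: two closures in, a tuple of results out. Note that rayon::join also requires the second closure to be Send.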
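Parallelism with scope(): this is similar to the standard library's thread::scope. In both, you spawn a series of tasks through a scope object, and the tasks can borrow stack variables of the parent thread. There are two differences:
- rayon's spawn() passes the scope handle into the closure, so nested spawns are direct. std's spawn closure takes no arguments, so a nested spawn has to capture s, typically with a move closure.
- rayon's spawn() creates asynchronous tasks that run on the thread pool, not new threads.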
```rust
fn main() {
    // point start
    rayon::scope(|s| {
        s.spawn(|s| { // task s.1
            s.spawn(|s| { // task s.1.1
                rayon::scope(|t| {
                    t.spawn(|_| ()); // task t.1
                    t.spawn(|_| ()); // task t.2
                });
            });
        });
        s.spawn(|s| { // task s.2
        });
        // point mid
    });
    // point end: reached only after s.1, s.1.1, t.1, t.2 and s.2 have all completed
}
```
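For comparison, here is a sketch of similar nesting with the standard library's std::thread::scope. The function name count_nested_spawns is made up for this example. Each spawn starts a real OS thread rather than queuing a pool task. Because std's spawn closure takes no scope argument, the nested spawn gets s by capturing it in a move closure. s is a shared reference, so it is simply copied into each thread.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::thread;

// Counts how many scoped threads ran, including one nested spawn.
fn count_nested_spawns() -> usize {
    let counter = AtomicUsize::new(0); // lives on the parent thread's stack
    thread::scope(|s| {
        let counter = &counter; // shared borrow that every thread can copy
        s.spawn(move || {
            // thread 1
            counter.fetch_add(1, Ordering::Relaxed);
            // Nested spawn: `s` was copied into this closure by `move`.
            s.spawn(move || {
                // thread 1.1
                counter.fetch_add(1, Ordering::Relaxed);
            });
        });
        s.spawn(move || {
            // thread 2
            counter.fetch_add(1, Ordering::Relaxed);
        });
    });
    // thread::scope returns only after all threads spawned in it,
    // including the nested one, have finished.
    counter.into_inner()
}

fn main() {
    assert_eq!(count_nested_spawns(), 3);
    println!("ok");
}
```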
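The third-party crates rayon and crossbeam provide richer concurrency support:
- rayon provides parallel iterators and the join()/scope() mechanisms. Under the hood it uses a thread pool with work-stealing load balancing to achieve high-performance parallelism.
- crossbeam provides a rich set of thread-safe primitives, such as multi-producer multi-consumer (MPMC) queues and channels.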
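Standard-library threads can implement the fork-join pattern, for example:
- Split the parallel work into batches and run each batch on its own thread, so the number of threads is determined by the number of batches. The main thread then joins these threads to collect the results.
- Send tasks to a FIFO queue or channel and start a fixed number of threads that take tasks from it and send results to a result channel. The main thread reads results from the result channel and waits for all threads to finish.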
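The two std-only patterns above can be sketched as follows. The function names, the sum-of-squares workload, and the batch and worker counts are illustrative assumptions. std::sync::mpsc::Receiver has a single consumer, so in the second pattern the workers share it behind an Arc<Mutex<_>>. A crossbeam MPMC channel, whose Receiver can be cloned, would avoid the lock.

```rust
use std::sync::mpsc;
use std::sync::{Arc, Mutex};
use std::thread;

// Pattern 1: one thread per batch, so the thread count follows the batch count.
fn sum_squares_by_batches(data: &[u64], batch_size: usize) -> u64 {
    let handles: Vec<_> = data
        .chunks(batch_size.max(1))
        .map(|batch| {
            let batch = batch.to_vec(); // thread::spawn needs owned ('static) data
            thread::spawn(move || batch.iter().map(|x| x * x).sum::<u64>())
        })
        .collect();
    // The main thread joins every batch thread and combines the partial sums.
    handles.into_iter().map(|h| h.join().unwrap()).sum()
}

// Pattern 2: a fixed number of workers pull tasks from a FIFO channel and
// push results into a result channel.
fn sum_squares_with_workers(data: &[u64], workers: usize) -> u64 {
    let (task_tx, task_rx) = mpsc::channel::<u64>();
    let (result_tx, result_rx) = mpsc::channel::<u64>();
    // std's Receiver is single-consumer, so the workers share it behind a Mutex.
    let task_rx = Arc::new(Mutex::new(task_rx));

    let handles: Vec<_> = (0..workers.max(1))
        .map(|_| {
            let task_rx = Arc::clone(&task_rx);
            let result_tx = result_tx.clone();
            thread::spawn(move || loop {
                // The guard is a temporary, so the lock is released right after recv().
                let task = task_rx.lock().unwrap().recv();
                match task {
                    Ok(x) => result_tx.send(x * x).unwrap(),
                    Err(_) => break, // task channel closed and drained
                }
            })
        })
        .collect();
    drop(result_tx); // only the workers hold result senders now

    for &x in data {
        task_tx.send(x).unwrap();
    }
    drop(task_tx); // closing the task channel lets idle workers exit

    // Ends once every worker has exited and dropped its result sender.
    let total: u64 = result_rx.iter().sum();
    for h in handles {
        h.join().unwrap();
    }
    total
}

fn main() {
    let data: Vec<u64> = (1..=100).collect();
    assert_eq!(sum_squares_by_batches(&data, 30), 338_350);
    assert_eq!(sum_squares_with_workers(&data, 4), 338_350);
    println!("ok");
}
```

In the first pattern, the thread count is tied to the batch count and each thread's load is fixed up front. In the second, the thread count is fixed and an idle worker simply takes the next queued task.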
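Problems with fork-join built on standard-library threads:
- The number of threads follows the number of batches, so you have to pick a suitable thread count yourself.
- Threads can end up with uneven loads, and with static batching there is no way to rebalance work between threads at runtime.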
Building fork-join on rayon's scope solves both problems. rayon uses a thread pool (by default, one thread per logical CPU) together with work-stealing parallelism, which makes it well suited to data-parallel processing.
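crossbeam's scope only provides the ability to spawn multiple scoped threads; it has no thread pool or work stealing. The standard library now offers the same capability with std::thread::scope (stable since Rust 1.63), and std's scoped threads perform better. So there is little reason to depend on crossbeam just for its scope.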
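- Another difference: the spawn() closure in rayon and crossbeam scopes receives the scope handle as an argument, so nested spawn() calls are direct. std's scoped-thread spawn closure takes no argument, so nested spawning requires capturing s in a move closure.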
Closures passed to rayon's scope spawn() return no value, but parallel results can be collected through stack variables, channels, and similar mechanisms:
```rust
// https://docs.rs/rayon/latest/rayon/struct.Scope.html
fn main() {
    let mut value_a = None;
    let mut value_b = None;
    let mut value_c = None;
    rayon::scope(|s| {
        s.spawn(|s1| {
            // ^ this is the same scope as `s`; this handle `s1`
            // is intended for use by the spawned task,
            // since scope handles cannot cross thread boundaries.
            value_a = Some(22);

            // the scope `s` will not end until all these tasks are done
            s1.spawn(|_| {
                value_b = Some(44);
            });
        });

        s.spawn(|_| {
            value_c = Some(66);
        });
    });
    assert_eq!(value_a, Some(22));
    assert_eq!(value_b, Some(44));
    assert_eq!(value_c, Some(66));
}
```
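The channel approach mentioned above could look like the following sketch. Because rayon is an external crate, this uses std::thread::scope as a stand-in. Each spawn creates one OS thread, and std's spawn could also return values through its join handle. Inside rayon::scope the body would be the same, except that the spawned closure takes the scope argument, i.e. s.spawn(move |_| ...). The function name squares_via_channel is hypothetical.

```rust
use std::sync::mpsc;
use std::thread;

// Squares each input in its own scoped task and gathers the results
// through a channel instead of through return values.
fn squares_via_channel(inputs: &[u64]) -> Vec<u64> {
    let (tx, rx) = mpsc::channel();
    thread::scope(|s| {
        for &x in inputs {
            let tx = tx.clone(); // each task gets its own sender
            s.spawn(move || {
                tx.send(x * x).unwrap();
            });
        }
    });
    // All tasks are done here; dropping the original sender lets rx.iter() end.
    drop(tx);
    let mut out: Vec<u64> = rx.iter().collect();
    out.sort(); // arrival order depends on scheduling
    out
}

fn main() {
    assert_eq!(squares_via_channel(&[3, 1, 2]), vec![1, 4, 9]);
    println!("ok");
}
```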
It depends on what you want to do. If you are doing io-bound work, Tokio would be what you want – you would use it as a runtime for the async capabilities in Rust.
If you have cpu-bound work, then rayon is what you want to use. Rayon is =a work-stealing parallelism crate= – it will schedule work to be done, and different threads will schedule portions of it as they become available to do work. It's very easy to get 100% CPU utilization across all cores using Rayon if your work is naturally parallelizable, and the interface is dead simple: anywhere you do a .iter(), you can turn it into a .par_iter(), and Rayon will parallelize the work. Note there is some overhead to using Rayon – you normally will be better off doing your work on a single thread unless you have a large number of elements in your collection… I found for my application I needed more than 1e6 elements before I saw an appreciable performance benefit to using Rayon.
As others said, Crossbeam is for sending and receiving messages across threads. I use it alongside of tokio and rayon.
You are right, threads have a noticeable cost when spawning. Depending on what you are trying to do, =a thread pool= could be useful or, even simpler, rayon. The latter provides a good way of handling parallelization and it is =much more fine-grained than a raw thread pool= (i.e. it uses a =work stealing algorithm=).
To give more background on this: https://news.ycombinator.com/item?id=31037971
- crossbeam is =a building block used in Rayon=, which is Rust’s main =data parallelism library=, roughly similar to OpenMP.
- crossbeam channels are very fast. They're in the same league as golang, sometimes faster (depending on how you measure, of course).
- crossbeam is an order of magnitude =faster than channels= in Rust's standard library, while being more powerful and simpler to use. This is an example where experimentation in a 3rd party crate paid off compared to the standard library where a basic implementation went to die. (Since Rust 1.67, std::sync::mpsc is itself implemented on top of crossbeam-channel's code, so this comparison predates that change.)
- it integrates with Rust’s borrow checker of course, so you can use complex multi-threaded constructs (e.g. =share stack-allocated objects= between threads) without fear of data races or use-after-free.
https://users.rust-lang.org/t/scoped-threads-std-or-crossbeam/89248/2
FWIW, I've had a much better experience with =rayon::scope= (and more generally =rayon::ThreadPool::scope=) than with either crossbeam directly or the "new" std::thread::scope.
It’s a larger dependency for sure.