tokio feature #
# 使用 full feature
tokio = { version = "1", features = ["full"] }
# 或单独启用各个 feature
tokio = { version = "1", features = ["rt", "net"] }
- full : Enables all features listed below except test-util and tracing.
- rt : Enables tokio::spawn, the
current-thread scheduler
, and non-scheduler utilities. - rt-multi-thread : Enables the heavier,
multi-threaded, work-stealing scheduler
. - io-util : Enables the IO based
Ext traits
. - io-std : Enable Stdout, Stdin and Stderr types.
- net : Enables tokio::net types such as TcpStream, UnixStream and UdpSocket, as well as (on Unix-like systems) AsyncFd and (on FreeBSD) PollAio.
- time : Enables tokio::time types and allows the schedulers to enable the built in timer.
- process : Enables tokio::process types.
- macros : Enables
#[tokio::main] and #[tokio::test]
macros. - sync : Enables all tokio::sync types.
- signal : Enables all tokio::signal types.
- fs : Enables tokio::fs types.
- test-util : Enables testing based infrastructure for the Tokio runtime.
- parking_lot : As a potential optimization, use the parking_lot crate’s synchronization primitives internally. Also, this dependency is necessary to construct some of our primitives in a const context. MSRV may increase according to the parking_lot release in use.
下面是启用 tokio_unstable
feature 后才能使用的功能:
- tracing: Enables tracing events.
- task::Builder:注意不是 runtime::Builder
- Some methods on task::JoinSet
- runtime::RuntimeMetrics
- runtime::Builder::unhandled_panic
- task::Id
在项目的 .cargo/config.toml
中启用 tokio_unstable
feature:
[build]
rustflags = ["--cfg", "tokio_unstable"]
#[tokio::main]
#
创建一个 Runtime,对于复杂的 Runtime 设置可以使用 Builder;
- 一般用于
async main fn()
函数,否则每次调用该async fn
时都新建一个 Runtime 来运行;
// Multi-threaded runtime
#[tokio::main(flavor = "multi_thread", worker_threads = 10)]
#[tokio::main(worker_threads = 2)]
// Current thread runtime
#[tokio::main(flavor = "current_thread")]
#[tokio::main(flavor = "current_thread", start_paused = true)]
// 示例
#[tokio::main]
async fn main() {
println!("Hello world");
}
// 等效为
fn main() {
tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.unwrap()
.block_on(async {
println!("Hello world");
})
}
join!()
#
并发执行传入的 async fn,直到它们都完成 (标准库 std::future module
也提供该宏):
- join!() 必须在 async context 中运行,比如
async fn/block/closure
; - tokio 使用
单线程
来执行这些 async task,所以一个 task 可能 blocking 其它 task 的执行,如果要真正并发执行需要使用 spawn(); - join!() 等待它们都完成,如果 async task 都返回 Result 且想要在遇到第一个 Error 时停止执行,可以使用
try_join!()
async fn do_stuff_async() {}
async fn more_async_work() {}
#[tokio::main]
async fn main() {
let (first, second) = tokio::join!(
do_stuff_async(),
more_async_work());
}
类似的还有 JoinSet 类型对象。
pin!()
#
select!{}
#
并发执行多个 async expression,然后并发 .await 返回的 Future 对象,对于第一个匹配 pattern 的 branch,执行对应的 handler,同时 drop 其它 branch 正在 await 的 Future 对象:
// select!{} 宏各分支的格式。
<pattern> = <async expression> (, if <precondition>)? => <handler>,
async fn do_stuff_async() { }
async fn more_async_work() {}
#[tokio::main]
async fn main() {
tokio::select! {
_ = do_stuff_async() => {
println!("do_stuff_async() completed first")
}
_ = more_async_work() => {
println!("more_async_work() completed first")
}
};
}
关于 select! + loop 中对 future 的使用方式可以参考:14-rust-lang-async.md
task_local!()
#
对于传给 spawn() 的 async fn,必须实现 Send + Sync + 'static
, 它有可能被调度到不同的线程上运行,所以不能使用 thread_local 变量,但可使用 task local 变量。
该宏生成一个 tokio::task::LocalKey 使用的 local key:
tokio::task_local! {
pub static ONE: u32;
static TWO: f32;
}
tokio::task_local! {
static NUMBER: u32;
}
// 生成的 ONE、TWO、NUMBER 都是 tokio::task::LocalKey 类型。
NUMBER.scope(1, async move {
assert_eq!(NUMBER.get(), 1);
}).await;
NUMBER.scope(2, async move {
assert_eq!(NUMBER.get(), 2);
NUMBER.scope(3, async move {
assert_eq!(NUMBER.get(), 3);
}).await;
}).await;
tokio::runtime module #
tokio Runtime 提供 IO event loop 的 IO task 调度,以及基于 timer 的调度 (如调度使用 tokio::time::sleep() 的 async task)。
一般不需要手动创建 Runtime,而是使用 #[tokio::main]
宏来标识 async fn main()
函数。
如果要精细控制 Runtime 参数,可以使用 tokio::runtime::Builder
。
在 Runtime 上下文中,可以使用 tokio::spawn()/spawn_local()
来提交异步任务,使用 spwan_blocking()
提交同步任务。
#[tokio::main]
async fn main() {
println!("Hello world");
}
// 等效为
fn main() {
tokio::runtime::Builder::new_multi_thread()
.enable_all() // 启用所有 resource driver(IO/timer)
.build()
.unwrap()
.block_on(async {
println!("Hello world");
})
}
// 另一个例子,手动创建 Runtime
use tokio::net::TcpListener;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::runtime::Runtime;
fn main() -> Result<(), Box<dyn std::error::Error>> {
// 创建一个 multi thread 和 enable all 的 Runtime
let rt = Runtime::new()?;
rt.block_on(async {
let listener = TcpListener::bind("127.0.0.1:8080").await?;
loop {
let (mut socket, _) = listener.accept().await?;
tokio::spawn(async move {
let mut buf = [0; 1024];
loop {
let n = match socket.read(&mut buf).await {
// socket closed
Ok(n) if n == 0 => return,
Ok(n) => n,
Err(e) => {
println!("failed to read from socket; err = {:?}", e);
return;
}
};
if let Err(e) = socket.write_all(&buf[0..n]).await {
println!("failed to write to socket; err = {:?}", e);
return;
}
}
});
}
})
}
Runtime 有两种类型(默认使用的是多线程版本):
Multi-Thread Scheduler
: 使用每个 CPU 一个 thread 的 worker 线程池, 一次性创建完毕,有自己的 local quque,IO/Timer Driver;Current-Thread Scheduler
: 提供单线程执行器,所有异步任务都在该线程上执行;
这两个 Runtime 都有两类 queue:一个全局 global queue 和 by thread 的 thread local queue:
- global queue 和 local queue 都是 FIFO 类型。
- thread 提交的任务先加到 local queue。
- local queue 初始容量是 256 个 tasks,超过的会被移动到 global queue。
- local queue 中有一个 LIFO slot,可以理解为 local queue 中的特权任务,可以避免 FIFO 排队带来的延迟。但只有一个槽位,被占用期间就不能再使用。
- current-thread scheduler 默认启用 lifo slot optimazition,即:有新 task 被 wake 时,优先添加到 local queue;
各 thread 默认先从 local queue 获取 task,然后是 global queue,如果都为空,则从其它 thread 的 local queue steal tasks
。
- 先从 local queue 获取 task,如果为空,再从 global queue 获取任务。或者从 local queue 获取
global_queue_interval()
(默认 31)个任务后,再从 global queue 获取任务; - 当没有任务可以调度,或者调度任务超过
event_interval()
次后(默认 61)后,检查 IO/Timer event;
总结:任务队列的执行顺序: 本地 LIFO slot -》 local queue -》 global queue -》 other thread local queue
Runtime 除了管理 Scheduler 以外,还管理各种 resource driver, 可以单独启用它们(目前就 io 和 time 两种类型)或一次全部启用:
Builder::enable_io()
:network/fs 等 IO 调度;Builder::enable_time()
:定时器调度;Builder::enable_all()
:启用所有 resrouce driver。
Runtime 在调度任务的间隙,周期检查这些 IO/Timer 是否 Ready,按需唤醒相应的 task 来执行。默认当没有任务可以调度,或调度的任务数超过 61 时(可以使用 event_interval()
来设置),Runtime 检查
IO/Timer。
创建 Runtime 时,默认为每一个 CPU 创建一个 thread,形成固定 thread 数量的 worker thread pool
。
Runtime 还维护一个 blocking thread pool
,其中的 thread 在调用 spawn_blocking()
时临时创建,所以这个 pool 的线程数量不固定,但 idle 一段时间后会自动被 Runtime 清理。
- spawn_blocking() 适合调度 CPU 计算密集型任务,它们会长时间占用 CPU 而不释放,所以可能阻塞调度其它 Ready 的异步 Task。
Runtime 确保所有 task 都是 公平调度
,以防止个别 task 一直可以调度的情况时其它 task 得不到运行(任务饥饿问题):
- 对于计算密集型任务,长时间占据 CPU 而不释放:Tokio libray 中强制插入一些 yield point,从而确保有机会调度其它任务运行。
- 计算密集型任务,更建议使用 spawn_blocking() 来执行,它会在单独的线程池中来执行该任务;
- 对于本地任务更新太快、太频繁,全局任务无法被执行到情况:Tokio 为每个任务记录循环次数,当加入 local queue 的次数超过一定上线后会先搁置,有先处理低优先级(如 global queue 中的)任务。
另外,可以配置如下调度参数:
MAX_TASKS
指定任意时刻 Runtime 创建的最大数量 task;MAX_SCHEDULE
指定 Runtime 可以调度的最大数量;MAX_DELAY
指定任务被唤醒后,调度器执行它的最大延迟;
MAX_TASKS 和 MAX_SCHEDULE 是可以配置的,MAX_DELAY 是自动计算的。
Runtime 调度器不保证任务调度的顺序。
Runtime #
Runtime 对象提供 IO driver、task 调度、计时器、线程池等运行异步任务所需的功能。
关闭 Runtime:
- drop Runtime 对象;
- 调用
shutdown_background()
或shutdown_timeout()
方法;
当 drop Runtime 时,默认 wait forever
,直到所有已经 spawned 的任务被 stopped:
- Runtime::spawn() 的 task 只在 yield 时(下一次 .await 位置)才会被 dropped,否则就一直运行直到结束;
- Runtime::spawn_blocking() 的 task 会一直运行,直到结束返回。
Runtime 的 shutdown_background() 和 shutdown_timeout()
方法,可以 避免这种 waiting forerver 的等待。当 timeout 时,如果 task 没有结束,则运行它的 thread 会
被泄露(leak),task 会在后台一直运行直到结束。
Runtime 方法:
new()
: 创建一个多线程 Runtime,启用所有 resource driver, 自动创建一个 Handle 并调用 enter(), 从而可以使用 tokio::spawn()。block_on()
: 在 Runtime 中执行异步 task,只能在同步上下文中调用该方法,它是同步和异步代码的结合点,在异步上下文中执行则会 panic。- 在当前线程中执行 Future task,block 当前线程直到 task 返回。如果要并行执行 task,则需要在 task 内部调用 spawn() 来提交 task。
spawn() 和 spawn_blocking()
: 返回的 JoinHandle 对象实现了 Future,可以 .await 获得结果;enter()
: 设置 thread local Runtime,后续 tokio::spawn() 感知该 Runtime。一般用于手动创建 Runtime 的场景。handler()
: 返回一个可以在该 Runtime 上 spawn task 的 Handle 对象,它基于引用计数实现了 Clone() 方法,用于将 Runtime 在多个 thread 间共享。
impl Runtime
// 创建一个多线程的调度器 Runtime,启用所有默认 resource driver,一般使用 #[tokio::main] 来自动调用。
pub fn new() -> Result<Runtime>
// 返回一个实现了引用计数的 Handle 对象,可以 spawn() 任务,将 Runtime 在多个 thread 间共享。
pub fn handle(&self) -> &Handle
// 在 worker thread pool 中立即运行 async task,立即返回,而不管是否 await 返回的 JoinHandle。
pub fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
where
// 多线程 Runtime 要求传入的 future,以及 future 返回的对象,都必须是 Send + 'static
F: Future + Send + 'static,
F::Output: Send + 'static
// 提交同步任务,也是立即返回
pub fn spawn_blocking<F, R>(&self, func: F) -> JoinHandle<R>
where
F: FnOnce() -> R + Send + 'static,
R: Send + 'static
// 执行 future,直到结束。
pub fn block_on<F: Future>(&self, future: F) -> F::Output
// 设置 thread local Runtime,后续可以使用 tokio::spawn() 等 module 级别函数(它们感知 EnterGuard)
pub fn enter(&self) -> EnterGuard<'_>
// 关闭 Runtime,不能再提交任务和检查 IO/Timer event
pub fn shutdown_timeout(self, duration: Duration)
pub fn shutdown_background(self)
Runtime::enter() 或 Handle::enter() 创建一个 thread local 的 Runtime Context 变量来标识当前的 Runtime,可以使用 tokio::spawn() 等方法在该 Runtime context 中提交异步任务。
- Runtime::new() 自动创建一个 Handle 并调用它的 enter();
use tokio::runtime::Runtime;
use tokio::task::JoinHandle;
fn function_that_spawns(msg: String) -> JoinHandle<()> {
// tokio::spawn 感知所在线程的 EnterGuard 对象
tokio::spawn(async move {
println!("{}", msg);
})
}
fn main() {
// 手动创建 Runtime
let rt = Runtime::new().unwrap();
// By entering the context, we tie `tokio::spawn` to this executor.
let _guard = rt.enter();
let s = "Hello World!".to_string();
let handle = function_that_spawns(s);
// block_on() 的 handle 是一个 async Future,等待它 Ready 时返回
rt.block_on(handle).unwrap();
}
Drop Runtime 时默认会等待所有 task 结束,则可能导致无限期等待。shutdown_timeout() 和 shutdown_background() 关闭 Runtime,但不等待所有 spawn 的 task 结束。这两个方法返回时, task 可能还会在后台运行,对应的 thread 被泄露。
use tokio::runtime::Runtime;
use tokio::task;
use std::thread;
use std::time::Duration;
fn main() {
let runtime = Runtime::new().unwrap();
runtime.block_on(
async move {
// spawn_blocking() 立即返回,所以该 block_on() 也立即返回
task::spawn_blocking(
move || {
thread::sleep(Duration::from_secs(10_000));
});
});
runtime.shutdown_timeout(Duration::from_millis(100));
}
Runtime 可以通过多方方式进行共享:
Arc<Runtime>
Handle
Entering
runtime context
Arc
use tokio::runtime::Handle;
#[tokio::main]
async fn main () {
let handle = Handle::current();
// 创建一个 std 线程
std::thread::spawn(move || {
// Using Handle::block_on to run async code in the new thread.
handle.block_on(async {
println!("hello");
});
});
}
runtime::Builder #
Builder 用于创建自定义参数的 Runtime:
use tokio::runtime::Builder;
fn main() {
let runtime = Builder::new_multi_thread()
.worker_threads(4)
.thread_name("my-custom-name")
.thread_stack_size(3 * 1024 * 1024)
.build()
.unwrap();
// use runtime ...
}
Builder 方法:
- new_current_thread() :单线程调度器;
- new_multi_thread() : 多线程调度器;
pub fn new_current_thread() -> Builder
pub fn new_multi_thread() -> Builder
pub fn new_multi_thread_alt() -> Builder
// 启用所有 resource driver,如 io/net/timer 等
pub fn enable_all(&mut self) -> &mut Self
// 默认为 CPU Core 数量,会覆盖环境变量 TOKIO_WORKER_THREADS 值。一次性创建这个数量的线程。
pub fn worker_threads(&mut self, val: usize) -> &mut Self
// blocking 线程池中线程数量,按需创建,idle 超过一定时间后被回收
pub fn max_blocking_threads(&mut self, val: usize) -> &mut Self
// blocking 线程池中线程空闲时间,默认 10s
pub fn thread_keep_alive(&mut self, duration: Duration) -> &mut Self
// 线程名称
pub fn thread_name(&mut self, val: impl Into<String>) -> &mut Self
pub fn thread_name_fn<F>(&mut self, f: F) -> &mut Self where F: Fn() -> String + Send + Sync + 'static
// 线程栈大小
pub fn thread_stack_size(&mut self, val: usize) -> &mut Self
pub fn on_thread_start<F>(&mut self, f: F) -> &mut Self where F: Fn() + Send + Sync + 'static
pub fn on_thread_stop<F>(&mut self, f: F) -> &mut Self where F: Fn() + Send + Sync + 'static
pub fn on_thread_park<F>(&mut self, f: F) -> &mut Self where F: Fn() + Send + Sync + 'static
pub fn on_thread_unpark<F>(&mut self, f: F) -> &mut Self where F: Fn() + Send + Sync + 'static
// 构建生成 Runtime
pub fn build(&mut self) -> Result<Runtime>
// 检查 global queue event 的 scheduler ticks
pub fn global_queue_interval(&mut self, val: u32) -> &mut Self
// 检查 event 的 scheduler ticks
pub fn event_interval(&mut self, val: u32) -> &mut Self
// Available on tokio_unstable only.
pub fn unhandled_panic(&mut self, behavior: UnhandledPanic) -> &mut Self
// Available on tokio_unstable only.
pub fn disable_lifo_slot(&mut self) -> &mut Self
// Available on tokio_unstable only.
pub fn rng_seed(&mut self, seed: RngSeed) -> &mut Self
// Available on tokio_unstable only.
pub fn enable_metrics_poll_count_histogram(&mut self) -> &mut Self
pub fn metrics_poll_count_histogram_scale( &mut self, histogram_scale: HistogramScale ) -> &mut Self
pub fn metrics_poll_count_histogram_resolution( &mut self, resolution: Duration) -> &mut Self
pub fn metrics_poll_count_histogram_buckets( &mut self, buckets: usize ) -> &mut Self
impl Builder
// Available on crate feature net, or Unix and crate feature process, or Unix and crate feature signal only.
pub fn enable_io(&mut self) -> &mut Self
// Available on crate feature net, or Unix and crate feature process, or Unix and crate feature signal only.
pub fn max_io_events_per_tick(&mut self, capacity: usize) -> &mut Self
// Available on crate feature time only.
pub fn enable_time(&mut self) -> &mut Self
impl Builder
pub fn start_paused(&mut self, start_paused: bool) -> &mut Self
on_thread_*()
函数可以用来对创建的 thread 进行操作,例如进行绑核(使用 core_affinity_rs 库):
runtime.on_thread_start(move || {
core_affinity::set_for_current(core_id.clone());
})
tokio::task #
task 是轻量级,无阻塞的调度&执行单元:
-
轻量级:用户级的 Tokio Runtime 调度,而非操作系统内核调度;
-
协作式调度:OS thread 一般是抢占式调度;协作式调度 cooperatively 表示 task 会一直运行直到 yield(例如数据没有准备好,.await 位置会 yield,同时 tokio API lib 中默认强制插入了一些 yield point,确保即使 task 没有 yield。底层的 lib 也会周期 yield),这时 Tokio Runtime 会调度 其他 task 来运行;
-
非阻塞:需要使用 tokio crate 提供的非阻塞 IO 来进行 IO/Net/Fs 在 async context 下的 APIs 操作,这些 API 不会阻塞线程,而是 yield,这样 tokio Runtime 可以执行其它 task;
异步运行时将 async fn 或 async block 作 为调度和执行的单元(它们都返回 Future),称为异步任务 async task
:
Runtime::block_on()
: 使用当前线程来运行异步任务,是同步代码和异步代码的分界点;task::spawn_local()
: 在 block_on() 所在的线程空闲时运行异步任务;task::spawn()
: 在一个线程池中立即运行异步任务;task::spawn_blocking()
:立即创建一个线程来运行指定的同步函数
。
tokio Runtime 没有使用 OS thread 的抢占式调度,而是使用协作式调度:
- 异步代码中显式使用的 await 关键字代表一个可调度(或 yield)的位置,也及开发者和 Runtime 关于异步任务调度的协作式约定;
- 可以避免一个 task 执行时长时间占有 CPU 而影响其他 task 的执行。这是通过在 tokio libray 中
强制插入一些 yield point
,从而强制实现 task 周期返回 executor,从而可以调度其他 task 运行。
task::unconstrained
可以对 task 规避 tokio 协作式调度(coop),使用它包裹的 Future task 不会被 forced to yield to Tokio:
use tokio::{task, sync::mpsc};
let fut = async {
let (tx, mut rx) = mpsc::unbounded_channel();
for i in 0..1000 {
let _ = tx.send(());
// This will always be ready. If coop was in effect, this code would be
// forced to yield periodically. However, if left unconstrained, then
// this code will never yield.
rx.recv().await;
}
};
task::unconstrained(fut).await;
block_on() #
同步代码和异步代码的结合点是 block_on()
,它是一个 同步函数
,所以不能在 async fn 中调用它。
它在当前线程上 poll 传入的 Future 对象,直到 Ready 返回, 如果是 Pending 且没有其他 aysnc task(如通过 task::spawn_local() 提交的任务 )可以 poll,则 block_on() 会 sleep。
// 在单线程中并发发起多个 request
let requests = vec![
("example.com".to_string(), 80, "/".to_string()),
("www.red-bean.com".to_string(), 80, "/".to_string()),
("en.wikipedia.org".to_string(), 80, "/".to_string()),
];
// block_on() 是同步函数,返回传入的 async task 的返回值。
let results = async_std::task::block_on(many_requests(requests));
for result in results {
match result {
Ok(response) => println!("{}", response),
Err(err) => eprintln!("error: {}", err),
}
}
在 block_on() 内部处于异步上下文中,可以使用 spawn()/spawn_blocking()
来创建立即执行的其他 task,block_on() 并不会等待这些 task 执行结束
才返回,可使用 .await 来等待这些 task 执行结束。
block_on() 在当前线程 poll 传入的 Future 对象,当它空闲时,为了能在该线程中同时 poll 其它 Future,可以使用 spawn_local() 来提交任务。
spawn_local() #
可以多次调用 spawn_local(),它将传入的 Future 添加到 block_on() 线程所处理的 task pool 中,当 block_on() 处理某个 task pending 时,会从该 pool 中获取下一个 Future task 进行 spawn_local。
注:spawn_local() 提交的任务不会被立即执行,而是后续调用 block_on() 或它空闲时才会被执行。而 spawn() 和 spawn_blocking() 提交的任务会被立即执行。
spawn_local() 要求传入的 async task Future 对象是 'static 的
(因为是单线程执行,所以不要求实现 Send+Sync
),这是由于该 Future 的 执行时机是
不确定的,如果不 .await 它返回的 handle,则有可能 many_requests() 函数返回了但 Future 还没有被执行,从而导致引用失效。
解决办法:
- 传入一个 async 辅助函数,它拥有对应的参数值,这样该函数就实现了 ‘static;
- 使用 async move {} 来捕获对象,这样该 block 返回的 Future 也实现了 ‘static;
// async 辅助函数,拥有传入参数值的所有权
async fn cheapo_owning_request(host: String, port: u16, path: String) -> std::io::Result<String> {
cheapo_request(&host, port, &path).await
}
for (host, port, path) in requests {
handles.push(task::spawn_local(cheapo_owning_request(host, port, path)));
}
spawn() #
是在一个专门的线程池(tokio 称为 worker thread 或 core thread)中并发执行 async task。spawn() 和 spawn_local() 一样,也返回 JoinHandler。JoinHandler 实现 了 Future,需要 .await 获得结果。但是它不依赖调用 block_on() 来 poll,而是执行候该函数后立即开始被 poll。
由于传给 spawn() 的 Future 可能被线程池中任意线程执行,而且在暂停恢复后可能会在另一个线程中运行,所以不能使用线程本地存储变量(解决办法是使用 task_local! 宏),同时
Future 对象必须满足 Future+Send+'static
,这样该 Future 对象才能安全的 move 到其他线程。(类似于 std:🧵:spawn() 对闭包的要求);
async block/fn 不满足 Send 的常见情况是跨 .await 使用 Rc 对象、MutexGuard 对象:
use async_std::task;
use std::rc::Rc;
let mut handles = vec![];
for (host, port, path) in requests {
handles.push(task::spawn(
// block 捕获了上下问文对象所有权,所以返回的是 'static Future 对象
async move {
cheapo_request(&host, port, &path).await
}
));
}
// 错误:reluctant() 返回的 Future 没有实现 Send。
async fn reluctant() -> String {
let string = Rc::new("ref-counted string".to_string());
some_asynchronous_thing().await; // Rc 跨 await
format!("Your splendid string: {}", string)
}
task::spawn(reluctant());
// 解决办法:将不支持 Send 的变量隔离在单独的 .await 所在的 block。
async fn reluctant() -> String {
let return_value = {
let string = Rc::new("ref-counted string".to_string());
format!("Your splendid string: {}", string)
};
some_asynchronous_thing().await; // await 没有跨 Rc
return_value
}
另一个常见的错误是 trait object 默认未实现 Send,所以 Box<dyn std::error::Error>
对象不支持 Send。
比如 some_fallible_thing()
的返回值 Result 有效性是整个 match 表达 式,所以跨越了 Ok 分支的 .await, 这时 Result 如果不是 Send 就报
错:
type GenericError = Box<dyn std::error::Error>;
type GenericResult<T> = Result<T, GenericError>;
fn some_fallible_thing() -> GenericResult<i32> {
//...
}
// 该异步函数返回的 Future 没有实现 Send
async fn unfortunate() {
match some_fallible_thing() {
Err(error) => {
// 返回的 Error 类型是 Box<dyn std::error::Error> 没有实现 Send
report_error(error);
}
Ok(output) => {
// 返回值跨越了这个 .await
use_output(output).await;
}
} }
// 报错
async_std::task::spawn(unfortunate());
解决办法:为 trait object 添加 Send 限界:重新定义 Error, 添加 Send Bound,‘static 是 Box
type GenericError = Box<dyn std::error::Error + Send + Sync + 'static>;
type GenericResult<T> = Result<T, GenericError>;
.await 表达式是 Rust 执行异步 poll 的时间点(yield point),默认情况下 task 可以被异步运行时调度到其它 thread 中执行, 所以 async fn 中涉及跨 await 的对象都需要是能 Send 。
spawn_blocking() #
如果使用的不是 tokio crate 的 APIs,如标准库的 IO APIs,则可能会阻塞 tokio Runtime。对于可能会引起阻塞的任务,tokio 提供了在 async context
中运行同步函数的 task::spawn_blocking() 和 task::block_in_place()
函数。
task::spawn_blocking() 是在单独的 blocing thread pool 中运行同步任务(clouse 标识),从而避免阻塞运行 aysnc task 的线程。
// async context 中,在单独的线程中运行可能会阻塞 tokio 的代码
let join = task::spawn_blocking(|| {
// do some compute-heavy work or call synchronous code
"blocking completed"
});
let result = join.await?;
assert_eq!(result, "blocking completed");
task::spawn_blocking() 是异步运行时用来解决可能会阻塞或长时间运行(CPU 密集型)的任务问题,它的参数是一个同步闭包函数,运行时会立即创建一个新的线程来执行(tokio 中称 为 blocking thread,该线程也组成线程池,在被过期回收前可以被运行时重复使用),它返回一个 Future,可以被 .await 结果。
async fn verify_password(password: &str, hash: &str, key: &str) -> Result<bool, argonautica::Error>
{
let password = password.to_string();
let hash = hash.to_string();
let key = key.to_string();
tokio::task::spawn_blocking(
// 参数是一个同步闭包函数,而非 async task
move || {
argonautica::Verifier::default()
.with_hash(hash)
.with_password(password)
.with_secret_key(key)
.verify()
}).await }
对于长时间运行的任务,可以主动 yield :
while computation_not_done() {
tokio::task::yield_now().await;
}
另外,Tokio Runtime 是协作式调度,它的 library 中也强制插入一些 yield point,从而确保有机会调度其它任务运行。
block_in_place() #
如果使用的多线程 runtime,则 task::block_in_place()
也是可用的,它也是在 async context 中运行可能 blocking 当前线程的代码,但是它是将
Runtime 的 worker thread 转换为 blocking thread 来实现的,这样可以避免上下文切换来提升性能:
use tokio::task;
let result = task::block_in_place(|| {
// do some compute-heavy work or call synchronous code
"blocking completed"
});
assert_eq!(result, "blocking completed");
async task 的 Future+Send+‘static 约束 #
async task 可以使用 async fn/block/closure 来构造,它们都是返回实现 Future trait 的语法糖。
- task::spawn()、spawn_blocking() 立即在后台运行异步任务,而不管是否 .await 它返回的 JoinHandle;
- spawn_local(): 当调用 block_on() 时执行,或则执行 block_on() 的线程空闲时执行。(因为它只是向执行 block_on() 的线程的队列提交任务)。
spawn() 所需的 async task 返回值约束是 Future + Send + 'static
, 所以:
- 不能直接共享借用 stack 上的变量(但是可以共享借用全局 static/static mut 常量,它们具有 ‘static 生命周期);
- 使用 async move 来将引用的外部对象的所有权转移到 task, 从而满足 ‘static 的要求;
- task 在多个 thread 间调度, 所以必须是可以 Send 的, 这意味着 task 内部的跨 await 的变量(因为在 await 处生成 Future object, 它记录了上下文变量)必须实现 Send;
use tokio::task;
#[tokio::main]
async fn main() {
let v = vec![1, 2, 3];
task::spawn(async {
println!("Here's a vec: {:?}", v); // 错误,v 是共享借用,不满足 'static 要求
});
}
// error[E0373]: async block may outlive the current function, but
// it borrows `v`, which is owned by the current function
// --> src/main.rs:7:23
// |
// 7 | task::spawn(async {
// | _______________________^
// 8 | | println!("Here's a vec: {:?}", v);
// | | - `v` is borrowed here
// 9 | | });
// | |_____^ may outlive borrowed value `v`
// |
// note: function requires argument type to outlive `'static`
// --> src/main.rs:7:17
// |
// 7 | task::spawn(async {
// | _________________^
// 8 | | println!("Here's a vector: {:?}", v);
// 9 | | });
// | |_____^
// help: to force the async block to take ownership of `v` (and any other
// referenced variables), use the `move` keyword
// |
// 7 | task::spawn(async move {
// 8 | println!("Here's a vec: {:?}", v);
// 9 | });
// |
// Error:Rc 不满足 Send 要求
use tokio::task::yield_now;
use std::rc::Rc;
#[tokio::main]
async fn main() {
tokio::spawn(async {
let rc = Rc::new("hello");
// `rc` is used after `.await`. It must be persisted to the task's state.
yield_now().await;
println!("{}", rc);
});
}
// OK
use tokio::task::yield_now;
use std::rc::Rc;
#[tokio::main]
async fn main() {
tokio::spawn(async {
{
let rc = Rc::new("hello");
println!("{}", rc);
}
// `rc` is no longer used. It is **not** persisted when the task yields to the scheduler
yield_now().await; // yield_now() 返回 Future,必须 await 才会实际 yield
});
}
解决办法:
- 将 Rc 改成实现 Send 的 Arc;
- 将 Rc 对象隔离,确保不跨 .await 有效。
其它不需要 Future + Send + 'static
的特殊 task 任务类型:
- tokio::task::block_in_place(f) ,其中 f 同步闭包函数: FnOnce() -> R
- tokio::task::LocalSet::spawn_local(f) ,其中 f 是 Future + ‘static
// F 是同步函数闭包,而不是 async block 或 fn。 在当前 thread 执行同步函数,而 spawn_blocking() 是在一个线程池中执行同步函数。
pub fn block_in_place<F, R>(f: F) -> R where F: FnOnce() -> R
// F 不需要实现 Send+Sync,在 block_on() 所在的 thread 上执行 future
pub fn spawn_local<F>(&self, future: F) -> JoinHandle<F::Output>
where
F: Future + 'static,
F::Output: 'static,
let local = task::LocalSet::new();
local.spawn_local(async {
// ...
});
JoinHandle #
task::spawn() 和 task::spawn_blocking() 返回 JoinHandle
类型:
- 当 JoinHandle 被 Drop 时,关联的 task 会被从 Runtime detach,即继续运行但不能再 join 它。
- 它的 abort_handle() 方法返回可以 remote abort 该 task 的 AbortHandle;
JoinHandle 实现了 Future trait,.await 它时返回 Result<T, JoinError>
:
impl<T> JoinHandle<T>
// Abort the task associated with the handle.
pub fn abort(&self)
// Checks if the task associated with this JoinHandle has finished.
pub fn is_finished(&self) -> bool
// Returns a new AbortHandle that can be used to remotely abort this task.
pub fn abort_handle(&self) -> AbortHandle
// 示例
use tokio::{time, task};
let mut handles = Vec::new();
handles.push(tokio::spawn(async {
time::sleep(time::Duration::from_secs(10)).await;
true
}));
handles.push(tokio::spawn(async {
time::sleep(time::Duration::from_secs(10)).await;
false
}));
let abort_handles: Vec<task::AbortHandle> = handles.iter().map(
|h| h.abort_handle()).collect();
for handle in abort_handles {
handle.abort();
}
for handle in handles {
assert!(handle.await.unwrap_err().is_cancelled());
}
JoinHandle 实现了 Future,可以被 .await,返回 Result<T, JoinError>
, JoinError
类型用于判断该 task 的出错原因,如是否被
cancelled/panic 等。
impl JoinError
pub fn is_cancelled(&self) -> bool
pub fn is_panic(&self) -> bool
pub fn into_panic(self) -> Box<dyn Any + Send + 'static>
pub fn try_into_panic(self) -> Result<Box<dyn Any + Send + 'static>, JoinError>
pub fn id(&self) -> Id
// 示例
use std::panic;
#[tokio::main]
async fn main() {
let err = tokio::spawn(async {
panic!("boom");
}).await.unwrap_err();
assert!(err.is_panic());
}
如果 task panic,则 .await 返回 JoinErr:
let join = task::spawn(async {
panic!("something bad happened!")
});
// The returned result indicates that the task failed.
assert!(join.await.is_err());
spawn 的 task 可以通过 JoinHandle.abort()
方法来取消(Cancelled), 对应的 task 会在 下一次 yield 时(如 .await)
时被终止。这时
JoinHandle 的 .await 结果是 JoinErr,它的 is_cancelled() 为 true。abort task 并不代表 task 一定以 JoinErr 结束,因为有些任务可
能在 yield 前正常结束,这时 .await 返回正常结果。
abort() 方法可能在 task 被终止前返回,可以使用 JoinHandle .await 来确保 task 被终止后返回。
对于 spawn_blocking() 创建的任务,由于不是 async,所以调用它返回的 JoinHandle.abort() 是无效的,task 会持续运行。
yield_now() #
async fn task::yield_now()
类似于 std::thread::yield_now()
, .await 该函数时会使当前 task yield to tokio
Runtime 调度器,让其它 task 被调度执行。
use tokio::task;
async {
task::spawn(async {
// ...
println!("spawned task done!")
});
// Yield, allowing the newly-spawned task to execute first.
task::yield_now().await; // 必须 .await
println!("main task done!");
}
JoinSet/AbortHandle #
JoinSet: spawn 一批 task,等待部分或全部执行完成,按照完成的顺序返回。
- 所有任务的返回类型 T 必须相同;
- 如果 JoinSet 被 Drop,则其中的所有 task 立即被 aborted;
对比:标准库的 std::future::join!()
宏是等待全部 task 都完成,而不能单独等待部分完成。
use tokio::task::JoinSet;
#[tokio::main]
async fn main() {
let mut set = JoinSet::new();
for i in 0..10 {
// spawn() 提交的 async task 立即被运行
set.spawn(async move { i });
}
let mut seen = [false; 10];
// join_next() 返回下一个完成的 task 结果
while let Some(res) = set.join_next().await {
let idx = res.unwrap();
seen[idx] = true;
}
for i in 0..10 {
assert!(seen[i]);
}
}
JoinSet 的方法:
impl<T> JoinSet<T>
pub fn new() -> Self
// 返回 JoinSet 中 task 数量
pub fn len(&self) -> usize
pub fn is_empty(&self) -> bool
impl<T: 'static> JoinSet<T>
// 使用 Builder 来构造一个 task:只能设置 name 然后再 spawn
pub fn build_task(&mut self) -> Builder<'_, T>
use tokio::task::JoinSet;
#[tokio::main]
async fn main() -> std::io::Result<()> {
let mut set = JoinSet::new();
// Use the builder to configure a task's name before spawning it.
set.build_task()
.name("my_task")
.spawn(async { /* ... */ })?;
Ok(())
}
// JoinSet 的 spawn_xx() 方法返回的是 AbortHandle,而非 JoinHandle。
// 在 JoinSet 中 spawn 一个 task,该 task 会立即运行
pub fn spawn<F>(&mut self, task: F) -> AbortHandle where F: Future<Output = T> + Send + 'static, T: Send
// 在指定的 Handler 对应的 Runtime 上 spawn task
pub fn spawn_on<F>(&mut self, task: F, handle: &Handle) -> AbortHandle where F: Future<Output = T> + Send + 'static, T: Send
pub fn spawn_local<F>(&mut self, task: F) -> AbortHandle where F: Future<Output = T> + 'static
pub fn spawn_local_on<F>( &mut self, task: F, local_set: &LocalSet ) -> AbortHandle where F: Future<Output = T> + 'static
pub fn spawn_blocking<F>(&mut self, f: F) -> AbortHandle where F: FnOnce() -> T + Send + 'static, T: Send
pub fn spawn_blocking_on<F>(&mut self, f: F, handle: &Handle) -> AbortHandle where F: FnOnce() -> T + Send + 'static, T: Send
// 示例
use tokio::task::JoinSet;
#[tokio::main]
async fn main() {
let mut set = JoinSet::new();
for i in 0..10 {
set.spawn_blocking(move || { i });
}
let mut seen = [false; 10];
while let Some(res) = set.join_next().await {
let idx = res.unwrap();
seen[idx] = true;
}
for i in 0..10 {
assert!(seen[i]);
}
}
// 等待直到其中一个 task 完成,返回它的 output,注意是异步函数,需要 .await 才返回结果。
// 当 JoinSet 为空时,返回 None,可用于 while let 中。
pub async fn join_next(&mut self) -> Option<Result<T, JoinError>>
pub async fn join_next_with_id(&mut self) -> Option<Result<(Id, T), JoinError>>
// try_join_next() 返回已经完成的任务输出,如果没有完成的任务则返回 None。
// 该方法不等待直到一个任务完成。
pub fn try_join_next(&mut self) -> Option<Result<T, JoinError>>
pub fn try_join_next_with_id(&mut self) -> Option<Result<(Id, T), JoinError>>
// Aborts all tasks and waits for them to finish shutting down. Calling this
// method is equivalent to calling abort_all and then calling join_next in a
// loop until it returns None.
pub async fn shutdown(&mut self)
// Aborts all tasks on this JoinSet. This does not remove the tasks from the
// JoinSet. To wait for the tasks to complete cancellation, you should call
// join_next in a loop until the JoinSet is empty.
pub fn abort_all(&mut self)
// Removes all tasks from this JoinSet without aborting them. The tasks removed
// by this call will continue to run in the background even if the JoinSet
// is dropped.
pub fn detach_all(&mut self)
JoinSet 的各 spawn() 都返回 AbortHandle
对象,通过该对象可以 abort 对应的 spawned task,而不等待运行结束。
不像 JoinHandle 那样 await task 结束,AbortHandle 只用于 terminate task 而不等待它结束。
Drop AbortHandle 释放了终止 task 的权利,但并不会终止任务。
impl AbortHandle
// 终止 task,这时 JoinHandle 可能返回 JoinError 错误(也可能正常结束)
pub fn abort(&self)
// abort/canceld task 是否完成
pub fn is_finished(&self) -> bool
pub fn id(&self) -> Id
LocalSet/spawn_local() #
LocalSet 是在同一个 thread 上运行一批异步 task,可以避免 tokio::spawn() 的 Future task 必须实现 Send 的要求(如在 async task 中使用 Rc):
-
task::LocalSet::new() 创建一个 LocalSet, 可以直接在该对象上使用 spawn_local() 提交任务。
-
task::LocalSet::run_until() 运行一个 async Future task,该方法为 task 创建一个 LocalSet 上下文,所以可以在 task 中使用 tokio::task::spawn_local() 来多次提交任务,当所有任务都结束时,run_until() 返回的对象 .await 返回。
-
当 LocalSet 中所有 task 都结束时,.await 返回。
只能使用 spawn_local() 方法来提交任务,不能使用 task::spawn()。
tokio::spawn_local() 向 LocalSet 提交的 !Send task 在 Runtime::block_on()
所在的单线程中调用。所以
LocalSet::run_until() 只能在 #[tokio::main]/#[tokio::test]
或者 Runtime::block_on()
中调用,不能在
tokio::spawn()
中调用。
use std::rc::Rc;
use tokio::task;
#[tokio::main]
async fn main() {
let nonsend_data = Rc::new("my nonsend data...");
let local = task::LocalSet::new();
// run_util() 自动调用 local.enter(), Future 位于该 LocalSet context 中
local.run_until(async move {
let nonsend_data = nonsend_data.clone();
// 在 run_until() 内部,可以多次使用 task::spawn_local() 提交任务, 都在 block_on() 所在的单线程上执行。
task::spawn_local(async move {
println!("{}", nonsend_data);
// ...
}).await.unwrap();
}).await; // .await 等待所有 spawn_local() 提交的 task 结束时返回
}
// 另一个例子
use tokio::{task, time};
use std::rc::Rc;
#[tokio::main]
async fn main() {
let nonsend_data = Rc::new("world");
let local = task::LocalSet::new();
let nonsend_data2 = nonsend_data.clone();
// 使用 LocalSet.spawn_local() 提交任务
local.spawn_local(async move {
// ...
println!("hello {}", nonsend_data2)
});
local.spawn_local(async move {
time::sleep(time::Duration::from_millis(100)).await;
println!("goodbye {}", nonsend_data)
});
// 等待 local 中提交的任务都结束
local.await;
}
LocalSet 的方法:
impl LocalSet
pub fn new() -> LocalSet
// 进入 LocalSet 的 context,这样后续使用 tokio::task::spawn_local() 提交 !Send task;
pub fn enter(&self) -> LocalEnterGuard
// 提交一个 !Send task, 返回可以 .await 的 JoinHandle
pub fn spawn_local<F>(&self, future: F) -> JoinHandle<F::Output> where F: Future + 'static, F::Output: 'static
// 在指定的 Runtime 中同步执行 future,直到返回。
// 内部调用 Runtime::block_on() 函数,所以需要在同步上下文中调用。
pub fn block_on<F>(&self, rt: &Runtime, future: F) -> F::Output where F: Future
// 执行一个 Future, 内部可以使用 tokio::task::spawn_local() 提交 !Send task, 运行直到这些 task 结束;
pub async fn run_until<F>(&self, future: F) -> F::Output where F: Future
pub fn unhandled_panic(&mut self, behavior: UnhandledPanic) -> &mut Self
pub fn id(&self) -> Id
tokio::mutex #
Rust 在 .await 时有可能将当前 task 转移到其他 thread 运行, 所以需要 async task block 实现 Send + ‘static。
由于标准库的 MutexGuard 没有实现 Send, 在持有 MutexGuard 的情况下不能跨 .await。
use std::sync::{Mutex, MutexGuard};
async fn increment_and_do_stuff(mutex: &Mutex<i32>) {
let mut lock: MutexGuard<i32> = mutex.lock().unwrap();
*lock += 1;
do_something_async().await;
} // lock goes out of scope here
编译报错: error: future cannot be sent between threads safely
error: future cannot be sent between threads safely
--> src/lib.rs:13:5
|
13 | tokio::spawn(async move {
| ^^^^^^^^^^^^ future created by async block is not `Send`
|
::: /playground/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-0.2.21/src/task/spawn.rs:127:21
|
127 | T: Future + Send + 'static,
| ---- required by this bound in `tokio::task::spawn::spawn`
|
= help: within `impl std::future::Future`, the trait `std::marker::Send` is not implemented for `std::sync::MutexGuard<'_, i32>`
note: future is not `Send` as this value is used across an await
--> src/lib.rs:7:5
|
4 | let mut lock: MutexGuard<i32> = mutex.lock().unwrap();
| -------- has type `std::sync::MutexGuard<'_, i32>` which is not `Send`
...
7 | do_something_async().await;
| ^^^^^^^^^^^^^^^^^^^^^^^^^^ await occurs here, with `mut lock` maybe used later
8 | }
| - `mut lock` is later dropped here
解决办法: 将 MutexGuard 的 destructor 在 await 前运行:
// This works!
async fn increment_and_do_stuff(mutex: &Mutex<i32>) {
{
let mut lock: MutexGuard<i32> = mutex.lock().unwrap();
*lock += 1;
} // lock goes out of scope here
do_something_async().await;
}
// 错误:这是由于 lock 变量在 await 时还有效。
async fn increment_and_do_stuff(mutex: &Mutex<i32>) {
let mut lock: MutexGuard<i32> = mutex.lock().unwrap();
*lock += 1;
drop(lock);
do_something_async().await;
}
或者使用 struct 来封装 Mutex, 然后在同步函数中处理锁:
use std::sync::Mutex;
struct CanIncrement {
mutex: Mutex<i32>,
}
impl CanIncrement {
// This function is not marked async.
fn increment(&self) {
let mut lock = self.mutex.lock().unwrap();
*lock += 1;
}
}
async fn increment_and_do_stuff(can_incr: &CanIncrement) {
// 在异步函数中调用加锁的同步函数。
can_incr.increment();
do_something_async().await;
}
或者使用 tokio::sync::Mutex 类型,它支持跨 .await(也即异步感知的 Mutex 类型), 但性能会差一些:
use tokio::sync::Mutex; // note! This uses the Tokio mutex
// This compiles! (but restructuring the code would be better in this case)
async fn increment_and_do_stuff(mutex: &Mutex<i32>) {
let mut lock = mutex.lock().await;
*lock += 1;
do_something_async().await;
} // lock goes out of scope here
综上, 对于 Mutex 和带来的 Send 问题:
- 首选将 Mutex 封装到 Struct 和同步函数中;
- 或者将 Mutex 在 .await 前解构(必须是 block 解构, 而不是 drop());
- 或者 spawn 一个 task 来专门管理 state, 其他 task 使用 message 来对它进行操作;(actor 模式)
异步任务中可以使用 std::sync::Arc/Mutex 同步原语来对共享内存进行并发访问。这里使用 std::sync::Mutex 而非 tokio::sync::Mutex, 只有当Mutex 跨 await 时才需要使用异步 Mutex。
Mutex lock 阻塞时会阻塞当前 thread, 会导致其他 async task 也不能调度到该 thread 执行。缺省情况下, tokio runtime 使用 multi-thread scheduler, task 被调度到那个 thread 是 tokio runtime 决定的。
use bytes::Bytes;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
// 定义类型别名
type Db = Arc<Mutex<HashMap<String, Bytes>>>;
use tokio::net::TcpListener;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
#[tokio::main]
async fn main() {
let listener = TcpListener::bind("127.0.0.1:6379").await.unwrap();
println!("Listening");
let db = Arc::new(Mutex::new(HashMap::new()));
loop {
let (socket, _) = listener.accept().await.unwrap();
// Clone the handle to the hash map.
let db = db.clone();
println!("Accepted");
tokio::spawn(async move {
process(socket, db).await;
});
}
}
use tokio::net::TcpStream;
use mini_redis::{Connection, Frame};
async fn process(socket: TcpStream, db: Db) {
use mini_redis::Command::{self, Get, Set};
// Connection, provided by `mini-redis`, handles parsing frames from the socket
let mut connection = Connection::new(socket);
while let Some(frame) = connection.read_frame().await.unwrap() {
let response = match Command::from_frame(frame).unwrap() {
Set(cmd) => {
// db 是 MutexGuard 类型, 生命周期没有跨 await, 所以可以使用同步 Mutex;
let mut db = db.lock().unwrap();
db.insert(cmd.key().to_string(), cmd.value().clone());
Frame::Simple("OK".to_string())
}
Get(cmd) => {
let db = db.lock().unwrap();
if let Some(value) = db.get(cmd.key()) {
Frame::Bulk(value.clone())
} else {
Frame::Null
}
}
cmd => panic!("unimplemented {:?}", cmd),
};
// Write the response to the client
connection.write_frame(&response).await.unwrap();
}
}
为了尽量降低 Mutex 竞争导致的 task/thread block, 可以:
- Switching to a dedicated task to manage state and use message passing.
Shard the mutex
- Restructure the code to
avoid the mutex
.
第二种情况的例子: 将单一 Mutex<HashMap<_, __>>
实例, 拆解为 N 个实例:
- 第三方 dashmap crate 提供了性能更高的 hashmap 实现。
type ShardedDb = Arc<Vec<Mutex<HashMap<String, Vec<u8>>>>>;
fn new_sharded_db(num_shards: usize) -> ShardedDb {
let mut db = Vec::with_capacity(num_shards);
for _ in 0..num_shards {
db.push(Mutex::new(HashMap::new()));
}
Arc::new(db)
}
// 后续查找 map 前先计算下 key 的 hash
let shard = db[hash(key) % db.len()].lock().unwrap();
shard.insert(key, value);
tokio::sync #
tokio 提供了 4 种类型 channel:
- oneshot:发送和接收一个值;
- mpsc:多个发送方,一个接收方;
- broadcast:多个发送方,多个接收方;
- watch:只保证接收方收到最新值,不保证它们收到所有值;
std:sync::mpsc 和 crossbeam::channel 都是同步 channel, 不能在 async 上下文中使用, 否则可能 block 当前线程和 task。
async_channel crate
提供了 multi-producer multi-consumer channel 类型。
oneshot #
oneshot:The oneshot channel supports sending a single value from a single producer to a single consumer
. This channel is usually used to send the result of a computation to a waiter. Example:
using a oneshot channel to receive the result of a computation.
- oneshot::channel() 用于创建一对 Sender 和 Receiver;
- Sender 的 send() 方法是同步方法,故可以在同步或异步上下文中使用;
- Receiver .await 返回 Sender 发送的值;
use tokio::sync::oneshot;
async fn some_computation() -> String {
"represents the result of the computation".to_string()
}
#[tokio::main]
async fn main() {
// 创建一对发送和接收 handler
let (tx, rx) = oneshot::channel();
tokio::spawn(async move {
let res = some_computation().await;
tx.send(res).unwrap(); // send() 是非异步的方法, 所以不需要 .await
});
// Do other work while the computation is happening in the background
// Wait for the computation result
let res = rx.await.unwrap();
}
如果 sender 在 send 前被 drop,则 receiver 失败,错误为 error::RecvError
:
use tokio::sync::oneshot;
#[tokio::main]
async fn main() {
let (tx, rx) = oneshot::channel::<u32>();
tokio::spawn(async move {
drop(tx);
});
match rx.await {
Ok(_) => panic!("This doesn't happen"),
Err(_) => println!("the sender dropped"),
}
}
在 tokio::select! loop 中使用 oneshot channel 时,需要在 channel 前添加 &mut
:
use tokio::sync::oneshot;
use tokio::time::{interval, sleep, Duration};
#[tokio::main]
async fn main() {
// select!{} 使用 &mut recv,所以 recv 必须是 mut 类型。
let (send, mut recv) = oneshot::channel();
let mut interval = interval(Duration::from_millis(100));
tokio::spawn(async move {
sleep(Duration::from_secs(1)).await;
send.send("shut down").unwrap();
});
loop {
tokio::select! {
_ = interval.tick() => println!("Another 100ms"),
// select! 自动 .await recv 的返回值
msg = &mut recv => {
println!("Got message: {}", msg.unwrap());
break;
}
}
}
}
oneshot::Receiver 的 close() 关闭 Receiver,这时 Sender 的 send() 方法会失败。
use tokio::sync::oneshot;
use tokio::sync::oneshot::error::TryRecvError;
#[tokio::main]
async fn main() {
let (tx, mut rx) = oneshot::channel();
assert!(!tx.is_closed());
rx.close();
assert!(tx.is_closed());
assert!(tx.send("never received").is_err());
match rx.try_recv() {
Err(TryRecvError::Closed) => {}
_ => unreachable!(),
}
}
oneshot::Sender 的 closed() 方法等待 oneshot::Receiver 被 closed 或被 Drop 时返回。
Sender 的 is_closed() 方法用于获取 Receiver 是否被 closed 或 Drop,这时 Sender 的 send() 方法会失败:
use tokio::sync::oneshot;
#[tokio::main]
async fn main() {
let (mut tx, rx) = oneshot::channel::<()>();
tokio::spawn(async move {
drop(rx);
});
// 等待 rx 被 drop 时返回
tx.closed().await;
println!("the receiver dropped");
}
// 使用 select!
use tokio::sync::oneshot;
use tokio::time::{self, Duration};
async fn compute() -> String {
// Complex computation returning a `String`
}
#[tokio::main]
async fn main() {
let (mut tx, rx) = oneshot::channel();
tokio::spawn(async move {
tokio::select! {
_ = tx.closed() => {
// The receiver dropped, no need to do any further work
}
value = compute() => {
// The send can fail if the channel was closed at the exact
// same time as when compute() finished, so just ignore the
// failure.
let _ = tx.send(value);
}
}
});
// Wait for up to 10 seconds
let _ = time::timeout(Duration::from_secs(10), rx).await;
}
mpsc #
mpsc 是标准库的 mpsc 的异步感知版本:
use tokio::io::{self, AsyncWriteExt};
use tokio::net::TcpStream;
use tokio::sync::mpsc;
#[tokio::main]
async fn main() -> io::Result<()> {
let mut socket = TcpStream::connect("www.example.com:1234").await?;
let (tx, mut rx) = mpsc::channel(100);
for _ in 0..10 {
// Each task needs its own `tx` handle. This is done by cloning the original handle.
let tx = tx.clone();
tokio::spawn(async move {
tx.send(&b"data to write"[..]).await.unwrap();
});
}
// The `rx` half of the channel returns `None` once **all** `tx` clones
// drop. To ensure `None` is returned, drop the handle owned by the current
// task. If this `tx` handle is not dropped,
// there will always be a single outstanding `tx` handle.
drop(tx);
while let Some(res) = rx.recv().await {
socket.write_all(res).await?;
}
Ok(())
}
mpsc 和 oneshot 联合使用,可以实现 req/resp 响应的对共享资源的处理:
use tokio::sync::{oneshot, mpsc};
use Command::Increment;
enum Command {
Increment,
// Other commands can be added here
}
#[tokio::main]
async fn main() {
let (cmd_tx, mut cmd_rx) = mpsc::channel::<
(Command, oneshot::Sender<u64>)>(100);
// Spawn a task to manage the counter
tokio::spawn(async move {
let mut counter: u64 = 0;
while let Some((cmd, response)) = cmd_rx.recv().await {
match cmd {
Increment => {
let prev = counter;
counter += 1;
response.send(prev).unwrap();
}
}
}
});
let mut join_handles = vec![];
// Spawn tasks that will send the increment command.
for _ in 0..10 {
let cmd_tx = cmd_tx.clone();
join_handles.push(tokio::spawn(async move {
let (resp_tx, resp_rx) = oneshot::channel();
cmd_tx.send((Increment, resp_tx)).await.ok().unwrap();
let res = resp_rx.await.unwrap();
println!("previous value = {}", res);
}));
}
// Wait for all tasks to complete
for join_handle in join_handles.drain(..) {
join_handle.await.unwrap();
}
}
actor 模式 #
tokio mpsc message passing 机制(actor 模式):
-
一个 tokio spawn task 作为 manager 角色, 通过 buffered mpsc channel 接收 message, 然后根据 message 类型来操作有状态对象, 由于只有 manager 来串行操作该对象, 所以可以避免加锁。
- buffered mpsc channel 类型可以实现 sender 反压。
-
manager 通过 message 中的 response channel 来向发送者响应结果;
use bytes::Bytes;
use mini_redis::client;
use tokio::sync::{mpsc, oneshot};
/// Multiple different commands are multiplexed over a single channel.
#[derive(Debug)]
enum Command {
Get {
key: String,
resp: Responder<Option<Bytes>>, // 发送响应的 oneshot 类型 channel
},
Set {
key: String,
val: Bytes,
resp: Responder<()>,
},
}
/// Provided by the requester and used by the manager task to send the command response back to the requester.
type Responder<T> = oneshot::Sender<mini_redis::Result<T>>;
#[tokio::main]
async fn main() {
// 创建一个 buffered 类型 channel, 当消费的慢时可以对发送端反压 ( send(...).await 将阻塞一段时间), 从而降低内存和并发度, 防止消耗过多系统资源。
let (tx, mut rx) = mpsc::channel(32);
let tx2 = tx.clone();
let manager = tokio::spawn(async move {
// Open a connection to the mini-redis address.
let mut client = client::connect("127.0.0.1:6379").await.unwrap();
while let Some(cmd) = rx.recv().await {
match cmd {
Command::Get { key, resp } => {
let res = client.get(&key).await;
// 使用 let _ = 来忽略 result 错误。
let _ = resp.send(res); // 通过 oneshot channel 发送响应
}
Command::Set { key, val, resp } => {
let res = client.set(&key, val).await;
// Ignore errors
let _ = resp.send(res);
}
}
}
});
// Spawn two tasks, one setting a value and other querying for key that was set.
let t1 = tokio::spawn(async move {
let (resp_tx, resp_rx) = oneshot::channel(); // 构建响应 channel
let cmd = Command::Get {
key: "foo".to_string(),
resp: resp_tx,
};
// Send the GET request
if tx.send(cmd).await.is_err() {
eprintln!("connection task shutdown");
return;
}
// Await the response
let res = resp_rx.await;
println!("GOT (Get) = {:?}", res);
});
let t2 = tokio::spawn(async move {
let (resp_tx, resp_rx) = oneshot::channel();
let cmd = Command::Set {
key: "foo".to_string(),
val: "bar".into(),
resp: resp_tx,
};
// Send the SET request
if tx2.send(cmd).await.is_err() {
eprintln!("connection task shutdown");
return;
}
// Await the response
let res = resp_rx.await;
println!("GOT (Set) = {:?}", res);
});
t1.await.unwrap();
t2.await.unwrap();
manager.await.unwrap();
}
broadcast #
broadcast channel: 从多个发送端向多个接收端发送多个值,可以实现 fan out 模式,如 pub/sub 或 chat 系统。
-
tokio::sync::broadcast::channel(N) 创建一个指定容量为 N 的 bounded,multi-producer,multi-consumer channel,当 channel 中元素数量达到 N 后,最老的元素将被清理,没有消费该元素的 Receiver 的 recv() 方法将返回 RecvError::Lagged 错误,然后该 Receiver 的读写位置将更新到 channel 中当前最老的元素,下一次 recv() 将返回该元素。通过这种机制,Receiver 可以感知是否 Lagged 以及做相应的处理。
-
Sender 实现了 clone,可以 clone 多个实例,然后在多个 task 中使用。当所有Sender 都被 drop 时,channel 将处于 closed 状态,这时 Receiver 的 recv() 将返回 RecvError::Closed 错误。
-
Sender::subscribe() 创建新的 Receiver,它接收调用 subscribe() 创建它后发送的消息。当所有 Receiver 都被 drop 时, Sender::send() 方法返回 SendError;
-
Sender 发送的值会被 clone 后发送给所有 Receiver,直到它们都收到这个值后, 该值才会从 channel 中移除;
use tokio::sync::broadcast;
#[tokio::main]
async fn main() {
let (tx, mut rx1) = broadcast::channel(16);
// 创建一个新的接收端
let mut rx2 = tx.subscribe();
tokio::spawn(async move {
assert_eq!(rx1.recv().await.unwrap(), 10);
assert_eq!(rx1.recv().await.unwrap(), 20);
});
tokio::spawn(async move {
assert_eq!(rx2.recv().await.unwrap(), 10);
assert_eq!(rx2.recv().await.unwrap(), 20);
});
tx.send(10).unwrap();
tx.send(20).unwrap();
}
// 感知 Lagged
use tokio::sync::broadcast;
#[tokio::main]
async fn main() {
let (tx, mut rx) = broadcast::channel(2);
tx.send(10).unwrap();
tx.send(20).unwrap();
// 由于 channel 容量为 2,所以发送 30 后,10 将被丢弃。
tx.send(30).unwrap();
// rx 没有收到被丢弃的 10,所以第一次 recv 报错。
assert!(rx.recv().await.is_err());
// 后续再 recv 时,开始接收最老的数据。
assert_eq!(20, rx.recv().await.unwrap());
assert_eq!(30, rx.recv().await.unwrap());
}
watch #
watch channel:从多个发送端向多个接收端发送多个值,但是 channel 中只保存最新的一个值,所以如果接收端存在延迟,则不能保证它接收了所有的中间值。类似于容量为 1 的 broadcast channel。
使用场景:广播配置变更,应用状态变更,优雅关闭等。
- watch::channel(initial_value): 创建一个 watch channel 时可以指定一个初始值;
- Sender::subscribe():创建一个新的 Receiver,它只接收后面新发送的值;
- Sender 实现了 Clone trait,可以 clone 多个实例来发送数据。同时 Sender 和 Receiver 都是 thread safe;
- Sender::is_closed()/closed() 为 Sender 提供检查所有 Receiver 是否被 closed 或 Drop 的方法。
- Sender::send() :必须在有 Receiver 的情况下才能发送成功,而 send_if_modified, send_modify, 或 send_replace 方法可以在没有 Receiver 的情况下发送成功;
- send_modify(modify: FnOnce(&mut T)); 无条件的更新 T 值,然后通知所有接收者,可以在没有接收者的情况下使用。
- send_if_modified
(&self, modify: F) -> bool 和 send_modify() 类似,但是 modify 闭包返回 bool 值为 true 时才通知所有接收者。
let sender = tokio::sync::watch::Sender::new(0u8);
// 没有 receiver,所以发送出错
assert!(sender.send(3).is_err());
// 使用 subscribe() 方法创建一个 receiver
let _rec = sender.subscribe();
// 发送成功
assert!(sender.send(4).is_ok());
Receiver 使用 Receiver::changed() 来接收 un-seen 值更新,如果没有 un-seen 值,则该方法会 sleep 直到有 un-seen 值或者 Sender 被 Drop,如果有 un-seen 值则该方法立即返回 Ok(()) 。
-
Receiver 使用 Receiver::borrow_and_update() 来获取最新值,并标记为 seen。
-
如果只是获取最新值而不标记为 seen,则使用 Receiver::borrow()。borrow()没有将值标记为 seen,所以下一次调用 Receiver::changed() 时将立即 返回 Ok(())。 borrow_and_update() 将值标记为 seen,所以下一次调用 Receiver::changed() 时将 sleep 直到有新的值;
-
Receiver 在 borrow 值时会将 channel 设置一个 read lock,所以当 borrow 时间较长时,可能会阻塞 Sender;
注意:一般需要先调用 changed() 方法来确认有新的更新值后,在调用 borrow_and_update() 或 borrow() 方法来获得最新的值。否则直接调用这两个方法时,将可能返回老的 值。
use tokio::sync::watch;
use tokio::time::{self, Duration, Instant};
use std::io;
#[derive(Debug, Clone, Eq, PartialEq)]
struct Config {
timeout: Duration,
}
impl Config {
async fn load_from_file() -> io::Result<Config> {
// file loading and deserialization logic here
}
}
async fn my_async_operation() {
// Do something here
}
#[tokio::main]
async fn main() {
let mut config = Config::load_from_file().await.unwrap();
// 创建 watch channel 并提供初始值
let (tx, rx) = watch::channel(config.clone());
// Spawn a task to monitor the file.
tokio::spawn(async move {
loop {
// Wait 10 seconds between checks
time::sleep(Duration::from_secs(10)).await;
// Load the configuration file
let new_config = Config::load_from_file().await.unwrap();
// If the configuration changed, send the new config value on the
// watch channel.
if new_config != config {
tx.send(new_config.clone()).unwrap();
config = new_config;
}
}
});
let mut handles = vec![];
// Spawn tasks that runs the async operation for at most `timeout`.
// If the timeout elapses, restart the operation.
//
// The task simultaneously watches the `Config` for changes. When the
// timeout duration changes, the timeout is updated without restarting
// the in-flight operation.
for _ in 0..5 {
// Clone a config watch handle for use in this task
let mut rx = rx.clone();
let handle = tokio::spawn(async move {
// Start the initial operation and pin the future to the stack.
// Pinning to the stack is required to resume the operation across
// multiple calls to `select!`
let op = my_async_operation();
tokio::pin!(op);
// Get the initial config value
let mut conf = rx.borrow().clone();
let mut op_start = Instant::now();
let sleep = time::sleep_until(op_start + conf.timeout);
tokio::pin!(sleep);
loop {
tokio::select! {
_ = &mut sleep => {
// The operation elapsed. Restart it
op.set(my_async_operation());
// Track the new start time
op_start = Instant::now();
// Restart the timeout
sleep.set(time::sleep_until(op_start + conf.timeout));
}
_ = rx.changed() => {
// 获得最新值,然后标记为 seen
conf = rx.borrow_and_update().clone();
// The configuration has been updated. Update the `sleep` using the new
// `timeout` value.
sleep.as_mut().reset(op_start + conf.timeout);
}
_ = &mut op => {
// The operation completed!
return
}
}
}
});
handles.push(handle);
}
for handle in handles.drain(..) {
handle.await.unwrap();
}
}
Notify #
tokio::sync::Notify 用于通知一个或所有的 task wakup,它本身不携带任何数据:
A Notify can be thought of as a Semaphore starting with 0 permits
.The notified().await
method
waits for a permit to become available, and notify_one() sets a permit
if there currently are no
available permits.
- notified(&self) -> Notified<’_> :
Wait
for a notification. 返回的 Notified 实现 Future,.await 时将被阻塞, 直到收到通知; - notify_one(): Notifies the
first
waiting task. - notify_last(&self): Notifies the
last
waiting task. - notify_waiters(&self): Notifies
all
waiting tasks.
use tokio::sync::Notify;
use std::sync::Arc;
#[tokio::main]
async fn main() {
let notify = Arc::new(Notify::new());
let notify2 = notify.clone();
let handle = tokio::spawn(async move {
notify2.notified().await; // 等待被 Notify
println!("received notification");
});
println!("sending notification");
notify.notify_one();
// Wait for task to receive notification.
handle.await.unwrap();
}
#[tokio::main]
async fn main() {
let notify = Arc::new(Notify::new());
let notify2 = notify.clone();
let notified1 = notify.notified();
let notified2 = notify.notified();
let handle = tokio::spawn(async move {
println!("sending notifications");
notify2.notify_waiters();
});
notified1.await;
notified2.await;
println!("received notifications");
}
tokio::sync::Notify 可以作为替代 waker 的异步函数内部 thread 间同步的工具:
use tokio::sync::Notify;
use std::sync::Arc;
use std::time::{Duration, Instant};
use std::thread;
async fn delay(dur: Duration) {
let when = Instant::now() + dur;
let notify = Arc::new(Notify::new());
let notify_clone = notify.clone();
thread::spawn(move || {
let now = Instant::now();
if now < when {
thread::sleep(when - now);
}
notify_clone.notify_one();
});
notify.notified().await;
}
tokio::io #
tokio::io 没有定义和使用 Read/Write trait
,而是定义和使用 AsyncRead/AsyncWrite/AsyncSeek trait
,同时为这两个 trait 定义了:
- 异步 Buf 版本:
AsyncBufRead,AsyncBufReadExt
; - 其它扩展 trait:
AsyncReadExt/AsyncWriteExt/AsyncSeekExt
;
从 AsyncRead 创建 tokio::io::BufReader 对象, 实现 AsyncRead/AsyncBufRead trait.
从 AsyncWrite 创建 Struct tokio::io::BufWriter 对象, 实现 AsyncWrite trait.
从同时实现了 AsyncRead/AsyncWrite 的对象创建 struct BufStream<RW>
(例如 TCPStream 对象), 它实现了 AsyncBufRead/AsyncWrite
.
AsyncRead/AsyncWrite/AsyncBufRead trait
提供的 poll_XX()
开头的方法,不方便直接使用,而各种 Ext trait 则提供了更常用的
Read/Write/Lines() 等方法。
AsyncReadExt trait
返回的对象都实现了 Future:
pub trait AsyncReadExt: AsyncRead {
// Provided methods
fn chain<R>(self, next: R) -> Chain<Self, R> where Self: Sized, R: AsyncRead { ... }
fn read<'a>(&'a mut self, buf: &'a mut [u8]) -> Read<'a, Self> where Self: Unpin { ... }
fn read_buf<'a, B>(&'a mut self, buf: &'a mut B) -> ReadBuf<'a, Self, B> where Self: Unpin, B: BufMut + ?Sized
fn read_exact<'a>(&'a mut self, buf: &'a mut [u8]) -> ReadExact<'a, Self> where Self: Unpin { ... }
fn read_u8(&mut self) -> ReadU8<&mut Self> where Self: Unpin { ... }
// https://docs.rs/tokio/latest/src/tokio/io/util/read.rs.html#43
impl<R> Future for Read<'_, R> where R: AsyncRead + Unpin + ?Sized,
{
type Output = io::Result<usize>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<usize>> {
let me = self.project();
let mut buf = ReadBuf::new(me.buf);
ready!(Pin::new(me.reader).poll_read(cx, &mut buf))?;
Poll::Ready(Ok(buf.filled().len()))
}
}
例如: read() 返回的 Read 对象实现了 Future, Poll ready 时返回 io::Resultasync fn
形式,但由于返回
的对象实现了 Future, 它们也可以被 .await 轮询:
use tokio::fs::File;
// 需要显式导入 AsyncReadExt 和 AsyncWriteExt trait
use tokio::io::{self, AsyncReadExt, AsyncWriteExt};
#[tokio::main]
async fn main() -> io::Result<()> {
let mut f = File::open("foo.txt").await?;
let mut buffer = [0; 10];
// read up to 10 bytes
let n = f.read(&mut buffer[..]).await?;
println!("The bytes: {:?}", &buffer[..n]);
let mut buffer = Vec::new();
// read the whole file
f.read_to_end(&mut buffer).await?;
Ok(())
let mut file = File::create("foo.txt").await?;
// Writes some prefix of the byte string, but not necessarily all of it.
let n = file.write(b"some bytes").await?;
println!("Wrote the first {} bytes of 'some bytes'.", n);
Ok(())
}
tokio::io::split()
函数将传入的支持 AsyncRead + AsyncWrite
的 Stream 对象, 如 TCPStream, 拆分为 ReadHalf<T>, WriteHalf<T>
, 前者实现 AsyncRead trait, 后者实现 AsyncWrite trait。
pub fn split<T>(stream: T) -> (ReadHalf<T>, WriteHalf<T>)
where
T: AsyncRead + AsyncWrite,
// 示例:
// https://tokio.rs/tokio/tutorial/io
use tokio::io::{self, AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpStream;
#[tokio::main]
async fn main() -> io::Result<()> {
let socket = TcpStream::connect("127.0.0.1:6142").await?;
let (mut rd, mut wr) = io::split(socket);
// Write data in the background
tokio::spawn(async move {
wr.write_all(b"hello\r\n").await?;
wr.write_all(b"world\r\n").await?;
// Sometimes, the rust type inferencer needs a little help
Ok::<_, io::Error>(())
});
let mut buf = vec![0; 128];
loop {
let n = rd.read(&mut buf).await?;
if n == 0 {
break;
}
println!("GOT {:?}", &buf[..n]);
}
Ok(())
}
tokio::io::duplex()
函数创建一个使用内存作为缓冲的 DuplexStream 类型对象,它实现了 AsyncRead/AsyncWrite trait。
DuplexStream 被 drop 时, 另一端 read 还可以继续读取内存中的数据, 读到 0 byte 表示 EOF,而另一端 write 时立即返回 Err(BrokenPipe) ;
pub fn duplex(max_buf_size: usize) -> (DuplexStream, DuplexStream)
// 示例
let (mut client, mut server) = tokio::io::duplex(64);
client.write_all(b"ping").await?;
let mut buf = [0u8; 4];
server.read_exact(&mut buf).await?;
assert_eq!(&buf, b"ping");
server.write_all(b"pong").await?;
client.read_exact(&mut buf).await?;
assert_eq!(&buf, b"pong");
select!{} #
tokio::select!{} 宏用于同时 .await 多个 async task,当其中一个完成时返回, drop 其它分支的 task Future 对象。
必须在 async context 中使用 select!, 如 async functions, closures, 和 blocks。
use tokio::sync::oneshot;
#[tokio::main]
async fn main() {
let (tx1, rx1) = oneshot::channel();
let (tx2, rx2) = oneshot::channel();
tokio::spawn(async {
let _ = tx1.send("one");
});
tokio::spawn(async {
let _ = tx2.send("two");
});
// select 并发 poll rx1 和 rx2, 如果 rx1 先 Ready 则结果赋值给 val 并 drop rx2.
tokio::select! {
val = rx1 => {
println!("rx1 completed first with {:?}", val);
}
val = rx2 => {
println!("rx2 completed first with {:?}", val);
}
}
}
tokio::sync::oneshot::Receiver 实现了 Drop, 被 Drop 时对应的 tokio::sync::oneshot::Sender 可以收到 closed 通知 (sender 的 async closed() 方法返回)。
use tokio::sync::oneshot;
async fn some_operation() -> String {
// Compute value here
}
#[tokio::main]
async fn main() {
let (mut tx1, rx1) = oneshot::channel();
let (tx2, rx2) = oneshot::channel();
tokio::spawn(async {
// Select on the operation and the oneshot's `closed()` notification.
tokio::select! {
val = some_operation() => {
let _ = tx1.send(val);
}
// tx1.closed() 是一个 async 方法, 故可以被 select, 当对端 rx1 被 Drop 时返回.
_ = tx1.closed() => {
// `some_operation()` is canceled, the task completes and
// `tx1` is dropped.
}
}
});
tokio::spawn(async {
let _ = tx2.send("two");
});
// rx2 先 .await 返回时, rx1 会被 Drop, 这时上面的 tx1.closed() .await 返回
tokio::select! {
val = rx1 => {
println!("rx1 completed first with {:?}", val);
}
val = rx2 => {
println!("rx2 completed first with {:?}", val);
}
}
}
select! 宏支持多个 branch(当前限制为 64 个):
<pattern> = <async expression> (, if <precondition>)? => <handler>
else => <expression>
Rust 在 单线程异步执行
所有 branch 的 async expressions (一个 branch 的 async expression 不 Ready 时, 调度执行另一个
branch 的 async expression), 当第一个 async expressions 返回且它的 result 与 pattern 匹配时, Drop 所有其它 async
expression, 然后执行 handler。如果 result 与 pattern 不匹配, 继续等待下一个异步返回并检查返回值是否匹配.
每个 branch 还可以有 if 表达式, 如果 if 表达式结果为 false, 则对应 async expression 还是会被执行, 但是返回的 Future 不会被 .await Poll。(例如在 loop 中多次 poll 一个 Pin 的 Future 对象, 一旦该对象 Ready, 后续就不能再 poll 了, 这时需要使用 if 表达式来排除该 branch)。
每次进入执行 select! 时:
-
先执行个 branch 的 if 表达式, 结果为 false 时, disable 对应 branch;
-
执行所有 async expression(包括 disable 的 branch, 但是不 poll 它) , 注意是在单 thread 中异步执行这些expression;
-
并发 .await poll 未被 disable 的 expression 返回的 Future 对象;
-
当第一个 poll 返回时, 看是否匹配 pattern, 如果不匹配则 disable 该 branch, 继续等待其它 branch .await poll 返回, 重复 1-3; 如果匹 配, 则执行该 branch 的 handler, 同时 drop 其它 branch 的 Future 对象.
-
如果所有 branch 都被 disable(如都不匹配 pattern), 则执行 else branch handler, 如果没有提供 else 则 panic.
各分支的 aysnc expression 可以执行复杂的计算,只要结果是 Future 对象即可:
use tokio::net::TcpStream;
use tokio::sync::oneshot;
#[tokio::main]
async fn main() {
let (tx, rx) = oneshot::channel();
// Spawn a task that sends a message over the oneshot
tokio::spawn(async move {
tx.send("done").unwrap();
});
tokio::select! {
socket = TcpStream::connect("localhost:3465") => {
println!("Socket connected {:?}", socket);
}
msg = rx => {
println!("received message first {:?}", msg);
}
}
}
// 另一个例子
use tokio::net::TcpListener;
use tokio::sync::oneshot;
use std::io;
#[tokio::main]
async fn main() -> io::Result<()> {
let (tx, rx) = oneshot::channel();
tokio::spawn(async move {
tx.send(()).unwrap();
});
let mut listener = TcpListener::bind("localhost:3465").await?;
// 并发执行两个 branch, 直到 rx 收到消息或者 accept 出错。
tokio::select! {
// 使用 async {} 来定义一个 async expression, 该 expression 返回 Result
// select! 会 .await 这个 expression 和 rx, 当 rx 返回时该 expression 不会被继续 .await
_ = async {
loop {
// listener 出错时才返回
let (socket, _) = listener.accept().await?;
tokio::spawn(async move { process(socket) });
}
// Help the rust type inferencer out
Ok::<_, io::Error>(())
} => {}
_ = rx => {
println!("terminating accept loop");
}
}
Ok(())
}
async expression 返回值支持 pattern match, 并且可以有一个 else 分支, 当所有 pattern match 都不匹配时, 才执行 else 分支。
use tokio::sync::mpsc;
#[tokio::main]
async fn main() {
let (mut tx1, mut rx1) = mpsc::channel(128);
let (mut tx2, mut rx2) = mpsc::channel(128);
tokio::spawn(async move {
// Do something w/ `tx1` and `tx2`
});
// 这里 drop 的是 recv() 返回的 Future,而非 rx1 和 rx2 对象本身
tokio::select! {
Some(v) = rx1.recv() => {
println!("Got {:?} from rx1", v);
}
Some(v) = rx2.recv() => {
println!("Got {:?} from rx2", v);
}
else => {
println!("Both channels closed");
}
}
}
select! 宏有返回值, 各 branch 必须返回相同类型值:
async fn computation1() -> String {
// .. computation
}
async fn computation2() -> String {
// .. computation
}
#[tokio::main]
async fn main() {
let out = tokio::select! {
res1 = computation1() => res1,
res2 = computation2() => res2,
};
println!("Got = {}", out);
}
Errors:
- 如果 async expression 中有 ? 则表达式返回 Result, 例如 accept().await? 出错时, res 为 Result::Err, 这时 handler 返回该 Error。
- 如果是 handler 中有 ?, 则会立即将 error 传递到 select 表达式外所在的函数, 如 res? 会将错误传递到 main() 函数;
use tokio::net::TcpListener;
use tokio::sync::oneshot;
use std::io;
#[tokio::main]
async fn main() -> io::Result<()> {
// [setup `rx` oneshot channel]
let listener = TcpListener::bind("localhost:3465").await?;
tokio::select! {
res = async {
loop {
// await 结果为 Err 时,表达式返回,将 res 设置为 Err
let (socket, _) = listener.accept().await?;
tokio::spawn(async move { process(socket) });
}
// Help the rust type inferencer out
Ok::<_, io::Error>(())
} => {
// handler 内 res 为 Err 时,? 表达式直接将 Err 返回给 select! 所在的函数。
res?;
}
_ = rx => {
println!("terminating accept loop");
}
}
Ok(())
}
Borrowing: 对于 tokio::spawn(async {..}) 必须 move 捕获要使用的数据, 但是对于 select! 的多 branch async expression 则不需 要, 只要遵守 borrow 的规则即可, 比如同时访问 & 共享数据, 唯一访问 &mut 数据。
这是因为 tokio 在一个单线程中异步并发 poll 所有表达式返回的 Future 对象。 select! 必须异步并发执行所有 aysnc expression, 并且当第一个 expression 返回的结果匹配 pattern 时才 drop 其它正在执行的 async expression。
use tokio::io::AsyncWriteExt;
use tokio::net::TcpStream;
use std::io;
use std::net::SocketAddr;
async fn race(
data: &[u8],
addr1: SocketAddr,
addr2: SocketAddr
) -> io::Result<()> {
tokio::select! {
// OK:两个 async expression 共享借用 data
Ok(_) = async {
let mut socket = TcpStream::connect(addr1).await?;
socket.write_all(data).await?;
Ok::<_, io::Error>(())
} => {}
Ok(_) = async {
let mut socket = TcpStream::connect(addr2).await?;
socket.write_all(data).await?;
Ok::<_, io::Error>(())
} => {}
else => {}
};
Ok(())
}
#[tokio::main]
async fn main() {
let (tx1, rx1) = oneshot::channel();
let (tx2, rx2) = oneshot::channel();
let mut out = String::new();
tokio::spawn(async move {
// Send values on `tx1` and `tx2`.
});
// 由于 select 保证只执行一个 handler, 所以多个 handler 中可以 &mut 使用相同的数据 &mut out。
tokio::select! {
_ = rx1 => {
out.push_str("rx1 completed");
}
_ = rx2 => {
out.push_str("rx2 completed");
}
}
println!("{}", out);
}
Loop: 并发执行多个 async expression, 当它们都 recv() 返回时, select 随机选择一个 branch 来执行 handler, 未执行的 handler
branch 的 message 也不会丢(称为 Cancellation safety
)。
use tokio::sync::mpsc;
#[tokio::main]
async fn main() {
let (tx1, mut rx1) = mpsc::channel(128);
let (tx2, mut rx2) = mpsc::channel(128);
let (tx3, mut rx3) = mpsc::channel(128);
loop {
// 当 rx1.recv() 返回时, drop 的是 rx2.recv() 和 rx3.recv() 的 Future
// 对象, 而不是 rx2 和 rx3, 所以下一次 loop 还可以继续调用。
// 当 channel 被 close , .recv() 返回 None, 不匹配 Some, 所以其它 rx 还是继续进行, 直到所有 rx 都被关闭.
// 注意: std::sync::mpsc 的 Receiver.recv() 返回 Result, 而 tokio::sync::mpsc 的 Receiver.recv() 返回 Option.
let msg = tokio::select! {
Some(msg) = rx1.recv() => msg,
Some(msg) = rx2.recv() => msg,
Some(msg) = rx3.recv() => msg,
else => { break }
};
println!("Got {:?}", msg);
}
println!("All channels have been closed.");
}
Cancellation safety
: 在 loop 中使用 select! 来从多个 branch 接收 message 时, 需要确保接收被 cancel 时不会丢失 message。
以下方法是 cancellation safe 的:
- tokio::sync::mpsc::Receiver::recv
- tokio::sync::mpsc::UnboundedReceiver::recv
- tokio::sync::broadcast::Receiver::recv
- tokio::sync:⌚:Receiver::changed
- tokio::net::TcpListener::accept
- tokio::net::UnixListener::accept
- tokio::signal::unix::Signal::recv
- tokio::io::AsyncReadExt::read on any AsyncRead
- tokio::io::AsyncReadExt::read_buf on any AsyncRead
- tokio::io::AsyncWriteExt::write on any AsyncWrite
- tokio::io::AsyncWriteExt::write_buf on any AsyncWrite
- tokio_stream::StreamExt::next on any Stream
- futures::stream::StreamExt::next on any Stream
以下方法不是 cancellation safe 的, 有可能会导致丢失数据:
- tokio::io::AsyncReadExt::read_exact
- tokio::io::AsyncReadExt::read_to_end
- tokio::io::AsyncReadExt::read_to_string
- tokio::io::AsyncWriteExt::write_all
以下方法也不是 cancellation safe 的, 因为它们使用 queue 来保证公平, cancel 时会丢失 queue 中的数据:
- tokio::sync::Mutex::lock
- tokio::sync::RwLock::read
- tokio::sync::RwLock::write
- tokio::sync::Semaphore::acquire
- tokio::sync::Notify::notified
如何判断方法是 cancellation safe?主要是看 .await 位置,因为当异步方法被 cancelled 时,总是在 .await 的位置被取消。当方法的 .await 位置被重 启时,如果函数功能还正常,那么就是 cancellation safe 的。
Cancellation safety 定义的方式:如果一个 Future 还没有完成,在 drop 这个 Future 或重建它时,它的行为必须是 no-op 的。也就是当一个 Future 未 ready 时, drop 该 Future 什么都不影响。 这也是 loop 中使用 select!的要求。没有这个要求的话,在 loop 中重新执行 select! 时,会重新进展。
在 loop+select!{} 宏中,要对 future 对象重复 poll,而 select!{} 在并发 .await 时,如果有返回值,则会 drop 其它 branch 的 Future 对 象,所以在 select branch 中不能直接使用 future 对象,以避免它被 drop 后下一次 loop 失败 select 失败。
常见的解决办法是:
- 创建一个 future mut 对象;
- 使用 pin!(future) 来创建一个同名的,但类型是
Pin<&mut impl Future<Output=xx>>
的对象 future; - 在 select branch 中使用
&mut future
来进行轮询。(这是因为 Future 的 poll() 方法的 self 签名是self: Pin<&mut Self>)
async fn action() {
// Some asynchronous logic
}
#[tokio::main]
async fn main() {
let (mut tx, mut rx) = tokio::sync::mpsc::channel(128);
let operation = action(); // 返回一个 Future 对象, 没有对它 .await
// 必须先 Pin 该 Future 对象,创建一个同名的类型为 Pin<&mut impl Future<Output=i32> 类型对象 ;
tokio::pin!(operation);
loop {
tokio::select! {
// &mut operation 类型是已经 Pin 住类型的 &mut 借用,
// 即 &mut Pin<&mut impl Future<Output=i32>
// .await 表达式支持这种类型的表达式调用, 所以可以被 poll 轮询。
//
// 当该 branch 因不匹配或无返回值而被 drop/cancel 时,operation 任然有效,下一次 select!{} 可以接续使用 &mut operation
_ = &mut operation => break,
Some(v) = rx.recv() => {
if v % 2 == 0 {
break;
}
}
}
}
}
Modifying a branch
- async fn 一旦 completion 就不能再 resume 执行: 一般情况下, future.await? 会消耗 future 对象.
- branch 可以使用 if 表达式先判断是否需要执行, 如:
&mut operation, if !done => {xxx};
async fn action(input: Option<i32>) -> Option<String> {
// If the input is `None`, return `None`.
// This could also be written as `let i = input?;`
let i = match input {
Some(input) => input,
None => return None,
};
// async logic here
}
#[tokio::main]
async fn main() {
let (mut tx, mut rx) = tokio::sync::mpsc::channel(128);
let mut done = false;
let operation = action(None);
tokio::pin!(operation);
tokio::spawn(async move {
let _ = tx.send(1).await;
let _ = tx.send(3).await;
let _ = tx.send(2).await;
});
loop {
tokio::select! {
// 先判断 done, 为 false 时才执行 async expression。
// 这里使用的是 &mut, 所以 .await 并不会消耗对象, operation 在下一次 loop 还存在。
res = &mut operation, if !done => {
done = true;
if let Some(v) = res {
println!("GOT = {}", v);
return;
}
}
Some(v) = rx.recv() => {
if v % 2 == 0 {
// `.set` is a method on `Pin`.
operation.set(action(Some(v)));
done = false;
}
}
}
}
}
tokio::spawn 和 select! 的区别:
-
都可以并发执行都各异步任务,但是 spawn() 是 tokio runtime 调度执行,而且可能位于其它线程上,所以调度的 async task 必须满足
Future + Send + Sync + 'static
,而 ‘static 意味着 task 中不能有共享借用(需要 move); -
select! 在单个 task 线程上执行所有 branch 的异步任务,所以它们是串行的。(比如一个 async expression 未 Ready 时, 执行另一个 branch 的 async expression).
tokio-stream #
Stream 是 std::iter::Iterator
的异步版本, 返回一系列 value。当前,Stream 还不在 Rust 标准库中, 是 futures-core
crate 定义
的 Stream trait 类型。
poll_next()
方法返回的 Poll 类型值语义:
- Ready<Some(Item)>: 返回下一个值;
- Ready
: Stream 结束; - Pending: 值不 Ready;
// https://docs.rs/futures-core/0.3.30/futures_core/stream/trait.Stream.html
pub trait Stream {
type Item;
// Required method
fn poll_next(
self: Pin<&mut Self>,
cx: &mut Context<'_>
) -> Poll<Option<Self::Item>>;
// Provided method
fn size_hint(&self) -> (usize, Option<usize>) { ... }
}
tokio 在单独的 tokio-stream crate
中提供 Stream 支持,它通过 pub use futures_core::Stream;
来 re-export
futures-core crate 定义的 Stream trait。
tokio_stream crate 提供了如下 module 函数:
empty
: 创建一个返回空内容的 stream;iter
: 将一个 Iterator 转换为 stream;once
: 创建一个只返回一个元素的 stream;pending
: 创建一个不会 Ready 的 stream;
tokio_stream::StreamExt #
它实际是 futures::StreamExt
的 pub use 导出,是 futures_core::stream::Stream
子 trait, 提供了常用的额外 trait 方法,
包括各种 adapter 方法(如 map/filter 等), 以及用于迭代的 next() 方法:
- 实现 Stream 的类型也自动实现了 StreamExt。
- timeout() 可以作为一种通用的异步超时机制。
pub trait StreamExt: Stream {
// next() 迭代返回下一个元素,返回的 Next 类型对象实现了 Future trait,.await 时返回 Option
fn next(&mut self) -> Next<'_, Self> where Self: Unpin
fn try_next<T, E>(&mut self) -> TryNext<'_, Self> where Self: Stream<Item = Result<T, E>> + Unpin { ... }
fn map<T, F>(self, f: F) -> Map<Self, F> where F: FnMut(Self::Item) -> T, Self: Sized
fn map_while<T, F>(self, f: F) -> MapWhile<Self, F> where F: FnMut(Self::Item) -> Option<T>, Self: Sized { ... }
fn then<F, Fut>(self, f: F) -> Then<Self, Fut, F> where F: FnMut(Self::Item) -> Fut, Fut: Future, Self: Sized { ... }
fn merge<U>(self, other: U) -> Merge<Self, U> where U: Stream<Item = Self::Item>, Self: Sized { ... }
fn filter<F>(self, f: F) -> Filter<Self, F> where F: FnMut(&Self::Item) -> bool, Self: Sized { ... }
fn filter_map<T, F>(self, f: F) -> FilterMap<Self, F> where F: FnMut(Self::Item) -> Option<T>, Self: Sized { ... }
fn fuse(self) -> Fuse<Self> where Self: Sized { ... }
fn take(self, n: usize) -> Take<Self> where Self: Sized { ... }
fn take_while<F>(self, f: F) -> TakeWhile<Self, F> where F: FnMut(&Self::Item) -> bool, Self: Sized { ... }
fn skip(self, n: usize) -> Skip<Self> where Self: Sized { ... }
fn skip_while<F>(self, f: F) -> SkipWhile<Self, F> where F: FnMut(&Self::Item) -> bool, Self: Sized { ... }
fn all<F>(&mut self, f: F) -> AllFuture<'_, Self, F> where Self: Unpin, F: FnMut(Self::Item) -> bool { ... }
fn any<F>(&mut self, f: F) -> AnyFuture<'_, Self, F> where Self: Unpin, F: FnMut(Self::Item) -> bool { ... }
fn chain<U>(self, other: U) -> Chain<Self, U> where U: Stream<Item = Self::Item>, Self: Sized { ... }
fn fold<B, F>(self, init: B, f: F) -> FoldFuture<Self, B, F> where Self: Sized, F: FnMut(B, Self::Item) -> B { ... }
fn collect<T>(self) -> Collect<Self, T> where T: FromStream<Self::Item>, Self: Sized
// Timeout 实现了 Future trait,.await 时返回 Result<<T as Future>::Output, Elapsed>,
// 当 await 超时时返回 Elapsed Error。可以作为一种通用的异步超时机制。
fn timeout(self, duration: Duration) -> Timeout<Self> where Self: Sized
fn timeout_repeating(self, interval: Interval) -> TimeoutRepeating<Self> where Self: Sized
fn throttle(self, duration: Duration) -> Throttle<Self> where Self: Sized
fn chunks_timeout( self, max_size: usize, duration: Duration ) -> ChunksTimeout<Self> where Self: Sized
fn peekable(self) -> Peekable<Self> where Self: Sized
}
示例:
// StreamExt trait 为 Stream 提供了常用的 next() 方法。
use tokio_stream::{self as stream, StreamExt};
#[tokio::main]
async fn main() {
// empty() 迭代时立即结束
let mut none = stream::empty::<i32>();
assert_eq!(None, none.next().await);
// iter() 函数将一个 Iterator 转换为 Stream
let mut stream = stream::iter(vec![17, 19]);
assert_eq!(stream.next().await, Some(17));
assert_eq!(stream.next().await, Some(19));
assert_eq!(stream.next().await, None);
// once() 函数返回只能迭代生成一个元素的 stream
let mut one = stream::once(1);
assert_eq!(Some(1), one.next().await);
assert_eq!(None, one.next().await);
// pending() 函数返回一个迭代式 pending 的 stream
let mut never = stream::pending::<i32>();
// This will never complete
never.next().await;
unreachable!();
}
// timeout() 示例
use tokio_stream::{self as stream, StreamExt};
use std::time::Duration;
let int_stream = int_stream.timeout(Duration::from_secs(1));
tokio::pin!(int_stream);
// When no items time out, we get the 3 elements in succession:
assert_eq!(int_stream.try_next().await, Ok(Some(1)));
assert_eq!(int_stream.try_next().await, Ok(Some(2)));
assert_eq!(int_stream.try_next().await, Ok(Some(3)));
assert_eq!(int_stream.try_next().await, Ok(None));
Stream 没有实现迭代器, 不支持 for-in 迭代, 只能在 StreamExt 的基础上使用 while-let
循环来迭代:
use tokio_stream::{self as stream, StreamExt};
#[tokio::main]
async fn main() {
let mut stream = stream::iter(vec![0, 1, 2]);
while let Some(value) = stream.next().await {
println!("Got {}", value);
}
}
module tokio_stream::wrappers #
该 module 提供将其它类型转换为 Stream 的各种 wrappers 类型:
- BroadcastStream A wrapper around tokio::sync::broadcast::Receiver that implements Stream.
- CtrlBreakStreaml A wrapper around CtrlBreak that implements Stream.
- CtrlCStream A wrapper around CtrlC that implements Stream.
- IntervalStream A wrapper around Interval that implements Stream.
- LinesStream A wrapper around tokio::io::Lines that implements Stream.
- ReadDirStream A wrapper around tokio::fs::ReadDir that implements Stream.
- ReceiverStream A wrapper around tokio::sync::mpsc::Receiver that implements Stream.
- SignalStream A wrapper around Signal that implements Stream.
- SplitStream A wrapper around tokio::io::Split that implements Stream.
- TcpListenerStream A wrapper around TcpListener that implements Stream.
- UnboundedReceiverStream A wrapper around tokio::sync::mpsc::UnboundedReceiver that implements Stream.
- UnixListenerStream A wrapper around UnixListener that implements Stream.
- WatchStream A wrapper around tokio::sync:⌚:Receiver that implements Stream.
use tokio_stream::{StreamExt, wrappers::WatchStream};
use tokio::sync::watch;
let (tx, rx) = watch::channel("hello");
let mut rx = WatchStream::new(rx); // 从 Watch 创建一个 WatchStream
assert_eq!(rx.next().await, Some("hello"));
tx.send("goodbye").unwrap();
assert_eq!(rx.next().await, Some("goodbye"));
// 复杂的例子
use tokio_stream::StreamExt;
use mini_redis::client;
async fn publish() -> mini_redis::Result<()> {
let mut client = client::connect("127.0.0.1:6379").await?;
client.publish("numbers", "1".into()).await?;
client.publish("numbers", "two".into()).await?;
client.publish("numbers", "3".into()).await?;
client.publish("numbers", "four".into()).await?;
client.publish("numbers", "five".into()).await?;
client.publish("numbers", "6".into()).await?;
Ok(())
}
async fn subscribe() -> mini_redis::Result<()> {
let client = client::connect("127.0.0.1:6379").await?;
let subscriber = client.subscribe(vec!["numbers".to_string()]).await?;
// 返回一个 Stream 对象
let messages = subscriber.into_stream();
// next() 要求 message Stream 必须是 Pinned
tokio::pin!(messages);
while let Some(msg) = messages.next().await {
println!("got = {:?}", msg);
}
Ok(())
}
#[tokio::main]
async fn main() -> mini_redis::Result<()> {
tokio::spawn(async {
publish().await
});
subscribe().await?;
println!("DONE");
Ok(())
}
StreamExt 的各种 Adapters 方法 #
从 Stream 生成新的 Stream, 如 map/take/filter:
let messages = subscriber
.into_stream()
.take(3);
let messages = subscriber
.into_stream()
.filter(|msg| match msg {
Ok(msg) if msg.content.len() == 1 => true,
_ => false,
})
.take(3);
let messages = subscriber
.into_stream()
.filter(|msg| match msg {
Ok(msg) if msg.content.len() == 1 => true,
_ => false,
})
.map(|msg| msg.unwrap().content)
.take(3);
实现一个 Stream #
- 能返回值时, poll_next() 返回 Poll::Ready, 否则返回 Poll::Pending;
- Ready<Some
>: 迭代值,Ready 迭代结束;
- Ready<Some
- 一般在 poll_next() 内部使用 Future 和其他 Stream 来实现;
use tokio_stream::Stream;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Duration;
struct Interval {
rem: usize,
delay: Delay,
}
impl Interval {
fn new() -> Self {
Self {
rem: 3,
delay: Delay { when: Instant::now() }
}
}
}
impl Stream for Interval {
type Item = ();
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<()>>
{
// 迭代结束
if self.rem == 0 {
return Poll::Ready(None);
}
match Pin::new(&mut self.delay).poll(cx) {
// 返回迭代值
Poll::Ready(_) => {
let when = self.delay.when + Duration::from_millis(10);
self.delay = Delay { when };
self.rem -= 1;
Poll::Ready(Some(()))
}
// 返回 Pending
Poll::Pending => Poll::Pending,
}
}
}
tokio_util 的 ReaderStream 和 StreamReader #
tokio_util crate 提供了 Stream/Reader 相关的 struct 类型。
-
ReaderStream:将一个 AsyncRead 转换为 byte chunks Stream。ReaderStream 实现了 Stream trait,迭代返回的元素类型为 Bytes,即一块连续的 u8 内存区域。
// ReaderStream 类型关联函数 impl<R: AsyncRead> ReaderStream<R> // Convert an AsyncRead into a Stream with item type // Result<Bytes, std::io::Error>. pub fn new(reader: R) -> Self // Convert an AsyncRead into a Stream with item type // Result<Bytes, std::io::Error>, with a // specific read buffer initial capacity. pub fn with_capacity(reader: R, capacity: usize) -> Self // ReaderStream 实现了 Stream trait,迭代返回的元素类型为 Bytes, 即一块连续的 u8 内存区域 impl<R: AsyncRead> Stream for ReaderStream<R> type Item = Result<Bytes, Error> // 示例 use tokio_stream::StreamExt; use tokio_util::io::ReaderStream; let data = b"hello, world!"; // &[u8] 实现了 AsyncRead trait let mut stream = ReaderStream::new(&data[..]); // Read all of the chunks into a vector. let mut stream_contents = Vec::new(); // chunk 类型为 bytes::Bytes, 可以 Deref<[u8]> 来使用 while let Some(chunk) = stream.next().await { stream_contents.extend_from_slice(&chunk?); } // Once the chunks are concatenated, we should have the original data. assert_eq!(stream_contents, data);
-
StreamReader:将一个 byte chunks Stream(Stream 迭代返回的 Item 类型为 Result<bytes::Buf, E>)转换为 AsyncRead,它同时实现了 AsyncBufRead trait。
impl<S, B, E> StreamReader<S, B> where S: Stream<Item = Result<B, E>>, B: Buf, E: Into<Error>, // new() 从 Stream 创建一个 StreamReader,Stream 需要每次迭代返回一个 Result<bytes::Buf, E> 对象 // // Bytes, BytesMut, &[u8], VecDeque<u8> 等(不含 Vec<u8>)均实现 bytes::Buf pub fn new(stream: S) -> Self // 示例 use bytes::Bytes; use tokio::io::{AsyncReadExt, Result}; use tokio_util::io::StreamReader; // Create a stream from an iterator. let stream = tokio_stream::iter(vec![ // Bytes 实现了 Buf trait Result::Ok(Bytes::from_static(&[0, 1, 2, 3])), Result::Ok(Bytes::from_static(&[4, 5, 6, 7])), Result::Ok(Bytes::from_static(&[8, 9, 10, 11])), ]); // Convert it to an AsyncRead. // stream 迭代的 Item 要实现 bytes::Buf trait let mut read = StreamReader::new(stream); // Read five bytes from the stream. let mut buf = [0; 5]; read.read_exact(&mut buf).await?; assert_eq!(buf, [0, 1, 2, 3, 4]); // Read the rest of the current chunk. assert_eq!(read.read(&mut buf).await?, 3); assert_eq!(&buf[..3], [5, 6, 7]); // Read the next chunk. assert_eq!(read.read(&mut buf).await?, 4); assert_eq!(&buf[..4], [8, 9, 10, 11]); // We have now reached the end. assert_eq!(read.read(&mut buf).await?, 0);
futures_util crate #
futures_util
crate 提供了 Sink/SinkExt trait
定义:
- Sink : 可以向它异步发送值的对象;
- SinkExt : 为 Sink 对象提供方便使用的扩展方法;
Sink/SinkExt 同在如下对象的发送侧:
- Channels
- Sockets
- Pipes
Sink 是两阶段的异步发送:首先发起一个 Send,然后 Polling 是否完成。有点像 buffer write:内部有一个队列来缓冲写操作,当队列满时,发送操作被阻塞,当 flush 时才缓冲 的数据都进行实际写入操作。
Sink 和 SinkExt 在 Frame 中得到广泛应用。常用 SinkExt 提供的方法, 如 send()/send_all()
。
- send_all() 将一个 stream 的所有数据都进行发送,也即消费整个 stream。
pub trait Sink<Item> {
type Error;
// Required methods
// 为调用 start_send 做准备,当返回 Ready<Ok<()>> 时才能调用 start_send,否则阻塞;
fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_> ) -> Poll<Result<(), Self::Error>>;
// 向 Sink 发送数据,每次调用前都必须确保 poll_ready 返回 Ready<Ok<()>>
// 由于是异步发送,对于内部有 buffer 的 Sink,需要调用 poll_flush()/poll_close() 方法来确保缓冲的发送值被实际写入;
fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error>;
// flush 缓冲的值
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_> ) -> Poll<Result<(), Self::Error>>;
// flush 缓冲的值,并关闭该 Sink
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_> ) -> Poll<Result<(), Self::Error>>;
}
pub trait SinkExt<Item>: Sink<Item> {
// Provided methods
fn with<U, Fut, F, E>(self, f: F) -> With<Self, Item, U, Fut, F> where F: FnMut(U) -> Fut, Fut: Future<Output = Result<Item, E>>, E: From<Self::Error>, Self: Sized { ... }
fn with_flat_map<U, St, F>(self, f: F) -> WithFlatMap<Self, Item, U, St, F> where F: FnMut(U) -> St, St: Stream<Item = Result<Item, Self::Error>>, Self: Sized { ... }
fn sink_map_err<E, F>(self, f: F) -> SinkMapErr<Self, F> where F: FnOnce(Self::Error) -> E, Self: Sized { ... }
fn sink_err_into<E>(self) -> SinkErrInto<Self, Item, E> where Self: Sized, Self::Error: Into<E> { ... }
fn buffer(self, capacity: usize) -> Buffer<Self, Item> where Self: Sized { ... }
fn close(&mut self) -> Close<'_, Self, Item> where Self: Unpin { ... }
// Fanout items to multiple sinks.
// This adapter clones each incoming item and forwards it to both this as well as the other sink at the same time.
fn fanout<Si>(self, other: Si) -> Fanout<Self, Si> where Self: Sized, Item: Clone, Si: Sink<Item, Error = Self::Error> { ... }
fn flush(&mut self) -> Flush<'_, Self, Item> where Self: Unpin { ... }
// A future that completes after the given item has been fully processed into the sink, including flushing.
// Note that, because of the flushing requirement, it is usually better to batch together items to send via feed or send_all, rather than flushing between each item.
fn send(&mut self, item: Item) -> Send<'_, Self, Item> where Self: Unpin { ... }
// A future that completes after the given item has been received by the sink.
// Unlike send, the returned future does not flush the sink. It is the caller’s responsibility to ensure all pending items are processed, which can be done via flush or close.
fn feed(&mut self, item: Item) -> Feed<'_, Self, Item> where Self: Unpin { ... }
fn send_all<'a, St>(&'a mut self, stream: &'a mut St ) -> SendAll<'a, Self, St> where St: TryStream<Ok = Item, Error = Self::Error> + Stream + Unpin + ?Sized, Self: Unpin { ... }
fn left_sink<Si2>(self) -> Either<Self, Si2> where Si2: Sink<Item, Error = Self::Error>, Self: Sized { ... }
fn right_sink<Si1>(self) -> Either<Si1, Self> where Si1: Sink<Item, Error = Self::Error>, Self: Sized { ... }
fn compat(self) -> CompatSink<Self, Item> where Self: Sized + Unpin { ... }
fn poll_ready_unpin( &mut self, cx: &mut Context<'_> ) -> Poll<Result<(), Self::Error>> where Self: Unpin { ... }
fn start_send_unpin(&mut self, item: Item) -> Result<(), Self::Error> where Self: Unpin { ... }
fn poll_flush_unpin( &mut self, cx: &mut Context<'_> ) -> Poll<Result<(), Self::Error>> where Self: Unpin { ... }
fn poll_close_unpin( &mut self, cx: &mut Context<'_> ) -> Poll<Result<(), Self::Error>> where Self: Unpin { ... }
}
tokio_util crate #
tokio_util 提供了如下 module、类型和功能:
- tokio_util::codec module: 提供了
Encoder、Decoder trait
,以及实现它们的 FramedRead、FramedWrite、Framed、LinesCodec、BytesCodec、AnyDelimiterCodec struct 类型。 - tokio_util::io module: 提供了 InspectReader、InspectWriter、ReaderStream、StreamReader 类型;
- tokio_util::net module: 提供了 Listener trait 和 ListenerAcceptFut struct 类型;
- tokio_util::sync module: 提供了
CancellationToken struct
类型; - tokio_util::task::task_tracker module: 提供了 TaskTracker、TaskTrackerToken 等类型;
- tokio_util::time::delay_queue: module 提供了
DelayQueue
类型; - tokio_util::udp module: 提供了
UdpFramed
类型;
module tokio_util::codec #
module tokio_util::codec
提供了将 AsyncRead/AsyncWrite 转换为 Stream/Sink, 并且将 byte stream sequence 转换为
有意义的 chunks 即 frames 的能力:
-
struct FrameWrite: A
Sink
of frames encoded to anAsyncWrite
.- 当作 Sink 来发送数据并编码为 Frame;
- Sink 从 AsyncWrite 转换而来, 它的 SinkExt trait 提供的
send()/send_all()/feed()
方法可以用于发送数据。
-
struct FrameRead: A
Stream
of messages decoded from anAsyncRead
.- 当作 Stream 来读取数据并解码为 Frame;
- Stream 从 AsyncRead 转换而来, 它的 StreamExt trait 提供的 next() 方法一次返回一个 chunk (bytes::Bytes 类型) 。
-
struct Framed: A
unified Stream and Sink
interface to an underlying I/O object, using theEncoderand Decoder traits
to encode and decode frames.
在创建 FrameWrite/FrameRead/Framed
时需要传入实现 Encoder/Decoder trait
对象, 用于从 Stream 中解码出 Frame 对象, 向
Sink 中写入编码的 Frame 对象.
tokio_util::codec 提供了如下实现 Encoder/Decoder trait
的类型:
-
AnyDelimiterCodec
: A simple Decoder and Encoder implementation that splits up data into chunks based on any character in the given delimiter string. -
BytesCodec
: A simple Decoder and Encoder implementation that just ships bytes around. -
LinesCodec
: A simple Decoder and Encoder implementation that splits up data into lines.
// encoding
use futures::sink::SinkExt;
use tokio_util::codec::LinesCodec;
use tokio_util::codec::FramedWrite;
#[tokio::main]
async fn main() {
let buffer = Vec::new();
let messages = vec!["Hello", "World"];
let encoder = LinesCodec::new();
// buffer 实现了AsyncWrite,故可以作为 FramedWrite::new() 参数,返回的 writer 实现了 Sink 和 SinkExt trait。
let mut writer = FramedWrite::new(buffer, encoder);
// 异步向 Sink 写数据,自动编码为 Frame。
// send() 是 SinkExt 提供的方法。
writer.send(messages[0]).await.unwrap();
writer.send(messages[1]).await.unwrap();
let buffer = writer.get_ref();
assert_eq!(buffer.as_slice(), "Hello\nWorld\n".as_bytes());
}
// decoding
use tokio_stream::StreamExt;
use tokio_util::codec::LinesCodec;
use tokio_util::codec::FramedRead;
#[tokio::main]
async fn main() {
let message = "Hello\nWorld".as_bytes();
let decoder = LinesCodec::new();
// message 实现了 AsyncRead,故可以作为 FrameRead::new() 参数,返回的 reader 实现了 Stream 和 StreamExt。
// 每次读取返回一个解码后的 frame。
let mut reader = FramedRead::new(message, decoder);
let frame1 = reader.next().await.unwrap().unwrap();
let frame2 = reader.next().await.unwrap().unwrap();
assert!(reader.next().await.is_none());
assert_eq!(frame1, "Hello");
assert_eq!(frame2, "World");
}
FrameReader 从 AsyncRead 中使用 decoder 解码出 Frame,过程大概如下:
use tokio::io::AsyncReadExt;
// buf 是内部带读写指针的缓存,实现了 AsyncRead trait 和 bytes::BufMut trait.
let mut buf = bytes::BytesMut::new();
loop {
// The read_buf call will append to buf rather than overwrite existing data.
// read_buf 方法使用 &mut bytes::BufMut 从 io_resource 中读取一段数据(长度未知)后 append 写入 buf.
let len = io_resource.read_buf(&mut buf).await?;
if len == 0 {
while let Some(frame) = decoder.decode_eof(&mut buf)? {
yield frame;
}
break;
}
// 解码出 frame:如果 buf 中数据不足,返回 None,触发下一次 loop 读数据。
while let Some(frame) = decoder.decode(&mut buf)? {
yield frame;
}
}
FrameWriter 向 Sink 写入使用 encoder 编码的数据,大概过程如下:
use tokio::io::AsyncWriteExt;
use bytes::Buf;
const MAX: usize = 8192;
let mut buf = bytes::BytesMut::new(); // buf 是内部带读写指针的缓存
loop {
tokio::select! {
// 持续向 buf 写数据
num_written = io_resource.write(&buf), if !buf.is_empty() => {
buf.advance(num_written?);
},
// 从 buf 数据生成一个 frame
frame = next_frame(), if buf.len() < MAX => {
encoder.encode(frame, &mut buf)?;
},
_ = no_more_frames() => {
io_resource.write_all(&buf).await?;
io_resource.shutdown().await?;
return Ok(());
},
}
}
为自定义类型实现 Encoder/Decoder trait #
从而生成自定义的 Frame 数据(Item):
- decode(&mut self, src: &mut BytesMut) -> Result<OptionSelf::Item, Self::Error>
- encode(&mut self, item: String, dst: &mut BytesMut) -> Result<(), Self::Error>
这两个方法都是用 BytesMut 来读写 Frame 数据。
use tokio_util::codec::Decoder;
use bytes::{BytesMut, Buf};
struct MyStringDecoder {}
const MAX: usize = 8 * 1024 * 1024;
// 实现 Decoder trait, 从输入的 src 中解析出 Item 类型对象
impl Decoder for MyStringDecoder {
type Item = String;
type Error = std::io::Error;
fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
if src.len() < 4 {
// Not enough data to read length marker.
return Ok(None);
}
// Read length marker.
let mut length_bytes = [0u8; 4];
length_bytes.copy_from_slice(&src[..4]);
let length = u32::from_le_bytes(length_bytes) as usize;
// Check that the length is not too large to avoid a denial of
// service attack where the server runs out of memory.
if length > MAX {
return Err(std::io::Error::new(
std::io::ErrorKind::InvalidData,
format!("Frame of length {} is too large.", length)
));
}
const HEADER_SIZE: usize = 4;
if src.len() < HEADER_SIZE + length {
let missing_bytes = HEADER_SIZE + length - src.len();
src.reserve(missing_bytes);
return Ok(None);
}
// Use advance to modify src such that it no longer contains this frame.
let data = src[4..4 + length].to_vec();
src.advance(4 + length);
// Convert the data to a string, or fail if it is not valid utf-8.
match String::from_utf8(data) {
Ok(string) => Ok(Some(string)),
Err(utf8_error) => {
Err(std::io::Error::new(
std::io::ErrorKind::InvalidData,
utf8_error.utf8_error(),
))
},
}
}
}
// 实现 Encoder trait
use tokio_util::codec::Encoder;
use bytes::BytesMut;
struct MyStringEncoder {}
const MAX: usize = 8 * 1024 * 1024;
impl Encoder<String> for MyStringEncoder {
type Error = std::io::Error;
fn encode(&mut self, item: String, dst: &mut BytesMut) -> Result<(), Self::Error> {
// Don't send a string if it is longer than the other end will
// accept.
if item.len() > MAX {
return Err(std::io::Error::new(
std::io::ErrorKind::InvalidData,
format!("Frame of length {} is too large.", item.len())
));
}
// Convert the length into a byte array.
// The cast to u32 cannot overflow due to the length check above.
let len_slice = u32::to_le_bytes(item.len() as u32);
// Reserve space in the buffer.
dst.reserve(4 + item.len());
// Write the length and string to the buffer.
dst.extend_from_slice(&len_slice);
dst.extend_from_slice(item.as_bytes());
Ok(())
}
}
Module tokio_util::time 提供的 DelayQueue #
- pub fn insert_at(&mut self, value: T, when: Instant) -> Key: 插入元素, 并指定过期的绝对时间;
- pub fn insert(&mut self, value: T, timeout: Duration) -> Key: 插入元素, 并指定超时时间;
- pub fn poll_expired(&mut self, cx: &mut Context<’_>) -> Poll<Option<Expired
»: 返回下一个过期的元素的 Key,如果没有过期元素则返回 Pending。。需要使用 futures::ready!() 来 poll 返回的 Poll 对象。(不能使用 .await, 因为 Poll 类型没有实现 Future)。
通过 insert() 返回的 Key, 后续可以查询/删除/重置对应的元素;
use tokio_util::time::DelayQueue;
use std::time::Duration;
let mut delay_queue = DelayQueue::new();
let key1 = delay_queue.insert("foo", Duration::from_secs(5));
let key2 = delay_queue.insert("bar", Duration::from_secs(10));
assert!(delay_queue.deadline(&key1) < delay_queue.deadline(&key2));
// Remove the entry
let item = delay_queue.remove(&key);
assert_eq!(*item.get_ref(), "foo");
// "foo" is scheduled to be returned in 5 seconds
delay_queue.reset_at(&key, Instant::now() + Duration::from_secs(10));
// "foo" is now scheduled to be returned in 10 seconds
// 另一个例子
use tokio_util::time::{DelayQueue, delay_queue};
use futures::ready; // 也可以使用 std::task::ready!() 宏
use std::collections::HashMap;
use std::task::{Context, Poll};
use std::time::Duration;
struct Cache {
entries: HashMap<CacheKey, (Value, delay_queue::Key)>,
expirations: DelayQueue<CacheKey>,
}
const TTL_SECS: u64 = 30;
impl Cache {
fn insert(&mut self, key: CacheKey, value: Value) {
let delay = self.expirations.insert(key.clone(), Duration::from_secs(TTL_SECS));
self.entries.insert(key, (value, delay));
}
fn get(&self, key: &CacheKey) -> Option<&Value> {
self.entries.get(key).map(|&(ref v, _)| v)
}
fn remove(&mut self, key: &CacheKey) {
if let Some((_, cache_key)) = self.entries.remove(key) {
self.expirations.remove(&cache_key);
}
}
fn poll_purge(&mut self, cx: &mut Context<'_>) -> Poll<()> {
// ready!() 宏返回 Poll::Ready 的值, 如果不 Ready 则一致阻塞。
while let Some(entry) = ready!(self.expirations.poll_expired(cx)) {
self.entries.remove(entry.get_ref());
}
Poll::Ready(())
}
}
Trait tokio_util::time::FutureExt 为所有实现了 Future 添加 timeout() 方法。 #
对于 Stream,还可以使用 StreamExt 提供的 timeout() 方法。
pub trait FutureExt: Future {
// Provided method
fn timeout(self, timeout: Duration) -> Timeout<Self> where Self: Sized { ... }
}
// 示例
use tokio::{sync::oneshot, time::Duration};
use tokio_util::time::FutureExt;
let (tx, rx) = oneshot::channel::<()>();
let res = rx.timeout(Duration::from_millis(10)).await;
assert!(res.is_err());
tokio_test unit testing #
#[tokio::test]
默认创建一个单线程的 current_thread runtime。
tokio::time 提供了 pause()/resume()/advance()
方法。
tokio::time::pause()
: 将当前 Instant::now() 保存,后续调用 Instant::now() 时将返回保存的值。保存的值可以使用 advance() 修改。该函数
只适合 current_thread runtime,也就是 #[tokio::test]
创建和使用的 runtime。(这里的 Instant 是 tokio 提供的,标准库 Instant
不受 影响。 )
自动前进(auto-advance):当 time 被 paused,且当前 runtime 空闲时,clock 会被自动前进到下一个 pending timer。这意味着 Sleep
和其它
time 相关函数、方法被 await 时,会引起 runtime 时间的前进。
#[tokio::test]
async fn paused_time() {
tokio::time::pause();
let start = std::time::Instant::now();
tokio::time::sleep(Duration::from_millis(500)).await;
println!("{:?}ms", start.elapsed().as_millis()); // 打印 0ms
}
可以使用属性宏参数来更简便的开启 time pause:
#[tokio::main(flavor = "current_thread", start_paused = true)]
async fn main() {
println!("Hello world");
}
#[tokio::test(start_paused = true)]
async fn paused_time() {
let start = std::time::Instant::now();
tokio::time::sleep(Duration::from_millis(500)).await;
println!("{:?}ms", start.elapsed().as_millis());
}
虽然开启了 time pause, 但是异步函数的执行顺序和时间关系还是正常保持的: 立即打印 4 次 “Tick!”
#[tokio::test(start_paused = true)]
async fn interval_with_paused_time() {
let mut interval = interval(Duration::from_millis(300));
let _ = timeout(Duration::from_secs(1), async move {
loop {
interval.tick().await;
println!("Tick!");
}
})
.await;
}
使用 tokio_test::io::Builder
来 Mock AsyncRead and AsyncWrite:
use tokio::io::{AsyncBufReadExt, AsyncRead, AsyncWrite, AsyncWriteExt, BufReader};
async fn handle_connection<Reader, Writer>(
reader: Reader,
mut writer: Writer,
) -> std::io::Result<()>
where
Reader: AsyncRead + Unpin,
Writer: AsyncWrite + Unpin,
{
let mut line = String::new();
let mut reader = BufReader::new(reader);
loop {
if let Ok(bytes_read) = reader.read_line(&mut line).await {
if bytes_read == 0 {
break Ok(());
}
writer
.write_all(format!("Thanks for your message.\r\n").as_bytes())
.await
.unwrap();
}
line.clear();
}
}
#[tokio::test]
async fn client_handler_replies_politely() {
let reader = tokio_test::io::Builder::new()
.read(b"Hi there\r\n")
.read(b"How are you doing?\r\n")
.build();
let writer = tokio_test::io::Builder::new()
.write(b"Thanks for your message.\r\n")
.write(b"Thanks for your message.\r\n")
.build();
let _ = handle_connection(reader, writer).await;
}
tokio::net #
提供了和 std 类似的 TCP/UDP/Unix 异步实现。
TcpListener/TcpStream
UdpSocket
UnixListener/UnixStream
UnixDatagram
tokio::net::unix::pipe
函数 tokio::net::lookup_host()
来进行域名解析:
use tokio::net;
use std::io;
#[tokio::main]
async fn main() -> io::Result<()> {
for addr in net::lookup_host("localhost:3000").await? {
println!("socket address is {}", addr);
}
Ok(())
}
TcpListener 提供了 bind/accept 方法:
- bind() 关联函数返回 TcpListener 类型对象;
- accept() 方法返回
TCPStream
和对端 SocketAddr; - from_std()/into_std() 方法可以在标准库的 TcpListener 间转换, 这样可以使用标准库创建和设置 TcpListener, 然后创建 tokio TcpListener.
use std::error::Error;
use tokio::net::TcpListener;
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
let std_listener = std::net::TcpListener::bind("127.0.0.1:0")?;
std_listener.set_nonblocking(true)?;
let listener = TcpListener::from_std(std_listener)?;
Ok(())
}
TcpStream 实现了 AsyncRead/AsyncWrite, 通过 split() 方法可以分别返回读端和写端.
TcpSocket 是未转换为 TcpListener 和 TcpStream 的对象, 可用于设置 TCP 相关参数.
-
pub fn new_v4() -> Result
-
pub fn new_v6() -> Result
-
pub fn bind(&self, addr: SocketAddr) -> Result<()>
-
pub fn bind_device(&self, interface: Option<&[u8]>) -> Result<()>
-
pub async fn connect(self, addr: SocketAddr) -> Result
-
pub fn listen(self, backlog: u32) -> Result
-
pub fn set_keepalive(&self, keepalive: bool) -> Result<()>
-
pub fn set_reuseaddr(&self, reuseaddr: bool) -> Result<()>
-
pub fn set_recv_buffer_size(&self, size: u32) -> Result<()>
-
…
use tokio::net::TcpSocket;
use std::io;
#[tokio::main]
async fn main() -> io::Result<()> {
let addr = "127.0.0.1:8080".parse().unwrap();
let socket = TcpSocket::new_v4()?;
// On platforms with Berkeley-derived sockets, this allows to quickly rebind a socket, without
// needing to wait for the OS to clean up the previous one.
//
// On Windows, this allows rebinding sockets which are actively in use, which allows “socket
// hijacking”, so we explicitly don't set it here.
// https://docs.microsoft.com/en-us/windows/win32/winsock/using-so-reuseaddr-and-so-exclusiveaddruse
socket.set_reuseaddr(true)?;
socket.bind(addr)?;
let listener = socket.listen(1024)?;
Ok(())
let addr = "127.0.0.1:8080".parse().unwrap();
let socket = TcpSocket::new_v4()?;
let stream = socket.connect(addr).await?;
Ok(())
}
tokio 的 TcpStream::connect() 没有提供 timeout,但是可以使用 tokio::time::timeout 来实现:
const CONNECTION_TIME: u64 = 100;
// ...
let (socket, _response) = match tokio::time::timeout(
Duration::from_secs(CONNECTION_TIME),
tokio::net::TcpStream::connect("127.0.0.1:8080")
).await
{
Ok(ok) => ok,
Err(e) => panic!(format!("timeout while connecting to server : {}", e)),
}
.expect("Error while connecting to server")
tokio 的 TcpStream 也没有提供其它 read/write timeout,但是可以标准库提供了,可以从标准库 TcpStream 创建 tokio TcpStream:
let std_stream = std::net::TcpStream::connect("127.0.0.1:8080")
.expect("Couldn't connect to the server...");
std_stream.set_write_timeout(None).expect("set_write_timeout call failed");
std_stream.set_nonblocking(true)?;
let stream = tokio::net::TcpStream::from_std(std_stream)?;
对于 UDP,只有 UdpSocket 一种类型,它提供 bind/connet/send/recv/send_to/recv_from
等方法。
-
bind: 监听指定的地址和端口。由于 UDP 是无连接的,使用 recv_from 来接收任意 client 发送的数据,使用 send_to可以向任意 target 发送数据。
-
connect: UDP 虽然是无连接的,但是也支持使用 connect() 方法,效果是为返回的 UdpSocket 指定了 target ip 和port,可以使用 send/recv 方法来 只向该 target 发送和接收数据。
use tokio::net::UdpSocket;
use std::io;
#[tokio::main]
async fn main() -> io::Result<()> {
let sock = UdpSocket::bind("0.0.0.0:8080").await?;
// use `sock`
Ok(())
}
对于 Unix Socket,提供两种类型:
- 面向连接的:
UnixListener/UnixStream
- 面向无连接的:
UnixDatagram
Unix Socket 使用本地文件(一般是 .sock 结尾)寻址(named socket),也可以使用没有关联文件的 unnamed Unix Socket。
关闭 named socket 时,对应的 socket 文件并不会被自动删除。
如果没有 unix socket bind 到该文件上,则 client 连接会失败。
- pub fn bind
(path: P) -> Result
where P: AsRef : 使用指定 socket 文件创建一个 named unix Datagram socket; - pub fn pair() -> Result<(UnixDatagram, UnixDatagram)>:返回一对 unamed unix Datagram socket;
- pub fn unbound() -> Result
:创建一个没有绑定任何 addr 的 socket。 - pub fn connect<P: AsRef
>(&self, path: P) -> Result<()>:Datagram 是无连接的,调用该方法后可以使用 send/recv 从指定的 path 收发消息。
use tokio::net::UnixDatagram;
use tempfile::tempdir;
// We use a temporary directory so that the socket files left by the bound sockets will get cleaned up.
let tmp = tempdir()?;
// Bind each socket to a filesystem path
let tx_path = tmp.path().join("tx");
let tx = UnixDatagram::bind(&tx_path)?;
let rx_path = tmp.path().join("rx");
let rx = UnixDatagram::bind(&rx_path)?;
let bytes = b"hello world";
tx.send_to(bytes, &rx_path).await?;
let mut buf = vec![0u8; 24];
let (size, addr) = rx.recv_from(&mut buf).await?;
let dgram = &buf[..size];
assert_eq!(dgram, bytes);
assert_eq!(addr.as_pathname().unwrap(), &tx_path);
// unnamed unix socket
use tokio::net::UnixDatagram;
// Create the pair of sockets
let (sock1, sock2) = UnixDatagram::pair()?;
// Since the sockets are paired, the paired send/recv
// functions can be used
let bytes = b"hello world";
sock1.send(bytes).await?;
let mut buff = vec![0u8; 24];
let size = sock2.recv(&mut buff).await?;
let dgram = &buff[..size];
assert_eq!(dgram, bytes);
UnixListener bind() 返回 UnixListener,它的 accept() 方法返回 UnixStream:
use tokio::net::UnixListener;
#[tokio::main]
async fn main() {
let listener = UnixListener::bind("/path/to/the/socket").unwrap();
loop {
match listener.accept().await {
Ok((stream, _addr)) => {
println!("new client!");
}
Err(e) => { /* connection failed */ }
}
}
}
UnixStream 实现了 AsyncRead/AsyncWrite,它的 connect() 也返回一个 UnixStream:
use tokio::io::Interest;
use tokio::net::UnixStream;
use std::error::Error;
use std::io;
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
let dir = tempfile::tempdir().unwrap();
let bind_path = dir.path().join("bind_path");
let stream = UnixStream::connect(bind_path).await?;
loop {
let ready = stream.ready(Interest::READABLE | Interest::WRITABLE).await?;
if ready.is_readable() {
let mut data = vec![0; 1024];
// Try to read data, this may still fail with `WouldBlock`
// if the readiness event is a false positive.
match stream.try_read(&mut data) {
Ok(n) => {
println!("read {} bytes", n);
}
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
continue;
}
Err(e) => {
return Err(e.into());
}
}
}
if ready.is_writable() {
// Try to write data, this may still fail with `WouldBlock`
// if the readiness event is a false positive.
match stream.try_write(b"hello world") {
Ok(n) => {
println!("write {} bytes", n);
}
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
continue;
}
Err(e) => {
return Err(e.into());
}
}
}
}
}
socket2 crate #
如果要使用其它标准库或 tokio::net 没有提供的 socket 设置,可以使用 socket2 crate
, 例如为 socket 设置更详细的 KeepAalive 参数,设置 tcp
user timeout 等:
use std::time::Duration;
use socket2::{Socket, TcpKeepalive, Domain, Type};
let socket = Socket::new(Domain::IPV4, Type::STREAM, None)?;
let keepalive = TcpKeepalive::new()
.with_time(Duration::from_secs(4));
// Depending on the target operating system, we may also be able to
// configure the keepalive probe interval and/or the number of
// retries here as well.
socket.set_tcp_keepalive(&keepalive)?;
关于 TCP_USER_TIMEOUT 可以为连接建立(即 connect())和数据读写指定超时时间:
- https://codearcana.com/posts/2015/08/28/tcp-keepalive-is-a-lie.html
- https://blog.cloudflare.com/when-tcp-sockets-refuse-to-die/
tokio::signal #
tokio::signal module
提供了信号捕获和处理的能力。
pub async fn ctrl_c() -> Result<()>
:当收到 C-c 发送的 SIGINT 信号时 .await 返回;
use tokio::signal;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
signal::ctrl_c().await?;
println!("ctrl-c received!");
Ok(())
}
对于其它信号,使用 SignalKind 和 signal 函数创建一个 Signal,然后调用它的 recv() 方法:
// Wait for SIGHUP on Unix
use tokio::signal::unix::{signal, SignalKind};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// An infinite stream of hangup signals.
let mut stream = signal(SignalKind::hangup())?;
// Print whenever a HUP signal is received
loop {
stream.recv().await;
println!("got signal HUP");
}
}
tokio::time #
tokio::time module 提供了如下类型,它们都必须在异步上下文中使用:
- Sleep: 睡眠;
- Interval: 在固定时间间隔流式产生一个值(stream yielding a value);
- Timeout:为一个 future 或 stream 封装执行的过期时间,过期后 future 或 stream 被取消,并返回一个错误;
不能在 async Rutnime 中使用标准库的 sleep,否则它会阻塞当前线程执行其它异步任务。
// Wait 100ms and print “100 ms have elapsed”
use std::time::Duration;
use tokio::time::sleep;
#[tokio::main]
async fn main() {
sleep(Duration::from_millis(100)).await;
println!("100 ms have elapsed");
}
// Require that an operation takes no more than 1s.
use tokio::time::{timeout, Duration};
async fn long_future() {}
// 任意 Feature 都可以设置异步 timeout
let res = timeout(Duration::from_secs(1), long_future()).await;
if res.is_err() {
println!("operation timed out");
}
// A simple example using interval to execute a task every two seconds.
use tokio::time;
async fn task_that_takes_a_second() {
println!("hello");
time::sleep(time::Duration::from_secs(1)).await
}
#[tokio::main]
async fn main() {
let mut interval = time::interval(time::Duration::from_secs(2));
for _i in 0..5 {
interval.tick().await;
task_that_takes_a_second().await;
}
}
tokio::time 提供了 pause()/resume()/advance() 方法。
tokio::time::pause(): 将当前 Instant::now() 保存,后续调用 Instant::now() 时将返回保存的值。保存的值可以使用 advance() 修改。该函数只适
合 current_thread runtime,也就是 #[tokio::test]
默认使用的 runtime。(这里的 Instant 是 tokio 提供的,标准库 Instant 不受影
响。 )
自动前进(auto-advance):当 time 被 paused,且当前 runtime 空闲时,clock 会被自动前进到下一个 pending timer。这意味着 Sleep
和其它
time 相关函数、方法被 await 时,会引起 runtime 前景时间。
#[tokio::main(flavor = "current_thread", start_paused = true)]
async fn main() {
println!("Hello world");
}
tokio::process #
use tokio::io::AsyncWriteExt;
use tokio::process::Command;
use std::process::Stdio;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let mut cmd = Command::new("sort");
// Specifying that we want pipe both the output and the input. Similarly to capturing the
// output, by configuring the pipe to stdin it can now be used as an asynchronous writer.
cmd.stdout(Stdio::piped());
cmd.stdin(Stdio::piped());
let mut child = cmd.spawn().expect("failed to spawn command");
// These are the animals we want to sort
let animals: &[&str] = &["dog", "bird", "frog", "cat", "fish"];
let mut stdin = child
.stdin
.take()
.expect("child did not have a handle to stdin");
// Write our animals to the child process Note that the behavior of `sort` is to buffer _all
// input_ before writing any output. In the general sense, it is recommended to write to the
// child in a separate task as awaiting its exit (or output) to avoid deadlocks (for example,
// the child tries to write some output but gets stuck waiting on the parent to read from it,
// meanwhile the parent is stuck waiting to write its input completely before reading the
// output).
stdin
.write(animals.join("\n").as_bytes())
.await
.expect("could not write to stdin");
// We drop the handle here which signals EOF to the child process. This tells the child process
// that it there is no more data on the pipe.
drop(stdin);
let op = child.wait_with_output().await?;
// Results should come back in sorted order
assert_eq!(op.stdout, "bird\ncat\ndog\nfish\nfrog\n".as_bytes());
Ok(())
}
使用其它进程的输出作为输入:
use tokio::join;
use tokio::process::Command;
use std::process::Stdio;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let mut echo = Command::new("echo")
.arg("hello world!")
.stdout(Stdio::piped())
.spawn()
.expect("failed to spawn echo");
let tr_stdin: Stdio = echo
.stdout
.take()
.unwrap()
.try_into()
.expect("failed to convert to Stdio");
let tr = Command::new("tr")
.arg("a-z")
.arg("A-Z")
.stdin(tr_stdin)
.stdout(Stdio::piped())
.spawn()
.expect("failed to spawn tr");
let (echo_result, tr_output) = join!(echo.wait(), tr.wait_with_output());
assert!(echo_result.unwrap().success());
let tr_output = tr_output.expect("failed to await tr");
assert!(tr_output.status.success());
assert_eq!(tr_output.stdout, b"HELLO WORLD!\n");
Ok(())
}