# 使用 full feature
tokio = { version = "1", features = ["full"] }
# 或单独启用各个 feature
tokio = { version = "1", features = ["rt", "net"] }
- full : Enables all features listed below except test-util and tracing.
- rt : Enables tokio::spawn, the
current-thread scheduler
, and non-scheduler utilities. - rt-multi-thread : Enables the heavier,
multi-threaded, work-stealing scheduler
. - io-util : Enables the IO based
Ext traits
. - io-std : Enable Stdout, Stdin and Stderr types.
- net : Enables tokio::net types such as TcpStream, UnixStream and UdpSocket, as well as (on Unix-like systems) AsyncFd and (on FreeBSD) PollAio.
- time : Enables tokio::time types and allows the schedulers to enable the built in timer.
- process : Enables tokio::process types.
- macros : ~ Enables#[tokio::main] and #[tokio::test]~ macros.
- sync : Enables all tokio::sync types.
- signal : Enables all tokio::signal types.
- fs : Enables tokio::fs types.
- test-util : Enables testing based infrastructure for the Tokio runtime.
- parking_lot : As a potential optimization, use the parking_lot crate’s synchronization primitives internally. Also, this dependency is necessary to construct some of our primitives in a const context. MSRV may increase according to the parking_lot release in use.
下面是启用 tokio_unstable
feature 后才能使用的功能:
- tracing: Enables tracing events.
- task::Builder 注意不是 runtime::Builder
- Some methods on task::JoinSet
- runtime::RuntimeMetrics
- runtime::Builder::unhandled_panic
- task::Id
一般是在项目的 .cargo/config.toml 中配置的:
[build]
rustflags = ["--cfg", "tokio_unstable"]
#[tokio::main]
: 创建和设置一个 Runtime,对于复杂的 Runtime 参数可以使用 Builder;
- 一般用于 async main fn,否则每次调用该 async fn 时都新建一个 Runtime 来运行;
- 默认是 Multi-threaded runtime,为每个 CPU Core 创建一个 thread 的
worker thread pool
来调度执行 spawn() 产生的 Future task, 支持 work-stealing strategy;
// Multi-threaded runtime
#[tokio::main(flavor = "multi_thread", worker_threads = 10)]
#[tokio::main(worker_threads = 2)]
// Current thread runtime
#[tokio::main(flavor = "current_thread")]
#[tokio::main(flavor = "current_thread", start_paused = true)]
// 示例
#[tokio::main]
async fn main() {
println!("Hello world");
}
// 等效为
fn main() {
tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.unwrap()
.block_on(async {
println!("Hello world");
})
}
join!()
并发执行传入的 async fn,直到它们 都完成
(标准库 std::future module 也提供该宏):
- join!() 必须在 async context 中运行,比如 async fn/block/closure;
- tokio 使用
单线程
来执行这些 async task,所以一个 task 可能 blocking 其它 task 的执行,如果要真正并发执行,需要使用 spawn(); - 如果 async task 都返回 Result,join!() 也是等待它们都完成。如果要在遇到第一个 Error 时停止执行,可以使用
try_join!()
async fn do_stuff_async() {}
async fn more_async_work() {}
#[tokio::main]
async fn main() {
let (first, second) = tokio::join!(
do_stuff_async(),
more_async_work());
}
pin!()
: 拥有传入的 async task 的 Future 对象,返回一个 Pin 类型对象
,可以确保该 Future 对象的栈内存不发生移动:
- select! 宏在执行某个 branch 时会 cancel/drop 其它 branch 的 Future 对象,为了能在 loop 中使用 select!, 需要传入 &mut future 而非 future 对象本身(防止被 drop)。
use tokio::{pin, select};
use tokio_stream::{self as stream, StreamExt};
async fn my_async_fn() {}
#[tokio::main]
async fn main() {
// 从可迭代对象创建一个 async stream。
let mut stream = stream::iter(vec![1, 2, 3, 4]);
let future = my_async_fn();
pin!(future);
// select! 宏在执行某个 branch 时会 cancel/drop 其它 branch 的 Future 对象,
// 为了能在 loop 中使 用 select!, 需要传入 &mut future 而非 future 对象本身
// (防止被 drop)。
loop {
select! {
_ = &mut future => {
break;
}
// drop 的是 next() 返回的 future 对象而非 stream
Some(val) = stream.next() => {
println!("got value = {}", val);
}
}
}
}
// 同时创建多一个 Future 的 Pin 类型
#[tokio::main]
async fn main() {
pin! {
let future1 = my_async_fn();
let future2 = my_async_fn();
}
select! {
_ = &mut future1 => {}
_ = &mut future2 => {}
}
}
select!()
: 并发执行多个 async expression,然后并发 .await 返回的 Future 对象,对于第一个匹配 pattern 的 branch,执行对应的 handler,同时 drop 其它 branch 正在 await 的 Future 对象:
<pattern> = <async expression> (, if <precondition>)? => <handler>,
async fn do_stuff_async() { }
async fn more_async_work() {}
#[tokio::main]
async fn main() {
tokio::select! {
_ = do_stuff_async() => {
println!("do_stuff_async() completed first")
}
_ = more_async_work() => {
println!("more_async_work() completed first")
}
};
}
task_local!()
: 对于传给 spawn() 的 async fn,必须实现 Send + Sync + ‘static, 它有可能被调度到不同的线程上运行,所以不能使用 thread_local 变量,但可使用 task local 变量。该宏生成一个 tokio::task::LocalKey 使用的 local key:
tokio::task_local! {
pub static ONE: u32;
static TWO: f32;
}
tokio::task_local! {
static NUMBER: u32;
}
// 生成的 ONE、TWO、NUMBER 都是 tokio::task::LocalKey 类型。
NUMBER.scope(1, async move {
assert_eq!(NUMBER.get(), 1);
}).await;
NUMBER.scope(2, async move {
assert_eq!(NUMBER.get(), 2);
NUMBER.scope(3, async move {
assert_eq!(NUMBER.get(), 3);
}).await;
}).await;
1 tokio::runtime #
tokio Runtime 提供 IO event loop 的 IO task 调度,以及基于 timer 的调度。(如调度使用 tokio::time::sleep() 的 async task)。
一般不需要手动创建 Runtime,而是使用 #[tokio::main]
宏来标识 async fn main()
函数。如果要精细控制 Runtime 的参数,可以使用 tokio::runtime::Builder
。
在 Runtime 上下文中,可以使用 tokio::spawn()/spawn_local() 来执行异步任务,使用 spwan_blocking() 执行同步任务。
#[tokio::main]
async fn main() {
println!("Hello world");
}
// 等效为
fn main() {
tokio::runtime::Builder::new_multi_thread()
.enable_all() // 启用所有 resource driver(IO/timer)
.build()
.unwrap()
.block_on(async {
println!("Hello world");
})
}
// 另一个例子,手动创建 Runtime
use tokio::net::TcpListener;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::runtime::Runtime;
fn main() -> Result<(), Box<dyn std::error::Error>> {
// 创建一个 multi thread 和 enable all 的 Runtime
let rt = Runtime::new()?;
rt.block_on(async {
let listener = TcpListener::bind("127.0.0.1:8080").await?;
loop {
let (mut socket, _) = listener.accept().await?;
tokio::spawn(async move {
let mut buf = [0; 1024];
loop {
let n = match socket.read(&mut buf).await {
// socket closed
Ok(n) if n == 0 => return,
Ok(n) => n,
Err(e) => {
println!("failed to read from socket; err = {:?}", e);
return;
}
};
if let Err(e) = socket.write_all(&buf[0..n]).await {
println!("failed to write to socket; err = {:?}", e);
return;
}
}
});
}
})
}
tokio Runtime 有两种类型(默认使用的是多线程版本):
- Multi-Thread Scheduler : 使用每个 CPU 一个 thread 的 worker 线程池, 使用 work-stealing 机制执行 spawn() 提交的 Future task;
- Current-Thread Scheduler : 提供单线程执行器,所有异步任务都在该线程上执行;
这两个 Runtime 都有两个 queue:global queue 和 local queue:
- 先从 local queue 获取 task,如果为空,再从 global queue 获取任务。或者从 local queue 获取
global_queue_interval
(默认 31)个任务后,再从 global queue 获取任务; - 当没有任务可以调度,或者调度任务超过
event_interval
次后(默认 61)后,检查 IO/Timer event; - current-thread scheduler 默认启用 lifo slot optimazition,即:有新 task 被 wake 时,添加到 local queue;
对于 multi-thread scheduler,有一个固定 thread 数量的 thread pool,它在创建 Runtime 时创建。有一个 global queue 和每个 thread 一个的 local queue。local queue 初始容量是 256 个 tasks,超过的会被移动 到 global queue。默认先从 local queue 获取 task,然后是
global queue,如果都为空,则从其它 thread 的 local queue steal tasks
。
tokio Runtime 除了管理 Scheduler 以外,还管理各种 resource driver,需要单独启用它们(目前就 io 和 time 两种类型)或一次全部启用:
Builder::enable_io()
:包括 network/fs 等 IO;Builder::enable_time()
:定时器调度;Builder::enable_all()
:启用所有 resrouce driver。
Runtime 在调度任务的间隙,周期检查这些 IO/Timer 是否 Ready,从而按需唤醒响应的 task 来执行。默认当没有任务可以调度,或者调度的任务数超过 61 时检查 IO/Timer,这个次数可以使用 event_interval 来设置。
创建 Runtime 时,默认为每一个 CPU 创建一个 thread,形成固定 thread 数量的 worker thread pool
。同时,tokio Runtime 还维护一个 blocking thread pool
,其中的 thread 在调用 spawn_blocking() 时临时创建,这个 pool 中的线程数量不固定,而且 idle 一段时间后会自动被 tokio Runtime 清理。
tokio Runtime 确保所有 task 都是 公平调度
,防止个别 task 一直可以调度的情况下,其它 task 得不到运行。
MAX_TASKS
指定任意时刻 runtime 创建的最大数量 task;MAX_SCHEDULE
指定 runtime 可以调度的最大数量;MAX_DELAY
指定任务被唤醒后,调度器执行它的最大延迟;
MAX_TASKS 和 MAX_SCHEDULE 是可以配置的,MAX_DELAY 是运行时自动计算的。
运行时调度器不保证任务调度的顺序。
1.1 Runtime #
Runtime 类型对象提供 IO driver、task 调度、计时器、线程池等运行异步任务所需的功能。
关闭 Runtime:
- drop runtime 对象;
- 调用
shutdown_background() 或 shutdown_timeout()
方法;
当 drop Runtime 时默认 wait forver
,直到所有已经 spawned 的任务被 stopped:
- Tasks spawned through
Runtime::spawn
keep running until theyyield. Then they are dropped
. They are not guaranteed to run to completion, but might do so if they do not yield until completion.- task 只有 yield 时(例如下一次 .await 位置)才会被 dropped,否则就一直运行直到结束;
- Blocking functions spawned through
Runtime::spawn_blocking
keep running until they return
.
调用 Runtime 的 shutdown_background() and shutdown_timeout()
方法,可以=避免这种 waiting forerver= 的等待。当 timeout 时,如果 task 没有结束,则运行它的 thread 会被泄露(leak),task 会一直运行直到结束。
Runtime 方法:
new()
- 创建一个多线程的 Runtime,启用所有 resource driver, 自动创建一个 Handle 并调用它的 enter(), 从而可以使用 tokio::spwan()。 一般用 #[tokio::main] 来自动调用。
block_on()
- 在 Runtime 中执行异步 task,只能在同步上下文中调用该方法,它是同步代码和异步代码的结合点,如果在异步上下文中执行则会 panic。
- 在当前线程中执行 Future task,block 当前线程直到 task 返回。如果要并行执行 task,则需要在 task 内部调用 spawn() 来提交 task;
spawn() 和 spawn_blocking()
- 返回的 JoinHandle 对象实现了 Future,可以 .await 获得结果;
enter()
- 设置 thread local Runtime,后续 tokio::spawn() 感知该 Runtime。一般用于手动创建 Runtime 的场景。
handler()
- 返回一个可以在该 Runtime 上 spawn task 的 Handle 对象,它基于引用计数实现了 Clone() 方法,可以将 Runtime 在多个 thread 间共享。
impl Runtime
// 创建一个多线程的调度器 Runtime,启用所有默认 resource driver,
// 一般使用 #[tokio::main] 来自动调用。
pub fn new() -> Result<Runtime>
// 返回一个实现了引用计数的 Handle 对象,可以 spawn() 任务,将 Runtime 在多个
// thread 间共享。
pub fn handle(&self) -> &Handle
// 在 worker thread pool 中立即运行 async task,而不管是否 await 返回的 JoinHandle
pub fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
where
// 多线程 Runtime 要求传入的 feture 必须是 Send + 'static
F: Future + Send + 'static,
F::Output: Send + 'static
// 提交同步任务
pub fn spawn_blocking<F, R>(&self, func: F) -> JoinHandle<R>
where
F: FnOnce() -> R + Send + 'static,
R: Send + 'static
// 执行 future,直到结束。对于多线程 Runtime,当 block_on 返回时,spawn() 的 task
// 可能还在运行,如果要确保这些 task 完成,可以对它们进行 .await.
pub fn block_on<F: Future>(&self, future: F) -> F::Output
// 设置 thread local Runtime,后续可以使用 tokio::spawn() 等函数(它们感知 EnterGuard)
pub fn enter(&self) -> EnterGuard<'_>
// 关闭 Runtime,不能再提交任务和 IO/Timer
pub fn shutdown_timeout(self, duration: Duration)
pub fn shutdown_background(self)
impl Runtime
// Available on tokio_unstable only.
pub fn metrics(&self) -> RuntimeMetrics
Runtime::enter() 或 Handle::enter() 创建 Runtime context,它会创建一个 thread local 变量来标识当前的 Runtime,可以使用 tokio::spawn() 等方法在该 Runtime context 中提交异步任务。。Runtime::new() 自动创建一个 Handle 并调用它的 enter();
use tokio::runtime::Runtime;
use tokio::task::JoinHandle;
fn function_that_spawns(msg: String) -> JoinHandle<()> {
// tokio::spawn 感知所在线程的 EnterGuard 对象
tokio::spawn(async move {
println!("{}", msg);
})
}
fn main() {
// 手动创建 Runtime
let rt = Runtime::new().unwrap();
let s = "Hello World!".to_string();
// By entering the context, we tie `tokio::spawn` to this executor.
let _guard = rt.enter();
let handle = function_that_spawns(s);
rt.block_on(handle).unwrap();
}
runtime.shutdown_timeout() 和 shutdown_background() 关闭 Runtime,而不等待所有spawn 的 task 结束。而 Drop Runtime 时默认会等待所有 task 结束,则可能导致无限期等待。这两个方法不会无限期等待,task 可能还会在后台运行,故对应的 thread 可能会被泄露。
use tokio::runtime::Runtime;
use tokio::task;
use std::thread;
use std::time::Duration;
fn main() {
let runtime = Runtime::new().unwrap();
runtime.block_on(async move {
task::spawn_blocking(move || {
thread::sleep(Duration::from_secs(10_000));
});
});
runtime.shutdown_timeout(Duration::from_millis(100));
}
Runtime 可以通过多方方式进行 Sharing:
- Using an
Arc<Runtime>
. - Using a
Handle
. Entering
the runtime context.
Arc<Runtime> 和 Handle(Runtime.handle() 方法返回) 都基于引用计数实现了 Clone() 方法,可以将 Runtime 在多个 thread 间共享。Handler::current() 返回当前 Runtime 的 Handle,可以 spawn() 并发异步任务。
use tokio::runtime::Handle;
#[tokio::main]
async fn main () {
let handle = Handle::current();
// 创建一个 std 线程
std::thread::spawn(move || {
// Using Handle::block_on to run async code in the new thread.
handle.block_on(async {
println!("hello");
});
});
}
1.2 Builder #
Builder 用于创建自定义参数的 Runtime:
use tokio::runtime::Builder;
fn main() {
let runtime = Builder::new_multi_thread()
.worker_threads(4)
.thread_name("my-custom-name")
.thread_stack_size(3 * 1024 * 1024)
.build()
.unwrap();
// use runtime ...
}
Builder 方法:
- new_current_thread() :单线程调度器;
- new_multi_thread() : 多线程调度器;
impl Builder
pub fn new_current_thread() -> Builder
pub fn new_multi_thread() -> Builder
pub fn new_multi_thread_alt() -> Builder
// 启用所有 resource driver,如 io/net/timer 等
pub fn enable_all(&mut self) -> &mut Self
// 默认为 CPU Core 数量,会覆盖环境变量 TOKIO_WORKER_THREADS 值。一次性创建。
pub fn worker_threads(&mut self, val: usize) -> &mut Self
// blocking 线程池中线程数量,按需创建,idle 超过一定时间后被回收
pub fn max_blocking_threads(&mut self, val: usize) -> &mut Self
// blocking 线程池中线程空闲时间,默认 10s
pub fn thread_keep_alive(&mut self, duration: Duration) -> &mut Self
pub fn thread_name(&mut self, val: impl Into<String>) -> &mut Self
pub fn thread_name_fn<F>(&mut self, f: F) -> &mut Self
where F: Fn() -> String + Send + Sync + 'static
pub fn thread_stack_size(&mut self, val: usize) -> &mut Self
pub fn on_thread_start<F>(&mut self, f: F) -> &mut Self
where F: Fn() + Send + Sync + 'static
pub fn on_thread_stop<F>(&mut self, f: F) -> &mut Self
where F: Fn() + Send + Sync + 'static
pub fn on_thread_park<F>(&mut self, f: F) -> &mut Self
where F: Fn() + Send + Sync + 'static
pub fn on_thread_unpark<F>(&mut self, f: F) -> &mut Self
where F: Fn() + Send + Sync + 'static
// 构建生成 Runtime
pub fn build(&mut self) -> Result<Runtime>
// 检查 global queue event 的 scheduler ticks
pub fn global_queue_interval(&mut self, val: u32) -> &mut Self
// 检查 event 的 scheduler ticks
pub fn event_interval(&mut self, val: u32) -> &mut Self
// Available on tokio_unstable only.
pub fn unhandled_panic(&mut self, behavior: UnhandledPanic) -> &mut Self
// Available on tokio_unstable only.
pub fn disable_lifo_slot(&mut self) -> &mut Self
// Available on tokio_unstable only.
pub fn rng_seed(&mut self, seed: RngSeed) -> &mut Self
// Available on tokio_unstable only.
pub fn enable_metrics_poll_count_histogram(&mut self) -> &mut Self
pub fn metrics_poll_count_histogram_scale( &mut self, histogram_scale: HistogramScale ) -> &mut Self
pub fn metrics_poll_count_histogram_resolution( &mut self, resolution: Duration) -> &mut Self
pub fn metrics_poll_count_histogram_buckets( &mut self, buckets: usize ) -> &mut Self
impl Builder
// Available on crate feature net, or Unix and crate feature process, or Unix
// and crate feature signal only.
pub fn enable_io(&mut self) -> &mut Self
// Available on crate feature net, or Unix and crate feature process, or Unix
// and crate feature signal only.
pub fn max_io_events_per_tick(&mut self, capacity: usize) -> &mut Self
// Available on crate feature time only.
pub fn enable_time(&mut self) -> &mut Self
impl Builder
pub fn start_paused(&mut self, start_paused: bool) -> &mut Self
2 tokio::task #
task 是轻量级,无阻塞的调度&执行单元,三大特点:
- 轻量级:用户级的 Tokio runtime 调度,而非操作系统内核调度;
- 协作式调度:OS thread 一般是抢占式调度;协作式调度 cooperatively 表示 task 会一直运行直到 yield(例如数据没有准备好,.await 位置会 yield,同时 tokio API lib 中默认强制插入了一些 yield point,确保即使 task 没有 yield,底层的 lib 也会周期 yield),这时 Tokio Runtime 会调度其他 task 来运行;
- 非阻塞:需要使用 tokio crate 提供的非阻塞 IO 来进行 IO/Net/Fs 在 async context 下的 APIs 操作,这些 API 不会阻塞线程,而是 yield,这样 tokio Runtime 可以执行其它 task;
异步运行时将 async fn 或 async block 作为调度和执行的单元(它们都返回 Future),称为异步任务 async task
:
Runtime::block_on()
: 使用当前线程来运行异步任务,是同步代码和异步代码的分界点;task::spawn_local()
: 在 block_on() 所在的线程空闲时运行异步任务;task::spawn()
: 在一个线程池中立即运行异步任务;task::spawn_blocking()
:立即创建一个线程来运行指定的同步函数
。
同步代码和异步代码的结合点是 block_on(),block_on() 是一个 同步函数
,所以不能在 async fn 中调用它。它在当前线程上 poll 传入的 Future 对象
,直到 Ready 返回,如果是 Pending 且没有其他 aysnc task(如通过 task::spawn_local() 提交的任务)可以 poll 则 block_on() 会 sleep。
block_on() 在当前线程 poll 传入的 Future 对象,当它空闲时,为了能在该线程中同时 poll 其它 Future,可以使用 spawn_local() 来提交任务。可以多次调用 spawn_local(),它将传入的 Future 添加到 block_on() 线程所处理的 task pool 中,当 block_on() 处理某个 task pending 时,会从该 pool 中获取下一个 Future task 进行 spawn_local。
poll() 返回 JoinHandle 类型,它实现了 Future trait,可以进行 .await 来获得最终值。如果一个 task 执行时间很长,会导致 block_on 没有机会执行其他 task,引起性能问题。
spawn_local() 要求传入的 async task Future 对象是 ‘static 的(因为是单线程执行,所以不要求实现 Send+Sync),这是由于该 Future 的 执行时机是不确定的
,如果不 .await 它返回的 handle,则有可能 many_requests() 函数返回了,但是 Future 还没有被执行,从而导致引用失效。
解决办法:
- 传入一个 async 辅助函数,它拥有对应的参数值,这样该函数就实现了 ‘static;
- 使用 async move {} 来捕获对象,这样该 block 返回的 Future 也实现了 ‘static;
// async 辅助函数,拥有传入参数值的所有权
async fn cheapo_owning_request(host: String, port: u16, path: String) -> std::io::Result<String> {
cheapo_request(&host, port, &path).await
}
for (host, port, path) in requests {
handles.push(task::spawn_local(cheapo_owning_request(host, port, path)));
}
// 在单线程中并发发起多个 request
let requests = vec![
("example.com".to_string(), 80, "/".to_string()),
("www.red-bean.com".to_string(), 80, "/".to_string()),
("en.wikipedia.org".to_string(), 80, "/".to_string()),
];
// block_on() 是同步函数,返回传入的 async task 的返回值。
let results = async_std::task::block_on(many_requests(requests));
for result in results {
match result {
Ok(response) => println!("{}", response),
Err(err) => eprintln!("error: {}", err),
}
}
spawn() 是在一个专门的线程池(tokio 称为 worker thread 或 core thread)中并发执行 async task。spawn() 和 spawn_local() 一样,也返回 JoinHandler,JoinHandler 实现了 Future,需要 .await 获得结果。但是它不依赖调用 block_on() 来 poll,而是执行候该函数后立即开始被 poll。由于传给 spawn() 的 Future 可能被线程池中任意线程执行,而且在暂停恢复后可能会在另一个线程中运行,所以不能使用线程本地存储变量(解决办法是使用 task_local! 宏),同时 Future 对象必须满足 Future+Send+'static
,这样该 Future 对象才能安全的 move 到其他线程执行。(类似于 std::thread::spawn() 对闭包的要求);
use async_std::task;
let mut handles = vec![];
for (host, port, path) in requests {
handles.push(task::spawn(
// block 捕获了上下问文对象所有权,所以返回的是 'static Future 对象
async move {
cheapo_request(&host, port, &path).await
}
));
}
use async_std::task;
use std::rc::Rc;
// 错误:reluctant() 返回的 Future 没有实现 Send。
async fn reluctant() -> String {
let string = Rc::new("ref-counted string".to_string());
some_asynchronous_thing().await; // Rc 跨 await
format!("Your splendid string: {}", string)
}
task::spawn(reluctant());
// 解决办法:将不支持 Send 的变量隔离在单独的 .await 所在的 block。
async fn reluctant() -> String {
let return_value = {
let string = Rc::new("ref-counted string".to_string());
format!("Your splendid string: {}", string)
};
some_asynchronous_thing().await; // await 没有跨 Rc
return_value
}
另一个常见的错误是 Box<dyn std::error::Error> 对象不支持 Send,比如 some_fallible_thing() 的返回值 Result 有效性是整个 match 表达式,所以跨越了 Ok 分支的 .await, 这时 Result 如果不是 Send 就报错:
type GenericError = Box<dyn std::error::Error>;
type GenericResult<T> = Result<T, GenericError>;
fn some_fallible_thing() -> GenericResult<i32> {
//...
}
// 该异步函数返回的 Future 没有实现 Send
async fn unfortunate() {
match some_fallible_thing() {
Err(error) => {
// 返回的 Error 类型是 Box<dyn std::error::Error> 没有实现 Send
report_error(error);
}
Ok(output) => {
// 返回值跨越了这个 .await
use_output(output).await;
}
} }
// 报错
async_std::task::spawn(unfortunate());
解决办法是:重新定义 Error, 添加 Send Bound,‘static 是 Box<dyn Trait> 默认的 lifetime,可以不添加。
type GenericError = Box<dyn std::error::Error + Send + Sync + 'static>;
type GenericResult<T> = Result<T, GenericError>;
.await 表达式是 Rust 执行异步 poll 的时间点(yield point),默认情况下 task 可以被异步运行时调度到其它 thread 中执行,所以 async fn 中涉及跨 await 的对象,都需要是能 Send 的。
spawn_blocking() 是异步运行时用来解决可能会阻塞或长时间运行(CPU 消耗性)的任务问题,它的参数是一个 同步闭包函数
,运行时会立即创建一个 =新的线程来执行=(tokio 中称为 blocking thread,该线程也组成线程池,在被过期回收前可以被运行时重复使用),它返回一个 Future,可以被 .await 结果。
async fn verify_password(password: &str, hash: &str, key: &str) ->
Result<bool, argonautica::Error>
{
let password = password.to_string();
let hash = hash.to_string();
let key = key.to_string();
tokio::task::spawn_blocking(
// 参数是一个同步闭包函数,而非前面的 async task
move || {
argonautica::Verifier::default()
.with_hash(hash)
.with_password(password)
.with_secret_key(key)
.verify()
}).await }
// 对于长时间运行的任务,可以主动 yield
while computation_not_done() {
tokio::task::yield_now().await;
}
如果在 async 中使用跨 await 的 Mutex,则应该使用异步感知的 Mutex 而非标准库的 Mutext。异步感知指的是后续加锁如果失败,则 yield 当前任务,防止单线程死锁:
// https://tokio.rs/tokio/tutorial/shared-state
use std::sync::{Mutex, MutexGuard};
async fn increment_and_do_stuff(mutex: &Mutex<i32>) {
let mut lock: MutexGuard<i32> = mutex.lock().unwrap();
*lock += 1;
do_something_async().await;
// lock 还有效,直到函数返回时被 drop
} // lock goes out of scope here
报错:std::sync::MutexGuard 没有实现 Send:
error: future cannot be sent between threads safely
--> src/lib.rs:13:5
|
13 | tokio::spawn(async move {
| ^^^^^^^^^^^^ future created by async block is not `Send`
|
::: /playground/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-0.2.21/src/task/spawn.rs:127:21
|
127 | T: Future + Send + 'static,
| ---- required by this bound in `tokio::task::spawn::spawn`
|
= help: within `impl std::future::Future`, the trait `std::marker::Send` is not implemented for `std::sync::MutexGuard<'_, i32>`
note: future is not `Send` as this value is used across an await
--> src/lib.rs:7:5
|
4 | let mut lock: MutexGuard<i32> = mutex.lock().unwrap();
| -------- has type `std::sync::MutexGuard<'_, i32>` which is not `Send`
...
7 | do_something_async().await;
| ^^^^^^^^^^^^^^^^^^^^^^^^^^ await occurs here, with `mut lock` maybe used later
8 | }
| - `mut lock` is later dropped here
解决办法:将 MutexGuard 封装在单独的 block,使其不跨 await 有效。
async fn increment_and_do_stuff(mutex: &Mutex<i32>) {
{
let mut lock: MutexGuard<i32> = mutex.lock().unwrap();
*lock += 1;
} // lock goes out of scope here
do_something_async().await;
}
或者使用 tokio 提供的异步感知 Mutex 类型:
use async_std::sync::Mutex;
pub struct Outbound(Mutex<TcpStream>);
impl Outbound {
pub fn new(to_client: TcpStream) -> Outbound {
Outbound(Mutex::new(to_client))
}
pub async fn send(&self, packet: FromServer) -> ChatResult<()> {
let mut guard = self.0.lock().await;
utils::send_as_json(&mut *guard, &packet).await?;
guard.flush().await?;
Ok(())
} }
async task 可以使用 async fn 或 async block 来构造,它们都是返回实现 Future trait 的语法糖。
- task::spawn() 立即在后台运行异步任务,而不管是否 .await 它返回的 JoinHandle;
- JoinHandle 实现了 Future trait,.await 它时返回 Result<T, JoinError>;
use tokio::task;
// 需要位于 Runtime Context,spawn 的 task 立即在后台执行,而不管是否 .await 它返回的
// 实现 Future 的 JoinHandle。
let join = task::spawn(async {
// ...
"hello world!"
});
// ...
// Await the result of the spawned task.
let result = join.await?;
assert_eq!(result, "hello world!");
// 如果 task panic,则 .await 返回 JoinErr
let join = task::spawn(async {
panic!("something bad happened!")
});
// The returned result indicates that the task failed.
assert!(join.await.is_err());
spawn() 所需的 async task 的签名是 Future + Send + 'static
, 所以:
- 不能直接共享借用 stack 上的变量(但是可以共享借用全局 static/static mut 常量,它们具有 ‘static 生命周期);
- 使用 move 来将引用的外部对象的所有权转移到 task, 从而满足 ‘static 的要求;
- task 在多个 thread 间调度, 所以必须是可以 Send 的, 这意味着 task 内部的跨 await 的变量(因为在 await 处生成 Future object, 它记录了上下文变量)必须实现 Send;
use tokio::task;
#[tokio::main]
async fn main() {
let v = vec![1, 2, 3];
task::spawn(async {
println!("Here's a vec: {:?}", v); // 错误
});
}
// error[E0373]: async block may outlive the current function, but
// it borrows `v`, which is owned by the current function
// --> src/main.rs:7:23
// |
// 7 | task::spawn(async {
// | _______________________^
// 8 | | println!("Here's a vec: {:?}", v);
// | | - `v` is borrowed here
// 9 | | });
// | |_____^ may outlive borrowed value `v`
// |
// note: function requires argument type to outlive `'static`
// --> src/main.rs:7:17
// |
// 7 | task::spawn(async {
// | _________________^
// 8 | | println!("Here's a vector: {:?}", v);
// 9 | | });
// | |_____^
// help: to force the async block to take ownership of `v` (and any other
// referenced variables), use the `move` keyword
// |
// 7 | task::spawn(async move {
// 8 | println!("Here's a vec: {:?}", v);
// 9 | });
// |
// Error
use tokio::task::yield_now;
use std::rc::Rc;
#[tokio::main]
async fn main() {
tokio::spawn(async {
let rc = Rc::new("hello");
// `rc` is used after `.await`. It must be persisted to the
// task's state.
yield_now().await;
println!("{}", rc);
});
}
// OK
use tokio::task::yield_now;
use std::rc::Rc;
#[tokio::main]
async fn main() {
tokio::spawn(async {
// The scope forces `rc` to drop before `.await`.
{
let rc = Rc::new("hello");
println!("{}", rc);
}
// `rc` is no longer used. It is **not** persisted when the task
// yields to the scheduler
yield_now().await; // yield_now() 返回 Future,必须 await 才会实际 yield
});
}
解决办法: 将 Rc 改成实现 Send 的 Arc:
error: future cannot be sent between threads safely
--> src/main.rs:6:5
|
6 | tokio::spawn(async {
| ^^^^^^^^^^^^ future created by async block is not `Send`
|
::: [..]spawn.rs:127:21
|
127 | T: Future + Send + 'static,
| ---- required by this bound in
| `tokio::task::spawn::spawn`
|
= help: within `impl std::future::Future`, the trait
| `std::marker::Send` is not implemented for
| `std::rc::Rc<&str>`
note: future is not `Send` as this value is used across an await
--> src/main.rs:10:9
|
7 | let rc = Rc::new("hello");
| -- has type `std::rc::Rc<&str>` which is not `Send`
...
10 | yield_now().await;
| ^^^^^^^^^^^^^^^^^ await occurs here, with `rc` maybe
| used later
11 | println!("{}", rc);
12 | });
| - `rc` is later dropped here
其它不需要 Future + Send + 'static
的特殊 task 任务类型:
- tokio::task::block_in_place(f) // f 同步闭包函数: FnOnce() -> R
- tokio::task::LocalSet::spawn_local(f) // f 是 Future + ‘static
// F 是同步函数闭包,而不是 async block 或 fn。
// 在当前 thread 执行同步函数,而 spawn_blocking() 是在一个线程池中执行同步函数。
pub fn block_in_place<F, R>(f: F) -> R
where
F: FnOnce() -> R
// F 不需要实现 Send+Sync
// 在 block_on() 所在的 thread 上执行 future
pub fn spawn_local<F>(&self, future: F) -> JoinHandle<F::Output>
where
F: Future + 'static,
F::Output: 'static,
let local = task::LocalSet::new();
local.spawn_local(async {
// ...
});
task::spawn() 和 task::spawn_blocking() 返回 JoinHandle
类型:
- JoinHandle 实现了 Future,.await 返回结果;
- 当 JoinHandle 被 Drop 时,关联的 task 会被从 Runtime detach,继续运行,但不能再 join 它。
- 它的 abort_handle() 返回可以 remote abort 该 task 的 AbortHandle;
impl<T> JoinHandle<T>
// Abort the task associated with the handle.
pub fn abort(&self)
// Checks if the task associated with this JoinHandle has finished.
pub fn is_finished(&self) -> bool
// Returns a new AbortHandle that can be used to remotely abort this task.
pub fn abort_handle(&self) -> AbortHandle
use tokio::{time, task};
let mut handles = Vec::new();
handles.push(tokio::spawn(async {
time::sleep(time::Duration::from_secs(10)).await;
true
}));
handles.push(tokio::spawn(async {
time::sleep(time::Duration::from_secs(10)).await;
false
}));
let abort_handles: Vec<task::AbortHandle> = handles.iter().map(
|h| h.abort_handle()).collect();
for handle in abort_handles {
handle.abort();
}
for handle in handles {
assert!(handle.await.unwrap_err().is_cancelled());
}
JoinHandle 实现了 Future,它的 Output = Result<T, JoinError>, JoinError
类型用于判断该 task 是否被 cancelled/panic 等出错原因:
impl JoinError
pub fn is_cancelled(&self) -> bool
pub fn is_panic(&self) -> bool
pub fn into_panic(self) -> Box<dyn Any + Send + 'static>
pub fn try_into_panic(self) -> Result<Box<dyn Any + Send + 'static>, JoinError>
pub fn id(&self) -> Id
use std::panic;
#[tokio::main]
async fn main() {
let err = tokio::spawn(async {
panic!("boom");
}).await.unwrap_err();
assert!(err.is_panic());
}
spawn 的 task 可以通过 JoinHandle.abort()
方法来取消(Cancelled), 对应的 task 会在 下一次 yield 时(如 .await)
时被终止。这时 JoinHandle 的 .await 结果是 JoinErr,它的 is_cancelled() 为 true。abort task 并不代表 task 一定以 JoinErr 结束,因为有些任务可能在 yield 前正常结束,这时 .await 返回正常结果。
abort() 方法可能在 task 被终止前返回,可以使用 JoinHandle .await 来确保 task 被终止后返回。
对于 spawn_blocking() 创建的任务,由于不是 async,所以调用它返回的 JoinHandle.abort() 是无效的,task 会持续运行。
如果使用的不是 tokio crate 的 APIs,如标准库的 IO APIs,则可能会阻塞 tokio Runtime。对于可能会引起阻塞的任务,tokio 提供了在 async context 中运行同步函数的 task::spawn_blocking() 和 task::block_in_place()
函数。
task::spawn_blocking() 是在单独的 blocing thread pool 中运行同步任务(clouse 标识),从而避免阻塞运行 aysnc task 的线程。
// async context 中,在单独的线程中运行可能会阻塞 tokio 的代码
let join = task::spawn_blocking(|| {
// do some compute-heavy work or call synchronous code
"blocking completed"
});
let result = join.await?;
assert_eq!(result, "blocking completed");
如果使用的多线程 runtime,则 task::block_in_place()
也是可用的,它也是在 async context 中运行可能 blocking 当前线程的代码,但是它是将 Runtime 的 worker thread 转换为 blocking thread 来实现的,这样可以避免上下文切换来提升性能:
use tokio::task;
let result = task::block_in_place(|| {
// do some compute-heavy work or call synchronous code
"blocking completed"
});
assert_eq!(result, "blocking completed");
async fn task::yield_now()
类似于 std::thread::yield_now(), .await 该函数时会使当前 task yield to tokio Runtime 调度器,让其它 task 被调度执行。
use tokio::task;
async {
task::spawn(async {
// ...
println!("spawned task done!")
});
// Yield, allowing the newly-spawned task to execute first.
task::yield_now().await; // 必须 .await
println!("main task done!");
}
协作式调度:tokio Runtime 没有使用 OS thread 的抢占式调度,而是使用协作式调度,可以避免一个 task 执行时长时间占有 CPU 而影响其他 task 的执行。这是通过在 tokio libray 中 强制插入一些 yield point
,从而强制实现 task 周期返回 executor,从而可以调度其他 task 运行。
task::unconstrained
可以对 task 规避 tokio 协作式调度(coop),使用它包裹的 Future task 不会被 forced to yield to Tokio:
use tokio::{task, sync::mpsc};
let fut = async {
let (tx, mut rx) = mpsc::unbounded_channel();
for i in 0..1000 {
let _ = tx.send(());
// This will always be ready. If coop was in effect, this code would be
// forced to yield periodically. However, if left unconstrained, then
// this code will never yield.
rx.recv().await;
}
};
task::unconstrained(fut).await;
3 JoinSet/AbortHandle #
JoinSet: spawn 一批 task,等待一些或全部执行完成,按照完成的顺序返回。
- 所有任务的返回类型 T 必须相同;
- 如果 JoinSet 被 Drop,则其中的所有 task 立即被 aborted;
对比:标准库的 std::future::join!() 宏是等待所有 task 都完成,而不能单独等待某一个完成。
use tokio::task::JoinSet;
#[tokio::main]
async fn main() {
let mut set = JoinSet::new();
for i in 0..10 {
set.spawn(async move { i });
}
let mut seen = [false; 10];
// join_next() 返回下一个返回的 task 结果
while let Some(res) = set.join_next().await {
let idx = res.unwrap();
seen[idx] = true;
}
for i in 0..10 {
assert!(seen[i]);
}
}
JoinSet 的方法:
impl<T> JoinSet<T>
pub fn new() -> Self
// 返回 JoinSet 中 task 数量
pub fn len(&self) -> usize
pub fn is_empty(&self) -> bool
impl<T: 'static> JoinSet<T>
// 使用 Builder 来构造一个 task:只能设置 name 然后再 spawn
pub fn build_task(&mut self) -> Builder<'_, T>
use tokio::task::JoinSet;
#[tokio::main]
async fn main() -> std::io::Result<()> {
let mut set = JoinSet::new();
// Use the builder to configure a task's name before spawning it.
set.build_task()
.name("my_task")
.spawn(async { /* ... */ })?;
Ok(())
}
// JoinSet 的 spawn_xx() 方法返回的是 AbortHandle,而非 JoinHandle。
// 在 JoinSet 中 spawn 一个 task,该 task 会立即运行
pub fn spawn<F>(&mut self, task: F) -> AbortHandle
where F: Future<Output = T> + Send + 'static, T: Send
// 在指定的 Handler 对应的 Runtime 上 spawn task
pub fn spawn_on<F>(&mut self, task: F, handle: &Handle) -> AbortHandle
where F: Future<Output = T> + Send + 'static, T: Send
pub fn spawn_local<F>(&mut self, task: F) -> AbortHandle
where F: Future<Output = T> + 'static
pub fn spawn_local_on<F>( &mut self, task: F, local_set: &LocalSet ) -> AbortHandle
where F: Future<Output = T> + 'static
pub fn spawn_blocking<F>(&mut self, f: F) -> AbortHandle
where F: FnOnce() -> T + Send + 'static, T: Send
pub fn spawn_blocking_on<F>(&mut self, f: F, handle: &Handle) -> AbortHandle
where F: FnOnce() -> T + Send + 'static, T: Send
use tokio::task::JoinSet;
#[tokio::main]
async fn main() {
let mut set = JoinSet::new();
for i in 0..10 {
set.spawn_blocking(move || { i });
}
let mut seen = [false; 10];
while let Some(res) = set.join_next().await {
let idx = res.unwrap();
seen[idx] = true;
}
for i in 0..10 {
assert!(seen[i]);
}
}
// 等待直到其中一个 task 完成,返回它的 output,注意是异步函数,需要 .await 才返回结果。
// 当 JoinSet 为空时,返回 None,可用于 while let 中。
pub async fn join_next(&mut self) -> Option<Result<T, JoinError>>
pub async fn join_next_with_id(&mut self) -> Option<Result<(Id, T), JoinError>>
pub fn try_join_next(&mut self) -> Option<Result<T, JoinError>>
pub fn try_join_next_with_id(&mut self) -> Option<Result<(Id, T), JoinError>>
// Aborts all tasks and waits for them to finish shutting down. Calling this
// method is equivalent to calling abort_all and then calling join_next in a
// loop until it returns None.
pub async fn shutdown(&mut self)
// Aborts all tasks on this JoinSet. This does not remove the tasks from the
// JoinSet. To wait for the tasks to complete cancellation, you should call
// join_next in a loop until the JoinSet is empty.
pub fn abort_all(&mut self)
// Removes all tasks from this JoinSet without aborting them. The tasks removed
// by this call will continue to run in the background even if the JoinSet
// is dropped.
pub fn detach_all(&mut self)
JoinSet 的各 spawn() 都返回 AbortHandle
对象,通过该对象可以 abort 对应的 spawned task,而不等待运行结束。不像 JoinHandle 那样 await task 结束,AbortHandle 只用于 terminate task 而不等待它结束。Drop AbortHandle 释放了终止 task 的权利,并不会终止任务。
impl AbortHandle
// 终止 task。JoinHandle 可能返回 JoinError 错误(也可能正常结束)
pub fn abort(&self)
// abort/canceld task 是否完成
pub fn is_finished(&self) -> bool
pub fn id(&self) -> Id
4 LocalSet/spawn_local() #
LocalSet 是在同一个 thread 上运行一批异步 task,可以避免 tokio::spawn() 的 Future task 必须实现 Send 的要求(如在 async task 中使用 Rc):
- task::LocalSet::new() 创建一个 LocalSet;
- task::LocalSet::run_until() 运行一个 async Future task,该方法为 task 创建一个 LocalSet 上下文,所以可以在 task 中使用 tokio::task::spawn_local() 函数来提交任务。
- 只能使用 spawn_local() 方法来提交任务,不能使用 task::spawn();
- 当 LocalSet 中所有 task 都结束时,.await 返回。
tokio::spawn_local() 向 LocalSet 提交的 !Send task 在 Runtime::block_on() 所在的单线程中调用。所以 LocalSet::run_until() 只能在 #[tokio::main]/#[tokio::test] 或者 Runtime::block_on() 中调用,不能在 tokio::spawn() 中调用。
use std::rc::Rc;
use tokio::task;
#[tokio::main]
async fn main() {
let nonsend_data = Rc::new("my nonsend data...");
let local = task::LocalSet::new();
// run_util() 自动调用 local.enter(), Future 位于该 LocalSet context 中
local.run_until(async move {
let nonsend_data = nonsend_data.clone();
// 在 run_until() 内部,可以多次使用 task::spawn_local() 提交任务,
// 都在 block_on() 所在的单线程上执行。
task::spawn_local(async move {
println!("{}", nonsend_data);
// ...
}).await.unwrap();
}).await; // .await 等待所有 spawn_local() 提交的 task 结束时返回
}
// 另一个例子
use tokio::{task, time};
use std::rc::Rc;
#[tokio::main]
async fn main() {
let nonsend_data = Rc::new("world");
let local = task::LocalSet::new();
let nonsend_data2 = nonsend_data.clone();
// 使用 LocalSet.spawn_local() 提交任务
local.spawn_local(async move {
// ...
println!("hello {}", nonsend_data2)
});
local.spawn_local(async move {
time::sleep(time::Duration::from_millis(100)).await;
println!("goodbye {}", nonsend_data)
});
// ...
local.await;
}
LocalSet 的方法:
impl LocalSet
pub fn new() -> LocalSet
// 进入 LocalSet 的 context,这样后续使用 tokio::task::spawn_local() 提交
// !Send task;
pub fn enter(&self) -> LocalEnterGuard
// 提交一个 !Send task, 返回可以 .await 的 JoinHandle
pub fn spawn_local<F>(&self, future: F) -> JoinHandle<F::Output>
where F: Future + 'static, F::Output: 'static
// 在指定的 Runtime 中同步执行 future,直到返回。
// 内部调用 Runtime::block_on() 函数,所以需要在同步上下文中调用。
pub fn block_on<F>(&self, rt: &Runtime, future: F) -> F::Output
where F: Future
// 执行一个 Future, 内部可以使用 tokio::task::spawn_local() 提交 !Send task,
// 运行直到这些 task 结束;
pub async fn run_until<F>(&self, future: F) -> F::Output where F: Future
pub fn unhandled_panic(&mut self, behavior: UnhandledPanic) -> &mut Self
pub fn id(&self) -> Id
5 tokio::mutex #
Rust 在 .await 时有可能将当前 task 转移到其他 thread 运行, 所以需要 async task block 实现 Send.
由于标准库的 MutexGuard 没有实现 Send, 在持有 MutexGuard 的情况下不能跨 .await。
use std::sync::{Mutex, MutexGuard};
async fn increment_and_do_stuff(mutex: &Mutex<i32>) {
let mut lock: MutexGuard<i32> = mutex.lock().unwrap();
*lock += 1;
do_something_async().await;
} // lock goes out of scope here
编译报错: error: future cannot be sent between threads safely
error: future cannot be sent between threads safely
--> src/lib.rs:13:5
|
13 | tokio::spawn(async move {
| ^^^^^^^^^^^^ future created by async block is not `Send`
|
::: /playground/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-0.2.21/src/task/spawn.rs:127:21
|
127 | T: Future + Send + 'static,
| ---- required by this bound in `tokio::task::spawn::spawn`
|
= help: within `impl std::future::Future`, the trait `std::marker::Send` is not implemented for `std::sync::MutexGuard<'_, i32>`
note: future is not `Send` as this value is used across an await
--> src/lib.rs:7:5
|
4 | let mut lock: MutexGuard<i32> = mutex.lock().unwrap();
| -------- has type `std::sync::MutexGuard<'_, i32>` which is not `Send`
...
7 | do_something_async().await;
| ^^^^^^^^^^^^^^^^^^^^^^^^^^ await occurs here, with `mut lock` maybe used later
8 | }
| - `mut lock` is later dropped here
解决办法: 将 MutexGuard 的 destructor 在 await 前运行:
// This works!
async fn increment_and_do_stuff(mutex: &Mutex<i32>) {
{
let mut lock: MutexGuard<i32> = mutex.lock().unwrap();
*lock += 1;
} // lock goes out of scope here
do_something_async().await;
}
// This fails too.
// 这是由于 lock 变量在 await 时还有效。
async fn increment_and_do_stuff(mutex: &Mutex<i32>) {
let mut lock: MutexGuard<i32> = mutex.lock().unwrap();
*lock += 1;
drop(lock);
do_something_async().await;
}
或者使用 struct 来封装 Mutex, 然后在同步函数中处理锁:
use std::sync::Mutex;
struct CanIncrement {
mutex: Mutex<i32>,
}
impl CanIncrement {
// This function is not marked async.
fn increment(&self) {
let mut lock = self.mutex.lock().unwrap();
*lock += 1;
}
}
async fn increment_and_do_stuff(can_incr: &CanIncrement) {
// 在异步函数中,调用加锁的同步函数。
can_incr.increment();
do_something_async().await;
}
或者使用 tokio 的异步 Mutext, tokio::sync::Mutex 支持跨 .await, 但是性能会差一些:
use tokio::sync::Mutex; // note! This uses the Tokio mutex
// This compiles!
// (but restructuring the code would be better in this case)
async fn increment_and_do_stuff(mutex: &Mutex<i32>) {
let mut lock = mutex.lock().await;
*lock += 1;
do_something_async().await;
} // lock goes out of scope here
综上, 对于 Mutex 和带来的 Send 问题:
- 首选将 Mutex 封装到 Struct 和同步函数中;
- 或者将 Mutex 在 .await 前解构(必须是 block 解构, 而不是 drop());
- 或者 spawn 一个 task 来专门管理 state, 其他 task 使用 message 来对它进行操作.
异步任务中可以使用 std::sync::Arc/Mutex 同步原语来对共享内存进行并发访问。这里使用 std::sync::Mutex 而非 tokio::sync::Mutex, =只有当 Mutex 跨 await 时才需要使用异步 Mutex=。
Mutex lock 阻塞时会阻塞当前 thread, 会导致其他 async task 也不能调度到该 thread 执行。缺省情况下, tokio runtime 使用 multi-thread scheduler, task 被调度到那个 thread 是tokio runtime 决定的。
tokio 还提供 current_thread runtime flavor, 它是一个轻量级, 单线程的 runtime。
use bytes::Bytes;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
// 定义类型别名
type Db = Arc<Mutex<HashMap<String, Bytes>>>;
use tokio::net::TcpListener;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
#[tokio::main]
async fn main() {
let listener = TcpListener::bind("127.0.0.1:6379").await.unwrap();
println!("Listening");
let db = Arc::new(Mutex::new(HashMap::new()));
loop {
let (socket, _) = listener.accept().await.unwrap();
// Clone the handle to the hash map.
let db = db.clone();
println!("Accepted");
tokio::spawn(async move {
process(socket, db).await;
});
}
}
use tokio::net::TcpStream;
use mini_redis::{Connection, Frame};
async fn process(socket: TcpStream, db: Db) {
use mini_redis::Command::{self, Get, Set};
// Connection, provided by `mini-redis`, handles parsing frames from the socket
let mut connection = Connection::new(socket);
while let Some(frame) = connection.read_frame().await.unwrap() {
let response = match Command::from_frame(frame).unwrap() {
Set(cmd) => {
// db 是 MutexGuard 类型, 生命周期没有跨 await,
// 所以可以使用同步 Mutex;
let mut db = db.lock().unwrap();
db.insert(cmd.key().to_string(), cmd.value().clone());
Frame::Simple("OK".to_string())
}
Get(cmd) => {
let db = db.lock().unwrap();
if let Some(value) = db.get(cmd.key()) {
Frame::Bulk(value.clone())
} else {
Frame::Null
}
}
cmd => panic!("unimplemented {:?}", cmd),
};
// Write the response to the client
connection.write_frame(&response).await.unwrap();
}
}
为了尽量降低 Mutex 竞争导致的 task/thread block, 可以:
- Switching to a dedicated task to manage state and use message passing.
Shard the mutex
- Restructure the code to
avoid the mutex
.
第二种情况的例子: 将单一 Mutex<HashMap<_, __>>
实例, 拆解为 N 个实例:
- 第三方 crate dashmap 提供了更复杂的 sharded hash map;
type ShardedDb = Arc<Vec<Mutex<HashMap<String, Vec<u8>>>>>;
fn new_sharded_db(num_shards: usize) -> ShardedDb {
let mut db = Vec::with_capacity(num_shards);
for _ in 0..num_shards {
db.push(Mutex::new(HashMap::new()));
}
Arc::new(db)
}
// 后续查找 map 前先计算下 key 的 hash
let shard = db[hash(key) % db.len()].lock().unwrap();
shard.insert(key, value);
6 tokio::sync #
tokio 提供了 4 种类型 channel:
- oneshot:发送和接收一个值;
- mpsc:多个发送方,一个接收方;
- broadcast:多个发送方,多个接收方;
- watch:只保证接收方收到最新值,不保证它们收到所有值;
std:sync::mpsc 和 crossbeam::channel 都是 同步 channel
, 不能在 async func 使用, 否则可能 block 当前线程和 task.
tokio::sync:mpsc 是 multi-producer signle-consumer channel, 可以在 async 异步函数中使用, 每个 message 只能被一个 consumer 消费。
async_channel crate
提供了 multi-producer multi-consumer channel,
oneshot:The oneshot channel supports sending a single value from a single producer to a single consumer
. This channel is usually used to send the result of a computation to a waiter. Example: using a oneshot channel to
receive the result of a computation.
- oneshot::channel() 用于创建一对 Sender 和 Receiver;
- Sender 的 send() 方法是同步方法,故可以在同步或异步上下文中使用;
- Receiver .await 返回 Sender 发送的值,Sender 被 Drop 后,Receiver .await 返回 error::RecvError;
use tokio::sync::oneshot;
async fn some_computation() -> String {
"represents the result of the computation".to_string()
}
#[tokio::main]
async fn main() {
// 创建一对发送和接收 handler
let (tx, rx) = oneshot::channel();
tokio::spawn(async move {
let res = some_computation().await;
tx.send(res).unwrap(); // send() 是非异步的方法
});
// Do other work while the computation is happening in the background
// Wait for the computation result
let res = rx.await.unwrap();
}
如果 sender 在 send 前被 drop,则 receiver 失败,错误为 error::RecvError
:
use tokio::sync::oneshot;
#[tokio::main]
async fn main() {
let (tx, rx) = oneshot::channel::<u32>();
tokio::spawn(async move {
drop(tx);
});
match rx.await {
Ok(_) => panic!("This doesn't happen"),
Err(_) => println!("the sender dropped"),
}
}
在 tokio::select! loop 中使用 oneshot channel 时,需要在 channel 前添加 &mut
:
use tokio::sync::oneshot;
use tokio::time::{interval, sleep, Duration};
#[tokio::main]
async fn main() {
// select!{} 使用 &mut recv,所以 recv 必须是 mut 类型。
let (send, mut recv) = oneshot::channel();
let mut interval = interval(Duration::from_millis(100));
tokio::spawn(async move {
sleep(Duration::from_secs(1)).await;
send.send("shut down").unwrap();
});
loop {
tokio::select! {
_ = interval.tick() => println!("Another 100ms"),
// select! 自动 .await recv 的返回值
msg = &mut recv => {
println!("Got message: {}", msg.unwrap());
break;
}
}
}
}
oneshot::Receiver 的 close() 关闭 Receiver,这时 Sender 的 send() 方法会失败。
use tokio::sync::oneshot;
use tokio::sync::oneshot::error::TryRecvError;
#[tokio::main]
async fn main() {
let (tx, mut rx) = oneshot::channel();
assert!(!tx.is_closed());
rx.close();
assert!(tx.is_closed());
assert!(tx.send("never received").is_err());
match rx.try_recv() {
Err(TryRecvError::Closed) => {}
_ => unreachable!(),
}
}
oneshot::Sender 的 closed() 方法可以等待 oneshot::Receiver 被 closed 或被 Drop 时返回。Sender 的 is_closed() 方法用于获取 Receiver 是否被 closed 或 Drop,这时 Sender 的 send() 方法会失败:
use tokio::sync::oneshot;
#[tokio::main]
async fn main() {
let (mut tx, rx) = oneshot::channel::<()>();
tokio::spawn(async move {
drop(rx);
});
// 等待 rx 被 drop 时返回
tx.closed().await;
println!("the receiver dropped");
}
// 使用 select!
use tokio::sync::oneshot;
use tokio::time::{self, Duration};
async fn compute() -> String {
// Complex computation returning a `String`
}
#[tokio::main]
async fn main() {
let (mut tx, rx) = oneshot::channel();
tokio::spawn(async move {
tokio::select! {
_ = tx.closed() => {
// The receiver dropped, no need to do any further work
}
value = compute() => {
// The send can fail if the channel was closed at the exact
// same time as when compute() finished, so just ignore the
// failure.
let _ = tx.send(value);
}
}
});
// Wait for up to 10 seconds
let _ = time::timeout(Duration::from_secs(10), rx).await;
}
mpsc 是标准库的 mpsc 的异步版本:
use tokio::io::{self, AsyncWriteExt};
use tokio::net::TcpStream;
use tokio::sync::mpsc;
#[tokio::main]
async fn main() -> io::Result<()> {
let mut socket = TcpStream::connect("www.example.com:1234").await?;
let (tx, mut rx) = mpsc::channel(100);
for _ in 0..10 {
// Each task needs its own `tx` handle. This is done by cloning the
// original handle.
let tx = tx.clone();
tokio::spawn(async move {
tx.send(&b"data to write"[..]).await.unwrap();
});
}
// The `rx` half of the channel returns `None` once **all** `tx` clones
// drop. To ensure `None` is returned, drop the handle owned by the current
// task. If this `tx` handle is not dropped,
// there will always be a single outstanding `tx` handle.
drop(tx);
while let Some(res) = rx.recv().await {
socket.write_all(res).await?;
}
Ok(())
}
mpsc 和 oneshot 联合使用,可以实现 req/resp 响应的对共享资源的处理:
use tokio::sync::{oneshot, mpsc};
use Command::Increment;
enum Command {
Increment,
// Other commands can be added here
}
#[tokio::main]
async fn main() {
let (cmd_tx, mut cmd_rx) = mpsc::channel::<
(Command, oneshot::Sender<u64>)>(100);
// Spawn a task to manage the counter
tokio::spawn(async move {
let mut counter: u64 = 0;
while let Some((cmd, response)) = cmd_rx.recv().await {
match cmd {
Increment => {
let prev = counter;
counter += 1;
response.send(prev).unwrap();
}
}
}
});
let mut join_handles = vec![];
// Spawn tasks that will send the increment command.
for _ in 0..10 {
let cmd_tx = cmd_tx.clone();
join_handles.push(tokio::spawn(async move {
let (resp_tx, resp_rx) = oneshot::channel();
cmd_tx.send((Increment, resp_tx)).await.ok().unwrap();
let res = resp_rx.await.unwrap();
println!("previous value = {}", res);
}));
}
// Wait for all tasks to complete
for join_handle in join_handles.drain(..) {
join_handle.await.unwrap();
}
}
tokio mpsc message passing 机制:
- 一个 tokio spawn task 作为 manager 角色, 通过 buffered mpsc channel 接收 message, 然后根据 message 类型来操作有状态对象, 由于只有 manager 来串行操作该对象, 所以可以避免加锁。
- manager 通过 message 中的 response channel 来向发送者响应结果;
use bytes::Bytes;
use mini_redis::client;
use tokio::sync::{mpsc, oneshot};
/// Multiple different commands are multiplexed over a single channel.
#[derive(Debug)]
enum Command {
Get {
key: String,
resp: Responder<Option<Bytes>>, // 发送响应的 oneshot 类型 channel
},
Set {
key: String,
val: Bytes,
resp: Responder<()>,
},
}
/// Provided by the requester and used by the manager task to send the command
/// response back to the requester.
type Responder<T> = oneshot::Sender<mini_redis::Result<T>>;
#[tokio::main]
async fn main() {
// 创建一个 buffered 类型 channel, 当消费的慢时可以对发送端反压
// ( send(...).await 将阻塞一段时间), 从而降低内存和并发度, 防止消耗过多
// 系统资源。
let (tx, mut rx) = mpsc::channel(32);
let tx2 = tx.clone();
let manager = tokio::spawn(async move {
// Open a connection to the mini-redis address.
let mut client = client::connect("127.0.0.1:6379").await.unwrap();
while let Some(cmd) = rx.recv().await {
match cmd {
Command::Get { key, resp } => {
let res = client.get(&key).await;
// Ignore errors
// 使用 let _ = 来忽略 result 错误。
let _ = resp.send(res); // 通过 oneshot channel 发送响应
}
Command::Set { key, val, resp } => {
let res = client.set(&key, val).await;
// Ignore errors
let _ = resp.send(res);
}
}
}
});
// Spawn two tasks, one setting a value and other querying for key
// that was set.
let t1 = tokio::spawn(async move {
let (resp_tx, resp_rx) = oneshot::channel(); // 构建响应 channel
let cmd = Command::Get {
key: "foo".to_string(),
resp: resp_tx,
};
// Send the GET request
if tx.send(cmd).await.is_err() {
eprintln!("connection task shutdown");
return;
}
// Await the response
let res = resp_rx.await;
println!("GOT (Get) = {:?}", res);
});
let t2 = tokio::spawn(async move {
let (resp_tx, resp_rx) = oneshot::channel();
let cmd = Command::Set {
key: "foo".to_string(),
val: "bar".into(),
resp: resp_tx,
};
// Send the SET request
if tx2.send(cmd).await.is_err() {
eprintln!("connection task shutdown");
return;
}
// Await the response
let res = resp_rx.await;
println!("GOT (Set) = {:?}", res);
});
t1.await.unwrap();
t2.await.unwrap();
manager.await.unwrap();
}
broadcast channel: 从多个发送端向多个接收端发送多个值
,可以实现 fan out 模式,如 pub/sub 或 chat 系统。
- tokio::sync::broadcast::channel(N) 创建一个指定容量为 N 的 bounded,multi-producer,multi-consumer channel,当 channel 中元素数量达到 N 后,最老的元素将被清理,没有消费该元素的 Receiver 的 recv() 方法将返回 RecvError::Lagged 错误,然后该 Receiver 的读写位置将更新到 channel 中当前 最老的元素,下一次 recv() 将返回该元素。通过这种机制,Receiver 可以感知是否 Lagged 以及做相应的处理。
- Sender 实现了 clone,可以 clone 多个实例,然后在多个 task 中使用。当所有Sender 都被 drop 时,channel 将处于 closed 状态,这时 Receiver 的 recv() 将返回 RecvError::Closed 错误。
- Sender::subscribe() 创建新的 Receiver,它接收调用 subscribe() 创建它
后
发送的消息。当所有 Receiver 都被 drop 时,Sender::send() 方法返回 SendError; - Sender 发送的值会被 clone 后发送给所有 Receiver,直到它们
都收到
这个值后, 该值才会从 channel 中移除;
use tokio::sync::broadcast;
#[tokio::main]
async fn main() {
let (tx, mut rx1) = broadcast::channel(16);
// 创建一个新的接收端
let mut rx2 = tx.subscribe();
tokio::spawn(async move {
assert_eq!(rx1.recv().await.unwrap(), 10);
assert_eq!(rx1.recv().await.unwrap(), 20);
});
tokio::spawn(async move {
assert_eq!(rx2.recv().await.unwrap(), 10);
assert_eq!(rx2.recv().await.unwrap(), 20);
});
tx.send(10).unwrap();
tx.send(20).unwrap();
}
// 感知 Lagged
use tokio::sync::broadcast;
#[tokio::main]
async fn main() {
let (tx, mut rx) = broadcast::channel(2);
tx.send(10).unwrap();
tx.send(20).unwrap();
// 由于 channel 容量为 2,所以发送 30 后,10 将被丢弃。
tx.send(30).unwrap();
// The receiver lagged behind
// rx 没有收到被丢弃的 10,所以第一次 recv 报错。
assert!(rx.recv().await.is_err());
// 后续再 recv 时,开始接收最老的数据
// At this point, we can abort or continue with lost messages
assert_eq!(20, rx.recv().await.unwrap());
assert_eq!(30, rx.recv().await.unwrap());
}
watch channel: 从多个发送端向多个接收端发送多个值
,但是 channel 中只保存最新的一个值,所以如果接收端存在延迟,则不能保证它接收了所有的中间值。类似于容量为 1 的 broadcast channel。
使用场景:广播配置变更,应用状态变更,优雅关闭等。
- watch::channel(initial_value): 创建一个 watch channel 时可以指定一个初始值;
- Sender::subscribe()
创建一个新的 Receiver
,它只接收后面新发送的值; - Sender 实现了 Clone trait,可以 clone 多个实例来发送数据。同时 Sender 和 Receiver 都是 thread safe;
- Sender::is_closed()/closed() 为 Sender 提供检查所有 Receiver 是否被 closed 或 Drop 的方法。
- Sender::send() 必须在有 Receiver 的情况下才能发送成功,而 send_if_modified, send_modify, 或 send_replace 方法可以在没有 Receiver 的情况下发送成功;
- send_modify(modify: FnOnce(&mut T)); 无条件的更新 T 值,然后通知所有接收者,可以在没有接收者的情况下使用。
- send_if_modified<F>(&self, modify: F) -> bool 和 send_modify() 类似,但是 modify 闭包返回 bool 值为 true 时才通知所有接收者。
let sender = tokio::sync::watch::Sender::new(0u8);
// 没有 receiver,所以发送出错
assert!(sender.send(3).is_err());
// 使用 subscribe() 方法创建一个 receiver
let _rec = sender.subscribe();
// 发送成功
assert!(sender.send(4).is_ok());
Receiver 使用 Receiver::changed() 来接收 un-seen 值更新,如果没有 un-seen 值,则该方法会 sleep 直到有 un-seen 值或者 Sender 被 Drop,如果有 un-seen 值则该方法立即返回 Ok(()) 。
- Receiver 使用 Receiver::borrow_and_update() 来获取最新值,并标记为 seen。
- 如果只是获取最新值而不标记为 seen,则使用 Receiver::borrow()。borrow()没有将值标记为 seen,所以下一次调用 Receiver::changed() 时将立即返回 Ok(())。 borrow_and_update() 将值标记为 seen,所以下一次调用 Receiver::changed() 时将 sleep 直到有新的值;
- Receiver 在 borrow 值时会将 channel 设置一个 read lock,所以当 borrow 时间较长时,可能会阻塞 Sender;
use tokio::sync::watch;
use tokio::time::{self, Duration, Instant};
use std::io;
#[derive(Debug, Clone, Eq, PartialEq)]
struct Config {
timeout: Duration,
}
impl Config {
async fn load_from_file() -> io::Result<Config> {
// file loading and deserialization logic here
}
}
async fn my_async_operation() {
// Do something here
}
#[tokio::main]
async fn main() {
let mut config = Config::load_from_file().await.unwrap();
// 创建 watch channel 并提供初始值
let (tx, rx) = watch::channel(config.clone());
// Spawn a task to monitor the file.
tokio::spawn(async move {
loop {
// Wait 10 seconds between checks
time::sleep(Duration::from_secs(10)).await;
// Load the configuration file
let new_config = Config::load_from_file().await.unwrap();
// If the configuration changed, send the new config value on the
// watch channel.
if new_config != config {
tx.send(new_config.clone()).unwrap();
config = new_config;
}
}
});
let mut handles = vec![];
// Spawn tasks that runs the async operation for at most `timeout`.
// If the timeout elapses, restart the operation.
//
// The task simultaneously watches the `Config` for changes. When the
// timeout duration changes, the timeout is updated without restarting
// the in-flight operation.
for _ in 0..5 {
// Clone a config watch handle for use in this task
let mut rx = rx.clone();
let handle = tokio::spawn(async move {
// Start the initial operation and pin the future to the stack.
// Pinning to the stack is required to resume the operation across
// multiple calls to `select!`
let op = my_async_operation();
tokio::pin!(op);
// Get the initial config value
let mut conf = rx.borrow().clone();
let mut op_start = Instant::now();
let sleep = time::sleep_until(op_start + conf.timeout);
tokio::pin!(sleep);
loop {
tokio::select! {
_ = &mut sleep => {
// The operation elapsed. Restart it
op.set(my_async_operation());
// Track the new start time
op_start = Instant::now();
// Restart the timeout
sleep.set(time::sleep_until(op_start + conf.timeout));
}
_ = rx.changed() => {
// 获得最新值,然后标记为 seen
conf = rx.borrow_and_update().clone();
// The configuration has been updated. Update the
// `sleep` using the new `timeout` value.
sleep.as_mut().reset(op_start + conf.timeout);
}
_ = &mut op => {
// The operation completed!
return
}
}
}
});
handles.push(handle);
}
for handle in handles.drain(..) {
handle.await.unwrap();
}
}
tokio::sync::Notify 用于通知一个或所有的 task wakup,它本身不携带任何数据:
- A Notify can be thought of as a Semaphore starting with
0 permits
.Thenotified().await
method waits for a permit to become available, andnotify_one() sets a permit
if there currently are no available permits. - notified(&self) -> Notified<’_> :
Wait
for a notification. 返回的 Notified实现 Future,.await 时将被阻塞,直到收到通知; - notify_one(): Notifies the
first
waiting task. - notify_last(&self): Notifies the
last
waiting task. - notify_waiters(&self): Notifies
all
waiting tasks.
use tokio::sync::Notify;
use std::sync::Arc;
#[tokio::main]
async fn main() {
let notify = Arc::new(Notify::new());
let notify2 = notify.clone();
let handle = tokio::spawn(async move {
notify2.notified().await; // 等待被 Notify
println!("received notification");
});
println!("sending notification");
notify.notify_one();
// Wait for task to receive notification.
handle.await.unwrap();
}
#[tokio::main]
async fn main() {
let notify = Arc::new(Notify::new());
let notify2 = notify.clone();
let notified1 = notify.notified();
let notified2 = notify.notified();
let handle = tokio::spawn(async move {
println!("sending notifications");
notify2.notify_waiters();
});
notified1.await;
notified2.await;
println!("received notifications");
}
tokio::sync::Notify 可以作为替代 waker 的异步函数内部 thread 间同步的工具:
use tokio::sync::Notify;
use std::sync::Arc;
use std::time::{Duration, Instant};
use std::thread;
async fn delay(dur: Duration) {
let when = Instant::now() + dur;
let notify = Arc::new(Notify::new());
let notify_clone = notify.clone();
thread::spawn(move || {
let now = Instant::now();
if now < when {
thread::sleep(when - now);
}
notify_clone.notify_one();
});
notify.notified().await;
}
7 tokio::io #
tokio::io 没有定义和使用 Read/Write trait,而是定义和使用 AsyncRead/AsyncWrite/AsyncSeek trait
,同时为这两个 trait 定义了:
- 异步 Buf 版本:AsyncBufRead,AsyncBufReadExt;
- 其它扩展 trait:AsyncReadExt/AsyncWriteExt/AsyncSeekExt;
AsyncRead/AsyncWrite/AsyncBufRead trait 提供的 poll_XX() 开头的方法,不方便直接使用,而各种 Ext trait 则提供了更常用的 Read/Write/Lines 等方法。
从 AsyncRead 创建 tokio::io::BufReader 对象, 实现 AsyncRead/AsyncBufRead trait.
从 AsyncWrite 创建 Struct tokio::io::BufWriter 对象, 实现 AsyncWrite trait.
从同时实现了 AsyncRead/AsyncWrite 的对象创建 struct BufStream<RW> (例如 TCPStream 对象), 它实现了 AsyncBufRead 和 AsyncWrite.
AsyncReadExt trait 返回的对象都实现了 Future:
pub trait AsyncReadExt: AsyncRead {
// Provided methods
fn chain<R>(self, next: R) -> Chain<Self, R> where Self: Sized, R: AsyncRead { ... }
fn read<'a>(&'a mut self, buf: &'a mut [u8]) -> Read<'a, Self> where Self: Unpin { ... }
fn read_buf<'a, B>(&'a mut self, buf: &'a mut B) -> ReadBuf<'a, Self, B> where Self: Unpin, B: BufMut + ?Sized
fn read_exact<'a>(&'a mut self, buf: &'a mut [u8]) -> ReadExact<'a, Self> where Self: Unpin { ... }
fn read_u8(&mut self) -> ReadU8<&mut Self> where Self: Unpin { ... }
例如 read() 返回的 Read 对象实现了 Future, Poll ready 时返回 io::Result<usize>, 所以即使这些方法没有使用 async fn 形式,但由于返回的对象实现了 Future, 它们也可以被 .await 轮询:
// https://docs.rs/tokio/latest/src/tokio/io/util/read.rs.html#43
impl<R> Future for Read<'_, R> where R: AsyncRead + Unpin + ?Sized,
{
type Output = io::Result<usize>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<usize>> {
let me = self.project();
let mut buf = ReadBuf::new(me.buf);
ready!(Pin::new(me.reader).poll_read(cx, &mut buf))?;
Poll::Ready(Ok(buf.filled().len()))
}
}
// 示例
use tokio::fs::File;
// 需要显式导入 AsyncReadExt 和 AsyncWriteExt trait
use tokio::io::{self, AsyncReadExt, AsyncWriteExt};
#[tokio::main]
async fn main() -> io::Result<()> {
let mut f = File::open("foo.txt").await?;
let mut buffer = [0; 10];
// read up to 10 bytes
let n = f.read(&mut buffer[..]).await?;
println!("The bytes: {:?}", &buffer[..n]);
let mut buffer = Vec::new();
// read the whole file
f.read_to_end(&mut buffer).await?;
Ok(())
let mut file = File::create("foo.txt").await?;
// Writes some prefix of the byte string, but not necessarily all of it.
let n = file.write(b"some bytes").await?;
println!("Wrote the first {} bytes of 'some bytes'.", n);
Ok(())
}
tokio::io::split() 函数将传入的支持 AsyncRead + AsyncWrite 的 Stream 对象, 如 TCPStream, 拆分为 ReadHalf<T>, WriteHalf<T>, 前者实现 AsyncRead trait,后者实现 AsyncWrite trait。echo server client: 使用 tokio::io::split() 将实现 read/write 对象拆分为 read 和 write 两个对象。
pub fn split<T>(stream: T) -> (ReadHalf<T>, WriteHalf<T>)
where
T: AsyncRead + AsyncWrite,
// https://tokio.rs/tokio/tutorial/io
use tokio::io::{self, AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpStream;
#[tokio::main]
async fn main() -> io::Result<()> {
let socket = TcpStream::connect("127.0.0.1:6142").await?;
let (mut rd, mut wr) = io::split(socket);
// Write data in the background
tokio::spawn(async move {
wr.write_all(b"hello\r\n").await?;
wr.write_all(b"world\r\n").await?;
// Sometimes, the rust type inferencer needs a little help
Ok::<_, io::Error>(())
});
let mut buf = vec![0; 128];
loop {
let n = rd.read(&mut buf).await?;
if n == 0 {
break;
}
println!("GOT {:?}", &buf[..n]);
}
Ok(())
}
echo server: 尽量避免 stack buffer, 因为跨 .await 的上下文变量会随者 task Future 对象一起被保存, 如果使用较大的 stack buffer 变量, 则自动生成的 task Future 对象就比较大, buffer size 一般是 page sized 对齐的, 这导致 task 的大小大概是 $page-size + a-few-bytes, 浪费内存。
use tokio::io::{self, AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpListener;
#[tokio::main]
async fn main() -> io::Result<()> {
let listener = TcpListener::bind("127.0.0.1:6142").await?;
loop {
let (mut socket, _) = listener.accept().await?;
tokio::spawn(async move {
// 在堆中分配 buff 内存, 而不使用 stack 上的 array
let mut buf = vec![0; 1024];
loop {
match socket.read(&mut buf).await {
Ok(0) => return,
Ok(n) => {
if socket.write_all(&buf[..n]).await.is_err() {
return;
}
}
Err(_) => {
return;
}
}
}
});
}
}
tokio::io::duplex() 函数创建一个使用内存作为缓冲的 DuplexStream 类型对象,它实现了 AsyncRead/AsyncWrite trait。DuplexStream 被 drop 时, 另一端 read 还可以继续读取内存中的数据, 读到 0 byte 表示 EOF,而另一端 write 时立即返回 Err(BrokenPipe) ;
pub fn duplex(max_buf_size: usize) -> (DuplexStream, DuplexStream)
// 示例
let (mut client, mut server) = tokio::io::duplex(64);
client.write_all(b"ping").await?;
let mut buf = [0u8; 4];
server.read_exact(&mut buf).await?;
assert_eq!(&buf, b"ping");
server.write_all(b"pong").await?;
client.read_exact(&mut buf).await?;
assert_eq!(&buf, b"pong");
8 select!{} #
tokio::select!{} 宏用于同时 .await 多个 async task,当其中一个完成时返回, drop 其它分支的 task Future 对象。必须在 async context 中使用 select!, 如 async functions, closures, and blocks。
use tokio::sync::oneshot;
#[tokio::main]
async fn main() {
let (tx1, rx1) = oneshot::channel();
let (tx2, rx2) = oneshot::channel();
tokio::spawn(async {
let _ = tx1.send("one");
});
tokio::spawn(async {
let _ = tx2.send("two");
});
// select 并发 poll rx1 和 rx2, 如果 rx1 先 Ready 则结果赋值给 val,
// 同时 drop rx2.
tokio::select! {
val = rx1 => {
println!("rx1 completed first with {:?}", val);
}
val = rx2 => {
println!("rx2 completed first with {:?}", val);
}
}
}
tokio::sync::oneshot::Receiver 实现了 Drop, 被 Drop 时对应的 tokio::sync::oneshot::Sender 可以收到 closed 通知(sender 的 async closed() 方法返回)。
tokio::spawn(async {…}) 立即调度执行一个异步 task, 返回一个实现 Future 的 JoinHandle, Future 的关联类型是 Result<T, JoinError>, 所以通过 .await 它可以获得 task 的返回值 T。
use tokio::sync::oneshot;
async fn some_operation() -> String {
// Compute value here
}
#[tokio::main]
async fn main() {
let (mut tx1, rx1) = oneshot::channel();
let (tx2, rx2) = oneshot::channel();
tokio::spawn(async {
// Select on the operation and the oneshot's `closed()` notification.
tokio::select! {
val = some_operation() => {
let _ = tx1.send(val);
}
// tx1.closed() 是一个 async 方法, 故可以被 select,
// 当对端 rx1 被 Drop 时返回.
_ = tx1.closed() => {
// `some_operation()` is canceled, the task completes and
// `tx1` is dropped.
}
}
});
tokio::spawn(async {
let _ = tx2.send("two");
});
// rx2 先 .await 返回时, rx1 会被 Drop, 这时上面的 tx1.closed() .await 返回
tokio::select! {
val = rx1 => {
println!("rx1 completed first with {:?}", val);
}
val = rx2 => {
println!("rx2 completed first with {:?}", val);
}
}
}
select! 宏支持多个 branch(当前限制为 64 个branchs):
<pattern> = <async expression> (, if <precondition>)? => <handler>
else => <expression>
Rust 在 单线程异步执行
所有 branch 的 async expressions (一个 branch 的 async expression 不 Ready 时, 调度执行另一个 branch 的 async expression), 当第一个 async expressions 返回且它的 result 与 pattern 匹配时, Drop 所有其它 async expression, 然后执行 handler。如果 result 与 pattern 不匹配, 继续等待下一个异步返回并检查返回值是否匹配.
每个 branch 还可以有 if 表达式, 如果 if 表达式结果为 false, 则对应 async expression 还是会被执行, 但是返回的 Future 不会被 .await Poll. (例如在 loop 中多次 poll 一个 Pin 的 Future 对象, 一旦该对象 Ready, 后续就不能再 poll 了, 这时需要使用 if 表达式来排除该 branch).
每次进入执行 select! 时:
- 先执行个 branch 的 if 表达式, 结果为 false 时, disable 对应 branch;
- 执行所有 async expression(包括 disable 的 branch, 但是不 poll 它) , 注意是在单 thread 中异步执行这些expression;
- 并发 .await poll 未被 disable 的 expression 返回的 Future 对象;
- 当第一个 poll 返回时, 看是否匹配 pattern, 如果不匹配则 disable 该 branch, 继续等待其它 branch .await poll 返回, 重复 1-3; 如果匹配, 则执行该 branch 的 handler, 同时 drop 其它 branch 的 Future 对象.
- 如果所有 branch 都被 disable(如都不匹配 pattern), 则执行 else branch handler,如果没有提供 else 则 panic.
各分支的 aysnc expression 可以执行复杂的计算,只要结果是 Future 对象即可:
use tokio::net::TcpStream;
use tokio::sync::oneshot;
#[tokio::main]
async fn main() {
let (tx, rx) = oneshot::channel();
// Spawn a task that sends a message over the oneshot
tokio::spawn(async move {
tx.send("done").unwrap();
});
tokio::select! {
socket = TcpStream::connect("localhost:3465") => {
println!("Socket connected {:?}", socket);
}
msg = rx => {
println!("received message first {:?}", msg);
}
}
}
// 另一个例子
use tokio::net::TcpListener;
use tokio::sync::oneshot;
use std::io;
#[tokio::main]
async fn main() -> io::Result<()> {
let (tx, rx) = oneshot::channel();
tokio::spawn(async move {
tx.send(()).unwrap();
});
let mut listener = TcpListener::bind("localhost:3465").await?;
// 并发执行两个 branch, 直到 rx 收到消息或者 accept 出错.
tokio::select! {
// 使用 async {} 来定义一个 async expression, 该 expression 返回 Result
// select! 会 .await 这个 expression 和 rx, 当 rx 返回时该 expression
// 才不会被继续 .await
_ = async {
loop {
// listener 出错时才返回
let (socket, _) = listener.accept().await?;
tokio::spawn(async move { process(socket) });
}
// Help the rust type inferencer out
Ok::<_, io::Error>(())
} => {}
_ = rx => {
println!("terminating accept loop");
}
}
Ok(())
}
async expression 返回值支持 pattern match, 并且可以有一个 else 分支, 当所有 pattern match 都不匹配时, 才执行 else 分支.
use tokio::sync::mpsc;
#[tokio::main]
async fn main() {
let (mut tx1, mut rx1) = mpsc::channel(128);
let (mut tx2, mut rx2) = mpsc::channel(128);
tokio::spawn(async move {
// Do something w/ `tx1` and `tx2`
});
// 这里 drop 的是 recv() 返回的 Future,而非 rx1 和 rx2 对象本身
tokio::select! {
Some(v) = rx1.recv() => {
println!("Got {:?} from rx1", v);
}
Some(v) = rx2.recv() => {
println!("Got {:?} from rx2", v);
}
else => {
println!("Both channels closed");
}
}
}
select! 宏有返回值, 各 branch 必须返回相同类型值:
async fn computation1() -> String {
// .. computation
}
async fn computation2() -> String {
// .. computation
}
#[tokio::main]
async fn main() {
let out = tokio::select! {
res1 = computation1() => res1,
res2 = computation2() => res2,
};
println!("Got = {}", out);
}
Errors:
- 如果 async exprssion 中有 ? 则表达式返回 Result, 例如 accept().await? 出错时,res 为 Result::Err, 这时 handler 返回该 Error。
- 如果是 handler 中有 ?, 则会立即将 error 传递到 select 表达式外, 如 res? 会将错误传递到 main() 函数;
use tokio::net::TcpListener;
use tokio::sync::oneshot;
use std::io;
#[tokio::main]
async fn main() -> io::Result<()> {
// [setup `rx` oneshot channel]
let listener = TcpListener::bind("localhost:3465").await?;
tokio::select! {
res = async {
loop {
// await 结果为 Err 时,表达式返回,将 res 设置为 Err
let (socket, _) = listener.accept().await?;
tokio::spawn(async move { process(socket) });
}
// Help the rust type inferencer out
Ok::<_, io::Error>(())
} => {
// handler 内 res 为 Err 时,? 表达式直接将 Err 返回给 select! 所在
// 的函数。
res?;
}
_ = rx => {
println!("terminating accept loop");
}
}
Ok(())
}
Borrowing: 对于 tokio::spawn(async {..}) 必须 move 捕获要使用的数据, 但是对于 select! 的多 branch async expression 则不需要, 只要遵守 borrow 的规则即可,比如同时访问 & 共享数据, 唯一访问 &mut 数据。这是因为 tokio 在一个单线程中异步并发 poll 所有表达式返回的 Future 对象。select! 必须异步并发执行所有 aysnc expression, 并且当第一个 expression 返回的结果匹配 pattern 时才 drop 其它正在执行的 async expression。
use tokio::io::AsyncWriteExt;
use tokio::net::TcpStream;
use std::io;
use std::net::SocketAddr;
async fn race(
data: &[u8],
addr1: SocketAddr,
addr2: SocketAddr
) -> io::Result<()> {
tokio::select! {
// OK:两个 async expression 共享借用 data
Ok(_) = async {
let mut socket = TcpStream::connect(addr1).await?;
socket.write_all(data).await?;
Ok::<_, io::Error>(())
} => {}
Ok(_) = async {
let mut socket = TcpStream::connect(addr2).await?;
socket.write_all(data).await?;
Ok::<_, io::Error>(())
} => {}
else => {}
};
Ok(())
}
#[tokio::main]
async fn main() {
let (tx1, rx1) = oneshot::channel();
let (tx2, rx2) = oneshot::channel();
let mut out = String::new();
tokio::spawn(async move {
// Send values on `tx1` and `tx2`.
});
// 由于 select 保证只执行一个 handler, 所以多个 handler 中可以 &mut
// 使用相同的数据。
tokio::select! {
_ = rx1 => {
out.push_str("rx1 completed");
}
_ = rx2 => {
out.push_str("rx2 completed");
}
}
println!("{}", out);
}
Loop: 并发执行多个 async expression, 当它们都 recv() 返回时, select 随机选择一个 branch 来执行 handler, 未执行的 handler branch 的 message 也不会丢(称为 Cancellation safety).
use tokio::sync::mpsc;
#[tokio::main]
async fn main() {
let (tx1, mut rx1) = mpsc::channel(128);
let (tx2, mut rx2) = mpsc::channel(128);
let (tx3, mut rx3) = mpsc::channel(128);
loop {
// 当 rx1.recv() 返回时, drop 的是 rx2.recv() 和 rx3.recv() 的 Future
// 对象, 而不是 rx2 和 rx3, 所以下一次 loop 还可以继续调用.
// 当 channel 被 close , .recv() 返回 None, 不匹配 Some, 所以其它 rx
// 还是继续进行, 直到所有 rx 都被关闭.
// 注意: std::sync::mpsc 的 Receiver.recv() 返回 Result, 而
// tokio::sync::mpsc.recv() 返回 Option.
let msg = tokio::select! {
Some(msg) = rx1.recv() => msg,
Some(msg) = rx2.recv() => msg,
Some(msg) = rx3.recv() => msg,
else => { break }
};
println!("Got {:?}", msg);
}
println!("All channels have been closed.");
}
Cancellation safety
: 在 loop 中使用 select! 来从多个 branch source 接收 message 时, 需要确保接收调用被 cancel 时不会丢失 message.
以下方法是 cancellation safe 的:
- tokio::sync::mpsc::Receiver::recv
- tokio::sync::mpsc::UnboundedReceiver::recv
- tokio::sync::broadcast::Receiver::recv
- tokio::sync::watch::Receiver::changed
- tokio::net::TcpListener::accept
- tokio::net::UnixListener::accept
- tokio::signal::unix::Signal::recv
- tokio::io::AsyncReadExt::read on any AsyncRead
- tokio::io::AsyncReadExt::read_buf on any AsyncRead
- tokio::io::AsyncWriteExt::write on any AsyncWrite
- tokio::io::AsyncWriteExt::write_buf on any AsyncWrite
- tokio_stream::StreamExt::next on any Stream
- futures::stream::StreamExt::next on any Stream
以下方法不是 cancellation safe 的, 有可能会导致丢失数据:
- tokio::io::AsyncReadExt::read_exact
- tokio::io::AsyncReadExt::read_to_end
- tokio::io::AsyncReadExt::read_to_string
- tokio::io::AsyncWriteExt::write_all
以下方法也不是 cancellation safe 的, 因为它们使用 queue 来保证公平, cancel 时会丢失 queue 中的数据:
- tokio::sync::Mutex::lock
- tokio::sync::RwLock::read
- tokio::sync::RwLock::write
- tokio::sync::Semaphore::acquire
- tokio::sync::Notify::notified
如何判断方法是 cancellation safe?主要是看 .await 位置。因为当异步方法被 cancelled 时,总是在 .await 的位置被取消。当方法的 .await 位置被重启时,如果函数功能还正常,那么就是 cancellation safe 的。
Cancellation safety 定义的方式:如果一个 Future 还没有完成,在 drop 这个 Future 或重建它时,它的行为必须是 no-op 的。也就是当一个 Future 未 ready 时, drop 该Future 什么都不影响。 这也是 loop 中使用 select!的要求。没有这个要求的话,在 loop 中重新执行 select! 时,会重新进展。
pin!(future) 宏的实现方式等效于 std::pin::Pin::new(&mut future), 它存入 future 对象的 &mut 借用(所以 future 类型需要是 mut 类型),返回一个 Pin<Ptr> 类型的同名值 pinned,其中 Ptr 是 &mut impl Future<Output=xx> 类型:
- pinned.as_ref() 方法返回 Pin<&<Ptr as Deref>::Target> 即 Pin<& impl Future<Output=xx>> 类型;
- pinned.as_mut() 方法返回 Pin<&mut <Ptr as Deref>::Target> 即 Pin<&mut impl Future<Output=xx>> 类型;
在 loop+select!{} 宏中,要对 future 对象重复 poll,而 select!{} 在并发 .await 时,如果有返回值,则会 drop 其它 branch 的 Future 对象,所以在 select branch 中不能直接使用 future 对象,以避免它被 drop 后下一次 loop 失败 select 失败。
常见的解决办法是:
- 创建一个 future mut 对象;
- 使用 pin!(future) 来创建一个同名的,但类型是 Pin<&mut impl Future<Output=xx>> 的对象 future;
- 在 select branch 中使用 &mut future 来进行轮询。(这是因为 Future 的 poll() 方法的 self 签名是self: Pin<&mut Self>)
async fn action() {
// Some asynchronous logic
}
#[tokio::main]
async fn main() {
let (mut tx, mut rx) = tokio::sync::mpsc::channel(128);
let operation = action(); // 返回一个 Future 对象, 没有对它 .await
// 必须先 Pin 该 Future 对象,创建一个同名的类型为
// Pin<&mut impl Future<Output=i32> 类型对象 ;
tokio::pin!(operation);
loop {
tokio::select! {
// &mut operation 类型是已经 Pin 住类型的 &mut 借用,
// 即 &mut Pin<&mut impl Future<Output=i32>
// .await 表达式支持这种类型的表达式调用, 所以可以被 poll 轮询。
//
// 当该 branch 因不匹配或无返回值而被 drop/cancel 时,operation 任然有效,
// 下一次 select!{} 可以接续使用 &mut operation
_ = &mut operation => break,
Some(v) = rx.recv() => {
if v % 2 == 0 {
break;
}
}
}
}
}
Modifying a branch
- async fn 一旦 completion 就不能再 resume 执行: 一般情况下, future.await? 会消耗 future 对象.
- branch 可以使用 if 表达式先判断是否需要执行, 如: &mut operation, if !done => {xxx};
async fn action(input: Option<i32>) -> Option<String> {
// If the input is `None`, return `None`.
// This could also be written as `let i = input?;`
let i = match input {
Some(input) => input,
None => return None,
};
// async logic here
}
#[tokio::main]
async fn main() {
let (mut tx, mut rx) = tokio::sync::mpsc::channel(128);
let mut done = false;
let operation = action(None);
tokio::pin!(operation);
tokio::spawn(async move {
let _ = tx.send(1).await;
let _ = tx.send(3).await;
let _ = tx.send(2).await;
});
loop {
tokio::select! {
// 先判断 done, 为 true 时才执行 async expression. 这里使用的是
// &mut 引用, 所以 .await 并不会消耗对象, operation 在下一次
// loop 还存在.
res = &mut operation, if !done => {
done = true;
if let Some(v) = res {
println!("GOT = {}", v);
return;
}
}
Some(v) = rx.recv() => {
if v % 2 == 0 {
// `.set` is a method on `Pin`.
operation.set(action(Some(v)));
done = false;
}
}
}
}
}
tokio::spawn 和 select! 的区别:
- 都可以并发执行都各异步任务,但是 spawn() 是 tokio runtime 调度执行,而且可能位于其它线程上,所以调度的 async task 必须满足 Future + Send + Sync + ‘static ,而 ‘static 意味着 task 中不能有共享借用(需要 move);
- select! 在单个 task 线程上执行所有 branch 的异步任务,所以它们不可能并发执行,而是串行的。(比如一个 async expression 未 Ready 时,执行另一个 branch 的 async expression).
9 tokio-stream #
Stream 是 std::iter::Iterator 的异步版本, 返回一系列 value。当前,Stream 还不在 Rust 标准库中, 是 futures-core
crate 定义的 Stream trait 类型。
// https://docs.rs/futures-core/0.3.30/futures_core/stream/trait.Stream.html
pub trait Stream {
type Item;
// Required method
fn poll_next(
self: Pin<&mut Self>,
cx: &mut Context<'_>
) -> Poll<Option<Self::Item>>;
// Provided method
fn size_hint(&self) -> (usize, Option<usize>) { ... }
}
// poll_next() 方法返回值:
// + Ready<Some(Item)>: 返回下一个值;
// + Ready<None>: Stream 结束;
// + Pending: 值不 Ready;
tokio 在单独的 tokio-stream crate
中提供 Stream 支持,它通过 pub use futures_core::Stream;
来 re-export futures-core crate 定义的 Stream trait。
tokio_stream crate 提供了如下 module 函数:
- empty: Creates a stream that yields nothing.
- iter: Converts an Iterator into a Stream which is always ready to yield the next value.
- once: Creates a stream that emits an element exactly once.
- pending: Creates a stream that is never ready
tokio_stream::StreamExt(实际是 futures::StreamExt 的 pub use 导出)
是 futures_core::stream::Stream 子 trait, 提供了常用的额外 trait 方法, 包括各种 adapter 方法(如 map/filter 等), 以及 用于迭代的 next() 方法
:
- 实现 Stream 的类型也自动实现了 StreamExt。
pub trait StreamExt: Stream {
// next() 迭代返回下一个元素,返回的 Next 类型对象实现了 Future trait,
// .await 时返回 Option
fn next(&mut self) -> Next<'_, Self> where Self: Unpin
fn try_next<T, E>(&mut self) -> TryNext<'_, Self>
where Self: Stream<Item = Result<T, E>> + Unpin { ... }
fn map<T, F>(self, f: F) -> Map<Self, F>
where F: FnMut(Self::Item) -> T, Self: Sized
fn map_while<T, F>(self, f: F) -> MapWhile<Self, F>
where F: FnMut(Self::Item) -> Option<T>,
Self: Sized { ... }
fn then<F, Fut>(self, f: F) -> Then<Self, Fut, F>
where F: FnMut(Self::Item) -> Fut,
Fut: Future,
Self: Sized { ... }
fn merge<U>(self, other: U) -> Merge<Self, U>
where U: Stream<Item = Self::Item>,
Self: Sized { ... }
fn filter<F>(self, f: F) -> Filter<Self, F>
where F: FnMut(&Self::Item) -> bool,
Self: Sized { ... }
fn filter_map<T, F>(self, f: F) -> FilterMap<Self, F>
where F: FnMut(Self::Item) -> Option<T>,
Self: Sized { ... }
fn fuse(self) -> Fuse<Self>
where Self: Sized { ... }
fn take(self, n: usize) -> Take<Self>
where Self: Sized { ... }
fn take_while<F>(self, f: F) -> TakeWhile<Self, F>
where F: FnMut(&Self::Item) -> bool,
Self: Sized { ... }
fn skip(self, n: usize) -> Skip<Self>
where Self: Sized { ... }
fn skip_while<F>(self, f: F) -> SkipWhile<Self, F>
where F: FnMut(&Self::Item) -> bool,
Self: Sized { ... }
fn all<F>(&mut self, f: F) -> AllFuture<'_, Self, F>
where Self: Unpin,
F: FnMut(Self::Item) -> bool { ... }
fn any<F>(&mut self, f: F) -> AnyFuture<'_, Self, F>
where Self: Unpin,
F: FnMut(Self::Item) -> bool { ... }
fn chain<U>(self, other: U) -> Chain<Self, U>
where U: Stream<Item = Self::Item>,
Self: Sized { ... }
fn fold<B, F>(self, init: B, f: F) -> FoldFuture<Self, B, F>
where Self: Sized,
F: FnMut(B, Self::Item) -> B { ... }
fn collect<T>(self) -> Collect<Self, T> where T: FromStream<Self::Item>, Self: Sized
// Timeout 实现了 Future trait,.await 时返回
// Result<<T as Future>::Output, Elapsed>,
// 当 await 超时时返回 Elapsed Error。所以,可以作为一种通用的异步超时机制。
fn timeout(self, duration: Duration) -> Timeout<Self> where Self: Sized
fn timeout_repeating(self, interval: Interval) -> TimeoutRepeating<Self> where Self: Sized
fn throttle(self, duration: Duration) -> Throttle<Self> where Self: Sized
fn chunks_timeout( self, max_size: usize, duration: Duration ) -> ChunksTimeout<Self> where Self: Sized
fn peekable(self) -> Peekable<Self> where Self: Sized
}
示例:
// StreamExt trait 为 Stream 提供了常用的 next() 方法。
use tokio_stream::{self as stream, StreamExt};
#[tokio::main]
async fn main() {
// empty() 迭代式立即结束
let mut none = stream::empty::<i32>();
assert_eq!(None, none.next().await);
// iter() 函数将一个 Iterator 转换为 Stream
let mut stream = stream::iter(vec![17, 19]);
assert_eq!(stream.next().await, Some(17));
assert_eq!(stream.next().await, Some(19));
assert_eq!(stream.next().await, None);
// once() 函数返回只能迭代生成一个元素的 stream
let mut one = stream::once(1);
assert_eq!(Some(1), one.next().await);
assert_eq!(None, one.next().await);
// pending() 函数返回一个迭代式 pending 的 stream
let mut never = stream::pending::<i32>();
// This will never complete
never.next().await;
unreachable!();
}
// timeout() 示例
use tokio_stream::{self as stream, StreamExt};
use std::time::Duration;
let int_stream = int_stream.timeout(Duration::from_secs(1));
tokio::pin!(int_stream);
// When no items time out, we get the 3 elements in succession:
assert_eq!(int_stream.try_next().await, Ok(Some(1)));
assert_eq!(int_stream.try_next().await, Ok(Some(2)));
assert_eq!(int_stream.try_next().await, Ok(Some(3)));
assert_eq!(int_stream.try_next().await, Ok(None));
Stream 没有实现迭代器, 不支持 for-in 迭代, 只能在 StreamExt 的基础上使用 while-let 循环来迭代:
use tokio_stream::{self as stream, StreamExt};
#[tokio::main]
async fn main() {
let mut stream = stream::iter(vec![0, 1, 2]);
while let Some(value) = stream.next().await {
println!("Got {}", value);
}
}
module tokio_stream::wrappers
提供 转换为 Stream
的各种 wrappers 类型:
- BroadcastStream A wrapper around tokio::sync::broadcast::Receiver that implements Stream.
- CtrlBreakStreaml A wrapper around CtrlBreak that implements Stream.
- CtrlCStream A wrapper around CtrlC that implements Stream.
- IntervalStream A wrapper around Interval that implements Stream.
- LinesStream A wrapper around tokio::io::Lines that implements Stream.
- ReadDirStream A wrapper around tokio::fs::ReadDir that implements Stream.
- ReceiverStream A wrapper around tokio::sync::mpsc::Receiver that implements Stream.
- SignalStream A wrapper around Signal that implements Stream.
- SplitStream A wrapper around tokio::io::Split that implements Stream.
- TcpListenerStream A wrapper around TcpListener that implements Stream.
- UnboundedReceiverStream A wrapper around tokio::sync::mpsc::UnboundedReceiver that implements Stream.
- UnixListenerStream A wrapper around UnixListener that implements Stream.
- WatchStream A wrapper around tokio::sync::watch::Receiver that implements Stream.
use tokio_stream::{StreamExt, wrappers::WatchStream};
use tokio::sync::watch;
let (tx, rx) = watch::channel("hello");
let mut rx = WatchStream::new(rx); // 从 Watch 创建一个 WatchStream
assert_eq!(rx.next().await, Some("hello"));
tx.send("goodbye").unwrap();
assert_eq!(rx.next().await, Some("goodbye"));
// 复杂的例子
use tokio_stream::StreamExt;
use mini_redis::client;
async fn publish() -> mini_redis::Result<()> {
let mut client = client::connect("127.0.0.1:6379").await?;
client.publish("numbers", "1".into()).await?;
client.publish("numbers", "two".into()).await?;
client.publish("numbers", "3".into()).await?;
client.publish("numbers", "four".into()).await?;
client.publish("numbers", "five".into()).await?;
client.publish("numbers", "6".into()).await?;
Ok(())
}
async fn subscribe() -> mini_redis::Result<()> {
let client = client::connect("127.0.0.1:6379").await?;
let subscriber = client.subscribe(vec!["numbers".to_string()]).await?;
let messages = subscriber.into_stream(); // 返回一个 Stream 对象
// next() 要求 message Stream 必须是 Pinned
tokio::pin!(messages); // Pin 到 stack
while let Some(msg) = messages.next().await {
println!("got = {:?}", msg);
}
Ok(())
}
#[tokio::main]
async fn main() -> mini_redis::Result<()> {
tokio::spawn(async {
publish().await
});
subscribe().await?;
println!("DONE");
Ok(())
}
Adapters 从 Stream 生成新的 Stream, 如 map/take/filter:
let messages = subscriber
.into_stream()
.take(3);
let messages = subscriber
.into_stream()
.filter(|msg| match msg {
Ok(msg) if msg.content.len() == 1 => true,
_ => false,
})
.take(3);
let messages = subscriber
.into_stream()
.filter(|msg| match msg {
Ok(msg) if msg.content.len() == 1 => true,
_ => false,
})
.map(|msg| msg.unwrap().content)
.take(3);
实现一个 Stream:
- 能返回值时, poll_next() 返回 Poll::Ready, 否则返回 Poll::Pending;
- Ready<Some<T>>: 迭代值,Ready<None> 迭代结束;
- 一般在 poll_next() 内部需要使用 Future 和其他 Stream 来实现;
use tokio_stream::Stream;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Duration;
struct Interval {
rem: usize,
delay: Delay,
}
impl Interval {
fn new() -> Self {
Self {
rem: 3,
delay: Delay { when: Instant::now() }
}
}
}
impl Stream for Interval {
type Item = ();
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<()>>
{
if self.rem == 0 {
return Poll::Ready(None);
}
match Pin::new(&mut self.delay).poll(cx) {
Poll::Ready(_) => {
let when = self.delay.when + Duration::from_millis(10);
self.delay = Delay { when };
self.rem -= 1;
Poll::Ready(Some(()))
}
Poll::Pending => Poll::Pending,
}
}
}
9.1 ReaderStream 和 StreamReader #
tokio_util crate 提供了两个 Stream/Reader 相关的 struct 类型。
- ReaderStream:将一个 AsyncRead 转换为 byte chunks Stream。ReaderStream 实现了 Stream trait,迭代返回的元素类型为 Bytes,即一块连续的 u8 内存区域。
// ReaderStream 类型关联函数 impl<R: AsyncRead> ReaderStream<R> // Convert an AsyncRead into a Stream with item type // Result<Bytes, std::io::Error>. pub fn new(reader: R) -> Self // Convert an AsyncRead into a Stream with item type // Result<Bytes, std::io::Error>, with a // specific read buffer initial capacity. pub fn with_capacity(reader: R, capacity: usize) -> Self // ReaderStream 实现了 Stream trait,迭代返回的元素类型为 Bytes, // 即一块连续的 u8 内存区域 impl<R: AsyncRead> Stream for ReaderStream<R> type Item = Result<Bytes, Error> // 示例 use tokio_stream::StreamExt; use tokio_util::io::ReaderStream; let data = b"hello, world!"; // &[u8] 实现了 AsyncRead trait let mut stream = ReaderStream::new(&data[..]); // Read all of the chunks into a vector. let mut stream_contents = Vec::new(); // chunk 类型为 bytes::Bytes, 可以 Deref<[u8]> 来使用 while let Some(chunk) = stream.next().await { stream_contents.extend_from_slice(&chunk?); } // Once the chunks are concatenated, we should have the original data. assert_eq!(stream_contents, data);
- StreamReader:将一个 byte chunks Stream(Stream 迭代返回的 Item 类型为 Result<bytes::Buf, E>)转换为 AsyncRead,它同时实现了 AsyncBufRead trait。
impl<S, B, E> StreamReader<S, B> where S: Stream<Item = Result<B, E>>, B: Buf, E: Into<Error>, // new() 从 Stream 创建一个 StreamReader,Stream 需要每次迭代返回一个 // Result<bytes::Buf, E> 对象 // // Bytes, BytesMut, &[u8], VecDeque<u8> 等(不含 Vec<u8>)均实现 bytes::Buf pub fn new(stream: S) -> Self // 示例 use bytes::Bytes; use tokio::io::{AsyncReadExt, Result}; use tokio_util::io::StreamReader; // Create a stream from an iterator. let stream = tokio_stream::iter(vec![ // Bytes 实现了 Buf trait Result::Ok(Bytes::from_static(&[0, 1, 2, 3])), Result::Ok(Bytes::from_static(&[4, 5, 6, 7])), Result::Ok(Bytes::from_static(&[8, 9, 10, 11])), ]); // Convert it to an AsyncRead. // stream 迭代的 Item 要实现 bytes::Buf trait let mut read = StreamReader::new(stream); // Read five bytes from the stream. let mut buf = [0; 5]; read.read_exact(&mut buf).await?; assert_eq!(buf, [0, 1, 2, 3, 4]); // Read the rest of the current chunk. assert_eq!(read.read(&mut buf).await?, 3); assert_eq!(&buf[..3], [5, 6, 7]); // Read the next chunk. assert_eq!(read.read(&mut buf).await?, 4); assert_eq!(&buf[..4], [8, 9, 10, 11]); // We have now reached the end. assert_eq!(read.read(&mut buf).await?, 0);
10 futures_util crate #
futures_util
crate 提供了 Sink 和 SinkExt trait 的定义:
- Sink : A Sink is a value into which other values can be sent asynchronously;
- SinkExt : An extension trait for Sinks that provides a variety of convenient combinator functions;
常用的是 SinkExt 提供的方法, 如 send()/send_all()。
Sink 和 SinkExt 在 Frame 中得到广泛应用。
pub trait Sink<Item> {
type Error;
// Required methods
fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_> ) -> Poll<Result<(), Self::Error>>;
fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error>;
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_> ) -> Poll<Result<(), Self::Error>>;
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_> ) -> Poll<Result<(), Self::Error>>;
}
pub trait SinkExt<Item>: Sink<Item> {
// Provided methods
fn with<U, Fut, F, E>(self, f: F) -> With<Self, Item, U, Fut, F> where F: FnMut(U) -> Fut, Fut: Future<Output = Result<Item, E>>, E: From<Self::Error>, Self: Sized { ... }
fn with_flat_map<U, St, F>(self, f: F) -> WithFlatMap<Self, Item, U, St, F> where F: FnMut(U) -> St, St: Stream<Item = Result<Item, Self::Error>>, Self: Sized { ... }
fn sink_map_err<E, F>(self, f: F) -> SinkMapErr<Self, F> where F: FnOnce(Self::Error) -> E, Self: Sized { ... }
fn sink_err_into<E>(self) -> SinkErrInto<Self, Item, E> where Self: Sized, Self::Error: Into<E> { ... }
fn buffer(self, capacity: usize) -> Buffer<Self, Item> where Self: Sized { ... }
fn close(&mut self) -> Close<'_, Self, Item> where Self: Unpin { ... }
fn fanout<Si>(self, other: Si) -> Fanout<Self, Si> where Self: Sized, Item: Clone, Si: Sink<Item, Error = Self::Error> { ... }
fn flush(&mut self) -> Flush<'_, Self, Item> where Self: Unpin { ... }
fn send(&mut self, item: Item) -> Send<'_, Self, Item> where Self: Unpin { ... }
fn feed(&mut self, item: Item) -> Feed<'_, Self, Item> where Self: Unpin { ... }
fn send_all<'a, St>(&'a mut self, stream: &'a mut St ) -> SendAll<'a, Self, St> where St: TryStream<Ok = Item, Error = Self::Error> + Stream + Unpin + ?Sized, Self: Unpin { ... }
fn left_sink<Si2>(self) -> Either<Self, Si2> where Si2: Sink<Item, Error = Self::Error>, Self: Sized { ... }
fn right_sink<Si1>(self) -> Either<Si1, Self> where Si1: Sink<Item, Error = Self::Error>, Self: Sized { ... }
fn compat(self) -> CompatSink<Self, Item> where Self: Sized + Unpin { ... }
fn poll_ready_unpin( &mut self, cx: &mut Context<'_> ) -> Poll<Result<(), Self::Error>> where Self: Unpin { ... }
fn start_send_unpin(&mut self, item: Item) -> Result<(), Self::Error> where Self: Unpin { ... }
fn poll_flush_unpin( &mut self, cx: &mut Context<'_> ) -> Poll<Result<(), Self::Error>> where Self: Unpin { ... }
fn poll_close_unpin( &mut self, cx: &mut Context<'_> ) -> Poll<Result<(), Self::Error>> where Self: Unpin { ... }
}
11 tokio_util crate #
tokio_util 提供了如下 module、类型和功能:
- tokio_util::codec module 提供了 Encoder、Decoder trait,以及实现它们的 FramedRead、FramedWrite、Framed、LinesCodec、BytesCodec、AnyDelimiterCodec struct 类型。
- tokio_util::io module 提供了 InspectReader、InspectWriter、ReaderStream、StreamReader 类型;
- tokio_util::net module 提供了 Listener trait 和 ListenerAcceptFut struct 类型;
- tokio_util::sync module 提供了 CancellationToken struct 类型;
- tokio_util::task::task_tracker module 提供了 TaskTracker、TaskTrackerToken 等类型;
- tokio_util::time::delay_queue module 提供了 DelayQueue 类型;
- tokio_util::udp module 提供了 UdpFramed 类型;
Module tokio_util::codec
提供了将 AsyncRead/AsyncWrite 转换为 Stream/Sink, 并且将 byte stream sequence 转换为 有意义的 chunks 即 frames
的能力:
- struct FrameWrite
- A
Sink
of frames encoded to anAsyncWrite
. 当作 Sink 来发送数据并编码为 Frame; - struct FrameRead
- A
Stream
of messages decoded from anAsyncRead
.当作 Stream 来读取数据并解码为 Frame; - struct Framed
- A
unified Stream and Sink
interface to an underlyingI/O object, using the Encoder and Decoder traits to encode and decode frames.
Stream 从 AsyncRead 转换而来, 它的 StreamExt trait 提供的 next() 方法一次返回一个 chunk (bytes::Bytes 类型) 。
Sink 从 AsyncWrite 转换而来, 它的 SinkExt trait 提供的 send()/send_all()/feed() 方法可以用于发送数据。
在创建 FrameWrite/FrameRead/Framed 时需要传入实现 Encoder 和 Decoder trait 对象, 用于从 Stream 中解码出 Frame 对象, 向 Sink 中写入编码的 Frame 对象.
tokio_util::codec 提供了如下实现 Encoder 和 Decoder trait 的类型:
AnyDelimiterCodec
: A simple Decoder and Encoder implementation that splits up data into chunks based on any character in the given delimiter string.BytesCodec
: A simple Decoder and Encoder implementation that just ships bytes around.LinesCodec
: A simple Decoder and Encoder implementation that splits up data into lines.
// encoding
use futures::sink::SinkExt;
use tokio_util::codec::LinesCodec;
use tokio_util::codec::FramedWrite;
#[tokio::main]
async fn main() {
let buffer = Vec::new();
let messages = vec!["Hello", "World"];
let encoder = LinesCodec::new();
// buffer 实现了AsyncWrite,故可以作为 FramedWrite::new() 参数,
// 返回的 writer 实现了 Sink 和 SinkExt trait。
let mut writer = FramedWrite::new(buffer, encoder);
// 异步向 Sink 写数据,自动编码为 Frame
writer.send(messages[0]).await.unwrap();
writer.send(messages[1]).await.unwrap();
let buffer = writer.get_ref();
assert_eq!(buffer.as_slice(), "Hello\nWorld\n".as_bytes());
}
// decoding
use tokio_stream::StreamExt;
use tokio_util::codec::LinesCodec;
use tokio_util::codec::FramedRead;
#[tokio::main]
async fn main() {
let message = "Hello\nWorld".as_bytes();
let decoder = LinesCodec::new();
// message 实现了 AsyncRead,故可以作为 FrameRead::new() 参数,
// 返回的 reader 实现了 Stream 和 StreamExt。
// 每次读取返回一个解码后的 frame。
let mut reader = FramedRead::new(message, decoder);
let frame1 = reader.next().await.unwrap().unwrap();
let frame2 = reader.next().await.unwrap().unwrap();
assert!(reader.next().await.is_none());
assert_eq!(frame1, "Hello");
assert_eq!(frame2, "World");
}
FrameReader 从 AsyncRead 中使用 decoder 解码出 Frame 的过程大概如下:
use tokio::io::AsyncReadExt;
// buf 是内部带读写指针的缓存,实现了 AsyncRead trait 和 bytes::BufMut trait.
let mut buf = bytes::BytesMut::new();
loop {
// The read_buf call will append to buf rather than overwrite existing data.
// read_buf 方法使用 &mut bytes::BufMut
// 从 io_resource 中读取一段数据(长度未知)后写入 buf.
let len = io_resource.read_buf(&mut buf).await?;
if len == 0 {
while let Some(frame) = decoder.decode_eof(&mut buf)? {
yield frame;
}
break;
}
// 解码出 frame:如果 buf 中数据不足,返回 None,触发下一次 loop 读数据。
while let Some(frame) = decoder.decode(&mut buf)? {
yield frame;
}
}
FrameWriter 向 Sink 中写入使用 encoder 编码的数据的大概过程如下:
use tokio::io::AsyncWriteExt;
use bytes::Buf;
const MAX: usize = 8192;
let mut buf = bytes::BytesMut::new(); // buf 是内部带读写指针的缓存
loop {
tokio::select! {
// 持续向 buf 写数据
num_written = io_resource.write(&buf), if !buf.is_empty() => {
buf.advance(num_written?);
},
// 从 buf 数据生成一个 frame
frame = next_frame(), if buf.len() < MAX => {
encoder.encode(frame, &mut buf)?;
},
_ = no_more_frames() => {
io_resource.write_all(&buf).await?;
io_resource.shutdown().await?;
return Ok(());
},
}
}
也可以为自定义类型实现 Encoder 和 Decoder trait, 从而可以在 Frame 中使用:
- decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error>
- encode(&mut self, item: String, dst: &mut BytesMut) -> Result<(), Self::Error>
都是用 BytesMut 来读写 Frame 数据。
// 实现 Decoder trait, 从输入的 src: &mut BytesMut 中解析出 Item 类型对象
use tokio_util::codec::Decoder;
use bytes::{BytesMut, Buf};
struct MyStringDecoder {}
const MAX: usize = 8 * 1024 * 1024;
impl Decoder for MyStringDecoder {
type Item = String;
type Error = std::io::Error;
fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
if src.len() < 4 {
// Not enough data to read length marker.
return Ok(None);
}
// Read length marker.
let mut length_bytes = [0u8; 4];
length_bytes.copy_from_slice(&src[..4]);
let length = u32::from_le_bytes(length_bytes) as usize;
// Check that the length is not too large to avoid a denial of
// service attack where the server runs out of memory.
if length > MAX {
return Err(std::io::Error::new(
std::io::ErrorKind::InvalidData,
format!("Frame of length {} is too large.", length)
));
}
const HEADER_SIZE: usize = 4;
if src.len() < HEADER_SIZE + length {
let missing_bytes = HEADER_SIZE + length - src.len();
src.reserve(missing_bytes);
return Ok(None);
}
// Use advance to modify src such that it no longer contains
// this frame.
let data = src[4..4 + length].to_vec();
src.advance(4 + length);
// Convert the data to a string, or fail if it is not valid utf-8.
match String::from_utf8(data) {
Ok(string) => Ok(Some(string)),
Err(utf8_error) => {
Err(std::io::Error::new(
std::io::ErrorKind::InvalidData,
utf8_error.utf8_error(),
))
},
}
}
}
// 实现 Encoder trait
use tokio_util::codec::Encoder;
use bytes::BytesMut;
struct MyStringEncoder {}
const MAX: usize = 8 * 1024 * 1024;
impl Encoder<String> for MyStringEncoder {
type Error = std::io::Error;
fn encode(&mut self, item: String, dst: &mut BytesMut) -> Result<(), Self::Error> {
// Don't send a string if it is longer than the other end will
// accept.
if item.len() > MAX {
return Err(std::io::Error::new(
std::io::ErrorKind::InvalidData,
format!("Frame of length {} is too large.", item.len())
));
}
// Convert the length into a byte array.
// The cast to u32 cannot overflow due to the length check above.
let len_slice = u32::to_le_bytes(item.len() as u32);
// Reserve space in the buffer.
dst.reserve(4 + item.len());
// Write the length and string to the buffer.
dst.extend_from_slice(&len_slice);
dst.extend_from_slice(item.as_bytes());
Ok(())
}
}
Module tokio_util::time
提供了 DelayQueue
类型:
- pub fn insert_at(&mut self, value: T, when: Instant) -> Key: 插入元素, 并指定过期的绝对时间;
- pub fn insert(&mut self, value: T, timeout: Duration) -> Key: 插入元素, 并指定超时时间;
- pub fn poll_expired(&mut self, cx: &mut Context<’_>) -> Poll<Option<Expired<T>>>: 返回下一个过期的元素的 key, 需要使用 futures::ready!() 来 poll 返回的 Poll 对象。(不能使用 .await, 因为 Poll 类型没有实现 Future),如果没有过期元素则返回 Pending。
通过 insert 返回的 Key, 后续可以查询/删除/重置对应的元素;
use tokio_util::time::DelayQueue;
use std::time::Duration;
let mut delay_queue = DelayQueue::new();
let key1 = delay_queue.insert("foo", Duration::from_secs(5));
let key2 = delay_queue.insert("bar", Duration::from_secs(10));
assert!(delay_queue.deadline(&key1) < delay_queue.deadline(&key2));
// Remove the entry
let item = delay_queue.remove(&key);
assert_eq!(*item.get_ref(), "foo");
// "foo" is scheduled to be returned in 5 seconds
delay_queue.reset_at(&key, Instant::now() + Duration::from_secs(10));
// "foo" is now scheduled to be returned in 10 seconds
// 另一个例子
use tokio_util::time::{DelayQueue, delay_queue};
use futures::ready; // 也可以使用 std::task::ready!() 宏
use std::collections::HashMap;
use std::task::{Context, Poll};
use std::time::Duration;
struct Cache {
entries: HashMap<CacheKey, (Value, delay_queue::Key)>,
expirations: DelayQueue<CacheKey>,
}
const TTL_SECS: u64 = 30;
impl Cache {
fn insert(&mut self, key: CacheKey, value: Value) {
let delay = self.expirations.insert(key.clone(), Duration::from_secs(TTL_SECS));
self.entries.insert(key, (value, delay));
}
fn get(&self, key: &CacheKey) -> Option<&Value> {
self.entries.get(key).map(|&(ref v, _)| v)
}
fn remove(&mut self, key: &CacheKey) {
if let Some((_, cache_key)) = self.entries.remove(key) {
self.expirations.remove(&cache_key);
}
}
fn poll_purge(&mut self, cx: &mut Context<'_>) -> Poll<()> {
// ready!() 宏返回 Poll::Ready 的值, 如果不 Ready 则一致阻塞。
while let Some(entry) = ready!(self.expirations.poll_expired(cx)) {
self.entries.remove(entry.get_ref());
}
Poll::Ready(())
}
}
Trait tokio_util::time::FutureExt
为所有实现了 Future 的对象, 添加 timeout() 方法。对于 Stream,可以使用 StreamExt 提供的 timeout() 方法。
pub trait FutureExt: Future {
// Provided method
fn timeout(self, timeout: Duration) -> Timeout<Self>
where Self: Sized { ... }
}
// 示例
use tokio::{sync::oneshot, time::Duration};
use tokio_util::time::FutureExt;
let (tx, rx) = oneshot::channel::<()>();
let res = rx.timeout(Duration::from_millis(10)).await;
assert!(res.is_err());
12 tokio_test unit testing #
https://tokio.rs/tokio/topics/testing
#[tokio::test]
默认创建一个单线程的 current_thread runtime。
tokio::time 提供了 pause()/resume()/advance() 方法。
tokio::time::pause(): 将当前 Instant::now() 保存,后续调用 Instant::now() 时将返回保存的值。保存的值可以使用 advance() 修改。该函数只适合 current_thread runtime,也就是 #[tokio::test] 默认使用的 runtime。(这里的 Instant 是 tokio 提供的,标准库 Instant 不受影响。)
自动前进(auto-advance):当 time 被 paused,且当前 runtime 空闲时,clock 会被自动前进到下一个 pending timer。这意味着 Sleep
和其它 time 相关函数、方法被 await 时,会引起 runtime 前景时间。
#[tokio::test]
async fn paused_time() {
tokio::time::pause();
let start = std::time::Instant::now();
tokio::time::sleep(Duration::from_millis(500)).await;
println!("{:?}ms", start.elapsed().as_millis()); // 打印 0ms
}
可以使用属性来更简便的开启 time pause:
#[tokio::main(flavor = "current_thread", start_paused = true)]
async fn main() {
println!("Hello world");
}
#[tokio::test(start_paused = true)]
async fn paused_time() {
let start = std::time::Instant::now();
tokio::time::sleep(Duration::from_millis(500)).await;
println!("{:?}ms", start.elapsed().as_millis());
}
虽然开启了 time pause, 但是异步函数的执行顺序和时间关系还是正常保持的: 立即打印 4 次 “Tick!”
#[tokio::test(start_paused = true)]
async fn interval_with_paused_time() {
let mut interval = interval(Duration::from_millis(300));
let _ = timeout(Duration::from_secs(1), async move {
loop {
interval.tick().await;
println!("Tick!");
}
})
.await;
}
使用 tokio_test::io::Builder 来 Mock AsyncRead and AsyncWrite:
use tokio::io::{AsyncBufReadExt, AsyncRead, AsyncWrite, AsyncWriteExt, BufReader};
async fn handle_connection<Reader, Writer>(
reader: Reader,
mut writer: Writer,
) -> std::io::Result<()>
where
Reader: AsyncRead + Unpin,
Writer: AsyncWrite + Unpin,
{
let mut line = String::new();
let mut reader = BufReader::new(reader);
loop {
if let Ok(bytes_read) = reader.read_line(&mut line).await {
if bytes_read == 0 {
break Ok(());
}
writer
.write_all(format!("Thanks for your message.\r\n").as_bytes())
.await
.unwrap();
}
line.clear();
}
}
#[tokio::test]
async fn client_handler_replies_politely() {
let reader = tokio_test::io::Builder::new()
.read(b"Hi there\r\n")
.read(b"How are you doing?\r\n")
.build();
let writer = tokio_test::io::Builder::new()
.write(b"Thanks for your message.\r\n")
.write(b"Thanks for your message.\r\n")
.build();
let _ = handle_connection(reader, writer).await;
}
13 tokio::net #
提供了和 std 类似的 TCP/UDP/Unix 异步实现。
TcpListener
andTcpStream
UdpSocket
UnixListener
andUnixStream
UnixDatagram
tokio::net::unix::pipe
提供了一个函数 lookup_host()
来进行域名解析:
use tokio::net;
use std::io;
#[tokio::main]
async fn main() -> io::Result<()> {
for addr in net::lookup_host("localhost:3000").await? {
println!("socket address is {}", addr);
}
Ok(())
}
TcpListener 提供了 bind/accept 方法:
- bind() 关联函数返回 TcpListener 类型对象;
- accept() 方法返回
TCPStream
和对端 SocketAddr; - from_std()/into_std() 方法可以在标准库的 TcpListener 间转换, 这样可以
使用标准库创建和设置 TcpListener
, 然后创建 tokio TcpListener.
use std::error::Error;
use tokio::net::TcpListener;
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
let std_listener = std::net::TcpListener::bind("127.0.0.1:0")?;
std_listener.set_nonblocking(true)?;
let listener = TcpListener::from_std(std_listener)?;
Ok(())
}
TcpStream 实现了 AsyncRead/AsyncWrite, 通过 split() 方法可以分别返回读端和写端.
TcpSocket 是未转换为 TcpListener 和 TcpStream 的对象, 用于设置 TCP 相关参数.
-
pub fn new_v4() -> Result<TcpSocket>
-
pub fn new_v6() -> Result<TcpSocket>
-
pub fn bind(&self, addr: SocketAddr) -> Result<()>
-
pub fn bind_device(&self, interface: Option<&[u8]>) -> Result<()>
-
pub async fn connect(self, addr: SocketAddr) -> Result<TcpStream>
-
pub fn listen(self, backlog: u32) -> Result<TcpListener>
-
pub fn set_keepalive(&self, keepalive: bool) -> Result<()>
-
pub fn set_reuseaddr(&self, reuseaddr: bool) -> Result<()>
-
pub fn set_recv_buffer_size(&self, size: u32) -> Result<()>
-
…
use tokio::net::TcpSocket;
use std::io;
#[tokio::main]
async fn main() -> io::Result<()> {
let addr = "127.0.0.1:8080".parse().unwrap();
let socket = TcpSocket::new_v4()?;
// On platforms with Berkeley-derived sockets, this allows to quickly
// rebind a socket, without needing to wait for the OS to clean up the
// previous one.
//
// On Windows, this allows rebinding sockets which are actively in use,
// which allows “socket hijacking”, so we explicitly don't set it here.
// https://docs.microsoft.com/en-us/windows/win32/winsock/using-so-reuseaddr-and-so-exclusiveaddruse
socket.set_reuseaddr(true)?;
socket.bind(addr)?;
let listener = socket.listen(1024)?;
Ok(())
let addr = "127.0.0.1:8080".parse().unwrap();
let socket = TcpSocket::new_v4()?;
let stream = socket.connect(addr).await?;
Ok(())
}
tokio 的 TcpStream::connect() 没有提供 timeout,但是可以使用 tokio::time::timeout 来实现:
const CONNECTION_TIME: u64 = 100;
// ...
let (socket, _response) = match tokio::time::timeout(
Duration::from_secs(CONNECTION_TIME),
tokio::net::TcpStream::connect("127.0.0.1:8080")
)
.await
{
Ok(ok) => ok,
Err(e) => panic!(format!("timeout while connecting to server : {}", e)),
}
.expect("Error while connecting to server")
tokio 的 TcpStream 也没有提供其它 read/write timeout,但是可以标准库提供了,可以从标准库 TcpStream 创建 tokio TcpStream:
let std_stream = std::net::TcpStream::connect("127.0.0.1:8080")
.expect("Couldn't connect to the server...");
std_stream.set_write_timeout(None).expect("set_write_timeout call failed");
std_stream.set_nonblocking(true)?;
let stream = tokio::net::TcpStream::from_std(std_stream)?;
如果要使用其它标准库或 tokio::net 没有提供的 socket 设置,可以使用 =socket2 crate=, 例如为 socket 设置更详细的 KeepAalive 参数,设置 tcp user timeout 等:
use std::time::Duration;
use socket2::{Socket, TcpKeepalive, Domain, Type};
let socket = Socket::new(Domain::IPV4, Type::STREAM, None)?;
let keepalive = TcpKeepalive::new()
.with_time(Duration::from_secs(4));
// Depending on the target operating system, we may also be able to
// configure the keepalive probe interval and/or the number of
// retries here as well.
socket.set_tcp_keepalive(&keepalive)?;
关于 TCP_USER_TIMEOUT 可以为连接建立(即 connect())和数据读写指定超时时间:
- https://codearcana.com/posts/2015/08/28/tcp-keepalive-is-a-lie.html
- https://blog.cloudflare.com/when-tcp-sockets-refuse-to-die/
对于 UDP,只有 UdpSocket 一种类型,它提供 bind/connet/send/recv/send_to/recv_from 等方法。
- bind
- 监听指定的地址和端口。由于 UDP 是无连接的,使用 recv_from 来接收任意 client 发送的数据,使用 send_to可以向任意 target 发送数据。
- connect
- UDP 虽然是无连接的,但是也支持使用 connect() 方法,效果是为返回的 UdpSocket 指定了 target ip 和port,可以使用 send/recv 方法来只向该 target 发送和接收数据。
use tokio::net::UdpSocket;
use std::io;
#[tokio::main]
async fn main() -> io::Result<()> {
let sock = UdpSocket::bind("0.0.0.0:8080").await?;
// use `sock`
Ok(())
}
对于 Unix Socket,提供两种类型:
- 面向连接的: UnixListener,UnixStream;
- 面向无连接的:UnixDatagram
Unix Socket 使用本地文件(一般是 .sock 结尾)寻址(named socket),也可以使用没有关联文件的 unnamed Unix Socket。
关闭 named socket 时,对应的 socket 文件并不会被自动删除,如果没有 unix socket bind 到该文件上,则 client 连接会失败。
- pub fn bind<P>(path: P) -> Result<UnixDatagram> where P: AsRef<Path> : 使用指定 socket 文件创建一个 named unix Datagram socket;
- pub fn pair() -> Result<(UnixDatagram, UnixDatagram)>:返回一对 unamed unix Datagram socket;
- pub fn unbound() -> Result<UnixDatagram>:创建一个没有绑定任何 addr 的 socket。
- pub fn connect<P: AsRef<Path>>(&self, path: P) -> Result<()>:Datagram 是无连接的,调用该方法后可以使用 send/recv 从指定的 path 收发消息。
use tokio::net::UnixDatagram;
use tempfile::tempdir;
// We use a temporary directory so that the socket files left by the bound
// sockets will get cleaned up.
let tmp = tempdir()?;
// Bind each socket to a filesystem path
let tx_path = tmp.path().join("tx");
let tx = UnixDatagram::bind(&tx_path)?;
let rx_path = tmp.path().join("rx");
let rx = UnixDatagram::bind(&rx_path)?;
let bytes = b"hello world";
tx.send_to(bytes, &rx_path).await?;
let mut buf = vec![0u8; 24];
let (size, addr) = rx.recv_from(&mut buf).await?;
let dgram = &buf[..size];
assert_eq!(dgram, bytes);
assert_eq!(addr.as_pathname().unwrap(), &tx_path);
// unnamed unix socket
use tokio::net::UnixDatagram;
// Create the pair of sockets
let (sock1, sock2) = UnixDatagram::pair()?;
// Since the sockets are paired, the paired send/recv
// functions can be used
let bytes = b"hello world";
sock1.send(bytes).await?;
let mut buff = vec![0u8; 24];
let size = sock2.recv(&mut buff).await?;
let dgram = &buff[..size];
assert_eq!(dgram, bytes);
UnixListener bind() 返回 UnixListener,它的 accept() 方法返回 UnixStream:
use tokio::net::UnixListener;
#[tokio::main]
async fn main() {
let listener = UnixListener::bind("/path/to/the/socket").unwrap();
loop {
match listener.accept().await {
Ok((stream, _addr)) => {
println!("new client!");
}
Err(e) => { /* connection failed */ }
}
}
}
UnixStream 实现了 AsyncRead/AsyncWrite,它的 connect() 也返回一个 UnixStream:
use tokio::io::Interest;
use tokio::net::UnixStream;
use std::error::Error;
use std::io;
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
let dir = tempfile::tempdir().unwrap();
let bind_path = dir.path().join("bind_path");
let stream = UnixStream::connect(bind_path).await?;
loop {
let ready = stream.ready(Interest::READABLE | Interest::WRITABLE).await?;
if ready.is_readable() {
let mut data = vec![0; 1024];
// Try to read data, this may still fail with `WouldBlock`
// if the readiness event is a false positive.
match stream.try_read(&mut data) {
Ok(n) => {
println!("read {} bytes", n);
}
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
continue;
}
Err(e) => {
return Err(e.into());
}
}
}
if ready.is_writable() {
// Try to write data, this may still fail with `WouldBlock`
// if the readiness event is a false positive.
match stream.try_write(b"hello world") {
Ok(n) => {
println!("write {} bytes", n);
}
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
continue;
}
Err(e) => {
return Err(e.into());
}
}
}
}
}
14 tokio::signal #
tokio::signal module 提供了信号捕获和处理的能力。
pub async fn ctrl_c() -> Result<()>:当收到 C-c 发送的 SIGINT 信号时 .await 返回;
use tokio::signal;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
signal::ctrl_c().await?;
println!("ctrl-c received!");
Ok(())
}
对于其它信号,可以使用 SignalKind 和 signal 函数创建一个 Signal,然后调用它的 recv() 方法:
// Wait for SIGHUP on Unix
use tokio::signal::unix::{signal, SignalKind};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// An infinite stream of hangup signals.
let mut stream = signal(SignalKind::hangup())?;
// Print whenever a HUP signal is received
loop {
stream.recv().await;
println!("got signal HUP");
}
}
15 tokio::time #
tokio::time module 提供了如下类型:
- Sleep
- Interval: 在固定时间间隔流式产生一个值(stream yielding a value)
- Timeout:为一个 future 或 stream 封装一个执行的过期时间,过期后 future 或 stream 被取消,并返回一个错误。
上面的类型必须在 async Runtime 上下文中使用:
不能在 async Rutnime 中使用标准库的 sleep,它会阻塞当前线程执行其它异步任务。
// Wait 100ms and print “100 ms have elapsed”
use std::time::Duration;
use tokio::time::sleep;
#[tokio::main]
async fn main() {
sleep(Duration::from_millis(100)).await;
println!("100 ms have elapsed");
}
// Require that an operation takes no more than 1s.
use tokio::time::{timeout, Duration};
async fn long_future() {}
// 任意 Feature 都可以设置异步 timeout
let res = timeout(Duration::from_secs(1), long_future()).await;
if res.is_err() {
println!("operation timed out");
}
// A simple example using interval to execute a task every two seconds.
use tokio::time;
async fn task_that_takes_a_second() {
println!("hello");
time::sleep(time::Duration::from_secs(1)).await
}
#[tokio::main]
async fn main() {
let mut interval = time::interval(time::Duration::from_secs(2));
for _i in 0..5 {
interval.tick().await;
task_that_takes_a_second().await;
}
}
tokio::time 提供了 pause()/resume()/advance() 方法。
tokio::time::pause(): 将当前 Instant::now() 保存,后续调用 Instant::now() 时将返回保存的值。保存的值可以使用 advance() 修改。该函数只适合 current_thread runtime,也就是 #[tokio::test] 默认使用的 runtime。(这里的 Instant 是 tokio 提供的,标准库 Instant 不受影响。)
自动前进(auto-advance):当 time 被 paused,且当前 runtime 空闲时,clock 会被自动前进到下一个 pending timer。这意味着 Sleep
和其它 time 相关函数、方法被 await 时,会引起 runtime 前景时间。
#[tokio::main(flavor = "current_thread", start_paused = true)]
async fn main() {
println!("Hello world");
}
16 tokio::process #
use tokio::io::AsyncWriteExt;
use tokio::process::Command;
use std::process::Stdio;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let mut cmd = Command::new("sort");
// Specifying that we want pipe both the output and the input. Similarly to capturing the
// output, by configuring the pipe to stdin it can now be used as an asynchronous writer.
cmd.stdout(Stdio::piped());
cmd.stdin(Stdio::piped());
let mut child = cmd.spawn().expect("failed to spawn command");
// These are the animals we want to sort
let animals: &[&str] = &["dog", "bird", "frog", "cat", "fish"];
let mut stdin = child
.stdin
.take()
.expect("child did not have a handle to stdin");
// Write our animals to the child process Note that the behavior of `sort` is to buffer _all
// input_ before writing any output. In the general sense, it is recommended to write to the
// child in a separate task as awaiting its exit (or output) to avoid deadlocks (for example,
// the child tries to write some output but gets stuck waiting on the parent to read from it,
// meanwhile the parent is stuck waiting to write its input completely before reading the
// output).
stdin
.write(animals.join("\n").as_bytes())
.await
.expect("could not write to stdin");
// We drop the handle here which signals EOF to the child process. This tells the child
// process that it there is no more data on the pipe.
drop(stdin);
let op = child.wait_with_output().await?;
// Results should come back in sorted order
assert_eq!(op.stdout, "bird\ncat\ndog\nfish\nfrog\n".as_bytes());
Ok(())
}
使用其它进程的输出作为输入:
use tokio::join;
use tokio::process::Command;
use std::process::Stdio;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let mut echo = Command::new("echo")
.arg("hello world!")
.stdout(Stdio::piped())
.spawn()
.expect("failed to spawn echo");
let tr_stdin: Stdio = echo
.stdout
.take()
.unwrap()
.try_into()
.expect("failed to convert to Stdio");
let tr = Command::new("tr")
.arg("a-z")
.arg("A-Z")
.stdin(tr_stdin)
.stdout(Stdio::piped())
.spawn()
.expect("failed to spawn tr");
let (echo_result, tr_output) = join!(echo.wait(), tr.wait_with_output());
assert!(echo_result.unwrap().success());
let tr_output = tr_output.expect("failed to await tr");
assert!(tr_output.status.success());
assert_eq!(tr_output.stdout, b"HELLO WORLD!\n");
Ok(())
}