跳过正文

tokio

··25192 字
Rust Rust-Crate
目录
rust crate - 这篇文章属于一个选集。
§ 1: 本文
# 使用 full feature
tokio = { version = "1", features = ["full"] }

# 或单独启用各个 feature
tokio = { version = "1", features = ["rt", "net"] }
  • full : Enables all features listed below except test-util and tracing.
  • rt : Enables tokio::spawn, the current-thread scheduler, and non-scheduler utilities.
  • rt-multi-thread : Enables the heavier, multi-threaded, work-stealing scheduler.
  • io-util : Enables the IO based Ext traits.
  • io-std : Enable Stdout, Stdin and Stderr types.
  • net : Enables tokio::net types such as TcpStream, UnixStream and UdpSocket, as well as (on Unix-like systems) AsyncFd and (on FreeBSD) PollAio.
  • time : Enables tokio::time types and allows the schedulers to enable the built in timer.
  • process : Enables tokio::process types.
  • macros : ~ Enables#[tokio::main] and #[tokio::test]~ macros.
  • sync : Enables all tokio::sync types.
  • signal : Enables all tokio::signal types.
  • fs : Enables tokio::fs types.
  • test-util : Enables testing based infrastructure for the Tokio runtime.
  • parking_lot : As a potential optimization, use the parking_lot crate’s synchronization primitives internally. Also, this dependency is necessary to construct some of our primitives in a const context. MSRV may increase according to the parking_lot release in use.

下面是启用 tokio_unstable feature 后才能使用的功能:

  • tracing: Enables tracing events.
  • task::Builder 注意不是 runtime::Builder
  • Some methods on task::JoinSet
  • runtime::RuntimeMetrics
  • runtime::Builder::unhandled_panic
  • task::Id

一般是在项目的 .cargo/config.toml 中配置的:

[build]
rustflags = ["--cfg", "tokio_unstable"]

#[tokio::main]: 创建和设置一个 Runtime,对于复杂的 Runtime 参数可以使用 Builder;

  • 一般用于 async main fn,否则每次调用该 async fn 时都新建一个 Runtime 来运行;
  • 默认是 Multi-threaded runtime,为每个 CPU Core 创建一个 thread 的 worker thread pool 来调度执行 spawn() 产生的 Future task, 支持 work-stealing strategy;
// Multi-threaded runtime
#[tokio::main(flavor = "multi_thread", worker_threads = 10)]
#[tokio::main(worker_threads = 2)]

// Current thread runtime
#[tokio::main(flavor = "current_thread")]
#[tokio::main(flavor = "current_thread", start_paused = true)]

// 示例
#[tokio::main]
async fn main() {
    println!("Hello world");
}

// 等效为
fn main() {
    tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .build()
        .unwrap()
        .block_on(async {
            println!("Hello world");
        })
}

join!() 并发执行传入的 async fn,直到它们 都完成 (标准库 std::future module 也提供该宏):

  1. join!() 必须在 async context 中运行,比如 async fn/block/closure;
  2. tokio 使用 单线程 来执行这些 async task,所以一个 task 可能 blocking 其它 task 的执行,如果要真正并发执行,需要使用 spawn();
  3. 如果 async task 都返回 Result,join!() 也是等待它们都完成。如果要在遇到第一个 Error 时停止执行,可以使用 try_join!()
async fn do_stuff_async() {}
async fn more_async_work() {}

#[tokio::main]
async fn main() {
    let (first, second) = tokio::join!(
        do_stuff_async(),
        more_async_work());
}

pin!() : 拥有传入的 async task 的 Future 对象,返回一个 Pin 类型对象 ,可以确保该 Future 对象的栈内存不发生移动:

  • select! 宏在执行某个 branch 时会 cancel/drop 其它 branch 的 Future 对象,为了能在 loop 中使用 select!, 需要传入 &mut future 而非 future 对象本身(防止被 drop)。
use tokio::{pin, select};
use tokio_stream::{self as stream, StreamExt};

async fn my_async_fn() {}

#[tokio::main]
async fn main() {
    // 从可迭代对象创建一个 async stream。
    let mut stream = stream::iter(vec![1, 2, 3, 4]);

    let future = my_async_fn();
    pin!(future);

    // select! 宏在执行某个 branch 时会 cancel/drop 其它 branch 的 Future 对象,
    // 为了能在 loop 中使 用 select!, 需要传入 &mut future 而非 future 对象本身
    // (防止被 drop)。
    loop {
        select! {
            _ = &mut future => {
                break;
            }
            // drop 的是 next() 返回的 future 对象而非 stream
            Some(val) = stream.next() => {
                println!("got value = {}", val);
            }
        }
    }
}

// 同时创建多一个 Future 的 Pin 类型
#[tokio::main]
async fn main() {
    pin! {
        let future1 = my_async_fn();
        let future2 = my_async_fn();
    }

    select! {
        _ = &mut future1 => {}
        _ = &mut future2 => {}
    }
}

select!() : 并发执行多个 async expression,然后并发 .await 返回的 Future 对象,对于第一个匹配 pattern 的 branch,执行对应的 handler,同时 drop 其它 branch 正在 await 的 Future 对象:

<pattern> = <async expression> (, if <precondition>)? => <handler>,
async fn do_stuff_async() { }
async fn more_async_work() {}

#[tokio::main]
async fn main() {
    tokio::select! {
        _ = do_stuff_async() => {
            println!("do_stuff_async() completed first")
        }

        _ = more_async_work() => {
            println!("more_async_work() completed first")
        }
    };
}

task_local!(): 对于传给 spawn() 的 async fn,必须实现 Send + Sync + ‘static, 它有可能被调度到不同的线程上运行,所以不能使用 thread_local 变量,但可使用 task local 变量。该宏生成一个 tokio::task::LocalKey 使用的 local key:

tokio::task_local! {
    pub static ONE: u32;
    static TWO: f32;
}

tokio::task_local! {
    static NUMBER: u32;
}

// 生成的 ONE、TWO、NUMBER 都是 tokio::task::LocalKey 类型。

NUMBER.scope(1, async move {
    assert_eq!(NUMBER.get(), 1);
}).await;

NUMBER.scope(2, async move {
    assert_eq!(NUMBER.get(), 2);

    NUMBER.scope(3, async move {
        assert_eq!(NUMBER.get(), 3);
    }).await;
}).await;

1 tokio::runtime
#

tokio Runtime 提供 IO event loop 的 IO task 调度,以及基于 timer 的调度。(如调度使用 tokio::time::sleep() 的 async task)。

一般不需要手动创建 Runtime,而是使用 #[tokio::main] 宏来标识 async fn main() 函数。如果要精细控制 Runtime 的参数,可以使用 tokio::runtime::Builder

在 Runtime 上下文中,可以使用 tokio::spawn()/spawn_local() 来执行异步任务,使用 spwan_blocking() 执行同步任务。

#[tokio::main]
async fn main() {
    println!("Hello world");
}
// 等效为
fn main() {
    tokio::runtime::Builder::new_multi_thread()
        .enable_all() // 启用所有 resource driver(IO/timer)
        .build()
        .unwrap()
        .block_on(async {
            println!("Hello world");
        })
}

// 另一个例子,手动创建 Runtime
use tokio::net::TcpListener;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::runtime::Runtime;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // 创建一个 multi thread 和 enable all 的 Runtime
    let rt  = Runtime::new()?;

    rt.block_on(async {
        let listener = TcpListener::bind("127.0.0.1:8080").await?;
        loop {
            let (mut socket, _) = listener.accept().await?;
            tokio::spawn(async move {
                let mut buf = [0; 1024];
                loop {
                    let n = match socket.read(&mut buf).await {
                        // socket closed
                        Ok(n) if n == 0 => return,
                        Ok(n) => n,
                        Err(e) => {
                            println!("failed to read from socket; err = {:?}", e);
                            return;
                        }
                    };
                    if let Err(e) = socket.write_all(&buf[0..n]).await {
                        println!("failed to write to socket; err = {:?}", e);
                        return;
                    }
                }
            });
        }
    })
}

tokio Runtime 有两种类型(默认使用的是多线程版本):

  • Multi-Thread Scheduler : 使用每个 CPU 一个 thread 的 worker 线程池, 使用 work-stealing 机制执行 spawn() 提交的 Future task;
  • Current-Thread Scheduler : 提供单线程执行器,所有异步任务都在该线程上执行;

这两个 Runtime 都有两个 queue:global queue 和 local queue:

  • 先从 local queue 获取 task,如果为空,再从 global queue 获取任务。或者从 local queue 获取 global_queue_interval (默认 31)个任务后,再从 global queue 获取任务;
  • 当没有任务可以调度,或者调度任务超过 event_interval 次后(默认 61)后,检查 IO/Timer event;
  • current-thread scheduler 默认启用 lifo slot optimazition,即:有新 task 被 wake 时,添加到 local queue;

对于 multi-thread scheduler,有一个固定 thread 数量的 thread pool,它在创建 Runtime 时创建。有一个 global queue 和每个 thread 一个的 local queue。local queue 初始容量是 256 个 tasks,超过的会被移动 到 global queue。默认先从 local queue 获取 task,然后是 global queue,如果都为空,则从其它 thread 的 local queue steal tasks

tokio Runtime 除了管理 Scheduler 以外,还管理各种 resource driver,需要单独启用它们(目前就 io 和 time 两种类型)或一次全部启用:

  • Builder::enable_io() :包括 network/fs 等 IO;
  • Builder::enable_time() :定时器调度;
  • Builder::enable_all() :启用所有 resrouce driver。

Runtime 在调度任务的间隙,周期检查这些 IO/Timer 是否 Ready,从而按需唤醒响应的 task 来执行。默认当没有任务可以调度,或者调度的任务数超过 61 时检查 IO/Timer,这个次数可以使用 event_interval 来设置。

创建 Runtime 时,默认为每一个 CPU 创建一个 thread,形成固定 thread 数量的 worker thread pool 。同时,tokio Runtime 还维护一个 blocking thread pool ,其中的 thread 在调用 spawn_blocking() 时临时创建,这个 pool 中的线程数量不固定,而且 idle 一段时间后会自动被 tokio Runtime 清理。

tokio Runtime 确保所有 task 都是 公平调度 ,防止个别 task 一直可以调度的情况下,其它 task 得不到运行。

  • MAX_TASKS 指定任意时刻 runtime 创建的最大数量 task;
  • MAX_SCHEDULE 指定 runtime 可以调度的最大数量;
  • MAX_DELAY 指定任务被唤醒后,调度器执行它的最大延迟;

MAX_TASKS 和 MAX_SCHEDULE 是可以配置的,MAX_DELAY 是运行时自动计算的。

运行时调度器不保证任务调度的顺序。

1.1 Runtime
#

Runtime 类型对象提供 IO driver、task 调度、计时器、线程池等运行异步任务所需的功能。

关闭 Runtime:

  1. drop runtime 对象;
  2. 调用 shutdown_background() 或 shutdown_timeout() 方法;

当 drop Runtime 时默认 wait forver ,直到所有已经 spawned 的任务被 stopped:

  1. Tasks spawned through Runtime::spawn keep running until they yield. Then they are dropped. They are not guaranteed to run to completion, but might do so if they do not yield until completion.
    • task 只有 yield 时(例如下一次 .await 位置)才会被 dropped,否则就一直运行直到结束;
  2. Blocking functions spawned through Runtime::spawn_blocking keep running until they return.

调用 Runtime 的 shutdown_background() and shutdown_timeout() 方法,可以=避免这种 waiting forerver= 的等待。当 timeout 时,如果 task 没有结束,则运行它的 thread 会被泄露(leak),task 会一直运行直到结束。

Runtime 方法:

new()
创建一个多线程的 Runtime,启用所有 resource driver, 自动创建一个 Handle 并调用它的 enter(), 从而可以使用 tokio::spwan()。 一般用 #[tokio::main] 来自动调用。
block_on()
在 Runtime 中执行异步 task,只能在同步上下文中调用该方法,它是同步代码和异步代码的结合点,如果在异步上下文中执行则会 panic。
  • 在当前线程中执行 Future task,block 当前线程直到 task 返回。如果要并行执行 task,则需要在 task 内部调用 spawn() 来提交 task;
spawn() 和 spawn_blocking()
返回的 JoinHandle 对象实现了 Future,可以 .await 获得结果;
enter()
设置 thread local Runtime,后续 tokio::spawn() 感知该 Runtime。一般用于手动创建 Runtime 的场景。
handler()
返回一个可以在该 Runtime 上 spawn task 的 Handle 对象,它基于引用计数实现了 Clone() 方法,可以将 Runtime 在多个 thread 间共享。
impl Runtime

// 创建一个多线程的调度器 Runtime,启用所有默认 resource driver,
// 一般使用 #[tokio::main] 来自动调用。
pub fn new() -> Result<Runtime>

// 返回一个实现了引用计数的 Handle 对象,可以 spawn() 任务,将 Runtime 在多个
// thread 间共享。
pub fn handle(&self) -> &Handle

// 在 worker thread pool 中立即运行 async task,而不管是否 await 返回的 JoinHandle
pub fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
where
    // 多线程 Runtime 要求传入的 feture 必须是 Send + 'static
    F: Future + Send + 'static,
    F::Output: Send + 'static

// 提交同步任务
pub fn spawn_blocking<F, R>(&self, func: F) -> JoinHandle<R>
where
    F: FnOnce() -> R + Send + 'static,
    R: Send + 'static

// 执行 future,直到结束。对于多线程 Runtime,当 block_on 返回时,spawn() 的 task
// 可能还在运行,如果要确保这些 task 完成,可以对它们进行 .await.
pub fn block_on<F: Future>(&self, future: F) -> F::Output

// 设置 thread local Runtime,后续可以使用 tokio::spawn() 等函数(它们感知 EnterGuard)
pub fn enter(&self) -> EnterGuard<'_>

// 关闭 Runtime,不能再提交任务和 IO/Timer
pub fn shutdown_timeout(self, duration: Duration)
pub fn shutdown_background(self)

impl Runtime
// Available on tokio_unstable only.
pub fn metrics(&self) -> RuntimeMetrics

Runtime::enter() 或 Handle::enter() 创建 Runtime context,它会创建一个 thread local 变量来标识当前的 Runtime,可以使用 tokio::spawn() 等方法在该 Runtime context 中提交异步任务。。Runtime::new() 自动创建一个 Handle 并调用它的 enter();

use tokio::runtime::Runtime;
use tokio::task::JoinHandle;

fn function_that_spawns(msg: String) -> JoinHandle<()> {
    // tokio::spawn 感知所在线程的 EnterGuard 对象
    tokio::spawn(async move {
        println!("{}", msg);
    })
}

fn main() {
    // 手动创建 Runtime
    let rt = Runtime::new().unwrap();

    let s = "Hello World!".to_string();

    // By entering the context, we tie `tokio::spawn` to this executor.
    let _guard = rt.enter();
    let handle = function_that_spawns(s);

    rt.block_on(handle).unwrap();
}

runtime.shutdown_timeout() 和 shutdown_background() 关闭 Runtime,而不等待所有spawn 的 task 结束。而 Drop Runtime 时默认会等待所有 task 结束,则可能导致无限期等待。这两个方法不会无限期等待,task 可能还会在后台运行,故对应的 thread 可能会被泄露。

use tokio::runtime::Runtime;
use tokio::task;

use std::thread;
use std::time::Duration;

fn main() {
    let runtime = Runtime::new().unwrap();
    runtime.block_on(async move {
        task::spawn_blocking(move || {
            thread::sleep(Duration::from_secs(10_000));
        });
    });

    runtime.shutdown_timeout(Duration::from_millis(100));
}

Runtime 可以通过多方方式进行 Sharing:

  1. Using an Arc<Runtime>.
  2. Using a Handle.
  3. Entering the runtime context.

Arc<Runtime> 和 Handle(Runtime.handle() 方法返回) 都基于引用计数实现了 Clone() 方法,可以将 Runtime 在多个 thread 间共享。Handler::current() 返回当前 Runtime 的 Handle,可以 spawn() 并发异步任务。

use tokio::runtime::Handle;

#[tokio::main]
async fn main () {
    let handle = Handle::current();

    // 创建一个 std 线程
    std::thread::spawn(move || {
        // Using Handle::block_on to run async code in the new thread.
        handle.block_on(async {
            println!("hello");
        });
    });
}

1.2 Builder
#

Builder 用于创建自定义参数的 Runtime:

use tokio::runtime::Builder;

fn main() {
    let runtime = Builder::new_multi_thread()
        .worker_threads(4)
        .thread_name("my-custom-name")
        .thread_stack_size(3 * 1024 * 1024)
        .build()
        .unwrap();
    // use runtime ...
}

Builder 方法:

  • new_current_thread() :单线程调度器;
  • new_multi_thread() : 多线程调度器;
impl Builder
pub fn new_current_thread() -> Builder
pub fn new_multi_thread() -> Builder
pub fn new_multi_thread_alt() -> Builder

// 启用所有 resource driver,如 io/net/timer 等
pub fn enable_all(&mut self) -> &mut Self

// 默认为 CPU Core 数量,会覆盖环境变量 TOKIO_WORKER_THREADS 值。一次性创建。
pub fn worker_threads(&mut self, val: usize) -> &mut Self

// blocking 线程池中线程数量,按需创建,idle 超过一定时间后被回收
pub fn max_blocking_threads(&mut self, val: usize) -> &mut Self

// blocking 线程池中线程空闲时间,默认 10s
pub fn thread_keep_alive(&mut self, duration: Duration) -> &mut Self

pub fn thread_name(&mut self, val: impl Into<String>) -> &mut Self
pub fn thread_name_fn<F>(&mut self, f: F) -> &mut Self
    where F: Fn() -> String + Send + Sync + 'static
pub fn thread_stack_size(&mut self, val: usize) -> &mut Self

pub fn on_thread_start<F>(&mut self, f: F) -> &mut Self
    where F: Fn() + Send + Sync + 'static
pub fn on_thread_stop<F>(&mut self, f: F) -> &mut Self
    where F: Fn() + Send + Sync + 'static
pub fn on_thread_park<F>(&mut self, f: F) -> &mut Self
    where F: Fn() + Send + Sync + 'static
pub fn on_thread_unpark<F>(&mut self, f: F) -> &mut Self
    where F: Fn() + Send + Sync + 'static

// 构建生成 Runtime
pub fn build(&mut self) -> Result<Runtime>

// 检查 global queue event 的 scheduler ticks
pub fn global_queue_interval(&mut self, val: u32) -> &mut Self

// 检查 event 的 scheduler ticks
pub fn event_interval(&mut self, val: u32) -> &mut Self

// Available on tokio_unstable only.
pub fn unhandled_panic(&mut self, behavior: UnhandledPanic) -> &mut Self

// Available on tokio_unstable only.
pub fn disable_lifo_slot(&mut self) -> &mut Self
// Available on tokio_unstable only.
pub fn rng_seed(&mut self, seed: RngSeed) -> &mut Self
// Available on tokio_unstable only.
pub fn enable_metrics_poll_count_histogram(&mut self) -> &mut Self
pub fn metrics_poll_count_histogram_scale( &mut self, histogram_scale: HistogramScale ) -> &mut Self
pub fn metrics_poll_count_histogram_resolution( &mut self, resolution: Duration) -> &mut Self
pub fn metrics_poll_count_histogram_buckets( &mut self, buckets: usize ) -> &mut Self

impl Builder
// Available on crate feature net, or Unix and crate feature process, or Unix
// and crate feature signal only.
pub fn enable_io(&mut self) -> &mut Self
// Available on crate feature net, or Unix and crate feature process, or Unix
// and crate feature signal only.
pub fn max_io_events_per_tick(&mut self, capacity: usize) -> &mut Self

// Available on crate feature time only.
pub fn enable_time(&mut self) -> &mut Self

impl Builder
pub fn start_paused(&mut self, start_paused: bool) -> &mut Self

2 tokio::task
#

task 是轻量级,无阻塞的调度&执行单元,三大特点:

  1. 轻量级:用户级的 Tokio runtime 调度,而非操作系统内核调度;
  2. 协作式调度:OS thread 一般是抢占式调度;协作式调度 cooperatively 表示 task 会一直运行直到 yield(例如数据没有准备好,.await 位置会 yield,同时 tokio API lib 中默认强制插入了一些 yield point,确保即使 task 没有 yield,底层的 lib 也会周期 yield),这时 Tokio Runtime 会调度其他 task 来运行;
  3. 非阻塞:需要使用 tokio crate 提供的非阻塞 IO 来进行 IO/Net/Fs 在 async context 下的 APIs 操作,这些 API 不会阻塞线程,而是 yield,这样 tokio Runtime 可以执行其它 task;

异步运行时将 async fn 或 async block 作为调度和执行的单元(它们都返回 Future),称为异步任务 async task

  • Runtime::block_on() : 使用当前线程来运行异步任务,是同步代码和异步代码的分界点;
  • task::spawn_local() : 在 block_on() 所在的线程空闲时运行异步任务;
  • task::spawn() : 在一个线程池中立即运行异步任务;
  • task::spawn_blocking() :立即创建一个线程来运行指定的 同步函数

同步代码和异步代码的结合点是 block_on(),block_on() 是一个 同步函数 ,所以不能在 async fn 中调用它。它在当前线程上 poll 传入的 Future 对象 ,直到 Ready 返回,如果是 Pending 且没有其他 aysnc task(如通过 task::spawn_local() 提交的任务)可以 poll 则 block_on() 会 sleep。

block_on() 在当前线程 poll 传入的 Future 对象,当它空闲时,为了能在该线程中同时 poll 其它 Future,可以使用 spawn_local() 来提交任务。可以多次调用 spawn_local(),它将传入的 Future 添加到 block_on() 线程所处理的 task pool 中,当 block_on() 处理某个 task pending 时,会从该 pool 中获取下一个 Future task 进行 spawn_local。

poll() 返回 JoinHandle 类型,它实现了 Future trait,可以进行 .await 来获得最终值。如果一个 task 执行时间很长,会导致 block_on 没有机会执行其他 task,引起性能问题。

spawn_local() 要求传入的 async task Future 对象是 ‘static 的(因为是单线程执行,所以不要求实现 Send+Sync),这是由于该 Future 的 执行时机是不确定的 ,如果不 .await 它返回的 handle,则有可能 many_requests() 函数返回了,但是 Future 还没有被执行,从而导致引用失效。

解决办法:

  1. 传入一个 async 辅助函数,它拥有对应的参数值,这样该函数就实现了 ‘static;
  2. 使用 async move {} 来捕获对象,这样该 block 返回的 Future 也实现了 ‘static;
// async 辅助函数,拥有传入参数值的所有权
async fn cheapo_owning_request(host: String, port: u16, path: String) -> std::io::Result<String> {
    cheapo_request(&host, port, &path).await
}

for (host, port, path) in requests {
    handles.push(task::spawn_local(cheapo_owning_request(host, port, path)));
}

// 在单线程中并发发起多个 request
let requests = vec![
    ("example.com".to_string(), 80, "/".to_string()),
    ("www.red-bean.com".to_string(), 80, "/".to_string()),
    ("en.wikipedia.org".to_string(), 80, "/".to_string()),
];

// block_on() 是同步函数,返回传入的 async task 的返回值。
let results = async_std::task::block_on(many_requests(requests));
for result in results {
    match result {
        Ok(response) => println!("{}", response),
        Err(err) => eprintln!("error: {}", err),
    }
}

spawn() 是在一个专门的线程池(tokio 称为 worker thread 或 core thread)中并发执行 async task。spawn() 和 spawn_local() 一样,也返回 JoinHandler,JoinHandler 实现了 Future,需要 .await 获得结果。但是它不依赖调用 block_on() 来 poll,而是执行候该函数后立即开始被 poll。由于传给 spawn() 的 Future 可能被线程池中任意线程执行,而且在暂停恢复后可能会在另一个线程中运行,所以不能使用线程本地存储变量(解决办法是使用 task_local! 宏),同时 Future 对象必须满足 Future+Send+'static ,这样该 Future 对象才能安全的 move 到其他线程执行。(类似于 std::thread::spawn() 对闭包的要求);

use async_std::task;
let mut handles = vec![];
for (host, port, path) in requests {
    handles.push(task::spawn(
        // block 捕获了上下问文对象所有权,所以返回的是 'static Future 对象
        async move {
            cheapo_request(&host, port, &path).await
        }
    ));
}

use async_std::task;
use std::rc::Rc;

// 错误:reluctant() 返回的 Future 没有实现 Send。
async fn reluctant() -> String {
    let string = Rc::new("ref-counted string".to_string());
    some_asynchronous_thing().await; // Rc 跨 await
    format!("Your splendid string: {}", string)
}
task::spawn(reluctant());

// 解决办法:将不支持 Send 的变量隔离在单独的 .await 所在的 block。
async fn reluctant() -> String {
    let return_value = {
        let string = Rc::new("ref-counted string".to_string());
        format!("Your splendid string: {}", string)
    };
    some_asynchronous_thing().await; // await 没有跨 Rc
    return_value
}

另一个常见的错误是 Box<dyn std::error::Error> 对象不支持 Send,比如 some_fallible_thing() 的返回值 Result 有效性是整个 match 表达式,所以跨越了 Ok 分支的 .await, 这时 Result 如果不是 Send 就报错:

type GenericError = Box<dyn std::error::Error>;
type GenericResult<T> = Result<T, GenericError>;

fn some_fallible_thing() -> GenericResult<i32> {
    //...
}

// 该异步函数返回的 Future 没有实现 Send
async fn unfortunate() {
    match some_fallible_thing() {
        Err(error) => {
            // 返回的 Error 类型是 Box<dyn std::error::Error> 没有实现 Send
            report_error(error);
        }
        Ok(output) => {
            // 返回值跨越了这个 .await
            use_output(output).await;
        }
    } }

// 报错
async_std::task::spawn(unfortunate());

解决办法是:重新定义 Error, 添加 Send Bound,‘static 是 Box<dyn Trait> 默认的 lifetime,可以不添加。

type GenericError = Box<dyn std::error::Error + Send + Sync + 'static>;
type GenericResult<T> = Result<T, GenericError>;

.await 表达式是 Rust 执行异步 poll 的时间点(yield point),默认情况下 task 可以被异步运行时调度到其它 thread 中执行,所以 async fn 中涉及跨 await 的对象,都需要是能 Send 的。

spawn_blocking() 是异步运行时用来解决可能会阻塞或长时间运行(CPU 消耗性)的任务问题,它的参数是一个 同步闭包函数 ,运行时会立即创建一个 =新的线程来执行=(tokio 中称为 blocking thread,该线程也组成线程池,在被过期回收前可以被运行时重复使用),它返回一个 Future,可以被 .await 结果。

async fn verify_password(password: &str, hash: &str, key: &str) ->
    Result<bool, argonautica::Error>
{
    let password = password.to_string();
    let hash = hash.to_string();
    let key = key.to_string();

    tokio::task::spawn_blocking(
        // 参数是一个同步闭包函数,而非前面的 async task
        move || {
            argonautica::Verifier::default()
                .with_hash(hash)
                .with_password(password)
                .with_secret_key(key)
                .verify()
        }).await }

// 对于长时间运行的任务,可以主动 yield
while computation_not_done() {
    tokio::task::yield_now().await;
}

如果在 async 中使用跨 await 的 Mutex,则应该使用异步感知的 Mutex 而非标准库的 Mutext。异步感知指的是后续加锁如果失败,则 yield 当前任务,防止单线程死锁:

// https://tokio.rs/tokio/tutorial/shared-state
use std::sync::{Mutex, MutexGuard};

async fn increment_and_do_stuff(mutex: &Mutex<i32>) {
    let mut lock: MutexGuard<i32> = mutex.lock().unwrap();
    *lock += 1;
    do_something_async().await;
    // lock 还有效,直到函数返回时被 drop
} // lock goes out of scope here

报错:std::sync::MutexGuard 没有实现 Send:

error: future cannot be sent between threads safely
   --> src/lib.rs:13:5
    |
13  |     tokio::spawn(async move {
    |     ^^^^^^^^^^^^ future created by async block is not `Send`
    |
   ::: /playground/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-0.2.21/src/task/spawn.rs:127:21
    |
127 |         T: Future + Send + 'static,
    |                     ---- required by this bound in `tokio::task::spawn::spawn`
    |
    = help: within `impl std::future::Future`, the trait `std::marker::Send` is not implemented for `std::sync::MutexGuard<'_, i32>`
note: future is not `Send` as this value is used across an await
   --> src/lib.rs:7:5
    |
4   |     let mut lock: MutexGuard<i32> = mutex.lock().unwrap();
    |         -------- has type `std::sync::MutexGuard<'_, i32>` which is not `Send`
...
7   |     do_something_async().await;
    |     ^^^^^^^^^^^^^^^^^^^^^^^^^^ await occurs here, with `mut lock` maybe used later
8   | }
    | - `mut lock` is later dropped here

解决办法:将 MutexGuard 封装在单独的 block,使其不跨 await 有效。

async fn increment_and_do_stuff(mutex: &Mutex<i32>) {
    {
        let mut lock: MutexGuard<i32> = mutex.lock().unwrap();
        *lock += 1;
    } // lock goes out of scope here

    do_something_async().await;
}

或者使用 tokio 提供的异步感知 Mutex 类型:

use async_std::sync::Mutex;

pub struct Outbound(Mutex<TcpStream>);

impl Outbound {
    pub fn new(to_client: TcpStream) -> Outbound {
        Outbound(Mutex::new(to_client))
    }
    pub async fn send(&self, packet: FromServer) -> ChatResult<()> {
        let mut guard = self.0.lock().await;
        utils::send_as_json(&mut *guard, &packet).await?;
        guard.flush().await?;
        Ok(())
    } }

async task 可以使用 async fn 或 async block 来构造,它们都是返回实现 Future trait 的语法糖。

  • task::spawn() 立即在后台运行异步任务,而不管是否 .await 它返回的 JoinHandle;
  • JoinHandle 实现了 Future trait,.await 它时返回 Result<T, JoinError>;
use tokio::task;

// 需要位于 Runtime Context,spawn 的 task 立即在后台执行,而不管是否 .await 它返回的
// 实现 Future 的 JoinHandle。
let join = task::spawn(async {
    // ...
    "hello world!"
});

// ...

// Await the result of the spawned task.
let result = join.await?;
assert_eq!(result, "hello world!");

// 如果 task panic,则 .await 返回 JoinErr
let join = task::spawn(async {
    panic!("something bad happened!")
});
// The returned result indicates that the task failed.
assert!(join.await.is_err());

spawn() 所需的 async task 的签名是 Future + Send + 'static, 所以:

  1. 不能直接共享借用 stack 上的变量(但是可以共享借用全局 static/static mut 常量,它们具有 ‘static 生命周期);
  2. 使用 move 来将引用的外部对象的所有权转移到 task, 从而满足 ‘static 的要求;
  3. task 在多个 thread 间调度, 所以必须是可以 Send 的, 这意味着 task 内部的跨 await 的变量(因为在 await 处生成 Future object, 它记录了上下文变量)必须实现 Send;
use tokio::task;
#[tokio::main]
async fn main() {
    let v = vec![1, 2, 3];
    task::spawn(async {
        println!("Here's a vec: {:?}", v); // 错误
    });
}
// error[E0373]: async block may outlive the current function, but
//               it borrows `v`, which is owned by the current function
//  --> src/main.rs:7:23
//   |
// 7 |       task::spawn(async {
//   |  _______________________^
// 8 | |         println!("Here's a vec: {:?}", v);
//   | |                                        - `v` is borrowed here
// 9 | |     });
//   | |_____^ may outlive borrowed value `v`
//   |
// note: function requires argument type to outlive `'static`
//  --> src/main.rs:7:17
//   |
// 7 |       task::spawn(async {
//   |  _________________^
// 8 | |         println!("Here's a vector: {:?}", v);
// 9 | |     });
//   | |_____^
// help: to force the async block to take ownership of `v` (and any other
//       referenced variables), use the `move` keyword
//   |
// 7 |     task::spawn(async move {
// 8 |         println!("Here's a vec: {:?}", v);
// 9 |     });
//   |

// Error
use tokio::task::yield_now;
use std::rc::Rc;

#[tokio::main]
async fn main() {
    tokio::spawn(async {
        let rc = Rc::new("hello");
        // `rc` is used after `.await`. It must be persisted to the
        // task's state.
        yield_now().await;
        println!("{}", rc);
    });
}

// OK
use tokio::task::yield_now;
use std::rc::Rc;

#[tokio::main]
async fn main() {
    tokio::spawn(async {
        // The scope forces `rc` to drop before `.await`.
        {
            let rc = Rc::new("hello");
            println!("{}", rc);
        }

        // `rc` is no longer used. It is **not** persisted when the task
        // yields to the scheduler
        yield_now().await; // yield_now() 返回 Future,必须 await 才会实际 yield
    });
}

解决办法: 将 Rc 改成实现 Send 的 Arc:

error: future cannot be sent between threads safely
--> src/main.rs:6:5
	|
	6   |     tokio::spawn(async {
	|     ^^^^^^^^^^^^ future created by async block is not `Send`
	|
	::: [..]spawn.rs:127:21
	|
	127 |         T: Future + Send + 'static,
    |                     ---- required by this bound in
    |                          `tokio::task::spawn::spawn`
    |
    = help: within `impl std::future::Future`, the trait
    |       `std::marker::Send` is not  implemented for
    |       `std::rc::Rc<&str>`
note: future is not `Send` as this value is used across an await
   --> src/main.rs:10:9
    |
7   |         let rc = Rc::new("hello");
    |             -- has type `std::rc::Rc<&str>` which is not `Send`
...
10  |         yield_now().await;
    |         ^^^^^^^^^^^^^^^^^ await occurs here, with `rc` maybe
    |                           used later
11  |         println!("{}", rc);
12  |     });
    |     - `rc` is later dropped here

其它不需要 Future + Send + 'static 的特殊 task 任务类型:

  1. tokio::task::block_in_place(f) // f 同步闭包函数: FnOnce() -> R
  2. tokio::task::LocalSet::spawn_local(f) // f 是 Future + ‘static
// F 是同步函数闭包,而不是 async block 或 fn。
// 在当前 thread 执行同步函数,而 spawn_blocking() 是在一个线程池中执行同步函数。
pub fn block_in_place<F, R>(f: F) -> R
where
    F: FnOnce() -> R

// F 不需要实现 Send+Sync
// 在 block_on() 所在的 thread 上执行 future
pub fn spawn_local<F>(&self, future: F) -> JoinHandle<F::Output>
where
    F: Future + 'static,
    F::Output: 'static,

let local = task::LocalSet::new();
local.spawn_local(async {
    // ...
});

task::spawn() 和 task::spawn_blocking() 返回 JoinHandle 类型:

  • JoinHandle 实现了 Future,.await 返回结果;
  • 当 JoinHandle 被 Drop 时,关联的 task 会被从 Runtime detach,继续运行,但不能再 join 它。
  • 它的 abort_handle() 返回可以 remote abort 该 task 的 AbortHandle;
impl<T> JoinHandle<T>
// Abort the task associated with the handle.
pub fn abort(&self)

// Checks if the task associated with this JoinHandle has finished.
pub fn is_finished(&self) -> bool

// Returns a new AbortHandle that can be used to remotely abort this task.
pub fn abort_handle(&self) -> AbortHandle


use tokio::{time, task};
let mut handles = Vec::new();
handles.push(tokio::spawn(async {
   time::sleep(time::Duration::from_secs(10)).await;
   true
}));
handles.push(tokio::spawn(async {
   time::sleep(time::Duration::from_secs(10)).await;
   false
}));

let abort_handles: Vec<task::AbortHandle> = handles.iter().map(
    |h| h.abort_handle()).collect();

for handle in abort_handles {
    handle.abort();
}
for handle in handles {
    assert!(handle.await.unwrap_err().is_cancelled());
}

JoinHandle 实现了 Future,它的 Output = Result<T, JoinError>, JoinError 类型用于判断该 task 是否被 cancelled/panic 等出错原因:

impl JoinError
pub fn is_cancelled(&self) -> bool
pub fn is_panic(&self) -> bool
pub fn into_panic(self) -> Box<dyn Any + Send + 'static>
pub fn try_into_panic(self) -> Result<Box<dyn Any + Send + 'static>, JoinError>
pub fn id(&self) -> Id

use std::panic;
#[tokio::main]
async fn main() {
    let err = tokio::spawn(async {
        panic!("boom");
    }).await.unwrap_err();
    assert!(err.is_panic());
}

spawn 的 task 可以通过 JoinHandle.abort() 方法来取消(Cancelled), 对应的 task 会在 下一次 yield 时(如 .await) 时被终止。这时 JoinHandle 的 .await 结果是 JoinErr,它的 is_cancelled() 为 true。abort task 并不代表 task 一定以 JoinErr 结束,因为有些任务可能在 yield 前正常结束,这时 .await 返回正常结果。

abort() 方法可能在 task 被终止前返回,可以使用 JoinHandle .await 来确保 task 被终止后返回。

对于 spawn_blocking() 创建的任务,由于不是 async,所以调用它返回的 JoinHandle.abort() 是无效的,task 会持续运行。

如果使用的不是 tokio crate 的 APIs,如标准库的 IO APIs,则可能会阻塞 tokio Runtime。对于可能会引起阻塞的任务,tokio 提供了在 async context 中运行同步函数的 task::spawn_blocking() 和 task::block_in_place() 函数。

task::spawn_blocking() 是在单独的 blocing thread pool 中运行同步任务(clouse 标识),从而避免阻塞运行 aysnc task 的线程。

// async context 中,在单独的线程中运行可能会阻塞 tokio 的代码
let join = task::spawn_blocking(|| {
    // do some compute-heavy work or call synchronous code
    "blocking completed"
});
let result = join.await?;
assert_eq!(result, "blocking completed");

如果使用的多线程 runtime,则 task::block_in_place() 也是可用的,它也是在 async context 中运行可能 blocking 当前线程的代码,但是它是将 Runtime 的 worker thread 转换为 blocking thread 来实现的,这样可以避免上下文切换来提升性能:

use tokio::task;

let result = task::block_in_place(|| {
    // do some compute-heavy work or call synchronous code
    "blocking completed"
});
assert_eq!(result, "blocking completed");

async fn task::yield_now() 类似于 std::thread::yield_now(), .await 该函数时会使当前 task yield to tokio Runtime 调度器,让其它 task 被调度执行。

use tokio::task;

async {
    task::spawn(async {
        // ...
        println!("spawned task done!")
    });

    // Yield, allowing the newly-spawned task to execute first.
    task::yield_now().await; // 必须 .await

    println!("main task done!");
}

协作式调度:tokio Runtime 没有使用 OS thread 的抢占式调度,而是使用协作式调度,可以避免一个 task 执行时长时间占有 CPU 而影响其他 task 的执行。这是通过在 tokio libray 中 强制插入一些 yield point ,从而强制实现 task 周期返回 executor,从而可以调度其他 task 运行。

task::unconstrained 可以对 task 规避 tokio 协作式调度(coop),使用它包裹的 Future task 不会被 forced to yield to Tokio:

use tokio::{task, sync::mpsc};

let fut = async {
    let (tx, mut rx) = mpsc::unbounded_channel();
    for i in 0..1000 {
        let _ = tx.send(());
        // This will always be ready. If coop was in effect, this code would be
        //  forced to yield periodically. However, if left unconstrained, then
        //  this code will never yield.
        rx.recv().await;
    }
};

task::unconstrained(fut).await;

3 JoinSet/AbortHandle
#

JoinSet: spawn 一批 task,等待一些或全部执行完成,按照完成的顺序返回。

  • 所有任务的返回类型 T 必须相同;
  • 如果 JoinSet 被 Drop,则其中的所有 task 立即被 aborted;

对比:标准库的 std::future::join!() 宏是等待所有 task 都完成,而不能单独等待某一个完成。

use tokio::task::JoinSet;

#[tokio::main]
async fn main() {
    let mut set = JoinSet::new();

    for i in 0..10 {
        set.spawn(async move { i });
    }

    let mut seen = [false; 10];
    // join_next() 返回下一个返回的 task 结果
    while let Some(res) = set.join_next().await {
        let idx = res.unwrap();
        seen[idx] = true;
    }

    for i in 0..10 {
        assert!(seen[i]);
    }
}

JoinSet 的方法:

impl<T> JoinSet<T>
pub fn new() -> Self
// 返回 JoinSet 中 task 数量
pub fn len(&self) -> usize
pub fn is_empty(&self) -> bool

impl<T: 'static> JoinSet<T>
// 使用 Builder 来构造一个 task:只能设置 name 然后再 spawn
pub fn build_task(&mut self) -> Builder<'_, T>

use tokio::task::JoinSet;
#[tokio::main]
async fn main() -> std::io::Result<()> {
    let mut set = JoinSet::new();
    // Use the builder to configure a task's name before spawning it.
    set.build_task()
        .name("my_task")
        .spawn(async { /* ... */ })?;
    Ok(())
}

// JoinSet 的 spawn_xx() 方法返回的是 AbortHandle,而非 JoinHandle。
// 在 JoinSet 中 spawn 一个 task,该 task 会立即运行
pub fn spawn<F>(&mut self, task: F) -> AbortHandle
    where F: Future<Output = T> + Send + 'static, T: Send
// 在指定的 Handler 对应的 Runtime 上 spawn task
pub fn spawn_on<F>(&mut self, task: F, handle: &Handle) -> AbortHandle
    where F: Future<Output = T> + Send + 'static, T: Send
pub fn spawn_local<F>(&mut self, task: F) -> AbortHandle
    where F: Future<Output = T> + 'static
pub fn spawn_local_on<F>( &mut self, task: F, local_set: &LocalSet ) -> AbortHandle
    where F: Future<Output = T> + 'static
pub fn spawn_blocking<F>(&mut self, f: F) -> AbortHandle
    where F: FnOnce() -> T + Send + 'static, T: Send
pub fn spawn_blocking_on<F>(&mut self, f: F, handle: &Handle) -> AbortHandle
    where F: FnOnce() -> T + Send + 'static, T: Send

use tokio::task::JoinSet;
#[tokio::main]
async fn main() {
    let mut set = JoinSet::new();
    for i in 0..10 {
        set.spawn_blocking(move || { i });
    }

    let mut seen = [false; 10];
    while let Some(res) = set.join_next().await {
        let idx = res.unwrap();
        seen[idx] = true;
    }
    for i in 0..10 {
        assert!(seen[i]);
    }
}

// 等待直到其中一个 task 完成,返回它的 output,注意是异步函数,需要 .await 才返回结果。
// 当 JoinSet 为空时,返回 None,可用于 while let 中。
pub async fn join_next(&mut self) -> Option<Result<T, JoinError>>
pub async fn join_next_with_id(&mut self) -> Option<Result<(Id, T), JoinError>>
pub fn try_join_next(&mut self) -> Option<Result<T, JoinError>>
pub fn try_join_next_with_id(&mut self) -> Option<Result<(Id, T), JoinError>>

// Aborts all tasks and waits for them to finish shutting down. Calling this
// method is equivalent to calling abort_all and then calling join_next in a
// loop until it returns None.
pub async fn shutdown(&mut self)

// Aborts all tasks on this JoinSet. This does not remove the tasks from the
// JoinSet. To wait for the tasks to complete cancellation, you should call
// join_next in a loop until the JoinSet is empty.
pub fn abort_all(&mut self)

// Removes all tasks from this JoinSet without aborting them. The tasks removed
// by this call will continue to run in the background even if the JoinSet
// is dropped.
pub fn detach_all(&mut self)

JoinSet 的各 spawn() 都返回 AbortHandle 对象,通过该对象可以 abort 对应的 spawned task,而不等待运行结束。不像 JoinHandle 那样 await task 结束,AbortHandle 只用于 terminate task 而不等待它结束。Drop AbortHandle 释放了终止 task 的权利,并不会终止任务。

impl AbortHandle

// 终止 task。JoinHandle 可能返回 JoinError 错误(也可能正常结束)
pub fn abort(&self)

// abort/canceld task 是否完成
pub fn is_finished(&self) -> bool

pub fn id(&self) -> Id

4 LocalSet/spawn_local()
#

LocalSet 是在同一个 thread 上运行一批异步 task,可以避免 tokio::spawn() 的 Future task 必须实现 Send 的要求(如在 async task 中使用 Rc):

  1. task::LocalSet::new() 创建一个 LocalSet;
  2. task::LocalSet::run_until() 运行一个 async Future task,该方法为 task 创建一个 LocalSet 上下文,所以可以在 task 中使用 tokio::task::spawn_local() 函数来提交任务。
    • 只能使用 spawn_local() 方法来提交任务,不能使用 task::spawn();
  3. 当 LocalSet 中所有 task 都结束时,.await 返回。

tokio::spawn_local() 向 LocalSet 提交的 !Send task 在 Runtime::block_on() 所在的单线程中调用。所以 LocalSet::run_until() 只能在 #[tokio::main]/#[tokio::test] 或者 Runtime::block_on() 中调用,不能在 tokio::spawn() 中调用。

use std::rc::Rc;
use tokio::task;

#[tokio::main]
async fn main() {
    let nonsend_data = Rc::new("my nonsend data...");
    let local = task::LocalSet::new();

    // run_util() 自动调用 local.enter(), Future 位于该 LocalSet context 中
    local.run_until(async move {
        let nonsend_data = nonsend_data.clone();
        // 在 run_until() 内部,可以多次使用 task::spawn_local() 提交任务,
        // 都在 block_on() 所在的单线程上执行。
        task::spawn_local(async move {
            println!("{}", nonsend_data);
            // ...
        }).await.unwrap();
    }).await; // .await 等待所有 spawn_local() 提交的 task 结束时返回
}

// 另一个例子
use tokio::{task, time};
use std::rc::Rc;

#[tokio::main]
async fn main() {
    let nonsend_data = Rc::new("world");
    let local = task::LocalSet::new();
    let nonsend_data2 = nonsend_data.clone();

    // 使用 LocalSet.spawn_local() 提交任务
    local.spawn_local(async move {
        // ...
        println!("hello {}", nonsend_data2)
    });

    local.spawn_local(async move {
        time::sleep(time::Duration::from_millis(100)).await;
        println!("goodbye {}", nonsend_data)
    });

    // ...

    local.await;
}

LocalSet 的方法:

impl LocalSet
pub fn new() -> LocalSet

// 进入 LocalSet 的 context,这样后续使用 tokio::task::spawn_local() 提交
// !Send task;
pub fn enter(&self) -> LocalEnterGuard

// 提交一个 !Send task, 返回可以 .await 的 JoinHandle
pub fn spawn_local<F>(&self, future: F) -> JoinHandle<F::Output>
    where F: Future + 'static, F::Output: 'static

// 在指定的 Runtime 中同步执行 future,直到返回。
// 内部调用 Runtime::block_on() 函数,所以需要在同步上下文中调用。
pub fn block_on<F>(&self, rt: &Runtime, future: F) -> F::Output
    where F: Future

// 执行一个 Future, 内部可以使用 tokio::task::spawn_local() 提交 !Send task,
// 运行直到这些 task 结束;
pub async fn run_until<F>(&self, future: F) -> F::Output where F: Future

pub fn unhandled_panic(&mut self, behavior: UnhandledPanic) -> &mut Self
pub fn id(&self) -> Id

5 tokio::mutex
#

Rust 在 .await 时有可能将当前 task 转移到其他 thread 运行, 所以需要 async task block 实现 Send.

由于标准库的 MutexGuard 没有实现 Send, 在持有 MutexGuard 的情况下不能跨 .await。

use std::sync::{Mutex, MutexGuard};

async fn increment_and_do_stuff(mutex: &Mutex<i32>) {
    let mut lock: MutexGuard<i32> = mutex.lock().unwrap();
    *lock += 1;
    do_something_async().await;
} // lock goes out of scope here

编译报错: error: future cannot be sent between threads safely

error: future cannot be sent between threads safely
   --> src/lib.rs:13:5
    |
13  |     tokio::spawn(async move {
    |     ^^^^^^^^^^^^ future created by async block is not `Send`
    |
   ::: /playground/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-0.2.21/src/task/spawn.rs:127:21
    |
127 |         T: Future + Send + 'static,
    |                     ---- required by this bound in `tokio::task::spawn::spawn`
    |
    = help: within `impl std::future::Future`, the trait `std::marker::Send` is not implemented for `std::sync::MutexGuard<'_, i32>`
note: future is not `Send` as this value is used across an await
   --> src/lib.rs:7:5
    |
4   |     let mut lock: MutexGuard<i32> = mutex.lock().unwrap();
    |         -------- has type `std::sync::MutexGuard<'_, i32>` which is not `Send`
...
7   |     do_something_async().await;
    |     ^^^^^^^^^^^^^^^^^^^^^^^^^^ await occurs here, with `mut lock` maybe used later
8   | }
    | - `mut lock` is later dropped here

解决办法: 将 MutexGuard 的 destructor 在 await 前运行:

// This works!
async fn increment_and_do_stuff(mutex: &Mutex<i32>) {
    {
        let mut lock: MutexGuard<i32> = mutex.lock().unwrap();
        *lock += 1;
    } // lock goes out of scope here

    do_something_async().await;
}

// This fails too.
// 这是由于 lock 变量在 await 时还有效。
async fn increment_and_do_stuff(mutex: &Mutex<i32>) {
    let mut lock: MutexGuard<i32> = mutex.lock().unwrap();
    *lock += 1;
    drop(lock);

    do_something_async().await;
}

或者使用 struct 来封装 Mutex, 然后在同步函数中处理锁:

use std::sync::Mutex;

struct CanIncrement {
    mutex: Mutex<i32>,
}

impl CanIncrement {
    // This function is not marked async.
    fn increment(&self) {
        let mut lock = self.mutex.lock().unwrap();
        *lock += 1;
    }
}

async fn increment_and_do_stuff(can_incr: &CanIncrement) {
    // 在异步函数中,调用加锁的同步函数。
    can_incr.increment();
    do_something_async().await;
}

或者使用 tokio 的异步 Mutext, tokio::sync::Mutex 支持跨 .await, 但是性能会差一些:

use tokio::sync::Mutex; // note! This uses the Tokio mutex

// This compiles!
// (but restructuring the code would be better in this case)
async fn increment_and_do_stuff(mutex: &Mutex<i32>) {
    let mut lock = mutex.lock().await;
    *lock += 1;

    do_something_async().await;
} // lock goes out of scope here

综上, 对于 Mutex 和带来的 Send 问题:

  1. 首选将 Mutex 封装到 Struct 和同步函数中;
  2. 或者将 Mutex 在 .await 前解构(必须是 block 解构, 而不是 drop());
  3. 或者 spawn 一个 task 来专门管理 state, 其他 task 使用 message 来对它进行操作.

异步任务中可以使用 std::sync::Arc/Mutex 同步原语来对共享内存进行并发访问。这里使用 std::sync::Mutex 而非 tokio::sync::Mutex, =只有当 Mutex 跨 await 时才需要使用异步 Mutex=。

Mutex lock 阻塞时会阻塞当前 thread, 会导致其他 async task 也不能调度到该 thread 执行。缺省情况下, tokio runtime 使用 multi-thread scheduler, task 被调度到那个 thread 是tokio runtime 决定的。

tokio 还提供 current_thread runtime flavor, 它是一个轻量级, 单线程的 runtime。

use bytes::Bytes;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

// 定义类型别名
type Db = Arc<Mutex<HashMap<String, Bytes>>>;

use tokio::net::TcpListener;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

#[tokio::main]
async fn main() {
    let listener = TcpListener::bind("127.0.0.1:6379").await.unwrap();
    println!("Listening");
    let db = Arc::new(Mutex::new(HashMap::new()));
    loop {
        let (socket, _) = listener.accept().await.unwrap();
        // Clone the handle to the hash map.
        let db = db.clone();
        println!("Accepted");
        tokio::spawn(async move {
            process(socket, db).await;
        });
    }
}

use tokio::net::TcpStream;
use mini_redis::{Connection, Frame};

async fn process(socket: TcpStream, db: Db) {
    use mini_redis::Command::{self, Get, Set};

    // Connection, provided by `mini-redis`, handles parsing frames from the socket
    let mut connection = Connection::new(socket);

    while let Some(frame) = connection.read_frame().await.unwrap() {
        let response = match Command::from_frame(frame).unwrap() {
            Set(cmd) => {
                // db 是 MutexGuard 类型, 生命周期没有跨 await,
                // 所以可以使用同步 Mutex;
                let mut db = db.lock().unwrap();
                db.insert(cmd.key().to_string(), cmd.value().clone());
                Frame::Simple("OK".to_string())
            }
            Get(cmd) => {
                let db = db.lock().unwrap();
                if let Some(value) = db.get(cmd.key()) {
                    Frame::Bulk(value.clone())
                } else {
                    Frame::Null
                }
            }
            cmd => panic!("unimplemented {:?}", cmd),
        };

        // Write the response to the client
        connection.write_frame(&response).await.unwrap();
    }
}

为了尽量降低 Mutex 竞争导致的 task/thread block, 可以:

  1. Switching to a dedicated task to manage state and use message passing.
  2. Shard the mutex
  3. Restructure the code to avoid the mutex .

第二种情况的例子: 将单一 Mutex<HashMap<_, __>> 实例, 拆解为 N 个实例:

  • 第三方 crate dashmap 提供了更复杂的 sharded hash map;
type ShardedDb = Arc<Vec<Mutex<HashMap<String, Vec<u8>>>>>;

fn new_sharded_db(num_shards: usize) -> ShardedDb {
    let mut db = Vec::with_capacity(num_shards);
    for _ in 0..num_shards {
        db.push(Mutex::new(HashMap::new()));
    }
    Arc::new(db)
}

// 后续查找 map 前先计算下 key 的 hash
let shard = db[hash(key) % db.len()].lock().unwrap();
shard.insert(key, value);

6 tokio::sync
#

tokio 提供了 4 种类型 channel:

  1. oneshot:发送和接收一个值;
  2. mpsc:多个发送方,一个接收方;
  3. broadcast:多个发送方,多个接收方;
  4. watch:只保证接收方收到最新值,不保证它们收到所有值;

std:sync::mpsc 和 crossbeam::channel 都是 同步 channel , 不能在 async func 使用, 否则可能 block 当前线程和 task.

tokio::sync:mpsc 是 multi-producer signle-consumer channel, 可以在 async 异步函数中使用, 每个 message 只能被一个 consumer 消费。

async_channel crate 提供了 multi-producer multi-consumer channel,

oneshot:The oneshot channel supports sending a single value from a single producer to a single consumer. This channel is usually used to send the result of a computation to a waiter. Example: using a oneshot channel to receive the result of a computation.

  1. oneshot::channel() 用于创建一对 Sender 和 Receiver;
  2. Sender 的 send() 方法是同步方法,故可以在同步或异步上下文中使用;
  3. Receiver .await 返回 Sender 发送的值,Sender 被 Drop 后,Receiver .await 返回 error::RecvError;
use tokio::sync::oneshot;

async fn some_computation() -> String {
    "represents the result of the computation".to_string()
}

#[tokio::main]
async fn main() {
    // 创建一对发送和接收 handler
    let (tx, rx) = oneshot::channel();

    tokio::spawn(async move {
        let res = some_computation().await;
        tx.send(res).unwrap(); // send() 是非异步的方法
    });

    // Do other work while the computation is happening in the background

    // Wait for the computation result
    let res = rx.await.unwrap();
}

如果 sender 在 send 前被 drop,则 receiver 失败,错误为 error::RecvError:

use tokio::sync::oneshot;

#[tokio::main]
async fn main() {
    let (tx, rx) = oneshot::channel::<u32>();

    tokio::spawn(async move {
        drop(tx);
    });

    match rx.await {
        Ok(_) => panic!("This doesn't happen"),
        Err(_) => println!("the sender dropped"),
    }
}

在 tokio::select! loop 中使用 oneshot channel 时,需要在 channel 前添加 &mut:

use tokio::sync::oneshot;
use tokio::time::{interval, sleep, Duration};

#[tokio::main]
async fn main() {
    // select!{} 使用 &mut recv,所以 recv 必须是 mut 类型。
    let (send, mut recv) = oneshot::channel();
    let mut interval = interval(Duration::from_millis(100));

    tokio::spawn(async move {
        sleep(Duration::from_secs(1)).await;
        send.send("shut down").unwrap();
    });

    loop {
        tokio::select! {
            _ = interval.tick() => println!("Another 100ms"),

            // select! 自动 .await recv 的返回值
            msg = &mut recv => {
                println!("Got message: {}", msg.unwrap());
                break;
            }
        }
    }
}

oneshot::Receiver 的 close() 关闭 Receiver,这时 Sender 的 send() 方法会失败。

use tokio::sync::oneshot;
use tokio::sync::oneshot::error::TryRecvError;

#[tokio::main]
async fn main() {
    let (tx, mut rx) = oneshot::channel();

    assert!(!tx.is_closed());

    rx.close();
    assert!(tx.is_closed());

    assert!(tx.send("never received").is_err());

    match rx.try_recv() {
        Err(TryRecvError::Closed) => {}
        _ => unreachable!(),
    }
}

oneshot::Sender 的 closed() 方法可以等待 oneshot::Receiver 被 closed 或被 Drop 时返回。Sender 的 is_closed() 方法用于获取 Receiver 是否被 closed 或 Drop,这时 Sender 的 send() 方法会失败:

use tokio::sync::oneshot;

#[tokio::main]
async fn main() {
    let (mut tx, rx) = oneshot::channel::<()>();

    tokio::spawn(async move {
        drop(rx);
    });

    // 等待 rx 被 drop 时返回
    tx.closed().await;

    println!("the receiver dropped");
}

// 使用 select!
use tokio::sync::oneshot;
use tokio::time::{self, Duration};

async fn compute() -> String {
    // Complex computation returning a `String`
}

#[tokio::main]
async fn main() {
    let (mut tx, rx) = oneshot::channel();

    tokio::spawn(async move {
        tokio::select! {
            _ = tx.closed() => {
                // The receiver dropped, no need to do any further work
            }
            value = compute() => {
                // The send can fail if the channel was closed at the exact
                // same time as when compute() finished, so just ignore the
                // failure.
                let _ = tx.send(value);
            }
        }
    });

    // Wait for up to 10 seconds
    let _ = time::timeout(Duration::from_secs(10), rx).await;
}

mpsc 是标准库的 mpsc 的异步版本:

use tokio::io::{self, AsyncWriteExt};
use tokio::net::TcpStream;
use tokio::sync::mpsc;

#[tokio::main]
async fn main() -> io::Result<()> {
    let mut socket = TcpStream::connect("www.example.com:1234").await?;
    let (tx, mut rx) = mpsc::channel(100);

    for _ in 0..10 {
        // Each task needs its own `tx` handle. This is done by cloning the
        // original handle.
        let tx = tx.clone();

        tokio::spawn(async move {
            tx.send(&b"data to write"[..]).await.unwrap();
        });
    }

    // The `rx` half of the channel returns `None` once **all** `tx` clones
    // drop. To ensure `None` is returned, drop the handle owned by the current
    //  task. If this `tx` handle is not dropped,
    // there will always be a single outstanding `tx` handle.
    drop(tx);

    while let Some(res) = rx.recv().await {
        socket.write_all(res).await?;
    }

    Ok(())
}

mpsc 和 oneshot 联合使用,可以实现 req/resp 响应的对共享资源的处理:

use tokio::sync::{oneshot, mpsc};
use Command::Increment;

enum Command {
    Increment,
    // Other commands can be added here
}

#[tokio::main]
async fn main() {
    let (cmd_tx, mut cmd_rx) = mpsc::channel::<
        (Command, oneshot::Sender<u64>)>(100);

    // Spawn a task to manage the counter
    tokio::spawn(async move {
        let mut counter: u64 = 0;

        while let Some((cmd, response)) = cmd_rx.recv().await {
            match cmd {
                Increment => {
                    let prev = counter;
                    counter += 1;
                    response.send(prev).unwrap();
                }
            }
        }
    });

    let mut join_handles = vec![];

    // Spawn tasks that will send the increment command.
    for _ in 0..10 {
        let cmd_tx = cmd_tx.clone();

        join_handles.push(tokio::spawn(async move {
            let (resp_tx, resp_rx) = oneshot::channel();

            cmd_tx.send((Increment, resp_tx)).await.ok().unwrap();
            let res = resp_rx.await.unwrap();

            println!("previous value = {}", res);
        }));
    }

    // Wait for all tasks to complete
    for join_handle in join_handles.drain(..) {
        join_handle.await.unwrap();
    }
}

tokio mpsc message passing 机制:

  1. 一个 tokio spawn task 作为 manager 角色, 通过 buffered mpsc channel 接收 message, 然后根据 message 类型来操作有状态对象, 由于只有 manager 来串行操作该对象, 所以可以避免加锁。
  2. manager 通过 message 中的 response channel 来向发送者响应结果;
use bytes::Bytes;
use mini_redis::client;
use tokio::sync::{mpsc, oneshot};

/// Multiple different commands are multiplexed over a single channel.
#[derive(Debug)]
enum Command {
    Get {
        key: String,
        resp: Responder<Option<Bytes>>, // 发送响应的 oneshot 类型 channel
    },
    Set {
        key: String,
        val: Bytes,
        resp: Responder<()>,
    },
}

/// Provided by the requester and used by the manager task to send the command
/// response back to the requester.
type Responder<T> = oneshot::Sender<mini_redis::Result<T>>;

#[tokio::main]
async fn main() {
    // 创建一个 buffered 类型 channel, 当消费的慢时可以对发送端反压
    // ( send(...).await 将阻塞一段时间), 从而降低内存和并发度, 防止消耗过多
    // 系统资源。
    let (tx, mut rx) = mpsc::channel(32);
    let tx2 = tx.clone();

    let manager = tokio::spawn(async move {
        // Open a connection to the mini-redis address.
        let mut client = client::connect("127.0.0.1:6379").await.unwrap();

        while let Some(cmd) = rx.recv().await {
            match cmd {
                Command::Get { key, resp } => {
                    let res = client.get(&key).await;
                    // Ignore errors
                    // 使用 let _ = 来忽略 result 错误。
                    let _ = resp.send(res); // 通过 oneshot channel 发送响应
                }
                Command::Set { key, val, resp } => {
                    let res = client.set(&key, val).await;
                    // Ignore errors
                    let _ = resp.send(res);
                }
            }
        }
    });

    // Spawn two tasks, one setting a value and other querying for key
    // that was set.
    let t1 = tokio::spawn(async move {
        let (resp_tx, resp_rx) = oneshot::channel(); // 构建响应 channel
        let cmd = Command::Get {
            key: "foo".to_string(),
            resp: resp_tx,
        };

        // Send the GET request
        if tx.send(cmd).await.is_err() {
            eprintln!("connection task shutdown");
            return;
        }

        // Await the response
        let res = resp_rx.await;
        println!("GOT (Get) = {:?}", res);
    });

    let t2 = tokio::spawn(async move {
        let (resp_tx, resp_rx) = oneshot::channel();
        let cmd = Command::Set {
            key: "foo".to_string(),
            val: "bar".into(),
            resp: resp_tx,
        };

        // Send the SET request
        if tx2.send(cmd).await.is_err() {
            eprintln!("connection task shutdown");
            return;
        }

        // Await the response
        let res = resp_rx.await;
        println!("GOT (Set) = {:?}", res);
    });

    t1.await.unwrap();
    t2.await.unwrap();
    manager.await.unwrap();
}

broadcast channel: 从多个发送端向多个接收端发送多个值 ,可以实现 fan out 模式,如 pub/sub 或 chat 系统。

  • tokio::sync::broadcast::channel(N) 创建一个指定容量为 N 的 bounded,multi-producer,multi-consumer channel,当 channel 中元素数量达到 N 后,最老的元素将被清理,没有消费该元素的 Receiver 的 recv() 方法将返回 RecvError::Lagged 错误,然后该 Receiver 的读写位置将更新到 channel 中当前 最老的元素,下一次 recv() 将返回该元素。通过这种机制,Receiver 可以感知是否 Lagged 以及做相应的处理。
  • Sender 实现了 clone,可以 clone 多个实例,然后在多个 task 中使用。当所有Sender 都被 drop 时,channel 将处于 closed 状态,这时 Receiver 的 recv() 将返回 RecvError::Closed 错误。
  • Sender::subscribe() 创建新的 Receiver,它接收调用 subscribe() 创建它 发送的消息。当所有 Receiver 都被 drop 时,Sender::send() 方法返回 SendError;
  • Sender 发送的值会被 clone 后发送给所有 Receiver,直到它们 都收到 这个值后, 该值才会从 channel 中移除;
use tokio::sync::broadcast;

#[tokio::main]
async fn main() {
    let (tx, mut rx1) = broadcast::channel(16);

    // 创建一个新的接收端
    let mut rx2 = tx.subscribe();

    tokio::spawn(async move {
        assert_eq!(rx1.recv().await.unwrap(), 10);
        assert_eq!(rx1.recv().await.unwrap(), 20);
    });

    tokio::spawn(async move {
        assert_eq!(rx2.recv().await.unwrap(), 10);
        assert_eq!(rx2.recv().await.unwrap(), 20);
    });

    tx.send(10).unwrap();
    tx.send(20).unwrap();
}

// 感知 Lagged
use tokio::sync::broadcast;

#[tokio::main]
async fn main() {
    let (tx, mut rx) = broadcast::channel(2);

    tx.send(10).unwrap();
    tx.send(20).unwrap();
    // 由于 channel 容量为 2,所以发送 30 后,10 将被丢弃。
    tx.send(30).unwrap();

    // The receiver lagged behind
    // rx 没有收到被丢弃的 10,所以第一次 recv 报错。
    assert!(rx.recv().await.is_err());

    // 后续再 recv 时,开始接收最老的数据
    // At this point, we can abort or continue with lost messages
    assert_eq!(20, rx.recv().await.unwrap());
    assert_eq!(30, rx.recv().await.unwrap());
}

watch channel: 从多个发送端向多个接收端发送多个值 ,但是 channel 中只保存最新的一个值,所以如果接收端存在延迟,则不能保证它接收了所有的中间值。类似于容量为 1 的 broadcast channel。

使用场景:广播配置变更,应用状态变更,优雅关闭等。

  • watch::channel(initial_value): 创建一个 watch channel 时可以指定一个初始值;
  • Sender::subscribe() 创建一个新的 Receiver ,它只接收后面新发送的值;
  • Sender 实现了 Clone trait,可以 clone 多个实例来发送数据。同时 Sender 和 Receiver 都是 thread safe;
  • Sender::is_closed()/closed() 为 Sender 提供检查所有 Receiver 是否被 closed 或 Drop 的方法。
  • Sender::send() 必须在有 Receiver 的情况下才能发送成功,而 send_if_modified, send_modify, 或 send_replace 方法可以在没有 Receiver 的情况下发送成功;
  • send_modify(modify: FnOnce(&mut T)); 无条件的更新 T 值,然后通知所有接收者,可以在没有接收者的情况下使用。
  • send_if_modified<F>(&self, modify: F) -> bool 和 send_modify() 类似,但是 modify 闭包返回 bool 值为 true 时才通知所有接收者。
let sender = tokio::sync::watch::Sender::new(0u8);

// 没有 receiver,所以发送出错
assert!(sender.send(3).is_err());

// 使用 subscribe() 方法创建一个 receiver
let _rec = sender.subscribe();
// 发送成功
assert!(sender.send(4).is_ok());

Receiver 使用 Receiver::changed() 来接收 un-seen 值更新,如果没有 un-seen 值,则该方法会 sleep 直到有 un-seen 值或者 Sender 被 Drop,如果有 un-seen 值则该方法立即返回 Ok(()) 。

  • Receiver 使用 Receiver::borrow_and_update() 来获取最新值,并标记为 seen。
  • 如果只是获取最新值而不标记为 seen,则使用 Receiver::borrow()。borrow()没有将值标记为 seen,所以下一次调用 Receiver::changed() 时将立即返回 Ok(())。 borrow_and_update() 将值标记为 seen,所以下一次调用 Receiver::changed() 时将 sleep 直到有新的值;
  • Receiver 在 borrow 值时会将 channel 设置一个 read lock,所以当 borrow 时间较长时,可能会阻塞 Sender;
use tokio::sync::watch;
use tokio::time::{self, Duration, Instant};
use std::io;

#[derive(Debug, Clone, Eq, PartialEq)]
struct Config {
    timeout: Duration,
}

impl Config {
    async fn load_from_file() -> io::Result<Config> {
        // file loading and deserialization logic here
    }
}

async fn my_async_operation() {
    // Do something here
}

#[tokio::main]
async fn main() {
    let mut config = Config::load_from_file().await.unwrap();

    // 创建 watch channel 并提供初始值
    let (tx, rx) = watch::channel(config.clone());

    // Spawn a task to monitor the file.
    tokio::spawn(async move {
        loop {
            // Wait 10 seconds between checks
            time::sleep(Duration::from_secs(10)).await;

            // Load the configuration file
            let new_config = Config::load_from_file().await.unwrap();

            // If the configuration changed, send the new config value on the
            // watch channel.
            if new_config != config {
                tx.send(new_config.clone()).unwrap();
                config = new_config;
            }
        }
    });

    let mut handles = vec![];

    // Spawn tasks that runs the async operation for at most `timeout`.
    // If the timeout elapses, restart the operation.
    //
    // The task simultaneously watches the `Config` for changes. When the
    // timeout duration changes, the timeout is updated without restarting
    // the in-flight operation.

    for _ in 0..5 {
        // Clone a config watch handle for use in this task
        let mut rx = rx.clone();

        let handle = tokio::spawn(async move {
            // Start the initial operation and pin the future to the stack.
            //  Pinning to the stack is required to resume the operation across
            //  multiple calls to `select!`
            let op = my_async_operation();
            tokio::pin!(op);

            // Get the initial config value
            let mut conf = rx.borrow().clone();

            let mut op_start = Instant::now();
            let sleep = time::sleep_until(op_start + conf.timeout);
            tokio::pin!(sleep);

            loop {
                tokio::select! {
                    _ = &mut sleep => {
                        // The operation elapsed. Restart it
                        op.set(my_async_operation());

                        // Track the new start time
                        op_start = Instant::now();

                        // Restart the timeout
                        sleep.set(time::sleep_until(op_start + conf.timeout));
                    }
                    _ = rx.changed() => {
                        // 获得最新值,然后标记为 seen
                        conf = rx.borrow_and_update().clone();

                        // The configuration has been updated. Update the
                        // `sleep` using the new `timeout` value.
                        sleep.as_mut().reset(op_start + conf.timeout);
                    }
                    _ = &mut op => {
                        // The operation completed!
                        return
                    }
                }
            }
        });

        handles.push(handle);
    }

    for handle in handles.drain(..) {
        handle.await.unwrap();
    }
}

tokio::sync::Notify 用于通知一个或所有的 task wakup,它本身不携带任何数据:

  1. A Notify can be thought of as a Semaphore starting with 0 permits.The notified().await method waits for a permit to become available, and notify_one() sets a permit if there currently are no available permits.
  2. notified(&self) -> Notified<’_> : Wait for a notification. 返回的 Notified实现 Future,.await 时将被阻塞,直到收到通知;
  3. notify_one(): Notifies the first waiting task.
  4. notify_last(&self): Notifies the last waiting task.
  5. notify_waiters(&self): Notifies all waiting tasks.
use tokio::sync::Notify;
use std::sync::Arc;

#[tokio::main]
async fn main() {
    let notify = Arc::new(Notify::new());
    let notify2 = notify.clone();

    let handle = tokio::spawn(async move {
        notify2.notified().await; // 等待被 Notify
        println!("received notification");
    });

    println!("sending notification");
    notify.notify_one();

    // Wait for task to receive notification.
    handle.await.unwrap();
}


#[tokio::main]
async fn main() {
    let notify = Arc::new(Notify::new());
    let notify2 = notify.clone();

    let notified1 = notify.notified();
    let notified2 = notify.notified();

    let handle = tokio::spawn(async move {
        println!("sending notifications");
        notify2.notify_waiters();
    });

    notified1.await;
    notified2.await;
    println!("received notifications");
}

tokio::sync::Notify 可以作为替代 waker 的异步函数内部 thread 间同步的工具:

use tokio::sync::Notify;
use std::sync::Arc;
use std::time::{Duration, Instant};
use std::thread;

async fn delay(dur: Duration) {
    let when = Instant::now() + dur;
    let notify = Arc::new(Notify::new());
    let notify_clone = notify.clone();

    thread::spawn(move || {
        let now = Instant::now();

        if now < when {
            thread::sleep(when - now);
        }

        notify_clone.notify_one();
    });


    notify.notified().await;
}

7 tokio::io
#

tokio::io 没有定义和使用 Read/Write trait,而是定义和使用 AsyncRead/AsyncWrite/AsyncSeek trait ,同时为这两个 trait 定义了:

  1. 异步 Buf 版本:AsyncBufRead,AsyncBufReadExt;
  2. 其它扩展 trait:AsyncReadExt/AsyncWriteExt/AsyncSeekExt;

AsyncRead/AsyncWrite/AsyncBufRead trait 提供的 poll_XX() 开头的方法,不方便直接使用,而各种 Ext trait 则提供了更常用的 Read/Write/Lines 等方法。

从 AsyncRead 创建 tokio::io::BufReader 对象, 实现 AsyncRead/AsyncBufRead trait.

从 AsyncWrite 创建 Struct tokio::io::BufWriter 对象, 实现 AsyncWrite trait.

从同时实现了 AsyncRead/AsyncWrite 的对象创建 struct BufStream<RW> (例如 TCPStream 对象), 它实现了 AsyncBufRead 和 AsyncWrite.

AsyncReadExt trait 返回的对象都实现了 Future:

pub trait AsyncReadExt: AsyncRead {
    // Provided methods
    fn chain<R>(self, next: R) -> Chain<Self, R> where Self: Sized, R: AsyncRead { ... }
    fn read<'a>(&'a mut self, buf: &'a mut [u8]) -> Read<'a, Self> where Self: Unpin { ... }
    fn read_buf<'a, B>(&'a mut self, buf: &'a mut B) -> ReadBuf<'a, Self, B> where Self: Unpin, B: BufMut + ?Sized
    fn read_exact<'a>(&'a mut self, buf: &'a mut [u8]) -> ReadExact<'a, Self> where Self: Unpin { ... }
    fn read_u8(&mut self) -> ReadU8<&mut Self> where Self: Unpin { ... }

例如 read() 返回的 Read 对象实现了 Future, Poll ready 时返回 io::Result<usize>, 所以即使这些方法没有使用 async fn 形式,但由于返回的对象实现了 Future, 它们也可以被 .await 轮询:

// https://docs.rs/tokio/latest/src/tokio/io/util/read.rs.html#43
impl<R> Future for Read<'_, R> where R: AsyncRead + Unpin + ?Sized,
{
    type Output = io::Result<usize>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<usize>> {
        let me = self.project();
        let mut buf = ReadBuf::new(me.buf);
        ready!(Pin::new(me.reader).poll_read(cx, &mut buf))?;
        Poll::Ready(Ok(buf.filled().len()))
    }
}

// 示例
use tokio::fs::File;
// 需要显式导入 AsyncReadExt 和 AsyncWriteExt trait
use tokio::io::{self, AsyncReadExt, AsyncWriteExt};

#[tokio::main]
async fn main() -> io::Result<()> {
    let mut f = File::open("foo.txt").await?;
    let mut buffer = [0; 10];
    // read up to 10 bytes
    let n = f.read(&mut buffer[..]).await?;
    println!("The bytes: {:?}", &buffer[..n]);

    let mut buffer = Vec::new();
    // read the whole file
    f.read_to_end(&mut buffer).await?;
    Ok(())

    let mut file = File::create("foo.txt").await?;
    // Writes some prefix of the byte string, but not necessarily all of it.
    let n = file.write(b"some bytes").await?;
    println!("Wrote the first {} bytes of 'some bytes'.", n);
    Ok(())
}

tokio::io::split() 函数将传入的支持 AsyncRead + AsyncWrite 的 Stream 对象, 如 TCPStream, 拆分为 ReadHalf<T>, WriteHalf<T>, 前者实现 AsyncRead trait,后者实现 AsyncWrite trait。echo server client: 使用 tokio::io::split() 将实现 read/write 对象拆分为 read 和 write 两个对象。

pub fn split<T>(stream: T) -> (ReadHalf<T>, WriteHalf<T>)
where
    T: AsyncRead + AsyncWrite,

// https://tokio.rs/tokio/tutorial/io
use tokio::io::{self, AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpStream;

#[tokio::main]
async fn main() -> io::Result<()> {
    let socket = TcpStream::connect("127.0.0.1:6142").await?;
    let (mut rd, mut wr) = io::split(socket);

    // Write data in the background
    tokio::spawn(async move {
        wr.write_all(b"hello\r\n").await?;
        wr.write_all(b"world\r\n").await?;

        // Sometimes, the rust type inferencer needs a little help
        Ok::<_, io::Error>(())
    });

    let mut buf = vec![0; 128];
    loop {
        let n = rd.read(&mut buf).await?;
        if n == 0 {
            break;
        }
        println!("GOT {:?}", &buf[..n]);
    }
    Ok(())
}

echo server: 尽量避免 stack buffer, 因为跨 .await 的上下文变量会随者 task Future 对象一起被保存, 如果使用较大的 stack buffer 变量, 则自动生成的 task Future 对象就比较大, buffer size 一般是 page sized 对齐的, 这导致 task 的大小大概是 $page-size + a-few-bytes, 浪费内存。

use tokio::io::{self, AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpListener;

#[tokio::main]
async fn main() -> io::Result<()> {
    let listener = TcpListener::bind("127.0.0.1:6142").await?;
    loop {
        let (mut socket, _) = listener.accept().await?;
        tokio::spawn(async move {
            // 在堆中分配 buff 内存, 而不使用 stack 上的 array
            let mut buf = vec![0; 1024];

            loop {
                match socket.read(&mut buf).await {
                    Ok(0) => return,
                    Ok(n) => {
                        if socket.write_all(&buf[..n]).await.is_err() {
                            return;
                        }
                    }
                    Err(_) => {
                        return;
                    }
                }
            }
        });
    }
}

tokio::io::duplex() 函数创建一个使用内存作为缓冲的 DuplexStream 类型对象,它实现了 AsyncRead/AsyncWrite trait。DuplexStream 被 drop 时, 另一端 read 还可以继续读取内存中的数据, 读到 0 byte 表示 EOF,而另一端 write 时立即返回 Err(BrokenPipe) ;

pub fn duplex(max_buf_size: usize) -> (DuplexStream, DuplexStream)

// 示例
let (mut client, mut server) = tokio::io::duplex(64);

client.write_all(b"ping").await?;

let mut buf = [0u8; 4];
server.read_exact(&mut buf).await?;
assert_eq!(&buf, b"ping");

server.write_all(b"pong").await?;

client.read_exact(&mut buf).await?;
assert_eq!(&buf, b"pong");

8 select!{}
#

tokio::select!{} 宏用于同时 .await 多个 async task,当其中一个完成时返回, drop 其它分支的 task Future 对象。必须在 async context 中使用 select!, 如 async functions, closures, and blocks。

use tokio::sync::oneshot;

#[tokio::main]
async fn main() {
    let (tx1, rx1) = oneshot::channel();
    let (tx2, rx2) = oneshot::channel();

    tokio::spawn(async {
        let _ = tx1.send("one");
    });

    tokio::spawn(async {
        let _ = tx2.send("two");
    });

    // select 并发 poll rx1 和 rx2, 如果 rx1 先 Ready 则结果赋值给 val,
    // 同时 drop rx2.
    tokio::select! {
        val = rx1 => {
            println!("rx1 completed first with {:?}", val);
        }
        val = rx2 => {
            println!("rx2 completed first with {:?}", val);
        }
    }
}

tokio::sync::oneshot::Receiver 实现了 Drop, 被 Drop 时对应的 tokio::sync::oneshot::Sender 可以收到 closed 通知(sender 的 async closed() 方法返回)。

tokio::spawn(async {…}) 立即调度执行一个异步 task, 返回一个实现 Future 的 JoinHandle, Future 的关联类型是 Result<T, JoinError>, 所以通过 .await 它可以获得 task 的返回值 T。

use tokio::sync::oneshot;

async fn some_operation() -> String {
    // Compute value here
}

#[tokio::main]
async fn main() {
    let (mut tx1, rx1) = oneshot::channel();
    let (tx2, rx2) = oneshot::channel();

    tokio::spawn(async {
        // Select on the operation and the oneshot's `closed()` notification.
        tokio::select! {
            val = some_operation() => {
                let _ = tx1.send(val);
            }

            // tx1.closed() 是一个 async 方法, 故可以被 select,
            // 当对端 rx1 被 Drop 时返回.
            _ = tx1.closed() => {
                // `some_operation()` is canceled, the task completes and
                //  `tx1` is dropped.
            }
        }
    });

    tokio::spawn(async {
        let _ = tx2.send("two");
    });

    // rx2 先 .await 返回时, rx1 会被 Drop, 这时上面的 tx1.closed() .await 返回
    tokio::select! {
        val = rx1 => {
            println!("rx1 completed first with {:?}", val);
        }
        val = rx2 => {
            println!("rx2 completed first with {:?}", val);
        }
    }
}

select! 宏支持多个 branch(当前限制为 64 个branchs):

<pattern> = <async expression> (, if <precondition>)? => <handler>
else => <expression>

Rust 在 单线程异步执行 所有 branch 的 async expressions (一个 branch 的 async expression 不 Ready 时, 调度执行另一个 branch 的 async expression), 当第一个 async expressions 返回且它的 result 与 pattern 匹配时, Drop 所有其它 async expression, 然后执行 handler。如果 result 与 pattern 不匹配, 继续等待下一个异步返回并检查返回值是否匹配.

每个 branch 还可以有 if 表达式, 如果 if 表达式结果为 false, 则对应 async expression 还是会被执行, 但是返回的 Future 不会被 .await Poll. (例如在 loop 中多次 poll 一个 Pin 的 Future 对象, 一旦该对象 Ready, 后续就不能再 poll 了, 这时需要使用 if 表达式来排除该 branch).

每次进入执行 select! 时:

  1. 先执行个 branch 的 if 表达式, 结果为 false 时, disable 对应 branch;
  2. 执行所有 async expression(包括 disable 的 branch, 但是不 poll 它) , 注意是在单 thread 中异步执行这些expression;
  3. 并发 .await poll 未被 disable 的 expression 返回的 Future 对象;
  4. 当第一个 poll 返回时, 看是否匹配 pattern, 如果不匹配则 disable 该 branch, 继续等待其它 branch .await poll 返回, 重复 1-3; 如果匹配, 则执行该 branch 的 handler, 同时 drop 其它 branch 的 Future 对象.
  5. 如果所有 branch 都被 disable(如都不匹配 pattern), 则执行 else branch handler,如果没有提供 else 则 panic.

各分支的 aysnc expression 可以执行复杂的计算,只要结果是 Future 对象即可:

use tokio::net::TcpStream;
use tokio::sync::oneshot;

#[tokio::main]
async fn main() {
    let (tx, rx) = oneshot::channel();

    // Spawn a task that sends a message over the oneshot
    tokio::spawn(async move {
        tx.send("done").unwrap();
    });

    tokio::select! {
        socket = TcpStream::connect("localhost:3465") => {
            println!("Socket connected {:?}", socket);
        }
        msg = rx => {
            println!("received message first {:?}", msg);
        }
    }
}

// 另一个例子
use tokio::net::TcpListener;
use tokio::sync::oneshot;
use std::io;

#[tokio::main]
async fn main() -> io::Result<()> {
    let (tx, rx) = oneshot::channel();

    tokio::spawn(async move {
        tx.send(()).unwrap();
    });

    let mut listener = TcpListener::bind("localhost:3465").await?;

    // 并发执行两个 branch, 直到 rx 收到消息或者 accept 出错.
    tokio::select! {
        // 使用 async {} 来定义一个 async expression, 该 expression 返回 Result
        // select! 会 .await 这个 expression 和 rx, 当 rx 返回时该 expression
        // 才不会被继续 .await
        _ = async {
            loop {
                // listener 出错时才返回
                let (socket, _) = listener.accept().await?;
                tokio::spawn(async move { process(socket) });
            }

            // Help the rust type inferencer out
            Ok::<_, io::Error>(())
        } => {}

        _ = rx => {
            println!("terminating accept loop");
        }
    }

    Ok(())
}

async expression 返回值支持 pattern match, 并且可以有一个 else 分支, 当所有 pattern match 都不匹配时, 才执行 else 分支.

use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    let (mut tx1, mut rx1) = mpsc::channel(128);
    let (mut tx2, mut rx2) = mpsc::channel(128);

    tokio::spawn(async move {
        // Do something w/ `tx1` and `tx2`
    });

    // 这里 drop 的是 recv() 返回的 Future,而非 rx1 和 rx2 对象本身
    tokio::select! {
        Some(v) = rx1.recv() => {
            println!("Got {:?} from rx1", v);
        }
        Some(v) = rx2.recv() => {
            println!("Got {:?} from rx2", v);
        }
        else => {
            println!("Both channels closed");
        }
    }
}

select! 宏有返回值, 各 branch 必须返回相同类型值:

async fn computation1() -> String {
    // .. computation
}

async fn computation2() -> String {
    // .. computation
}

#[tokio::main]
async fn main() {
    let out = tokio::select! {
        res1 = computation1() => res1,
        res2 = computation2() => res2,
    };

    println!("Got = {}", out);
}

Errors:

  • 如果 async exprssion 中有 ? 则表达式返回 Result, 例如 accept().await? 出错时,res 为 Result::Err, 这时 handler 返回该 Error。
  • 如果是 handler 中有 ?, 则会立即将 error 传递到 select 表达式外, 如 res? 会将错误传递到 main() 函数;
use tokio::net::TcpListener;
use tokio::sync::oneshot;
use std::io;

#[tokio::main]
async fn main() -> io::Result<()> {
    // [setup `rx` oneshot channel]
    let listener = TcpListener::bind("localhost:3465").await?;
    tokio::select! {
        res = async {
            loop {
                // await 结果为 Err 时,表达式返回,将 res 设置为 Err
                let (socket, _) = listener.accept().await?;
                tokio::spawn(async move { process(socket) });
            }

            // Help the rust type inferencer out
            Ok::<_, io::Error>(())
        } => {
            // handler 内 res 为 Err 时,? 表达式直接将 Err 返回给 select! 所在
            // 的函数。
            res?;
        }
        _ = rx => {
            println!("terminating accept loop");
        }
    }

    Ok(())
}

Borrowing: 对于 tokio::spawn(async {..}) 必须 move 捕获要使用的数据, 但是对于 select! 的多 branch async expression 则不需要, 只要遵守 borrow 的规则即可,比如同时访问 & 共享数据, 唯一访问 &mut 数据。这是因为 tokio 在一个单线程中异步并发 poll 所有表达式返回的 Future 对象。select! 必须异步并发执行所有 aysnc expression, 并且当第一个 expression 返回的结果匹配 pattern 时才 drop 其它正在执行的 async expression。

use tokio::io::AsyncWriteExt;
use tokio::net::TcpStream;
use std::io;
use std::net::SocketAddr;

async fn race(
    data: &[u8],
    addr1: SocketAddr,
    addr2: SocketAddr
) -> io::Result<()> {
    tokio::select! {
        // OK:两个 async expression 共享借用 data
        Ok(_) = async {
            let mut socket = TcpStream::connect(addr1).await?;
            socket.write_all(data).await?;
            Ok::<_, io::Error>(())
        } => {}

        Ok(_) = async {
            let mut socket = TcpStream::connect(addr2).await?;
            socket.write_all(data).await?;
            Ok::<_, io::Error>(())
        } => {}

        else => {}
    };

    Ok(())
}

#[tokio::main]
async fn main() {
    let (tx1, rx1) = oneshot::channel();
    let (tx2, rx2) = oneshot::channel();

    let mut out = String::new();

    tokio::spawn(async move {
        // Send values on `tx1` and `tx2`.
    });

    // 由于 select 保证只执行一个 handler, 所以多个 handler 中可以 &mut
    // 使用相同的数据。
    tokio::select! {
        _ = rx1 => {
            out.push_str("rx1 completed");
        }
        _ = rx2 => {
            out.push_str("rx2 completed");
        }
    }

    println!("{}", out);
}

Loop: 并发执行多个 async expression, 当它们都 recv() 返回时, select 随机选择一个 branch 来执行 handler, 未执行的 handler branch 的 message 也不会丢(称为 Cancellation safety).

use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    let (tx1, mut rx1) = mpsc::channel(128);
    let (tx2, mut rx2) = mpsc::channel(128);
    let (tx3, mut rx3) = mpsc::channel(128);

    loop {
	// 当 rx1.recv() 返回时, drop 的是 rx2.recv() 和 rx3.recv() 的 Future
	// 对象, 而不是 rx2 和 rx3, 所以下一次 loop 还可以继续调用.

	// 当 channel 被 close , .recv() 返回 None, 不匹配 Some, 所以其它 rx
	// 还是继续进行, 直到所有 rx 都被关闭.

	// 注意: std::sync::mpsc 的 Receiver.recv() 返回 Result, 而
	// tokio::sync::mpsc.recv() 返回 Option.

        let msg = tokio::select! {
            Some(msg) = rx1.recv() => msg,
            Some(msg) = rx2.recv() => msg,
            Some(msg) = rx3.recv() => msg,
            else => { break }
        };

        println!("Got {:?}", msg);
    }

    println!("All channels have been closed.");
}

Cancellation safety : 在 loop 中使用 select! 来从多个 branch source 接收 message 时, 需要确保接收调用被 cancel 时不会丢失 message.

以下方法是 cancellation safe 的:

  • tokio::sync::mpsc::Receiver::recv
  • tokio::sync::mpsc::UnboundedReceiver::recv
  • tokio::sync::broadcast::Receiver::recv
  • tokio::sync::watch::Receiver::changed
  • tokio::net::TcpListener::accept
  • tokio::net::UnixListener::accept
  • tokio::signal::unix::Signal::recv
  • tokio::io::AsyncReadExt::read on any AsyncRead
  • tokio::io::AsyncReadExt::read_buf on any AsyncRead
  • tokio::io::AsyncWriteExt::write on any AsyncWrite
  • tokio::io::AsyncWriteExt::write_buf on any AsyncWrite
  • tokio_stream::StreamExt::next on any Stream
  • futures::stream::StreamExt::next on any Stream

以下方法不是 cancellation safe 的, 有可能会导致丢失数据:

  • tokio::io::AsyncReadExt::read_exact
  • tokio::io::AsyncReadExt::read_to_end
  • tokio::io::AsyncReadExt::read_to_string
  • tokio::io::AsyncWriteExt::write_all

以下方法也不是 cancellation safe 的, 因为它们使用 queue 来保证公平, cancel 时会丢失 queue 中的数据:

  • tokio::sync::Mutex::lock
  • tokio::sync::RwLock::read
  • tokio::sync::RwLock::write
  • tokio::sync::Semaphore::acquire
  • tokio::sync::Notify::notified

如何判断方法是 cancellation safe?主要是看 .await 位置。因为当异步方法被 cancelled 时,总是在 .await 的位置被取消。当方法的 .await 位置被重启时,如果函数功能还正常,那么就是 cancellation safe 的。

Cancellation safety 定义的方式:如果一个 Future 还没有完成,在 drop 这个 Future 或重建它时,它的行为必须是 no-op 的。也就是当一个 Future 未 ready 时, drop 该Future 什么都不影响。 这也是 loop 中使用 select!的要求。没有这个要求的话,在 loop 中重新执行 select! 时,会重新进展。

pin!(future) 宏的实现方式等效于 std::pin::Pin::new(&mut future), 它存入 future 对象的 &mut 借用(所以 future 类型需要是 mut 类型),返回一个 Pin<Ptr> 类型的同名值 pinned,其中 Ptr 是 &mut impl Future<Output=xx> 类型:

  • pinned.as_ref() 方法返回 Pin<&<Ptr as Deref>::Target> 即 Pin<& impl Future<Output=xx>> 类型;
  • pinned.as_mut() 方法返回 Pin<&mut <Ptr as Deref>::Target> 即 Pin<&mut impl Future<Output=xx>> 类型;

在 loop+select!{} 宏中,要对 future 对象重复 poll,而 select!{} 在并发 .await 时,如果有返回值,则会 drop 其它 branch 的 Future 对象,所以在 select branch 中不能直接使用 future 对象,以避免它被 drop 后下一次 loop 失败 select 失败。

常见的解决办法是:

  1. 创建一个 future mut 对象;
  2. 使用 pin!(future) 来创建一个同名的,但类型是 Pin<&mut impl Future<Output=xx>> 的对象 future;
  3. 在 select branch 中使用 &mut future 来进行轮询。(这是因为 Future 的 poll() 方法的 self 签名是self: Pin<&mut Self>)
async fn action() {
    // Some asynchronous logic
}

#[tokio::main]
async fn main() {
    let (mut tx, mut rx) = tokio::sync::mpsc::channel(128);

    let operation = action(); // 返回一个 Future 对象, 没有对它 .await

    // 必须先 Pin 该 Future 对象,创建一个同名的类型为
    //  Pin<&mut impl Future<Output=i32> 类型对象 ;
    tokio::pin!(operation);

    loop {
        tokio::select! {
        // &mut operation 类型是已经 Pin 住类型的 &mut 借用,
        // 即 &mut Pin<&mut impl Future<Output=i32>
        // .await 表达式支持这种类型的表达式调用, 所以可以被 poll 轮询。
        //
        // 当该 branch 因不匹配或无返回值而被 drop/cancel 时,operation 任然有效,
        // 下一次 select!{} 可以接续使用 &mut operation
            _ = &mut operation => break,
            Some(v) = rx.recv() => {
                if v % 2 == 0 {
                    break;
                }
            }
        }
    }
}

Modifying a branch

  • async fn 一旦 completion 就不能再 resume 执行: 一般情况下, future.await? 会消耗 future 对象.
  • branch 可以使用 if 表达式先判断是否需要执行, 如: &mut operation, if !done => {xxx};
async fn action(input: Option<i32>) -> Option<String> {
    // If the input is `None`, return `None`.
    // This could also be written as `let i = input?;`
    let i = match input {
        Some(input) => input,
        None => return None,
    };
    // async logic here
}

#[tokio::main]
async fn main() {
    let (mut tx, mut rx) = tokio::sync::mpsc::channel(128);

    let mut done = false;
    let operation = action(None);
    tokio::pin!(operation);

    tokio::spawn(async move {
        let _ = tx.send(1).await;
        let _ = tx.send(3).await;
        let _ = tx.send(2).await;
    });

    loop {
        tokio::select! {
        // 先判断 done, 为 true 时才执行 async expression. 这里使用的是
        // &mut 引用, 所以 .await 并不会消耗对象, operation 在下一次
        //  loop 还存在.
            res = &mut operation, if !done => {
                done = true;
                if let Some(v) = res {
                    println!("GOT = {}", v);
                    return;
                }
            }

            Some(v) = rx.recv() => {
                if v % 2 == 0 {
                    // `.set` is a method on `Pin`.
                    operation.set(action(Some(v)));
                    done = false;
                }
            }
        }
    }
}

tokio::spawn 和 select! 的区别:

  1. 都可以并发执行都各异步任务,但是 spawn() 是 tokio runtime 调度执行,而且可能位于其它线程上,所以调度的 async task 必须满足 Future + Send + Sync + ‘static ,而 ‘static 意味着 task 中不能有共享借用(需要 move);
  2. select! 在单个 task 线程上执行所有 branch 的异步任务,所以它们不可能并发执行,而是串行的。(比如一个 async expression 未 Ready 时,执行另一个 branch 的 async expression).

9 tokio-stream
#

Stream 是 std::iter::Iterator 的异步版本, 返回一系列 value。当前,Stream 还不在 Rust 标准库中, 是 futures-core crate 定义的 Stream trait 类型。

// https://docs.rs/futures-core/0.3.30/futures_core/stream/trait.Stream.html
pub trait Stream {
    type Item;

    // Required method
    fn poll_next(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>
    ) -> Poll<Option<Self::Item>>;

    // Provided method
    fn size_hint(&self) -> (usize, Option<usize>) { ... }
}

// poll_next() 方法返回值:
// + Ready<Some(Item)>: 返回下一个值;
// + Ready<None>: Stream 结束;
// + Pending: 值不 Ready;

tokio 在单独的 tokio-stream crate 中提供 Stream 支持,它通过 pub use futures_core::Stream; 来 re-export futures-core crate 定义的 Stream trait。

tokio_stream crate 提供了如下 module 函数:

  • empty: Creates a stream that yields nothing.
  • iter: Converts an Iterator into a Stream which is always ready to yield the next value.
  • once: Creates a stream that emits an element exactly once.
  • pending: Creates a stream that is never ready

tokio_stream::StreamExt(实际是 futures::StreamExt 的 pub use 导出) 是 futures_core::stream::Stream 子 trait, 提供了常用的额外 trait 方法, 包括各种 adapter 方法(如 map/filter 等), 以及 用于迭代的 next() 方法

  • 实现 Stream 的类型也自动实现了 StreamExt。
pub trait StreamExt: Stream {
    // next() 迭代返回下一个元素,返回的 Next 类型对象实现了 Future trait,
    // .await 时返回 Option
    fn next(&mut self) -> Next<'_, Self> where Self: Unpin

    fn try_next<T, E>(&mut self) -> TryNext<'_, Self>
       where Self: Stream<Item = Result<T, E>> + Unpin { ... }

    fn map<T, F>(self, f: F) -> Map<Self, F>
        where F: FnMut(Self::Item) -> T, Self: Sized
    fn map_while<T, F>(self, f: F) -> MapWhile<Self, F>
       where F: FnMut(Self::Item) -> Option<T>,
             Self: Sized { ... }
    fn then<F, Fut>(self, f: F) -> Then<Self, Fut, F>
       where F: FnMut(Self::Item) -> Fut,
             Fut: Future,
             Self: Sized { ... }
    fn merge<U>(self, other: U) -> Merge<Self, U>
       where U: Stream<Item = Self::Item>,
             Self: Sized { ... }
    fn filter<F>(self, f: F) -> Filter<Self, F>
       where F: FnMut(&Self::Item) -> bool,
             Self: Sized { ... }
    fn filter_map<T, F>(self, f: F) -> FilterMap<Self, F>
       where F: FnMut(Self::Item) -> Option<T>,
             Self: Sized { ... }
    fn fuse(self) -> Fuse<Self>
       where Self: Sized { ... }
    fn take(self, n: usize) -> Take<Self>
       where Self: Sized { ... }
    fn take_while<F>(self, f: F) -> TakeWhile<Self, F>
       where F: FnMut(&Self::Item) -> bool,
             Self: Sized { ... }
    fn skip(self, n: usize) -> Skip<Self>
       where Self: Sized { ... }
    fn skip_while<F>(self, f: F) -> SkipWhile<Self, F>
       where F: FnMut(&Self::Item) -> bool,
             Self: Sized { ... }
    fn all<F>(&mut self, f: F) -> AllFuture<'_, Self, F>
       where Self: Unpin,
             F: FnMut(Self::Item) -> bool { ... }
    fn any<F>(&mut self, f: F) -> AnyFuture<'_, Self, F>
       where Self: Unpin,
             F: FnMut(Self::Item) -> bool { ... }
    fn chain<U>(self, other: U) -> Chain<Self, U>
       where U: Stream<Item = Self::Item>,
             Self: Sized { ... }
    fn fold<B, F>(self, init: B, f: F) -> FoldFuture<Self, B, F>
       where Self: Sized,
             F: FnMut(B, Self::Item) -> B { ... }
    fn collect<T>(self) -> Collect<Self, T> where T: FromStream<Self::Item>, Self: Sized

    // Timeout 实现了 Future trait,.await 时返回
    // Result<<T as Future>::Output, Elapsed>,
    // 当 await 超时时返回 Elapsed Error。所以,可以作为一种通用的异步超时机制。
    fn timeout(self, duration: Duration) -> Timeout<Self> where Self: Sized
    fn timeout_repeating(self, interval: Interval) -> TimeoutRepeating<Self> where Self: Sized

    fn throttle(self, duration: Duration) -> Throttle<Self> where Self: Sized
    fn chunks_timeout( self, max_size: usize, duration: Duration ) -> ChunksTimeout<Self> where Self: Sized
    fn peekable(self) -> Peekable<Self> where Self: Sized
}

示例:

// StreamExt trait 为 Stream 提供了常用的 next() 方法。
use tokio_stream::{self as stream, StreamExt};

#[tokio::main]
async fn main() {
    // empty() 迭代式立即结束
    let mut none = stream::empty::<i32>();
    assert_eq!(None, none.next().await);

    // iter() 函数将一个 Iterator 转换为 Stream
    let mut stream = stream::iter(vec![17, 19]);
    assert_eq!(stream.next().await, Some(17));
    assert_eq!(stream.next().await, Some(19));
    assert_eq!(stream.next().await, None);

    // once() 函数返回只能迭代生成一个元素的 stream
    let mut one = stream::once(1);
    assert_eq!(Some(1), one.next().await);
    assert_eq!(None, one.next().await);

    // pending() 函数返回一个迭代式 pending 的 stream
    let mut never = stream::pending::<i32>();
    // This will never complete
    never.next().await;
    unreachable!();
}


// timeout() 示例
use tokio_stream::{self as stream, StreamExt};
use std::time::Duration;

let int_stream = int_stream.timeout(Duration::from_secs(1));
tokio::pin!(int_stream);

// When no items time out, we get the 3 elements in succession:
assert_eq!(int_stream.try_next().await, Ok(Some(1)));
assert_eq!(int_stream.try_next().await, Ok(Some(2)));
assert_eq!(int_stream.try_next().await, Ok(Some(3)));
assert_eq!(int_stream.try_next().await, Ok(None));

Stream 没有实现迭代器, 不支持 for-in 迭代, 只能在 StreamExt 的基础上使用 while-let 循环来迭代:

use tokio_stream::{self as stream, StreamExt};

#[tokio::main]
async fn main() {
    let mut stream = stream::iter(vec![0, 1, 2]);
    while let Some(value) = stream.next().await {
        println!("Got {}", value);
    }
}

module tokio_stream::wrappers 提供 转换为 Stream 的各种 wrappers 类型:

  • BroadcastStream A wrapper around tokio::sync::broadcast::Receiver that implements Stream.
  • CtrlBreakStreaml A wrapper around CtrlBreak that implements Stream.
  • CtrlCStream A wrapper around CtrlC that implements Stream.
  • IntervalStream A wrapper around Interval that implements Stream.
  • LinesStream A wrapper around tokio::io::Lines that implements Stream.
  • ReadDirStream A wrapper around tokio::fs::ReadDir that implements Stream.
  • ReceiverStream A wrapper around tokio::sync::mpsc::Receiver that implements Stream.
  • SignalStream A wrapper around Signal that implements Stream.
  • SplitStream A wrapper around tokio::io::Split that implements Stream.
  • TcpListenerStream A wrapper around TcpListener that implements Stream.
  • UnboundedReceiverStream A wrapper around tokio::sync::mpsc::UnboundedReceiver that implements Stream.
  • UnixListenerStream A wrapper around UnixListener that implements Stream.
  • WatchStream A wrapper around tokio::sync::watch::Receiver that implements Stream.
use tokio_stream::{StreamExt, wrappers::WatchStream};
use tokio::sync::watch;

let (tx, rx) = watch::channel("hello");
let mut rx = WatchStream::new(rx); // 从 Watch 创建一个 WatchStream
assert_eq!(rx.next().await, Some("hello"));

tx.send("goodbye").unwrap();
assert_eq!(rx.next().await, Some("goodbye"));

// 复杂的例子
use tokio_stream::StreamExt;
use mini_redis::client;

async fn publish() -> mini_redis::Result<()> {
    let mut client = client::connect("127.0.0.1:6379").await?;
    client.publish("numbers", "1".into()).await?;
    client.publish("numbers", "two".into()).await?;
    client.publish("numbers", "3".into()).await?;
    client.publish("numbers", "four".into()).await?;
    client.publish("numbers", "five".into()).await?;
    client.publish("numbers", "6".into()).await?;
    Ok(())
}

async fn subscribe() -> mini_redis::Result<()> {
    let client = client::connect("127.0.0.1:6379").await?;
    let subscriber = client.subscribe(vec!["numbers".to_string()]).await?;
    let messages = subscriber.into_stream(); // 返回一个 Stream 对象

    // next() 要求 message Stream 必须是 Pinned
    tokio::pin!(messages); // Pin 到 stack
    while let Some(msg) = messages.next().await {
        println!("got = {:?}", msg);
    }
    Ok(())
}

#[tokio::main]
async fn main() -> mini_redis::Result<()> {
    tokio::spawn(async {
        publish().await
    });
    subscribe().await?;
    println!("DONE");
    Ok(())
}

Adapters 从 Stream 生成新的 Stream, 如 map/take/filter:

let messages = subscriber
    .into_stream()
    .take(3);

let messages = subscriber
    .into_stream()
    .filter(|msg| match msg {
        Ok(msg) if msg.content.len() == 1 => true,
        _ => false,
    })
    .take(3);

let messages = subscriber
    .into_stream()
    .filter(|msg| match msg {
        Ok(msg) if msg.content.len() == 1 => true,
        _ => false,
    })
    .map(|msg| msg.unwrap().content)
    .take(3);

实现一个 Stream:

  • 能返回值时, poll_next() 返回 Poll::Ready, 否则返回 Poll::Pending;
    • Ready<Some<T>>: 迭代值,Ready<None> 迭代结束;
  • 一般在 poll_next() 内部需要使用 Future 和其他 Stream 来实现;
use tokio_stream::Stream;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Duration;

struct Interval {
    rem: usize,
    delay: Delay,
}

impl Interval {
    fn new() -> Self {
        Self {
            rem: 3,
            delay: Delay { when: Instant::now() }
        }
    }
}

impl Stream for Interval {
    type Item = ();

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<()>>
    {
        if self.rem == 0 {
            return Poll::Ready(None);
        }

        match Pin::new(&mut self.delay).poll(cx) {
            Poll::Ready(_) => {
                let when = self.delay.when + Duration::from_millis(10);
                self.delay = Delay { when };
                self.rem -= 1;
                Poll::Ready(Some(()))
            }
            Poll::Pending => Poll::Pending,
        }
    }
}

9.1 ReaderStream 和 StreamReader
#

tokio_util crate 提供了两个 Stream/Reader 相关的 struct 类型。

  1. ReaderStream:将一个 AsyncRead 转换为 byte chunks Stream。ReaderStream 实现了 Stream trait,迭代返回的元素类型为 Bytes,即一块连续的 u8 内存区域。
           // ReaderStream 类型关联函数
           impl<R: AsyncRead> ReaderStream<R>
    
           // Convert an AsyncRead into a Stream with item type
           // Result<Bytes, std::io::Error>.
           pub fn new(reader: R) -> Self
    
           // Convert an AsyncRead into a Stream with item type
           // Result<Bytes, std::io::Error>, with a
           // specific read buffer initial capacity.
           pub fn with_capacity(reader: R, capacity: usize) -> Self
    
           // ReaderStream 实现了 Stream trait,迭代返回的元素类型为 Bytes,
           // 即一块连续的 u8 内存区域
           impl<R: AsyncRead> Stream for ReaderStream<R>
               type Item = Result<Bytes, Error>
    
           // 示例
           use tokio_stream::StreamExt;
           use tokio_util::io::ReaderStream;
    
           let data = b"hello, world!";
           // &[u8] 实现了 AsyncRead trait
           let mut stream = ReaderStream::new(&data[..]);
    
           // Read all of the chunks into a vector.
           let mut stream_contents = Vec::new();
    
           // chunk 类型为 bytes::Bytes, 可以 Deref<[u8]> 来使用
           while let Some(chunk) = stream.next().await {
               stream_contents.extend_from_slice(&chunk?);
           }
    
           // Once the chunks are concatenated, we should have the original data.
           assert_eq!(stream_contents, data);
    
  1. StreamReader:将一个 byte chunks Stream(Stream 迭代返回的 Item 类型为 Result<bytes::Buf, E>)转换为 AsyncRead,它同时实现了 AsyncBufRead trait。
           impl<S, B, E> StreamReader<S, B>
           where
               S: Stream<Item = Result<B, E>>,
               B: Buf,
               E: Into<Error>,
    
           // new() 从 Stream 创建一个 StreamReader,Stream 需要每次迭代返回一个
           //  Result<bytes::Buf, E> 对象
           //
           // Bytes, BytesMut, &[u8], VecDeque<u8> 等(不含 Vec<u8>)均实现 bytes::Buf
           pub fn new(stream: S) -> Self
    
    
           // 示例
           use bytes::Bytes;
           use tokio::io::{AsyncReadExt, Result};
           use tokio_util::io::StreamReader;
    
           // Create a stream from an iterator.
           let stream = tokio_stream::iter(vec![
               // Bytes 实现了 Buf trait
               Result::Ok(Bytes::from_static(&[0, 1, 2, 3])),
               Result::Ok(Bytes::from_static(&[4, 5, 6, 7])),
               Result::Ok(Bytes::from_static(&[8, 9, 10, 11])),
           ]);
    
           // Convert it to an AsyncRead.
           // stream 迭代的 Item 要实现 bytes::Buf trait
           let mut read = StreamReader::new(stream);
    
           // Read five bytes from the stream.
           let mut buf = [0; 5];
           read.read_exact(&mut buf).await?;
           assert_eq!(buf, [0, 1, 2, 3, 4]);
    
           // Read the rest of the current chunk.
           assert_eq!(read.read(&mut buf).await?, 3);
           assert_eq!(&buf[..3], [5, 6, 7]);
    
           // Read the next chunk.
           assert_eq!(read.read(&mut buf).await?, 4);
           assert_eq!(&buf[..4], [8, 9, 10, 11]);
    
           // We have now reached the end.
           assert_eq!(read.read(&mut buf).await?, 0);
    

10 futures_util crate
#

futures_util crate 提供了 Sink 和 SinkExt trait 的定义:

  • Sink : A Sink is a value into which other values can be sent asynchronously;
  • SinkExt : An extension trait for Sinks that provides a variety of convenient combinator functions;

常用的是 SinkExt 提供的方法, 如 send()/send_all()。

Sink 和 SinkExt 在 Frame 中得到广泛应用。

pub trait Sink<Item> {
    type Error;

    // Required methods
    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_> ) -> Poll<Result<(), Self::Error>>;
    fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error>;
    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_> ) -> Poll<Result<(), Self::Error>>;
    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_> ) -> Poll<Result<(), Self::Error>>;
}

pub trait SinkExt<Item>: Sink<Item> {
    // Provided methods
    fn with<U, Fut, F, E>(self, f: F) -> With<Self, Item, U, Fut, F> where F: FnMut(U) -> Fut, Fut: Future<Output = Result<Item, E>>, E: From<Self::Error>,             Self: Sized { ... }
    fn with_flat_map<U, St, F>(self, f: F) -> WithFlatMap<Self, Item, U, St, F> where F: FnMut(U) -> St, St: Stream<Item = Result<Item, Self::Error>>, Self: Sized { ... }
    fn sink_map_err<E, F>(self, f: F) -> SinkMapErr<Self, F> where F: FnOnce(Self::Error) -> E, Self: Sized { ... }
    fn sink_err_into<E>(self) -> SinkErrInto<Self, Item, E> where Self: Sized, Self::Error: Into<E> { ... }
    fn buffer(self, capacity: usize) -> Buffer<Self, Item> where Self: Sized { ... }
    fn close(&mut self) -> Close<'_, Self, Item> where Self: Unpin { ... }
    fn fanout<Si>(self, other: Si) -> Fanout<Self, Si> where Self: Sized, Item: Clone, Si: Sink<Item, Error = Self::Error> { ... }
    fn flush(&mut self) -> Flush<'_, Self, Item> where Self: Unpin { ... }
    fn send(&mut self, item: Item) -> Send<'_, Self, Item> where Self: Unpin { ... }
    fn feed(&mut self, item: Item) -> Feed<'_, Self, Item> where Self: Unpin { ... }
    fn send_all<'a, St>(&'a mut self, stream: &'a mut St ) -> SendAll<'a, Self, St> where St: TryStream<Ok = Item, Error = Self::Error> + Stream + Unpin + ?Sized,             Self: Unpin { ... }
    fn left_sink<Si2>(self) -> Either<Self, Si2> where Si2: Sink<Item, Error = Self::Error>, Self: Sized { ... }
    fn right_sink<Si1>(self) -> Either<Si1, Self> where Si1: Sink<Item, Error = Self::Error>, Self: Sized { ... }
    fn compat(self) -> CompatSink<Self, Item> where Self: Sized + Unpin { ... }
    fn poll_ready_unpin( &mut self, cx: &mut Context<'_> ) -> Poll<Result<(), Self::Error>> where Self: Unpin { ... }
    fn start_send_unpin(&mut self, item: Item) -> Result<(), Self::Error> where Self: Unpin { ... }
    fn poll_flush_unpin( &mut self, cx: &mut Context<'_> ) -> Poll<Result<(), Self::Error>> where Self: Unpin { ... }
    fn poll_close_unpin( &mut self, cx: &mut Context<'_> ) -> Poll<Result<(), Self::Error>> where Self: Unpin { ... }
}

11 tokio_util crate
#

tokio_util 提供了如下 module、类型和功能:

  1. tokio_util::codec module 提供了 Encoder、Decoder trait,以及实现它们的 FramedRead、FramedWrite、Framed、LinesCodec、BytesCodec、AnyDelimiterCodec struct 类型。
  2. tokio_util::io module 提供了 InspectReader、InspectWriter、ReaderStream、StreamReader 类型;
  3. tokio_util::net module 提供了 Listener trait 和 ListenerAcceptFut struct 类型;
  4. tokio_util::sync module 提供了 CancellationToken struct 类型;
  5. tokio_util::task::task_tracker module 提供了 TaskTracker、TaskTrackerToken 等类型;
  6. tokio_util::time::delay_queue module 提供了 DelayQueue 类型;
  7. tokio_util::udp module 提供了 UdpFramed 类型;

Module tokio_util::codec 提供了将 AsyncRead/AsyncWrite 转换为 Stream/Sink, 并且将 byte stream sequence 转换为 有意义的 chunks 即 frames 的能力:

struct FrameWrite
A Sink of frames encoded to an AsyncWrite. 当作 Sink 来发送数据并编码为 Frame;
struct FrameRead
A Stream of messages decoded from an AsyncRead .当作 Stream 来读取数据并解码为 Frame;
struct Framed
A unified Stream and Sink interface to an underlyingI/O object, using the Encoder and Decoder traits to encode and decode frames.

Stream 从 AsyncRead 转换而来, 它的 StreamExt trait 提供的 next() 方法一次返回一个 chunk (bytes::Bytes 类型) 。

Sink 从 AsyncWrite 转换而来, 它的 SinkExt trait 提供的 send()/send_all()/feed() 方法可以用于发送数据。

在创建 FrameWrite/FrameRead/Framed 时需要传入实现 Encoder 和 Decoder trait 对象, 用于从 Stream 中解码出 Frame 对象, 向 Sink 中写入编码的 Frame 对象.

tokio_util::codec 提供了如下实现 Encoder 和 Decoder trait 的类型:

  • AnyDelimiterCodec : A simple Decoder and Encoder implementation that splits up data into chunks based on any character in the given delimiter string.
  • BytesCodec : A simple Decoder and Encoder implementation that just ships bytes around.
  • LinesCodec : A simple Decoder and Encoder implementation that splits up data into lines.
// encoding
use futures::sink::SinkExt;
use tokio_util::codec::LinesCodec;
use tokio_util::codec::FramedWrite;

#[tokio::main]
async fn main() {
    let buffer = Vec::new();
    let messages = vec!["Hello", "World"];
    let encoder = LinesCodec::new();

    //  buffer 实现了AsyncWrite,故可以作为 FramedWrite::new() 参数,
    // 返回的 writer 实现了 Sink 和 SinkExt trait。
    let mut writer = FramedWrite::new(buffer, encoder);

    // 异步向 Sink 写数据,自动编码为 Frame
    writer.send(messages[0]).await.unwrap();
    writer.send(messages[1]).await.unwrap();

    let buffer = writer.get_ref();
    assert_eq!(buffer.as_slice(), "Hello\nWorld\n".as_bytes());
}

// decoding
use tokio_stream::StreamExt;
use tokio_util::codec::LinesCodec;
use tokio_util::codec::FramedRead;

#[tokio::main]
async fn main() {
    let message = "Hello\nWorld".as_bytes();
    let decoder = LinesCodec::new();

    // message 实现了 AsyncRead,故可以作为 FrameRead::new() 参数,
    // 返回的 reader 实现了 Stream 和 StreamExt。
    // 每次读取返回一个解码后的 frame。
    let mut reader = FramedRead::new(message, decoder);

    let frame1 = reader.next().await.unwrap().unwrap();
    let frame2 = reader.next().await.unwrap().unwrap();
    assert!(reader.next().await.is_none());

    assert_eq!(frame1, "Hello");
    assert_eq!(frame2, "World");
}

FrameReader 从 AsyncRead 中使用 decoder 解码出 Frame 的过程大概如下:

use tokio::io::AsyncReadExt;

// buf 是内部带读写指针的缓存,实现了 AsyncRead trait 和 bytes::BufMut trait.
let mut buf = bytes::BytesMut::new();
loop {
    // The read_buf call will append to buf rather than overwrite existing data.

    // read_buf 方法使用 &mut bytes::BufMut
    // 从 io_resource 中读取一段数据(长度未知)后写入 buf.
    let len = io_resource.read_buf(&mut buf).await?;
    if len == 0 {
        while let Some(frame) = decoder.decode_eof(&mut buf)? {
            yield frame;
        }
        break;
    }

    // 解码出 frame:如果 buf 中数据不足,返回 None,触发下一次 loop 读数据。
    while let Some(frame) = decoder.decode(&mut buf)? {
        yield frame;
    }
}

FrameWriter 向 Sink 中写入使用 encoder 编码的数据的大概过程如下:

use tokio::io::AsyncWriteExt;
use bytes::Buf;

const MAX: usize = 8192;
let mut buf = bytes::BytesMut::new(); // buf 是内部带读写指针的缓存
loop {
    tokio::select! {
        // 持续向 buf 写数据
        num_written = io_resource.write(&buf), if !buf.is_empty() => {
            buf.advance(num_written?);
        },

        // 从 buf 数据生成一个 frame
        frame = next_frame(), if buf.len() < MAX => {
            encoder.encode(frame, &mut buf)?;
        },

        _ = no_more_frames() => {
            io_resource.write_all(&buf).await?;
            io_resource.shutdown().await?;
            return Ok(());
        },
    }
}

也可以为自定义类型实现 Encoder 和 Decoder trait, 从而可以在 Frame 中使用:

  • decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error>
  • encode(&mut self, item: String, dst: &mut BytesMut) -> Result<(), Self::Error>

都是用 BytesMut 来读写 Frame 数据。

// 实现 Decoder trait, 从输入的 src: &mut BytesMut 中解析出 Item 类型对象
use tokio_util::codec::Decoder;
use bytes::{BytesMut, Buf};

struct MyStringDecoder {}
const MAX: usize = 8 * 1024 * 1024;

impl Decoder for MyStringDecoder {
    type Item = String;
    type Error = std::io::Error;

    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
        if src.len() < 4 {
            // Not enough data to read length marker.
            return Ok(None);
        }

        // Read length marker.
        let mut length_bytes = [0u8; 4];
        length_bytes.copy_from_slice(&src[..4]);
        let length = u32::from_le_bytes(length_bytes) as usize;

        // Check that the length is not too large to avoid a denial of
        // service attack where the server runs out of memory.
        if length > MAX {
            return Err(std::io::Error::new(
                std::io::ErrorKind::InvalidData,
                format!("Frame of length {} is too large.", length)
            ));
        }
        const HEADER_SIZE: usize = 4;

        if src.len() < HEADER_SIZE + length {
            let missing_bytes = HEADER_SIZE + length - src.len();
            src.reserve(missing_bytes);
            return Ok(None);
        }
        // Use advance to modify src such that it no longer contains
        // this frame.
        let data = src[4..4 + length].to_vec();
        src.advance(4 + length);

        // Convert the data to a string, or fail if it is not valid utf-8.
        match String::from_utf8(data) {
            Ok(string) => Ok(Some(string)),
            Err(utf8_error) => {
                Err(std::io::Error::new(
                    std::io::ErrorKind::InvalidData,
                    utf8_error.utf8_error(),
                ))
            },
        }
    }
}

// 实现 Encoder trait
use tokio_util::codec::Encoder;
use bytes::BytesMut;

struct MyStringEncoder {}

const MAX: usize = 8 * 1024 * 1024;

impl Encoder<String> for MyStringEncoder {
    type Error = std::io::Error;

    fn encode(&mut self, item: String, dst: &mut BytesMut) -> Result<(), Self::Error> {
        // Don't send a string if it is longer than the other end will
        // accept.
        if item.len() > MAX {
            return Err(std::io::Error::new(
                std::io::ErrorKind::InvalidData,
                format!("Frame of length {} is too large.", item.len())
            ));
        }

        // Convert the length into a byte array.
        // The cast to u32 cannot overflow due to the length check above.
        let len_slice = u32::to_le_bytes(item.len() as u32);

        // Reserve space in the buffer.
        dst.reserve(4 + item.len());

        // Write the length and string to the buffer.
        dst.extend_from_slice(&len_slice);
        dst.extend_from_slice(item.as_bytes());
        Ok(())
    }
}

Module tokio_util::time 提供了 DelayQueue 类型:

  1. pub fn insert_at(&mut self, value: T, when: Instant) -> Key: 插入元素, 并指定过期的绝对时间;
  2. pub fn insert(&mut self, value: T, timeout: Duration) -> Key: 插入元素, 并指定超时时间;
  3. pub fn poll_expired(&mut self, cx: &mut Context<’_>) -> Poll<Option<Expired<T>>>: 返回下一个过期的元素的 key, 需要使用 futures::ready!() 来 poll 返回的 Poll 对象。(不能使用 .await, 因为 Poll 类型没有实现 Future),如果没有过期元素则返回 Pending。

通过 insert 返回的 Key, 后续可以查询/删除/重置对应的元素;

use tokio_util::time::DelayQueue;
use std::time::Duration;

let mut delay_queue = DelayQueue::new();
let key1 = delay_queue.insert("foo", Duration::from_secs(5));
let key2 = delay_queue.insert("bar", Duration::from_secs(10));
assert!(delay_queue.deadline(&key1) < delay_queue.deadline(&key2));

// Remove the entry
let item = delay_queue.remove(&key);
assert_eq!(*item.get_ref(), "foo");

// "foo" is scheduled to be returned in 5 seconds
delay_queue.reset_at(&key, Instant::now() + Duration::from_secs(10));
// "foo" is now scheduled to be returned in 10 seconds

// 另一个例子
use tokio_util::time::{DelayQueue, delay_queue};

use futures::ready; // 也可以使用 std::task::ready!() 宏
use std::collections::HashMap;
use std::task::{Context, Poll};
use std::time::Duration;

struct Cache {
    entries: HashMap<CacheKey, (Value, delay_queue::Key)>,
    expirations: DelayQueue<CacheKey>,
}

const TTL_SECS: u64 = 30;

impl Cache {
    fn insert(&mut self, key: CacheKey, value: Value) {
        let delay = self.expirations.insert(key.clone(), Duration::from_secs(TTL_SECS));
        self.entries.insert(key, (value, delay));
    }

    fn get(&self, key: &CacheKey) -> Option<&Value> {
        self.entries.get(key).map(|&(ref v, _)| v)
    }

    fn remove(&mut self, key: &CacheKey) {
        if let Some((_, cache_key)) = self.entries.remove(key) {
            self.expirations.remove(&cache_key);
        }
    }

    fn poll_purge(&mut self, cx: &mut Context<'_>) -> Poll<()> {
        // ready!() 宏返回 Poll::Ready 的值, 如果不 Ready 则一致阻塞。
        while let Some(entry) = ready!(self.expirations.poll_expired(cx)) {
            self.entries.remove(entry.get_ref());
        }
        Poll::Ready(())
    }
}

Trait tokio_util::time::FutureExt 为所有实现了 Future 的对象, 添加 timeout() 方法。对于 Stream,可以使用 StreamExt 提供的 timeout() 方法。

pub trait FutureExt: Future {
    // Provided method
    fn timeout(self, timeout: Duration) -> Timeout<Self>
       where Self: Sized { ... }
}

// 示例
use tokio::{sync::oneshot, time::Duration};
use tokio_util::time::FutureExt;

let (tx, rx) = oneshot::channel::<()>();
let res = rx.timeout(Duration::from_millis(10)).await;
assert!(res.is_err());

12 tokio_test unit testing
#

https://tokio.rs/tokio/topics/testing

#[tokio::test] 默认创建一个单线程的 current_thread runtime。

tokio::time 提供了 pause()/resume()/advance() 方法。

tokio::time::pause(): 将当前 Instant::now() 保存,后续调用 Instant::now() 时将返回保存的值。保存的值可以使用 advance() 修改。该函数只适合 current_thread runtime,也就是 #[tokio::test] 默认使用的 runtime。(这里的 Instant 是 tokio 提供的,标准库 Instant 不受影响。)

自动前进(auto-advance):当 time 被 paused,且当前 runtime 空闲时,clock 会被自动前进到下一个 pending timer。这意味着 Sleep 和其它 time 相关函数、方法被 await 时,会引起 runtime 前景时间。

#[tokio::test]
async fn paused_time() {
    tokio::time::pause();
    let start = std::time::Instant::now();
    tokio::time::sleep(Duration::from_millis(500)).await;
    println!("{:?}ms", start.elapsed().as_millis()); // 打印 0ms
}

可以使用属性来更简便的开启 time pause:

#[tokio::main(flavor = "current_thread", start_paused = true)]
async fn main() {
   println!("Hello world");
}

#[tokio::test(start_paused = true)]
async fn paused_time() {
    let start = std::time::Instant::now();
    tokio::time::sleep(Duration::from_millis(500)).await;
    println!("{:?}ms", start.elapsed().as_millis());
}

虽然开启了 time pause, 但是异步函数的执行顺序和时间关系还是正常保持的: 立即打印 4 次 “Tick!”

#[tokio::test(start_paused = true)]
async fn interval_with_paused_time() {
    let mut interval = interval(Duration::from_millis(300));
    let _ = timeout(Duration::from_secs(1), async move {
        loop {
            interval.tick().await;
            println!("Tick!");
        }
    })
    .await;
}

使用 tokio_test::io::Builder 来 Mock AsyncRead and AsyncWrite:

use tokio::io::{AsyncBufReadExt, AsyncRead, AsyncWrite, AsyncWriteExt, BufReader};

async fn handle_connection<Reader, Writer>(
    reader: Reader,
    mut writer: Writer,
) -> std::io::Result<()>
where
    Reader: AsyncRead + Unpin,
    Writer: AsyncWrite + Unpin,
{
    let mut line = String::new();
    let mut reader = BufReader::new(reader);

    loop {
        if let Ok(bytes_read) = reader.read_line(&mut line).await {
            if bytes_read == 0 {
                break Ok(());
            }
            writer
                .write_all(format!("Thanks for your message.\r\n").as_bytes())
                .await
                .unwrap();
        }
        line.clear();
    }
}

#[tokio::test]
async fn client_handler_replies_politely() {
    let reader = tokio_test::io::Builder::new()
        .read(b"Hi there\r\n")
        .read(b"How are you doing?\r\n")
        .build();

    let writer = tokio_test::io::Builder::new()
        .write(b"Thanks for your message.\r\n")
        .write(b"Thanks for your message.\r\n")
        .build();

    let _ = handle_connection(reader, writer).await;
}

13 tokio::net
#

提供了和 std 类似的 TCP/UDP/Unix 异步实现。

  1. TcpListener and TcpStream
  2. UdpSocket
  3. UnixListener and UnixStream
  4. UnixDatagram
  5. tokio::net::unix::pipe

提供了一个函数 lookup_host() 来进行域名解析:

use tokio::net;
use std::io;

#[tokio::main]
async fn main() -> io::Result<()> {
    for addr in net::lookup_host("localhost:3000").await? {
        println!("socket address is {}", addr);
    }

    Ok(())
}

TcpListener 提供了 bind/accept 方法:

  • bind() 关联函数返回 TcpListener 类型对象;
  • accept() 方法返回 TCPStream 和对端 SocketAddr;
  • from_std()/into_std() 方法可以在标准库的 TcpListener 间转换, 这样可以 使用标准库创建和设置 TcpListener, 然后创建 tokio TcpListener.
use std::error::Error;
use tokio::net::TcpListener;

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    let std_listener = std::net::TcpListener::bind("127.0.0.1:0")?;
    std_listener.set_nonblocking(true)?;
    let listener = TcpListener::from_std(std_listener)?;
    Ok(())
}

TcpStream 实现了 AsyncRead/AsyncWrite, 通过 split() 方法可以分别返回读端和写端.

TcpSocket 是未转换为 TcpListener 和 TcpStream 的对象, 用于设置 TCP 相关参数.

  • pub fn new_v4() -> Result<TcpSocket>

  • pub fn new_v6() -> Result<TcpSocket>

  • pub fn bind(&self, addr: SocketAddr) -> Result<()>

  • pub fn bind_device(&self, interface: Option<&[u8]>) -> Result<()>

  • pub async fn connect(self, addr: SocketAddr) -> Result<TcpStream>

  • pub fn listen(self, backlog: u32) -> Result<TcpListener>

  • pub fn set_keepalive(&self, keepalive: bool) -> Result<()>

  • pub fn set_reuseaddr(&self, reuseaddr: bool) -> Result<()>

  • pub fn set_recv_buffer_size(&self, size: u32) -> Result<()>

use tokio::net::TcpSocket;

use std::io;

#[tokio::main]
async fn main() -> io::Result<()> {
    let addr = "127.0.0.1:8080".parse().unwrap();

    let socket = TcpSocket::new_v4()?;
    // On platforms with Berkeley-derived sockets, this allows to quickly
    // rebind a socket, without needing to wait for the OS to clean up the
    // previous one.
    //
    // On Windows, this allows rebinding sockets which are actively in use,
    // which allows “socket hijacking”, so we explicitly don't set it here.
    // https://docs.microsoft.com/en-us/windows/win32/winsock/using-so-reuseaddr-and-so-exclusiveaddruse
    socket.set_reuseaddr(true)?;
    socket.bind(addr)?;

    let listener = socket.listen(1024)?;
    Ok(())


    let addr = "127.0.0.1:8080".parse().unwrap();
    let socket = TcpSocket::new_v4()?;
    let stream = socket.connect(addr).await?;
    Ok(())
}

tokio 的 TcpStream::connect() 没有提供 timeout,但是可以使用 tokio::time::timeout 来实现:

const CONNECTION_TIME: u64 = 100;

// ...

let (socket, _response) = match tokio::time::timeout(
     Duration::from_secs(CONNECTION_TIME),
     tokio::net::TcpStream::connect("127.0.0.1:8080")
 )
 .await
 {
     Ok(ok) => ok,
     Err(e) => panic!(format!("timeout while connecting to server : {}", e)),
 }
 .expect("Error while connecting to server")

tokio 的 TcpStream 也没有提供其它 read/write timeout,但是可以标准库提供了,可以从标准库 TcpStream 创建 tokio TcpStream:

let std_stream = std::net::TcpStream::connect("127.0.0.1:8080")
    .expect("Couldn't connect to the server...");
std_stream.set_write_timeout(None).expect("set_write_timeout call failed");
std_stream.set_nonblocking(true)?;
let stream = tokio::net::TcpStream::from_std(std_stream)?;

如果要使用其它标准库或 tokio::net 没有提供的 socket 设置,可以使用 =socket2 crate=, 例如为 socket 设置更详细的 KeepAalive 参数,设置 tcp user timeout 等:

use std::time::Duration;

use socket2::{Socket, TcpKeepalive, Domain, Type};

let socket = Socket::new(Domain::IPV4, Type::STREAM, None)?;
let keepalive = TcpKeepalive::new()
    .with_time(Duration::from_secs(4));
    // Depending on the target operating system, we may also be able to
    // configure the keepalive probe interval and/or the number of
    // retries here as well.

socket.set_tcp_keepalive(&keepalive)?;

关于 TCP_USER_TIMEOUT 可以为连接建立(即 connect())和数据读写指定超时时间:

  1. https://codearcana.com/posts/2015/08/28/tcp-keepalive-is-a-lie.html
  2. https://blog.cloudflare.com/when-tcp-sockets-refuse-to-die/

对于 UDP,只有 UdpSocket 一种类型,它提供 bind/connet/send/recv/send_to/recv_from 等方法。

bind
监听指定的地址和端口。由于 UDP 是无连接的,使用 recv_from 来接收任意 client 发送的数据,使用 send_to可以向任意 target 发送数据。
connect
UDP 虽然是无连接的,但是也支持使用 connect() 方法,效果是为返回的 UdpSocket 指定了 target ip 和port,可以使用 send/recv 方法来只向该 target 发送和接收数据。
use tokio::net::UdpSocket;
use std::io;

#[tokio::main]
async fn main() -> io::Result<()> {
    let sock = UdpSocket::bind("0.0.0.0:8080").await?;
    // use `sock`
    Ok(())
}

对于 Unix Socket,提供两种类型:

  1. 面向连接的: UnixListener,UnixStream;
  2. 面向无连接的:UnixDatagram

Unix Socket 使用本地文件(一般是 .sock 结尾)寻址(named socket),也可以使用没有关联文件的 unnamed Unix Socket。

关闭 named socket 时,对应的 socket 文件并不会被自动删除,如果没有 unix socket bind 到该文件上,则 client 连接会失败。

  1. pub fn bind<P>(path: P) -> Result<UnixDatagram> where P: AsRef<Path> : 使用指定 socket 文件创建一个 named unix Datagram socket;
  2. pub fn pair() -> Result<(UnixDatagram, UnixDatagram)>:返回一对 unamed unix Datagram socket;
  3. pub fn unbound() -> Result<UnixDatagram>:创建一个没有绑定任何 addr 的 socket。
  4. pub fn connect<P: AsRef<Path>>(&self, path: P) -> Result<()>:Datagram 是无连接的,调用该方法后可以使用 send/recv 从指定的 path 收发消息。
use tokio::net::UnixDatagram;
use tempfile::tempdir;

// We use a temporary directory so that the socket files left by the bound
// sockets will get cleaned up.
let tmp = tempdir()?;

// Bind each socket to a filesystem path
let tx_path = tmp.path().join("tx");
let tx = UnixDatagram::bind(&tx_path)?;

let rx_path = tmp.path().join("rx");
let rx = UnixDatagram::bind(&rx_path)?;

let bytes = b"hello world";
tx.send_to(bytes, &rx_path).await?;

let mut buf = vec![0u8; 24];
let (size, addr) = rx.recv_from(&mut buf).await?;

let dgram = &buf[..size];
assert_eq!(dgram, bytes);
assert_eq!(addr.as_pathname().unwrap(), &tx_path);



// unnamed unix socket
use tokio::net::UnixDatagram;

// Create the pair of sockets
let (sock1, sock2) = UnixDatagram::pair()?;

// Since the sockets are paired, the paired send/recv
// functions can be used
let bytes = b"hello world";
sock1.send(bytes).await?;

let mut buff = vec![0u8; 24];
let size = sock2.recv(&mut buff).await?;

let dgram = &buff[..size];
assert_eq!(dgram, bytes);

UnixListener bind() 返回 UnixListener,它的 accept() 方法返回 UnixStream:

use tokio::net::UnixListener;

#[tokio::main]
async fn main() {
    let listener = UnixListener::bind("/path/to/the/socket").unwrap();
    loop {
        match listener.accept().await {
            Ok((stream, _addr)) => {
                println!("new client!");
            }
            Err(e) => { /* connection failed */ }
        }
    }
}

UnixStream 实现了 AsyncRead/AsyncWrite,它的 connect() 也返回一个 UnixStream:

use tokio::io::Interest;
use tokio::net::UnixStream;
use std::error::Error;
use std::io;

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    let dir = tempfile::tempdir().unwrap();
    let bind_path = dir.path().join("bind_path");
    let stream = UnixStream::connect(bind_path).await?;

    loop {
        let ready = stream.ready(Interest::READABLE | Interest::WRITABLE).await?;

        if ready.is_readable() {
            let mut data = vec![0; 1024];
            // Try to read data, this may still fail with `WouldBlock`
            // if the readiness event is a false positive.
            match stream.try_read(&mut data) {
                Ok(n) => {
                    println!("read {} bytes", n);
                }
                Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
                    continue;
                }
                Err(e) => {
                    return Err(e.into());
                }
            }

        }

        if ready.is_writable() {
            // Try to write data, this may still fail with `WouldBlock`
            // if the readiness event is a false positive.
            match stream.try_write(b"hello world") {
                Ok(n) => {
                    println!("write {} bytes", n);
                }
                Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
                    continue;
                }
                Err(e) => {
                    return Err(e.into());
                }
            }
        }
    }
}

14 tokio::signal
#

tokio::signal module 提供了信号捕获和处理的能力。

pub async fn ctrl_c() -> Result<()>:当收到 C-c 发送的 SIGINT 信号时 .await 返回;

use tokio::signal;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    signal::ctrl_c().await?;
    println!("ctrl-c received!");
    Ok(())
}

对于其它信号,可以使用 SignalKind 和 signal 函数创建一个 Signal,然后调用它的 recv() 方法:


// Wait for SIGHUP on Unix
use tokio::signal::unix::{signal, SignalKind};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // An infinite stream of hangup signals.
    let mut stream = signal(SignalKind::hangup())?;

    // Print whenever a HUP signal is received
    loop {
        stream.recv().await;
        println!("got signal HUP");
    }
}

15 tokio::time
#

tokio::time module 提供了如下类型:

  • Sleep
  • Interval: 在固定时间间隔流式产生一个值(stream yielding a value)
  • Timeout:为一个 future 或 stream 封装一个执行的过期时间,过期后 future 或 stream 被取消,并返回一个错误。

上面的类型必须在 async Runtime 上下文中使用:

不能在 async Rutnime 中使用标准库的 sleep,它会阻塞当前线程执行其它异步任务。

// Wait 100ms and print “100 ms have elapsed”
use std::time::Duration;
use tokio::time::sleep;

#[tokio::main]
async fn main() {
    sleep(Duration::from_millis(100)).await;
    println!("100 ms have elapsed");
}

// Require that an operation takes no more than 1s.
use tokio::time::{timeout, Duration};
async fn long_future() {}
// 任意 Feature 都可以设置异步 timeout
let res = timeout(Duration::from_secs(1), long_future()).await;
if res.is_err() {
    println!("operation timed out");
}

// A simple example using interval to execute a task every two seconds.
use tokio::time;
async fn task_that_takes_a_second() {
    println!("hello");
    time::sleep(time::Duration::from_secs(1)).await
}

#[tokio::main]
async fn main() {
    let mut interval = time::interval(time::Duration::from_secs(2));
    for _i in 0..5 {
        interval.tick().await;
        task_that_takes_a_second().await;
    }
}

tokio::time 提供了 pause()/resume()/advance() 方法。

tokio::time::pause(): 将当前 Instant::now() 保存,后续调用 Instant::now() 时将返回保存的值。保存的值可以使用 advance() 修改。该函数只适合 current_thread runtime,也就是 #[tokio::test] 默认使用的 runtime。(这里的 Instant 是 tokio 提供的,标准库 Instant 不受影响。)

自动前进(auto-advance):当 time 被 paused,且当前 runtime 空闲时,clock 会被自动前进到下一个 pending timer。这意味着 Sleep 和其它 time 相关函数、方法被 await 时,会引起 runtime 前景时间。

#[tokio::main(flavor = "current_thread", start_paused = true)]
async fn main() {
   println!("Hello world");
}

16 tokio::process
#

use tokio::io::AsyncWriteExt;
use tokio::process::Command;

use std::process::Stdio;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let mut cmd = Command::new("sort");

    // Specifying that we want pipe both the output and the input.  Similarly to capturing the
    // output, by configuring the pipe to stdin it can now be used as an asynchronous writer.
    cmd.stdout(Stdio::piped());
    cmd.stdin(Stdio::piped());

    let mut child = cmd.spawn().expect("failed to spawn command");

    // These are the animals we want to sort
    let animals: &[&str] = &["dog", "bird", "frog", "cat", "fish"];

    let mut stdin = child
        .stdin
        .take()
        .expect("child did not have a handle to stdin");

    // Write our animals to the child process Note that the behavior of `sort` is to buffer _all
    // input_ before writing any output.  In the general sense, it is recommended to write to the
    // child in a separate task as awaiting its exit (or output) to avoid deadlocks (for example,
    // the child tries to write some output but gets stuck waiting on the parent to read from it,
    // meanwhile the parent is stuck waiting to write its input completely before reading the
    // output).
    stdin
        .write(animals.join("\n").as_bytes())
        .await
        .expect("could not write to stdin");

    // We drop the handle here which signals EOF to the child process.  This tells the child
    // process that it there is no more data on the pipe.
    drop(stdin);

    let op = child.wait_with_output().await?;

    // Results should come back in sorted order
    assert_eq!(op.stdout, "bird\ncat\ndog\nfish\nfrog\n".as_bytes());

    Ok(())
}

使用其它进程的输出作为输入:

use tokio::join;
use tokio::process::Command;
use std::process::Stdio;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let mut echo = Command::new("echo")
        .arg("hello world!")
        .stdout(Stdio::piped())
        .spawn()
        .expect("failed to spawn echo");

    let tr_stdin: Stdio = echo
        .stdout
        .take()
        .unwrap()
        .try_into()
        .expect("failed to convert to Stdio");

    let tr = Command::new("tr")
        .arg("a-z")
        .arg("A-Z")
        .stdin(tr_stdin)
        .stdout(Stdio::piped())
        .spawn()
        .expect("failed to spawn tr");

    let (echo_result, tr_output) = join!(echo.wait(), tr.wait_with_output());

    assert!(echo_result.unwrap().success());

    let tr_output = tr_output.expect("failed to await tr");
    assert!(tr_output.status.success());

    assert_eq!(tr_output.stdout, b"HELLO WORLD!\n");

    Ok(())
}
rust crate - 这篇文章属于一个选集。
§ 1: 本文

相关文章

anyhow
··1874 字
Rust Rust-Crate
anyhow crate 提供了自定义 Error 类型和 Result 类型,Error 类型自带 backtrace 和 context,支持用户友好的格式化信息输出。
bytes
··2922 字
Rust Rust-Crate
bytes 提供了高效的 zero-copy 连续内存区域的共享和读写能力。
chrono
··4023 字
Rust Rust-Crate
chrono 提供了丰富的 Date/Time 类型和相关操作。
hyper
··861 字
Rust Rust-Crate
hyper 是高性能的异步 HTTP 1/2 底层库。