
tokio

Rust Rust-Crate
rust crate: this article is part of a series.

1 features

# Enable the full feature (in [dependencies])
tokio = { version = "1", features = ["full"] }

# Or enable only the individual features you need
tokio = { version = "1", features = ["rt", "net"] }
  • full : Enables all features listed below except test-util and tracing.
  • rt : Enables tokio::spawn, the current-thread scheduler, and non-scheduler utilities.
  • rt-multi-thread : Enables the heavier, multi-threaded, work-stealing scheduler.
  • io-util : Enables the IO based Ext traits.
  • io-std : Enable Stdout, Stdin and Stderr types.
  • net : Enables tokio::net types such as TcpStream, UnixStream and UdpSocket, as well as (on Unix-like systems) AsyncFd and (on FreeBSD) PollAio.
  • time : Enables tokio::time types and allows the schedulers to enable the built in timer.
  • process : Enables tokio::process types.
  • macros : Enables #[tokio::main] and #[tokio::test] macros.
  • sync : Enables all tokio::sync types.
  • signal : Enables all tokio::signal types.
  • fs : Enables tokio::fs types.
  • test-util : Enables testing based infrastructure for the Tokio runtime.
  • parking_lot : As a potential optimization, use the parking_lot crate’s synchronization primitives internally. Also, this dependency is necessary to construct some of our primitives in a const context. MSRV may increase according to the parking_lot release in use.

The following are only available when the tokio_unstable cfg flag is enabled (a compiler flag, not a Cargo feature):

  • tracing: Enables tracing events.
  • task::Builder (note: not runtime::Builder)
  • Some methods on task::JoinSet
  • runtime::RuntimeMetrics
  • runtime::Builder::unhandled_panic
  • task::Id
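To enable the flag, add the following to .cargo/config.toml (or set RUSTFLAGS="--cfg tokio_unstable"):
[build]
rustflags = ["--cfg", "tokio_unstable"]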

2 macros

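#[tokio::main]: creates and configures a Runtime. For more complex Runtime settings, use Builder.

  • It can be applied to any async fn, but it is normally used on async fn main. On any other function, every call would build a new Runtime to run it.
  • It defaults to the multi-threaded runtime. This runtime creates a worker thread pool with one thread per CPU core, which runs the futures submitted with spawn() using a work-stealing strategy.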
// Multi-threaded runtime
#[tokio::main(flavor = "multi_thread", worker_threads = 10)]
#[tokio::main(worker_threads = 2)]

// Current thread runtime
#[tokio::main(flavor = "current_thread")]
#[tokio::main(flavor = "current_thread", start_paused = true)] // start_paused requires the test-util feature

// Example
#[tokio::main]
async fn main() {
    println!("Hello world");
}

// Equivalent to
fn main() {
    tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .build()
        .unwrap()
        .block_on(async {
            println!("Hello world");
        })
}

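join!() runs the given async expressions concurrently and waits until all of them have completed. The standard library has a similar std::future::join! macro, but it is unstable.

  1. join!() must be used inside an async context, such as an async fn, block, or closure.
  2. All expressions run concurrently on the current task, not in parallel. If one expression blocks the thread, it also blocks the others. To run them in parallel, pass each one to spawn().
  3. If the expressions return Result, join!() still waits for all of them to complete. To stop at the first Err, use try_join!().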
async fn do_stuff_async() {}
async fn more_async_work() {}

#[tokio::main]
async fn main() {
    let (first, second) = tokio::join!(
        do_stuff_async(),
        more_async_work());
}

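pin!(): takes ownership of the given future and pins it on the stack. The variable is shadowed by a Pin<&mut _>, which guarantees the future is never moved again.

  • When select! runs one branch, it cancels (drops) the futures of the other branches. To use select! in a loop without losing a future, pass &mut future instead of the future itself. select! requires &mut future to be Unpin, so the future has to be pinned first.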
use tokio::{pin, select};
use tokio_stream::{self as stream, StreamExt};

async fn my_async_fn() {}

#[tokio::main]
async fn main() {
    let mut stream = stream::iter(vec![1, 2, 3, 4]); // create an async stream from an iterable

    let future = my_async_fn();
    pin!(future);

    // select! cancels/drops the futures of the branches that did not run. To use select! in a
    // loop, pass &mut future instead of the future itself so that it is not dropped.
    loop {
        select! {
            _ = &mut future => {
                break;
            }

            Some(val) = stream.next() => { // only the future returned by next() is dropped, not the stream
                println!("got value = {}", val);
            }
        }
    }
}

// Pin several futures at once
#[tokio::main]
async fn main() {
    pin! {
        let future1 = my_async_fn();
        let future2 = my_async_fn();
    }

    select! {
        _ = &mut future1 => {}
        _ = &mut future2 => {}
    }
}

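select!(): waits on several async expressions concurrently on the current task. When the first one completes with a value that matches its branch's pattern, that branch's handler runs, and the futures still pending in the other branches are dropped: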

<pattern> = <async expression> (, if <precondition>)? => <handler>,
async fn do_stuff_async() { }
async fn more_async_work() {}

#[tokio::main]
async fn main() {
    tokio::select! {
        _ = do_stuff_async() => {
            println!("do_stuff_async() completed first")
        }

        _ = more_async_work() => {
            println!("more_async_work() completed first")
        }
    };
}

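task_local!(): a future passed to spawn() must be Send + 'static, and it may resume on a different thread after each .await. For that reason, thread_local variables are unreliable inside it; use task-local variables instead. The macro declares a static tokio::task::LocalKey: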

tokio::task_local! {
    pub static ONE: u32;
    static TWO: f32;
}

tokio::task_local! {
    static NUMBER: u32;
}

NUMBER.scope(1, async move {
    assert_eq!(NUMBER.get(), 1);
}).await;

NUMBER.scope(2, async move {
    assert_eq!(NUMBER.get(), 2);

    NUMBER.scope(3, async move {
        assert_eq!(NUMBER.get(), 3);
    }).await;
}).await;

3 runtime

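The tokio Runtime provides an I/O event loop, schedules tasks driven by I/O readiness, and schedules tasks based on timers (for example, waking tasks that await tokio::time::sleep()).

You usually don't create a Runtime by hand. Instead, mark async fn main() with #[tokio::main]. For fine-grained control over Runtime parameters, use tokio::runtime::Builder.

  • Inside a Runtime context, use tokio::spawn() to run async tasks (or tokio::task::spawn_local() for !Send tasks inside a LocalSet). Use spawn_blocking() to run blocking synchronous code.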
#[tokio::main]
async fn main() {
    println!("Hello world");
}
// Equivalent to
fn main() {
    tokio::runtime::Builder::new_multi_thread()
        .enable_all() // enable all resource drivers (I/O and timer)
        .build()
        .unwrap()
        .block_on(async {
            println!("Hello world");
        })
}

// Another example: creating the Runtime manually
use tokio::net::TcpListener;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::runtime::Runtime;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Create a multi-threaded Runtime with all resource drivers enabled
    let rt = Runtime::new()?;
    rt.block_on(async {
        let listener = TcpListener::bind("127.0.0.1:8080").await?;
        loop {
            let (mut socket, _) = listener.accept().await?;
            tokio::spawn(async move {
                let mut buf = [0; 1024];
                loop {
                    let n = match socket.read(&mut buf).await {
                        // socket closed
                        Ok(n) if n == 0 => return,
                        Ok(n) => n,
                        Err(e) => {
                            println!("failed to read from socket; err = {:?}", e);
                            return;
                        }
                    };
                    if let Err(e) = socket.write_all(&buf[0..n]).await {
                        println!("failed to write to socket; err = {:?}", e);
                        return;
                    }
                }
            });
        }
    })
}

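The tokio Runtime comes in two flavors (the multi-threaded one is the default):

  • Multi-Thread Scheduler: runs the futures submitted with spawn() on a worker thread pool with one thread per CPU core, using work stealing.
  • Current-Thread Scheduler: a single-threaded executor. All async tasks run on the current thread.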

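Both schedulers keep a global queue and a local queue:

  • Tasks are taken from the local queue first, falling back to the global queue when the local queue is empty. In addition, the global queue is checked after every global_queue_interval tasks. The default is 31; recent versions of the multi-thread scheduler pick this interval with a heuristic unless it is set explicitly.
  • I/O and timer events are checked when there are no tasks to run, or after event_interval scheduler ticks (61 by default).
  • The multi-thread scheduler uses the LIFO slot optimization by default. When a task wakes another task, the woken task goes into the worker's LIFO slot and runs next instead of being queued. In the current-thread scheduler, a task woken from inside the runtime is pushed straight onto the local queue.

The multi-thread scheduler has a thread pool with a fixed number of threads, created together with the Runtime. There is one global queue plus one local queue per worker thread. Each local queue holds up to 256 tasks, and tasks that do not fit are moved to the global queue. A worker takes tasks from its local queue first, then from the global queue. If both are empty, it steals tasks from other workers' local queues.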

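Besides the scheduler, the tokio Runtime manages resource drivers. There are currently two, io and time, and they must be enabled individually or all at once:

  • Builder::enable_io(): the I/O driver, used by networking, process, and signal types. tokio::fs does not need it, because it runs file operations on the blocking thread pool.
  • Builder::enable_time(): the timer driver.
  • Builder::enable_all(): enables all resource drivers.

Between polling tasks, the Runtime periodically checks whether I/O resources and timers are ready and wakes the corresponding tasks. By default, it checks when no task is runnable or after 61 tasks have been scheduled. This number can be configured with event_interval.

By default, creating a Runtime spawns one worker thread per CPU core, forming a fixed-size worker thread pool. The Runtime also maintains a separate blocking thread pool. Its threads are created on demand when spawn_blocking() is called, and their number varies up to max_blocking_threads (512 by default). The Runtime shuts down threads that stay idle for a while (10 seconds by default).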

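The tokio Runtime schedules tasks fairly, so tasks that are always ready to run cannot starve the others. The guarantee is stated in terms of three bounds:

  • MAX_TASKS: an upper bound on the total number of tasks on the runtime at any moment.
  • MAX_SCHEDULE: an upper bound on how long any single call to poll on a task takes to return.
  • MAX_DELAY: a bound on how long a woken task waits before the runtime schedules it.

These are not runtime settings. MAX_TASKS and MAX_SCHEDULE are conditions your program must satisfy. As long as they hold, the runtime guarantees that some MAX_DELAY exists.

Beyond this, the scheduler makes no guarantee about the order in which tasks are scheduled.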

3.1 Runtime

Runtime provides everything needed to run async tasks: the I/O driver, the task scheduler, the timer, and the thread pools.

Shutting down a Runtime:

  1. drop the Runtime value;
  2. call shutdown_background() or shutdown_timeout().

Dropping the Runtime waits, potentially forever, until all spawned work has stopped:

  1. Tasks spawned through Runtime::spawn keep running until they yield. Then they are dropped. They are not guaranteed to run to completion, but might do so if they do not yield until completion.
    • In other words, a task is only dropped when it yields (for example, at its next .await). Until then it keeps running, possibly to completion.
  2. Blocking functions spawned through Runtime::spawn_blocking keep running until they return.

shutdown_background() and shutdown_timeout() avoid this unbounded wait. A task may still be running when they return (for shutdown_timeout(), once the timeout expires). In that case, the thread running it is leaked, and the task keeps running until it finishes.

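Runtime methods:

new()
Creates a multi-threaded Runtime with all resource drivers enabled. It does not enter the runtime context by itself: tokio::spawn() only works inside block_on() or after calling enter(). #[tokio::main] usually does all of this for you.
block_on()
Runs an async task on the Runtime. It is the bridge between sync and async code, so it must be called from synchronous code; calling it from within an async context panics.
  • It runs the future on the current thread and blocks that thread until the future completes. To run tasks in parallel, call spawn() inside the future.
spawn() and spawn_blocking()
Both return a JoinHandle, which implements Future; .await it to get the result.
enter()
Sets the thread-local current Runtime, so that subsequent tokio::spawn() calls use it. Mainly used when the Runtime is created manually.
handle()
Returns a Handle that can spawn tasks on this Runtime. Handle is reference-counted and implements Clone, so the Runtime can be shared across threads.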
impl Runtime

// Creates a multi-threaded Runtime with all default resource drivers enabled; usually invoked via #[tokio::main].
pub fn new() -> Result<Runtime>

// Returns the Runtime's Handle. The Handle is reference-counted, can spawn() tasks, and lets the Runtime be shared across threads.
pub fn handle(&self) -> &Handle

// Starts running the async task on the worker thread pool immediately, whether or not the returned JoinHandle is awaited.
pub fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
where
    F: Future + Send + 'static, // the future may run on any worker thread, so it must be Send + 'static
    F::Output: Send + 'static

// Submits a blocking synchronous task.
pub fn spawn_blocking<F, R>(&self, func: F) -> JoinHandle<R>
where
    F: FnOnce() -> R + Send + 'static,
    R: Send + 'static

// Runs the future to completion. On a multi-threaded Runtime, tasks started with spawn() may still be
// running when block_on returns; to make sure they finish, .await their JoinHandles.
pub fn block_on<F: Future>(&self, future: F) -> F::Output

// Enters the runtime context (sets the thread-local current Runtime), so that tokio::spawn() and similar
// functions work while the returned EnterGuard is alive.
pub fn enter(&self) -> EnterGuard<'_>

// Shuts the Runtime down: no more tasks can be spawned, and I/O and timers stop working.
pub fn shutdown_timeout(self, duration: Duration)
pub fn shutdown_background(self)

impl Runtime
// Available on tokio_unstable only.
pub fn metrics(&self) -> RuntimeMetrics

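Runtime::enter() or Handle::enter() enters the runtime context. It sets a thread-local variable that identifies the current Runtime, so tokio::spawn() and similar functions submit tasks to that Runtime. This is mostly needed when the Runtime is created manually:

  • Runtime::new() does not enter the context by itself, and block_on() enters it only for the duration of the call. Code that spawns outside block_on() therefore needs enter().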
use tokio::runtime::Runtime;
use tokio::task::JoinHandle;

fn function_that_spawns(msg: String) -> JoinHandle<()> {
    // tokio::spawn uses the runtime context entered on this thread (via the EnterGuard)
    tokio::spawn(async move {
        println!("{}", msg);
    })
}

fn main() {
    // Create the Runtime manually
    let rt = Runtime::new().unwrap();

    let s = "Hello World!".to_string();

    // By entering the context, we tie `tokio::spawn` to this executor.
    let _guard = rt.enter();
    let handle = function_that_spawns(s);

    rt.block_on(handle).unwrap();
}

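runtime.shutdown_timeout() and shutdown_background() shut the Runtime down without waiting for every spawned task to finish:

  • Dropping a Runtime waits for all tasks to finish, which may block indefinitely. These two methods never wait indefinitely. Unfinished tasks may keep running in the background, so the threads running them may be leaked.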
use tokio::runtime::Runtime;
use tokio::task;

use std::thread;
use std::time::Duration;

fn main() {
    let runtime = Runtime::new().unwrap();

    runtime.block_on(async move {
        task::spawn_blocking(move || {
            thread::sleep(Duration::from_secs(10_000));
        });
    });

    runtime.shutdown_timeout(Duration::from_millis(100));
}

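A Runtime can be shared in several ways:

  1. Using an Arc<Runtime>.
  2. Using a Handle.
  3. Entering the runtime context.

Arc<Runtime> and Handle (returned by Runtime::handle()) are both reference-counted and implement Clone, so the Runtime can be shared across threads.

  • Handle::current() returns the Handle of the current Runtime, which can spawn() concurrent async tasks. It panics when called outside a runtime context.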
use tokio::runtime::Handle;

#[tokio::main]
async fn main() {
    let handle = Handle::current();

    // Spawn a std thread (it is not joined, so main may return before it prints)
    std::thread::spawn(move || {
        // Using Handle::block_on to run async code in the new thread.
        handle.block_on(async {
            println!("hello");
        });
    });
}

3.2 Builder

Builder creates a Runtime with custom parameters:

use tokio::runtime::Builder;

fn main() {
    let runtime = Builder::new_multi_thread()
        .worker_threads(4)
        .thread_name("my-custom-name")
        .thread_stack_size(3 * 1024 * 1024)
        .build()
        .unwrap();
    // use runtime ...
}

Builder methods:

  • new_current_thread(): the current-thread (single-threaded) scheduler.
  • new_multi_thread(): the multi-threaded scheduler.
impl Builder
pub fn new_current_thread() -> Builder
pub fn new_multi_thread() -> Builder
// Available on tokio_unstable only.
pub fn new_multi_thread_alt() -> Builder

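// Enables all resource drivers (I/O and timer)
pub fn enable_all(&mut self) -> &mut Self
// Defaults to the number of CPU cores; takes precedence over the TOKIO_WORKER_THREADS environment variable. The threads are created once, up front.
pub fn worker_threads(&mut self, val: usize) -> &mut Self

// Upper limit on the number of blocking-pool threads (512 by default); they are created on demand and reclaimed after being idle
pub fn max_blocking_threads(&mut self, val: usize) -> &mut Self
// How long an idle blocking-pool thread is kept alive (10 s by default)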
pub fn thread_keep_alive(&mut self, duration: Duration) -> &mut Self

pub fn thread_name(&mut self, val: impl Into<String>) -> &mut Self
pub fn thread_name_fn<F>(&mut self, f: F) -> &mut Self where F: Fn() -> String + Send + Sync + 'static
pub fn thread_stack_size(&mut self, val: usize) -> &mut Self

pub fn on_thread_start<F>(&mut self, f: F) -> &mut Self where F: Fn() + Send + Sync + 'static
pub fn on_thread_stop<F>(&mut self, f: F) -> &mut Self where F: Fn() + Send + Sync + 'static
pub fn on_thread_park<F>(&mut self, f: F) -> &mut Self where F: Fn() + Send + Sync + 'static
pub fn on_thread_unpark<F>(&mut self, f: F) -> &mut Self where F: Fn() + Send + Sync + 'static

// Builds the Runtime
pub fn build(&mut self) -> Result<Runtime>

// Number of scheduler ticks between polls of the global task queue
pub fn global_queue_interval(&mut self, val: u32) -> &mut Self
// Number of scheduler ticks between checks for I/O and timer events
pub fn event_interval(&mut self, val: u32) -> &mut Self

// Available on tokio_unstable only.
pub fn unhandled_panic(&mut self, behavior: UnhandledPanic) -> &mut Self
// Available on tokio_unstable only.
pub fn disable_lifo_slot(&mut self) -> &mut Self
// Available on tokio_unstable only.
pub fn rng_seed(&mut self, seed: RngSeed) -> &mut Self
// Available on tokio_unstable only.
pub fn enable_metrics_poll_count_histogram(&mut self) -> &mut Self
pub fn metrics_poll_count_histogram_scale( &mut self, histogram_scale: HistogramScale ) -> &mut Self
pub fn metrics_poll_count_histogram_resolution( &mut self, resolution: Duration) -> &mut Self
pub fn metrics_poll_count_histogram_buckets( &mut self, buckets: usize ) -> &mut Self

impl Builder
// Available on crate feature net, or Unix and crate feature process, or Unix and crate feature
// signal only.
pub fn enable_io(&mut self) -> &mut Self
// Available on crate feature net, or Unix and crate feature process, or Unix and crate feature
// signal only.
pub fn max_io_events_per_tick(&mut self, capacity: usize) -> &mut Self

// Available on crate feature time only.
pub fn enable_time(&mut self) -> &mut Self

impl Builder
// Available on crate feature test-util only.
pub fn start_paused(&mut self, start_paused: bool) -> &mut Self

4 task

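A task is a lightweight, non-blocking unit of scheduling and execution, with three key properties:

  1. Lightweight: tasks are scheduled by the user-space Tokio runtime rather than by the OS kernel.
  2. Cooperatively scheduled: OS threads are usually scheduled preemptively, but a task runs until it yields (for example, at an .await whose data is not ready yet). Tokio's own APIs also contain forced yield points, so even a task that never yields on its own periodically returns control. The Tokio runtime then schedules other tasks.
  3. Non-blocking: in an async context, I/O, networking, and file system operations must go through tokio's non-blocking APIs. These APIs yield instead of blocking the thread, so the runtime can run other tasks in the meantime.

An async runtime treats an async fn or async block as a unit of scheduling and execution, called an async task. Several of the examples that follow, up to the async Mutex example, use async_std rather than tokio. The concepts carry over, but the APIs differ slightly:

  • block_on(): runs an async task on the current thread; it is the boundary between sync and async code.
  • task::spawn_local(): runs an async task on the block_on() thread whenever that thread is idle.
  • task::spawn(): starts running an async task on a thread pool immediately.
  • task::spawn_blocking(): immediately runs the given synchronous function on a separate thread.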

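block_on() is where sync and async code meet. It is a synchronous function, so it must not be called inside an async fn. It polls the given future on the current thread until the future is Ready. If the future is Pending and no other async task (such as one submitted with task::spawn_local()) is ready to be polled, block_on() sleeps.

Because block_on() polls its future on the current thread, spawn_local() can submit additional futures that the same thread polls whenever it would otherwise be idle:

  • spawn_local() can be called many times. Each call adds the future to the task pool served by the block_on() thread. Whenever the task being polled is Pending, block_on() takes the next future from that pool and polls it.
  • spawn_local() returns a JoinHandle, which implements Future; .await it to get the final value.
  • A task that runs for a long time without yielding gives block_on() no chance to run any other task, which hurts performance.

spawn_local() requires the future to be 'static. It does not require Send or Sync, because the future runs on a single thread. The 'static bound is needed because the future's run time is unknown in advance. If its handle is never awaited, many_requests() may return before the future has run, leaving any references it borrowed dangling.

Solutions:

  1. pass the future of an async helper function that takes ownership of its arguments, so the future is 'static;
  2. capture the values with async move {}, which also makes the block's future 'static.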
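// Async helper that takes ownership of its arguments
async fn cheapo_owning_request(host: String, port: u16, path: String) -> std::io::Result<String> {
    cheapo_request(&host, port, &path).await
}

// Spawn one local task per request, then await the tasks in order
async fn many_requests(requests: Vec<(String, u16, String)>) -> Vec<std::io::Result<String>> {
    use async_std::task;

    let mut handles = vec![];
    for (host, port, path) in requests {
        handles.push(task::spawn_local(cheapo_owning_request(host, port, path)));
    }

    let mut results = vec![];
    for handle in handles {
        results.push(handle.await);
    }
    results
}

// Issue several requests concurrently on a single thread
let requests = vec![
    ("example.com".to_string(), 80, "/".to_string()),
    ("www.red-bean.com".to_string(), 80, "/".to_string()),
    ("en.wikipedia.org".to_string(), 80, "/".to_string()),
];

// block_on() is a synchronous function; it returns the value produced by the async task.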
let results = async_std::task::block_on(many_requests(requests));
for result in results {
    match result {
        Ok(response) => println!("{}", response),
        Err(err) => eprintln!("error: {}", err),
    }
}

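spawn() runs async tasks concurrently on a dedicated thread pool (tokio calls these threads worker threads, or core threads):

  • Like spawn_local(), spawn() returns a JoinHandle, which implements Future and is awaited to get the result. Unlike spawn_local(), it does not rely on block_on() to be polled: the task starts being polled as soon as spawn() is called.
  • The future passed to spawn() may run on any thread in the pool, and it may resume on a different thread after being suspended. Thread-local storage is therefore unreliable inside it (use the task_local! macro instead). The future must also be Future + Send + 'static so it can be safely moved to other threads, similar to what std::thread::spawn() requires of its closure.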
use async_std::task;
let mut handles = vec![];
for (host, port, path) in requests {
    handles.push(task::spawn(
        async move { // the block takes ownership of the captured values, so its future is 'static
            cheapo_request(&host, port, &path).await
        }
    ));
}

use async_std::task;
use std::rc::Rc;
// Error: the future returned by reluctant() is not Send.
async fn reluctant() -> String {
    let string = Rc::new("ref-counted string".to_string());
    some_asynchronous_thing().await; // the Rc is alive across the await
    format!("Your splendid string: {}", string)
}
task::spawn(reluctant());

// Fix: confine the non-Send value to a block that ends before the await.
async fn reluctant() -> String {
    let return_value = {
        let string = Rc::new("ref-counted string".to_string());
        format!("Your splendid string: {}", string)
    };
    some_asynchronous_thing().await; // the Rc no longer exists at this await
    return_value
}

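Another common error comes from Box<dyn std::error::Error>, which is not Send. In the code below, the Result returned by some_fallible_thing() lives for the whole match expression, so it is still alive at the .await in the Ok arm. Its error type is not Send, so the future is not Send either: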

type GenericError = Box<dyn std::error::Error>;
type GenericResult<T> = Result<T, GenericError>;

fn some_fallible_thing() -> GenericResult<i32> {
    //...
}

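// The future of this function is not Send...
async fn unfortunate() {
    // ... because of this function's return value
    match some_fallible_thing() {
        Err(error) => {
            report_error(error);
        }
        Ok(output) => {
            // the return value is alive across this .await
            use_output(output).await;
        }
    }
}

// Compile error: the future is not Send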
async_std::task::spawn(unfortunate());

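The fix is to redefine the error type with a Send bound (plus Sync, so the error can also be shared across threads). 'static is already the default lifetime bound of Box<dyn Trait>, so it may be omitted: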

type GenericError = Box<dyn std::error::Error + Send + Sync + 'static>;
type GenericResult<T> = Result<T, GenericError>;

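spawn_blocking() is how async runtimes handle tasks that may block or run for a long time (CPU-bound work). Its argument is a synchronous closure, which the runtime runs right away on a separate thread. tokio calls these blocking threads; they also form a pool and are reused until they expire. spawn_blocking() returns a Future whose result can be awaited.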

async fn verify_password(password: &str, hash: &str, key: &str) -> Result<bool, argonautica::Error>
{
    let password = password.to_string();
    let hash = hash.to_string();
    let key = key.to_string();

    async_std::task::spawn_blocking(
        // the argument is a synchronous closure, not an async task as before
        move || {
            argonautica::Verifier::default()
                .with_hash(hash)
                .with_password(password)
                .with_secret_key(key)
                .verify()
        }).await
}

// A long-running computation can yield voluntarily
while computation_not_done() {
    async_std::task::yield_now().await;
}

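An .await expression is a yield point where the task may be suspended. By default, the runtime may resume a suspended task on a different thread, so every value held across an .await in an async fn must be Send.

If a Mutex guard has to be held across an .await, use an async-aware Mutex instead of the standard library's Mutex. Async-aware means that when the lock is already held, lock().await yields the current task instead of blocking the thread. This prevents deadlocks on a single thread: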

// https://tokio.rs/tokio/tutorial/shared-state
use std::sync::{Mutex, MutexGuard};

async fn increment_and_do_stuff(mutex: &Mutex<i32>) {
    let mut lock: MutexGuard<i32> = mutex.lock().unwrap();
    *lock += 1;
    do_something_async().await;
} // lock goes out of scope here

This fails to compile, because std::sync::MutexGuard is not Send:

error: future cannot be sent between threads safely
   --> src/lib.rs:13:5
    |
13  |     tokio::spawn(async move {
    |     ^^^^^^^^^^^^ future created by async block is not `Send`
    |
   ::: /playground/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-0.2.21/src/task/spawn.rs:127:21
    |
127 |         T: Future + Send + 'static,
    |                     ---- required by this bound in `tokio::task::spawn::spawn`
    |
    = help: within `impl std::future::Future`, the trait `std::marker::Send` is not implemented for `std::sync::MutexGuard<'_, i32>`
note: future is not `Send` as this value is used across an await
   --> src/lib.rs:7:5
    |
4   |     let mut lock: MutexGuard<i32> = mutex.lock().unwrap();
    |         -------- has type `std::sync::MutexGuard<'_, i32>` which is not `Send`
...
7   |     do_something_async().await;
    |     ^^^^^^^^^^^^^^^^^^^^^^^^^^ await occurs here, with `mut lock` maybe used later
8   | }
    | - `mut lock` is later dropped here

Fix: drop the guard before the .await by putting it in its own scope:

async fn increment_and_do_stuff(mutex: &Mutex<i32>) {
    {
        let mut lock: MutexGuard<i32> = mutex.lock().unwrap();
        *lock += 1;
    } // lock goes out of scope here

    do_something_async().await;
}

Or use the Mutex type provided by the async library (async_std here; tokio::sync::Mutex plays the same role in tokio):

use async_std::sync::Mutex;

pub struct Outbound(Mutex<TcpStream>);

impl Outbound {
    pub fn new(to_client: TcpStream) -> Outbound {
        Outbound(Mutex::new(to_client))
    }
    pub async fn send(&self, packet: FromServer) -> ChatResult<()> {
        let mut guard = self.0.lock().await;
        utils::send_as_json(&mut *guard, &packet).await?;
        guard.flush().await?;
        Ok(())
    }
}

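An async task can be built from an async fn or an async (move) block. Both are syntactic sugar for a value that implements the Future trait.

  • task::spawn() starts running the task in the background immediately, whether or not the returned JoinHandle is awaited.
  • JoinHandle implements Future; awaiting it yields Result<T, JoinError>.
use tokio::task;

// Must be called within a runtime context; the spawned task starts running in the background right away, awaited or not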
let join = task::spawn(async {
    // ...
    "hello world!"
});

// ...

// Await the result of the spawned task.
let result = join.await?;
assert_eq!(result, "hello world!");

// If the task panics, .await returns a JoinError
use tokio::task;
let join = task::spawn(async {
    panic!("something bad happened!")
});
// The returned result indicates that the task failed.
assert!(join.await.is_err());

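spawn() requires the task to be Future + Send + 'static, which means:

  1. It cannot borrow variables on the stack, although it can borrow global statics, which live for 'static.
  2. Use move to transfer ownership of the captured values into the task, which satisfies 'static.
  3. The task may be scheduled across threads, so it must be Send. Every variable held across an .await becomes part of the state stored in the future, so each one must be Send.
// Error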
use tokio::task;
#[tokio::main]
async fn main() {
    let v = vec![1, 2, 3];
    task::spawn(async {
        println!("Here's a vec: {:?}", v); // error: borrows v without move
    });
}
// error[E0373]: async block may outlive the current function, but
//               it borrows `v`, which is owned by the current function
//  --> src/main.rs:7:23
//   |
// 7 |       task::spawn(async {
//   |  _______________________^
// 8 | |         println!("Here's a vec: {:?}", v);
//   | |                                        - `v` is borrowed here
// 9 | |     });
//   | |_____^ may outlive borrowed value `v`
//   |
// note: function requires argument type to outlive `'static`
//  --> src/main.rs:7:17
//   |
// 7 |       task::spawn(async {
//   |  _________________^
// 8 | |         println!("Here's a vector: {:?}", v);
// 9 | |     });
//   | |_____^
// help: to force the async block to take ownership of `v` (and any other
//       referenced variables), use the `move` keyword
//   |
// 7 |     task::spawn(async move {
// 8 |         println!("Here's a vec: {:?}", v);
// 9 |     });
//   |

// OK
use tokio::task::yield_now;
use std::rc::Rc;

#[tokio::main]
async fn main() {
    tokio::spawn(async {
        // The scope forces `rc` to drop before `.await`.
        {
            let rc = Rc::new("hello");
            println!("{}", rc);
        }

        // `rc` is no longer used. It is **not** persisted when the task yields to the scheduler
        yield_now().await; // yield_now() returns a future; it only yields once awaited
    });
}

// Error
use tokio::task::yield_now;
use std::rc::Rc;

#[tokio::main]
async fn main() {
    tokio::spawn(async {
        let rc = Rc::new("hello");
        // `rc` is used after `.await`. It must be persisted to the task's state.
        yield_now().await;
        println!("{}", rc);
    });
}

This fails with the error below. The fix is to replace Rc with Arc, which is Send:

error: future cannot be sent between threads safely
   --> src/main.rs:6:5
    |
6   |     tokio::spawn(async {
    |     ^^^^^^^^^^^^ future created by async block is not `Send`
    |
   ::: [..]spawn.rs:127:21
    |
127 |         T: Future + Send + 'static,
    |                     ---- required by this bound in
    |                          `tokio::task::spawn::spawn`
    |
    = help: within `impl std::future::Future`, the trait
    |       `std::marker::Send` is not  implemented for
    |       `std::rc::Rc<&str>`
note: future is not `Send` as this value is used across an await
   --> src/main.rs:10:9
    |
7   |         let rc = Rc::new("hello");
    |             -- has type `std::rc::Rc<&str>` which is not `Send`
...
10  |         yield_now().await;
    |         ^^^^^^^^^^^^^^^^^ await occurs here, with `rc` maybe
    |                           used later
11  |         println!("{}", rc);
12  |     });
    |     - `rc` is later dropped here

Special kinds of tasks that do not require Future + Send + 'static:

  1. tokio::task::block_in_place(f): f is a synchronous closure, FnOnce() -> R.
  2. tokio::task::LocalSet::spawn_local(f): f is Future + 'static and need not be Send.
pub fn block_in_place<F, R>(f: F) -> R  // F is a synchronous closure
where
    F: FnOnce() -> R


pub fn spawn_local<F>(&self, future: F) -> JoinHandle<F::Output>  // F is a future that need not be Send
where
    F: Future + 'static,
    F::Output: 'static,

let local = task::LocalSet::new();
local.spawn_local(async {
    // ...
});

task::spawn() and task::spawn_blocking() return a JoinHandle:

  • JoinHandle implements Future; .await it to get the result.
  • When a JoinHandle is dropped, its task is detached and keeps running, but it can no longer be joined.
  • Its abort_handle() method returns an AbortHandle for the task.

impl<T> JoinHandle<T>
// Abort the task associated with the handle.
pub fn abort(&self)
// Checks if the task associated with this JoinHandle has finished.
pub fn is_finished(&self) -> bool
// Returns a new AbortHandle that can be used to remotely abort this task.
pub fn abort_handle(&self) -> AbortHandle

use tokio::{time, task};

let mut handles = Vec::new();
handles.push(tokio::spawn(async {
   time::sleep(time::Duration::from_secs(10)).await;
   true
}));
handles.push(tokio::spawn(async {
   time::sleep(time::Duration::from_secs(10)).await;
   false
}));

let abort_handles: Vec<task::AbortHandle> = handles.iter().map(|h| h.abort_handle()).collect();

for handle in abort_handles {
    handle.abort();
}
for handle in handles {
    assert!(handle.await.unwrap_err().is_cancelled());
}

JoinHandle implements Future with Output = Result<T, JoinError>. JoinError tells you why the task failed, for example whether it was cancelled or panicked:

impl JoinError
pub fn is_cancelled(&self) -> bool
pub fn is_panic(&self) -> bool
pub fn into_panic(self) -> Box<dyn Any + Send + 'static>
pub fn try_into_panic(self) -> Result<Box<dyn Any + Send + 'static>, JoinError>
pub fn id(&self) -> Id

use std::panic;
#[tokio::main]
async fn main() {
    let err = tokio::spawn(async {
        panic!("boom");
    }).await.unwrap_err();
    assert!(err.is_panic());
}

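A spawned task can be cancelled with JoinHandle::abort(). The task is stopped the next time it yields (for example, at an .await). Awaiting the JoinHandle then returns a JoinError whose is_cancelled() is true:

  • Aborting does not guarantee that the task ends with a JoinError. A task that completes before its next yield point finishes normally, and .await returns its result.

abort() may return before the task has actually stopped; .await the JoinHandle to wait until it has.

Tasks created with spawn_blocking() are not async, so calling abort() on their JoinHandle has no effect once they have started. The closure keeps running until it returns; abort can only prevent a blocking task that has not started yet.

Calling non-tokio APIs, such as the standard library's I/O APIs, may block the tokio Runtime. For work that may block, tokio provides task::spawn_blocking() and task::block_in_place(), both callable from an async context.

task::spawn_blocking() runs a synchronous task (given as a closure) on the separate blocking thread pool, so it does not block the threads that run async tasks.

// In an async context: run code that might block tokio on a separate thread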
let join = task::spawn_blocking(|| {
    // do some compute-heavy work or call synchronous code
    "blocking completed"
});
let result = join.await?;
assert_eq!(result, "blocking completed");

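On the multi-threaded runtime, task::block_in_place() is also available. It likewise runs potentially blocking code from an async context. Instead of moving the work to another thread, it turns the current worker thread into a blocking thread and hands that thread's other tasks to another worker, which avoids a context switch. It panics on the current-thread runtime: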

use tokio::task;

let result = task::block_in_place(|| {
    // do some compute-heavy work or call synchronous code
    "blocking completed"
});
assert_eq!(result, "blocking completed");

async fn task::yield_now() is the task-level counterpart of std::thread::yield_now(). Awaiting it yields the current task back to the tokio scheduler so other tasks can run.

use tokio::task;

async {
    task::spawn(async {
        // ...
        println!("spawned task done!")
    });

    // Yield, allowing the newly-spawned task to execute first.
    task::yield_now().await; // must be awaited
    println!("main task done!");
}

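Cooperative scheduling: the tokio Runtime does not preempt tasks the way an OS preempts threads; scheduling is cooperative. To keep a single task from hogging the CPU and starving the others, tokio's library code contains forced yield points. Once a task has used up its budget, tokio resources start returning Pending. The task then periodically returns to the executor, and other tasks can be scheduled.

task::unconstrained opts a future out of this cooperative budgeting. A future wrapped in it is never forced to yield to Tokio: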

use tokio::{task, sync::mpsc};

let fut = async {
    let (tx, mut rx) = mpsc::unbounded_channel();
    for i in 0..1000 {
        let _ = tx.send(());
        // This will always be ready. If coop was in effect, this code would be forced to yield
        // periodically. However, if left unconstrained, then this code will never yield.
        rx.recv().await;
    }
};

task::unconstrained(fut).await;

5 JoinSet/AbortHandle

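JoinSet: spawns a collection of tasks on the tokio runtime and waits for some or all of them to complete, returning results in the order in which the tasks finish.

  • All tasks must have the same output type T.
  • When the JoinSet is dropped, all tasks in it are aborted immediately.

By contrast, a join! macro (tokio::join! or the unstable std::future::join!) waits for all futures to complete and cannot wait for them one at a time.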

use tokio::task::JoinSet;

#[tokio::main]
async fn main() {
    let mut set = JoinSet::new();

    for i in 0..10 {
        set.spawn(async move { i });
    }

    let mut seen = [false; 10];
    // join_next() returns the result of the next task to finish
    while let Some(res) = set.join_next().await {
        let idx = res.unwrap();
        seen[idx] = true;
    }

    for i in 0..10 {
        assert!(seen[i]);
    }
}

JoinSet methods:

impl<T> JoinSet<T>
pub fn new() -> Self
// Number of tasks in the JoinSet
pub fn len(&self) -> usize
pub fn is_empty(&self) -> bool

impl<T: 'static> JoinSet<T>
// Configure a task with a Builder (only its name can be set), then spawn it; requires tokio_unstable
pub fn build_task(&mut self) -> Builder<'_, T>

use tokio::task::JoinSet;
#[tokio::main]
async fn main() -> std::io::Result<()> {
    let mut set = JoinSet::new();
    // Use the builder to configure a task's name before spawning it.
    set.build_task()
        .name("my_task")
        .spawn(async { /* ... */ })?;
    Ok(())
}

// Spawn a task onto the JoinSet; it starts running immediately, even if the JoinSet is never awaited
pub fn spawn<F>(&mut self, task: F) -> AbortHandle where F: Future<Output = T> + Send + 'static, T: Send

// Spawn the task on the runtime associated with the given Handle
pub fn spawn_on<F>(&mut self, task: F, handle: &Handle) -> AbortHandle
where
    F: Future<Output = T> + Send + 'static,
    T: Send
pub fn spawn_local<F>(&mut self, task: F) -> AbortHandle where F: Future<Output = T> + 'static
pub fn spawn_local_on<F>( &mut self, task: F, local_set: &LocalSet ) -> AbortHandle where F: Future<Output = T> + 'static

pub fn spawn_blocking<F>(&mut self, f: F) -> AbortHandle where F: FnOnce() -> T + Send + 'static, T: Send
use tokio::task::JoinSet;
#[tokio::main]
async fn main() {
    let mut set = JoinSet::new();
    for i in 0..10 {
        set.spawn_blocking(move || { i });
    }
    let mut seen = [false; 10];
    while let Some(res) = set.join_next().await {
        let idx = res.unwrap();
        seen[idx] = true;
    }
    for i in 0..10 {
        assert!(seen[i]);
    }
}

pub fn spawn_blocking_on<F>(&mut self, f: F, handle: &Handle) -> AbortHandle
where
    F: FnOnce() -> T + Send + 'static,
    T: Send

// Wait until one of the tasks completes and return its output; note that this is an async fn.
// Returns None when the JoinSet is empty, so it can drive a while let loop.
pub async fn join_next(&mut self) -> Option<Result<T, JoinError>>
pub async fn join_next_with_id(&mut self) -> Option<Result<(Id, T), JoinError>>
pub fn try_join_next(&mut self) -> Option<Result<T, JoinError>>
pub fn try_join_next_with_id(&mut self) -> Option<Result<(Id, T), JoinError>>

// Aborts all tasks and waits for them to finish shutting down. Calling this method is equivalent
// to calling abort_all and then calling join_next in a loop until it returns None.
pub async fn shutdown(&mut self)

// Aborts all tasks on this JoinSet. This does not remove the tasks from the JoinSet. To wait for
// the tasks to complete cancellation, you should call join_next in a loop until the JoinSet is
// empty.
pub fn abort_all(&mut self)

// Removes all tasks from this JoinSet without aborting them. The tasks removed by this call will
// continue to run in the background even if the JoinSet is dropped.
pub fn detach_all(&mut self)

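Every spawn* method of JoinSet returns an AbortHandle. It can abort the corresponding spawned task without waiting for the task to finish.

  • A JoinHandle can be awaited for the task's result. An AbortHandle can only terminate the task and does not wait for it to end;
  • Dropping an AbortHandle only gives up the right to abort the task; it does not abort it.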
impl AbortHandle
// Abort the task. Its result (via join_next() or a JoinHandle) is then a cancelled JoinError, or its normal output if it had already finished
pub fn abort(&self)
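// Whether the task has finished (completed, panicked or cancelled); after abort() it may return false until cancellation has completed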
pub fn is_finished(&self) -> bool

pub fn id(&self) -> Id

6 LocalSet/spawn_local()

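Run a group of async tasks on a single thread. This removes the requirement that futures passed to tokio::spawn() be Send:

  1. Create a LocalSet with task::LocalSet::new();
  2. Run an async future with LocalSet::run_until(). The future runs inside the LocalSet context, so it can submit tasks with tokio::task::spawn_local().
    • Submit these tasks only with spawn_local(); task::spawn() still requires Send futures;
  3. run_until(fut).await returns when fut completes. Awaiting the LocalSet itself (local.await) returns once all of its tasks have finished.

The !Send tasks submitted with task::spawn_local() run on the thread that drives the LocalSet, for example the thread calling Runtime::block_on(). LocalSet is itself !Send. It must therefore be driven from #[tokio::main]/#[tokio::test] or Runtime::block_on(), because a future that awaits it cannot be passed to tokio::spawn().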

use std::rc::Rc;
use tokio::task;

#[tokio::main]
async fn main() {
    let nonsend_data = Rc::new("my nonsend data...");
    let local = task::LocalSet::new();

    local.run_until(async move { // this future runs inside the LocalSet context (as if local.enter() were active)
        let nonsend_data = nonsend_data.clone();

        // Inside run_until(), task::spawn_local() may be called any number of times; all of these
        // tasks run on the single thread that drives block_on().
        task::spawn_local(async move {
            println!("{}", nonsend_data);
            // ...
        }).await.unwrap();
    }).await; // returns when the future passed to run_until() completes
}

// Another example
use tokio::{task, time};
use std::rc::Rc;

#[tokio::main]
async fn main() {
    let nonsend_data = Rc::new("world");
    let local = task::LocalSet::new();

    let nonsend_data2 = nonsend_data.clone();

    // Submit tasks with LocalSet::spawn_local()
    local.spawn_local(async move {
        // ...
        println!("hello {}", nonsend_data2)
    });

    local.spawn_local(async move {
        time::sleep(time::Duration::from_millis(100)).await;
        println!("goodbye {}", nonsend_data)
    });

    // ...

    local.await;
}

LocalSet methods:

impl LocalSet
pub fn new() -> LocalSet

// Enter the LocalSet context so that subsequent tokio::task::spawn_local() calls submit !Send tasks to it;
pub fn enter(&self) -> LocalEnterGuard

// Submit a !Send task; returns a JoinHandle that can be .awaited
pub fn spawn_local<F>(&self, future: F) -> JoinHandle<F::Output> where F: Future + 'static, F::Output: 'static

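// Run the future on the given Runtime, blocking the current thread until it completes.
// It calls Runtime::block_on() internally, so it must be called from a synchronous context.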
pub fn block_on<F>(&self, rt: &Runtime, future: F) -> F::Output where F: Future

// Run a future inside the LocalSet context, where tokio::task::spawn_local() can submit !Send tasks; returns when that future completes;
pub async fn run_until<F>(&self, future: F) -> F::Output where F: Future

pub fn unhandled_panic(&mut self, behavior: UnhandledPanic) -> &mut Self
pub fn id(&self) -> Id

7 mutex

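The standard library's MutexGuard does not implement Send. A future that holds a MutexGuard across an .await is therefore not Send and cannot be passed to tokio::spawn():

  • The runtime may move a task to another thread at any .await, so tokio::spawn() requires the async task (future) to be Send.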
use std::sync::{Mutex, MutexGuard};

async fn increment_and_do_stuff(mutex: &Mutex<i32>) {
    let mut lock: MutexGuard<i32> = mutex.lock().unwrap();
    *lock += 1;

    do_something_async().await;
} // lock goes out of scope here

Spawning such a future with tokio::spawn() fails to compile:

error: future cannot be sent between threads safely
   --> src/lib.rs:13:5
    |
13  |     tokio::spawn(async move {
    |     ^^^^^^^^^^^^ future created by async block is not `Send`
    |
   ::: /playground/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-0.2.21/src/task/spawn.rs:127:21
    |
127 |         T: Future + Send + 'static,
    |                     ---- required by this bound in `tokio::task::spawn::spawn`
    |
    = help: within `impl std::future::Future`, the trait `std::marker::Send` is not implemented for `std::sync::MutexGuard<'_, i32>`
note: future is not `Send` as this value is used across an await
   --> src/lib.rs:7:5
    |
4   |     let mut lock: MutexGuard<i32> = mutex.lock().unwrap();
    |         -------- has type `std::sync::MutexGuard<'_, i32>` which is not `Send`
...
7   |     do_something_async().await;
    |     ^^^^^^^^^^^^^^^^^^^^^^^^^^ await occurs here, with `mut lock` maybe used later
8   | }
    | - `mut lock` is later dropped here

Fix: make sure the MutexGuard's destructor runs before the .await:

// This works!
async fn increment_and_do_stuff(mutex: &Mutex<i32>) {
    {
        let mut lock: MutexGuard<i32> = mutex.lock().unwrap();
        *lock += 1;
    } // lock goes out of scope here

    do_something_async().await;
}

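// This fails too: per the Tokio tutorial, the compiler decides whether a future is Send from scope information only, so drop() does not help.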
async fn increment_and_do_stuff(mutex: &Mutex<i32>) {
    let mut lock: MutexGuard<i32> = mutex.lock().unwrap();
    *lock += 1;
    drop(lock);

    do_something_async().await;
}

Or wrap the Mutex in a struct and lock it only inside synchronous methods:

use std::sync::Mutex;

struct CanIncrement {
    mutex: Mutex<i32>,
}
impl CanIncrement {
    // This function is not marked async.
    fn increment(&self) {
        let mut lock = self.mutex.lock().unwrap();
        *lock += 1;
    }
}

async fn increment_and_do_stuff(can_incr: &CanIncrement) {
    can_incr.increment();
    do_something_async().await;
}

Or use Tokio's async mutex. tokio::sync::Mutex can be held across .await, but it is slower than the std mutex:

use tokio::sync::Mutex; // note! This uses the Tokio mutex

// This compiles!
// (but restructuring the code would be better in this case)
async fn increment_and_do_stuff(mutex: &Mutex<i32>) {
    let mut lock = mutex.lock().await;
    *lock += 1;

    do_something_async().await;
} // lock goes out of scope here

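In summary, there are three ways to deal with a Mutex and the Send problem it causes:

  1. Preferably, wrap the Mutex in a struct and lock it only inside synchronous methods;
  2. Or release the MutexGuard before the .await by ending its block scope (not with drop());
  3. Or spawn a dedicated task that owns the state; other tasks operate on it by sending messages.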

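Async tasks can use the std::sync::Arc/Mutex synchronization primitives for concurrent access to shared memory:

  • This uses std::sync::Mutex rather than tokio::sync::Mutex; the async Mutex is only needed when the guard is held across an .await;
  • A lock() that has to wait blocks the whole current thread, so other async tasks cannot be scheduled on that thread in the meantime;
  • By default the Tokio runtime uses the multi-thread scheduler, and the runtime decides which thread a task runs on. Tokio also offers the current_thread runtime flavor, which is a lightweight, single-threaded runtime.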
use bytes::Bytes;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use tokio::net::TcpListener;

type Db = Arc<Mutex<HashMap<String, Bytes>>>; // type alias for the shared database

#[tokio::main]
async fn main() {
    let listener = TcpListener::bind("127.0.0.1:6379").await.unwrap();
    println!("Listening");
    let db = Arc::new(Mutex::new(HashMap::new()));
    loop {
        let (socket, _) = listener.accept().await.unwrap();
        // Clone the handle to the hash map.
        let db = db.clone();
        println!("Accepted");
        tokio::spawn(async move {
            process(socket, db).await;
        });
    }
}

use tokio::net::TcpStream;
use mini_redis::{Connection, Frame};

async fn process(socket: TcpStream, db: Db) {
    use mini_redis::Command::{self, Get, Set};

    // Connection, provided by `mini-redis`, handles parsing frames from the socket
    let mut connection = Connection::new(socket);

    while let Some(frame) = connection.read_frame().await.unwrap() {
        let response = match Command::from_frame(frame).unwrap() {
            Set(cmd) => {
                let mut db = db.lock().unwrap();  // db is a MutexGuard whose lifetime does not span an .await, so the std Mutex is fine
                db.insert(cmd.key().to_string(), cmd.value().clone());
                Frame::Simple("OK".to_string())
            }
            Get(cmd) => {
                let db = db.lock().unwrap();
                if let Some(value) = db.get(cmd.key()) {
                    Frame::Bulk(value.clone())
                } else {
                    Frame::Null
                }
            }
            cmd => panic!("unimplemented {:?}", cmd),
        };

        // Write the response to the client
        connection.write_frame(&response).await.unwrap();
    }
}

To minimize task/thread blocking caused by Mutex contention, you can:

  1. Switching to a dedicated task to manage state and use message passing.
  2. Shard the mutex.
  3. Restructure the code to avoid the mutex.

Here is an example of the second option: split the single Mutex<HashMap<_, _>> into N instances:

  • The third-party crate dashmap provides a more sophisticated sharded hash map;
type ShardedDb = Arc<Vec<Mutex<HashMap<String, Vec<u8>>>>>;

fn new_sharded_db(num_shards: usize) -> ShardedDb {
    let mut db = Vec::with_capacity(num_shards);
    for _ in 0..num_shards {
        db.push(Mutex::new(HashMap::new()));
    }
    Arc::new(db)
}

// Before looking up the map, hash the key to pick a shard (hash() is a helper, not shown here)
let mut shard = db[hash(key) % db.len()].lock().unwrap();
shard.insert(key, value);
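Here is a self-contained version of the sharded map above that uses only the standard library. The helper names (`shard_index`, `insert`, `get`) are hypothetical. std's DefaultHasher stands in for the unspecified hash() function:

```rust
use std::collections::hash_map::DefaultHasher;
use std::collections::HashMap;
use std::hash::{Hash, Hasher};
use std::sync::{Arc, Mutex};

type ShardedDb = Arc<Vec<Mutex<HashMap<String, Vec<u8>>>>>;

fn new_sharded_db(num_shards: usize) -> ShardedDb {
    Arc::new((0..num_shards).map(|_| Mutex::new(HashMap::new())).collect())
}

/// Hypothetical helper: picks the shard for `key`; only that shard's Mutex gets locked.
fn shard_index(key: &str, num_shards: usize) -> usize {
    let mut hasher = DefaultHasher::new();
    key.hash(&mut hasher);
    (hasher.finish() % num_shards as u64) as usize
}

fn insert(db: &ShardedDb, key: &str, value: Vec<u8>) {
    let idx = shard_index(key, db.len());
    // The guard is a temporary and is dropped at the end of this statement.
    db[idx].lock().unwrap().insert(key.to_string(), value);
}

fn get(db: &ShardedDb, key: &str) -> Option<Vec<u8>> {
    let idx = shard_index(key, db.len());
    db[idx].lock().unwrap().get(key).cloned()
}
```

Each operation locks only one shard, and each guard is released at the end of its statement. Calling these sync functions from async tasks therefore never holds a guard across an .await.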

8 tokio::sync

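Tokio provides four kinds of channels:

  1. oneshot: sends and receives a single value;
  2. mpsc: multiple senders, one receiver;
  3. broadcast: multiple senders, multiple receivers;
  4. watch: receivers are only guaranteed to see the latest value, not every value;

std::sync::mpsc and crossbeam::channel are synchronous channels and must not be used in async functions. They can block the current thread, and with it the task.

tokio::sync::mpsc is a multi-producer, single-consumer channel that can be used in async functions. The async_channel crate provides a multi-producer, multi-consumer channel in which each message is consumed by only one consumer.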

oneshot:The oneshot channel supports sending a single value from a single producer to a single consumer. This channel is usually used to send the result of a computation to a waiter. Example: using a oneshot channel to receive the result of a computation.

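  1. oneshot::channel() creates a Sender/Receiver pair;
  2. Sender::send() is a synchronous method, so it can be called from both sync and async contexts;
  3. Awaiting the Receiver yields the value sent by the Sender. If the Sender is dropped without sending, awaiting the Receiver returns error::RecvError;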
use tokio::sync::oneshot;

async fn some_computation() -> String {
    "represents the result of the computation".to_string()
}

#[tokio::main]
async fn main() {
    let (tx, rx) = oneshot::channel(); // create a sender/receiver pair

    tokio::spawn(async move {
        let res = some_computation().await;
        tx.send(res).unwrap(); // send() is not an async method
    });

    // Do other work while the computation is happening in the background

    // Wait for the computation result
    let res = rx.await.unwrap();
}

If the sender is dropped without sending, the receiver will fail with error::RecvError:

use tokio::sync::oneshot;

#[tokio::main]
async fn main() {
    let (tx, rx) = oneshot::channel::<u32>();

    tokio::spawn(async move {
        drop(tx);
    });

    match rx.await {
        Ok(_) => panic!("This doesn't happen"),
        Err(_) => println!("the sender dropped"),
    }
}

To use a oneshot channel in a tokio::select! loop, add &mut in front of the channel.

use tokio::sync::oneshot;
use tokio::time::{interval, sleep, Duration};

#[tokio::main]
async fn main() {
    let (send, mut recv) = oneshot::channel();
    let mut interval = interval(Duration::from_millis(100));

    tokio::spawn(async move {
        sleep(Duration::from_secs(1)).await;
        send.send("shut down").unwrap();
    });

    loop {
        tokio::select! {
            _ = interval.tick() => println!("Another 100ms"),
            // select! awaits `&mut recv`; borrowing keeps recv usable across loop iterations
            msg = &mut recv => {
                println!("Got message: {}", msg.unwrap());
                break;
            }
        }
    }
}

oneshot::Receiver::close() closes the Receiver; after that, Sender::send() fails.

use tokio::sync::oneshot;
use tokio::sync::oneshot::error::TryRecvError;

#[tokio::main]
async fn main() {
    let (tx, mut rx) = oneshot::channel();

    assert!(!tx.is_closed());

    rx.close();

    assert!(tx.is_closed());
    assert!(tx.send("never received").is_err());

    match rx.try_recv() {
        Err(TryRecvError::Closed) => {}
        _ => unreachable!(),
    }
}

oneshot::Sender::closed() waits until the oneshot::Receiver has been closed or dropped. Sender::is_closed() reports whether the Receiver has been closed or dropped:

  • Once the Receiver is closed or dropped, Sender::send() fails;
use tokio::sync::oneshot;

#[tokio::main]
async fn main() {
    let (mut tx, rx) = oneshot::channel::<()>();

    tokio::spawn(async move {
        drop(rx);
    });

    tx.closed().await; // returns once rx has been dropped
    println!("the receiver dropped");
}

// Using select!
use tokio::sync::oneshot;
use tokio::time::{self, Duration};
async fn compute() -> String {
    // Placeholder for a complex computation returning a `String`
    "computed value".to_string()
}
#[tokio::main]
async fn main() {
    let (mut tx, rx) = oneshot::channel();

    tokio::spawn(async move {
        tokio::select! {
            _ = tx.closed() => {
                // The receiver dropped, no need to do any further work
            }
            value = compute() => {
                // The send can fail if the channel was closed at the exact same time as when
                // compute() finished, so just ignore the failure.
                let _ = tx.send(value);
            }
        }
    });

    // Wait for up to 10 seconds
    let _ = time::timeout(Duration::from_secs(10), rx).await;
}

tokio::sync::mpsc is the async counterpart of the standard library's mpsc:

use tokio::io::{self, AsyncWriteExt};
use tokio::net::TcpStream;
use tokio::sync::mpsc;

#[tokio::main]
async fn main() -> io::Result<()> {
    let mut socket = TcpStream::connect("www.example.com:1234").await?;
    let (tx, mut rx) = mpsc::channel(100);

    for _ in 0..10 {
        // Each task needs its own `tx` handle. This is done by cloning the original handle.
        let tx = tx.clone();

        tokio::spawn(async move {
            tx.send(&b"data to write"[..]).await.unwrap();
        });
    }

    // The `rx` half of the channel returns `None` once **all** `tx` clones drop. To ensure `None`
    // is returned, drop the handle owned by the current task. If this `tx` handle is not dropped,
    // there will always be a single outstanding `tx` handle.
    drop(tx);

    while let Some(res) = rx.recv().await {
        socket.write_all(res).await?;
    }

    Ok(())
}

Combining mpsc with oneshot gives request/response-style access to a shared resource:

use tokio::sync::{oneshot, mpsc};
use Command::Increment;

enum Command {
    Increment,
    // Other commands can be added here
}

#[tokio::main]
async fn main() {
    let (cmd_tx, mut cmd_rx) = mpsc::channel::<(Command, oneshot::Sender<u64>)>(100);

    // Spawn a task to manage the counter
    tokio::spawn(async move {
        let mut counter: u64 = 0;

        while let Some((cmd, response)) = cmd_rx.recv().await {
            match cmd {
                Increment => {
                    let prev = counter;
                    counter += 1;
                    response.send(prev).unwrap();
                }
            }
        }
    });

    let mut join_handles = vec![];

    // Spawn tasks that will send the increment command.
    for _ in 0..10 {
        let cmd_tx = cmd_tx.clone();

        join_handles.push(tokio::spawn(async move {
            let (resp_tx, resp_rx) = oneshot::channel();

            cmd_tx.send((Increment, resp_tx)).await.ok().unwrap();
            let res = resp_rx.await.unwrap();

            println!("previous value = {}", res);
        }));
    }

    // Wait for all tasks to complete
    for join_handle in join_handles.drain(..) {
        join_handle.await.unwrap();
    }
}

Message passing with tokio mpsc:

  1. One spawned task acts as the manager. It receives messages over a buffered mpsc channel and operates on the stateful object according to the message type. Only the manager touches the object, one message at a time, so no lock is needed.
  2. The manager replies to each sender through the response channel carried in the message;
  3. tokio::sync::oneshot is a single-producer, single-consumer channel optimized for sending a single value.
use bytes::Bytes;
use mini_redis::client;
use tokio::sync::{mpsc, oneshot};

/// Multiple different commands are multiplexed over a single channel.
#[derive(Debug)]
enum Command {
    Get {
        key: String,
        resp: Responder<Option<Bytes>>, // oneshot channel used to send the response
    },
    Set {
        key: String,
        val: Bytes,
        resp: Responder<()>,
    },
}

/// Provided by the requester and used by the manager task to send the command
/// response back to the requester.
type Responder<T> = oneshot::Sender<mini_redis::Result<T>>;

#[tokio::main]
async fn main() {
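    // Create a bounded (buffered) channel. When the consumer is slow, senders are back-pressured
    // (send(...).await waits), which limits memory use and concurrency and avoids exhausting system resources.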
    let (tx, mut rx) = mpsc::channel(32);
    // Clone a `tx` handle for the second task
    let tx2 = tx.clone();

    let manager = tokio::spawn(async move {
        // Open a connection to the mini-redis address.
        let mut client = client::connect("127.0.0.1:6379").await.unwrap();

        while let Some(cmd) = rx.recv().await {
            match cmd {
                Command::Get { key, resp } => {
                    let res = client.get(&key).await;
                    // Ignore errors
                    let _ = resp.send(res); // 通过 oneshot channel 发送响应
                }
                Command::Set { key, val, resp } => {
                    let res = client.set(&key, val).await;
                    // Ignore errors
                    let _ = resp.send(res);
                }
            }
        }
    });

    // Spawn two tasks, one setting a value and other querying for key that was set.
    let t1 = tokio::spawn(async move {
        let (resp_tx, resp_rx) = oneshot::channel(); // build the response channel
        let cmd = Command::Get {
            key: "foo".to_string(),
            resp: resp_tx,
        };

        // Send the GET request
        if tx.send(cmd).await.is_err() {
            eprintln!("connection task shutdown");
            return;
        }

        // Await the response
        let res = resp_rx.await;
        println!("GOT (Get) = {:?}", res);
    });

    let t2 = tokio::spawn(async move {
        let (resp_tx, resp_rx) = oneshot::channel();
        let cmd = Command::Set {
            key: "foo".to_string(),
            val: "bar".into(),
            resp: resp_tx,
        };

        // Send the SET request
        if tx2.send(cmd).await.is_err() {
            eprintln!("connection task shutdown");
            return;
        }

        // Await the response
        let res = resp_rx.await;
        println!("GOT (Set) = {:?}", res);
    });

    t1.await.unwrap();
    t2.await.unwrap();
    manager.await.unwrap();
}

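broadcast channel: sends multiple values from multiple senders to multiple receivers. It enables fan-out patterns such as pub/sub or chat systems.

  • tokio::sync::broadcast::channel(N) creates a bounded, multi-producer, multi-consumer channel with capacity N. When the channel already holds N values, the oldest one is evicted. A Receiver that had not yet received the evicted value gets RecvError::Lagged from its next recv(). Its read position then moves to the oldest value still in the channel, which the following recv() returns. This lets a Receiver detect that it lagged and react accordingly.
  • Sender implements Clone, so multiple instances can be created and used in several tasks. Once every Sender has been dropped, the channel is closed and a Receiver's recv() returns RecvError::Closed.
  • Sender::subscribe() creates a new Receiver, which receives the values sent after the call to subscribe(). If every Receiver has been dropped, Sender::send() returns SendError;
  • Each value sent is cloned for every Receiver. It is removed from the channel only after all receivers have received it;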
use tokio::sync::broadcast;
#[tokio::main]
async fn main() {
    let (tx, mut rx1) = broadcast::channel(16);
    let mut rx2 = tx.subscribe(); // create another receiver

    tokio::spawn(async move {
        assert_eq!(rx1.recv().await.unwrap(), 10);
        assert_eq!(rx1.recv().await.unwrap(), 20);
    });

    tokio::spawn(async move {
        assert_eq!(rx2.recv().await.unwrap(), 10);
        assert_eq!(rx2.recv().await.unwrap(), 20);
    });

    tx.send(10).unwrap();
    tx.send(20).unwrap();
}

// Detecting lag
use tokio::sync::broadcast;
#[tokio::main]
async fn main() {
    let (tx, mut rx) = broadcast::channel(2);

    tx.send(10).unwrap();
    tx.send(20).unwrap();
    tx.send(30).unwrap();

    // The receiver lagged behind
    assert!(rx.recv().await.is_err());

    // At this point, we can abort or continue with lost messages
    assert_eq!(20, rx.recv().await.unwrap());
    assert_eq!(30, rx.recv().await.unwrap());
}

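watch channel: sends multiple values from multiple senders to multiple receivers, but the channel keeps only the latest value. A receiver that falls behind is therefore not guaranteed to see every intermediate value. It behaves like a broadcast channel with capacity 1. Use cases include broadcasting configuration changes, application state changes and graceful shutdown.

  • watch::channel(initial_value): a watch channel is created with an initial value;
  • Sender::subscribe() creates a new Receiver that only receives values sent afterwards;
  • Sender implements the Clone trait, so multiple instances can send data. Both Sender and Receiver are thread safe;
  • Sender::is_closed()/closed() let the Sender check, or wait for, whether all Receivers have been closed or dropped.
  • Sender::send() succeeds only while at least one Receiver exists. send_if_modified, send_modify and send_replace succeed even with no Receiver;

A Receiver uses Receiver::changed() to receive unseen value updates. If there is no unseen value, changed() sleeps until one arrives or the Sender is dropped. If there is one, it returns Ok(()) immediately.

  • Receiver::borrow_and_update() returns the latest value and marks it as seen.
  • To read the latest value without marking it as seen, use Receiver::borrow(). Because borrow() does not mark the value as seen, the next Receiver::changed() returns Ok(()) immediately. borrow_and_update() marks the value as seen, so the next Receiver::changed() sleeps until a new value arrives;
  • While a Receiver borrows the value it holds a read lock on the channel, so a long-lived borrow can block the Sender;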
use tokio::sync::watch;
use tokio::time::{self, Duration, Instant};
use std::io;

#[derive(Debug, Clone, Eq, PartialEq)]
struct Config {
    timeout: Duration,
}

impl Config {
    async fn load_from_file() -> io::Result<Config> {
        // Placeholder: file loading and deserialization logic goes here
        Ok(Config { timeout: Duration::from_secs(1) })
    }
}

async fn my_async_operation() {
    // Do something here
}

#[tokio::main]
async fn main() {

    let mut config = Config::load_from_file().await.unwrap();

    // Create the watch channel with an initial value
    let (tx, rx) = watch::channel(config.clone());

    // Spawn a task to monitor the file.
    tokio::spawn(async move {
        loop {
            // Wait 10 seconds between checks
            time::sleep(Duration::from_secs(10)).await;

            // Load the configuration file
            let new_config = Config::load_from_file().await.unwrap();

            // If the configuration changed, send the new config value on the watch channel.
            if new_config != config {
                tx.send(new_config.clone()).unwrap();
                config = new_config;
            }
        }
    });

    let mut handles = vec![];

    // Spawn tasks that runs the async operation for at most `timeout`. If the timeout elapses,
    // restart the operation.
    //
    // The task simultaneously watches the `Config` for changes. When the timeout duration
    // changes, the timeout is updated without restarting the in-flight operation.
    for _ in 0..5 {
        // Clone a config watch handle for use in this task
        let mut rx = rx.clone();

        let handle = tokio::spawn(async move {
            // Start the initial operation and pin the future to the stack.  Pinning to the stack
            // is required to resume the operation across multiple calls to `select!`
            let op = my_async_operation();
            tokio::pin!(op);

            // Get the initial config value
            let mut conf = rx.borrow().clone();

            let mut op_start = Instant::now();
            let sleep = time::sleep_until(op_start + conf.timeout);
            tokio::pin!(sleep);

            loop {
                tokio::select! {
                    _ = &mut sleep => {
                        // The operation elapsed. Restart it
                        op.set(my_async_operation());

                        // Track the new start time
                        op_start = Instant::now();

                        // Restart the timeout
                        sleep.set(time::sleep_until(op_start + conf.timeout));
                    }
                    _ = rx.changed() => {
                        conf = rx.borrow_and_update().clone(); // get the latest value and mark it as seen

                        // The configuration has been updated. Update the `sleep` using the new
                        // `timeout` value.
                        sleep.as_mut().reset(op_start + conf.timeout);
                    }
                    _ = &mut op => {
                        // The operation completed!
                        return
                    }
                }
            }
        });

        handles.push(handle);
    }

    for handle in handles.drain(..) {
        handle.await.unwrap();
    }
}

tokio::sync::Notify wakes up one or all waiting tasks; it carries no data itself:

  1. A Notify can be thought of as a Semaphore starting with 0 permits. The notified().await method waits for a permit to become available, and notify_one() sets a permit if there currently are no available permits.
  2. notified(&self) -> Notified<'_>: Wait for a notification. The returned Notified implements Future; awaiting it waits until a notification arrives;
  3. notify_one(): Notifies the first waiting task.
  4. notify_last(&self): Notifies the last waiting task.
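  5. notify_waiters(&self): Notifies all waiting tasks. Unlike notify_one(), it stores no permit, so only Notified futures created before the call are woken.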
use tokio::sync::Notify;
use std::sync::Arc;

#[tokio::main]
async fn main() {
    let notify = Arc::new(Notify::new());
    let notify2 = notify.clone();

    let handle = tokio::spawn(async move {
        notify2.notified().await; // wait until notified
        println!("received notification");
    });

    println!("sending notification");
    notify.notify_one();

    // Wait for task to receive notification.
    handle.await.unwrap();
}


use std::sync::Arc;
use tokio::sync::Notify;

#[tokio::main]
async fn main() {
    let notify = Arc::new(Notify::new());
    let notify2 = notify.clone();

    let notified1 = notify.notified();
    let notified2 = notify.notified();

    let handle = tokio::spawn(async move {
        println!("sending notifications");
        notify2.notify_waiters();
    });

    notified1.await;
    notified2.await;
    println!("received notifications");
}

tokio::sync::Notify can replace a hand-written waker when synchronizing a thread with an async function:

use tokio::sync::Notify;
use std::sync::Arc;
use std::time::{Duration, Instant};
use std::thread;

async fn delay(dur: Duration) {
    let when = Instant::now() + dur;
    let notify = Arc::new(Notify::new());
    let notify_clone = notify.clone();

    thread::spawn(move || {
        let now = Instant::now();

        if now < when {
            thread::sleep(when - now);
        }

        notify_clone.notify_one();
    });


    notify.notified().await;
}

9 tokio::io

tokio::io does not define or use the Read/Write traits. Instead, it defines and uses the AsyncRead/AsyncWrite/AsyncSeek traits, and on top of these three traits it defines:

  1. buffered async versions: AsyncBufRead, AsyncBufReadExt;
  2. other extension traits: AsyncReadExt/AsyncWriteExt/AsyncSeekExt;

The poll_XX() methods provided by the AsyncRead/AsyncWrite/AsyncBufRead traits are inconvenient to call directly. The various Ext traits provide the more commonly used read/write/lines-style methods.

A tokio::io::BufReader can be created from an AsyncRead; it implements AsyncRead/AsyncBufRead.

A tokio::io::BufWriter can be created from an AsyncWrite; it implements AsyncWrite.

A BufStream<RW> can be created from an object that implements both AsyncRead and AsyncWrite, such as a TcpStream. It implements AsyncBufRead and AsyncWrite.

The methods of the AsyncReadExt trait all return objects that implement Future:

pub trait AsyncReadExt: AsyncRead {
    // Provided methods
    fn chain<R>(self, next: R) -> Chain<Self, R> where Self: Sized, R: AsyncRead { ... }
    fn read<'a>(&'a mut self, buf: &'a mut [u8]) -> Read<'a, Self> where Self: Unpin { ... }
    fn read_buf<'a, B>(&'a mut self, buf: &'a mut B) -> ReadBuf<'a, Self, B> where Self: Unpin, B: BufMut + ?Sized { ... }
    fn read_exact<'a>(&'a mut self, buf: &'a mut [u8]) -> ReadExact<'a, Self> where Self: Unpin { ... }
    fn read_u8(&mut self) -> ReadU8<&mut Self> where Self: Unpin { ... }
    // ... more provided methods
}

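For example, the Read object returned by read() implements Future and resolves to io::Result<usize> when ready. These methods are not written as async fn, but the objects they return implement Future, so they can still be polled with .await: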

// https://docs.rs/tokio/latest/src/tokio/io/util/read.rs.html#43
impl<R> Future for Read<'_, R> where R: AsyncRead + Unpin + ?Sized,
{
    type Output = io::Result<usize>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<usize>> {
        let me = self.project();
        let mut buf = ReadBuf::new(me.buf);
        ready!(Pin::new(me.reader).poll_read(cx, &mut buf))?;
        Poll::Ready(Ok(buf.filled().len()))
    }
}

Example:

use tokio::fs::File;
// The AsyncReadExt and AsyncWriteExt traits must be imported explicitly
use tokio::io::{self, AsyncReadExt, AsyncWriteExt};

#[tokio::main]
async fn main() -> io::Result<()> {
    let mut f = File::open("foo.txt").await?;
    let mut buffer = [0; 10];
    // read up to 10 bytes
    let n = f.read(&mut buffer[..]).await?;
    println!("The bytes: {:?}", &buffer[..n]);

    let mut buffer = Vec::new();
    // read the whole file
    f.read_to_end(&mut buffer).await?;

    let mut file = File::create("foo.txt").await?;
    // Writes some prefix of the byte string, but not necessarily all of it.
    let n = file.write(b"some bytes").await?;
    println!("Wrote the first {} bytes of 'some bytes'.", n);
    Ok(())
}

tokio::io::split() splits a stream that implements AsyncRead + AsyncWrite, such as a TcpStream, into a ReadHalf<T> and a WriteHalf<T>. The former implements AsyncRead and the latter implements AsyncWrite:

pub fn split<T>(stream: T) -> (ReadHalf<T>, WriteHalf<T>)
where
    T: AsyncRead + AsyncWrite,

Echo client:

  • Use tokio::io::split() to split an object that supports both reading and writing into separate read and write halves;
// https://tokio.rs/tokio/tutorial/io

use tokio::io::{self, AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpStream;

#[tokio::main]
async fn main() -> io::Result<()> {
    let socket = TcpStream::connect("127.0.0.1:6142").await?;
    let (mut rd, mut wr) = io::split(socket);

    // Write data in the background
    tokio::spawn(async move {
        wr.write_all(b"hello\r\n").await?;
        wr.write_all(b"world\r\n").await?;
        // Sometimes, the rust type inferencer needs a little help
        Ok::<_, io::Error>(())
    });

    let mut buf = vec![0; 128];
    loop {
        let n = rd.read(&mut buf).await?;
        if n == 0 {
            break;
        }
        println!("GOT {:?}", &buf[..n]);
    }
    Ok(())
}

Echo server:

  • Avoid stack buffers. Variables that live across an .await are stored in the task's Future object, so a large stack-array buffer makes the generated Future large. Buffer sizes are usually page-aligned, which makes the task roughly one page plus a few bytes in size and wastes memory.
use tokio::io::{self, AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpListener;

#[tokio::main]
async fn main() -> io::Result<()> {
    let listener = TcpListener::bind("127.0.0.1:6142").await?;
    loop {
        let (mut socket, _) = listener.accept().await?;
        tokio::spawn(async move {
            let mut buf = vec![0; 1024]; // allocate the buffer on the heap (Vec) instead of a stack array

            loop {
                match socket.read(&mut buf).await {
                    Ok(0) => return,
                    Ok(n) => {
                        if socket.write_all(&buf[..n]).await.is_err() {
                            return;
                        }
                    }
                    Err(_) => {
                        return;
                    }
                }
            }
        });
    }
}

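tokio::io::duplex() creates a pair of connected DuplexStream objects that use an in-memory buffer; DuplexStream implements the AsyncRead/AsyncWrite traits. When one DuplexStream is dropped, reads on the other end can still drain the data left in the buffer and then see EOF (a read of 0 bytes), while writes on the other end immediately return Err(BrokenPipe);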

pub fn duplex(max_buf_size: usize) -> (DuplexStream, DuplexStream)

// Example
use tokio::io::{AsyncReadExt, AsyncWriteExt};

#[tokio::main]
async fn main() -> std::io::Result<()> {
    let (mut client, mut server) = tokio::io::duplex(64);

    client.write_all(b"ping").await?;

    let mut buf = [0u8; 4];
    server.read_exact(&mut buf).await?;
    assert_eq!(&buf, b"ping");

    server.write_all(b"pong").await?;

    client.read_exact(&mut buf).await?;
    assert_eq!(&buf, b"pong");
    Ok(())
}

10 select!{}

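The tokio::select! macro waits on several async computations (futures) concurrently and returns as soon as one of them completes, dropping the futures of the other branches.

  • select! must be used inside an async context, such as async functions, closures, and blocks;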
use tokio::sync::oneshot;

#[tokio::main]
async fn main() {
    let (tx1, rx1) = oneshot::channel();
    let (tx2, rx2) = oneshot::channel();

    tokio::spawn(async {
        let _ = tx1.send("one");
    });

    tokio::spawn(async {
        let _ = tx2.send("two");
    });

    // select! polls rx1 and rx2 concurrently; if rx1 becomes ready first, its result is bound to val and rx2 is dropped.
    tokio::select! {
        val = rx1 => {
            println!("rx1 completed first with {:?}", val);
        }
        val = rx2 => {
            println!("rx2 completed first with {:?}", val);
        }
    }
}

tokio::sync::oneshot::Receiver implements Drop: when it is dropped, the corresponding tokio::sync::oneshot::Sender is notified of the closure (the Sender's async closed() method completes).

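  • tokio::spawn(async {…}) schedules a new async task on the runtime (it starts running without being awaited) and returns a JoinHandle. JoinHandle implements Future with Output = Result<T, JoinError>, so .await on it yields the task's return value T (or a JoinError if the task panicked or was cancelled).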
use tokio::sync::oneshot;

async fn some_operation() -> String {
    // Compute value here (placeholder result)
    String::from("computed")
}

#[tokio::main]
async fn main() {
    let (mut tx1, rx1) = oneshot::channel();
    let (tx2, rx2) = oneshot::channel();

    tokio::spawn(async {
        // Select on the operation and the oneshot's `closed()` notification.
        tokio::select! {
            val = some_operation() => {
                let _ = tx1.send(val);
            }
            // tx1.closed() is an async method, so it can be a select! branch; it completes when the peer rx1 is dropped.
            _ = tx1.closed() => {
                // `some_operation()` is canceled, the task completes and `tx1` is dropped.
            }
        }
    });

    tokio::spawn(async {
        let _ = tx2.send("two");
    });

    // If rx2 completes first, rx1 is dropped, which makes tx1.closed() above complete
    tokio::select! {
        val = rx1 => {
            println!("rx1 completed first with {:?}", val);
        }
        val = rx2 => {
            println!("rx2 completed first with {:?}", val);
        }
    }
}

The select! macro accepts multiple branches (currently limited to 64):

<pattern> = <async expression> (, if <precondition>)? => <handler>
else => <expression>

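The async expressions of all branches are driven concurrently by the current task on a single thread (while one async expression is not ready, another one is polled). When the first async expression completes and its result matches the branch's pattern, all other async expressions are dropped and that branch's handler runs. If the result does not match, select! keeps waiting for the remaining branches and checks each result in the same way.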

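Each branch can also have an if precondition. If it evaluates to false, the branch's async expression is still evaluated, but the resulting Future is never polled. This matters, for example, when the same pinned Future is polled across loop iterations: once it has completed it must not be polled again, so an if precondition is used to disable that branch.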

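Each time select! is entered:

  1. Evaluate the if precondition of every branch; a branch whose precondition is false is disabled;
  2. Evaluate every async expression, including those of disabled branches (their futures are simply never polled); this all happens on the current thread;
  3. Concurrently poll the futures of the branches that are still enabled;
  4. When one of them completes, match its result against the branch's pattern. If it does not match, disable that branch and continue from step 3 with the remaining branches; if it matches, drop the futures of all other branches and run this branch's handler.
  5. If every branch is disabled (for example, no pattern matched), run the else branch; if there is no else branch, select! panics.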

The async expression of each branch can be an arbitrary computation, as long as it evaluates to a Future:

use tokio::net::TcpStream;
use tokio::sync::oneshot;

#[tokio::main]
async fn main() {
    let (tx, rx) = oneshot::channel();

    // Spawn a task that sends a message over the oneshot
    tokio::spawn(async move {
        tx.send("done").unwrap();
    });

    tokio::select! {
        socket = TcpStream::connect("localhost:3465") => {
            println!("Socket connected {:?}", socket);
        }
        msg = rx => {
            println!("received message first {:?}", msg);
        }
    }
}

// Another example
use tokio::net::{TcpListener, TcpStream};
use tokio::sync::oneshot;
use std::io;

// Placeholder for per-connection handling (not defined in the original snippet).
async fn process(_socket: TcpStream) {}

#[tokio::main]
async fn main() -> io::Result<()> {
    let (tx, rx) = oneshot::channel();

    tokio::spawn(async move {
        tx.send(()).unwrap();
    });

    let listener = TcpListener::bind("localhost:3465").await?;

    // Run both branches concurrently until rx receives a message or accept() fails.
    tokio::select! {
        // An async {} block is used as the async expression; it evaluates to a Result.
        // select! polls it together with rx; once rx completes, this block is dropped.
        _ = async {
            loop {
                let (socket, _) = listener.accept().await?; // only returns on an accept error
                tokio::spawn(async move { process(socket).await });
            }

            // Help the rust type inferencer out
            Ok::<_, io::Error>(())
        } => {}

        _ = rx => {
            println!("terminating accept loop");
        }
    }

    Ok(())
}

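The value of each async expression is matched against its branch's pattern, and select! can have an else branch, which runs only when none of the patterns can match (every branch has been disabled).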

use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
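    let (tx1, mut rx1) = mpsc::channel(128);
    let (tx2, mut rx2) = mpsc::channel(128);

    tokio::spawn(async move {
        // Do something w/ `tx1` and `tx2` (here: send one value each, then drop both Senders)
        let _ = tx1.send(1).await;
        let _ = tx2.send(2).await;
    });

    // What gets dropped here is the Future returned by recv(), not rx1/rx2 themselves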
    tokio::select! {
        Some(v) = rx1.recv() => {
            println!("Got {:?} from rx1", v);
        }
        Some(v) = rx2.recv() => {
            println!("Got {:?} from rx2", v);
        }
        else => {
            println!("Both channels closed");
        }
    }
}

select! is an expression that evaluates to a value; every branch must produce a value of the same type:

async fn computation1() -> String {
    // .. computation
    String::from("computation1")
}

async fn computation2() -> String {
    // .. computation
    String::from("computation2")
}

#[tokio::main]
async fn main() {
    let out = tokio::select! {
        res1 = computation1() => res1,
        res2 = computation2() => res2,
    };

    println!("Got = {}", out);
}

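Errors:

  • Using ? inside an async expression only propagates the error out of that async expression, so the expression evaluates to a Result; e.g. when accept().await? fails, res is a Result::Err.
  • Using ? inside a handler propagates the error out of the select! expression immediately, e.g. res? returns the error from the main() function;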
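use tokio::net::{TcpListener, TcpStream};
use tokio::sync::oneshot;
use std::io;

// Placeholder for per-connection handling (not defined in the original snippet).
async fn process(_socket: TcpStream) {}

#[tokio::main]
async fn main() -> io::Result<()> {
    // Set up the `rx` oneshot channel used to stop the accept loop
    let (tx, rx) = oneshot::channel::<()>();
    tokio::spawn(async move {
        let _ = tx.send(());
    });

    let listener = TcpListener::bind("localhost:3465").await?;

    tokio::select! {
        res = async {
            loop {
                let (socket, _) = listener.accept().await?;
                tokio::spawn(async move { process(socket).await });
            }

            // Help the rust type inferencer out
            Ok::<_, io::Error>(())
        } => {
            // `?` in a handler propagates the error out of main()
            res?;
        }

        _ = rx => {
            println!("terminating accept loop");
        }
    }

    Ok(())
}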

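Borrowing: the async block passed to tokio::spawn(async {..}) must own (move in) the data it uses, but the async expressions of select! branches need not: they may borrow data from the enclosing scope as long as the normal borrowing rules hold, e.g. several branches may share an & reference, while a &mut reference may only be used by one of them:

  • This is because select! drives all async expressions concurrently and only drops the remaining ones once the first result matches its pattern; until then, all of them (and their borrows) are alive at the same time.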
use tokio::io::AsyncWriteExt;
use tokio::net::TcpStream;
use std::io;
use std::net::SocketAddr;

async fn race(
    data: &[u8],
    addr1: SocketAddr,
    addr2: SocketAddr
) -> io::Result<()> {
    tokio::select! {
        // both async expressions share an immutable borrow of data
        Ok(_) = async {
            let mut socket = TcpStream::connect(addr1).await?;

            socket.write_all(data).await?;
            Ok::<_, io::Error>(())
        } => {}

        Ok(_) = async {
            let mut socket = TcpStream::connect(addr2).await?;
            socket.write_all(data).await?;
            Ok::<_, io::Error>(())
        } => {}

        else => {}
    };

    Ok(())
}

use tokio::sync::oneshot;

#[tokio::main]
async fn main() {
    let (tx1, rx1) = oneshot::channel();
    let (tx2, rx2) = oneshot::channel();

    let mut out = String::new();

    tokio::spawn(async move {
        // Send values on `tx1` and `tx2`.
        let _ = tx1.send(1);
        let _ = tx2.send(2);
    });

    // select! runs only one handler, so several handlers may use the same data through &mut.
    tokio::select! {
        _ = rx1 => {
            out.push_str("rx1 completed");
        }
        _ = rx2 => {
            out.push_str("rx2 completed");
        }
    }

    println!("{}", out);
}

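Loop: select! in a loop drives several async expressions concurrently. If more than one recv() is ready, select! picks one of those branches at random to run its handler; the messages of the branches that were not chosen are not lost, because recv() is cancellation safe.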

use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
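    let (tx1, mut rx1) = mpsc::channel(128);
    let (tx2, mut rx2) = mpsc::channel(128);
    let (tx3, mut rx3) = mpsc::channel(128);

    // Producers (added so the example terminates): each sends one message, then drops its Sender.
    tokio::spawn(async move { let _ = tx1.send("from tx1").await; });
    tokio::spawn(async move { let _ = tx2.send("from tx2").await; });
    tokio::spawn(async move { let _ = tx3.send("from tx3").await; });

    loop {
        // When rx1.recv() completes, what gets dropped are the Futures returned by rx2.recv()
        // and rx3.recv(), not rx2 and rx3 themselves, so they can be used again next iteration.

        // When a channel is closed, recv() returns None, which does not match Some, so the other
        // branches keep going until every channel is closed and the else branch runs.

        // Note: std::sync::mpsc::Receiver::recv() returns a Result, whereas the async recv() returns an Option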
        let msg = tokio::select! {
            Some(msg) = rx1.recv() => msg,
            Some(msg) = rx2.recv() => msg,
            Some(msg) = rx3.recv() => msg,
            else => { break }
        };

        println!("Got {:?}", msg);
    }

    println!("All channels have been closed.");
}

Cancellation safety: when select! is used in a loop to receive messages from several sources, you must make sure that cancelling a receive call does not lose a message.

The following methods are cancellation safe:

  • tokio::sync::mpsc::Receiver::recv
  • tokio::sync::mpsc::UnboundedReceiver::recv
  • tokio::sync::broadcast::Receiver::recv
  • tokio::sync::watch::Receiver::changed
  • tokio::net::TcpListener::accept
  • tokio::net::UnixListener::accept
  • tokio::signal::unix::Signal::recv
  • tokio::io::AsyncReadExt::read on any AsyncRead
  • tokio::io::AsyncReadExt::read_buf on any AsyncRead
  • tokio::io::AsyncWriteExt::write on any AsyncWrite
  • tokio::io::AsyncWriteExt::write_buf on any AsyncWrite
  • tokio_stream::StreamExt::next on any Stream
  • futures::stream::StreamExt::next on any Stream

The following methods are not cancellation safe and may lose data:

  • tokio::io::AsyncReadExt::read_exact
  • tokio::io::AsyncReadExt::read_to_end
  • tokio::io::AsyncReadExt::read_to_string
  • tokio::io::AsyncWriteExt::write_all

The following methods are also not cancellation safe, because they use a queue to ensure fairness; cancelling them makes you lose your place in the queue:

  • tokio::sync::Mutex::lock
  • tokio::sync::RwLock::read
  • tokio::sync::RwLock::write
  • tokio::sync::Semaphore::acquire
  • tokio::sync::Notify::notified

To determine whether your own methods are cancellation safe, look for the location of uses of .await. This is because when an asynchronous method is cancelled, that always happens at an .await. If your function behaves correctly even if it is restarted while waiting at an .await, then it is cancellation safe.

Cancellation safety can be defined in the following way: If you have a future that has not yet completed, then it must be a no-op to drop that future and recreate it. This definition is motivated by the situation where a select! is used in a loop. Without this guarantee, you would lose your progress when another branch completes and you restart the select! by going around the loop.


Be aware that cancelling something that is not cancellation safe is not necessarily wrong. For example, if you are cancelling a task because the application is shutting down, then you probably don’t care that partially read data is lost.

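Reusing an async expression: first call tokio::pin!(operation). The macro shadows operation with a variable of the same name whose type is pinned (Pin<&mut _>). Then use &mut operation as the async expression, so select! only borrows the pinned Future and does not drop it;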

async fn action() {
    // Some asynchronous logic
}

#[tokio::main]
async fn main() {
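    // `_tx` keeps the channel open; this minimal example never sends anything.
    let (_tx, mut rx) = tokio::sync::mpsc::channel::<i32>(128);

    let operation = action(); // returns a Future without .await-ing it
    tokio::pin!(operation);  // the Future must be pinned

    loop {
        tokio::select! {
            // Note: once operation has completed it must not be polled again. Here the loop
            // breaks as soon as it completes; otherwise an `if` precondition is needed.
            _ = &mut operation => break, // reuse the same Future instead of calling action() again; it must be borrowed with &mut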
            Some(v) = rx.recv() => {
                if v % 2 == 0 {
                    break;
                }
            }
        }
    }
}

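tokio::pin!(operation) is required here to pin the Future (it shadows operation with a pinned value of the same name). Without it, compilation fails with:

  • How to read std::pin::Pin<&mut &mut impl std::future::Future>:
    • &mut impl std::future::Future is a &mut reference to a value that implements Future;
    • &mut &mut impl std::future::Future is a &mut reference to that &mut reference;
    • &mut F only implements Future when F: Unpin, and the Future returned by an async fn is not Unpin, which is what the note at the end of the error says.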
error[E0599]: no method named `poll` found for struct
     `std::pin::Pin<&mut &mut impl std::future::Future>`
     in the current scope
  --> src/main.rs:16:9
   |
16 | /         tokio::select! {
17 | |             _ = &mut operation => break,
18 | |             Some(v) = rx.recv() => {
19 | |                 if v % 2 == 0 {
...  |
22 | |             }
23 | |         }
   | |_________^ method not found in
   |             `std::pin::Pin<&mut &mut impl std::future::Future>`
   |
   = note: the method `poll` exists but the following trait bounds
            were not satisfied:
           `impl std::future::Future: std::marker::Unpin`
           which is required by
           `&mut impl std::future::Future: std::future::Future`

Modifying a branch

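  • Once the Future of an async fn has completed, it cannot be resumed (polled again); normally future.await consumes the Future object.
  • A branch can be guarded by an if precondition so that it is only polled when needed, e.g. &mut operation, if !done => {xxx};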
async fn action(input: Option<i32>) -> Option<String> {
    // If the input is `None`, return `None`.
    // This could also be written as `let i = input?;`
    let i = match input {
        Some(input) => input,
        None => return None,
    };
    // async logic here (placeholder: turn the number into a String)
    Some(i.to_string())
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = tokio::sync::mpsc::channel(128);

    let mut done = false;
    let operation = action(None);
    tokio::pin!(operation);

    tokio::spawn(async move {
        let _ = tx.send(1).await;
        let _ = tx.send(3).await;
        let _ = tx.send(2).await;
    });

    loop {
        tokio::select! {
            // `done` is checked first: the branch is only polled while done is false. This uses a &mut
            // borrow, so .await does not consume the Future; operation still exists in the next iteration.
            res = &mut operation, if !done => {
                done = true;
                if let Some(v) = res {
                    println!("GOT = {}", v);
                    return;
                }
            }

            Some(v) = rx.recv() => {
                if v % 2 == 0 {
                    // `.set` is a method on `Pin`.
                    operation.set(action(Some(v)));
                    done = false;
                }
            }
        }
    }
}

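Differences between tokio::spawn and select!:

  1. Both can run several async tasks concurrently, but spawn() hands the task to the tokio runtime, which may run it on another thread. The spawned Future must therefore be Send + 'static (and so must its output); 'static means the task cannot hold borrowed references to local data, so that data has to be moved in;
  2. select! drives all of its branches inside the current task, on a single thread, so the branches run concurrently but never in parallel: while one async expression is not ready, another branch's async expression is polled.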

11 tokio-stream

A Stream is the asynchronous counterpart of std::iter::Iterator: it yields a sequence of values.

Stream is currently not part of the Rust standard library; the Stream trait is defined in the futures-core crate.

// https://docs.rs/futures-core/0.3.30/futures_core/stream/trait.Stream.html
pub trait Stream {
    type Item;

    // Required method
    fn poll_next(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>
    ) -> Poll<Option<Self::Item>>;

    // Provided method
    fn size_hint(&self) -> (usize, Option<usize>) { ... }
}

// Return values of poll_next():
// + Poll::Ready(Some(item)): the next value;
// + Poll::Ready(None): the stream has ended;
// + Poll::Pending: no value is ready yet;

tokio provides its Stream utilities in the separate tokio-stream crate, which re-exports the Stream trait defined in the futures-core crate via pub use futures_core::Stream;.

The tokio_stream crate provides the following module-level functions:

  • empty Creates a stream that yields nothing.
  • iter Converts an Iterator into a Stream which is always ready to yield the next value.
  • once Creates a stream that emits an element exactly once.
  • pending Creates a stream that is never ready
use tokio_stream::{self as stream, StreamExt};  // the StreamExt trait provides the commonly used next() method for Stream

#[tokio::main]
async fn main() {
    // empty() ends immediately
    let mut none = stream::empty::<i32>();
    assert_eq!(None, none.next().await);

    // iter() converts an Iterator into a Stream
    let mut stream = stream::iter(vec![17, 19]);
    assert_eq!(stream.next().await, Some(17));
    assert_eq!(stream.next().await, Some(19));
    assert_eq!(stream.next().await, None);

    // once() returns a stream that yields exactly one element
    let mut one = stream::once(1);
    assert_eq!(Some(1), one.next().await);
    assert_eq!(None, one.next().await);

    // pending() returns a stream that is always pending
    let mut never = stream::pending::<i32>();
    // This will never complete
    never.next().await;
    unreachable!();
}

tokio_stream::StreamExt is an extension trait of futures_core::stream::Stream (StreamExt: Stream) that provides commonly used extra methods: adapter methods (such as map/filter) and the next() method used for iteration.

pub trait StreamExt: Stream {
    // next() yields the next element; the returned Next implements Future and resolves to Option<Self::Item>
    fn next(&mut self) -> Next<'_, Self> where Self: Unpin

    fn try_next<T, E>(&mut self) -> TryNext<'_, Self>
       where Self: Stream<Item = Result<T, E>> + Unpin { ... }

    fn map<T, F>(self, f: F) -> Map<Self, F> where F: FnMut(Self::Item) -> T, Self: Sized
    fn map_while<T, F>(self, f: F) -> MapWhile<Self, F>
       where F: FnMut(Self::Item) -> Option<T>,
             Self: Sized { ... }
    fn then<F, Fut>(self, f: F) -> Then<Self, Fut, F>
       where F: FnMut(Self::Item) -> Fut,
             Fut: Future,
             Self: Sized { ... }
    fn merge<U>(self, other: U) -> Merge<Self, U>
       where U: Stream<Item = Self::Item>,
             Self: Sized { ... }
    fn filter<F>(self, f: F) -> Filter<Self, F>
       where F: FnMut(&Self::Item) -> bool,
             Self: Sized { ... }
    fn filter_map<T, F>(self, f: F) -> FilterMap<Self, F>
       where F: FnMut(Self::Item) -> Option<T>,
             Self: Sized { ... }
    fn fuse(self) -> Fuse<Self>
       where Self: Sized { ... }
    fn take(self, n: usize) -> Take<Self>
       where Self: Sized { ... }
    fn take_while<F>(self, f: F) -> TakeWhile<Self, F>
       where F: FnMut(&Self::Item) -> bool,
             Self: Sized { ... }
    fn skip(self, n: usize) -> Skip<Self>
       where Self: Sized { ... }
    fn skip_while<F>(self, f: F) -> SkipWhile<Self, F>
       where F: FnMut(&Self::Item) -> bool,
             Self: Sized { ... }
    fn all<F>(&mut self, f: F) -> AllFuture<'_, Self, F>
       where Self: Unpin,
             F: FnMut(Self::Item) -> bool { ... }
    fn any<F>(&mut self, f: F) -> AnyFuture<'_, Self, F>
       where Self: Unpin,
             F: FnMut(Self::Item) -> bool { ... }
    fn chain<U>(self, other: U) -> Chain<Self, U>
       where U: Stream<Item = Self::Item>,
             Self: Sized { ... }
    fn fold<B, F>(self, init: B, f: F) -> FoldFuture<Self, B, F>
       where Self: Sized,
             F: FnMut(B, Self::Item) -> B { ... }
    fn collect<T>(self) -> Collect<Self, T> where T: FromStream<Self::Item>, Self: Sized

    // timeout() returns a Timeout stream whose items are Result<Self::Item, Elapsed>: if the next item
    // does not arrive within `duration`, Err(Elapsed) is yielded, and the stream can still be polled afterwards.
    fn timeout(self, duration: Duration) -> Timeout<Self> where Self: Sized
    fn timeout_repeating(self, interval: Interval) -> TimeoutRepeating<Self> where Self: Sized
    fn throttle(self, duration: Duration) -> Throttle<Self> where Self: Sized
    fn chunks_timeout( self, max_size: usize, duration: Duration ) -> ChunksTimeout<Self> where Self: Sized
    fn peekable(self) -> Peekable<Self> where Self: Sized
}


// Example
use tokio_stream::StreamExt;
#[tokio::main]
async fn main() {
    let mut stream = tokio_stream::iter(&[1, 2, 3]);
    while let Some(v) = stream.next().await {
        println!("GOT = {:?}", v);
    }
}

// timeout() example
use tokio_stream::{self as stream, StreamExt};
use std::time::Duration;

#[tokio::main]
async fn main() {
    let int_stream = stream::iter(1..=3);
    let int_stream = int_stream.timeout(Duration::from_secs(1));
    tokio::pin!(int_stream);
    // When no items time out, we get the 3 elements in succession:
    assert_eq!(int_stream.try_next().await, Ok(Some(1)));
    assert_eq!(int_stream.try_next().await, Ok(Some(2)));
    assert_eq!(int_stream.try_next().await, Ok(Some(3)));
    assert_eq!(int_stream.try_next().await, Ok(None));
}

Stream does not implement Iterator, so for-in loops are not supported; with StreamExt in scope, iterate with a while let loop instead:

use tokio_stream::{self as stream, StreamExt};
#[tokio::main]
async fn main() {
    let mut stream = stream::iter(vec![0, 1, 2]);
    while let Some(value) = stream.next().await {
        println!("Got {}", value);
    }
}

The module tokio_stream::wrappers provides wrapper types that turn other tokio types into Streams:

  • BroadcastStream A wrapper around tokio::sync::broadcast::Receiver that implements Stream.
  • CtrlBreakStream A wrapper around CtrlBreak that implements Stream.
  • CtrlCStream A wrapper around CtrlC that implements Stream.
  • IntervalStream A wrapper around Interval that implements Stream.
  • LinesStream A wrapper around tokio::io::Lines that implements Stream.
  • ReadDirStream A wrapper around tokio::fs::ReadDir that implements Stream.
  • ReceiverStream A wrapper around tokio::sync::mpsc::Receiver that implements Stream.
  • SignalStream A wrapper around Signal that implements Stream.
  • SplitStream A wrapper around tokio::io::Split that implements Stream.
  • TcpListenerStream A wrapper around TcpListener that implements Stream.
  • UnboundedReceiverStream A wrapper around tokio::sync::mpsc::UnboundedReceiver that implements Stream.
  • UnixListenerStream A wrapper around UnixListener that implements Stream.
  • WatchStream A wrapper around tokio::sync::watch::Receiver that implements Stream.
use tokio_stream::{StreamExt, wrappers::WatchStream};
use tokio::sync::watch;

#[tokio::main]
async fn main() {
    let (tx, rx) = watch::channel("hello");
    let mut rx = WatchStream::new(rx); // create a Stream from a watch::Receiver
    assert_eq!(rx.next().await, Some("hello"));

    tx.send("goodbye").unwrap();
    assert_eq!(rx.next().await, Some("goodbye"));
}

// A more complex example (uses the mini-redis crate from the Tokio tutorial)
use tokio_stream::StreamExt;
use mini_redis::client;

async fn publish() -> mini_redis::Result<()> {
    let mut client = client::connect("127.0.0.1:6379").await?;
    client.publish("numbers", "1".into()).await?;
    client.publish("numbers", "two".into()).await?;
    client.publish("numbers", "3".into()).await?;
    client.publish("numbers", "four".into()).await?;
    client.publish("numbers", "five".into()).await?;
    client.publish("numbers", "6".into()).await?;
    Ok(())
}

async fn subscribe() -> mini_redis::Result<()> {
    let client = client::connect("127.0.0.1:6379").await?;
    let subscriber = client.subscribe(vec!["numbers".to_string()]).await?;
    let messages = subscriber.into_stream(); // returns a Stream
    tokio::pin!(messages); // pin it on the stack
    while let Some(msg) = messages.next().await {
        // next() requires the stream to be Unpin; pinning it with tokio::pin! satisfies that
        println!("got = {:?}", msg);
    }
    Ok(())
}

#[tokio::main]
async fn main() -> mini_redis::Result<()> {
    tokio::spawn(async {
        publish().await
    });
    subscribe().await?;
    println!("DONE");
    Ok(())
}

A Rust value is “pinned” when it can no longer be moved in memory. A key property of a pinned value is that pointers can be taken to the pinned data and the caller can be confident the pointer stays valid. This feature is used by async/await to support borrowing data across .await points.

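  • Local variables that an async fn uses across an .await are stored inside the Future (a state machine) generated for it, not on the thread's stack: at each .await the function may be suspended and later resumed, possibly on another thread. The generated Future can hold references into its own fields, so it must not be moved once it has been polled, and Pin is what guarantees this. Whether that Future is Send is a separate question that depends on the types it holds across .await.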

Adapters build a new Stream from an existing one, e.g. map/take/filter:

let messages = subscriber
    .into_stream()
    .take(3);

let messages = subscriber
    .into_stream()
    .filter(|msg| match msg {
        Ok(msg) if msg.content.len() == 1 => true,
        _ => false,
    })
    .take(3);

let messages = subscriber
    .into_stream()
    .filter(|msg| match msg {
        Ok(msg) if msg.content.len() == 1 => true,
        _ => false,
    })
    .map(|msg| msg.unwrap().content)
    .take(3);

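Implementing a Stream:

  • poll_next() returns Poll::Ready when a value (or the end of the stream) is available, and Poll::Pending otherwise; before returning Poll::Pending it must make sure the task will be woken up later, usually by delegating to an inner Future/Stream that registers the waker;
  • poll_next() is usually implemented on top of other Futures and Streams;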
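use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};
use tokio_stream::Stream;

// `Delay` is not defined in this snippet; this is a minimal stand-in modeled on the Delay
// future from the Tokio tutorial. It wakes itself immediately when not yet due (busy polling),
// which is fine for illustration but wasteful in real code.
struct Delay {
    when: Instant,
}

impl Future for Delay {
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if Instant::now() >= self.when {
            Poll::Ready(())
        } else {
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

struct Interval {
    rem: usize,
    delay: Delay,
}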

impl Interval {
    fn new() -> Self {
        Self {
            rem: 3,
            delay: Delay { when: Instant::now() }
        }
    }
}

impl Stream for Interval {
    type Item = ();

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<()>>
    {
        if self.rem == 0 {
            return Poll::Ready(None);
        }

        match Pin::new(&mut self.delay).poll(cx) {
            Poll::Ready(_) => {
                let when = self.delay.when + Duration::from_millis(10);
                self.delay = Delay { when };
                self.rem -= 1;
                Poll::Ready(Some(()))
            }
            Poll::Pending => Poll::Pending,
        }
    }
}

11.1 ReaderStream and StreamReader

The tokio_util crate provides two struct types for converting between Streams and readers.

  1. ReaderStream: Convert an AsyncRead into a Stream of byte chunks.
// Associated functions of ReaderStream
impl<R: AsyncRead> ReaderStream<R>
// Convert an AsyncRead into a Stream with item type Result<Bytes, std::io::Error>.
pub fn new(reader: R) -> Self
// Convert an AsyncRead into a Stream with item type Result<Bytes, std::io::Error>, with a
// specific read buffer initial capacity.
pub fn with_capacity(reader: R, capacity: usize) -> Self

// ReaderStream implements the Stream trait; each item wraps a Bytes, i.e. a contiguous chunk of u8
impl<R: AsyncRead> Stream for ReaderStream<R>
type Item = Result<Bytes, Error>


// Example
use tokio_stream::StreamExt;
use tokio_util::io::ReaderStream;

// Create a stream of data.
let data = b"hello, world!";
let mut stream = ReaderStream::new(&data[..]); // &[u8] implements the AsyncRead trait

// Read all of the chunks into a vector.
let mut stream_contents = Vec::new();
// each chunk is a bytes::Bytes, which can be used as a [u8] via Deref
while let Some(chunk) = stream.next().await {
    stream_contents.extend_from_slice(&chunk?);
}
// Once the chunks are concatenated, we should have the original data.
assert_eq!(stream_contents, data);
  2. StreamReader: Convert a Stream of byte chunks into an AsyncRead. The resulting reader also implements the AsyncBufRead trait.
impl<S, B, E> StreamReader<S, B>
where
    S: Stream<Item = Result<B, E>>,
    B: Buf,
    E: Into<Error>,
// new() creates a StreamReader from a Stream whose items are Result<B, E>, where B implements bytes::Buf
//
// Bytes, BytesMut, &[u8], VecDeque<u8>, etc. implement bytes::Buf (Vec<u8> does not)
pub fn new(stream: S) -> Self


use bytes::Bytes;
use tokio::io::{AsyncReadExt, Result};
use tokio_util::io::StreamReader;

// Create a stream from an iterator.
let stream = tokio_stream::iter(vec![
    Result::Ok(Bytes::from_static(&[0, 1, 2, 3])), // Bytes 实现了 Buf trait
    Result::Ok(Bytes::from_static(&[4, 5, 6, 7])),
    Result::Ok(Bytes::from_static(&[8, 9, 10, 11])),
]);

// Convert it to an AsyncRead.
let mut read = StreamReader::new(stream);

// Read five bytes from the stream.
let mut buf = [0; 5];
read.read_exact(&mut buf).await?;
assert_eq!(buf, [0, 1, 2, 3, 4]);

// Read the rest of the current chunk.
assert_eq!(read.read(&mut buf).await?, 3);
assert_eq!(&buf[..3], [5, 6, 7]);

// Read the next chunk.
assert_eq!(read.read(&mut buf).await?, 4);
assert_eq!(&buf[..4], [8, 9, 10, 11]);

// We have now reached the end.
assert_eq!(read.read(&mut buf).await?, 0);

12 tokio_util crate

The futures_util crate provides the Sink and SinkExt traits (Sink itself is defined in the futures-sink crate and re-exported by futures-util):

  • Sink: A Sink is a value into which other values can be sent, asynchronously.
  • SinkExt: An extension trait for Sinks that provides a variety of convenient combinator functions.
pub trait Sink<Item> {
    type Error;

    // Required methods
    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_> ) -> Poll<Result<(), Self::Error>>;
    fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error>;
    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_> ) -> Poll<Result<(), Self::Error>>;
    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_> ) -> Poll<Result<(), Self::Error>>;
}

// The methods provided by SinkExt, such as send()/send_all(), are used more often
pub trait SinkExt<Item>: Sink<Item> {
    // Provided methods
    fn with<U, Fut, F, E>(self, f: F) -> With<Self, Item, U, Fut, F>
       where F: FnMut(U) -> Fut,
             Fut: Future<Output = Result<Item, E>>,
             E: From<Self::Error>,
             Self: Sized { ... }
    fn with_flat_map<U, St, F>(self, f: F) -> WithFlatMap<Self, Item, U, St, F>
       where F: FnMut(U) -> St,
             St: Stream<Item = Result<Item, Self::Error>>,
             Self: Sized { ... }
    fn sink_map_err<E, F>(self, f: F) -> SinkMapErr<Self, F>
       where F: FnOnce(Self::Error) -> E,
             Self: Sized { ... }
    fn sink_err_into<E>(self) -> SinkErrInto<Self, Item, E>
       where Self: Sized,
             Self::Error: Into<E> { ... }
    fn buffer(self, capacity: usize) -> Buffer<Self, Item>
       where Self: Sized { ... }
    fn close(&mut self) -> Close<'_, Self, Item> 
       where Self: Unpin { ... }
    fn fanout<Si>(self, other: Si) -> Fanout<Self, Si>
       where Self: Sized,
             Item: Clone,
             Si: Sink<Item, Error = Self::Error> { ... }
    fn flush(&mut self) -> Flush<'_, Self, Item> 
       where Self: Unpin { ... }
    fn send(&mut self, item: Item) -> Send<'_, Self, Item> 
       where Self: Unpin { ... }
    fn feed(&mut self, item: Item) -> Feed<'_, Self, Item> 
       where Self: Unpin { ... }
    fn send_all<'a, St>(
        &'a mut self,
        stream: &'a mut St
    ) -> SendAll<'a, Self, St> 
       where St: TryStream<Ok = Item, Error = Self::Error> + Stream + Unpin + ?Sized,
             Self: Unpin { ... }
    fn left_sink<Si2>(self) -> Either<Self, Si2> 
       where Si2: Sink<Item, Error = Self::Error>,
             Self: Sized { ... }
    fn right_sink<Si1>(self) -> Either<Si1, Self> 
       where Si1: Sink<Item, Error = Self::Error>,
             Self: Sized { ... }
    fn compat(self) -> CompatSink<Self, Item>
       where Self: Sized + Unpin { ... }
    fn poll_ready_unpin(
        &mut self,
        cx: &mut Context<'_>
    ) -> Poll<Result<(), Self::Error>>
       where Self: Unpin { ... }
    fn start_send_unpin(&mut self, item: Item) -> Result<(), Self::Error>
       where Self: Unpin { ... }
    fn poll_flush_unpin(
        &mut self,
        cx: &mut Context<'_>
    ) -> Poll<Result<(), Self::Error>>
       where Self: Unpin { ... }
    fn poll_close_unpin(
        &mut self,
        cx: &mut Context<'_>
    ) -> Poll<Result<(), Self::Error>>
       where Self: Unpin { ... }
}

Module tokio_util::codec converts AsyncRead/AsyncWrite into Stream/Sink, and turns a raw byte sequence into meaningful chunks called frames:

  • struct FramedWrite: A Sink of frames encoded to an AsyncWrite. Use it as a Sink: frames sent into it are encoded and written out;
  • struct FramedRead: A Stream of messages decoded from an AsyncRead. Use it as a Stream: data read from it is decoded into frames;
  • struct Framed: A unified Stream and Sink interface to an underlying I/O object, using the Encoder and Decoder traits to encode and decode frames.

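The Stream side wraps an AsyncRead; the next() method provided by StreamExt returns one decoded frame at a time (the codec's Decoder::Item, e.g. String for LinesCodec or BytesMut for BytesCodec), wrapped in Option<Result<…>>.

The Sink side wraps an AsyncWrite; the send()/send_all()/feed() methods provided by SinkExt write frames to it.

When creating a FramedWrite/FramedRead/Framed, you pass a codec object that implements the Encoder and/or Decoder traits: the Decoder extracts frames from the byte stream, and the Encoder encodes the frames written into the Sink.

tokio_util::codec provides the following types that implement the Encoder and Decoder traits: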

  • AnyDelimiterCodec A simple Decoder and Encoder implementation that splits up data into chunks based on any character in the given delimiter string.
  • BytesCodec A simple Decoder and Encoder implementation that just ships bytes around.
  • LinesCodec A simple Decoder and Encoder implementation that splits up data into lines.
// encoding
use futures::sink::SinkExt;
use tokio_util::codec::LinesCodec;
use tokio_util::codec::FramedWrite;

#[tokio::main]
async fn main() {
    let buffer = Vec::new();
    let messages = vec!["Hello", "World"];
    let encoder = LinesCodec::new();

    // Vec<u8> implements AsyncWrite, so it can be passed to FramedWrite::new(). The returned
    // writer implements Sink, and SinkExt supplies send().
    let mut writer = FramedWrite::new(buffer, encoder);
    writer.send(messages[0]).await.unwrap(); // send to the Sink asynchronously; the item is encoded as one line
    writer.send(messages[1]).await.unwrap();

    let buffer = writer.get_ref();
    assert_eq!(buffer.as_slice(), "Hello\nWorld\n".as_bytes());
}

// decoding
use tokio_stream::StreamExt;
use tokio_util::codec::LinesCodec;
use tokio_util::codec::FramedRead;

#[tokio::main]
async fn main() {
    let message = "Hello\nWorld".as_bytes();
    let decoder = LinesCodec::new();

    // &[u8] implements AsyncRead, so it can be passed to FramedRead::new(). The returned reader
    // implements Stream, and StreamExt supplies next(). Each call yields one decoded frame.
    let mut reader = FramedRead::new(message, decoder);
    let frame1 = reader.next().await.unwrap().unwrap();
    let frame2 = reader.next().await.unwrap().unwrap();
    assert!(reader.next().await.is_none());

    assert_eq!(frame1, "Hello");
    assert_eq!(frame2, "World");
}

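Roughly, FramedRead uses the decoder to pull frames out of its read buffer as shown below. This runnable sketch collects the frames into a Vec. The real FramedRead instead yields them one at a time as a Stream:

use bytes::BytesMut;
use tokio::io::{AsyncRead, AsyncReadExt};
use tokio_util::codec::Decoder;

async fn read_frames<R, D>(mut io_resource: R, mut decoder: D) -> Result<Vec<D::Item>, D::Error>
where
    R: AsyncRead + Unpin,
    D: Decoder, // Decoder::Error: From<std::io::Error>, so `?` works on read errors
{
    let mut frames = Vec::new();
    let mut buf = BytesMut::new(); // growable buffer that keeps unconsumed bytes between reads
    loop {
        // read_buf appends to buf rather than overwriting existing data.
        let len = io_resource.read_buf(&mut buf).await?;
        if len == 0 {
            // EOF: let the decoder emit any final, possibly unterminated, frame.
            while let Some(frame) = decoder.decode_eof(&mut buf)? {
                frames.push(frame);
            }
            break;
        }
        // Decode as many complete frames as possible; None means more bytes are needed.
        while let Some(frame) = decoder.decode(&mut buf)? {
            frames.push(frame);
        }
    }
    Ok(frames)
}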

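Roughly, FramedWrite uses the encoder to encode items into a buffer and writes that buffer to the underlying AsyncWrite. The pseudocode below shows the process; next_frame() and no_more_frames() are placeholders, not real APIs:

use tokio::io::AsyncWriteExt;
use bytes::Buf;

const MAX: usize = 8192;

let mut buf = bytes::BytesMut::new(); // encoded bytes not yet written out
loop {
    tokio::select! {
        // Write buffered bytes to the I/O resource and drop the written prefix.
        num_written = io_resource.write(&buf), if !buf.is_empty() => {
            buf.advance(num_written?);
        },
        // While the buffer is below MAX, accept the next frame and encode it into buf.
        frame = next_frame(), if buf.len() < MAX => {
            encoder.encode(frame, &mut buf)?;
        },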
        _ = no_more_frames() => {
            io_resource.write_all(&buf).await?;
            io_resource.shutdown().await?;
            return Ok(());
        },
    }
}

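You can also implement Encoder and Decoder for your own types and use them with Framed/FramedRead/FramedWrite. The example below uses a length-prefixed format: a 4-byte little-endian length followed by the UTF-8 bytes of the string: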

// Implement the Decoder trait: parse Item values out of the input buffer src: &mut BytesMut
use tokio_util::codec::Decoder;
use bytes::{BytesMut, Buf};

struct MyStringDecoder {}

const MAX: usize = 8 * 1024 * 1024;

impl Decoder for MyStringDecoder {
    type Item = String;
    type Error = std::io::Error;

    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
        if src.len() < 4 {
            // Not enough data to read length marker.
            return Ok(None);
        }

        // Read length marker.
        let mut length_bytes = [0u8; 4];
        length_bytes.copy_from_slice(&src[..4]);
        let length = u32::from_le_bytes(length_bytes) as usize;

        // Check that the length is not too large to avoid a denial of service attack where the
        // server runs out of memory.
        if length > MAX {
            return Err(std::io::Error::new(
                std::io::ErrorKind::InvalidData,
                format!("Frame of length {} is too large.", length)
            ));
        }

        if src.len() < 4 + length {
            // The full string has not yet arrived.
            //
            // We reserve more space in the buffer. This is not strictly necessary, but is a good idea
            // performance-wise.
            src.reserve(4 + length - src.len());

            // We inform the Framed that we need more bytes to form the next frame.
            return Ok(None);
        }

        // Use advance to modify src such that it no longer contains this frame.
        let data = src[4..4 + length].to_vec();
        src.advance(4 + length);

        // Convert the data to a string, or fail if it is not valid utf-8.
        match String::from_utf8(data) {
            Ok(string) => Ok(Some(string)),
            Err(utf8_error) => {
                Err(std::io::Error::new(
                    std::io::ErrorKind::InvalidData,
                    utf8_error.utf8_error(),
                ))
            },
        }
    }
}

// Implement the Encoder trait
use tokio_util::codec::Encoder;
use bytes::BytesMut;

struct MyStringEncoder {}

const MAX: usize = 8 * 1024 * 1024;

impl Encoder<String> for MyStringEncoder {
    type Error = std::io::Error;

    fn encode(&mut self, item: String, dst: &mut BytesMut) -> Result<(), Self::Error> {
        // Don't send a string if it is longer than the other end will
        // accept.
        if item.len() > MAX {
            return Err(std::io::Error::new(
                std::io::ErrorKind::InvalidData,
                format!("Frame of length {} is too large.", item.len())
            ));
        }

        // Convert the length into a byte array.
        // The cast to u32 cannot overflow due to the length check above.
        let len_slice = u32::to_le_bytes(item.len() as u32);

        // Reserve space in the buffer.
        dst.reserve(4 + item.len());

        // Write the length and string to the buffer.
        dst.extend_from_slice(&len_slice);
        dst.extend_from_slice(item.as_bytes());
        Ok(())
    }
}

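Module tokio_util::time provides the DelayQueue type:

  1. pub fn insert_at(&mut self, value: T, when: Instant) -> Key: inserts a value that expires at the absolute instant when;
  2. pub fn insert(&mut self, value: T, timeout: Duration) -> Key: inserts a value that expires after timeout;
  3. pub fn poll_expired(&mut self, cx: &mut Context<'_>) -> Poll<Option<Expired<T>>>: returns the next expired entry (an Expired<T>, which holds the value and its Key), or None once the queue is empty. It is a poll-style method, so it is usually driven with futures::ready!() inside another poll function. It cannot be .awaited directly, because Poll does not implement Future;
  4. The Key returned by insert can later be used to look up, remove, or reset the corresponding entry;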
use tokio_util::time::{DelayQueue, delay_queue};

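use futures::ready; // std::task::ready!() works too
use std::collections::HashMap;
use std::task::{Context, Poll};
use std::time::Duration;

// Hypothetical key/value types, defined only so the example compiles.
type CacheKey = String;
type Value = String;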

struct Cache {
    entries: HashMap<CacheKey, (Value, delay_queue::Key)>,
    expirations: DelayQueue<CacheKey>,
}

const TTL_SECS: u64 = 30;

impl Cache {
    fn insert(&mut self, key: CacheKey, value: Value) {
        let delay = self.expirations.insert(key.clone(), Duration::from_secs(TTL_SECS));
        self.entries.insert(key, (value, delay));
    }

    fn get(&self, key: &CacheKey) -> Option<&Value> {
        self.entries.get(key).map(|&(ref v, _)| v)
    }

    fn remove(&mut self, key: &CacheKey) {
        if let Some((_, cache_key)) = self.entries.remove(key) {
            self.expirations.remove(&cache_key);
        }
    }

    fn poll_purge(&mut self, cx: &mut Context<'_>) -> Poll<()> {
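        // ready!() unwraps Poll::Ready(v) into v; on Poll::Pending it returns Poll::Pending from
        // poll_purge (it does not block), and the task is woken when the next entry expires.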
        while let Some(entry) = ready!(self.expirations.poll_expired(cx)) {
            self.entries.remove(entry.get_ref());
        }
        Poll::Ready(())
    }
}

Trait tokio_util::time::FutureExt adds a timeout() method to every type that implements Future:

  • For a Stream, use the timeout() method provided by tokio_stream::StreamExt instead;
pub trait FutureExt: Future {
    // Provided method
    fn timeout(self, timeout: Duration) -> Timeout<Self>
       where Self: Sized { ... }
}

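// Example
use tokio::{sync::oneshot, time::Duration};
use tokio_util::time::FutureExt;

#[tokio::main]
async fn main() {
    // _tx stays alive, so rx never resolves and the 10 ms timeout fires first.
    let (_tx, rx) = oneshot::channel::<()>();
    let res = rx.timeout(Duration::from_millis(10)).await;
    assert!(res.is_err());
}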

13 unit testing
#

https://tokio.rs/tokio/topics/testing

#[tokio::test] creates a current_thread runtime by default.

Pausing and resuming time in tests

  • pause time: The current value of Instant::now() is saved and all subsequent calls to Instant::now() will return the saved value. The saved value can be changed by advance or by the time auto-advancing once the runtime has no work to do. This only affects the Instant type in Tokio, and the Instant in std continues to work as normal.
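use std::time::Duration;

// tokio::time::pause() requires tokio's test-util feature.
#[tokio::test]
async fn paused_time() {
    tokio::time::pause();
    let start = std::time::Instant::now(); // std Instant: measures real wall-clock time
    tokio::time::sleep(Duration::from_millis(500)).await;
    println!("{:?}ms", start.elapsed().as_millis()); // This code prints 0ms on a reasonable machine.
}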

The start_paused attribute argument is a shorter way to start the test with time paused:

use std::time::Duration;

#[tokio::test(start_paused = true)]
async fn paused_time() {
    let start = std::time::Instant::now();
    tokio::time::sleep(Duration::from_millis(500)).await;
    println!("{:?}ms", start.elapsed().as_millis());
}

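Although time is paused, the order and relative timing of async operations are still preserved:

  • “Tick!” is printed 4 times immediately (the ticks fall at 0, 300, 600 and 900 ms of paused time, all before the 1 s timeout)
use tokio::time::{interval, timeout, Duration};

#[tokio::test(start_paused = true)]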
async fn interval_with_paused_time() {
    let mut interval = interval(Duration::from_millis(300));
    let _ = timeout(Duration::from_secs(1), async move {
        loop {
            interval.tick().await;
            println!("Tick!");
        }
    })
    .await;
}

Use tokio_test::io::Builder to mock AsyncRead and AsyncWrite:

use tokio::io::{AsyncBufReadExt, AsyncRead, AsyncWrite, AsyncWriteExt, BufReader};

async fn handle_connection<Reader, Writer>(
    reader: Reader,
    mut writer: Writer,
) -> std::io::Result<()>
where
    Reader: AsyncRead + Unpin,
    Writer: AsyncWrite + Unpin,
{
    let mut line = String::new();
    let mut reader = BufReader::new(reader);

    loop {
        // Propagate read errors instead of retrying the failed read forever.
        let bytes_read = reader.read_line(&mut line).await?;
        if bytes_read == 0 {
            // EOF: the reader has no more data.
            break Ok(());
        }
        writer.write_all(b"Thanks for your message.\r\n").await?;
        line.clear();
    }
}

#[tokio::test]
async fn client_handler_replies_politely() {
    let reader = tokio_test::io::Builder::new()
        .read(b"Hi there\r\n")
        .read(b"How are you doing?\r\n")
        .build();
    let writer = tokio_test::io::Builder::new()
        .write(b"Thanks for your message.\r\n")
        .write(b"Thanks for your message.\r\n")
        .build();
    // The writer mock panics if the bytes written differ from the expected ones.
    handle_connection(reader, writer).await.unwrap();
}

14 tokio::net
#

TCP/UDP/Unix bindings for tokio.

  1. TcpListener and TcpStream provide functionality for communication over TCP
  2. UdpSocket provides functionality for communication over UDP
  3. UnixListener and UnixStream provide functionality for communication over a Unix Domain Stream Socket (available on Unix only)
  4. UnixDatagram provides functionality for communication over Unix Domain Datagram Socket (available on Unix only)
  5. tokio::net::unix::pipe for FIFO pipes (available on Unix only)
  6. tokio::net::windows::named_pipe for Named Pipes (available on Windows only)

The module also provides a function, lookup_host, which performs a DNS resolution:

use tokio::net;
use std::io;

#[tokio::main]
async fn main() -> io::Result<()> {
    for addr in net::lookup_host("localhost:3000").await? {
        println!("socket address is {}", addr);
    }

    Ok(())
}

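TcpListener provides bind/accept:

  • the bind() associated function returns a TcpListener;
  • the accept() method returns a TcpStream and the peer's SocketAddr;
  • from_std()/into_std() convert to and from the standard library's TcpListener. This lets you create and configure a listener with std and then turn it into a tokio TcpListener; the std listener must be switched to nonblocking mode first.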
      use std::error::Error;
      use tokio::net::TcpListener;
    
      #[tokio::main]
      async fn main() -> Result<(), Box<dyn Error>> {
          let std_listener = std::net::TcpListener::bind("127.0.0.1:0")?;
          std_listener.set_nonblocking(true)?;
          let listener = TcpListener::from_std(std_listener)?;
          Ok(())
      }
    

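TcpStream implements AsyncRead/AsyncWrite. split() borrows the stream and returns separate read and write halves; into_split() returns owned halves that can be moved into different tasks.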

TcpSocket is a TCP socket that has not yet been converted into a TcpListener or TcpStream. Use it to set TCP options before calling listen() or connect().

  • pub fn new_v4() -> Result<TcpSocket>

  • pub fn new_v6() -> Result<TcpSocket>

  • pub fn bind(&self, addr: SocketAddr) -> Result<()>

  • pub fn bind_device(&self, interface: Option<&[u8]>) -> Result<()>

  • pub async fn connect(self, addr: SocketAddr) -> Result<TcpStream>

  • pub fn listen(self, backlog: u32) -> Result<TcpListener>

  • pub fn set_keepalive(&self, keepalive: bool) -> Result<()>

  • pub fn set_reuseaddr(&self, reuseaddr: bool) -> Result<()>

  • pub fn set_recv_buffer_size(&self, size: u32) -> Result<()>

use tokio::net::TcpSocket;

use std::io;

#[tokio::main]
async fn main() -> io::Result<()> {
    let addr = "127.0.0.1:8080".parse().unwrap();

    // Listener side: configure the socket, bind it, then turn it into a TcpListener.
    let socket = TcpSocket::new_v4()?;
    // On platforms with Berkeley-derived sockets, this allows to quickly
    // rebind a socket, without needing to wait for the OS to clean up the
    // previous one.
    //
    // On Windows, this allows rebinding sockets which are actively in use,
    // which allows "socket hijacking", so we explicitly don't set it there.
    // https://docs.microsoft.com/en-us/windows/win32/winsock/using-so-reuseaddr-and-so-exclusiveaddruse
    if cfg!(not(windows)) {
        socket.set_reuseaddr(true)?;
    }
    socket.bind(addr)?;
    let listener = socket.listen(1024)?;

    // Client side: a separate socket connects to the listener and becomes a TcpStream.
    let socket = TcpSocket::new_v4()?;
    let _stream = socket.connect(addr).await?;

    let (_conn, _peer_addr) = listener.accept().await?;
    Ok(())
}

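tokio's TcpStream::connect() takes no timeout parameter, but it can be wrapped in tokio::time::timeout:

use std::io;
use std::time::Duration;
use tokio::net::TcpStream;

const CONNECTION_TIMEOUT_SECS: u64 = 100;

async fn connect_with_timeout(addr: &str) -> io::Result<TcpStream> {
    match tokio::time::timeout(
        Duration::from_secs(CONNECTION_TIMEOUT_SECS),
        TcpStream::connect(addr),
    )
    .await
    {
        // The connect finished in time: pass its own Ok/Err through.
        Ok(result) => result,
        // The timer fired first: report an I/O timeout instead of panicking.
        Err(elapsed) => Err(io::Error::new(io::ErrorKind::TimedOut, elapsed)),
    }
}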

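Likewise, tokio's TcpStream has no read/write timeout options. std's TcpStream has set_read_timeout()/set_write_timeout(), and a tokio TcpStream can be created from a std one. However, those options (SO_RCVTIMEO/SO_SNDTIMEO) only affect blocking calls. Once the socket is in nonblocking mode, as tokio requires, they have no effect on async reads and writes. from_std() is still useful for applying other settings first:

let std_stream = std::net::TcpStream::connect("127.0.0.1:8080")
    .expect("Couldn't connect to the server...");
std_stream.set_nodelay(true).expect("set_nodelay call failed");
std_stream.set_nonblocking(true)?; // required before handing the socket to tokio
let stream = tokio::net::TcpStream::from_std(std_stream)?;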

For socket options that neither std nor tokio::net exposes, use the socket2 crate. Examples include more detailed TCP keepalive parameters and TCP_USER_TIMEOUT:

use std::time::Duration;

use socket2::{Socket, TcpKeepalive, Domain, Type};

let socket = Socket::new(Domain::IPV4, Type::STREAM, None)?;
// Depending on the target operating system, we may also be able to
// configure the keepalive probe interval and/or the number of
// retries here as well.
let keepalive = TcpKeepalive::new()
    .with_time(Duration::from_secs(4));

socket.set_tcp_keepalive(&keepalive)?;

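TCP_USER_TIMEOUT limits how long sent data may remain unacknowledged before the kernel drops the connection. As a result, it can act as a timeout for connection setup (connect()) and for data transfer. For details see: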

  1. https://codearcana.com/posts/2015/08/28/tcp-keepalive-is-a-lie.html
  2. https://blog.cloudflare.com/when-tcp-sockets-refuse-to-die/

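For UDP there is only one type, UdpSocket, which provides bind/connect/send/recv/send_to/recv_from and other methods.

bind
Listens on the given address and port. Because UDP is connectionless, use recv_from to receive data from any client and send_to to send data to any target.
connect
UDP is connectionless, but connect() is still supported. It fixes the peer IP and port for the socket; afterwards, send/recv only send to and receive from that peer.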
use tokio::net::UdpSocket;
use std::io;

#[tokio::main]
async fn main() -> io::Result<()> {
    let sock = UdpSocket::bind("0.0.0.0:8080").await?;
    // use `sock`
    Ok(())
}

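For Unix sockets there are two kinds of types:

  1. connection-oriented: UnixListener, UnixStream;
  2. connectionless: UnixDatagram.

A Unix socket is addressed by a local file (usually ending in .sock), i.e. a named socket. There are also unnamed Unix sockets that have no associated file.

Closing a named socket does not delete its socket file automatically. If no Unix socket is bound to that file, clients that connect to it will fail.

UnixDatagram provides:

  1. pub fn bind<P>(path: P) -> Result<UnixDatagram> where P: AsRef<Path>: creates a named Unix datagram socket bound to the given socket file;
  2. pub fn pair() -> Result<(UnixDatagram, UnixDatagram)>: returns a connected pair of unnamed Unix datagram sockets;
  3. pub fn unbound() -> Result<UnixDatagram>: creates a socket that is not bound to any address;
  4. pub fn connect<P: AsRef<Path>>(&self, path: P) -> Result<()>: datagram sockets are connectionless, but after this call send/recv can be used to exchange messages with the given path.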
use tokio::net::UnixDatagram;
use tempfile::tempdir;

#[tokio::main]
async fn main() -> std::io::Result<()> {
    // We use a temporary directory so that the socket files left by the bound sockets will get cleaned up.
    let tmp = tempdir()?;

    // Bind each socket to a filesystem path
    let tx_path = tmp.path().join("tx");
    let tx = UnixDatagram::bind(&tx_path)?;

    let rx_path = tmp.path().join("rx");
    let rx = UnixDatagram::bind(&rx_path)?;

    let bytes = b"hello world";
    tx.send_to(bytes, &rx_path).await?;

    let mut buf = vec![0u8; 24];
    let (size, addr) = rx.recv_from(&mut buf).await?;

    let dgram = &buf[..size];
    assert_eq!(dgram, bytes);
    assert_eq!(addr.as_pathname().unwrap(), &tx_path);
    Ok(())
}



// unnamed unix socket
use tokio::net::UnixDatagram;

#[tokio::main]
async fn main() -> std::io::Result<()> {
    // Create the pair of sockets
    let (sock1, sock2) = UnixDatagram::pair()?;

    // Since the sockets are paired, the paired send/recv
    // functions can be used
    let bytes = b"hello world";
    sock1.send(bytes).await?;

    let mut buff = vec![0u8; 24];
    let size = sock2.recv(&mut buff).await?;

    let dgram = &buff[..size];
    assert_eq!(dgram, bytes);
    Ok(())
}

UnixListener::bind() returns a UnixListener, whose accept() method returns a UnixStream:

use tokio::net::UnixListener;

#[tokio::main]
async fn main() {
    let listener = UnixListener::bind("/path/to/the/socket").unwrap();
    loop {
        match listener.accept().await {
            Ok((stream, _addr)) => {
                println!("new client!");
            }
            Err(e) => { /* connection failed */ }
        }
    }
}

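UnixStream implements AsyncRead/AsyncWrite, and UnixStream::connect() also returns a UnixStream. The example below uses ready() together with try_read()/try_write(). Note that connect() fails unless a server is already listening on bind_path: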

use tokio::io::Interest;
use tokio::net::UnixStream;
use std::error::Error;
use std::io;

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    let dir = tempfile::tempdir().unwrap();
    let bind_path = dir.path().join("bind_path");
    let stream = UnixStream::connect(bind_path).await?;

    loop {
        let ready = stream.ready(Interest::READABLE | Interest::WRITABLE).await?;

        if ready.is_readable() {
            let mut data = vec![0; 1024];
            // Try to read data, this may still fail with `WouldBlock`
            // if the readiness event is a false positive.
            match stream.try_read(&mut data) {
                Ok(n) => {
                    println!("read {} bytes", n);
                }
                Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
                    continue;
                }
                Err(e) => {
                    return Err(e.into());
                }
            }

        }

        if ready.is_writable() {
            // Try to write data, this may still fail with `WouldBlock`
            // if the readiness event is a false positive.
            match stream.try_write(b"hello world") {
                Ok(n) => {
                    println!("write {} bytes", n);
                }
                Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
                    continue;
                }
                Err(e) => {
                    return Err(e.into());
                }
            }
        }
    }
}

15 tokio::signal
#

The tokio::signal module provides signal capture and handling.

pub async fn ctrl_c() -> Result<()>: the .await completes when a SIGINT is received from Ctrl-C;

use tokio::signal;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    signal::ctrl_c().await?;
    println!("ctrl-c received!");
    Ok(())
}

For other signals (Unix only, tokio::signal::unix), create a Signal with the signal function and a SignalKind, then call its recv() method:


// Wait for SIGHUP on Unix
use tokio::signal::unix::{signal, SignalKind};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // An infinite stream of hangup signals.
    let mut stream = signal(SignalKind::hangup())?;

    // Print whenever a HUP signal is received
    loop {
        stream.recv().await;
        println!("got signal HUP");
    }
}

16 tokio::time
#

  1. Sleep is a future that does no work and completes at a specific Instant in time.
  2. Interval is a stream yielding a value at a fixed period. It is initialized with a Duration and repeatedly yields each time the duration elapses.
  3. Timeout: Wraps a future or stream, setting an upper bound to the amount of time it is allowed to execute. If the future or stream does not complete in time, then it is canceled and an error is returned.

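Do not use the standard library's sleep (std::thread::sleep) in async code. It blocks the current thread, so no other async task can run on that thread in the meantime.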

These types must be used from within the context of the Runtime.

// Wait 100ms and print “100 ms have elapsed”
use std::time::Duration;
use tokio::time::sleep;

#[tokio::main]
async fn main() {
    sleep(Duration::from_millis(100)).await;
    println!("100 ms have elapsed");
}

// Require that an operation takes no more than 1s.
use tokio::time::{timeout, Duration};

async fn long_future() {}

#[tokio::main]
async fn main() {
    // Any Future can be given an async timeout this way.
    let res = timeout(Duration::from_secs(1), long_future()).await;
    if res.is_err() {
        println!("operation timed out");
    }
}

// A simple example using interval to execute a task every two seconds.
use tokio::time;
async fn task_that_takes_a_second() {
    println!("hello");
    time::sleep(time::Duration::from_secs(1)).await
}
#[tokio::main]
async fn main() {
    let mut interval = time::interval(time::Duration::from_secs(2));
    for _i in 0..5 {
        interval.tick().await;
        task_that_takes_a_second().await;
    }
}

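tokio::time also provides pause()/resume()/advance() (with the test-util feature):

  1. tokio::time::pause(): saves the current Instant::now(). Later calls to Instant::now() return the saved value, which can be moved forward with advance(). It only works on the current_thread runtime, which #[tokio::test] uses by default, and it panics when called from a multi-threaded runtime.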
  2. Auto-advance:If time is paused and the runtime has no work to do, the clock is auto-advanced to the next pending timer. This means that Sleep or other timer-backed primitives can cause the runtime to advance the current time when awaited.
#[tokio::main(flavor = "current_thread", start_paused = true)]
async fn main() {
   println!("Hello world");
}

17 tokio::process
#

use tokio::io::AsyncWriteExt;
use tokio::process::Command;

use std::process::Stdio;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let mut cmd = Command::new("sort");

    // Specify that we want to pipe both the output and the input. As with capturing the
    // output, configuring stdin as a pipe lets it be used as an asynchronous writer.
    cmd.stdout(Stdio::piped());
    cmd.stdin(Stdio::piped());

    let mut child = cmd.spawn().expect("failed to spawn command");

    // These are the animals we want to sort
    let animals: &[&str] = &["dog", "bird", "frog", "cat", "fish"];

    let mut stdin = child
        .stdin
        .take()
        .expect("child did not have a handle to stdin");

    // Write our animals to the child process. Note that the behavior of `sort` is to buffer _all
    // input_ before writing any output. In general, it is recommended to write to the child in a
    // task separate from the one awaiting its exit (or output) to avoid deadlocks (for example,
    // the child tries to write some output but gets stuck waiting on the parent to read from it,
    // while the parent is stuck waiting to write its input completely before reading the
    // output).
    // write_all() keeps writing until every byte is sent; a single write() may write only part.
    stdin
        .write_all(animals.join("\n").as_bytes())
        .await
        .expect("could not write to stdin");

    // We drop the handle here, which signals EOF to the child process. This tells the child
    // process that there is no more data on the pipe.
    drop(stdin);

    let op = child.wait_with_output().await?;

    // Results should come back in sorted order
    assert_eq!(op.stdout, "bird\ncat\ndog\nfish\nfrog\n".as_bytes());

    Ok(())
}

Using another process's output as input (the equivalent of echo | tr):

use tokio::join;
use tokio::process::Command;
use std::process::Stdio;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let mut echo = Command::new("echo")
        .arg("hello world!")
        .stdout(Stdio::piped())
        .spawn()
        .expect("failed to spawn echo");

    let tr_stdin: Stdio = echo
        .stdout
        .take()
        .unwrap()
        .try_into()
        .expect("failed to convert to Stdio");

    let tr = Command::new("tr")
        .arg("a-z")
        .arg("A-Z")
        .stdin(tr_stdin)
        .stdout(Stdio::piped())
        .spawn()
        .expect("failed to spawn tr");

    let (echo_result, tr_output) = join!(echo.wait(), tr.wait_with_output());

    assert!(echo_result.unwrap().success());

    let tr_output = tr_output.expect("failed to await tr");
    assert!(tr_output.status.success());

    assert_eq!(tr_output.stdout, b"HELLO WORLD!\n");

    Ok(())
}