跳过正文

14. 异步:async

·
目录
rust-lang - 这篇文章属于一个选集。
§ 14: 本文

Thread 的问题:

  1. 每个 thread 都有固定大小的 stack 内存占用,成千上万个 thread 会消耗大量的内存;
  2. 内核调度 thread 运行的上下文切换开销大;
  3. thread 在执行 read/write 系统调用时,尤其是网络通信时,会有大量的时间被 block 等待,此时该 thread 不能再执行其它事情,效率低;

异步(async)通过创建大量异步 task,然后使用一个 thread pool 来执行它们,在某个 task 被阻塞时 async executor 自动调度其它 task 到 thread 上运行,从而执行效率和并发更高。task 相比 thread 更轻量化,没有了大量 thread stack 开销,切换执行速度也更快,所以一 个程序内部可以创建大量的异步 task。

参考:https://rust-lang.github.io/async-book/part-guide/concurrency.html

Future
#

async 的核心是 Future trait, 定义如下:

trait Future {
    type Output;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}

enum Poll<T> {
    Ready(T),
    Pending,
}

它的 poll() 方法的 self 类型是 Pin<&mut Self> ,表示一旦开始第一次 poll 后,Self 对象的内存地址必须是固定的,不能被转 移, 这是为了确保 Future 对象内部保存的栈变量地址指针继续有效。

poll() 方法的第二个参数 Context 封装了 Waker ,当 poll Pending 时,Future 的实现可以保存该 waker(一般是在一个辅助线程中), 后续当条件满足时调用 waker 来唤醒 async executor 来重新 poll 自己。

Future 对象只有被 poll 时才开始执行, 一般使用 .await 来进行 poll,它返回 Ready 后的值(await 的原理参考后文)。

创建 Future 对象
#

有多种方式来创建实现 Future trait 的对象:

  1. 为自定义类型实现 Future trait;

  2. 使用 async block 语法:

  • async {xxx} 返回一个 impl Future<Output=XX> + !Unpin + !Send + !Sync 的匿名类型对象;

  • async move {xxx} 返回一个 impl Future<Output=XX> + 'static + !Unpin + !Send + !Sync 的匿名类型对象;

  1. 使用 async fn 语法:
  • async fn(xx) {xxx} 返回一个 fn(xx) -> impl Future<Output=XX> + !Unpin + !Send + !Sync 的匿名函数类型对象;
  1. 使用返回 async block 的 closure 语法:
  • |xx| async {xx} 返回一个 impl FnXX(xx) -> impl Future<Output=XX> + !Unpin + !Send + !Sync 的匿名闭包类型对象;

  • |xx| async move {xx} 返回一个 impl FnXX(xx) -> impl Future<Output=XX> + 'static + !Unpin + !Send + !Sync 的匿名闭包类型对象;

  1. 使用 async closure 语法:
  • async |xx| {}async move |xx| {} 返回 impl FnXX(xx) -> impl Future<Output=XX> + !Unpin + !Send + !Sync 的匿名闭包类型对象;
  1. 普通函数返回 impl Future 类型对象;
fn read_to_string(&mut self, buf: &mut String) -> impl Future<Output = Result<usize>> {
    // 可以使用 async block 来生成待返回的 impl Future 对象
    async move {
        Ok::<Result(usize)>(2)
    }
}

实现 Future trait
#

// https://mazhen.tech/p/pinboxdyn-future%E8%A7%A3%E6%9E%90/

pub struct HttpRequest {
    url: String,
}

pub struct HttpResponse {
    code: u32,
}

pub struct ResponseFuture {
    request: HttpRequest,
}

impl Future for ResponseFuture {
    type Output = Result<HttpResponse, Error>;

    fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
        println!("process url:{}", &self.as_ref().get_ref().request.url);
        Poll::Ready(Ok(HttpResponse { code: 200 }))
    }
}

// 在实现 Service 时,关联类型 type Future 设置为我们手工实现的 Future:
struct RequestHandler;

impl Service<HttpRequest> for RequestHandler {
    type Response = HttpResponse;
    type Error = Error;
    type Future = ResponseFuture;

    fn poll_ready(&mut self, cx: &mut std::task::Context<'_>) -> Poll<Result<(), Self::Error>> {
        Poll::Ready(Ok(()))
    }

    fn call(&mut self, req: HttpRequest) -> Self::Future {
        ResponseFuture { request: req }
    }
}

// 然后就可以像正常的 Future 一样,使用 Service:
#[tokio::main]
async fn main() {
    let mut service = RequestHandler {};
    match service.call(HttpRequest { url: "/user/mazhen".to_owned(),}) .await
    {
        Ok(r) => println!("Response code: {}", r.code),
        Err(e) => println!("process failed. {:?}", e),
    }
}

async block
#

async block 类似于闭包,提供了一个异步上下文,内部可以使用 .await, 返回一个 impl Future 的匿名类型对象:

示例:使用 async move {} 构建一个异步函数:

注:和同步闭包 move || {} 类似,move 并不能保证生成的 Future 对象一定是 ‘static 的。

use std::io;
use std::future::Future;
fn cheapo_request<'a>(host: &'a str, port: u16, path: &'a str)
    -> impl Future<Output = io::Result<String>> + 'a
{
    // async move 也不能保证返回的 Future 对象一定是 'static 的
    // 特别是这里捕获的 host、path 本身是借用,而且不是 'static,
    // 所以返回的 Future 也是具有 'a 而非 'static
    async move {
        //... function body ...
    }
}

// 解决办法:捕获对象而非借用类型
use std::io;
use std::future::Future;
fn cheapo_request(host: &str, port: u16, path: &str) -> impl Future<Output = io::Result<String>> + 'static
{
    let host = host.to_string();
    let path = path.to_string();

    async move {
        // 捕获了 host、path 的所有权
        //... use &*host, port, and path ...
    }
}

// 下面是 OK 的:
use async_std::task;
let mut handles = vec![];
for (host, port, path) in requests {
    // 将 host、port、path 所有权转移到 async block 中
    handles.push(task::spawn(async move {
        cheapo_request(&host, port, &path).await
    }));
}

async block 中使用 ? 来传播错误或 return 值都是 async block 返回,而不是它所在的函数返回

  • 作为对比,在函数的普通 block、match branch 中使用 ? 时,是所在的函数返回;

  • async block 中不能使用 break/continue,如果要提前返回需要使用 return。

// https://doc.rust-lang.org/std/keyword.async.html#control-flow
async fn example() -> i32 {
    let x = async {
        return 5; // async block 提前返回,而不是所在的函数返回,返回返回值
    };

    x.await
}

//https://rust-lang.github.io/async-book/part-guide/more-async-await.html
async {
    let x = foo()?;   // This `?` only exits the async block, not the surrounding function.
    consume(x);

   // Ok(()) // 错误,需要指定 Ok 所属的 Result 完整类型,参考下面的例子

   // 正确,指定 Ok 所属的 Result 类型
   Ok::<(), std::io::Error>(())
   // 或者
   // std::result::Result::<(), std::io::Error>::Ok(());
}.await?

async block 可以和闭包一样捕获环境中的对象(借用或 move),可以指定 async move 来获得对象的所有权,这时 async block 返回 的 Future 具有 'static lifetime。

由于 async block 返回匿名的 impl Future<Output=XX> 对象,所以:

  1. 不能对返回的对象施加额外的限界,如 +Send+Unpin,但是编译器会自动判断它是否实现 Send/Unpin
  2. 也不能指定 Future 中的 Output 关联类型,当 Output 为 Result 时可能出错。解决办法:为 Ok 指定它所属的 Enum Result 类型。
let input = async_std::io::stdin();

// future 是 Future<Output=Result>
let future = async {
    let mut line = String::new();
    // ? 是 async block 返回,结果类型 std::io::Result<usize>
    input.read_line(&mut line).await?;
    println!("Read line: {}", line);

    // Ok(()) // 错误

    // 正确,指定 Ok 所属的 Result 类型
    Ok::<(), std::io::Error>(())
    // 或者
    // std::result::Result::<(), std::io::Error>::Ok(());
};

async {}async move {} 都是表达式,故可以作为闭包函数的返回值。

use async_std::net;
use async_std::task;

// serve_one 是一个 impl Future<Output=()> 的匿名类型对象
let serve_one = async {
    // ? 是 async block 返回(而非 block 所在的函数),类型是 Result
    let listener = net::TcpListener::bind("localhost:8087").await?;
    let (mut socket, _addr) = listener.accept().await?;
};

pub async fn many_requests(requests: Vec<(String, u16, String)>) -> Vec<std::io::Result<String>>
{
    let mut handles = vec![];
    for (host, port, path) in requests {
        handles.push(
            task::spawn_local(
                // 使用 async move 来获取 host、path 的所有权。
                async move {
                    cheapo_request(&host, port, &path).await
                }
            ));
    }
    //...
}

loop {
    async move {
        break; // error[E0267]: `break` inside of an `async` block
    }
}

async closure
#

async closure 有两种类型:

  1. Rust 1.75 之前:只能使用返回 async {} 的闭包来实现,如 || async {}|| async move {}
  2. Rust 1.75 及之后:支持 async closure,如 async || {}async move || {};

旧的 async block 实现
#

使用 || async move {} 来定义异步闭包,等效于生成 impl FnXX() -> impl Future<Output=YY> 类型的匿名类型对象:

let app = Router::new()
    .route(
        "/",
        any_service(service_fn(
            // 等效于生成一个 impl Fn() -> impl Future<Output=Result<Response, Infallible> 匿名类型对象
            |_: Request| async {
                let res = Response::new(Body::from("Hi from `GET /`"));
                Ok::<_, Infallible>(res) // service_fn 的闭包必须返回 Result
        }))
    )

但是这种实现,有如下问题:

  1. 不能以 &mut T 的方式捕获上下文对象;
  2. 输入参数可能不能有借用(因为同步闭包不支持 HRTB);
// 错误:同步闭包不支持 HRTB
// 返回的 Combined 也是有 lifetime 的,但是它的 lifetime 和 a、b 之间约束缺少定义,所以编译器报错。
// 如果只是纯输入有借用,但是输出没有借用(或为 'static) 且闭包内部没有借用方式捕获上下文,则是 OK 的。
let mut closure = |a: &i32, b: &str|  {Combined { num: a, text: b }}

// 错误:不支持 &mut T 捕获上下文对象
let mut vec: Vec<String> = vec![];
// closure 类型是:impl FnMut() -> impl Future<Output=()>
let closure = || async {
    // error: captured variable cannot escape `FnMut` closure body
    vec.push(ready(String::from("")).await);
};

/*
error: captured variable cannot escape `FnMut` closure body
--> src/main.rs:69:22
|
68 |       let mut vec: Vec<String> = vec![];
|           ------- variable defined here
69 |       let closure = || async {
|  ____________________-_^
| |                    |
| |                    inferred to be a `FnMut` closure
70 | |         vec.push(ready(String::from("")).await);
| |         --- variable captured here
71 | |     };
| |_____^ returns an `async` block that contains a reference to a captured variable, which then escapes the closure body
|
= note: `FnMut` closures only have access to their captured variables while they are executing...
= note: ...therefore, they cannot allow references to captured variables to escape
*/

但是 &T 捕获(对应实现的 Fn)、或则转移所有权是 OK 的:

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + 'static>> {
    let mut vec: Vec<String> = vec!["a".to_string()];
    // closure 类型是: impl Fn() -> impl Future<Output=()> 类型
    let closure = || async {
        vec.iter().for_each(|i| println!("{i}"));
    };
    closure().await;
}

对于上面 async {} 实现的 async closure 的不支持 &mut 捕获上下文,以及参数不支持借用(不支持 HRTB)的问题,可以通过 Rust 1.75+ 的 async closure 来解决。

新的 async closure 语法
#

Rust 1.75+ 开始正式支持 async closure: 使用 async || {} 语法(而非 || async {}),支持 &mut T 捕获上下文对象,支持 HRTB(编译器自动添加,但不能显式添加);

与以前的返回 async block 的 closure 不同,async closure 可以 &mut T 捕获上下文对象:

// 错误:通过返回 async block 的异步闭包不支持 &mut T 捕获上下文对象(&T 捕获 OK)
let vec: Vec<String> = vec![];
let closure = || async {
    vec.push(ready(String::from("")).await);
};

// OK: async cloure 支持 &mut T 捕获上下文对象
let vec: Vec<String> = vec![];
let closure = async || {
    // async closure 使用 &'closure mut Vec<String> 方式捕获了上下文对象 vec,直到这个闭包被 drop(如被 .await)。
    vec.push(ready(String::from("")).await);
};
// OK: move 捕获
let string: String = "Hello, world".into();
let closure = async move || {
    // 闭包获得了 string 的所有权
    ready(&string).await;
};

async closure 也支持 HTRB:

// 错误:同步闭包不支持 HRTB
let mut closure = |a: &i32, b: &str|  {Combined { num: a, text: b }}

// OK: 异步闭包支持自动添加 HRTB 和 move
let arg = async |x: i32| { async_add(x, 1).await };
let ret = async || -> Vec<Id> { async_iterator.collect().await };
let hr = async |x: &str| { do_something(x).await }; // 支持 HRTB
let s = String::from("hello, world");
let c = async move || { do_something(&s).await };

由于 async closure 的定义和同步闭包 closure 的定义一样,都不支持 liftime 声明,所以在输入、输出包含借用时,因缺少显式的 lifetime 关系约束,会编译失败。

解决办法:将闭包(async clousre、closure)转换为 fn 函数实现,因为 fn 函数支持显式的 lifetime 标记和约束定义。

async fn
#

async fn 分为: 1. 普通异步函数; 2. 位于 trait 中的异步函数。

两者在实现上有一些差异。

Rust 的 async fn/block/closure 提供了一种使用同步代码风格来写异步代码的机制,它们都是通过编译器创建的包含状态机的匿名 (枚举)类型来实现的。

终态机的终态和代码块中的 .await 有关,每个 .await 位置对应一个状态,因为这些位置是 async 运行时(executor)的 yield 点,对应一个新的 Future 对象。

// https://www.eventhelix.com/rust/rust-to-assembly-async-await/

// Await this function until the unit has reached the target position.
async fn goto(unit: UnitRef, pos: i32) {
    UnitGotoFuture {
        unit,
        target_pos: pos,
    }
    .await;
}

goto(unit.clone(), 10).await;
// The code here will execute after the unit has reached position 10

async fn/block/closure 的编译器实现
#

对于普通 async fn,以及 async block、async closure, 编译器会生成一个实现 impl Future + '_ 的匿名类型对象 (静态派发,后续不会再变化)。

对于 async fn,编译器将它糖化为返回 impl Future + '_ 的普通函数('_ 表示编译器会为其捕获函数的范型参数和 lifetime), 并且在函数内部创建一个实现该 trait 的隐式类型和对象,如果函数参数包含借用,则生成的 Future 对象不一定满足 ‘static 要求:

async fn foo(x: &i32) -> i32 {
    *x
}

// 等价为:返回类型为 impl Future 隐藏具体类型。
fn foo(x: Rc<i32>) -> impl Future<Output=i32> {
    // 编译器生成的一个具体类型,其中 state 是跨 .await 的状态机
    struct FooFuture { x: Rc<i32>, state: bool };
    // 为具体类型实现 Future trait
    impl Future for FooFuture { /* poll 实现 */ }
    FooFuture { x, state: false }
}

// 另一个例子:如果函数的参数包含借用类型,则返回的实现 impl Future 的隐式类型对象也具有 lifetime。
async fn foo(x: &str) -> usize {
    let len = x.len();
    len
}
// 等价为:这里 foo 返回一个 匿名 Future 类型,生命周期 'a 来自参数 &'a str。
fn foo<'a>(x: &'a str) -> impl std::future::Future<Output = usize> + 'a {
    struct FooFuture<'b> {x: &'b str, state: bool };
    impl<'b> Future for FooFuture<'b> {/* pool 实现*/ }
    FooFuture{x, state: false} // FooFuture 的 lifetime 和 x 一致,都是 'a
}
// 后续调用 foo 返回的 fut 类型为:impl Future<Output = i32> + '_,推导的生命周期和 v 一致。
let fut = foo(&mut v);
// 错误:fut 不是 'static 类型,不满足 spawn() 参数的限界要求
tokio::spawn(fut).await?;

虽然它们都没有声明实现 Send,但编译器在编译时可以根据跨 .await 的对象是否都实现 Send 来判断整体是否实现 Send:

  • 如果局部变量(输入参数、内部定义变量、捕获的对象)都是 Send(如 i32、Arc、String 等),那么编译器生成的匿名 Future 类型自动实现 Send。

  • 如果捕获了非 Send 类型(如 Rc、RefCell),Future 就不会实现 Send。

use tokio::io::{self, AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpListener;

#[tokio::main]
async fn main() -> io::Result<()> {
    let listener = TcpListener::bind("127.0.0.1:6142").await?;
    loop {
        let (mut socket, _) = listener.accept().await?;
        tokio::spawn(async move { // 这里的 async block 返回的 impl Future 对象由编译器自动推导实现了 Send
            let mut buf = vec![0; 1024];

            loop {
                match socket.read(&mut buf).await {
                    Ok(0) => return,
                    Ok(n) => {
                        if socket.write_all(&buf[..n]).await.is_err() {
                            return;
                        }
                    }
                    Err(_) => {
                        return;
                    }
                }
            }
        });
    }
}

一个带 .await&mut T 输入的 async fn 编译器实现的复杂例子:

  • 每个 .await 都会让编译器拆分出一个状态,对应一个状态枚举值。
  • 局部变量(包括函数输入、内部创建的变量)在跨 .await 时会被保存到状态机结构体里。
  • 由于状态机里可能持有栈变量的地址,如 &mut i32,必须通过 Pin 保证对象不被移动导致地址失效。
  • Rust 编译器自动生成这个状态机和 poll 方法,用户只需写 async/await。
  • poll 方法是一个大 match,根据不同状态推进计算。
  • Pin 和 Context 保证 Future 安全地暂停和恢复。
async fn foo(x: &mut i32) -> i32 {
    *x += 1;
    tokio::task::yield_now().await;   // 第一个 await
    *x *= 2;
    tokio::task::yield_now().await;   // 第二个 await
    *x + 3
}

// 去语法糖,返回 impl Future + '_
fn foo<'a>(x: &'a mut i32) -> impl std::future::Future<Output = i32> + 'a {
    async move {
        *x += 1;
        tokio::task::yield_now().await; // 第一个 await
        *x *= 2;
        tokio::task::yield_now().await; // 第二个 await
        *x + 3
    }
}
// 进一步去语法糖,编译器生成一个实现 Future 的匿名类型 FooFuture,内部的状态机保存 .await 点的 Future 状态
fn foo<'a>(x: &'a mut i32) -> impl std::future::Future<Output = i32> + 'a {
    FooFuture{x, state: State::Start{x}}
}

// 编译器为 async fn() { ... } 生成一个匿名类型,包含各 .await 状态机对象。
// 如果 async {} 通过 &T/&mut T 借用捕获了上下文对象,则生成的匿名类型是有 lifetime 标记的。
struct FooFuture<'a> {
    state: State<'a>,
}

// 编译器会为每个 .await 会生成一个状态枚举的分支。
enum State<'a> {
    // 枚举值的 field 包含跨 .await 的对象引用,所以在这些 .await 点间进行状态切换前,需要确保栈变量 x 的地址不变。
    // 所以 poll() 的 self 参数是 Pin<&mut Self> 类型,Pin 可以确保 Self 不被移动,进而确保这些栈变量地址有效。
    Start { x: &'a mut i32 },
    Awaiting1 { x: &'a mut i32, fut: tokio::task::YieldNow },
    Awaiting2 { x: &'a mut i32, fut: tokio::task::YieldNow },
    Done,
}

// Start:刚开始执行,捕获 &mut x。
// Awaiting1:第一个 await 处挂起,持有内部 future。
// Awaiting2:第二个 await 处挂起,持有内部 future。
// Done:完成状态。

// 为匿名类型实现 Future
use std::pin::Pin;
use std::task::{Context, Poll};

impl<'a> std::future::Future for FooFuture<'a> {
    type Output = i32;

    fn poll( mut self: Pin<&mut Self>, cx: &mut Context<'_> ) -> Poll<Self::Output> {
        unsafe {
            let me = self.get_unchecked_mut();
            loop {
                match &mut me.state {
                    // 进入第一个 await
                    State::Start { x } => {
                        *x += 1;
                        let fut = tokio::task::yield_now();
                        me.state = State::Awaiting1 { x: *x, fut };
                        return Poll::Pending;
                    }
                    State::Awaiting1 { x, fut } => {
                        // poll 内部 Future
                        match Pin::new(fut).poll(cx) {
                            Poll::Pending => return Poll::Pending,
                            Poll::Ready(()) => {
                                *x *= 2;
                                let fut = tokio::task::yield_now();
                                me.state = State::Awaiting2 { x: *x, fut };
                            }
                        }
                    }
                    State::Awaiting2 { x, fut } => {
                        match Pin::new(fut).poll(cx) {
                            Poll::Pending => return Poll::Pending,
                            Poll::Ready(()) => {
                                let result = *x + 3;
                                me.state = State::Done;
                                return Poll::Ready(result);
                            }
                        }
                    }
                    State::Done => panic!("polled after completion"),
                }
            }
        }
    }
}

// 执行流程示意
// Start:执行 *x += 1 → 创建第一个 future → 切换 Awaiting1 → Poll::Pending
// Awaiting1:poll 第一个 future,完成后执行 *x *= 2 → 创建第二个 future → 切换 Awaiting2 → Poll::Pending
// Awaiting2:poll 第二个 future,完成后计算 *x + 3 → 切换 Done → 返回结果
// Done:Future 已完成,再 poll 会 panic

总结:

  • async fn 语法糖 = 普通函数 + impl Future 返回值。
  • async {} = 被编译器转换成匿名 struct + 状态机;
  • async move 会移动捕获变量,使 Future ‘static
  • .await = 状态机的“中断点”,会把局部变量保存到状态机里,必须通过 Pin 来确保 Self 内存地址不变,这样栈变量的地址才一直有效。
  • Pin + Future::poll = 保证状态机在内存中不会移动,局部变量借用安全。
  • poll() 推进状态机,编译器为状态机自动实现 Future poll 方法;

Future 的 poll 接口要求 Pin<&mut Self> 类型,原因是:异步块内部的局部变量可能包含自引用,如 &self.x 指向同一个 Future 内 部的变量,而 Pin 保证 Future 内部状态的地址不会移动,从而保证自引用安全。

参考:

trait 中的 async fn 编译器实现
#

async fn 的第二种类型是 trait 中的 async fn ,它返回的 impl Future 匿名类型对象实现方式和普通 async fn 不 同,是通过一个 trait 匿名关联类型来实现,而且编译器为每个实现该 Trait 的类型都生成不同的关联类型,所以编译器不能判断它们是否都实现 Send,进而导致默认是未实现 Send

trait MyTrait {
    async fn do_work(&self) -> i32;
}

// 编译器会将其内部处理为类似:
trait MyTrait {
    type DoWorkFuture<'a>: Future<Output = i32> + 'a
    where Self: 'a;

    fn do_work(&self) -> Self::DoWorkFuture<'_>;
}

这会导致后续调用 trait 中定义的 async fn 函数时会因未实现 Send 而报错:

warning: use of `async fn` in public traits is discouraged as auto trait bounds cannot be specified
 --> src/lib.rs:7:5
  |
7 |     async fn fetch(&self, url: Url) -> HtmlBody;
  |     ^^^^^
  |
help: you can desugar to a normal `fn` that returns `impl Future` and add any desired bounds such as `Send`, but these cannot be relaxed without a breaking API change
  |
7 -     async fn fetch(&self, url: Url) -> HtmlBody;
7 +     fn fetch(&self, url: Url) -> impl std::future::Future<Output = HtmlBody> + Send;
  |

解决方案:

  1. async fn 转换为返回 impl Trait + SendPin<Box<dyn Future> + Send> 的普通函数:
// 示例 1:
// async fn fetch(&self, url: Url) -> HtmlBody;
fn fetch(&self, url: Url) -> impl std::future::Future<Output = HtmlBody> + Send; // impl Trait 无论是在函数输入还是输出位置都支持限界

// 示例 2:使用 Box::pin + dyn Future + Send 是 trait async fn 的常见做法。
use std::future::Future;

trait MyTrait {
    fn do_work(&self) -> Pin<Box<dyn Future<Output = i32> + Send + '_>>;
}

// 然后在实现里用 Box::pin(async move { ... }) 返回:
impl MyTrait for MyStruct {
    fn do_work(&self) -> Pin<Box<dyn Future<Output = i32> + Send + '_>> {
        Box::pin(async move {
            // 捕获的变量必须 Send
            42
        })
    }
}
  1. 使用 async-trait 宏: 它在 trait 层面隐藏这些复杂性,自动把 async fn 去糖为返回值 Pin<Box<dyn Future + Send +'_>>,从而满足多线程环境的限界要求。
use async_trait::async_trait;

#[async_trait]
trait MyTrait {
    async fn do_work(&self) -> i32;
}

struct MyStruct;

#[async_trait]
impl MyTrait for MyStruct {
    async fn do_work(&self) -> i32 {
        42
    }
}
  1. 或者使用 Rust 官方提供的 trait-variant crate (cargo add trait-variant)来为包含 async fn 的 trait 生成多个版本:
#[trait_variant::make(HttpService: Send)]
pub trait LocalHttpService {
    async fn fetch(&self, url: Url) -> HtmlBody;
}

// This creates two versions of your trait: LocalHttpService for single-threaded
// executors and HttpService for multithreaded work-stealing executors

// 生成的 HttpService 版本:
pub trait HttpService: Send {
    fn fetch(
        &self,
        url: Url,
    ) -> impl Future<Output = HtmlBody> + Send;
}

但是 trait-variant crate 并不能完全取代 async-trait crate,因为前者为 trait 中 async fn 返回的还是 impl Trait 对象,还是不满足 object-safe/dyn-compatibility 的要求,不支持动态派发。

由于有上面的额问题,impl Trait 一般不建议出现在 trait 的 Public APIs 中,而应该使用关联类型来解决上面的问题。

pub trait Service<Request> {
    type Response;
    type Error;
    type Future: Future<Output = Result<Self::Response, Self::Error>>;

    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>;

    fn call(&mut self, req: Request) -> Self::Future;
}

  // 示例:
  fn async_callback<F, Fut>(callback: F)
  where
      F: FnOnce() -> Fut,
      Fut: Future<Output = String>;


  // 示例:Fn trait bound 返回一个 Future 对象,该对象同时也需要是泛型参数,这样才能进一步限界
  impl<F, Fut, Res, S> Handler<((),), S> for F
  where
      // 闭包整体要实现 FnOnce() -> Fut、Clone、Send、Sync 和 'static
      F: FnOnce() -> Fut + Clone + Send + Sync + 'static,
      // 闭包的返回值需要实现 Future<Output = Res> 和 Send
      Fut: Future<Output = Res> + Send,
      Res: IntoResponse,

参考:

trait 的 async fn 的局限性:dyn compatible 和 Send
#

有两个局限性:

  1. trait 定义一旦包含 async fn, 就不再 object safe(dyn compatible),就不能再为它创建 trait object
  2. async fn 的参数限界如果是函数类型,则不支持 HRTB,即输入和输出参数不能有借用;

trait 定义一旦包含 async fn, 就不再 object safe(dyn compatible),就不能再为它创建 trait object

pub trait Trait {
    async fn f(&self);
}

// 编译报错:error[E0038]: the trait `main::Trait` cannot be made into an object
pub fn make() -> Box<dyn Trait> {
    unimplemented!()
}

另外,上面提到的 trait 中 async fn 返回值是 impl Future,而不能进一步指定限界语义,如 impl Future + Send + 'static,不满足多线程场景的要求。

上面两个问题可以使用 async-trait crate 来解决(见后文)。

trait 的 async fn 的局限性:Fn* 函数限界不支持 HRTB
#

async fn 的参数限界如果是函数类型,则不支持 HRTB,即输入和输出参数不能有借用:

async fn for_each_city<F, Fut>(mut f: F)
where
    // 错误:不支持 HRTB,这个 HRTB 是 Rust 为包含引用的限界自动添加的,等效于: F: for<'c> FnMut(&'c str) -> Fut,
    F: FnMut(&str) -> Fut,
    Fut: Future<Output = ()>,
{
    for x in ["New York", "London", "Tokyo"] {
        f(x).await;
    }
}

async fn do_something2(city_name: &str) { todo!() }

async fn main() {
    for_each_city(do_something2).await;
}

/*
error[E0308]: mismatched types
 --> src/main.rs:101:5
  |
101 |     for_each_city(do_something2).await;
  |     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^ one type is more general than the other
  |
  = note: expected opaque type `impl for<'c> Future<Output = ()>`
             found opaque type `impl Future<Output = ()>`
  = note: distinct uses of `impl Trait` result in different opaque types
note: the lifetime requirement is introduced here
 --> src/main.rs:89:34
  |
89  |     F: for<'c> FnMut(&'c str) -> Fut,
  |                                  ^^^

error: implementation of `FnMut` is not general enough
 --> src/main.rs:101:5
  |
86  | / async fn for_each_city<F, Fut>(mut f: F)
87  | | where
88  | |     // 错误:不支持
89  | |     F: for<'c> FnMut(&'c str) -> Fut,
  | |        ----------------------------- doesn't satisfy where-clause
90  | |     Fut: Future<Output = ()>,
  | |_____________________________- due to a where-clause on `for_each_city`...
...
101 |       for_each_city(do_something2).await;
  |       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  |
  = note: ...`for<'a> fn(&'a str) -> impl Future<Output = ()> {do_something2}` must implement `FnMut<(&'c str,)>`
  = note: ...but it actually implements `FnMut<(&'0 str,)>`, for some specific lifetime `'0`

error: implementation of `FnMut` is not general enough
 --> src/main.rs:101:34
  |
86  | / async fn for_each_city<F, Fut>(mut f: F)
87  | | where
88  | |     // 错误:不支持
89  | |     F: for<'c> FnMut(&'c str) -> Fut,
  | |        ----------------------------- doesn't satisfy where-clause
90  | |     Fut: Future<Output = ()>,
  | |_____________________________- due to a where-clause on `for_each_city`...
...
101 |       for_each_city(do_something2).await;
  |                                    ^^^^^
  |
  = note: ...`for<'a> fn(&'a str) -> impl Future<Output = ()> {do_something2}` must implement `FnMut<(&'c str,)>`
  = note: ...but it actually implements `FnMut<(&'0 str,)>`, for some specific lifetime `'0`
*/

// 另一个例子 2:
async fn f1<Fut>(_: impl for<'a> Fn(&'a u8) -> Fut) where Fut: Future<Output = ()>, { todo!() }

async fn main() {
    async fn g(_: &u8) { todo!() }
    // error[E0308]: mismatched types
    // error: implementation of `Fn` is not general enough
    f1(g).await;
}

/*
error[E0308]: mismatched types
  --> src/main.rs:57:9
   |
57 |         f1(g).await;
   |         ^^^^^ one type is more general than the other
   |
   = note: expected opaque type `impl for<'a> Future<Output = ()>`
              found opaque type `impl Future<Output = ()>`
   = note: distinct uses of `impl Trait` result in different opaque types
note: the lifetime requirement is introduced here
  --> src/main.rs:44:48
   |
44 | async fn f1<Fut>(_: impl for<'a> Fn(&'a u8) -> Fut)
   |                                                ^^^

error: implementation of `Fn` is not general enough
  --> src/main.rs:57:9
   |
44 |   async fn f1<Fut>(_: impl for<'a> Fn(&'a u8) -> Fut)
   |   -                        ------------------------- doesn't satisfy where-clause
   |  _|
   | |
45 | | where
46 | |     Fut: Future<Output = ()>,
   | |_____________________________- due to a where-clause on `f1`...
...
57 |           f1(g).await;
   |           ^^^^^
   |
   = note: ...`for<'a> fn(&'a u8) -> impl Future<Output = ()> {main::g}` must implement `Fn<(&'a u8,)>`
   = note: ...but it actually implements `Fn<(&'0 u8,)>`, for some specific lifetime `'0`

error: implementation of `Fn` is not general enough
  --> src/main.rs:57:15
   |
44 |   async fn f1<Fut>(_: impl for<'a> Fn(&'a u8) -> Fut)
   |   -                        ------------------------- doesn't satisfy where-clause
   |  _|
   | |
45 | | where
46 | |     Fut: Future<Output = ()>,
   | |_____________________________- due to a where-clause on `f1`...
...
57 |           f1(g).await;
   |                 ^^^^^
   |
   = note: ...`for<'a> fn(&'a u8) -> impl Future<Output = ()> {main::g}` must implement `Fn<(&'a u8,)>`
   = note: ...but it actually implements `Fn<(&'0 u8,)>`, for some specific lifetime `'0`
*/

为了支持 HRTB:

  1. 传统的解法是使用 trait object 的 Pin<Box<dyn Future>> 范式,这也是 async-trait crate 的实现方式;
  1. Rust 1.75+ 开始,使用 AsyncFnXX 限界时支持 HRTB。
pub type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;
// https://blog.rust-lang.org/inside-rust/2024/08/09/async-closures-call-for-testing/

fn async_callback<F, Fut>(callback: F)
where
    // F 闭包输入、输出没有借用,没有 HRTB 的问题,故可以返回 Future 对象
    F: FnOnce() -> Fut,
    Fut: Future<Output = String>;


fn async_callback<F>(callback: F)
where
    // F 闭包输入包含借用时,由于不支持 HRTB,故需要返回 Pin<Box<...>> 类型
    F: FnOnce(&str) -> Pin<Box<dyn Future<Output = ()>>>;

async fn do_something(name: &str) {}

// OK
async_callback(|name| Box::pin(async {
    do_something(name).await;
}));

// OK: 强制将返回的 Pin<Box<impl Future<Output=()>>> 转换为 Pin<Box<dyn Future<Output=()>>>
// 这里将 impl Future<Output=()> 转换为 dyn Future<Output=()> 是 type coerce 支持的。
let ac = |name| {
    let b: Pin<Box<dyn Future<Output=()>>> = Box::pin(async {
        do_something(name).await;
    });
    b
};
async_callback(ac);

// ERROR: 这是由于 ac 的类型是 impl Fn(&str) -> Pin<Box<impl Future<Output=()>>>, 与 F 的限界不匹配
let ac = |name| {
    Box::pin(async {
        do_something(name).await;
    })
};
async_callback(ac);

/*
error[E0271]: expected `{[email protected]:83:14}` to return `Pin<Box<dyn Future<Output = ()>>>`, but it returns `Pin<Box<{async block@src/main.rs:84:18: 84:23}>>`
  --> src/main.rs:84:9
   |
83 |       let ac = |name| {
   |                ------ this closure
84 | /         Box::pin(async {
85 | |             do_something(name).await;
86 | |         })
   | |__________^ expected `Pin<Box<dyn Future<Output = ()>>>`, found `Pin<Box<{async block@src/main.rs:84:18: 84:23}>>`
87 |       };
88 |       async_callback(ac);
   |       -------------- -- closure used here
   |       |
   |       required by a bound introduced by this call
   |
   = note: expected struct `Pin<Box<(dyn Future<Output = ()> + 'static)>>`
              found struct `Pin<Box<{async block@src/main.rs:84:18: 84:23}>>`
note: required by a bound in `async_callback`
  --> src/main.rs:62:24
   |
60 | fn async_callback<F>(callback: F)
   |    -------------- required by a bound in this function
61 | where
62 |     F: FnOnce(&str) -> Pin<Box<dyn Future<Output = ()>>>,
   |                        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ required by this bound in `async_callback`

For more information about this error, try `rustc --explain E0271`.
error: could not compile `my-demo` (bin "my-demo") due to 1 previous error
*/

最佳实践: 只使用 Pin<Box<dyn Future<Output=()>>>

async-trait crate
#

async_trait proc macro 对 trait 中的 async fn 的返回值使用了另一种语法糖:它不返回 impl Future 对象,而是返回 Pin<Box<dyn Future<Output=xx> + Send + 'static>> 类型对象:

  1. 满足多线程环境,如各种 spawn() 提交的 async block 中各 .await 返回的 Future 对象需要满足 Send + 'static 的限界要求;
  2. 它的 as_mut() 方法返回 Pin<&mut dyn Future<Output=xx>> 类型,它实现了 Future,满足 poll() 方法的前面要求。
  3. 该 trait 支持 trait object,即 dyn-safe;
use async_trait::async_trait;

// 在定义和实现 trait 时都需要添加 #[async_trait]
#[async_trait]
trait Advertisement {
    // 等效于:
    // fn run(&self) -> Pin<Box<dyn Future(Output=()) + Send + 'static>
    async fn run(&self);
}

// Advertisement 是 dyn-safe 的,支持 trait object,如 Vec<Box<dyn Advertisement + Sync>> 或 &[&dyn
// Advertisement]

struct Modal;

#[async_trait]
impl Advertisement for Modal {
    async fn run(&self) {
        self.render_fullscreen().await;
        for _ in 0..4u16 {
            remind_user_to_join_mailing_list().await;
        }
        self.hide_for_now().await;
    }
}

另外,新版 Rust 的 unstable 特性 RTN 可以为 trait 的 async fn 返回值进行限界,可以弥补 async trait 的局限性

AsyncFn* 函数限界
#

Rust 1.75+ 开始,开始支持 AsyncFn* 函数限界,它支持 HRTB:

// https://blog.rust-lang.org/inside-rust/2024/08/09/async-closures-call-for-testing/
fn async_callback<F>(callback: F)
where
    // F 闭包输入包含借用时,但返回的对象不包含借用,故 OK
    F: FnOnce(&str) -> Pin<Box<dyn Future<Output=()>>>; // 等效为:Pin<Box<dyn Future + 'static>>

async fn do_something(name: &str) {}

// OK
async_callback(|name| Box::pin(async {
    do_something(name).await;
}));


// Rust 1.75+ 异步闭包 AsyncFn** 限界直接支持 HRTB
// Instead of writing:
fn higher_ranked<F>(callback: F)
where
    F: Fn(&Arg) -> Pin<Box<dyn Future<Output = ()> + '_>>
{ todo!() }
// Write this:
fn higher_ranked<F: AsyncFn(&Arg)> { todo!() } // 自动生成 HRTB


async fn for_each_city<F>(mut f: F)
where
    F: AsyncFnMut(&str),
//     ...which is sugar for:
//  F: for<'a> AsyncFnMut(&'a str),
{
    for x in ["New York", "London", "Tokyo"] {
        f(x).await;
    }
}

async fn increment_city_population_db_query(city_name: &str) { todo!() }
async fn main() {
    // Works for `async fn` that is higher-ranked.
    for_each_city(increment_city_population_db_query).await;
}

Rust 自动为实现 Fn*() -> FutFut 实现 Future<Output = T> 的类型(闭包、异步函数、 dyn Fn* trait object 等)实现 AsyncFn*() -> T

// 原理类似于:(AsyncFn、AsyncFnMut 类似)
impl<F, Args, Fut, T> AsyncFnOnce<Args> for F
where
    F: FnOnce<A, Output = Fut>,
    Fut: Future<Output = T>,
{
    type Output = T;
    type CallOnceFuture = Fut;

    fn async_call_once(self, args: Args) -> Self::CallOnceFuture {
        FnOnce::call_once(self, args)
    }
}

// 以下类型自动实现了 AsyncFn*:
// Async functions:
async fn foo() {}

// Functions that return a concrete future type:
fn foo() -> Pin<Box<dyn Future<Output = ()>>> { Box::pin(async {}) }

// Closures that return an async block:
let c = || async {};
  • 当前只为直接可调用的具体类型实现了上述自动转换,对于函数参数位置的 impl trait like impl Fn() -> impl Future<Output = ()> does not implement AsyncFn();
fn is_async_fn(_: impl AsyncFn(&str)) {}

async fn async_fn_item(s: &str) { todo!() }
is_async_fn(s);
// ^^^ This works.

fn generic(f: impl Fn() -> impl Future<Output = ()>) {
    is_async_fn(f);
    // ^^^ This does not work (yet).
}

在 trait bound、函数返回值场景,使用 AsyncFn{,Mut,Once} trait 集合(而不是 async Fn{,Mut,Once} 语法),也支持 HRTB。

  • 编译器为 async closure 自动实现了 AsyncFn{,Mut,Once} trait
// Instead of writing:
takes_async_callback(|arg| async {
    // Do things here...
});
// Write this:
takes_async_callback(async |arg| {
    // Do things here...
});


// 异步闭包限界
// Instead of writing:
fn doesnt_exactly_take_an_async_closure<F, Fut>(callback: F)
where
    F: FnOnce() -> Fut,
    Fut: Future<Output = String>
{ todo!() }
// Write this:
fn takes_an_async_closure<F: AsyncFnOnce() -> String>(callback: F) { todo!() }


// 异步闭包限界支持 HRTB
// Instead of writing:
fn higher_ranked<F>(callback: F)
where
    F: Fn(&Arg) -> Pin<Box<dyn Future<Output = ()> + '_>>
{ todo!() }

// Write this:
fn higher_ranked<F: AsyncFn(&Arg)> { todo!() }

参考:

let s = String::from("hello, world");
// Implements `AsyncFn()` along with `FnMut` and `Fn` because it can copy the `&String` that it
// captures.
let _ = async || {
    println!("{s}");
};

let s = String::from("hello, world");
// Implements `AsyncFn()` but not `FnMut` or `Fn` because it moves and owns a value of type
// `String`, and therefore the future it returns needs to take a pointer to data owned by the
// closure.
let _ = async move || {
    println!("{s}");
};

let mut s = String::from("hello, world");
// Implements `AsyncFnMut()` but not `FnMut` or `Fn` because it needs to reborrow a mutable pointer
// to `s`.
let _ = async move || {
    s.push('!');
};


// https://rust-lang.github.io/rfcs/3668-async-closures.html#interaction-with-return-type-notation-naming-the-future-returned-by-calling
let s = String::from("hello, world");

let closure = async move || {
    ready(&s);
};
// At this point, `s` is moved out of. However, the allocation for `s` is still live. It just lives
// as a captured field in `closure`.

// Manually call `AsyncFnOnce` -- this isn't stable since `AsyncFnOnce` isn't stable, but it's
// useful for the demo.
let fut = AsyncFnOnce::call_once(closure, ());
// At this point, `closure` is dropped. However, the allocation for `s` is still live. It now lives
// as a captured field in `fut`.

fut.await;
// After the future is awaited, it's dropped. At that point, the allocation for `s` is dropped.

// They can have arguments annotated with types:
let _ = async |_: u8| { todo!() };

// They can have their return types annotated:
let _ = async || -> u8 { todo!() };

// async closure 支持 HRTB
let _ = async |_: &str| { todo!() };

// They can capture values by move:
let x = String::from("hello, world");
let _ = async move || do_something(&x).await };

takes_an_async_fn(async |s| { other_fn(s).await }).await;

trait bound 场景和返回值:使用 AsyncFn{,Once,Mut} 语法:

// AsyncFn{,Once,Mut} 可以用于返回值、泛型参数限界 也就是:The AsyncFn* trait can be used anywhere a Fn* trait bound
// is allowed
//
/// In return-position impl trait:
fn closure() -> impl AsyncFn() { async || {} }

/// In trait bounds:
trait Foo<F>: Sized where F: AsyncFn()
{
    fn new(f: F) -> Self;
}

/// in GATs:
trait Gat {
    type AsyncHasher<T>: AsyncFn(T) -> i32;
}

// 用于限界时支持 HRTB
async fn takes_an_async_fn(f: impl AsyncFn(&str)) {
    futures::join(f("hello"), f("world")).await;
}

// 作为限界:
// 老的写法,不建议:闭包和返回值分别限界:
fn doesnt_exactly_take_an_async_closure<F, Fut>(callback: F)
where
    F: FnOnce() -> Fut,
    Fut: Future<Output = String>
{ todo!() }

// 建议: 新的写法
fn takes_an_async_closure<F: AsyncFnOnce() -> String>(callback: F) { todo!() }

新的 AsyncFn{,Once,Mut} 的局限性:不支持为返回值指定 trait bound,也就是不能为上面 AsyncFn* 的关联类型如 Output、CallOnceFuture 进行 限界,但是老的 Fn* 是支持对内部的关联类型进行限界:

// AsyncFnOnce 的内部实现:

// https://rust-lang.github.io/rfcs/3668-async-closures.html#interaction-with-return-type-notation-naming-the-future-returned-by-calling

/// An async-aware version of the [`FnOnce`](crate::ops::FnOnce) trait.
///
/// All `async fn` and functions returning futures implement this trait.
pub trait AsyncFnOnce<Args> {
    /// Future returned by [`AsyncFnOnce::async_call_once`].
    type CallOnceFuture: Future<Output = Self::Output>;

    /// Output type of the called closure's future.
    type Output;

    /// Call the [`AsyncFnOnce`], returning a future which may move out of the called closure.
    fn async_call_once(self, args: Args) -> Self::CallOnceFuture;
}

With AsyncFn() -> T trait bounds, we don’t know anything about the Future returned by calling the async closure other than that it’s a Future and awaiting that future returns T.

// 老的写法 OK:
fn foo<F, T>()
where
    F: FnOnce() -> T,
    F::Output: Send, //~ 支持为 FnOnce 的内部关联类型 Output 进行限界
    // 等效于
    //T: Send
{
}
// 或者:
fn async_callback<F, Fut>(callback: F)
where
    F: FnOnce() -> Fut,
    Fut: Future<Output = String> + Send + 'static
{ todo!() }


// 新的写法不行:
fn foo<F, T>()
where
    F: async FnOnce() -> T,
    F::Output: Send,   // 错误:不支持为 AsyncFnOnce 的内部关联类型 Output、CallOnceFuture 进行限界
    F::CallOnceFuture: Send,
    //~^ ERROR use of unstable library feature
{
}

// x 的返回值是 Future<Output=Result<()> 类型,而不是 spawn() 要求的 Future<Output=Result<()>> + Send + 'static 类型:
async fn foo(x: impl AsyncFn(&str)) -> Result<()> {
    tokio::spawn(x("hello, world")).await
}

由于 AsyncFn* 不能为返回的 Future 对象指定 trait bound,也即不支持:H implements HealthCheck and its check method returns a Send future

In other words, we don’t want just any type that implements HealthCheck. We specifically want a type that implements HealthCheck and returns a Send future. we need some kind of syntax for declaring that you want an implementation that returns Send futures, and not just any implementation

解决办法:

  1. 继续使用老的写法;
  2. 如果是 trait 中的 async 方法,则使用 async-trait crate;
  3. 或则等待 Return Type Notation(RTN) 特性稳定:

实验特性:RTN
#

RTN:A way to name “the type returned by a function”

// https://smallcultfollowing.com/babysteps/blog/2023/02/13/return-type-notation-send-bounds-part-2/
fn start_health_check<H>(health_check: H, server: Server)
where
    H: HealthCheck + Send + 'static,
    H::check(..): Send, // <— return type notation

// Here the where clause H::check(..): Send means “the type(s) returned when you call H::check must
// be Send. Since async functions return a future, this means that future must implement Send.

// 完整例子:
#![feature(return_type_notation)]
#![allow(dead_code)]

use std::time::Duration;

struct Server;

trait HealthCheck {
    async fn check(&mut self, server: &Server) -> bool;
}

fn start_health_check<H>(mut health_check: H, server: Server)
where
    H: HealthCheck + Send + 'static,
    H::check(..): Send + 'static, // 加 'static 也 OK
{
    tokio::spawn(async move {
        while health_check.check(&server).await {
            tokio::time::sleep(Duration::from_secs(1)).await;
        }
        emit_failure_log(&server).await;
    });
}

async fn emit_failure_log(_server: &Server) { }

RTN 的各种语法(where 限界 或 关联类型限界):

// 注意:下面的 method 需要被替换为 Trait 实际的方法名称,例如上面的 check(...)
fn foo<T, U>()
where
    // Associated type bound
    T: Trait<method(..): Send + 'static>,
    // Path bound
    U: Trait,
    U::method(..): Send + 'static,
{}

trait Trait {
    // In GAT bounds.
    type Item: Trait<method(..): Send + 'static>;
}

// In opaque item bounds too.
fn rpit() -> impl Foo<method(..): Send + 'static>;

使用 RTN 的示例:

// https://rust-lang.github.io/rfcs/3668-async-closures.html#interaction-with-return-type-notation-naming-the-future-returned-by-calling
async fn foo(x: F) -> Result<()>
where
    F: AsyncFn(&str) -> Result<()>,
    // The future from calling `F` is `Send` and `'static`.
    F(..): Send + 'static,
    // Which expands to two bounds:
    // `for<'a> <F as AsyncFnMut>::CallRefFuture<'a>: Send`
    // `<F as AsyncFnOnce>::CallOnceFuture: Send`
    // the latter is only if `F` is bounded with `async Fn` or `async FnMut`.
{
    tokio::spawn(x("hello, world")).await
}

// https://github.com/rust-lang/rust/pull/138424#issuecomment-2766836658
#![feature(return_type_notation)]
trait Trait {
    async fn foo() {}
}

trait Other {}
impl<T> Other for T
where
    T: Trait,
    T::foo(..): Send,
{}
impl Other for u32 {}

// https://github.com/rust-lang/rust/pull/138424#issuecomment-2766947426
#![feature(return_type_notation)]
trait Trait {
    fn foo() -> impl Sized;
}

trait Id {
    type This: ?Sized;
}
impl<T: ?Sized> Id for T {
    type This = T;
}

trait GetOpaque {
    type Opaque: ?Sized;
}
impl<T: ?Sized, U> GetOpaque for T
where
    T: Trait,
    T::foo(..): Id<This = U>,
{
    type Opaque = U;
}
type FooTypePos<T> = <T as GetOpaque>::Opaque;

impl Trait for u8 {
    fn foo() -> impl Sized {
        || ()
    }
}
struct ContainsOpaque(FooTypePos<u8>);

// 另一个例子:https://play.rust-lang.org/?version=nightly&mode=debug&edition=2021&gist=9691749c3f46ed13e459ee1d6382f7e8
#![feature(return_type_notation)]

async fn spawn_call<S>(service: S) -> S::Response
where
    S: Service<(), Response: Send, call(..): Send> + Send + 'static,
{
    tokio::spawn(async move {
        service.call(()).await
    }).await.unwrap()
}

trait Service<Request> {
    type Response;

    // Invoke the service.
    async fn call(&self, req: Request) -> Self::Response;
}


// 另一个例子:https://play.rust-lang.org/?version=nightly&mode=debug&edition=2021&gist=6d45f55355188001ea6499314ce30b4b
#![feature(return_type_notation)]

trait Factory {
    fn widgets(&self) -> impl Iterator<Item = Widget>;
}

struct ReverseWidgets<F: Factory<widgets(..): DoubleEndedIterator>> {
    factory: F,
}

impl<F> Factory for ReverseWidgets<F>
where
    F: Factory<widgets(..): DoubleEndedIterator>,
{
    fn widgets(&self) -> impl Iterator<Item = Widget> {
        self.factory.widgets().rev()
        //                     👆 requires that the iterator be double-ended
    }
}

struct Widget;

Future 对象的 ‘static + Send 问题
#

在多线程异步场景,tokio::spawn*(future) 函数提交的 Future 对象需要实现 Send+'static

  • spawn(future):多线程执行,future 需要实现 Future+Send+'static
  • spawn_blocking(closure): 多线程执行,closure 是实现 Send+'static 的同步闭包;
// tokio::task::spawn() 函数的 future 对象和返回值也需要实现 Send 和 'static
pub fn spawn<F>(future: F) -> JoinHandle<F::Output> 
where
    F: Future + Send + 'static, // 对 Future 整体的要求:Send + 'static
    F::Output: Send + 'static,

‘static 问题
#

spawn() 对提交的 Future 对象可能在后续任意时刻被异步运行时执行,甚至可能超出了创建该 Future 的 block 作用域,所以要求 Future 对 象是 'static 类型,即在程序运行期间可以一直存在。同时由于是多线程环境来执行该 Future 对象,所以它也需要实现 Send。

注意:和同步闭包 不一样async fn/async closure 的输入参数包含 &T/&mut T 影响 'static 的达成,这是 因为编译器在 async fn/async closure 的定义阶段创建一个 impl Future + '_ 的匿名类型对象:

  • async fn 中没有任何非 'static 引用被捕获或返回,且输入参数也没有引用,则编译器会自动推导返回的 Future 为 'static
  • 2024 版本开始,函数返回位置的 impl Trait 会自动捕获 作用域内的所有泛型参数 ,所以不再需要加 + '_ 了:
// 异步函数 foo 返回的实现 Future 的对象 lifetime 不是 'static
async fn foo(x: &u32) -> u32 {
    *x
}
// 等效于:
fn foo<'a>(x: &'a u32) -> impl Future<Output = u32> + 'a {
    async move { *x }
}

// 错误的例子:
pub async fn many_requests(requests: Vec<(String, u16, String)>) -> Vec<std::io::Result<String>>
{
    use async_std::task;
    let mut handles = vec![];
    for (host, port, path) in requests {
        // cheapo_request() 返回的 Future 对象不满足 'static 要求
        handles.push(task::spawn_local(cheapo_request(&host, port, &path)));
    }
    let mut results = vec![];
    for handle in handles {
        results.push(handle.await);
    }
    results
}

// 解决办法:定义一个转移对象所有权的辅助函数:
async fn cheapo_owning_request(host: String, port: u16, path: String) -> std::io::Result<String> {
    cheapo_request(&host, port, &path).await
}

如果 feature 时通过 async block/async closure 创建的,和同步闭包一 样, async block/closure 也是闭包,也 优先使用 &T/&mut T 方式捕获上下文中的对象,而这些借用很难满足 'static 的要求,故 一般使用 async move 来将对象所有权转移到 future 中。

注意:move 会把捕获的变量转移进闭包,但如果捕获的是 &T, 则闭包内部获得的还是一个非 'static 的借用,则还是有生命周期问题:

use std::thread;

let people = vec![
    "Alice".to_string(),
    "Bob".to_string(),
    "Carol".to_string(),
];

let mut threads = Vec::new();

for person in &people {
    threads.push(thread::spawn(move || {
        // person 是 &String 类型,所以 move 捕获的是 &String 而非 String
        println!("Hello, {}!", person);
    }));
}

for thread in threads {
    thread.join().unwrap();
}

// 报错:
/*
error[E0597]: `people` does not live long enough
  --> src/main.rs:12:20
   |
12 |     for person in &people {
   |                    ^^^^^^ borrowed value does not live long enough
...
21 | }
   | - borrowed value only lives until here
   |
   = note: borrowed value must be valid for the static lifetime...
*/

解决办法:

// 方法 1:直接转移所有权
for person in people {
    threads.push(thread::spawn(move || {
        println!("Hello, {}!", person);
    }));
}

// 方法 2:克隆每个元素
for person in &people {
    let person = person.clone();
    threads.push(thread::spawn(move || {
        println!("Hello, {}!", person);
    }));
}

Send 问题
#

同步闭包和异步闭包实现 Send 的判断方式有区别:同步闭包不看内部定义的对象、也不看传入的参数是否实现 Send,但是对于异步的 Future 对象,对象内任意 .await 点 返回的 Future 对象也都需要实现 Send 时该 Future 对象才实现 Send,

async fn/async block/async closure 内部处于异步上下文,可以使用 .await 来 poll future 对象,所以 async executor 在执行异步上下文时可能会多次暂停,暂停的位置是各 .await 表达式(这些 .await 位置也是异步运行时的 yield point )。

每次暂停都会返回一个新的 Future 对象,它封装了暂停位置依赖的上下文对象:栈变量、函数参数等,后续可能被调度到其他线程上运行,所以要求异步上下文中的所有 .await 的 future 都必须实现 Send。只有这些 .await 位置返回的 Future 对象都满足 Send 时,async fn/async block/async closure 才是 Send 的。所以需要看下面三种场景的跨 .await 对象都是否实现了 Send(典型的例外是 trait 中的 async fn 函数返回值没有实现 Send):

  1. 输入的参数对象;
  2. 内部定义的对象;
  3. 捕获的对象

对于普通 async fn、async block、async closure 返回的实现 impl Future 的匿名类型对象,编译器会检查它是否实现 Send。 但是对于 trait 中的 async fn 返回的 impl Future 匿名类型对象,它无论如何都没有实现 Send(原因参考后文,实际上不能对它进行任何的限界,如 Send、Unpin、‘static), 所以如果在 async block/async closure 中 await 该对象会编译失败。

另外,trait object 默认也没有实现 Send/Sync/Unpin,需要在异步上下文中,如果跨 await 使用它们则可能编译出错:

  1. 定义 trait 时,通过 super trait 语法来定义它实现 Send、Sync: trait MyTrait: Send + Sync + Unpin;
  2. 使用 trait object 时,显式进行限界: Box<dyn Debug+Send+UnwindSafe+Unpin>;
// Not recommended!
type GenericError = Box<dyn std::error::Error>;
type GenericResult<T> = Result<T, GenericError>;

fn some_fallible_thing() -> GenericResult<i32> {
    //...
}

// This function's future is not `Send`...
async fn unfortunate() {
    // some_fallible_thing() 返回的 Result<T, Box<dyn std::error::Error>> 中的 Box trait object 没有实现 Send + ‘static
    match some_fallible_thing() {
        Err(error) => {
            report_error(error);
        }
        Ok(output) => {
            // ... is alive across this await ...
            use_output(output).await;
        }
    }
}
// ... and thus this `spawn` is an error.
async_std::task::spawn(unfortunate());

// 解决办法:
type GenericError = Box<dyn std::error::Error + Send + Sync + 'static>;
type GenericResult<T> = Result<T, GenericError>;

对于单线程的异步执行环境,提交的异步闭包可以不实现 Send,但是还是要求 'static,如:

  • tokio::spawn_local(future)
  • tokio::task::LocalSet
  • tokio::select(以及 futures::future::select, futures::future::select_all)

impl Future 对象没有实现 Send 的情况举例:

  1. trait 内定义的 async fn:
// https://smallcultfollowing.com/babysteps/blog/2023/02/01/async-trait-send-bounds-part-1-intro/
trait HealthCheck {
    async fn check(&mut self, server: &Server) -> bool;
    // 等效于
    // fn check(&mut self, server: &Server) -> impl Future<Output = bool>;
                                            // ^ Problem is here! This returns a future, but not necessarily a `Send` future.
                                            // The problem is that check returns an impl Future, but the trait doesn’t say whether this future is Send or not.
}

fn start_health_check<H>(health_check: H, server: Server)
where
    H: HealthCheck + Send + 'static, // 对 H 泛型类型整体的限界,Send 和 'static 是对实现 HealthCheck 对象整体而言
{
    tokio::spawn(async move {
        // 错误:health_check.check(&server) 返回的 Future 未实现 Send
        while health_check.check(&server).await {
            tokio::time::sleep(Duration::from_secs(1)).await;
        }
        emit_failure_log(&server).await;
    });
}
  1. 异步函数输入参数 &T 没有实现 Send 报错:
// Context 包含 Cell 成员,所以 Context 没有实现 Sync,进而 &Context 没有实现 Send
#[derive(Default)]
pub struct Context {
  counter: Cell<i32>
}

impl Context {
  fn increment(&self) {
    self.counter.set(self.counter.get() + 1);
  }
}

async fn f(context: &Context) {
  // context 跨 .await,但未实现 Send,所以报错
  g(context).await;
  context.increment();
}

async fn g(_context: &Context) {
}

async fn task_main() {
  let context = Context::default();
  // f(&context) 返回一个 Future 对象,而它内部的 &context 没有实现 Send,所以不能被 .awati
  f(&context).await;
}

#[tokio::main]
async fn main() {
  tokio::spawn(task_main());
}

/*
 * error: future cannot be sent between threads safely
    --> src/main.rs:254:18
     |
 254 |     tokio::spawn(task_main());
     |                  ^^^^^^^^^^^ future returned by `task_main` is not `Send`
     |
     = help: within `Context`, the trait `Sync` is not implemented for `Cell<i32>`
     = note: if you want to do aliasing and mutation between multiple threads, use `std::sync::RwLock` or `std::sync::atomic::AtomicI32` instead
 note: future is not `Send` as this value is used across an await
 * */

这是因为当编译普通的 async fn 时,编译器使用一个 Rust struct 来表示它的 stack frame:

struct FStackFrame<'a> {
  context: &'a Context,
  await_state: usize
}

这个结构包含一个 context 的借用,而 Context: !Sync 意味着 &Context: !Send 意味着 FStackFrame<’_>: !Send 意味着返回的 Future 对象不满足 Send 要求:

pub fn spawn<F>(future: F) -> JoinHandle<F::Output>
where
    F: Future + Send + 'static, // <- note this Send
    F::Output: Send + 'static,

Tokio 的 executore 默认是多线程的 work-stealing 模式,所以后续可能在其它线程 poll 该 Future 对象,所以它要求踏实 Send 类型。同时调用 Future 对象时,可能已经脱离了创建它的 block,所以要求该对象捕获的上下位满足 ‘static。

  1. 异步函数内部创建的栈对象,如果跨 await 且没有实现 Send 则报错;
async fn not_send_future() {
    // Rc 不是 Send
    let _rc = Rc::new(42);
    // 模拟异步点
    tokio::time::sleep(std::time::Duration::from_secs(1)).await;
}


#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + 'static>> {
    tokio::task::spawn(not_send_future()).await.unwrap();
}
/*
 * error: future cannot be sent between threads safely
    --> src/main.rs:261:24
     |
 261 |     tokio::task::spawn(not_send_future()).await.unwrap();
     |                        ^^^^^^^^^^^^^^^^^ future returned by `not_send_future` is not `Send`
     |
     = help: within `impl Future<Output = ()>`, the trait `Send` is not implemented for `Rc<i32>`
 note: future is not `Send` as this value is used across an await
 */

 // 解决办法:将 _rc 在 await 前 drop,这样可以确保它不跨 await 未实现 Send 而报错:
 async fn not_send_future() {
     // Rc 不是 Send
     let _rc = Rc::new(42);
     drop(_rc);
     // 模拟异步点
     tokio::time::sleep(std::time::Duration::from_secs(1)).await;
 }
  1. 异步函数返回值没有实现 Send 报错:
// OK 的情况:
use tokio::task;
use std::error::Error;

async fn ok_future() -> Result<(), Box<dyn Error>> {
    // await 在前,返回值在最后才构造
    tokio::time::sleep(std::time::Duration::from_secs(1)).await;

    // 返回时才构造 Box<dyn Error>
    Err("some error".into())
}

#[tokio::main]
async fn main() {
    // ✔ 可以编译,因为 Future 没有跨 await 捕获非 Send 类型
    task::spawn(ok_future()).await.unwrap();
}

// 错误的情况:在 .await 之前持有(报错)
async fn returns_not_send_error() -> Result<(), Box<dyn std::error::Error>> {

    let err: Box<dyn std::error::Error> = "some error".into();

    // 👇 err 在 await 之前被捕获,编译器必须把它放进 Future 的状态里
    tokio::time::sleep(std::time::Duration::from_secs(1)).await;

    Err(err)
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + 'static>> {
    tokio::task::spawn(returns_not_send_error()).await.unwrap();
}

/*
 * error[E0277]: `(dyn std::error::Error + 'static)` cannot be sent between threads safely
    --> src/main.rs:274:24
     |
 274 |     tokio::task::spawn(returns_not_send_error()).await.unwrap();
     |     ------------------ ^^^^^^^^^^^^^^^^^^^^^^^^ `(dyn std::error::Error + 'static)` cannot be sent between threads safely
     |     |
     |     required by a bound introduced by this call
     |
     = help: the trait `Send` is not implemented for `(dyn std::error::Error + 'static)`
     = note: required for `Unique<(dyn std::error::Error + 'static)>` to implement `Send`
 */

解决办法:将 async fn 返回的 Box<dyn std::error::Error> 改成 Box<dyn std::error::Error + Send+ Sync> 就能满足 tokio::spawn 的要求(另外,Rust 为 Box<dyn std::error::Error + Send + Sync> 类型提供了多种方法和函数支持。)

use tokio::task;
use std::error::Error;

async fn fixed_future() -> Result<(), Box<dyn Error + Send + Sync>> {
    // 这里的 Box 是 Send + Sync 的
    let err: Box<dyn Error + Send + Sync> = "some error".into();

    // await 之前就创建了,但类型满足 Send + Sync
    tokio::time::sleep(std::time::Duration::from_secs(1)).await;

    Err(err)
}

#[tokio::main]
async fn main() {
    // ✔ 可以编译,因为 Future 是 Send
    task::spawn(fixed_future()).await.unwrap();
}

参考:

Pin/Unpin
#

Pin/Unpin trait 参考:10-rust-lang-generic-trait.md

pin!() 和 Box::pin()
#

async fn/block/closure 返回的匿名 impl Future 类型对象都是 Future+!Unpin 的, 也即没有实现 Unpin,不能用于 Pin::new(&mut fut) (因为签名要求 fut 实现 Unpin)。

  • 是否实现 Send,编译器会进行自动判断。但 trait 中的 async fn 返回的 Future 一定没有实现 Send。
async fn add_one(x: u32) -> u32 {
    x + 1
}

let parker = Parker::new();
let unparker = parker.unparker().clone();
let waker = waker_fn(move || unparker.unpark());
let mut context = Context::from_waker(&waker);

let mut future = add_one(42); // fut 类型:impl Future<Output=u32> + !Unpin

//let mut future = std::pin::Pin:new(&mut future); // 错误:future 没有实现 Unpin

pin!(future) // OK:返回一个 Pin<&mut impl Future> 类型对象,满足 poll 的签名要求
let _ = future.poll(&mut context)

同时,Box<T> 有单独的 Future trait 实现定义: T 必须是 Future + Unpin 的,所以上面 async fn/block/closure 返回的 impl Future 也不满足要求:

impl<F, A> Future for Box<F, A>
where
    F: Future + Unpin + ?Sized,
    A: Allocator,

为了对 async fn/block/closure 返回的匿名 impl Future + !Unpin 类型对象进行 pool,需要创建一个 Pin<&mut impl Future> 类型对象。(注:future.await 不要求 future 实现 Unpin,原因见后文 .await 原理一节)。

解决办法:使用 Box::pin() 或 pin!() 来创建满足 Future::poll() 函数签名要求的 Pin<&mut impl Future> 对象:

  1. pin!(future):不要求 future 实现 Unpin,返回一个同名的 Pin<&mut impl Future> 类型对象,满足 poll() 方法签名要求。
  • 可以对返回的 future 对象执行:future.await 或 (&mut future).await

  • 或者对返回的 future 对象执行:future.poll() 或 (&mut future).poll()

  • 或者调用返回 future 对象的 as_mut() 方法,返回一个新的 Pin<&mut impl Future> 对象,再调用它的 poll() 方法;

  • Pin<&mut impl Future> 实现了 Future 但未实现 Unpin

  1. Box::pin(future): 不要求 future 实现 Unpin,返回一个 Pin<Box<impl Future>> 类型对象,它的 Pin::as_mut() 方法返回新的 Pin<&mut impl Future> 对象,也满足 poll() 方法签名要求;

    • 将对象 Pin 在堆上

    • Pin<Box<impl Future>> 实现了 FutureUnpin

impl<P> Future for Pin<P> where P: DerefMut, <P as Deref>::Target: Future,

    type Output = <<P as Deref>::Target as Future>::Output

    fn poll( self: Pin<&mut Pin<P>>, cx: &mut Context<'_>,) -> Poll<<Pin<P> as Future>::Output>

// Ptr 是 Unpin 时,Pin<Ptr> 才是 Unpin
impl<Ptr> Unpin for Pin<Ptr> where Ptr: Unpin,

// Box<T> 无条件实现了 Unpin
impl<T, A> Unpin for Box<T, A> where A: Allocator, T: ?Sized,

pin!(T) 宏返回 Pin<&mut T> 类型对象:如果 T 没有实现 Unpin(如 async fn/block/closure 返回的 impl Future 对象),则该宏将 T 值 pin 到内存中(通过 &mut T 可变借用的方式),不允许 move。

let parker = Parker::new();
let unparker = parker.unparker().clone();
let waker = waker_fn(move || unparker.unpark());
let mut context = Context::from_waker(&waker);

pin!(future); // 返回一个同名的 Pin<&mut impl Future> 类型对象,满足 poll() 方法签名要求

let _ = future.poll(&mut context)

pin!() 参数必须是 Future 对象,而不是能是表达式:

async fn my_async_fn() {
    // async logic here
}

#[tokio::main]
async fn main() {
    // 错误
    // let mut future = pin!(my_async_fn());

    // OK
    let mut future = my_async_fn()
    pin!(future);

    // 如果 Pin<Ptr> 的 Ptr 实现了 Future,则 Pin<Ptr> 也实现了 Future, 所以可以对 Pin<Ptr> 进行 .await;
    // future.await;
    //
    // 或者:
    (&mut future).await;
}

pin!{} 支持同时创建多个 Future 对象并 Pin 住:

use tokio::{pin, select};

async fn my_async_fn() {
    // async logic here
}

#[tokio::main]
async fn main() {
    pin! {
        let future1 = my_async_fn();
        let future2 = my_async_fn();
    }

    select! {
        _ = &mut future1 => {}
        _ = &mut future2 => {}
    }
}

使用 pin!()Box::pin::new() 来包装 async block/closure/fn 返回的结果,将获得 Pin<&mut Future>Pin<Box<impl Future>> 类型对象,从而满足 poll 方法签名要求。

async fn add_one(x: u32) -> u32 {
    x + 1
}

let fut = add_one(42); // fut 类型:impl Future<Output=u32> + !Unpin

fut.await; // OK,.await 不要求 fut 实现 Unpin

// Pin 的 as_mut() 方法签名:pub fn as_mut(&mut self) -> Pin<&mut <Ptr as Deref>::Target>
// 所以,future.as_mut() 返回 Pin<&mut dyn Future> 对象,满足 Future 的 poll() 方法的函数签名要求。
let pinned_fut: Pin<Box<_>> = Box::pin(fut); // pinned_fut 类型:Pin<Box<impl Future>>, 被 Pin 在堆上,可以被安全 poll
pinned_fut.as_mut().poll(&mut cx); // 安全,且不转移 pinned_fut 的所有权

另外,由于 poll() 会消耗传入的 Pin 对象,如果对 future 进行重复 poll(如 loop + select!),则也需要使用 Box::pin(T),它返回的 Box<Pin<&mut impl Future>> 类型对象的 as_mut() 方法返回一个新的 Box<Pin<&impl Future>> 类型对象,可以进行 poll。

另外异步任务调度的参数常要求是 Future+Send+'static,而 Pin<Box<impl Future>> 满足要求,所以也常用于 tokio::spawn(Box::pin(fut))。(这个场景下,pin!() 返回的 Pin<&mut impl Future> 不满足要求)。

对于已有的 Box<T> 类型值, 可以使用 Box::into_pin(value) 创建 Pin<Box<T>> 对象:

async fn add_one(x: u32) -> u32 {
    x + 1
}

fn boxed_add_one(x: u32) -> Box<dyn Future<Output = u32>> {
    Box::new(add_one(x))
}

// boxed_fut 类型是 Box<dyn Future<Output = u32>>
let boxed_fut = boxed_add_one(42);

// 返回 Pin<Box<dyn Future> 对象
let pinned_fut: Pin<Box<_>> = Box::into_pin(boxed_fut);

参考:

pin!() 和 Box::pin() 的差异
#

pin!(future) 是将 future pin 在所在的 block,不允许被 move,所以 pin!() 不适合后续可能 return move pin 住的值 的情况:

use core::pin::{pin, Pin};

let x: Pin<&mut Foo> = {
    let x: Pin<&mut Foo> = pin!(Foo { /* … */ });
    x
}; // <- Foo is dropped
stuff(x); // Error: use of dropped value

pin!() 只能用于局部栈变量, 生命周期受限于作用域,不能安全地“逃逸”到外部, 特别适合写 Future combinator 或临时测试 pinned 行为。

pin!() 在栈上的 pinned 值仍然可能通过 内存移动(比如 mem::swap 整个栈变量)间接导致 UB,所以严格来说它只对 API 层保证了 语义 pinning,但并不是绝对物理钉死。

而 Box::pin() 是将值分配到堆上,所以返回的 Pin<Box> 可以被 move,但是内存地址仍然不变,生命周期不受栈作用域限制,可以安全地传出,甚至跨线程。

另外异步任务调度的参数常要求是 Future+Send+'static,而 Pin<Box<impl Future>> 满足要求,所以也常用于 tokio::spawn(Box::pin(fut))。(这个场景下,pin!() 返回的 Pin<&mut impl Future> 不满足要求)。

缺点:在堆上分配对象,开销比 pin!() 更大。

总结:

  • pin!():轻量,栈上,局部 pin,用于短期/局部逻辑。
  • Box::pin():稳定,堆上,全局 pin,用于长期存储/跨线程场景。

.await 原理
#

对 Future 对象进行 .await 时,编译器内部自动使用 Pin::new_unchecked(&mut future) 来自动将 future 对象 Pin 住,这个函数不 检查 fut 是否实现 Unpin(但是 Pin::new(&mut fut) 函数要求 fut 必须实现 Unpin),然后在一个 loop 中 poll,直到返回 Ready 的数据。(同时也消耗 Future 对象)。

async fn foo() -> i32 { 1 }

#[tokio::main]
async fn main() {
    let mut f = foo(); // impl Future<Output=i32> + !Unpin,也即 f 没有实现 Unpin
    let v = f.await; // Ok,但是还是可以被 await 或 poll。
    //这是因为 .await 的内部实现是使用  Pin::new_unchecked(&mut fut) 来为 fut Pin 住,等效于:
    // loop {
    //     //
    //     match Pin::new_unchecked(&mut f).poll(&mut cx) {
    //         Poll::Ready(val) => break val,
    //         Poll::Pending => yield,
    //     }
    // }
    println!("{v}");
}

同理,tokio runtime 的 spawn() 也会对传入的 fut 自动进行 Pin 操作,而且使用的是不检查 fut 是否实现 Unpin 的 Pin::new_unchecked(&mut fut) 函数。

pub fn spawn<F>(future: F) -> JoinHandle<F::Output>
where
    F: Future + Send + 'static,

let fut = foo();
// OK
tokio::spawn(fut);

// 例子:
use futures::executor::block_on;

async fn hello_world() {
    println!("hello, world!");
}

fn main() {
    let future = hello_world();
    // block_on() 自动对 future 对象进行 Pin 操作。
    block_on(future);
}

// block_on() 的一种简化实现方式:
use waker_fn::waker_fn; // Cargo.toml: waker-fn = "1.1"
use futures_lite::pin; // Cargo.toml: futures-lite = "1.11"
use crossbeam::sync::Parker; // Cargo.toml: crossbeam = "0.8"
use std::future::Future;
use std::task::{Context, Poll};

fn block_on<F: Future>(future: F) -> F::Output {
    let parker = Parker::new();
    let unparker = parker.unparker().clone();
    let waker = waker_fn(move || unparker.unpark());
    let mut context = Context::from_waker(&waker);

    // pin!(T) 宏返回 Pin<&mut T> 类型对象;
    // 为 future 创建一个 Pin<&mut dyn Future> 对象,
    pin!(future);

    loop {
        // future 对象是 Pin<&mut dyn Future> 类型,满足 poll() 方法前面要求,但是会消耗 future 自身。
        // 所以在 loop 循环中,需要使用 Pin::as_mut() 返回一个新的 Pin<&mut dyn Future> 对象。
        //
        // future.as_mut() 实际是 Pin<&mut dyn Future>.as_mut(), 而 &mut dyn Future Deref 返回的是 dyn Future 对象,
        // 所以 as_mut() 返回的是 Pin<&mut dyn Future> 对象
        match future.as_mut().poll(&mut context) {
            Poll::Ready(value) => return value,
            Poll::Pending => parker.park(),
        }
    }
}

.await 不支持对 Future 对象的 &mut 进行 poll,这是因为 Future 的 poll() 方法的 self 签名是 self: Pin<&mut Self>:

async fn my_async_fn() {
    // async logic here
}

#[tokio::main]
async fn main() {
    let mut future = my_async_fn();
    future.await // OK

    // 错误:.await 不支持 &mut Future 的自动 Pin
    (&mut future).await;
}

解决办法:对 future 对象使用 pin!(future) 返回 Pin<&mut impl Future> 对象,然后对该对象或它的 &mut pinned_future 进行 .await:

.await 的本质是对 Future 进行 poll,而 poll 需要 Pin<&mut Future>,所以只要能得到 Pin<&mut Future>,无论是 future.await 还是 (&mut future).await 都可以。

#[tokio::main]
async fn main() {
    let future = my_async_fn();

    pin!(future); // 返回一个同名(future)的 Pin<&mut impl Future> 对象;

    // 必须先将 Future 对象 pin 住,获得同名的 Pin<&mut impl Future> 类型对象,然后才能使用 &mut 进行 .await。 否
    // 则,是不支持 &mut impl Future 的 .await 的。

    // 以下 3 种方式均 OK:
    // 1. 消耗 future 对象
    future.await;
    // 2. 不消耗 future 对象
    (&mut future).await;
    // 3. 不消耗 future 对象,as_mut() 返回一个新的 Pin<&mut impl Future> 类型对象
    future.as_mut().await;
}

(&mut future).awaitloop+select!{} 中得到应用:要对 Future 对象进行重复 poll,而 select!{} 在并 发 .await 时,如果有返回值,则会 drop 其它 branch 的 Future 对象,所以在 select branch 中不能直接使用 future 对象, 以避免它被 drop 后下一次 loop 失败 select 失败。

解决办法是:

  1. 创建一个 mut 类型的 Future 对象;
  2. 使用 pin!(future) 来创建一个同名,但类型是 Pin<&mut impl Future<Output=xx>> 的 Pin 对象, 它也实现了 Future;
  3. 在 select branch 中使用 &mut future 来进行轮询。
async fn action() {
    // Some asynchronous logic
}

#[tokio::main]
async fn main() {
    let (mut tx, mut rx) = tokio::sync::mpsc::channel(128);

    let operation = action(); // 返回一个 Future 对象, 没有对它 .await

    // 必须先 Pin 该 Future 对象,创建一个同名的类型为 Pin<&mut impl Future<Output=i32> 类型对象 ;
    tokio::pin!(operation);

    loop {
        tokio::select! {
        // &mut impl Future 是不被 .await 支持的。
        //
        // 但是 &mut operation 类型是 &mut Pin<&mut impl Future<Output=i32> 被 .await 表达式所支持, 所以可以被 poll 轮询。
        //
        // 当该 branch 因不匹配或无返回值而被 drop/cancel 时,operation 任然有效,下一次 select!{} 可以接续使用 &mut operation
            _ = &mut operation => break,

            Some(v) = rx.recv() => {
                if v % 2 == 0 {
                    break;
                }
            }
        }
    }
}

除了 &mut pinned_future 外,还可以调用 pinned_future.as_mut() 方法来创建一个新的 Pin<&mut impl Future> 类型对象,所以它也 实现了 Future,可以被 poll。

避免大的 stack buffer
#

尽量避免 stack buffer, 因为跨 .await 的上下文变量会随者 task Future 对象一起被保存, 如果使用较大的 stack buffer 变量, 则 自动生成的 task Future 对象就比较大, buffer size 一般是 page sized 对齐的, 这导致 task 的大小大概是 $page-size + a-few-bytes, 浪费内存。

use tokio::io::{self, AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpListener;

#[tokio::main]
async fn main() -> io::Result<()> {
    let listener = TcpListener::bind("127.0.0.1:6142").await?;
    loop {
        let (mut socket, _) = listener.accept().await?;
        tokio::spawn(async move {
            // 在堆中分配 buff 内存, 而不使用 stack 上的 array
            let mut buf = vec![0; 1024];

            loop {
                match socket.read(&mut buf).await {
                    Ok(0) => return,
                    Ok(n) => {
                        if socket.write_all(&buf[..n]).await.is_err() {
                            return;
                        }
                    }
                    Err(_) => {
                        return;
                    }
                }
            }
        });
    }
}

spawn_blocking() 简单实现
#

传入的闭包是同步函数 ,必须实现 Send + 'static。内部创建一个独立线程(tokio 实际是一个单独的 blocking 线程池 )来执行闭包对应的业务逻辑,结束后通过调用 waker 来通知 executor 完成。

pub fn spawn_blocking<T, F>(closure: F) -> SpawnBlocking<T> where
        F: FnOnce() -> T,
        F: Send + 'static,
        T: Send + 'static,
{
    // Arc 确保 inner 和它 clone 后的对象都使用同一个 Mutex 和内部的 Shared 对象。
    let inner = Arc::new(Mutex::new(Shared { value: None, waker: None, }));

    std::thread::spawn({
        let inner = inner.clone();
        move || {
            // 执行传入的同步闭包
            let value = closure();

            let maybe_waker = {
                let mut guard = inner.lock().unwrap();
                guard.value = Some(value);
                guard.waker.take()
            };
            if let Some(waker) = maybe_waker {
                // 通知 executor 指向完成
                waker.wake();
            }
        } });

    SpawnBlocking(inner)
}

// 为自定义 SpawnBlocking 实现 Future
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

impl<T: Send> Future for SpawnBlocking<T> {
    type Output = T;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
        let mut guard = self.0.lock().unwrap();
        if let Some(value) = guard.value.take() {
            return Poll::Ready(value);
        }
        // 更新 SpawnBlocking 内部的 waker
        guard.waker = Some(cx.waker().clone());
        Poll::Pending
    }
}

block_on() 简单实现
#

使用 Parker 机制来实现 waker,传入的 Future 在 loop poll 前,必须使用 pin!(future) 来转换为同名但类型为 Pin<&mut impl Future> 的对象。

future.poll() 会获得 Pin 对象的所有权,这样不能在 loop 中使用,使用 future.as_mut() 返回一个新的 Pin 对象来解决该问题。

use waker_fn::waker_fn; // Cargo.toml: waker-fn = "1.1"
use futures_lite::pin; // Cargo.toml: futures-lite = "1.11"
use crossbeam::sync::Parker; // Cargo.toml: crossbeam = "0.8"
use std::future::Future;
use std::task::{Context, Poll};

fn block_on<F: Future>(future: F) -> F::Output {
    let parker = Parker::new();
    let unparker = parker.unparker().clone();
    let waker = waker_fn(move || unparker.unpark());
    let mut context = Context::from_waker(&waker);

    // pin!(T) 宏返回 Pin<&mut T> 类型对象, 为 future 创建一个同名的 Pin<&mut dyn Future> 对象,
    pin!(future);

    loop {
        // Pin<&mut dyn Future> 的 as_mut 方法返回一个新的 Pin<&mut dyn Future> 对象
        // 满足 poll() 方法要求同时不消耗 future。
        match future.as_mut().poll(&mut context) {
            Poll::Ready(value) => return value,
            Poll::Pending => parker.park(),
        }
    } }

std::task::Poll
#

对于返回 Poll 类型对象的函数/方法,不能使用 .await 来 poll 它,因为 Poll 类型本身没有实现 Future,但是可以调用它的 poll() 方法来 返回 Ready 或 Pending 结果,或使用 std::task::ready!() 宏:

use std::task::{ready, Context, Poll};
use std::future::{self, Future};
use std::pin::Pin;

pub fn do_poll(cx: &mut Context<'_>) -> Poll<()> {
    let mut fut = future::ready(42);
    let fut = Pin::new(&mut fut);

    let num = ready!(fut.poll(cx));
    // ... use num

    Poll::Ready(())

// 其中的 ready!(fut.poll(cx)) 等效于:
let num = match fut.poll(cx) {
    Poll::Ready(t) => t,
    Poll::Pending => return Poll::Pending,
};
rust-lang - 这篇文章属于一个选集。
§ 14: 本文

相关文章

10. 泛型和特性:generic/trait
·
Rust 泛型和特性
1. 标识符和注释:identify/comment
·
Rust 标识符介绍
11. 类型协变:type coercion
·
Rust 高级话题:子类型和类型协变
12. 迭代器:iterator
·
Rust 迭代器