跳过正文

14. 异步:async

·
Rust
目录
rust-lang - 这篇文章属于一个选集。
§ 14: 本文

Thread 的问题:

  1. 每个 Thread 都有固定 size 的 stack 内存需求,成千上万个 Thread 会消耗大量的内存;
  2. 内核调度 Thread 运行的上下文切换开销大;
  3. Thread 在执行 read/write 系统调用时,尤其是网络通信,会有大量的时间被 block 等待,此时该 Thread 不能再执行其它事情,效率低。

async 通过创建大量异步 task,然后使用一个 thread pool 来执行它们,在某个 task 被阻塞时 executor 自动调度其它 task 到 thread 上运行,从而:

  1. task 相比 thread 更轻量化,没有了大量 thread stack 开销,切换执行速度也更快,所以一个程序内部可以创建大量的异步 task;
  2. executor 可以在 task 阻塞时调度其它 task 到 thread 运行,从而执行效率和并发高;

Future
#

async 的核心是 Future trait, 它的 poll() 方法的 self 类型是 Pin<&mut Self> ,表示一旦开始第一次 poll 后 Self 对象必须是固 定的,不能被转移, 这是为了确保 Future 对象内部保存的栈变量地址指针继续有效。

poll() 方法的第二个参数 Context 封装了 Waker ,当 poll Pending 时 Future 的实现可以保存该 waker(一般是在一个辅助线程中), 后当条件满足时调 用 waker 来唤醒 executor 来重新 poll 自己。

trait Future {
    type Output;

    // For now, read `Pin<&mut Self>` as `&mut Self`.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}

enum Poll<T> {
    Ready(T),
    Pending,
}

Future 对象只有被 poll 时才开始执行, 一般使用 .await 来自动进行 Pin+poll:

  • 一般需要先将 Future 对象 Pin 住,生成一个 Pin<&mut impl Future> 对象,然后调用它的 poll() 方法;
  • .await 自动对 Future 对象 或 &mut Future 对象先后执行 Pin 操作和调用 poll() 方法;

获得 Pin<&mut Self> 对象
#

需要将 future 对象封装到 Pin 类型对象内部后,才能调用 poll() 方法,三种封装方式:

  1. future.await: Rust 自动对 future 对象进行 Pin 封装和 poll();

  2. pin!(future): 返回一个同名的 Pin<&mut impl Future> 类型对象,满足 poll() 方法签名要求。

  • 可以对返回的 future 对象执行:future.await 或 (&mut future).await
  • 或则对返回的 future 对象执行:future.poll() 或 (&mut future).poll()
  1. Box::pin(future): 返回一个 Pin 类型对象;

普通函数或方法可以返回 impl Future,即执行该函数时返回一个 Future 对象,但是并没有开始 poll 执行它。

fn read_to_string(&mut self, buf: &mut String) -> impl Future<Output = Result<usize>>;

但是 trait 中的函数方法不能返回 impl Trait 类型对象。

自定义类型实现 Future 和 Service
#

// https://mazhen.tech/p/pinboxdyn-future%E8%A7%A3%E6%9E%90/

// 实现自己的 Future 类型
pub struct HttpRequest {
    url: String,
}
pub struct HttpResponse {
    code: u32,
}

pub struct ResponseFuture {
    request: HttpRequest,
}

impl Future for ResponseFuture {
    type Output = Result<HttpResponse, Error>;

    fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
        println!("process url:{}", &self.as_ref().get_ref().request.url);
        Poll::Ready(Ok(HttpResponse { code: 200 }))
    }
}

// 在实现 Service 时,关联类型 type Future 设置为我们手工实现的 Future:
struct RequestHandler;

impl Service<HttpRequest> for RequestHandler {
    type Response = HttpResponse;
    type Error = Error;
    type Future = ResponseFuture;

    fn poll_ready(&mut self, cx: &mut std::task::Context<'_>) -> Poll<Result<(), Self::Error>> {
        Poll::Ready(Ok(()))
    }

    fn call(&mut self, req: HttpRequest) -> Self::Future {
        ResponseFuture { request: req }
    }
}

// 然后就可以像正常的 Future 一样,使用 Service :

#[tokio::main]
async fn main() {
    let mut service = RequestHandler {};
    match service
        .call(HttpRequest {
            url: "/user/mazhen".to_owned(),
        })
        .await
    {
        Ok(r) => println!("Response code: {}", r.code),
        Err(e) => println!("process failed. {:?}", e),
    }
}

为了方便实现异步 task,Rust 提供了 async fn/block/closure

async fn
#

执行 async fn 后返回一个 impl Future 匿名类型对象:

// 异步函数
async fn example(x: &str) -> usize {
    x.len()
}
// 等效于
fn example<'a>(x: &'a str) -> impl Future<Output = usize> + 'a {
    async move { x.len() }
}

// 示例 2:
use async_std::io::prelude::*;
use async_std::net;

async fn cheapo_request(host: &str, port: u16, path: &str) -> std::io::Result<String>
{
    let mut socket = net::TcpStream::connect((host, port)).await?;
    let request = format!("GET {} HTTP/1.1\r\nHost: {}\r\n\r\n", path, host);
    socket.write_all(request.as_bytes()).await?;
    socket.shutdown(net::Shutdown::Write)?;
    let mut response = String::new();
    socket.read_to_string(&mut response).await?;
    Ok(response)
}

// response 类型是 impl Future<Output=std::io::Result<String>>
let response = cheapo_request(host, port, path);

async fn 内部处于异步上下文,可以 .await poll 其它异步函数,所以 async executor 在执行 async fn 的过程中可能会多次暂停,暂停的位置是各 .await 表达式(这些 .await 位置也是异步运行时的 yield point )。

每次暂停都会返回一个新的 Future 对象,它封装了暂停位置依赖的上下文对象:栈变量、函数参数等,后续可能被调度到其他线程上运行,所以 async fn 需要实现 Send+'static , 也就是函数内跨 .await 的对象都需要是 Send +'static

但 Rc 没有实现 Send,同时 &T/&mut T/*const T/*mut T 等借用类型一般也很难满足 ‘static 要求。

  • 对于 Rc,需要确保不跨 .await 使用;
  • 对于借用的 lifetime 问题,一般使用 move 类型的 async block/closure 来将对象所有权转移到内部;

async fn 的局限性在于它的返回值只能是 impl Future 匿名类型对象,而不能表达进一步限界语义,如 impl Future + Send + 'static,这样在 trait 中包含 async fn 会有一定局限性。

trait 的 async fn
#

trait 可以包含 async fn,但一旦 trait 包含 async fn, 就不能再为它创建 trait object: 《== 该 trait 是非 dyn-safe

pub trait Trait {
    async fn f(&self);
}

// 编译报错:error[E0038]: the trait `main::Trait` cannot be made into an object
pub fn make() -> Box<dyn Trait> {
    unimplemented!()
}

另外一个问题是不能为 trait 中的 async fn 返回的 impl Future 匿名类型对象进行限界, 这样后续在多线程场景使用该 async fn 返回值可能会出错 (多线程一般要求对象实现 Send+'static):

  • 参考:https://smallcultfollowing.com/babysteps/blog/2023/02/01/async-trait-send-bounds-part-1-intro/

  • 这是由于 async fn check(&mut self, server: &Server) -> bool; 等效为 fn check(&mut self, server: &Server) -> impl Future<Output = bool>;,但并不能为返回的 impl Future 匿名类型对象指定限界,如 impl Future<Output = bool> + Send 等。

trait HealthCheck {
    async fn check(&mut self, server: &Server) -> bool;
}

fn start_health_check<H>(health_check: H, server: Server)
where
    H: HealthCheck + Send + 'static, // 这里只是对 H 自身进行了限定,并没有 H 的 方法的返回值进行限定
{
    tokio::spawn(async move {
        while health_check.check(&server).await {
            tokio::time::sleep(Duration::from_secs(1)).await;
        }
        emit_failure_log(&server).await;
    });
}

/*
error: future cannot be sent between threads safely
   --> src/main.rs:78:5
    |
78  | /     tokio::spawn(async move {
79  | |         while health_check.check().await {
80  | |             tokio::time::sleep(Duration::from_secs(1)).await;
...   |
83  | |     });
    | |______^ future created by async block is not `Send`
    |
    = help: within `{async block@src/main.rs:78:18: 78:28}`, the trait `Send` is not implemented for `impl Future<Output = bool>`
note: future is not `Send` as it awaits another future which is not `Send`
   --> src/main.rs:79:15
    |
79  |         while health_check.check().await {
    |               ^^^^^^^^^^^^^^^^^^^^ await occurs here on type `impl Future<Output = bool>`, which is not `Send`
note: required by a bound in `tokio::spawn`
   --> /Users/alizj/.cargo/registry/src/index.crates.io-1949cf8c6b5b557f/tokio-1.47.1/src/task/spawn.rs:168:21
    |
166 |     pub fn spawn<F>(future: F) -> JoinHandle<F::Output>
    |            ----- required by a bound in this function
167 |     where
168 |         F: Future + Send + 'static,
    |                     ^^^^ required by this bound in `spawn`
help: `Send` can be made part of the associated future's guarantees for all implementations of `HealthCheck::check`
    |
71  -     async fn check(&mut self) -> bool;
71  +     fn check(&mut self) -> impl std::future::Future<Output = bool> + Send;
    |
*/

上面两个问题的解法都是使用基于宏的三方库 async-trait,在定义和实现 trait 时添加 #[async_trait]:

use async_trait::async_trait;

#[async_trait]
trait Advertisement {
    async fn run(&self);
}

struct Modal;

#[async_trait]
impl Advertisement for Modal {
    async fn run(&self) {
        self.render_fullscreen().await;
        for _ in 0..4u16 {
            remind_user_to_join_mailing_list().await;
        }
        self.hide_for_now().await;
    }
}
// 后续可以使用:Vec<Box<dyn Advertisement + Sync>> or &[&dyn Advertisement],

这是由于 async_trait proc macro 对于 trait 中的 async fn 的返回值使用了另一种语法糖:它不返回 impl Future 对象,而是返回 Pin<Box<dyn Future + Send>> 类型对象。这样调用该 async fn 函数或方法返回的对象:

  1. dyn-safe(所以,该 trait 可以创建 trait object)
  2. 线程安全的(满足 Send + ‘Static 要求),可以在 spawn() 提交的 async block 中使用。

RTN
#

另外,新版 Rust 的 unstable 特性 RTN 可以为 trait 的 async fn 返回值进行限界,可以弥补 async trait 的局限性

trait 函数或方法不能返回 impl Trait 对象
#

Rust 支持普通的函数或方法返回 imply Trait 对象, 但是 trait 中关联函数或方法不支持返回 impl Trait 对象:

trait Service<Request> {
    type Response;
    type Error;

    // ERROR: `impl Trait` not allowed outside of function and inherent method return types
    fn call(&mut self, req: Request) -> impl Future<Output = Result<Self::Response, Self::Error>>;
}

解决办法:

  1. 如果需要返回 impl Future,则可以使用 async fn。其它类型的 Trait 则不行;
  2. 返回 trait object 类型,如 Box<dyn Trati>;
  3. 引入一个关联类型,它实现了 Trait 并返回:如下面的 Future 类型,其实是将问题留给了 Service 的实现者,由用户选择 type Future 的实际类型:
pub trait Service<Request> {
    type Response;
    type Error;
    type Future: Future<Output = Result<Self::Response, Self::Error>>;

    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>;

    fn call(&mut self, req: Request) -> Self::Future;
}

async blcok
#

async block 类似于闭包,提供了一个 async context,内部可以使用 .await, 返回一个 impl Future匿名类型对象

use std::io;
use std::future::Future;

fn cheapo_request(host: &str, port: u16, path: &str) -> impl Future<Output = io::Result<String>> + 'static
{
    let host = host.to_string();
    let path = path.to_string();

    async move {
        // 捕获了 host、path 的所有权
        //... use &*host, port, and path ...
    }
}

在 async block 中使用 ? 来传播错误或 return 值都是 async block 返回,而不是它所在的函数返回

  • 作为对比,在函数的普通 block、match branch 中使用 ? 时,是所在的函数返回;

async block 可以和闭包一样捕获环境中的对象(借用或 move),可以指定 async move 来获得对象的所有权,这时 async block 返回的 Future 具有 'static lifetime

  • async block 中不能使用 break/continue;

在多线程场景(如 std:🧵:spawn() 和 async fn/block/closure) 场景,需要考虑是否实现 Send、Sync trait,以及满足 ‘static 要求:

Rc/&T/&mut T/*const T/*mut T 的 Send、Sync 实现情况分析:

类型 Send Sync 说明
Rc<T> 内部引用计数不是原子操作,非线程安全
&T 共享不可变引用本质上是线程安全的(Sync);且引用本身可移动
&mut T 可变引用确保独占访问,不可多线程共享
*const T, *mut T(原始指针) 默认不实现 SendSync,须显式不安全地手动实现

为了避免 async block/closure 对上下文对象的隐式借用不满足 ‘static 的要求,需要使用 move async block

由于 async block 返回匿名的 impl Future<Output=XX> 对象,所以不能对返回的对象施加额外的限界,如 Send。

同时也不能指定 Output 关联类型, 当 Output 为 Result 时可能出错,解决办法:为 Ok 指定它所属的 Enum Result 类型:

let input = async_std::io::stdin();

// future 是 Future<Output=Result>
let future = async {
    let mut line = String::new();
    // ? 是 async block 返回,结果类型 std::io::Result<usize>
    input.read_line(&mut line).await?;
    println!("Read line: {}", line);

    // Ok(()) // 错误

    // 正确,指定 Ok 所属的 Result 类型
    Ok::<(), std::io::Error>(())
    // 或者
    // std::result::Result::<(), std::io::Error>::Ok(());
};

async blockasync move block 都是表达式,故可以作为闭包函数的返回值。

use async_std::net;
use async_std::task;

// serve_one 是一个 impl Future<Output=()> 的匿名类型对象
let serve_one = async {
    // ? 是 async block 返回(而非 block 所在的函数),类型是 Result
    let listener = net::TcpListener::bind("localhost:8087").await?;
    let (mut socket, _addr) = listener.accept().await?;
};

pub async fn many_requests(requests: Vec<(String, u16, String)>) -> Vec<std::io::Result<String>>
{
    let mut handles = vec![];
    for (host, port, path) in requests {
        handles.push(
            task::spawn_local(
                // 使用 async move 来获取 host、path 的所有权。
                async move {
                    cheapo_request(&host, port, &path).await
                }
            ));
    }
    //...
}

loop {
    async move {
        break; // error[E0267]: `break` inside of an `async` block
    }
}

返回 async block 的 closure
#

在 Rust 1.75 版本之前,不支持 async closure 定义和限界:

  1. 不能使用 async || {} 语法来 定义异步闭包函数;
  2. 不能使用 F: async Fn() 语法 来定义异步闭包限界;

对此的解法是,使用返回 Future 对象的闭包或闭包限界:

  1. 使用返回 async blockFuture 的 closure,如 || async move {} 来定义异步闭包函数,等效于生成 impl FnXX() -> impl Future<Output=YY> 类型的匿名类型对象:

    let app = Router::new()
        .route(
            "/",
            any_service(service_fn(
                // 等效于生成一个 impl Fn() -> impl Future<Output=Result<Response, Infallible> 匿名类型对象
                |_: Request| async {
                    let res = Response::new(Body::from("Hi from `GET /`"));
                    Ok::<_, Infallible>(res) // service_fn 的闭包必须返回 Result
            }))
        )
    
  2. 使用返回 Future 对象的闭包限界, 如 F: FnOnce() -> Fut, Fut: Future<Output = Res> + Send, Res: Send 来定义异步的 闭包限界(需要引入多个泛型参数);

// 示例 1:
fn async_callback<F, Fut>(callback: F)
where
    F: FnOnce() -> Fut,
    Fut: Future<Output = String>;


// 示例 2:Fn trait bound 返回一个 Future 对象,该对象同时也需要是泛型参数,这样才能进一步限界
impl<F, Fut, Res, S> Handler<((),), S> for F
where
    // 闭包整体要实现 FnOnce() -> Fut、Clone、Send、Sync 和 'static
    F: FnOnce() -> Fut + Clone + Send + Sync + 'static,
    // 闭包的返回值需要实现 Future<Output = Res> 和 Send
    Fut: Future<Output = Res> + Send,
    Res: IntoResponse,

但是这两种实现,有如下问题:

  1. 返回 Future 的闭包定义(如返回 async block),不能以 &mut T 的方式捕获上下文对象(但是 &T 捕获、或则转移所有权是 OK 的):
    let mut vec: Vec<String> = vec![];
    // closure 类型是:impl FnMut() -> impl Future<Output=()>
    let closure = || async {
        // error: captured variable cannot escape `FnMut` closure body
        vec.push(ready(String::from("")).await);
    };

      /*
      error: captured variable cannot escape `FnMut` closure body
      --> src/main.rs:69:22
      |
      68 |       let mut vec: Vec<String> = vec![];
      |           ------- variable defined here
      69 |       let closure = || async {
      |  ____________________-_^
      | |                    |
      | |                    inferred to be a `FnMut` closure
      70 | |         vec.push(ready(String::from("")).await);
      | |         --- variable captured here
      71 | |     };
      | |_____^ returns an `async` block that contains a reference to a captured variable, which then escapes the closure body
      |
      = note: `FnMut` closures only have access to their captured variables while they are executing...
      = note: ...therefore, they cannot allow references to captured variables to escape
      */

但是捕获 &T 是 OK 的:

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + 'static>> {
    let mut vec: Vec<String> = vec!["a".to_string()];
    // closure 类型是: impl Fn() -> impl Future<Output=()> 类型
    let closure = || async {
        vec.iter().for_each(|i| println!("{i}"));
    };
    closure().await;
}
  1. 异步函数签名如果包含异步闭包(返回 Future 对象),则不支持 HRTB:HRTB 可能是隐式的,例如限界的闭包的输入、输出包含借用的情况下就是隐式 HRTB。
async fn for_each_city<F, Fut>(mut f: F)
where
    // 错误:不支持
    F: FnMut(&str) -> Fut,
    // 等价于: Rust 为包含引用参数的闭包函数限界自动添加 HRTB 注解
    // F: for<'c> FnMut(&'c str) -> Fut,

    Fut: Future<Output = ()>,
{
    for x in ["New York", "London", "Tokyo"] {
        f(x).await;
    }
}

async fn do_something2(city_name: &str) { todo!() }

async fn main() {
    for_each_city(do_something2).await;
}

/*
error[E0308]: mismatched types
 --> src/main.rs:101:5
  |
101 |     for_each_city(do_something2).await;
  |     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^ one type is more general than the other
  |
  = note: expected opaque type `impl for<'c> Future<Output = ()>`
             found opaque type `impl Future<Output = ()>`
  = note: distinct uses of `impl Trait` result in different opaque types
note: the lifetime requirement is introduced here
 --> src/main.rs:89:34
  |
89  |     F: for<'c> FnMut(&'c str) -> Fut,
  |                                  ^^^

error: implementation of `FnMut` is not general enough
 --> src/main.rs:101:5
  |
86  | / async fn for_each_city<F, Fut>(mut f: F)
87  | | where
88  | |     // 错误:不支持
89  | |     F: for<'c> FnMut(&'c str) -> Fut,
  | |        ----------------------------- doesn't satisfy where-clause
90  | |     Fut: Future<Output = ()>,
  | |_____________________________- due to a where-clause on `for_each_city`...
...
101 |       for_each_city(do_something2).await;
  |       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  |
  = note: ...`for<'a> fn(&'a str) -> impl Future<Output = ()> {do_something2}` must implement `FnMut<(&'c str,)>`
  = note: ...but it actually implements `FnMut<(&'0 str,)>`, for some specific lifetime `'0`

error: implementation of `FnMut` is not general enough
 --> src/main.rs:101:34
  |
86  | / async fn for_each_city<F, Fut>(mut f: F)
87  | | where
88  | |     // 错误:不支持
89  | |     F: for<'c> FnMut(&'c str) -> Fut,
  | |        ----------------------------- doesn't satisfy where-clause
90  | |     Fut: Future<Output = ()>,
  | |_____________________________- due to a where-clause on `for_each_city`...
...
101 |       for_each_city(do_something2).await;
  |                                    ^^^^^
  |
  = note: ...`for<'a> fn(&'a str) -> impl Future<Output = ()> {do_something2}` must implement `FnMut<(&'c str,)>`
  = note: ...but it actually implements `FnMut<(&'0 str,)>`, for some specific lifetime `'0`
*/

// 另一个例子 2:
async fn f1<Fut>(_: impl for<'a> Fn(&'a u8) -> Fut) where Fut: Future<Output = ()>, { todo!() }

async fn main() {
    async fn g(_: &u8) { todo!() }
    // error[E0308]: mismatched types
    // error: implementation of `Fn` is not general enough
    f1(g).await;
}

/*
error[E0308]: mismatched types
  --> src/main.rs:57:9
   |
57 |         f1(g).await;
   |         ^^^^^ one type is more general than the other
   |
   = note: expected opaque type `impl for<'a> Future<Output = ()>`
              found opaque type `impl Future<Output = ()>`
   = note: distinct uses of `impl Trait` result in different opaque types
note: the lifetime requirement is introduced here
  --> src/main.rs:44:48
   |
44 | async fn f1<Fut>(_: impl for<'a> Fn(&'a u8) -> Fut)
   |                                                ^^^

error: implementation of `Fn` is not general enough
  --> src/main.rs:57:9
   |
44 |   async fn f1<Fut>(_: impl for<'a> Fn(&'a u8) -> Fut)
   |   -                        ------------------------- doesn't satisfy where-clause
   |  _|
   | |
45 | | where
46 | |     Fut: Future<Output = ()>,
   | |_____________________________- due to a where-clause on `f1`...
...
57 |           f1(g).await;
   |           ^^^^^
   |
   = note: ...`for<'a> fn(&'a u8) -> impl Future<Output = ()> {main::g}` must implement `Fn<(&'a u8,)>`
   = note: ...but it actually implements `Fn<(&'0 u8,)>`, for some specific lifetime `'0`

error: implementation of `Fn` is not general enough
  --> src/main.rs:57:15
   |
44 |   async fn f1<Fut>(_: impl for<'a> Fn(&'a u8) -> Fut)
   |   -                        ------------------------- doesn't satisfy where-clause
   |  _|
   | |
45 | | where
46 | |     Fut: Future<Output = ()>,
   | |_____________________________- due to a where-clause on `f1`...
...
57 |           f1(g).await;
   |                 ^^^^^
   |
   = note: ...`for<'a> fn(&'a u8) -> impl Future<Output = ()> {main::g}` must implement `Fn<(&'a u8,)>`
   = note: ...but it actually implements `Fn<(&'0 u8,)>`, for some specific lifetime `'0`
*/

为了支持 HRTB,需要使用 trait object 的 Pin<Box<dyn Future>> 范式,这也是 async-trait crate 的实现方式:

https://docs.rs/futures/latest/futures/future/type.BoxFuture.html 定义了 BoxFuture 类型, 可以用于多线程环境:

pub type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;
// https://blog.rust-lang.org/inside-rust/2024/08/09/async-closures-call-for-testing/

fn async_callback<F, Fut>(callback: F)
where
    // F 闭包输入、输出没有借用,没有 HRTB 的问题,故可以返回 Future 对象
    F: FnOnce() -> Fut,
    Fut: Future<Output = String>;


fn async_callback<F>(callback: F)
where
    // F 闭包输入包含借用时,由于不支持 HRTB,故需要返回 Pin<Box<...>> 类型
    F: FnOnce(&str) -> Pin<Box<dyn Future<Output = ()>>>;

async fn do_something(name: &str) {}

// OK
async_callback(|name| Box::pin(async {
    do_something(name).await;
}));

// OK: 强制将返回的 Pin<Box<impl Future<Output=()>>> 转换为 Pin<Box<dyn Future<Output=()>>>
// 这里将 impl Future<Output=()> 转换为 dyn Future<Output=()> 是 type coerce 支持的。
let ac = |name| {
    let b: Pin<Box<dyn Future<Output=()>>> = Box::pin(async {
        do_something(name).await;
    });
    b
};
async_callback(ac);

// ERROR: 这是由于 ac 的类型是 impl Fn(&str) -> Pin<Box<impl Future<Output=()>>>, 与 F 的限界不匹配
let ac = |name| {
    Box::pin(async {
        do_something(name).await;
    })
};
async_callback(ac);

/*
error[E0271]: expected `{[email protected]:83:14}` to return `Pin<Box<dyn Future<Output = ()>>>`, but it returns `Pin<Box<{async block@src/main.rs:84:18: 84:23}>>`
  --> src/main.rs:84:9
   |
83 |       let ac = |name| {
   |                ------ this closure
84 | /         Box::pin(async {
85 | |             do_something(name).await;
86 | |         })
   | |__________^ expected `Pin<Box<dyn Future<Output = ()>>>`, found `Pin<Box<{async block@src/main.rs:84:18: 84:23}>>`
87 |       };
88 |       async_callback(ac);
   |       -------------- -- closure used here
   |       |
   |       required by a bound introduced by this call
   |
   = note: expected struct `Pin<Box<(dyn Future<Output = ()> + 'static)>>`
              found struct `Pin<Box<{async block@src/main.rs:84:18: 84:23}>>`
note: required by a bound in `async_callback`
  --> src/main.rs:62:24
   |
60 | fn async_callback<F>(callback: F)
   |    -------------- required by a bound in this function
61 | where
62 |     F: FnOnce(&str) -> Pin<Box<dyn Future<Output = ()>>>,
   |                        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ required by this bound in `async_callback`

For more information about this error, try `rustc --explain E0271`.
error: could not compile `my-demo` (bin "my-demo") due to 1 previous error
*/

如果不使用 Pin<Box<dyn Future<Output=()»> 而使用 Box<dyn Future<Output=()>, 则当闭包的参数包含借用时则会有 lifetime 的问题:

  • 而前者由于是 Pin 住的 Box,Box 的内存不能再移动,可以被 poll,而后者可以被移动,导致内部 dyn Future 的自引用出现问题;

  • 最佳实践: 只使用 Pin<Box<dyn Future<Output=()»> 而不使用 Box<dyn Future<Output=()>;

  • 对于 Pin<Box<dyn Future<Output = ()> + ‘a> 类型的 fut 对象,可以使用 Pin::as_mut() 方法来返回 Pin<&mut dyn Future<Output = ()» ,如 fut.as_mut().poll() 来进行 poll()

fn async_callback<F>(callback: F)
where
    F: FnOnce(&str) -> Box<dyn Future<Output = ()>>,
{
    callback("hello there!");
}

async fn do_something(name: &str) {}

fn main() {
    async_callback(|name| {
        Box::new(async {
            do_something(name).await;
        })
    });
}

/*
error: lifetime may not live long enough
  --> src/main.rs:71:9
   |
70 |       async_callback(|name| {
   |                       ----- return type of closure is Box<(dyn Future<Output = ()> + '2)>
   |                       |
   |                       has type `&'1 str`
71 | /         Box::new(async {
72 | |             do_something(name).await;
73 | |         })
   | |__________^ returning this value requires that `'1` must outlive `'2`
*/

另一种表达方式:如果 async block 借用了所在函数的对象,则该 async block 一定要比借用对象、以及所在的含命周期需要比所在的函数长,但是不支持表达该语义:

fn async_callback<F>(callback: F)
where
    F: FnOnce(&str) -> Box<dyn Future<Output = ()>>,
{
}

async fn do_something(name: &str) {}

fn main() {
    let ac = |name| {
        let a = async {
            do_something(name).await;
        };
        let b: Box<dyn Future<Output = ()>> = Box::new(a);
        b
    };
    async_callback(ac);

/*
error[E0373]: async block may outlive the current function, but it borrows `name`, which is owned by the current function
  --> src/main.rs:76:17
   |
76 |         let a = async {
   |                 ^^^^^ may outlive borrowed value `name`
77 |             do_something(name).await;
   |                          ---- `name` is borrowed here
   |
note: async block is returned here
  --> src/main.rs:80:9
   |
80 |         b
   |         ^
help: to force the async block to take ownership of `name` (and any other referenced variables), use the `move` keyword
   |
76 |         let a = async move {
   |                       ++++
*/
}

但是加了 async move 后又报错:error: implementation of FnOnce is not general enough

fn async_callback<F>(callback: F)
where
    // 这里的 F 输入参数包含借用,所以实际是隐式的 HRTB 限界
    F: FnOnce(&str) -> Box<dyn Future<Output = ()>>,
{
    callback("just for test");
}

async fn do_something(name: &str) {}

fn main() {
    let ac = |name| {
        let a = async move {
            do_something(name).await;
        };
        let b: Box<dyn Future<Output = ()>> = Box::new(a);
        b
    };
    // 但是 ac 的类型是:impl fn(&'2 str) -> Box<dyn Future<Output = ()>> 类型,
    // 而不是限界的 FnOnce(&str) -> Box<dyn Future<Output = ()>> 类型
    async_callback(ac);
}

/*
error: implementation of `FnOnce` is not general enough
  --> src/main.rs:80:9
   |
80 |         b
   |         ^ implementation of `FnOnce` is not general enough
   |
   = note: closure with signature `fn(&'2 str) -> Box<dyn Future<Output = ()>>` must implement `FnOnce<(&'1 str,)>`, for any lifetime `'1`...
   = note: ...but it actually implements `FnOnce<(&'2 str,)>`, for some specific lifetime `'2`

error: implementation of `FnOnce` is not general enough
  --> src/main.rs:82:5
   |
82 |     async_callback(ac);
   |     ^^^^^^^^^^^^^^^^^^ implementation of `FnOnce` is not general enough
   |
   = note: closure with signature `fn(&'2 str) -> Box<dyn Future<Output = ()>>` must implement `FnOnce<(&'1 str,)>`, for any lifetime `'1`...
   = note: ...but it actually implements `FnOnce<(&'2 str,)>`, for some specific lifetime `'2`
*/

Rust 1.75+ async closure
#

Rust 1.75+ 开始正式支持 async closure:

  • 使用 async || {} 语法(而非 || async {}),支持借用方式捕获上下文对象,支持 HRTB;
  • 或者在 trait bound、函数返回值场景,使用 AsyncFn{,Mut,Once} 函数集合(而不是 async Fn{,Mut,Once} 语法);

参考:

let s = String::from("hello, world");
// Implements `AsyncFn()` along with `FnMut` and `Fn`
// because it can copy the `&String` that it captures.
let _ = async || {
    println!("{s}");
};

let s = String::from("hello, world");
// Implements `AsyncFn()` but not `FnMut` or `Fn` because
// it moves and owns a value of type `String`, and therefore
// the future it returns needs to take a pointer to data
// owned by the closure.
let _ = async move || {
    println!("{s}");
};

let mut s = String::from("hello, world");
// Implements `AsyncFnMut()` but not `FnMut` or `Fn`
// because it needs to reborrow a mutable pointer to `s`.
let _ = async move || {
    s.push('!');
};


// https://rust-lang.github.io/rfcs/3668-async-closures.html#interaction-with-return-type-notation-naming-the-future-returned-by-calling
let s = String::from("hello, world");

let closure = async move || {
    ready(&s);
};
// At this point, `s` is moved out of.  However, the
// allocation for `s` is still live.  It just lives as a
// captured field in `closure`.

// Manually call `AsyncFnOnce` -- this isn't stable since
// `AsyncFnOnce` isn't stable, but it's useful for the demo.
let fut = AsyncFnOnce::call_once(closure, ());
// At this point, `closure` is dropped.  However, the
// allocation for `s` is still live.  It now lives as a
// captured field in `fut`.

fut.await;
// After the future is awaited, it's dropped.  At that
// point, the allocation for `s` is dropped.



// They can have arguments annotated with types:
let _ = async |_: u8| { todo!() };

// They can have their return types annotated:
let _ = async || -> u8 { todo!() };

// They can be higher-ranked:
let _ = async |_: &str| { todo!() };

// They can capture values by move:
let x = String::from("hello, world");
let _ = async move || do_something(&x).await };

takes_an_async_fn(async |s| { other_fn(s).await }).await;

trait bound 场景和返回值:使用 AsyncFn{,Once,Mut} 语法:

// AsyncFn{,Once,Mut} 可以用于返回值、泛型参数限界
// 也就是:The AsyncFn* trait can be used anywhere a Fn* trait bound is allowed
//
/// In return-position impl trait:
fn closure() -> impl AsyncFn() { async || {} }

/// In trait bounds:
trait Foo<F>: Sized where F: AsyncFn()
{
    fn new(f: F) -> Self;
}

/// in GATs:
trait Gat {
    type AsyncHasher<T>: AsyncFn(T) -> i32;
}

async fn takes_an_async_fn(f: impl AsyncFn(&str)) {
    futures::join(f("hello"), f("world")).await;
}

// 作为限界:
// 老的写法,不建议:闭包和返回值分别限界:
fn doesnt_exactly_take_an_async_closure<F, Fut>(callback: F)
where
    F: FnOnce() -> Fut,
    Fut: Future<Output = String>
{ todo!() }

// 建议: 新的写法
fn takes_an_async_closure<F: AsyncFnOnce() -> String>(callback: F) { todo!() }

AsyncFn{,Once,Mut} 的内部实现如下:

// https://rust-lang.github.io/rfcs/3668-async-closures.html#interaction-with-return-type-notation-naming-the-future-returned-by-calling

/// An async-aware version of the [`FnOnce`](crate::ops::FnOnce) trait.
///
/// All `async fn` and functions returning futures implement this trait.
pub trait AsyncFnOnce<Args> {
    /// Future returned by [`AsyncFnOnce::async_call_once`].
    type CallOnceFuture: Future<Output = Self::Output>;

    /// Output type of the called closure's future.
    type Output;

    /// Call the [`AsyncFnOnce`], returning a future which may move out of the called closure.
    fn async_call_once(self, args: Args) -> Self::CallOnceFuture;
}

/// An async-aware version of the [`FnMut`](crate::ops::FnMut) trait.
///
/// All `async fn` and functions returning futures implement this trait.
pub trait AsyncFnMut<Args>: AsyncFnOnce<Args> {
    /// Future returned by [`AsyncFnMut::async_call_mut`] and [`AsyncFn::async_call`].
    type CallRefFuture<'a>: Future<Output = Self::Output>
    where
        Self: 'a;

    /// Call the [`AsyncFnMut`], returning a future which may borrow from the called closure.
    fn async_call_mut(&mut self, args: Args) -> Self::CallRefFuture<'_>;
}

/// An async-aware version of the [`Fn`](crate::ops::Fn) trait.
///
/// All `async fn` and functions returning futures implement this trait.
pub trait AsyncFn<Args>: AsyncFnMut<Args> {
    /// Call the [`AsyncFn`], returning a future which may borrow from the called closure.
    fn async_call(&self, args: Args) -> Self::CallRefFuture<'_>;
}

但是新的 AsyncFn{,Once,Mut} 带来的问题(局限性):不支持为返回值指定 trait bound,也就是不能为上面 AsyncFn* 的关联类型如 Output、CallOnceFuture 进行限界,但是老的 Fn* 是 Ok 的:

With AsyncFn() -> T trait bounds, we don’t know anything about the Future returned by calling the async closure other than that it’s a Future and awaiting that future returns T.

// 老的写法 OK:
fn foo<F, T>()
where
    F: FnOnce() -> T,
    F::Output: Send, //~ OK,Output 为 FnOnce 的关联类型
    // 等效于
    //T: Send
{
}
// 或者:
fn async_callback<F, Fut>(callback: F)
where
    F: FnOnce() -> Fut,
    Fut: Future<Output = String> + Send + 'static
{ todo!() }


// 新的写法不行:
fn foo<F, T>()
where
    F: async FnOnce() -> T,
    F::Output: Send,
    //~^ ERROR use of unstable library feature
    F::CallOnceFuture: Send,
    //~^ ERROR use of unstable library feature
{
}

// 这是由于:x 的返回值是 Future<Output=Result<()> 类型,而不是 spawn() 要求的 Future<Output=Result<()>> + Send + 'static 类型:
async fn foo(x: impl AsyncFn(&str)) -> Result<()> {
    tokio::spawn(x("hello, world")).await
}

/*
error[E0277]: cannot be sent between threads safely
   --> src/lib.rs
    |
    |     tokio::spawn(x("hello, world")).await
    |     ------------ ^^^^^^^^^^^^^^^^^ cannot be sent between threads safely
    |     |
    |     required by a bound introduced by this call

*/

另一个 trait 中包含 async method 的报错例子:

  • 注:从 Rust 1.75.0 开始,trait 中包含 async fn 是 stable 状态的。
// https://smallcultfollowing.com/babysteps/blog/2023/02/01/async-trait-send-bounds-part-1-intro/#fnref:1
trait HealthCheck {
    async fn check(&mut self, server: &Server) -> bool;
}

fn start_health_check<H>(health_check: H, server: Server)
where
    H: HealthCheck + Send + 'static,
{
    tokio::spawn(async move {
        while health_check.check(&server).await {
            tokio::time::sleep(Duration::from_secs(1)).await;
        }
        emit_failure_log(&server).await;
    });
}

/*
error: future cannot be sent between threads safely
   --> src/lib.rs:15:18
    |
15  |       tokio::spawn(async move {
    |  __________________^
16  | |         while health_check.check(&server).await {
17  | |             tokio::time::sleep(Duration::from_secs(1)).await;
18  | |         }
19  | |         emit_failure_log(&server).await;
20  | |     });
    | |_____^ future created by async block is not `Send`
    |
    = help: within `[async block@src/lib.rs:15:18: 20:6]`, the trait `Send` is not implemented for `impl Future<Output = bool>`
*/

这是由于异步函数 ealth_check.check() 返回的 Future 对象没有实现 Send + ‘static:


/*
note: future is not `Send` as it awaits another future which is not `Send`
   --> src/lib.rs:16:15
    |
16  |         while health_check.check(&server).await {
    |               ^^^^^^^^^^^^^^^^^^^^^^^^^^^ await occurs here
*/

trait HealthCheck {
    // async fn check(&mut self, server: &Server) -> bool;
    fn check(&mut self, server: &Server) -> impl Future<Output = bool>;
                                           // ^ Problem is here! This returns a future, but not necessarily a `Send` future.
}

async-trait crate 对此的解决办法是返回 Pin<Box<dyn Future + Send>> , 而非 yields -> impl Future

Pin<Box<dyn Future + Send>> 实现了 impl Future<Output = Self::Response> + Send

impl<Ptr> Unpin for Pin<Ptr>
where
    Ptr: Unpin,

// https://github.com/rust-lang/rfcs/blob/master/text/3654-return-type-notation.md
trait Service<Request> {
    type Response;

    // Invoke the service.
    async fn call(&self, req: Request) -> Self::Response;
}

// 下面的 LogService call() 方法只能在 thread-per-core or single-threaded executor 中使用,而不能在多线程 executor 中使用。
pub struct LogService<S>(S);
impl<S, R> Service<R> for LogService<S>
where
    S: Service<R>,
    R: Debug,
{
    type Response = S::Response;

    async fn call(&self, request: R) -> S::Response {
        eprintln!("{request:?}");
        self.0.call(request).await
    }
}

// 多线程中使用报错:
async fn spawn_call<S>(service: S) -> S::Response
where
    S: Service<(), Response: Send> + Send + 'static,
{
    tokio::spawn(async move {
        service.call(()).await // <--- Error
    }).await
}


/*
error: future cannot be sent between threads safely
   --> src/lib.rs:6:5
    |
6   | /     tokio::spawn(async move {
7   | |         service.call(()).await // <--- Error
8   | |     }).await.unwrap()
    | |______^ future created by async block is not `Send`
    |
    = help: within `{async block@src/lib.rs:6:18: 8:6}`, the trait `Send` is not implemented for `impl Future<Output = <S as Service<()>>::Response>`, which is required by `{async block@src/lib.rs:6:18: 8:6}: Send`
note: future is not `Send` as it awaits another future which is not `Send`
   --> src/lib.rs:7:9
    |
7   |         service.call(()).await // <--- Error
    |         ^^^^^^^^^^^^^^^^ await occurs here on type `impl Future<Output = <S as Service<()>>::Response>`, which is not `Send`
*/

// 解决办法:修改 call() 方法的返回值,返回一个 Send 类型对象
trait SendService<Request>: Send {
    type Response;

    // Invoke the service.
    fn call(
        &self,
        req: Request,
    ) -> impl Future<Output = Self::Response> + Send;
    // 具体实现可能是返回一个 Pin<Box<dyn Future + Send>> 类型对象
}

由于 AsyncFn* 不能为返回的 Future 对象指定 trait bound,也即不支持:H implements HealthCheck and its check method returns a Send future

  • In other words, we don’t want just any type that implements HealthCheck. We specifically want a type that implements HealthCheck and returns a Send future.

  • we need some kind of syntax for declaring that you want an implementation that returns Send futures, and not just any implementation

解决办法:

  1. 继续使用老的写法;
  2. 如果是 trait 中的 async 方法,则使用 async-trait crate;
  3. 或则等待 Return Type Notation 稳定:

RTN 语法:A way to name “the type returned by a function”

// https://smallcultfollowing.com/babysteps/blog/2023/02/13/return-type-notation-send-bounds-part-2/
fn start_health_check<H>(health_check: H, server: Server)
where
    H: HealthCheck + Send + 'static,
    H::check(..): Send, // <— return type notation

// Here the where clause H::check(..): Send means “the type(s) returned when you call H::check must
// be Send. Since async functions return a future, this means that future must implement Send.

// 完整例子:
#![feature(return_type_notation)]
#![allow(dead_code)]

use std::time::Duration;

struct Server;

trait HealthCheck {
    async fn check(&mut self, server: &Server) -> bool;
}

fn start_health_check<H>(mut health_check: H, server: Server)
where
    H: HealthCheck + Send + 'static,
    H::check(..): Send + 'static, // 加 'static 也 OK
{
    tokio::spawn(async move {
        while health_check.check(&server).await {
            tokio::time::sleep(Duration::from_secs(1)).await;
        }
        emit_failure_log(&server).await;
    });
}

async fn emit_failure_log(_server: &Server) { }

RTN 的各种语法(where 限界 或 关联类型限界):

// 注意:下面的 method 需要被替换为 Trait 实际的方法名称,例如上面的 check(...)
fn foo<T, U>()
where
    // Associated type bound
    T: Trait<method(..): Send + 'static>,
    // Path bound
    U: Trait,
    U::method(..): Send + 'static,
{}

trait Trait {
    // In GAT bounds.
    type Item: Trait<method(..): Send + 'static>;
}

// In opaque item bounds too.
fn rpit() -> impl Foo<method(..): Send + 'static>;

使用 RTN 的示例:

// https://rust-lang.github.io/rfcs/3668-async-closures.html#interaction-with-return-type-notation-naming-the-future-returned-by-calling
async fn foo(x: F) -> Result<()>
where
    F: AsyncFn(&str) -> Result<()>,
    // The future from calling `F` is `Send` and `'static`.
    F(..): Send + 'static,
    // Which expands to two bounds:
    // `for<'a> <F as AsyncFnMut>::CallRefFuture<'a>: Send`
    // `<F as AsyncFnOnce>::CallOnceFuture: Send`
    // the latter is only if `F` is bounded with `async Fn` or `async FnMut`.
{
    tokio::spawn(x("hello, world")).await
}

// https://github.com/rust-lang/rust/pull/138424#issuecomment-2766836658
#![feature(return_type_notation)]
trait Trait {
    async fn foo() {}
}

trait Other {}
impl<T> Other for T
where
    T: Trait,
    T::foo(..): Send,
{}
impl Other for u32 {}

// https://github.com/rust-lang/rust/pull/138424#issuecomment-2766947426
#![feature(return_type_notation)]
trait Trait {
    fn foo() -> impl Sized;
}

trait Id {
    type This: ?Sized;
}
impl<T: ?Sized> Id for T {
    type This = T;
}

trait GetOpaque {
    type Opaque: ?Sized;
}
impl<T: ?Sized, U> GetOpaque for T
where
    T: Trait,
    T::foo(..): Id<This = U>,
{
    type Opaque = U;
}
type FooTypePos<T> = <T as GetOpaque>::Opaque;

impl Trait for u8 {
    fn foo() -> impl Sized {
        || ()
    }
}
struct ContainsOpaque(FooTypePos<u8>);

// 另一个例子:https://play.rust-lang.org/?version=nightly&mode=debug&edition=2021&gist=9691749c3f46ed13e459ee1d6382f7e8
#![feature(return_type_notation)]

async fn spawn_call<S>(service: S) -> S::Response
where
    S: Service<(), Response: Send, call(..): Send> + Send + 'static,
{
    tokio::spawn(async move {
        service.call(()).await
    }).await.unwrap()
}

trait Service<Request> {
    type Response;

    // Invoke the service.
    async fn call(&self, req: Request) -> Self::Response;
}


// 另一个例子:https://play.rust-lang.org/?version=nightly&mode=debug&edition=2021&gist=6d45f55355188001ea6499314ce30b4b
#![feature(return_type_notation)]

trait Factory {
    fn widgets(&self) -> impl Iterator<Item = Widget>;
}

struct ReverseWidgets<F: Factory<widgets(..): DoubleEndedIterator>> {
    factory: F,
}

impl<F> Factory for ReverseWidgets<F>
where
    F: Factory<widgets(..): DoubleEndedIterator>,
{
    fn widgets(&self) -> impl Iterator<Item = Widget> {
        self.factory.widgets().rev()
        //                     👆 requires that the iterator be double-ended
    }
}

struct Widget;

与以前的返回 async block 的 closure 不同,async closure 可以捕获上下文对象:

let mut vec: Vec<String> = vec![];
// 错误:let closure =  || async {
// 正确:
let closure = async || {
    // async closure 使用 &'closure mut Vec<String> 方式捕获了上下文对象 vec,直到这个闭包被 drop(如被 .await)。
    vec.push(ready(String::from("")).await);
};

let string: String = "Hello, world".into();
let closure = async move || {
    // 闭包获得了 string 的所有权
    ready(&string).await;
};

async closure 也支持 HTRB:

// 不建议:老的写法:Instead of writing:
fn higher_ranked<F>(callback: F) where  F: Fn(&Arg) -> Pin<Box<dyn Future<Output = ()> + '_>> { todo!() }
// 新的写法:async closure 参数支持借用,也即支持 HRTB:
fn higher_ranked<F: AsyncFn(&Arg)> { todo!() }

// 示例
// We could also use APIT: `mut f: impl async FnMut(&str)`.
async fn for_each_city<F>(mut f: F)
where
    F: AsyncFnMut(&str),
//     ...which is sugar for:
//  F: for<'a> async FnMut(&'a str),
{
    for x in ["New York", "London", "Tokyo"] {
        f(x).await;
    }
}

async fn increment_city_population_db_query(city_name: &str) { todo!() }

async fn main() {
    // Works for `async fn` that is higher-ranked.
    for_each_city(increment_city_population_db_query).await;
}

// 示例:
where T: for<'a> async Fn(&'a ()) -> i32,

// 示例:
// Returns: `impl Future<Output = ()> + Captures<&'a ()>`
async fn foo<'a>(x: &'a ()) { _ = (x,); }

Rust 自动为实现 Fn*() -> FutFut 实现 Future<Output = T> 的类型(闭包、异步函数、 dyn Fn* trait object 等)实现 AsyncFn*() -> T

// 原理类似于:(AsyncFn、AsyncFnMut 类似)
impl<F, Args, Fut, T> AsyncFnOnce<Args> for F
where
    F: FnOnce<A, Output = Fut>,
    Fut: Future<Output = T>,
{
    type Output = T;
    type CallOnceFuture = Fut;

    fn async_call_once(self, args: Args) -> Self::CallOnceFuture {
        FnOnce::call_once(self, args)
    }
}

// 以下类型自动实现了 AsyncFn*:
// Async functions:
async fn foo() {}

// Functions that return a concrete future type:
fn foo() -> Pin<Box<dyn Future<Output = ()>>> { Box::pin(async {}) }

// Closures that return an async block:
let c = || async {};
  • 当前只为直接可调用的具体类型实现了上述自动转换,对于函数参数位置的 impl trait like impl Fn() -> impl Future<Output = ()> does not implement AsyncFn();
fn is_async_fn(_: impl AsyncFn(&str)) {}

async fn async_fn_item(s: &str) { todo!() }
is_async_fn(s);
// ^^^ This works.

fn generic(f: impl Fn() -> impl Future<Output = ()>) {
    is_async_fn(f);
    // ^^^ This does not work (yet).
}

Future + Send + ‘static 限界
#

tokio 等异步运行时的对提交异步 task 的要求:

  • spawn(future):多线程执行,future 需要实现 Future+Send+'static
  • spawn_local(future): 单线程执行,future 需要实现 Future+'static
  • spawn_blocking(closure): 多线程执行,closure 是实现 Send+'static 的同步闭包;

其中 Future 可以是 async block/async fn/ async closure 的实现。

async block/closure 会捕获上下文对象,而且默认优先是借用 &T 和 &mut T 捕获,所以很难满足 ‘static 要求,故一般需要使用 async move 的 block 和 closure 方式。

async block/fn/closure 不满足 Send 的常见情况:

  1. 跨 .await 使用没有实现 Send 的类型对象,如 Rc、MutexGuard 等。

Pin/Unpin/pin!()/.await
#

Unpin 是标准库提供的 marker trait, 它是一个标记类型(std::pin::Pin 是一个具体 struct 类型),只能由 Rust 编译器实现。

类型实现 Unpin trait 意味着该类型对象可以被安全的移动(move)

Rust 为几乎所有类型都实现了 Unpin,例外的情况如下(它们实现了 !Unpin):

  1. Future:因为实现 Future trait object 可能包含对自身引用或保存指针类型的栈变量, 移动它可能会使这些地址指向失效。
    • async fn/block/closure 语法糖返回的是实现 dyn Future 的匿名类型,所以也未实现 Unpin;
  2. Pin<P> :Pin 本身就是为了处理不能移动的数据而设计的,Pin<P> 对于大多数 P 类型未实现 Unpin;
  3. PhantomPinned:标记 trait,用来显式标记类型不应该实现 Unpin:struct MyType { _pin: PhantomPinned }

Pin<T> 是一个 struct std::pin::Pin 类型(而 Unpin 是 trait),将对象包装到 Pin 中后,可以确保该对象不会被移动(move)。

创建 Pin<Ptr> 对象的几种方式:

  1. Pin::new(T): new() 方法对传入的 T 要求必须实现 Deref trait,而且 Target 必须是 Unpin。
  • 传入的一般是 &T、&mut T、Box<T> 类型,它们都实现了 Deref trait,对应的 Target 是 T 类型,所以 T 也需要实现 Unpin,T 不能是 dyn FuturePin 对象
  1. Box::pin(T)pin!(T)
  • T 可以是 dyn Future 类型对象, 前者返回的是 Pin<Box<impl Future>> ,后者返回的是 Pin<&mut T> 类型对象;

  • Pin<&mut dyn Future>,即包含了传入 impl Future 对象的 &mut 借用,这样确保只能通过返回的 Pin 对象来操作 dyn Future,而 Pin 对 象又可以确保不会转移内部封装的对象,所以满足 Future 的 poll() 方法的签名 要求。

impl<Ptr> Pin<Ptr> where Ptr: Deref, <Ptr as Deref>::Target: Unpin
  pub fn new(pointer: Ptr) -> Pin<Ptr>

// 示例:
let mut data=43;
let pinned: Pin<&mut i32> = Pin::new(&mut data); // &mut data Deref 的 Target 是 i32 类型,它实现了 Unpin

let boxed = Box::new(42);
let pinned: Pin<Box<i32>> = Pin::new(boxed); // boxed Deref 的 Target 是 i32 类型,它实现了 Unpin

// 返回 std::future::Ready<i32> struct 类型, 它实现了 Unpin 和 Future
let mut unpin_future = std::future::ready(5);
// &mut unpin_future 是 &mut Ready<i32> 类型,它实现了 Deref 且 Target 是 i32,实现了 Unpin,故满足 Pin::new() 的前面要求
let my_pinned_unpin_future: Pin<&mut _> = Pin::new(&mut unpin_future);

let pinned = Box::pin(42);

Pin<Ptr> 类型的方法:

  • as_ref()/as_mut()/info_ref() 都返回新的 Pin 对象。
  • as_mut() 返回 Pin<&mut T> 类型,满足 Future::poll() 方法签名要求;
impl<Ptr> Pin<Ptr> where Ptr: Deref, <Ptr as Deref>::Target: Unpin
  pub fn new(pointer: Ptr) -> Pin<Ptr>
  pub fn into_inner(pin: Pin<Ptr>) -> Ptr

impl<Ptr> Pin<Ptr> where Ptr: Deref
  pub unsafe fn new_unchecked(pointer: Ptr) -> Pin<Ptr>
  pub fn as_ref(&self) -> Pin<&<Ptr as Deref>::Target> // 返回新的 Pin 对象

impl<Ptr> Pin<Ptr> where Ptr: DerefMut
  pub fn as_mut(&mut self) -> Pin<&mut <Ptr as Deref>::Target> // 返回新的 Pin 对象
  pub fn set(&mut self, value: <Ptr as Deref>::Target) where <Ptr as Deref>::Target: Sized

impl<'a, T> Pin<&'a T> where T: ?Sized
  pub fn get_ref(self) -> &'a T // 返回 &T

impl<'a, T> Pin<&'a mut T> where T: ?Sized
  pub fn into_ref(self) -> Pin<&'a T>
  pub fn get_mut(self) -> &'a mut T where T: Unpin // 返回 &mut T

Pin<Ptr> 实现了 Deref/DerefMut trait,可以调用 Ptr 指向的类型的方法,同时也可以作为 Pin::new() 的参数:

  • 如果 T 实现了 Unpin,则 Pin<&mut T> 效果和 &mut 类似,mem::replace() 或 mem::take() 可以 move 该值。
impl<Ptr> Deref for Pin<Ptr> where Ptr: Deref
  type Target = <Ptr as Deref>::Target
  fn deref(&self) -> &<Ptr as Deref>::Target

impl<Ptr> DerefMut for Pin<Ptr> where  Ptr: DerefMut, <Ptr as Deref>::Target: Unpin
    fn deref_mut(&mut self) -> &mut <Ptr as Deref>::Target

Pin<Ptr> 实现了 Future,但要求 Deref 的 Target 也实现了 Future,如 pin!(future_obj) 返回的对象:

impl<P> Future for Pin<P> where P: DerefMut, <P as Deref>::Target: Future
    type Output = <<P as Deref>::Target as Future>::Output
    fn poll( self: Pin<&mut Pin<P>>, cx: &mut Context<'_>, ) -> Poll<<Pin<P> as Future>::Output>

std::pin::Pin struct 常用场景是 std::future::Feature traitpoll 方法: 由于 Future trait object 必须保存 执行上下文 stack 上的变量,而 Future 下一次被 wake 执行的时机是不定的,所以为了避免 Future 对象上保存的 stack 变量的地址发生变化导致引用出错,需要将 Future 对象设置为 Pin<&mut Self> 类型,这样该对象将不能被转移,从而确保内部保存的栈地址有效。

  • Future 没有实现 Unpin,因为 async/await 生成的 Future 可能包含自引用结构;
  • 编译器将 async 函数转换为状态机时, 会产生包含内部引用的结构;
  • poll() 方法的签名是 self: Pin<&mut Self>,会消耗自身。
pub trait Future {
    type Output;

    // Required method
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}

调用 async fn 返回一个 Future 类型值,在进行 poll() 前需要获得对应的 Pin<&mut impl Future> 对象,而 Future 没有实现 Deref,所以不能作为 Pin::new() 的参数。

Box::pin() 和 pin!()
#

解决办法:使用 Box::pin() 或 pin!() 来创建满足 Future::poll() 函数前面要求的 Pin<&mut T> 对象:

  1. pin!(future):返回一个同名的 Pin<&mut impl Future> 类型对象,满足 poll() 方法签名要求。
  • 可以对返回的 future 对象执行:future.await 或 (&mut future).await
  • 或者对返回的 future 对象执行:future.poll() 或 (&mut future).poll()
  • 或者调用返回 future 对象的 as_mut() 方法,返回一个新的 Pin<&mut impl Future> 对象,再调用它的 poll() 方法;
  1. Box::pin(future): 返回的是 Pin<Box<impl Future>> 对象,需要进一步调用 Pin::as_mut() 返回新的 Pin<&mut impl Future> 对象,才满足 poll() 方法签名要求:

Pin<&mut impl Future>Pin<Box<impl Future>> 都实现了 Future:

impl<P> Future for Pin<P>
where
    P: DerefMut,
    <P as Deref>::Target: Future,

    type Output = <<P as Deref>::Target as Future>::Output

    fn poll(
        self: Pin<&mut Pin<P>>,
        cx: &mut Context<'_>,
    ) -> Poll<<Pin<P> as Future>::Output>

示例:

// 如果要 Pin<Box<T>>, 则需要使用 Box::pin(value) 函数
async fn add_one(x: u32) -> u32 {
    x + 1
}
// fut 是 impl Future 的匿名类型
let fut = add_one(42);

// 返回一个 Pin<Box<impl Future>> 类型,Box 是智能指针类型。
let pinned_fut: Pin<Box<_>> = Box::pin(fut);

对于已有的 Box 类型值 value, 可以使用 Box::into_pin(value) 创建 Pin<Box<T>> 对象:

async fn add_one(x: u32) -> u32 {
    x + 1
}

fn boxed_add_one(x: u32) -> Box<dyn Future<Output = u32>> {
    Box::new(add_one(x))
}

// boxed_fut 类型是 Box<dyn Future<Output = u32>>
let boxed_fut = boxed_add_one(42);

// 返回 Pin<Box<dyn Future> 对象
let pinned_fut: Pin<Box<_>> = Box::into_pin(boxed_fut);

pin!(T) 宏返回 Pin<&mut T> 类型对象,如果如果传入的是 impl Future 对象,则返回 Pin<&mut impl Future> 对象,满足 poll() 方法签名要求:

  • 如果 T 没有实现 Unpin(如 Future 对象),则该宏将 T 值 pin 到内存中(通过 &mut T 可变借用的方式),不允许 move。
let parker = Parker::new();
let unparker = parker.unparker().clone();
let waker = waker_fn(move || unparker.unpark());
let mut context = Context::from_waker(&waker);

// pin!(T) 宏返回 Pin<&mut T> 类型对象;
// 为 future 创建一个 Pin<&mut impl Future> 对象,
pin!(future);

let _ = future.poll(&mut context)

对 Future 对象调用 .await 表达式,用于对 Future 对象进行 poll,直到获得 Ready 值。编译器自动对 Future 对象进行 Pin,然后调用它的 poll() 方法 (同时也会消耗 Future 对象)。

.await 不支持对 Future 对象的 &mut 进行 poll,这是因为 Future 的 poll() 方法的 self 签名是 self: Pin<&mut Self>:

async fn my_async_fn() {
    // async logic here
}

#[tokio::main]
async fn main() {
    let mut future = my_async_fn();

    // 错误:.await 不支持 &mut Future 的自动 Pin
    (&mut future).await;
}

解决办法:对 future 对象 使用 pin!(future) 返回 Pin<&mut impl Future> 对象,然后对该对象或它的 &mut pinned_future 进行 .await:

  • .await 的本质是对 Future 进行 poll,而 poll 需要 Pin<&mut Future>,所以只要能得到 Pin<&mut Future>,无论是 future.await 还是 (&mut future).await 都可以。
// 下面代码 Ok
#[tokio::main]
async fn main() {
    let future = my_async_fn();
    pin!(future); // 返回一个同名(future)的 Pin<&mut impl Future> 对象;

    // 必须先将 Future 对象 pin 住,获得同名的 Pin<&mut impl Future> 类型对象,然后才能使用 &mut 进行 .await。
    // 否则,是不支持 &mut impl Future 的 .await 的。

    // 以下 3 种方式均可:
    // 1. 消耗 future 对象
    future.await;
    // 2. 不消耗 future 对象
    (&mut future).await;
    // 3. 不消耗 future 对象,as_mut() 返回一个新的 Pin<&mut impl Future> 类型对象
    future.as_mut().await;
}

(&mut future).awaitloop+select!{} 中得到应用:要对 Future 对象进行重复 poll,而 select!{} 在并发 .await 时,如果有返 回值,则会 drop 其它 branch 的 Future 对象,所以在 select branch 中不能直接使用 future 对象,以避免它被 drop 后下一次 loop 失败 select 失败。

解决办法是:

  1. 创建一个 mut 类型的 Future 对象;
  2. 使用 pin!(future) 来创建一个同名,但类型是 Pin<&mut impl Future<Output=xx>> 的 Pin 对象, 它也实现了 Future;
  3. 在 select branch 中使用 &mut future 来进行轮询。
async fn action() {
    // Some asynchronous logic
}

#[tokio::main]
async fn main() {
    let (mut tx, mut rx) = tokio::sync::mpsc::channel(128);

    let operation = action(); // 返回一个 Future 对象, 没有对它 .await

    // 必须先 Pin 该 Future 对象,创建一个同名的类型为 Pin<&mut impl Future<Output=i32> 类型对象 ;
    tokio::pin!(operation);

    loop {
        tokio::select! {
        // &mut impl Future 是不被 .await 支持的。
        //
        // 但是 &mut operation 类型是 &mut Pin<&mut impl Future<Output=i32> 被 .await 表达式所支持, 所以可以被 poll 轮询。
        //
        // 当该 branch 因不匹配或无返回值而被 drop/cancel 时,operation 任然有效,下一次 select!{} 可以接续使用 &mut operation
            _ = &mut operation => break,

            Some(v) = rx.recv() => {
                if v % 2 == 0 {
                    break;
                }
            }
        }
    }
}

除了 &mut pinned_future 外,还可以调用 pinned_future.as_mut() 方法来创建一个新的 Pin<&mut impl Future> 类型对象,所以它也 实现了 Future,可以被 poll。

tokio 等异步运行时也会自动对 Future 对象进行 Pin 操作:

  • pin!(future) 返回 Pin<&mut dyn Future> 类型,满足 poll() 方法前面要求,但是会消耗 future 自身。
  • 所以在 loop 循环中,需要使用 Pin::as_mut() 返回一个新的 Pin<&mut dyn Future> 对象。
use futures::executor::block_on;

async fn hello_world() {
    println!("hello, world!");
}

fn main() {
    let future = hello_world();
    // block_on() 自动对 future 对象进行 Pin 操作。
    block_on(future);
}

// block_on() 的一种简化实现方式:
use waker_fn::waker_fn; // Cargo.toml: waker-fn = "1.1"
use futures_lite::pin; // Cargo.toml: futures-lite = "1.11"
use crossbeam::sync::Parker; // Cargo.toml: crossbeam = "0.8"
use std::future::Future;
use std::task::{Context, Poll};

fn block_on<F: Future>(future: F) -> F::Output {
    let parker = Parker::new();
    let unparker = parker.unparker().clone();
    let waker = waker_fn(move || unparker.unpark());
    let mut context = Context::from_waker(&waker);

    // pin!(T) 宏返回 Pin<&mut T> 类型对象;
    // 为 future 创建一个 Pin<&mut dyn Future> 对象,
    pin!(future);

    loop {
        // future 对象是 Pin<&mut dyn Future> 类型,满足 poll() 方法前面要求,但是会消耗 future 自身。
        // 所以在 loop 循环中,需要使用 Pin::as_mut() 返回一个新的 Pin<&mut dyn Future> 对象。
        //
        // future.as_mut() 实际是 Pin<&mut dyn Future>.as_mut(), 而 &mut dyn Future Deref 返回的是 dyn Future 对象,
        // 所以 as_mut() 返回的是 Pin<&mut dyn Future> 对象
        match future.as_mut().poll(&mut context) {
            Poll::Ready(value) => return value,
            Poll::Pending => parker.park(),
        }
    } }

// 另一个示例:
#![feature(noop_waker)]
use std::future::Future;
use std::task;

let waker = task::Waker::noop();
let mut cx = task::Context::from_waker(&waker);

// 返回 Pin<Box<dyn Future>> 类型对象,Rust 确保该对象内部的类型不会被 move。
let mut future = Box::pin(async { 10 });

// Pin 的 as_mut() 方法签名:pub fn as_mut(&mut self) -> Pin<&mut <Ptr as Deref>::Target>
// 所以,future.as_mut() 返回 Pin<&mut dyn Future> 对象,满足 Future 的 poll() 方法的函数签名要求。
assert_eq!(future.as_mut().poll(&mut cx), task::Poll::Ready(10));
  1. pin!() 参数必须是 Future 对象,而不是能是表达式:

        async fn my_async_fn() {
            // async logic here
        }
    
        #[tokio::main]
        async fn main() {
            // 错误
            // let mut future = pin!(my_async_fn());
    
            // OK
            let mut future = my_async_fn()
            pin!(future);
    
            // 如果 Pin<Ptr> 的 Ptr 实现了 Future,则 Pin<Ptr> 也实现了 Future, 所以可以对 Pin<Ptr> 进行 .await;
            // future.await;
            //
            // 或者:
            (&mut future).await;
        }
    
  2. pin!{} 支持同时创建多个 Future 对象并 Pin 住:

        use tokio::{pin, select};
    
        async fn my_async_fn() {
            // async logic here
        }
    
        #[tokio::main]
        async fn main() {
            pin! {
                let future1 = my_async_fn();
                let future2 = my_async_fn();
            }
    
            select! {
                _ = &mut future1 => {}
                _ = &mut future2 => {}
            }
        }
    

如果类型实现了 Unpin,则可以从 Pin 中安全地获取可变引用:

fn example<T: Unpin>(x: Pin<&mut T>) {
    let mut_ref: &mut T = Pin::into_inner(x);
}

spawn_blocking() 的简单参考实现
#

传入的闭包是同步函数 ,必须实现 Send + 'static, 内部创建一个线程来执行业务逻辑,结束后通过调用 waker 来通知 executor 完成。

pub fn spawn_blocking<T, F>(closure: F) -> SpawnBlocking<T> where
        F: FnOnce() -> T,
        F: Send + 'static,
        T: Send + 'static,
{
    // Arc 确保 inner 和它 clone 后的对象都使用同一个 Mutex 和内部的 Shared 对象。
    let inner = Arc::new(Mutex::new(Shared { value: None, waker: None, }));

    std::thread::spawn({
        let inner = inner.clone();
        move || {
            let value = closure();
            let maybe_waker = {
                let mut guard = inner.lock().unwrap();
                guard.value = Some(value);
                guard.waker.take()
            };
            if let Some(waker) = maybe_waker {
                waker.wake();
            }
        } });

    SpawnBlocking(inner)
}

// 为自定义 SpawnBlocking 实现 Future
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

impl<T: Send> Future for SpawnBlocking<T> {
    type Output = T;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
        let mut guard = self.0.lock().unwrap();
        if let Some(value) = guard.value.take() {
            return Poll::Ready(value);
        }
        // 更新 SpawnBlocking 内部的 waker
        guard.waker = Some(cx.waker().clone());
        Poll::Pending
    }
}

block_on() 的简单实现
#

使用 Parker 机制来实现 waker,传入的 Future 在 loop poll 前,必须使用 pin!(future) 来转换为同名但类型为 Pin<&mut impl Future> 的对象。

pin!() 创建的 Pin 会获得 future 的所有权(参数类型是 self: Pin<&mut Self>),而且确保该对象的栈内存地址不会再发生变化。

future.poll() 会获得 Pin 对象的所有权,这样不能在 loop 中使用,使用 future.as_mut() 返回一个新的 Pin 对象来解决该问题。

use waker_fn::waker_fn; // Cargo.toml: waker-fn = "1.1"
use futures_lite::pin; // Cargo.toml: futures-lite = "1.11"
use crossbeam::sync::Parker; // Cargo.toml: crossbeam = "0.8"
use std::future::Future;
use std::task::{Context, Poll};

fn block_on<F: Future>(future: F) -> F::Output {
    let parker = Parker::new();
    let unparker = parker.unparker().clone();
    let waker = waker_fn(move || unparker.unpark());
    let mut context = Context::from_waker(&waker);

    // pin!(T) 宏返回 Pin<&mut T> 类型对象;
    // 为 future 创建一个同名的 Pin<&mut dyn Future> 对象,
    pin!(future);

    loop {
        // Pin<&mut dyn Future> 的 as_mut 方法返回一个新的 Pin<&mut dyn Future> 对象
        // 满足 poll() 方法要求同时不消耗 future。
        match future.as_mut().poll(&mut context) {
            Poll::Ready(value) => return value,
            Poll::Pending => parker.park(),
        }
    } }

当 async task panic 时,可以通过 .await 返回的 Error 来判断:

use tokio::task::JoinError;

pub async fn run() {
    let handle = tokio::spawn(work());
    if let Err(e) = handle.await {
        if let Ok(reason) = e.try_into_panic() {
            // The task has panicked We resume unwinding the panic, thus propagating it to the current thread
            panic::resume_unwind(reason);
        }
    }
}

pub async fn work() {
    // [...]
}

async、await 是状态机语法糖
#

https://eventhelix.com/rust/rust-to-assembly-async-await/

.await 优化
#

尽量避免 stack buffer, 因为跨 .await 的上下文变量会随者 task Future 对象一起被保存, 如果使用较大的 stack buffer 变量, 则 自动生成的 task Future 对象就比较大, buffer size 一般是 page sized 对齐的, 这导致 task 的大小大概是 $page-size + a-few-bytes, 浪费内存。

use tokio::io::{self, AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpListener;

#[tokio::main]
async fn main() -> io::Result<()> {
    let listener = TcpListener::bind("127.0.0.1:6142").await?;
    loop {
        let (mut socket, _) = listener.accept().await?;
        tokio::spawn(async move {
            // 在堆中分配 buff 内存, 而不使用 stack 上的 array
            let mut buf = vec![0; 1024];

            loop {
                match socket.read(&mut buf).await {
                    Ok(0) => return,
                    Ok(n) => {
                        if socket.write_all(&buf[..n]).await.is_err() {
                            return;
                        }
                    }
                    Err(_) => {
                        return;
                    }
                }
            }
        });
    }
}

std::task::Poll 类型
#

对于返回 Poll 类型对象的函数/方法,不能使用 .await 来 poll 它,因为 Poll 类型本身没有实现 Future,但是可以调用它的 poll() 方法来返回 Ready 或 Pending 结果,或使用 std::task::ready!() 宏:

use std::task::{ready, Context, Poll};
use std::future::{self, Future};
use std::pin::Pin;

pub fn do_poll(cx: &mut Context<'_>) -> Poll<()> {
    let mut fut = future::ready(42);
    let fut = Pin::new(&mut fut);

    let num = ready!(fut.poll(cx));
    // ... use num

    Poll::Ready(())

// 其中的 ready!(fut.poll(cx)) 等效于:
let num = match fut.poll(cx) {
    Poll::Ready(t) => t,
    Poll::Pending => return Poll::Pending,
};
rust-lang - 这篇文章属于一个选集。
§ 14: 本文

相关文章

1. 标识符和注释:identify/comment
·
Rust
Rust 标识符介绍
10. 泛型和特性:generic/trait
·
Rust
Rust 泛型和特性
11. 类型协变:type coercion
·
Rust
Rust 高级话题:子类型和类型协变
12. 迭代器:iterator
·
Rust
Rust 迭代器