Thread 的问题:
- 每个 Thread 都有固定 size 的 stack 内存需求,成千上万个 Thread 会消耗大量的内存;
- 内核调度 Thread 运行的上下文切换开销大;
- Thread 在执行 read/write 系统调用时,尤其是网络通信,会有大量的时间被 block 等待,此时该 Thread 不能再执行其它事情,效率低。
async 通过创建大量异步 task,然后使用一个 thread pool 来执行它们,在某个 task 被阻塞时 executor 自动调度其它 task 到 thread 上运行,从而:
- task 相比 thread 更轻量化,没有了大量 thread stack 开销,切换执行速度也更快,所以一个程序内部可以创建大量的异步 task;
- executor 可以在 task 阻塞时调度其它 task 到 thread 运行,从而执行效率和并发高;
Future #
async 的核心是 Future trait
, 它的 poll()
方法的 self
类型是 Pin<&mut Self>
,表示一旦开始第一次 poll 后 Self 对象必须是固
定的,不能被转移, 这是为了确保 Future 对象内部保存的栈变量地址指针继续有效。
poll() 方法的第二个参数 Context
封装了 Waker
,当 poll Pending 时 Future 的实现可以保存该 waker(一般是在一个辅助线程中), 后当条件满足时调
用 waker 来唤醒 executor 来重新 poll 自己。
trait Future {
type Output;
// For now, read `Pin<&mut Self>` as `&mut Self`.
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}
enum Poll<T> {
Ready(T),
Pending,
}
Future 对象只有被 poll 时才开始执行, 一般使用 .await
来自动进行 Pin+poll:
- 一般需要先将 Future 对象 Pin 住,生成一个
Pin<&mut impl Future>
对象,然后调用它的 poll() 方法; - .await 自动对
Future
对象 或&mut Future
对象先后执行 Pin 操作和调用 poll() 方法;
获得 Pin<&mut Self>
对象
#
需要将 future 对象封装到 Pin 类型对象内部后,才能调用 poll() 方法,三种封装方式:
-
future.await
: Rust 自动对 future 对象进行 Pin 封装和 poll(); -
pin!(future)
: 返回一个同名的Pin<&mut impl Future>
类型对象,满足 poll() 方法签名要求。
- 可以对返回的 future 对象执行:future.await 或 (&mut future).await
- 或则对返回的 future 对象执行:future.poll() 或 (&mut future).poll()
Box::pin(future)
: 返回一个 Pin 类型对象;
普通函数或方法可以返回 impl Future
,即执行该函数时返回一个 Future 对象,但是并没有开始 poll 执行它。
fn read_to_string(&mut self, buf: &mut String) -> impl Future<Output = Result<usize>>;
但是 trait 中的函数方法不能返回 impl Trait
类型对象。
自定义类型实现 Future 和 Service #
// https://mazhen.tech/p/pinboxdyn-future%E8%A7%A3%E6%9E%90/
// 实现自己的 Future 类型
pub struct HttpRequest {
url: String,
}
pub struct HttpResponse {
code: u32,
}
pub struct ResponseFuture {
request: HttpRequest,
}
impl Future for ResponseFuture {
type Output = Result<HttpResponse, Error>;
fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
println!("process url:{}", &self.as_ref().get_ref().request.url);
Poll::Ready(Ok(HttpResponse { code: 200 }))
}
}
// 在实现 Service 时,关联类型 type Future 设置为我们手工实现的 Future:
struct RequestHandler;
impl Service<HttpRequest> for RequestHandler {
type Response = HttpResponse;
type Error = Error;
type Future = ResponseFuture;
fn poll_ready(&mut self, cx: &mut std::task::Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn call(&mut self, req: HttpRequest) -> Self::Future {
ResponseFuture { request: req }
}
}
// 然后就可以像正常的 Future 一样,使用 Service :
#[tokio::main]
async fn main() {
let mut service = RequestHandler {};
match service
.call(HttpRequest {
url: "/user/mazhen".to_owned(),
})
.await
{
Ok(r) => println!("Response code: {}", r.code),
Err(e) => println!("process failed. {:?}", e),
}
}
为了方便实现异步 task,Rust 提供了 async fn/block/closure
。
async fn #
执行 async fn
后返回一个 impl Future
匿名类型对象:
// 异步函数
async fn example(x: &str) -> usize {
x.len()
}
// 等效于
fn example<'a>(x: &'a str) -> impl Future<Output = usize> + 'a {
async move { x.len() }
}
// 示例 2:
use async_std::io::prelude::*;
use async_std::net;
async fn cheapo_request(host: &str, port: u16, path: &str) -> std::io::Result<String>
{
let mut socket = net::TcpStream::connect((host, port)).await?;
let request = format!("GET {} HTTP/1.1\r\nHost: {}\r\n\r\n", path, host);
socket.write_all(request.as_bytes()).await?;
socket.shutdown(net::Shutdown::Write)?;
let mut response = String::new();
socket.read_to_string(&mut response).await?;
Ok(response)
}
// response 类型是 impl Future<Output=std::io::Result<String>>
let response = cheapo_request(host, port, path);
async fn
内部处于异步上下文,可以 .await poll 其它异步函数,所以 async executor 在执行 async fn
的过程中可能会多次暂停,暂停的位置是各
.await 表达式(这些 .await 位置也是异步运行时的 yield point
)。
每次暂停都会返回一个新的 Future 对象,它封装了暂停位置依赖的上下文对象:栈变量、函数参数等,后续可能被调度到其他线程上运行,所以 async fn
需要实现 Send+'static
, 也就是函数内跨 .await 的对象都需要是 Send +'static
的。
但 Rc 没有实现 Send,同时 &T/&mut T/*const T/*mut T 等借用类型一般也很难满足 ‘static 要求。
- 对于 Rc,需要确保不跨 .await 使用;
- 对于借用的 lifetime 问题,一般使用 move 类型的 async block/closure 来将对象所有权转移到内部;
async fn
的局限性在于它的返回值只能是 impl Future
匿名类型对象,而不能表达进一步限界语义,如 impl Future + Send + 'static
,这样在
trait 中包含 async fn 会有一定局限性。
trait 的 async fn #
trait 可以包含 async fn,但一旦 trait 包含 async fn, 就不能再为它创建 trait object
: 《== 该 trait 是非 dyn-safe
pub trait Trait {
async fn f(&self);
}
// 编译报错:error[E0038]: the trait `main::Trait` cannot be made into an object
pub fn make() -> Box<dyn Trait> {
unimplemented!()
}
另外一个问题是不能为 trait 中的 async fn 返回的 impl Future
匿名类型对象进行限界, 这样后续在多线程场景使用该 async fn
返回值可能会出错
(多线程一般要求对象实现 Send+'static
):
-
参考:https://smallcultfollowing.com/babysteps/blog/2023/02/01/async-trait-send-bounds-part-1-intro/
-
这是由于
async fn check(&mut self, server: &Server) -> bool;
等效为fn check(&mut self, server: &Server) -> impl Future<Output = bool>;
,但并不能为返回的impl Future
匿名类型对象指定限界,如impl Future<Output = bool> + Send
等。
trait HealthCheck {
async fn check(&mut self, server: &Server) -> bool;
}
fn start_health_check<H>(health_check: H, server: Server)
where
H: HealthCheck + Send + 'static, // 这里只是对 H 自身进行了限定,并没有 H 的 方法的返回值进行限定
{
tokio::spawn(async move {
while health_check.check(&server).await {
tokio::time::sleep(Duration::from_secs(1)).await;
}
emit_failure_log(&server).await;
});
}
/*
error: future cannot be sent between threads safely
--> src/main.rs:78:5
|
78 | / tokio::spawn(async move {
79 | | while health_check.check().await {
80 | | tokio::time::sleep(Duration::from_secs(1)).await;
... |
83 | | });
| |______^ future created by async block is not `Send`
|
= help: within `{async block@src/main.rs:78:18: 78:28}`, the trait `Send` is not implemented for `impl Future<Output = bool>`
note: future is not `Send` as it awaits another future which is not `Send`
--> src/main.rs:79:15
|
79 | while health_check.check().await {
| ^^^^^^^^^^^^^^^^^^^^ await occurs here on type `impl Future<Output = bool>`, which is not `Send`
note: required by a bound in `tokio::spawn`
--> /Users/alizj/.cargo/registry/src/index.crates.io-1949cf8c6b5b557f/tokio-1.47.1/src/task/spawn.rs:168:21
|
166 | pub fn spawn<F>(future: F) -> JoinHandle<F::Output>
| ----- required by a bound in this function
167 | where
168 | F: Future + Send + 'static,
| ^^^^ required by this bound in `spawn`
help: `Send` can be made part of the associated future's guarantees for all implementations of `HealthCheck::check`
|
71 - async fn check(&mut self) -> bool;
71 + fn check(&mut self) -> impl std::future::Future<Output = bool> + Send;
|
*/
上面两个问题的解法都是使用基于宏的三方库 async-trait
,在定义和实现 trait 时添加 #[async_trait]
:
use async_trait::async_trait;
#[async_trait]
trait Advertisement {
async fn run(&self);
}
struct Modal;
#[async_trait]
impl Advertisement for Modal {
async fn run(&self) {
self.render_fullscreen().await;
for _ in 0..4u16 {
remind_user_to_join_mailing_list().await;
}
self.hide_for_now().await;
}
}
// 后续可以使用:Vec<Box<dyn Advertisement + Sync>> or &[&dyn Advertisement],
这是由于 async_trait
proc macro 对于 trait 中的 async fn
的返回值使用了另一种语法糖:它不返回 impl Future
对象,而是返回
Pin<Box<dyn Future + Send>>
类型对象。这样调用该 async fn
函数或方法返回的对象:
- dyn-safe(所以,该 trait 可以创建 trait object)
- 线程安全的(满足 Send + ‘Static 要求),可以在 spawn() 提交的 async block 中使用。
RTN #
另外,新版 Rust 的 unstable 特性 RTN 可以为 trait 的 async fn 返回值进行限界,可以弥补 async trait 的局限性
trait 函数或方法不能返回 impl Trait
对象
#
Rust 支持普通的函数或方法返回 imply Trait
对象, 但是 trait 中关联函数或方法不支持返回 impl Trait
对象:
trait Service<Request> {
type Response;
type Error;
// ERROR: `impl Trait` not allowed outside of function and inherent method return types
fn call(&mut self, req: Request) -> impl Future<Output = Result<Self::Response, Self::Error>>;
}
解决办法:
- 如果需要返回
impl Future
,则可以使用async fn
。其它类型的 Trait 则不行; - 返回 trait object 类型,如
Box<dyn Trati>
; - 引入一个关联类型,它实现了 Trait 并返回:如下面的 Future 类型,其实是将问题留给了 Service 的实现者,由用户选择 type Future 的实际类型:
pub trait Service<Request> {
type Response;
type Error;
type Future: Future<Output = Result<Self::Response, Self::Error>>;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>;
fn call(&mut self, req: Request) -> Self::Future;
}
async blcok #
async block
类似于闭包,提供了一个 async context,内部可以使用 .await
, 返回一个 impl Future
的 匿名类型对象:
use std::io;
use std::future::Future;
fn cheapo_request(host: &str, port: u16, path: &str) -> impl Future<Output = io::Result<String>> + 'static
{
let host = host.to_string();
let path = path.to_string();
async move {
// 捕获了 host、path 的所有权
//... use &*host, port, and path ...
}
}
在 async block 中使用 ? 来传播错误或 return 值都是 async block 返回,而不是它所在的函数返回;
- 作为对比,在函数的普通 block、match branch 中使用 ? 时,是所在的函数返回;
async block 可以和闭包一样捕获环境中的对象(借用或 move),可以指定 async move
来获得对象的所有权,这时 async block 返回的 Future 具有 'static lifetime
;
- async block 中不能使用 break/continue;
在多线程场景(如 std:🧵:spawn() 和 async fn/block/closure) 场景,需要考虑是否实现 Send、Sync trait,以及满足 ‘static 要求:
Rc/&T/&mut T/*const T/*mut T 的 Send、Sync 实现情况分析:
类型 | Send |
Sync |
说明 |
---|---|---|---|
Rc<T> |
否 | 否 | 内部引用计数不是原子操作,非线程安全 |
&T |
是 | 是 | 共享不可变引用本质上是线程安全的(Sync );且引用本身可移动 |
&mut T |
是 | 否 | 可变引用确保独占访问,不可多线程共享 |
*const T , *mut T (原始指针) |
否 | 否 | 默认不实现 Send 与 Sync ,须显式不安全地手动实现 |
为了避免 async block/closure 对上下文对象的隐式借用不满足 ‘static 的要求,需要使用 move async block
。
由于 async block
返回匿名的 impl Future<Output=XX>
对象,所以不能对返回的对象施加额外的限界,如 Send。
同时也不能指定 Output 关联类型, 当 Output 为 Result 时可能出错,解决办法:为 Ok 指定它所属的 Enum Result 类型:
let input = async_std::io::stdin();
// future 是 Future<Output=Result>
let future = async {
let mut line = String::new();
// ? 是 async block 返回,结果类型 std::io::Result<usize>
input.read_line(&mut line).await?;
println!("Read line: {}", line);
// Ok(()) // 错误
// 正确,指定 Ok 所属的 Result 类型
Ok::<(), std::io::Error>(())
// 或者
// std::result::Result::<(), std::io::Error>::Ok(());
};
async block
和 async move block
都是表达式,故可以作为闭包函数的返回值。
use async_std::net;
use async_std::task;
// serve_one 是一个 impl Future<Output=()> 的匿名类型对象
let serve_one = async {
// ? 是 async block 返回(而非 block 所在的函数),类型是 Result
let listener = net::TcpListener::bind("localhost:8087").await?;
let (mut socket, _addr) = listener.accept().await?;
};
pub async fn many_requests(requests: Vec<(String, u16, String)>) -> Vec<std::io::Result<String>>
{
let mut handles = vec![];
for (host, port, path) in requests {
handles.push(
task::spawn_local(
// 使用 async move 来获取 host、path 的所有权。
async move {
cheapo_request(&host, port, &path).await
}
));
}
//...
}
loop {
async move {
break; // error[E0267]: `break` inside of an `async` block
}
}
返回 async block 的 closure #
在 Rust 1.75 版本之前,不支持 async closure
定义和限界:
- 不能使用
async || {}
语法来 定义异步闭包函数; - 不能使用
F: async Fn()
语法 来定义异步闭包限界;
对此的解法是,使用返回 Future 对象的闭包或闭包限界:
-
使用返回
async block
或Future
的 closure,如|| async move {}
来定义异步闭包函数,等效于生成impl FnXX() -> impl Future<Output=YY>
类型的匿名类型对象:let app = Router::new() .route( "/", any_service(service_fn( // 等效于生成一个 impl Fn() -> impl Future<Output=Result<Response, Infallible> 匿名类型对象 |_: Request| async { let res = Response::new(Body::from("Hi from `GET /`")); Ok::<_, Infallible>(res) // service_fn 的闭包必须返回 Result })) )
-
使用返回
Future
对象的闭包限界, 如F: FnOnce() -> Fut, Fut: Future<Output = Res> + Send, Res: Send
来定义异步的 闭包限界(需要引入多个泛型参数);
// 示例 1:
fn async_callback<F, Fut>(callback: F)
where
F: FnOnce() -> Fut,
Fut: Future<Output = String>;
// 示例 2:Fn trait bound 返回一个 Future 对象,该对象同时也需要是泛型参数,这样才能进一步限界
impl<F, Fut, Res, S> Handler<((),), S> for F
where
// 闭包整体要实现 FnOnce() -> Fut、Clone、Send、Sync 和 'static
F: FnOnce() -> Fut + Clone + Send + Sync + 'static,
// 闭包的返回值需要实现 Future<Output = Res> 和 Send
Fut: Future<Output = Res> + Send,
Res: IntoResponse,
但是这两种实现,有如下问题:
- 返回
Future
的闭包定义(如返回async block
),不能以&mut T
的方式捕获上下文对象(但是 &T 捕获、或则转移所有权是 OK 的):
let mut vec: Vec<String> = vec![];
// closure 类型是:impl FnMut() -> impl Future<Output=()>
let closure = || async {
// error: captured variable cannot escape `FnMut` closure body
vec.push(ready(String::from("")).await);
};
/*
error: captured variable cannot escape `FnMut` closure body
--> src/main.rs:69:22
|
68 | let mut vec: Vec<String> = vec![];
| ------- variable defined here
69 | let closure = || async {
| ____________________-_^
| | |
| | inferred to be a `FnMut` closure
70 | | vec.push(ready(String::from("")).await);
| | --- variable captured here
71 | | };
| |_____^ returns an `async` block that contains a reference to a captured variable, which then escapes the closure body
|
= note: `FnMut` closures only have access to their captured variables while they are executing...
= note: ...therefore, they cannot allow references to captured variables to escape
*/
但是捕获 &T 是 OK 的:
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + 'static>> {
let mut vec: Vec<String> = vec!["a".to_string()];
// closure 类型是: impl Fn() -> impl Future<Output=()> 类型
let closure = || async {
vec.iter().for_each(|i| println!("{i}"));
};
closure().await;
}
- 异步函数签名如果包含异步闭包(返回 Future 对象),则不支持 HRTB:HRTB 可能是隐式的,例如限界的闭包的输入、输出包含借用的情况下就是隐式 HRTB。
async fn for_each_city<F, Fut>(mut f: F)
where
// 错误:不支持
F: FnMut(&str) -> Fut,
// 等价于: Rust 为包含引用参数的闭包函数限界自动添加 HRTB 注解
// F: for<'c> FnMut(&'c str) -> Fut,
Fut: Future<Output = ()>,
{
for x in ["New York", "London", "Tokyo"] {
f(x).await;
}
}
async fn do_something2(city_name: &str) { todo!() }
async fn main() {
for_each_city(do_something2).await;
}
/*
error[E0308]: mismatched types
--> src/main.rs:101:5
|
101 | for_each_city(do_something2).await;
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^ one type is more general than the other
|
= note: expected opaque type `impl for<'c> Future<Output = ()>`
found opaque type `impl Future<Output = ()>`
= note: distinct uses of `impl Trait` result in different opaque types
note: the lifetime requirement is introduced here
--> src/main.rs:89:34
|
89 | F: for<'c> FnMut(&'c str) -> Fut,
| ^^^
error: implementation of `FnMut` is not general enough
--> src/main.rs:101:5
|
86 | / async fn for_each_city<F, Fut>(mut f: F)
87 | | where
88 | | // 错误:不支持
89 | | F: for<'c> FnMut(&'c str) -> Fut,
| | ----------------------------- doesn't satisfy where-clause
90 | | Fut: Future<Output = ()>,
| |_____________________________- due to a where-clause on `for_each_city`...
...
101 | for_each_city(do_something2).await;
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
|
= note: ...`for<'a> fn(&'a str) -> impl Future<Output = ()> {do_something2}` must implement `FnMut<(&'c str,)>`
= note: ...but it actually implements `FnMut<(&'0 str,)>`, for some specific lifetime `'0`
error: implementation of `FnMut` is not general enough
--> src/main.rs:101:34
|
86 | / async fn for_each_city<F, Fut>(mut f: F)
87 | | where
88 | | // 错误:不支持
89 | | F: for<'c> FnMut(&'c str) -> Fut,
| | ----------------------------- doesn't satisfy where-clause
90 | | Fut: Future<Output = ()>,
| |_____________________________- due to a where-clause on `for_each_city`...
...
101 | for_each_city(do_something2).await;
| ^^^^^
|
= note: ...`for<'a> fn(&'a str) -> impl Future<Output = ()> {do_something2}` must implement `FnMut<(&'c str,)>`
= note: ...but it actually implements `FnMut<(&'0 str,)>`, for some specific lifetime `'0`
*/
// 另一个例子 2:
async fn f1<Fut>(_: impl for<'a> Fn(&'a u8) -> Fut) where Fut: Future<Output = ()>, { todo!() }
async fn main() {
async fn g(_: &u8) { todo!() }
// error[E0308]: mismatched types
// error: implementation of `Fn` is not general enough
f1(g).await;
}
/*
error[E0308]: mismatched types
--> src/main.rs:57:9
|
57 | f1(g).await;
| ^^^^^ one type is more general than the other
|
= note: expected opaque type `impl for<'a> Future<Output = ()>`
found opaque type `impl Future<Output = ()>`
= note: distinct uses of `impl Trait` result in different opaque types
note: the lifetime requirement is introduced here
--> src/main.rs:44:48
|
44 | async fn f1<Fut>(_: impl for<'a> Fn(&'a u8) -> Fut)
| ^^^
error: implementation of `Fn` is not general enough
--> src/main.rs:57:9
|
44 | async fn f1<Fut>(_: impl for<'a> Fn(&'a u8) -> Fut)
| - ------------------------- doesn't satisfy where-clause
| _|
| |
45 | | where
46 | | Fut: Future<Output = ()>,
| |_____________________________- due to a where-clause on `f1`...
...
57 | f1(g).await;
| ^^^^^
|
= note: ...`for<'a> fn(&'a u8) -> impl Future<Output = ()> {main::g}` must implement `Fn<(&'a u8,)>`
= note: ...but it actually implements `Fn<(&'0 u8,)>`, for some specific lifetime `'0`
error: implementation of `Fn` is not general enough
--> src/main.rs:57:15
|
44 | async fn f1<Fut>(_: impl for<'a> Fn(&'a u8) -> Fut)
| - ------------------------- doesn't satisfy where-clause
| _|
| |
45 | | where
46 | | Fut: Future<Output = ()>,
| |_____________________________- due to a where-clause on `f1`...
...
57 | f1(g).await;
| ^^^^^
|
= note: ...`for<'a> fn(&'a u8) -> impl Future<Output = ()> {main::g}` must implement `Fn<(&'a u8,)>`
= note: ...but it actually implements `Fn<(&'0 u8,)>`, for some specific lifetime `'0`
*/
为了支持 HRTB,需要使用 trait object 的 Pin<Box<dyn Future>>
范式,这也是 async-trait crate
的实现方式:
https://docs.rs/futures/latest/futures/future/type.BoxFuture.html 定义了 BoxFuture 类型, 可以用于多线程环境:
pub type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;
// https://blog.rust-lang.org/inside-rust/2024/08/09/async-closures-call-for-testing/
fn async_callback<F, Fut>(callback: F)
where
// F 闭包输入、输出没有借用,没有 HRTB 的问题,故可以返回 Future 对象
F: FnOnce() -> Fut,
Fut: Future<Output = String>;
fn async_callback<F>(callback: F)
where
// F 闭包输入包含借用时,由于不支持 HRTB,故需要返回 Pin<Box<...>> 类型
F: FnOnce(&str) -> Pin<Box<dyn Future<Output = ()>>>;
async fn do_something(name: &str) {}
// OK
async_callback(|name| Box::pin(async {
do_something(name).await;
}));
// OK: 强制将返回的 Pin<Box<impl Future<Output=()>>> 转换为 Pin<Box<dyn Future<Output=()>>>
// 这里将 impl Future<Output=()> 转换为 dyn Future<Output=()> 是 type coerce 支持的。
let ac = |name| {
let b: Pin<Box<dyn Future<Output=()>>> = Box::pin(async {
do_something(name).await;
});
b
};
async_callback(ac);
// ERROR: 这是由于 ac 的类型是 impl Fn(&str) -> Pin<Box<impl Future<Output=()>>>, 与 F 的限界不匹配
let ac = |name| {
Box::pin(async {
do_something(name).await;
})
};
async_callback(ac);
/*
error[E0271]: expected `{[email protected]:83:14}` to return `Pin<Box<dyn Future<Output = ()>>>`, but it returns `Pin<Box<{async block@src/main.rs:84:18: 84:23}>>`
--> src/main.rs:84:9
|
83 | let ac = |name| {
| ------ this closure
84 | / Box::pin(async {
85 | | do_something(name).await;
86 | | })
| |__________^ expected `Pin<Box<dyn Future<Output = ()>>>`, found `Pin<Box<{async block@src/main.rs:84:18: 84:23}>>`
87 | };
88 | async_callback(ac);
| -------------- -- closure used here
| |
| required by a bound introduced by this call
|
= note: expected struct `Pin<Box<(dyn Future<Output = ()> + 'static)>>`
found struct `Pin<Box<{async block@src/main.rs:84:18: 84:23}>>`
note: required by a bound in `async_callback`
--> src/main.rs:62:24
|
60 | fn async_callback<F>(callback: F)
| -------------- required by a bound in this function
61 | where
62 | F: FnOnce(&str) -> Pin<Box<dyn Future<Output = ()>>>,
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ required by this bound in `async_callback`
For more information about this error, try `rustc --explain E0271`.
error: could not compile `my-demo` (bin "my-demo") due to 1 previous error
*/
如果不使用 Pin<Box<dyn Future<Output=()»> 而使用 Box<dyn Future<Output=()>, 则当闭包的参数包含借用时则会有 lifetime 的问题:
-
而前者由于是 Pin 住的 Box,Box 的内存不能再移动,可以被 poll,而后者可以被移动,导致内部 dyn Future 的自引用出现问题;
-
最佳实践: 只使用 Pin<Box<dyn Future<Output=()»> 而不使用 Box<dyn Future<Output=()>;
-
对于 Pin<Box<dyn Future<Output = ()> + ‘a> 类型的 fut 对象,可以使用 Pin::as_mut() 方法来返回 Pin<&mut dyn Future<Output = ()» ,如 fut.as_mut().poll() 来进行 poll()
fn async_callback<F>(callback: F)
where
F: FnOnce(&str) -> Box<dyn Future<Output = ()>>,
{
callback("hello there!");
}
async fn do_something(name: &str) {}
fn main() {
async_callback(|name| {
Box::new(async {
do_something(name).await;
})
});
}
/*
error: lifetime may not live long enough
--> src/main.rs:71:9
|
70 | async_callback(|name| {
| ----- return type of closure is Box<(dyn Future<Output = ()> + '2)>
| |
| has type `&'1 str`
71 | / Box::new(async {
72 | | do_something(name).await;
73 | | })
| |__________^ returning this value requires that `'1` must outlive `'2`
*/
另一种表达方式:如果 async block 借用了所在函数的对象,则该 async block 一定要比借用对象、以及所在的含命周期需要比所在的函数长,但是不支持表达该语义:
fn async_callback<F>(callback: F)
where
F: FnOnce(&str) -> Box<dyn Future<Output = ()>>,
{
}
async fn do_something(name: &str) {}
fn main() {
let ac = |name| {
let a = async {
do_something(name).await;
};
let b: Box<dyn Future<Output = ()>> = Box::new(a);
b
};
async_callback(ac);
/*
error[E0373]: async block may outlive the current function, but it borrows `name`, which is owned by the current function
--> src/main.rs:76:17
|
76 | let a = async {
| ^^^^^ may outlive borrowed value `name`
77 | do_something(name).await;
| ---- `name` is borrowed here
|
note: async block is returned here
--> src/main.rs:80:9
|
80 | b
| ^
help: to force the async block to take ownership of `name` (and any other referenced variables), use the `move` keyword
|
76 | let a = async move {
| ++++
*/
}
但是加了 async move
后又报错:error: implementation of FnOnce
is not general enough
fn async_callback<F>(callback: F)
where
// 这里的 F 输入参数包含借用,所以实际是隐式的 HRTB 限界
F: FnOnce(&str) -> Box<dyn Future<Output = ()>>,
{
callback("just for test");
}
async fn do_something(name: &str) {}
fn main() {
let ac = |name| {
let a = async move {
do_something(name).await;
};
let b: Box<dyn Future<Output = ()>> = Box::new(a);
b
};
// 但是 ac 的类型是:impl fn(&'2 str) -> Box<dyn Future<Output = ()>> 类型,
// 而不是限界的 FnOnce(&str) -> Box<dyn Future<Output = ()>> 类型
async_callback(ac);
}
/*
error: implementation of `FnOnce` is not general enough
--> src/main.rs:80:9
|
80 | b
| ^ implementation of `FnOnce` is not general enough
|
= note: closure with signature `fn(&'2 str) -> Box<dyn Future<Output = ()>>` must implement `FnOnce<(&'1 str,)>`, for any lifetime `'1`...
= note: ...but it actually implements `FnOnce<(&'2 str,)>`, for some specific lifetime `'2`
error: implementation of `FnOnce` is not general enough
--> src/main.rs:82:5
|
82 | async_callback(ac);
| ^^^^^^^^^^^^^^^^^^ implementation of `FnOnce` is not general enough
|
= note: closure with signature `fn(&'2 str) -> Box<dyn Future<Output = ()>>` must implement `FnOnce<(&'1 str,)>`, for any lifetime `'1`...
= note: ...but it actually implements `FnOnce<(&'2 str,)>`, for some specific lifetime `'2`
*/
Rust 1.75+ async closure #
Rust 1.75+ 开始正式支持 async closure
:
- 使用
async || {}
语法(而非|| async {}
),支持借用方式捕获上下文对象,支持 HRTB; - 或者在 trait bound、函数返回值场景,使用
AsyncFn{,Mut,Once}
函数集合(而不是async Fn{,Mut,Once}
语法);
参考:
- Stabilize async closures (RFC 3668)
- rfc-3668
- 注意:关于
async Fn* vs AsyncFn*
语法问题,正式版本采用了后者: https://github.com/rust-lang/rust/issues/128129
- 注意:关于
let s = String::from("hello, world");
// Implements `AsyncFn()` along with `FnMut` and `Fn`
// because it can copy the `&String` that it captures.
let _ = async || {
println!("{s}");
};
let s = String::from("hello, world");
// Implements `AsyncFn()` but not `FnMut` or `Fn` because
// it moves and owns a value of type `String`, and therefore
// the future it returns needs to take a pointer to data
// owned by the closure.
let _ = async move || {
println!("{s}");
};
let mut s = String::from("hello, world");
// Implements `AsyncFnMut()` but not `FnMut` or `Fn`
// because it needs to reborrow a mutable pointer to `s`.
let _ = async move || {
s.push('!');
};
// https://rust-lang.github.io/rfcs/3668-async-closures.html#interaction-with-return-type-notation-naming-the-future-returned-by-calling
let s = String::from("hello, world");
let closure = async move || {
ready(&s);
};
// At this point, `s` is moved out of. However, the
// allocation for `s` is still live. It just lives as a
// captured field in `closure`.
// Manually call `AsyncFnOnce` -- this isn't stable since
// `AsyncFnOnce` isn't stable, but it's useful for the demo.
let fut = AsyncFnOnce::call_once(closure, ());
// At this point, `closure` is dropped. However, the
// allocation for `s` is still live. It now lives as a
// captured field in `fut`.
fut.await;
// After the future is awaited, it's dropped. At that
// point, the allocation for `s` is dropped.
// They can have arguments annotated with types:
let _ = async |_: u8| { todo!() };
// They can have their return types annotated:
let _ = async || -> u8 { todo!() };
// They can be higher-ranked:
let _ = async |_: &str| { todo!() };
// They can capture values by move:
let x = String::from("hello, world");
let _ = async move || do_something(&x).await };
takes_an_async_fn(async |s| { other_fn(s).await }).await;
trait bound 场景和返回值:使用 AsyncFn{,Once,Mut}
语法:
// AsyncFn{,Once,Mut} 可以用于返回值、泛型参数限界
// 也就是:The AsyncFn* trait can be used anywhere a Fn* trait bound is allowed
//
/// In return-position impl trait:
fn closure() -> impl AsyncFn() { async || {} }
/// In trait bounds:
trait Foo<F>: Sized where F: AsyncFn()
{
fn new(f: F) -> Self;
}
/// in GATs:
trait Gat {
type AsyncHasher<T>: AsyncFn(T) -> i32;
}
async fn takes_an_async_fn(f: impl AsyncFn(&str)) {
futures::join(f("hello"), f("world")).await;
}
// 作为限界:
// 老的写法,不建议:闭包和返回值分别限界:
fn doesnt_exactly_take_an_async_closure<F, Fut>(callback: F)
where
F: FnOnce() -> Fut,
Fut: Future<Output = String>
{ todo!() }
// 建议: 新的写法
fn takes_an_async_closure<F: AsyncFnOnce() -> String>(callback: F) { todo!() }
AsyncFn{,Once,Mut} 的内部实现如下:
// https://rust-lang.github.io/rfcs/3668-async-closures.html#interaction-with-return-type-notation-naming-the-future-returned-by-calling
/// An async-aware version of the [`FnOnce`](crate::ops::FnOnce) trait.
///
/// All `async fn` and functions returning futures implement this trait.
pub trait AsyncFnOnce<Args> {
/// Future returned by [`AsyncFnOnce::async_call_once`].
type CallOnceFuture: Future<Output = Self::Output>;
/// Output type of the called closure's future.
type Output;
/// Call the [`AsyncFnOnce`], returning a future which may move out of the called closure.
fn async_call_once(self, args: Args) -> Self::CallOnceFuture;
}
/// An async-aware version of the [`FnMut`](crate::ops::FnMut) trait.
///
/// All `async fn` and functions returning futures implement this trait.
pub trait AsyncFnMut<Args>: AsyncFnOnce<Args> {
/// Future returned by [`AsyncFnMut::async_call_mut`] and [`AsyncFn::async_call`].
type CallRefFuture<'a>: Future<Output = Self::Output>
where
Self: 'a;
/// Call the [`AsyncFnMut`], returning a future which may borrow from the called closure.
fn async_call_mut(&mut self, args: Args) -> Self::CallRefFuture<'_>;
}
/// An async-aware version of the [`Fn`](crate::ops::Fn) trait.
///
/// All `async fn` and functions returning futures implement this trait.
pub trait AsyncFn<Args>: AsyncFnMut<Args> {
/// Call the [`AsyncFn`], returning a future which may borrow from the called closure.
fn async_call(&self, args: Args) -> Self::CallRefFuture<'_>;
}
但是新的 AsyncFn{,Once,Mut} 带来的问题(局限性):不支持为返回值指定 trait bound,也就是不能为上面 AsyncFn* 的关联类型如 Output、CallOnceFuture 进行限界,但是老的 Fn* 是 Ok 的:
- https://rust-lang.github.io/rfcs/3668-async-closures.html#associated-types-of-asyncfn-traits-are-not-nameable
- 原因:https://rust-lang.github.io/rfcs/3668-async-closures.html#why-do-we-recommend-the-asyncfnonceoutput-type-remains-unstable-unlike-fnonceoutput
With AsyncFn() -> T
trait bounds, we don’t know anything about the Future returned by calling the
async closure other than that it’s a Future and awaiting that future returns T.
// 老的写法 OK:
fn foo<F, T>()
where
F: FnOnce() -> T,
F::Output: Send, //~ OK,Output 为 FnOnce 的关联类型
// 等效于
//T: Send
{
}
// 或者:
fn async_callback<F, Fut>(callback: F)
where
F: FnOnce() -> Fut,
Fut: Future<Output = String> + Send + 'static
{ todo!() }
// 新的写法不行:
fn foo<F, T>()
where
F: async FnOnce() -> T,
F::Output: Send,
//~^ ERROR use of unstable library feature
F::CallOnceFuture: Send,
//~^ ERROR use of unstable library feature
{
}
// 这是由于:x 的返回值是 Future<Output=Result<()> 类型,而不是 spawn() 要求的 Future<Output=Result<()>> + Send + 'static 类型:
async fn foo(x: impl AsyncFn(&str)) -> Result<()> {
tokio::spawn(x("hello, world")).await
}
/*
error[E0277]: cannot be sent between threads safely
--> src/lib.rs
|
| tokio::spawn(x("hello, world")).await
| ------------ ^^^^^^^^^^^^^^^^^ cannot be sent between threads safely
| |
| required by a bound introduced by this call
*/
另一个 trait 中包含 async method 的报错例子:
- 注:从 Rust 1.75.0 开始,trait 中包含 async fn 是 stable 状态的。
// https://smallcultfollowing.com/babysteps/blog/2023/02/01/async-trait-send-bounds-part-1-intro/#fnref:1
trait HealthCheck {
async fn check(&mut self, server: &Server) -> bool;
}
fn start_health_check<H>(health_check: H, server: Server)
where
H: HealthCheck + Send + 'static,
{
tokio::spawn(async move {
while health_check.check(&server).await {
tokio::time::sleep(Duration::from_secs(1)).await;
}
emit_failure_log(&server).await;
});
}
/*
error: future cannot be sent between threads safely
--> src/lib.rs:15:18
|
15 | tokio::spawn(async move {
| __________________^
16 | | while health_check.check(&server).await {
17 | | tokio::time::sleep(Duration::from_secs(1)).await;
18 | | }
19 | | emit_failure_log(&server).await;
20 | | });
| |_____^ future created by async block is not `Send`
|
= help: within `[async block@src/lib.rs:15:18: 20:6]`, the trait `Send` is not implemented for `impl Future<Output = bool>`
*/
这是由于异步函数 ealth_check.check()
返回的 Future 对象没有实现 Send + ‘static:
/*
note: future is not `Send` as it awaits another future which is not `Send`
--> src/lib.rs:16:15
|
16 | while health_check.check(&server).await {
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^ await occurs here
*/
trait HealthCheck {
// async fn check(&mut self, server: &Server) -> bool;
fn check(&mut self, server: &Server) -> impl Future<Output = bool>;
// ^ Problem is here! This returns a future, but not necessarily a `Send` future.
}
而 async-trait crate
对此的解决办法是返回 Pin<Box<dyn Future + Send>>
, 而非 yields -> impl Future
。
Pin<Box<dyn Future + Send>>
实现了 impl Future<Output = Self::Response> + Send
-
这里不使用
Box<dyn Future + Send>
的原因是它未实现 Unpin(也未实现 Future), 更进一步因为是dyn Future + Send
未实现 Unpin,所以需要使用Pin<Box<dyn Future + Send>>
; -
Pin<&mut impl Future>
和Pin<Box<impl Future>>
都实现了 Future 和 Unpin;
impl<Ptr> Unpin for Pin<Ptr>
where
Ptr: Unpin,
// https://github.com/rust-lang/rfcs/blob/master/text/3654-return-type-notation.md
trait Service<Request> {
type Response;
// Invoke the service.
async fn call(&self, req: Request) -> Self::Response;
}
// 下面的 LogService call() 方法只能在 thread-per-core or single-threaded executor 中使用,而不能在多线程 executor 中使用。
pub struct LogService<S>(S);
impl<S, R> Service<R> for LogService<S>
where
S: Service<R>,
R: Debug,
{
type Response = S::Response;
async fn call(&self, request: R) -> S::Response {
eprintln!("{request:?}");
self.0.call(request).await
}
}
// 多线程中使用报错:
async fn spawn_call<S>(service: S) -> S::Response
where
S: Service<(), Response: Send> + Send + 'static,
{
tokio::spawn(async move {
service.call(()).await // <--- Error
}).await
}
/*
error: future cannot be sent between threads safely
--> src/lib.rs:6:5
|
6 | / tokio::spawn(async move {
7 | | service.call(()).await // <--- Error
8 | | }).await.unwrap()
| |______^ future created by async block is not `Send`
|
= help: within `{async block@src/lib.rs:6:18: 8:6}`, the trait `Send` is not implemented for `impl Future<Output = <S as Service<()>>::Response>`, which is required by `{async block@src/lib.rs:6:18: 8:6}: Send`
note: future is not `Send` as it awaits another future which is not `Send`
--> src/lib.rs:7:9
|
7 | service.call(()).await // <--- Error
| ^^^^^^^^^^^^^^^^ await occurs here on type `impl Future<Output = <S as Service<()>>::Response>`, which is not `Send`
*/
// 解决办法:修改 call() 方法的返回值,返回一个 Send 类型对象
trait SendService<Request>: Send {
type Response;
// Invoke the service.
fn call(
&self,
req: Request,
) -> impl Future<Output = Self::Response> + Send;
// 具体实现可能是返回一个 Pin<Box<dyn Future + Send>> 类型对象
}
由于 AsyncFn*
不能为返回的 Future 对象指定 trait bound,也即不支持:H implements HealthCheck and its check method returns a Send future
。
-
In other words, we don’t want just
any type that implements HealthCheck
. We specifically want a type thatimplements HealthCheck and returns a Send future
. -
we need some kind of syntax for declaring that you want an
implementation that returns Send futures
, and not just any implementation
解决办法:
- 继续使用老的写法;
- 如果是 trait 中的 async 方法,则使用 async-trait crate;
- 或则等待 Return Type Notation 稳定:
-
RFC: https://github.com/rust-lang/rfcs/pull/3654, https://github.com/rust-lang/rfcs/blob/master/text/3654-return-type-notation.md
RTN 语法:A way to name “the type returned by a function”
// https://smallcultfollowing.com/babysteps/blog/2023/02/13/return-type-notation-send-bounds-part-2/
fn start_health_check<H>(health_check: H, server: Server)
where
H: HealthCheck + Send + 'static,
H::check(..): Send, // <— return type notation
// Here the where clause H::check(..): Send means “the type(s) returned when you call H::check must
// be Send. Since async functions return a future, this means that future must implement Send.
// 完整例子:
#![feature(return_type_notation)]
#![allow(dead_code)]
use std::time::Duration;
struct Server;
trait HealthCheck {
async fn check(&mut self, server: &Server) -> bool;
}
fn start_health_check<H>(mut health_check: H, server: Server)
where
H: HealthCheck + Send + 'static,
H::check(..): Send + 'static, // 加 'static 也 OK
{
tokio::spawn(async move {
while health_check.check(&server).await {
tokio::time::sleep(Duration::from_secs(1)).await;
}
emit_failure_log(&server).await;
});
}
async fn emit_failure_log(_server: &Server) { }
RTN 的各种语法(where 限界 或 关联类型限界):
// 注意:下面的 method 需要被替换为 Trait 实际的方法名称,例如上面的 check(...)
fn foo<T, U>()
where
// Associated type bound
T: Trait<method(..): Send + 'static>,
// Path bound
U: Trait,
U::method(..): Send + 'static,
{}
trait Trait {
// In GAT bounds.
type Item: Trait<method(..): Send + 'static>;
}
// In opaque item bounds too.
fn rpit() -> impl Foo<method(..): Send + 'static>;
使用 RTN 的示例:
// https://rust-lang.github.io/rfcs/3668-async-closures.html#interaction-with-return-type-notation-naming-the-future-returned-by-calling
async fn foo(x: F) -> Result<()>
where
F: AsyncFn(&str) -> Result<()>,
// The future from calling `F` is `Send` and `'static`.
F(..): Send + 'static,
// Which expands to two bounds:
// `for<'a> <F as AsyncFnMut>::CallRefFuture<'a>: Send`
// `<F as AsyncFnOnce>::CallOnceFuture: Send`
// the latter is only if `F` is bounded with `async Fn` or `async FnMut`.
{
tokio::spawn(x("hello, world")).await
}
// https://github.com/rust-lang/rust/pull/138424#issuecomment-2766836658
#![feature(return_type_notation)]
trait Trait {
async fn foo() {}
}
trait Other {}
impl<T> Other for T
where
T: Trait,
T::foo(..): Send,
{}
impl Other for u32 {}
// https://github.com/rust-lang/rust/pull/138424#issuecomment-2766947426
#![feature(return_type_notation)]
trait Trait {
fn foo() -> impl Sized;
}
trait Id {
type This: ?Sized;
}
impl<T: ?Sized> Id for T {
type This = T;
}
trait GetOpaque {
type Opaque: ?Sized;
}
impl<T: ?Sized, U> GetOpaque for T
where
T: Trait,
T::foo(..): Id<This = U>,
{
type Opaque = U;
}
type FooTypePos<T> = <T as GetOpaque>::Opaque;
impl Trait for u8 {
fn foo() -> impl Sized {
|| ()
}
}
struct ContainsOpaque(FooTypePos<u8>);
// 另一个例子:https://play.rust-lang.org/?version=nightly&mode=debug&edition=2021&gist=9691749c3f46ed13e459ee1d6382f7e8
#![feature(return_type_notation)]
async fn spawn_call<S>(service: S) -> S::Response
where
S: Service<(), Response: Send, call(..): Send> + Send + 'static,
{
tokio::spawn(async move {
service.call(()).await
}).await.unwrap()
}
trait Service<Request> {
type Response;
// Invoke the service.
async fn call(&self, req: Request) -> Self::Response;
}
// 另一个例子:https://play.rust-lang.org/?version=nightly&mode=debug&edition=2021&gist=6d45f55355188001ea6499314ce30b4b
#![feature(return_type_notation)]
trait Factory {
fn widgets(&self) -> impl Iterator<Item = Widget>;
}
struct ReverseWidgets<F: Factory<widgets(..): DoubleEndedIterator>> {
factory: F,
}
impl<F> Factory for ReverseWidgets<F>
where
F: Factory<widgets(..): DoubleEndedIterator>,
{
fn widgets(&self) -> impl Iterator<Item = Widget> {
self.factory.widgets().rev()
// 👆 requires that the iterator be double-ended
}
}
struct Widget;
与以前的返回 async block 的 closure 不同,async closure 可以捕获上下文对象:
let mut vec: Vec<String> = vec![];
// 错误:let closure = || async {
// 正确:
let closure = async || {
// async closure 使用 &'closure mut Vec<String> 方式捕获了上下文对象 vec,直到这个闭包被 drop(如被 .await)。
vec.push(ready(String::from("")).await);
};
let string: String = "Hello, world".into();
let closure = async move || {
// 闭包获得了 string 的所有权
ready(&string).await;
};
async closure 也支持 HTRB:
// 不建议:老的写法:Instead of writing:
fn higher_ranked<F>(callback: F) where F: Fn(&Arg) -> Pin<Box<dyn Future<Output = ()> + '_>> { todo!() }
// 新的写法:async closure 参数支持借用,也即支持 HRTB:
fn higher_ranked<F: AsyncFn(&Arg)> { todo!() }
// 示例
// We could also use APIT: `mut f: impl async FnMut(&str)`.
async fn for_each_city<F>(mut f: F)
where
F: AsyncFnMut(&str),
// ...which is sugar for:
// F: for<'a> async FnMut(&'a str),
{
for x in ["New York", "London", "Tokyo"] {
f(x).await;
}
}
async fn increment_city_population_db_query(city_name: &str) { todo!() }
async fn main() {
// Works for `async fn` that is higher-ranked.
for_each_city(increment_city_population_db_query).await;
}
// 示例:
where T: for<'a> async Fn(&'a ()) -> i32,
// 示例:
// Returns: `impl Future<Output = ()> + Captures<&'a ()>`
async fn foo<'a>(x: &'a ()) { _ = (x,); }
Rust 自动为实现 Fn*() -> Fut
且 Fut 实现 Future<Output = T>
的类型(闭包、异步函数、 dyn Fn* trait object 等)实现
AsyncFn*() -> T
。
// 原理类似于:(AsyncFn、AsyncFnMut 类似)
impl<F, Args, Fut, T> AsyncFnOnce<Args> for F
where
F: FnOnce<A, Output = Fut>,
Fut: Future<Output = T>,
{
type Output = T;
type CallOnceFuture = Fut;
fn async_call_once(self, args: Args) -> Self::CallOnceFuture {
FnOnce::call_once(self, args)
}
}
// 以下类型自动实现了 AsyncFn*:
// Async functions:
async fn foo() {}
// Functions that return a concrete future type:
fn foo() -> Pin<Box<dyn Future<Output = ()>>> { Box::pin(async {}) }
// Closures that return an async block:
let c = || async {};
- 当前只为直接可调用的具体类型实现了上述自动转换,对于函数参数位置的 impl trait like impl Fn() -> impl Future<Output = ()> does not implement AsyncFn();
fn is_async_fn(_: impl AsyncFn(&str)) {}
async fn async_fn_item(s: &str) { todo!() }
is_async_fn(s);
// ^^^ This works.
fn generic(f: impl Fn() -> impl Future<Output = ()>) {
is_async_fn(f);
// ^^^ This does not work (yet).
}
Future + Send + ‘static 限界 #
tokio 等异步运行时的对提交异步 task 的要求:
- spawn(future):多线程执行,future 需要实现
Future+Send+'static
- spawn_local(future): 单线程执行,future 需要实现
Future+'static
- spawn_blocking(closure): 多线程执行,closure 是实现
Send+'static
的同步闭包;
其中 Future 可以是 async block/async fn/ async closure
的实现。
async block/closure 会捕获上下文对象,而且默认优先是借用 &T 和 &mut T 捕获,所以很难满足 ‘static 要求,故一般需要使用 async move 的 block 和 closure 方式。
async block/fn/closure 不满足 Send 的常见情况:
- 跨 .await 使用没有实现 Send 的类型对象,如 Rc、MutexGuard 等。
Pin/Unpin/pin!()/.await #
Unpin 是标准库提供的 marker trait
, 它是一个标记类型(std::pin::Pin
是一个具体 struct 类型),只能由 Rust 编译器实现。
类型实现 Unpin trait
意味着该类型对象可以被安全的移动(move)。
Rust 为几乎所有类型都实现了 Unpin,例外的情况如下(它们实现了 !Unpin
):
Future
:因为实现Future trait object
可能包含对自身引用或保存指针类型的栈变量, 移动它可能会使这些地址指向失效。async fn/block/closure
语法糖返回的是实现dyn Future
的匿名类型,所以也未实现 Unpin;
Pin<P>
:Pin 本身就是为了处理不能移动的数据而设计的,Pin<P>
对于大多数 P 类型未实现 Unpin;PhantomPinned
:标记 trait,用来显式标记类型不应该实现 Unpin:struct MyType { _pin: PhantomPinned }
Pin<T>
是一个 struct std::pin::Pin
类型(而 Unpin 是 trait),将对象包装到 Pin 中后,可以确保该对象不会被移动(move)。
创建 Pin<Ptr>
对象的几种方式:
Pin::new(T)
: new() 方法对传入的 T 要求必须实现Deref trait
,而且Target
必须是 Unpin。
- 传入的一般是
&T、&mut T、Box<T>
类型,它们都实现了Deref trait
,对应的 Target 是 T 类型,所以 T 也需要实现 Unpin,T 不能是dyn Future
或Pin
对象
Box::pin(T)
和pin!(T)
:
-
T 可以是
dyn Future
类型对象, 前者返回的是Pin<Box<impl Future>>
,后者返回的是Pin<&mut T>
类型对象; -
Pin<&mut dyn Future>
,即包含了传入 impl Future 对象的&mut
借用,这样确保只能通过返回的 Pin 对象来操作 dyn Future,而 Pin 对 象又可以确保不会转移内部封装的对象,所以满足 Future 的 poll() 方法的签名 要求。
impl<Ptr> Pin<Ptr> where Ptr: Deref, <Ptr as Deref>::Target: Unpin
pub fn new(pointer: Ptr) -> Pin<Ptr>
// 示例:
let mut data=43;
let pinned: Pin<&mut i32> = Pin::new(&mut data); // &mut data Deref 的 Target 是 i32 类型,它实现了 Unpin
let boxed = Box::new(42);
let pinned: Pin<Box<i32>> = Pin::new(boxed); // boxed Deref 的 Target 是 i32 类型,它实现了 Unpin
// 返回 std::future::Ready<i32> struct 类型, 它实现了 Unpin 和 Future
let mut unpin_future = std::future::ready(5);
// &mut unpin_future 是 &mut Ready<i32> 类型,它实现了 Deref 且 Target 是 i32,实现了 Unpin,故满足 Pin::new() 的前面要求
let my_pinned_unpin_future: Pin<&mut _> = Pin::new(&mut unpin_future);
let pinned = Box::pin(42);
Pin<Ptr>
类型的方法:
as_ref()/as_mut()/info_ref()
都返回新的 Pin 对象。as_mut()
返回Pin<&mut T>
类型,满足Future::poll()
方法签名要求;
impl<Ptr> Pin<Ptr> where Ptr: Deref, <Ptr as Deref>::Target: Unpin
pub fn new(pointer: Ptr) -> Pin<Ptr>
pub fn into_inner(pin: Pin<Ptr>) -> Ptr
impl<Ptr> Pin<Ptr> where Ptr: Deref
pub unsafe fn new_unchecked(pointer: Ptr) -> Pin<Ptr>
pub fn as_ref(&self) -> Pin<&<Ptr as Deref>::Target> // 返回新的 Pin 对象
impl<Ptr> Pin<Ptr> where Ptr: DerefMut
pub fn as_mut(&mut self) -> Pin<&mut <Ptr as Deref>::Target> // 返回新的 Pin 对象
pub fn set(&mut self, value: <Ptr as Deref>::Target) where <Ptr as Deref>::Target: Sized
impl<'a, T> Pin<&'a T> where T: ?Sized
pub fn get_ref(self) -> &'a T // 返回 &T
impl<'a, T> Pin<&'a mut T> where T: ?Sized
pub fn into_ref(self) -> Pin<&'a T>
pub fn get_mut(self) -> &'a mut T where T: Unpin // 返回 &mut T
Pin<Ptr>
实现了 Deref/DerefMut trait
,可以调用 Ptr 指向的类型的方法,同时也可以作为 Pin::new()
的参数:
- 如果 T 实现了 Unpin,则
Pin<&mut T>
效果和&mut
类似,mem::replace() 或 mem::take() 可以 move 该值。
impl<Ptr> Deref for Pin<Ptr> where Ptr: Deref
type Target = <Ptr as Deref>::Target
fn deref(&self) -> &<Ptr as Deref>::Target
impl<Ptr> DerefMut for Pin<Ptr> where Ptr: DerefMut, <Ptr as Deref>::Target: Unpin
fn deref_mut(&mut self) -> &mut <Ptr as Deref>::Target
Pin<Ptr>
实现了 Future
,但要求 Deref 的 Target 也实现了 Future
,如 pin!(future_obj)
返回的对象:
impl<P> Future for Pin<P> where P: DerefMut, <P as Deref>::Target: Future
type Output = <<P as Deref>::Target as Future>::Output
fn poll( self: Pin<&mut Pin<P>>, cx: &mut Context<'_>, ) -> Poll<<Pin<P> as Future>::Output>
std::pin::Pin struct
常用场景是 std::future::Feature trait
的 poll
方法: 由于 Future trait object
必须保存
执行上下文 stack 上的变量,而 Future 下一次被 wake 执行的时机是不定的,所以为了避免 Future 对象上保存的 stack 变量的地址发生变化导致引用出错,需要将 Future
对象设置为 Pin<&mut Self>
类型,这样该对象将不能被转移,从而确保内部保存的栈地址有效。
- Future 没有实现 Unpin,因为 async/await 生成的 Future 可能包含自引用结构;
- 编译器将 async 函数转换为状态机时, 会产生包含内部引用的结构;
- poll() 方法的签名是
self: Pin<&mut Self>
,会消耗自身。
pub trait Future {
type Output;
// Required method
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}
调用 async fn
返回一个 Future
类型值,在进行 poll() 前需要获得对应的 Pin<&mut impl Future>
对象,而 Future 没有实现
Deref,所以不能作为 Pin::new() 的参数。
Box::pin() 和 pin!() #
解决办法:使用 Box::pin() 或 pin!()
来创建满足 Future::poll() 函数前面要求的 Pin<&mut T> 对象:
pin!(future)
:返回一个同名的Pin<&mut impl Future>
类型对象,满足 poll() 方法签名要求。
- 可以对返回的 future 对象执行:future.await 或 (&mut future).await
- 或者对返回的 future 对象执行:future.poll() 或 (&mut future).poll()
- 或者调用返回 future 对象的 as_mut() 方法,返回一个新的 Pin<&mut impl Future> 对象,再调用它的 poll() 方法;
Box::pin(future)
: 返回的是Pin<Box<impl Future>>
对象,需要进一步调用Pin::as_mut()
返回新的Pin<&mut impl Future>
对象,才满足 poll() 方法签名要求:
Pin<&mut impl Future>
和 Pin<Box<impl Future>>
都实现了 Future:
impl<P> Future for Pin<P>
where
P: DerefMut,
<P as Deref>::Target: Future,
type Output = <<P as Deref>::Target as Future>::Output
fn poll(
self: Pin<&mut Pin<P>>,
cx: &mut Context<'_>,
) -> Poll<<Pin<P> as Future>::Output>
示例:
// 如果要 Pin<Box<T>>, 则需要使用 Box::pin(value) 函数
async fn add_one(x: u32) -> u32 {
x + 1
}
// fut 是 impl Future 的匿名类型
let fut = add_one(42);
// 返回一个 Pin<Box<impl Future>> 类型,Box 是智能指针类型。
let pinned_fut: Pin<Box<_>> = Box::pin(fut);
对于已有的 BoxBox::into_pin(value)
创建 Pin<Box<T>>
对象:
async fn add_one(x: u32) -> u32 {
x + 1
}
fn boxed_add_one(x: u32) -> Box<dyn Future<Output = u32>> {
Box::new(add_one(x))
}
// boxed_fut 类型是 Box<dyn Future<Output = u32>>
let boxed_fut = boxed_add_one(42);
// 返回 Pin<Box<dyn Future> 对象
let pinned_fut: Pin<Box<_>> = Box::into_pin(boxed_fut);
pin!(T)
宏返回 Pin<&mut T>
类型对象,如果如果传入的是 impl Future
对象,则返回 Pin<&mut impl Future>
对象,满足 poll()
方法签名要求:
- 如果 T 没有实现 Unpin(如 Future 对象),则该宏将 T 值 pin 到内存中(通过 &mut T 可变借用的方式),不允许 move。
let parker = Parker::new();
let unparker = parker.unparker().clone();
let waker = waker_fn(move || unparker.unpark());
let mut context = Context::from_waker(&waker);
// pin!(T) 宏返回 Pin<&mut T> 类型对象;
// 为 future 创建一个 Pin<&mut impl Future> 对象,
pin!(future);
let _ = future.poll(&mut context)
对 Future 对象调用 .await
表达式,用于对 Future 对象进行 poll,直到获得 Ready 值。编译器自动对 Future 对象进行 Pin,然后调用它的 poll() 方法
(同时也会消耗 Future 对象)。
.await 不支持对 Future 对象的 &mut 进行 poll,这是因为 Future 的 poll() 方法的 self 签名是 self: Pin<&mut Self>
:
async fn my_async_fn() {
// async logic here
}
#[tokio::main]
async fn main() {
let mut future = my_async_fn();
// 错误:.await 不支持 &mut Future 的自动 Pin
(&mut future).await;
}
解决办法:对 future 对象 使用 pin!(future)
返回 Pin<&mut impl Future>
对象,然后对该对象或它的 &mut pinned_future
进行
.await
:
.await
的本质是对Future
进行poll
,而poll
需要Pin<&mut Future>
,所以只要能得到Pin<&mut Future>
,无论是future.await
还是(&mut future).await
都可以。
// 下面代码 Ok
#[tokio::main]
async fn main() {
let future = my_async_fn();
pin!(future); // 返回一个同名(future)的 Pin<&mut impl Future> 对象;
// 必须先将 Future 对象 pin 住,获得同名的 Pin<&mut impl Future> 类型对象,然后才能使用 &mut 进行 .await。
// 否则,是不支持 &mut impl Future 的 .await 的。
// 以下 3 种方式均可:
// 1. 消耗 future 对象
future.await;
// 2. 不消耗 future 对象
(&mut future).await;
// 3. 不消耗 future 对象,as_mut() 返回一个新的 Pin<&mut impl Future> 类型对象
future.as_mut().await;
}
(&mut future).await
在 loop+select!{}
中得到应用:要对 Future 对象进行重复 poll,而 select!{}
在并发 .await 时,如果有返
回值,则会 drop 其它 branch 的 Future 对象,所以在 select branch 中不能直接使用 future 对象,以避免它被 drop 后下一次 loop 失败 select
失败。
解决办法是:
- 创建一个 mut 类型的 Future 对象;
- 使用
pin!(future)
来创建一个同名,但类型是Pin<&mut impl Future<Output=xx>>
的 Pin 对象, 它也实现了 Future; - 在 select branch 中使用
&mut future
来进行轮询。
async fn action() {
// Some asynchronous logic
}
#[tokio::main]
async fn main() {
let (mut tx, mut rx) = tokio::sync::mpsc::channel(128);
let operation = action(); // 返回一个 Future 对象, 没有对它 .await
// 必须先 Pin 该 Future 对象,创建一个同名的类型为 Pin<&mut impl Future<Output=i32> 类型对象 ;
tokio::pin!(operation);
loop {
tokio::select! {
// &mut impl Future 是不被 .await 支持的。
//
// 但是 &mut operation 类型是 &mut Pin<&mut impl Future<Output=i32> 被 .await 表达式所支持, 所以可以被 poll 轮询。
//
// 当该 branch 因不匹配或无返回值而被 drop/cancel 时,operation 任然有效,下一次 select!{} 可以接续使用 &mut operation
_ = &mut operation => break,
Some(v) = rx.recv() => {
if v % 2 == 0 {
break;
}
}
}
}
}
除了 &mut pinned_future
外,还可以调用 pinned_future.as_mut()
方法来创建一个新的 Pin<&mut impl Future>
类型对象,所以它也
实现了 Future,可以被 poll。
tokio 等异步运行时也会自动对 Future 对象进行 Pin 操作:
- pin!(future) 返回 Pin<&mut dyn Future> 类型,满足 poll() 方法前面要求,但是会消耗 future 自身。
- 所以在 loop 循环中,需要使用 Pin::as_mut() 返回一个新的 Pin<&mut dyn Future> 对象。
use futures::executor::block_on;
async fn hello_world() {
println!("hello, world!");
}
fn main() {
let future = hello_world();
// block_on() 自动对 future 对象进行 Pin 操作。
block_on(future);
}
// block_on() 的一种简化实现方式:
use waker_fn::waker_fn; // Cargo.toml: waker-fn = "1.1"
use futures_lite::pin; // Cargo.toml: futures-lite = "1.11"
use crossbeam::sync::Parker; // Cargo.toml: crossbeam = "0.8"
use std::future::Future;
use std::task::{Context, Poll};
fn block_on<F: Future>(future: F) -> F::Output {
let parker = Parker::new();
let unparker = parker.unparker().clone();
let waker = waker_fn(move || unparker.unpark());
let mut context = Context::from_waker(&waker);
// pin!(T) 宏返回 Pin<&mut T> 类型对象;
// 为 future 创建一个 Pin<&mut dyn Future> 对象,
pin!(future);
loop {
// future 对象是 Pin<&mut dyn Future> 类型,满足 poll() 方法前面要求,但是会消耗 future 自身。
// 所以在 loop 循环中,需要使用 Pin::as_mut() 返回一个新的 Pin<&mut dyn Future> 对象。
//
// future.as_mut() 实际是 Pin<&mut dyn Future>.as_mut(), 而 &mut dyn Future Deref 返回的是 dyn Future 对象,
// 所以 as_mut() 返回的是 Pin<&mut dyn Future> 对象
match future.as_mut().poll(&mut context) {
Poll::Ready(value) => return value,
Poll::Pending => parker.park(),
}
} }
// 另一个示例:
#![feature(noop_waker)]
use std::future::Future;
use std::task;
let waker = task::Waker::noop();
let mut cx = task::Context::from_waker(&waker);
// 返回 Pin<Box<dyn Future>> 类型对象,Rust 确保该对象内部的类型不会被 move。
let mut future = Box::pin(async { 10 });
// Pin 的 as_mut() 方法签名:pub fn as_mut(&mut self) -> Pin<&mut <Ptr as Deref>::Target>
// 所以,future.as_mut() 返回 Pin<&mut dyn Future> 对象,满足 Future 的 poll() 方法的函数签名要求。
assert_eq!(future.as_mut().poll(&mut cx), task::Poll::Ready(10));
-
pin!() 参数必须是 Future 对象,而不是能是表达式:
async fn my_async_fn() { // async logic here } #[tokio::main] async fn main() { // 错误 // let mut future = pin!(my_async_fn()); // OK let mut future = my_async_fn() pin!(future); // 如果 Pin<Ptr> 的 Ptr 实现了 Future,则 Pin<Ptr> 也实现了 Future, 所以可以对 Pin<Ptr> 进行 .await; // future.await; // // 或者: (&mut future).await; }
-
pin!{} 支持同时创建多个 Future 对象并 Pin 住:
use tokio::{pin, select}; async fn my_async_fn() { // async logic here } #[tokio::main] async fn main() { pin! { let future1 = my_async_fn(); let future2 = my_async_fn(); } select! { _ = &mut future1 => {} _ = &mut future2 => {} } }
如果类型实现了 Unpin,则可以从 Pin 中安全地获取可变引用:
fn example<T: Unpin>(x: Pin<&mut T>) {
let mut_ref: &mut T = Pin::into_inner(x);
}
spawn_blocking() 的简单参考实现 #
传入的闭包是同步函数 ,必须实现 Send + 'static
, 内部创建一个线程来执行业务逻辑,结束后通过调用 waker 来通知 executor 完成。
pub fn spawn_blocking<T, F>(closure: F) -> SpawnBlocking<T> where
F: FnOnce() -> T,
F: Send + 'static,
T: Send + 'static,
{
// Arc 确保 inner 和它 clone 后的对象都使用同一个 Mutex 和内部的 Shared 对象。
let inner = Arc::new(Mutex::new(Shared { value: None, waker: None, }));
std::thread::spawn({
let inner = inner.clone();
move || {
let value = closure();
let maybe_waker = {
let mut guard = inner.lock().unwrap();
guard.value = Some(value);
guard.waker.take()
};
if let Some(waker) = maybe_waker {
waker.wake();
}
} });
SpawnBlocking(inner)
}
// 为自定义 SpawnBlocking 实现 Future
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
impl<T: Send> Future for SpawnBlocking<T> {
type Output = T;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
let mut guard = self.0.lock().unwrap();
if let Some(value) = guard.value.take() {
return Poll::Ready(value);
}
// 更新 SpawnBlocking 内部的 waker
guard.waker = Some(cx.waker().clone());
Poll::Pending
}
}
block_on() 的简单实现 #
使用 Parker 机制来实现 waker,传入的 Future 在 loop poll 前,必须使用 pin!(future)
来转换为同名但类型为 Pin<&mut impl Future>
的对象。
pin!() 创建的 Pin 会获得 future 的所有权(参数类型是 self: Pin<&mut Self>),而且确保该对象的栈内存地址不会再发生变化。
future.poll() 会获得 Pin 对象的所有权,这样不能在 loop 中使用,使用 future.as_mut()
返回一个新的 Pin 对象来解决该问题。
use waker_fn::waker_fn; // Cargo.toml: waker-fn = "1.1"
use futures_lite::pin; // Cargo.toml: futures-lite = "1.11"
use crossbeam::sync::Parker; // Cargo.toml: crossbeam = "0.8"
use std::future::Future;
use std::task::{Context, Poll};
fn block_on<F: Future>(future: F) -> F::Output {
let parker = Parker::new();
let unparker = parker.unparker().clone();
let waker = waker_fn(move || unparker.unpark());
let mut context = Context::from_waker(&waker);
// pin!(T) 宏返回 Pin<&mut T> 类型对象;
// 为 future 创建一个同名的 Pin<&mut dyn Future> 对象,
pin!(future);
loop {
// Pin<&mut dyn Future> 的 as_mut 方法返回一个新的 Pin<&mut dyn Future> 对象
// 满足 poll() 方法要求同时不消耗 future。
match future.as_mut().poll(&mut context) {
Poll::Ready(value) => return value,
Poll::Pending => parker.park(),
}
} }
当 async task panic 时,可以通过 .await 返回的 Error 来判断:
use tokio::task::JoinError;
pub async fn run() {
let handle = tokio::spawn(work());
if let Err(e) = handle.await {
if let Ok(reason) = e.try_into_panic() {
// The task has panicked We resume unwinding the panic, thus propagating it to the current thread
panic::resume_unwind(reason);
}
}
}
pub async fn work() {
// [...]
}
async、await 是状态机语法糖 #
https://eventhelix.com/rust/rust-to-assembly-async-await/
.await 优化 #
尽量避免 stack buffer, 因为跨 .await 的上下文变量会随者 task Future 对象一起被保存, 如果使用较大的 stack buffer 变量, 则 自动生成的 task Future 对象就比较大, buffer size 一般是 page sized 对齐的, 这导致 task 的大小大概是 $page-size + a-few-bytes, 浪费内存。
use tokio::io::{self, AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpListener;
#[tokio::main]
async fn main() -> io::Result<()> {
let listener = TcpListener::bind("127.0.0.1:6142").await?;
loop {
let (mut socket, _) = listener.accept().await?;
tokio::spawn(async move {
// 在堆中分配 buff 内存, 而不使用 stack 上的 array
let mut buf = vec![0; 1024];
loop {
match socket.read(&mut buf).await {
Ok(0) => return,
Ok(n) => {
if socket.write_all(&buf[..n]).await.is_err() {
return;
}
}
Err(_) => {
return;
}
}
}
});
}
}
std::task::Poll 类型 #
对于返回 Poll 类型对象的函数/方法,不能使用 .await 来 poll 它,因为 Poll 类型本身没有实现 Future,但是可以调用它的 poll() 方法来返回 Ready 或 Pending 结果,或使用 std::task::ready!() 宏:
use std::task::{ready, Context, Poll};
use std::future::{self, Future};
use std::pin::Pin;
pub fn do_poll(cx: &mut Context<'_>) -> Poll<()> {
let mut fut = future::ready(42);
let fut = Pin::new(&mut fut);
let num = ready!(fut.poll(cx));
// ... use num
Poll::Ready(())
// 其中的 ready!(fut.poll(cx)) 等效于:
let num = match fut.poll(cx) {
Poll::Ready(t) => t,
Poll::Pending => return Poll::Pending,
};