Thread 的问题:
- 每个 thread 都有固定的 stack size 内存需求,成千上万各 thread 会消耗大量的内存;
- 内核调度 thread 运行的上下文切换开销也是很大的;
- thread 在执行 read/write 系统调用时,尤其是网络通信,会有大量的时间被 block 等待,此时该 thread 不能再执行其它事情,效率第。
async 通过创建大量异步 task,然后使用一个 thread pool 来执行它们,在某个 task 被阻塞时 executor 自动调度其它 task 到 thread 上运行,从而:
- task 相比 thread 更轻量化,没有了大量 thread stack 的开销,切换执行速度也更快,所以一个程序内部可以创建大量的异步 task;
- executor 可以在 task 阻塞时调度其它 task 到 thread 运行,从而执行效率和并发高;
async 的核心是 Future trait, 它的 poll()
方法的 self 类型是 Pin<&mut Self>
,表示一旦开始第一次
poll 后 Self 的地址必须是固定的,不能被转移, 这是为了确保 Future 对象内部保存的栈变量继续有效。
poll() 方法的第二个参数 Context 封装了 Waker
,当 poll Pending 时 Future 的实现可以保存该
waker(一般是在一个辅助线程中), 后当条件满足时调用 waker 来唤醒 async executor 来重新 poll 自己。
trait Future {
type Output;
// For now, read `Pin<&mut Self>` as `&mut Self`.
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}
enum Poll<T> {
Ready(T),
Pending,
}
同步函数或方法可以返回 impl Future,即执行该函数时返回一个 Future 对象,但是并没有开始 poll 执行它。 Future 对象只有被 poll 时才开始执行, 一般使用 .await 来 poll:
- impl Trait 只能用于
普通函数或方法的返回值
,不能用于 trait 中关联函数或方法的返回值。
fn read_to_string(&mut self, buf: &mut String) -> impl Future<Output = Result<usize>>;
为了方便实现异步任务,Rust 提供了 async fn 和 async block
:
- async fn 可以使用 .await 表达式,但是普通函数不行;
- 执行该函数只会返回一个 Future,需要使用 .await 表达式来 poll 结果;
async fn example(x: &str) -> usize { // async func
x.len()
}
// 等效于
fn example<'a>(x: &'a str) -> impl Future<Output = usize> + 'a {
async move { x.len() } // async block
}
use async_std::io::prelude::*;
use async_std::net;
async fn cheapo_request(host: &str, port: u16, path: &str) -> std::io::Result<String>
{
let mut socket = net::TcpStream::connect((host, port)).await?;
let request = format!("GET {} HTTP/1.1\r\nHost: {}\r\n\r\n", path, host);
socket.write_all(request.as_bytes()).await?;
socket.shutdown(net::Shutdown::Write)?;
let mut response = String::new();
socket.read_to_string(&mut response).await?;
Ok(response)
}
let response = cheapo_request(host, port, path); // 函数返回一个 Future 对象
async fn 可以 .await poll 其它异步函数,所以 async executor 在执行 async fn 的过程中,可能会多次暂停,暂停的位置是各 .await 表达式(这些 .await 位置也是异步运行时的 yield point
),每次暂停都会返回一个 新的 Future 对象
,它封装了暂停位置依赖的上下文对象:栈变量、函数参数等,后续可能被调度到其他线程上运行。所以, async fn 需要实现 Send+'static
, 也就是函数内跨 .await 的对象都需要是 Send +
‘static 的。
注意:对于返回 Poll 类型对象的函数/方法,不能使用 .await 来 poll 它,因为 Poll 类型本身没有实现 Future,但是可以调用它的 poll() 方法来返回 Ready 或 Pending 结果,或使用 std::task::ready!() 宏:
use std::task::{ready, Context, Poll};
use std::future::{self, Future};
use std::pin::Pin;
pub fn do_poll(cx: &mut Context<'_>) -> Poll<()> {
let mut fut = future::ready(42);
let fut = Pin::new(&mut fut);
let num = ready!(fut.poll(cx));
// ... use num
Poll::Ready(())
}
// 其中的 ready!(fut.poll(cx)) 等效于:
let num = match fut.poll(cx) {
Poll::Ready(t) => t,
Poll::Pending => return Poll::Pending,
};
trait 可以包含 async fn,但一旦 trait 包含 async fn, 就不能再为它创建 trait object
。
pub trait Trait {
async fn f(&self);
}
// 编译报错:error[E0038]: the trait `main::Trait` cannot be made into an object
pub fn make() -> Box<dyn Trait> {
unimplemented!()
}
解决办法:使用基于宏的三方库 async-trait
,在定义和实现该 trait 时添加 #[async_trait]
属性宏即可。
use async_trait::async_trait;
#[async_trait]
trait Advertisement {
async fn run(&self);
}
struct Modal;
#[async_trait]
impl Advertisement for Modal {
async fn run(&self) {
self.render_fullscreen().await;
for _ in 0..4u16 {
remind_user_to_join_mailing_list().await;
}
self.hide_for_now().await;
}
}
// 后续可以使用:Vec<Box<dyn Advertisement + Sync>> or &[&dyn Advertisement],
async block 也返回 Future 对象,其中可以使用 .await,async block 类似于 func,提供了一个 async context;
- 在 async block 中使用 ? 来传播错误或 return 值都是 async block 返回,而不是它所在的函数返回;
- async block 可以和闭包一样捕获环境中的对象(借用或 move),可以指定
async move
来获得对象的所有权,这时 async block 返回的 Future 具有 ‘static lifetime; - async block 中不能使用 break/continue;
use async_std::net;
use async_std::task;
// serve_one 是一个 Future<Output=Result>,只有当 .await 或 poll 它时才会执行 async block 中的代码。
let serve_one = async {
// ? 是 async block 返回,结果是 Err.
let listener = net::TcpListener::bind("localhost:8087").await?;
let (mut socket, _addr) = listener.accept().await?;
};
pub async fn many_requests(requests: Vec<(String, u16, String)>) -> Vec<std::io::Result<String>>
{
let mut handles = vec![];
for (host, port, path) in requests {
handles.push(
task::spawn_local(
// 使用 async move 来获取 host、path 的所有权。
async move {
cheapo_request(&host, port, &path).await
}
));
}
//...
}
loop {
async move {
break; // error[E0267]: `break` inside of an `async` block
}
}
aysnc block 没有指定返回值类型的机制,在编译时可能出错。
解决办法:为 Ok 指定它所属的 Enum Result 类型:
let input = async_std::io::stdin();
// future 是 Future<Output=Result>
let future = async {
let mut line = String::new();
// ? 是 async block 返回,结果是 std::io::Result<usize>.
input.read_line(&mut line).await?;
println!("Read line: {}", line);
// Ok(()) // 错误
Ok::<(), std::io::Error>(()) // 正确,指定 Ok 所属的 Result 类型
// 或者
// std::result::Result::<(), std::io::Error>::Ok(());
};
由于 async block 返回一个 Future,故可以使用它为同步函数返回 impl Future 对象:
use std::io;
use std::future::Future;
fn cheapo_request(host: &str, port: u16, path: &str) -> impl Future<Output = io::Result<String>> + 'static
{
let host = host.to_string();
let path = path.to_string();
async move {
// 捕获了 host、path 的所有权
//... use &*host, port, and path ...
}
}
spawn_blocking() 的简单参考实现:
- 传入的闭包是
同步函数
,必须实现 Send + ‘static; - 内部创建一个线程来执行业务逻辑,结束候通过调用 waker 来通知 executor 完成;
pub fn spawn_blocking<T, F>(closure: F) -> SpawnBlocking<T> where
F: FnOnce() -> T,
F: Send + 'static,
T: Send + 'static,
{
let inner = Arc::new(Mutex::new(Shared { value: None, waker: None, }));
std::thread::spawn({
let inner = inner.clone();
move || {
let value = closure();
let maybe_waker = {
let mut guard = inner.lock().unwrap();
guard.value = Some(value);
guard.waker.take()
};
if let Some(waker) = maybe_waker {
waker.wake();
}
} });
SpawnBlocking(inner)
}
// 为自定义 SpawnBlocking 实现 Future
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
impl<T: Send> Future for SpawnBlocking<T> { type Output = T;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
let mut guard = self.0.lock().unwrap(); if let Some(value) = guard.value.take() {
return Poll::Ready(value); }
guard.waker = Some(cx.waker().clone());
Poll::Pending
}
}
block_on() 的简单实现:
- 使用 Parker 机制来实现 waker;
- 传入的 Future 在 loop poll 前,必须使用 pin!(future) 来转换为同名但类型为 Pin<&mut F> 的对象, pin!() 创建的 Pin 会获得 future 的所有权,而且确保该对象的栈内存地址不会再发生变化;
- future.poll() 会获得 Pin 对象的所有权,这样不能在 loop 中使用,使用 future.as_mut() 返回一个新的 Pin 对象来解决该问题。
use waker_fn::waker_fn; // Cargo.toml: waker-fn = "1.1"
use futures_lite::pin; // Cargo.toml: futures-lite = "1.11"
use crossbeam::sync::Parker; // Cargo.toml: crossbeam = "0.8"
use std::future::Future;
use std::task::{Context, Poll};
fn block_on<F: Future>(future: F) -> F::Output {
let parker = Parker::new();
let unparker = parker.unparker().clone();
let waker = waker_fn(move || unparker.unpark());
let mut context = Context::from_waker(&waker);
pin!(future);
loop {
match future.as_mut().poll(&mut context) {
Poll::Ready(value) => return value,
Poll::Pending => parker.park(),
}
} }
当 async task panic 时,可以通过 .await 返回的 Error 来判断:
use tokio::task::JoinError;
pub async fn run() {
let handle = tokio::spawn(work());
if let Err(e) = handle.await {
if let Ok(reason) = e.try_into_panic() {
// The task has panicked We resume unwinding the panic, thus propagating it to the
// current thread
panic::resume_unwind(reason);
}
}
}
pub async fn work() {
// [...]
}