跳过正文

异步:async

··2121 字
Rust
rust-lang - 这篇文章属于一个选集。
§ 15: 本文

Thread 的问题:

  1. 每个 thread 都有固定的 stack size 内存需求,成千上万各 thread 会消耗大量的内存;
  2. 内核调度 thread 运行的上下文切换开销也是很大的;
  3. thread 在执行 read/write 系统调用时,尤其是网络通信,会有大量的时间被 block 等待,此时该 thread 不能再执行其它事情,效率第。

async 通过创建大量异步 task,然后使用一个 thread pool 来执行它们,在某个 task 被阻塞时 executor 自动调度其它 task 到 thread 上运行,从而:

  1. task 相比 thread 更轻量化,没有了大量 thread stack 的开销,切换执行速度也更快,所以一个程序内部可以创建大量的异步 task;
  2. executor 可以在 task 阻塞时调度其它 task 到 thread 运行,从而执行效率和并发高;

async 的核心是 Future trait, 它的 poll() 方法的 self 类型是 Pin<&mut Self> ,表示一旦开始第一次 poll 后 Self 的地址必须是固定的,不能被转移, 这是为了确保 Future 对象内部保存的栈变量继续有效。

poll() 方法的第二个参数 Context 封装了 Waker ,当 poll Pending 时 Future 的实现可以保存该 waker(一般是在一个辅助线程中), 后当条件满足时调用 waker 来唤醒 async executor 来重新 poll 自己。

trait Future {
    type Output;

    // For now, read `Pin<&mut Self>` as `&mut Self`.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}

enum Poll<T> {
    Ready(T),
    Pending,
}

同步函数或方法可以返回 impl Future,即执行该函数时返回一个 Future 对象,但是并没有开始 poll 执行它。 Future 对象只有被 poll 时才开始执行, 一般使用 .await 来 poll:

  • impl Trait 只能用于 普通函数或方法的返回值 ,不能用于 trait 中关联函数或方法的返回值。
fn read_to_string(&mut self, buf: &mut String) -> impl Future<Output = Result<usize>>;

为了方便实现异步任务,Rust 提供了 async fn 和 async block

  1. async fn 可以使用 .await 表达式,但是普通函数不行;
  2. 执行该函数只会返回一个 Future,需要使用 .await 表达式来 poll 结果;
async fn example(x: &str) -> usize { // async func
    x.len()
}
// 等效于
fn example<'a>(x: &'a str) -> impl Future<Output = usize> + 'a {
    async move { x.len() } // async block
}

use async_std::io::prelude::*;
use async_std::net;
async fn cheapo_request(host: &str, port: u16, path: &str) -> std::io::Result<String>
{
    let mut socket = net::TcpStream::connect((host, port)).await?;
    let request = format!("GET {} HTTP/1.1\r\nHost: {}\r\n\r\n", path, host);
    socket.write_all(request.as_bytes()).await?;
    socket.shutdown(net::Shutdown::Write)?;
    let mut response = String::new();
    socket.read_to_string(&mut response).await?;
    Ok(response)
}
let response = cheapo_request(host, port, path); // 函数返回一个 Future 对象

async fn 可以 .await poll 其它异步函数,所以 async executor 在执行 async fn 的过程中,可能会多次暂停,暂停的位置是各 .await 表达式(这些 .await 位置也是异步运行时的 yield point ),每次暂停都会返回一个 新的 Future 对象 ,它封装了暂停位置依赖的上下文对象:栈变量、函数参数等,后续可能被调度到其他线程上运行。所以, async fn 需要实现 Send+'static , 也就是函数内跨 .await 的对象都需要是 Send + ‘static 的。

注意:对于返回 Poll 类型对象的函数/方法,不能使用 .await 来 poll 它,因为 Poll 类型本身没有实现 Future,但是可以调用它的 poll() 方法来返回 Ready 或 Pending 结果,或使用 std::task::ready!() 宏:

use std::task::{ready, Context, Poll};
use std::future::{self, Future};
use std::pin::Pin;

pub fn do_poll(cx: &mut Context<'_>) -> Poll<()> {
    let mut fut = future::ready(42);
    let fut = Pin::new(&mut fut);

    let num = ready!(fut.poll(cx));
    // ... use num

    Poll::Ready(())
}

// 其中的 ready!(fut.poll(cx)) 等效于:
let num = match fut.poll(cx) {
    Poll::Ready(t) => t,
    Poll::Pending => return Poll::Pending,
};

trait 可以包含 async fn,但一旦 trait 包含 async fn, 就不能再为它创建 trait object

pub trait Trait {
    async fn f(&self);
}

// 编译报错:error[E0038]: the trait `main::Trait` cannot be made into an object
pub fn make() -> Box<dyn Trait> {
    unimplemented!()
}

解决办法:使用基于宏的三方库 async-trait ,在定义和实现该 trait 时添加 #[async_trait] 属性宏即可。

use async_trait::async_trait;

#[async_trait]
trait Advertisement {
    async fn run(&self);
}

struct Modal;

#[async_trait]
impl Advertisement for Modal {
    async fn run(&self) {
        self.render_fullscreen().await;
        for _ in 0..4u16 {
            remind_user_to_join_mailing_list().await;
        }
        self.hide_for_now().await;
    }
}
// 后续可以使用:Vec<Box<dyn Advertisement + Sync>> or &[&dyn Advertisement],

async block 也返回 Future 对象,其中可以使用 .await,async block 类似于 func,提供了一个 async context;

  • 在 async block 中使用 ? 来传播错误或 return 值都是 async block 返回,而不是它所在的函数返回;
  • async block 可以和闭包一样捕获环境中的对象(借用或 move),可以指定 async move 来获得对象的所有权,这时 async block 返回的 Future 具有 ‘static lifetime;
  • async block 中不能使用 break/continue;
use async_std::net;
use async_std::task;

// serve_one 是一个 Future<Output=Result>,只有当 .await 或 poll 它时才会执行 async block 中的代码。
let serve_one = async {
    // ? 是 async block 返回,结果是 Err.
    let listener = net::TcpListener::bind("localhost:8087").await?;
    let (mut socket, _addr) = listener.accept().await?;
};

pub async fn many_requests(requests: Vec<(String, u16, String)>) -> Vec<std::io::Result<String>>
{
    let mut handles = vec![];
    for (host, port, path) in requests {
        handles.push(
            task::spawn_local(
                // 使用 async move 来获取 host、path 的所有权。
                async move {
                    cheapo_request(&host, port, &path).await
                }
            ));
    }
    //...
}

loop {
    async move {
        break; // error[E0267]: `break` inside of an `async` block
    }
}

aysnc block 没有指定返回值类型的机制,在编译时可能出错。

解决办法:为 Ok 指定它所属的 Enum Result 类型:

let input = async_std::io::stdin();

// future 是 Future<Output=Result>
let future = async {
    let mut line = String::new();
    // ? 是 async block 返回,结果是 std::io::Result<usize>.
    input.read_line(&mut line).await?;
    println!("Read line: {}", line);

    // Ok(()) // 错误
    Ok::<(), std::io::Error>(()) // 正确,指定 Ok 所属的 Result 类型
    // 或者
    // std::result::Result::<(), std::io::Error>::Ok(());
};

由于 async block 返回一个 Future,故可以使用它为同步函数返回 impl Future 对象:

use std::io;
use std::future::Future;

fn cheapo_request(host: &str, port: u16, path: &str) -> impl Future<Output = io::Result<String>> + 'static
{
    let host = host.to_string();
    let path = path.to_string();

    async move {
        // 捕获了 host、path 的所有权
        //... use &*host, port, and path ...
    }
}

spawn_blocking() 的简单参考实现:

  • 传入的闭包是 同步函数 ,必须实现 Send + ‘static;
  • 内部创建一个线程来执行业务逻辑,结束候通过调用 waker 来通知 executor 完成;
pub fn spawn_blocking<T, F>(closure: F) -> SpawnBlocking<T> where
        F: FnOnce() -> T,
        F: Send + 'static,
        T: Send + 'static,
{
    let inner = Arc::new(Mutex::new(Shared { value: None, waker: None, }));
    std::thread::spawn({
        let inner = inner.clone();
        move || {
            let value = closure();
            let maybe_waker = {
                let mut guard = inner.lock().unwrap();
                guard.value = Some(value);
                guard.waker.take()
            };
            if let Some(waker) = maybe_waker {
                waker.wake();
            }
        } });
    SpawnBlocking(inner)
}

// 为自定义 SpawnBlocking 实现 Future
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
impl<T: Send> Future for SpawnBlocking<T> { type Output = T;
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
        let mut guard = self.0.lock().unwrap(); if let Some(value) = guard.value.take() {
            return Poll::Ready(value); }
        guard.waker = Some(cx.waker().clone());
        Poll::Pending
    }
}

block_on() 的简单实现:

  • 使用 Parker 机制来实现 waker;
  • 传入的 Future 在 loop poll 前,必须使用 pin!(future) 来转换为同名但类型为 Pin<&mut F> 的对象, pin!() 创建的 Pin 会获得 future 的所有权,而且确保该对象的栈内存地址不会再发生变化;
  • future.poll() 会获得 Pin 对象的所有权,这样不能在 loop 中使用,使用 future.as_mut() 返回一个新的 Pin 对象来解决该问题。
use waker_fn::waker_fn; // Cargo.toml: waker-fn = "1.1"
use futures_lite::pin; // Cargo.toml: futures-lite = "1.11"
use crossbeam::sync::Parker; // Cargo.toml: crossbeam = "0.8"
use std::future::Future;
use std::task::{Context, Poll};
fn block_on<F: Future>(future: F) -> F::Output {
    let parker = Parker::new();
    let unparker = parker.unparker().clone();
    let waker = waker_fn(move || unparker.unpark());
    let mut context = Context::from_waker(&waker);
    pin!(future);
    loop {
        match future.as_mut().poll(&mut context) {
            Poll::Ready(value) => return value,
            Poll::Pending => parker.park(),
        }
    } }

当 async task panic 时,可以通过 .await 返回的 Error 来判断:

use tokio::task::JoinError;

pub async fn run() {
    let handle = tokio::spawn(work());
    if let Err(e) = handle.await {
        if let Ok(reason) = e.try_into_panic() {
            // The task has panicked We resume unwinding the panic, thus propagating it to the
            // current thread
            panic::resume_unwind(reason);
        }
    }
}

pub async fn work() {
    // [...]
}
rust-lang - 这篇文章属于一个选集。
§ 15: 本文

相关文章

不安全:unsafe
··824 字
Rust
Rust
借用:refer/borrow
··3120 字
Rust
Rust 引用类型和借用
内置类型:type
··16432 字
Rust
Rust 内置基本类型介绍
函数、方法和闭包:function/method/closure
··6964 字
Rust
Rust 函数、方法和闭包