tower

rust crate: this article is part of a series.
§ 9: this article

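tower provides four core abstractions for building middleware:

  1. Service trait: an asynchronous function that produces a Response from a Request;

    • Request/Response are protocol-agnostic (they carry no trait bounds);
    • the tower_http crate provides Service implementations for HTTP Request/Response;
    • service_fn: creates a Service from a closure or function;
  2. ServiceExt trait: an extension of the Service trait that adds convenience methods such as ready()/and_then()/map_response()/filter();

  3. Layer trait: wraps a Service, adds custom logic, and returns another Service;

    • layer_fn: creates a Layer from a closure or function;
  4. ServiceBuilder: stacks multiple Layers and applies them to a Service, returning a new Service, or acts as one combined Layer;

    • ServiceBuilder itself implements the Layer trait and is more convenient to use;

For API stability, the Service trait is defined on its own in the tower-service crate and the Layer trait in the tower-layer crate; the tower crate re-exports both with pub use.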

1 Service

The Service trait provides the method call(), which turns a Request into a Future that resolves to Result<Response, Error>; conceptually it is: async fn(Request) -> Result<Response, Error>

// Request and Response here are generic parameters, not the http::Request/Response struct types.
pub trait Service<Request> {
    type Response;
    type Error;
    // The Future's Output is a Result
    type Future: Future<Output = Result<Self::Response, Self::Error>>;

    // Used for backpressure: callers should poll this first and invoke call() only after it returns Ready
    fn poll_ready(
        &mut self,
        cx: &mut Context<'_>
    ) -> Poll<Result<(), Self::Error>>;

    fn call(&mut self, req: Request) -> Self::Future;
}
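
To make the poll_ready()/call() contract concrete, here is a minimal sketch that uses only the standard library. The Service trait below is a local copy of the shape shown above, not an import from tower, and Upper, NoopWake, and try_call are hypothetical names introduced for illustration. The helper polls readiness once and calls the service only when it reports Ready:

```rust
use std::convert::Infallible;
use std::future::{ready, Future, Ready};
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Local copy of the trait shape (an assumption for this sketch; real code
// would import `tower::Service` instead).
pub trait Service<Request> {
    type Response;
    type Error;
    type Future: Future<Output = Result<Self::Response, Self::Error>>;

    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>;
    fn call(&mut self, req: Request) -> Self::Future;
}

/// Hypothetical service: upper-cases a string, but only `remaining` more times.
pub struct Upper {
    pub remaining: usize,
}

impl Service<String> for Upper {
    type Response = String;
    type Error = Infallible;
    type Future = Ready<Result<String, Infallible>>;

    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        if self.remaining == 0 {
            // Out of capacity: signal backpressure. A real service would keep the
            // waker from `_cx` and wake it when capacity returns; this one never does.
            return Poll::Pending;
        }
        Poll::Ready(Ok(()))
    }

    fn call(&mut self, req: String) -> Self::Future {
        self.remaining -= 1;
        ready(Ok(req.to_uppercase()))
    }
}

/// Waker that does nothing; enough to poll futures that finish immediately.
struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Polls readiness once and, only if ready, calls the service and polls the
/// returned future once. Returns `None` when either step is still pending.
pub fn try_call<S, R>(svc: &mut S, req: R) -> Option<Result<S::Response, S::Error>>
where
    S: Service<R>,
{
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);

    match svc.poll_ready(&mut cx) {
        Poll::Ready(Ok(())) => {}
        Poll::Ready(Err(e)) => return Some(Err(e)),
        Poll::Pending => return None, // not ready: call() must not be invoked
    }
    let mut fut = pin!(svc.call(req));
    let polled = fut.as_mut().poll(&mut cx);
    match polled {
        Poll::Ready(out) => Some(out),
        Poll::Pending => None,
    }
}
```

With Upper { remaining: 1 }, the first try_call succeeds and the second returns None because poll_ready() reports Pending. In real code an async runtime drives these polls for you.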

tower::ServiceExt is an extension of the Service trait that provides convenience methods such as ready()/and_then()/map_response()/filter().

Typical usage pattern: let response = my_service.ready().await?.call(req).await;

pub trait ServiceExt<Request>: Service<Request> {
    fn ready(&mut self) -> Ready<'_, Self, Request> where Self: Sized
    fn ready_and(&mut self) -> ReadyAnd<'_, Self, Request> where Self: Sized
    fn ready_oneshot(self) -> ReadyOneshot<Self, Request> where Self: Sized
    fn oneshot(self, req: Request) -> Oneshot<Self, Request> where Self: Sized

    fn call_all<S>(self, reqs: S) -> CallAll<Self, S> where Self: Sized, Self::Error: Into<BoxError>, S: Stream<Item = Request>

    fn and_then<F>(self, f: F) -> AndThen<Self, F> where Self: Sized, F: Clone

    fn map_response<F, Response>(self, f: F) -> MapResponse<Self, F>
       where Self: Sized, F: FnOnce(Self::Response) -> Response + Clone
    fn map_err<F, Error>(self, f: F) -> MapErr<Self, F> where Self: Sized, F: FnOnce(Self::Error) -> Error + Clone
    fn map_result<F, Response, Error>(self, f: F) -> MapResult<Self, F>
       where Self: Sized, Error: From<Self::Error>,
             F: FnOnce(Result<Self::Response, Self::Error>) -> Result<Response, Error> + Clone
    fn map_request<F, NewRequest>(self, f: F) -> MapRequest<Self, F> where Self: Sized, F: FnMut(NewRequest) -> Request

    fn filter<F, NewRequest>(self, filter: F) -> Filter<Self, F> where Self: Sized, F: Predicate<NewRequest>
    fn filter_async<F, NewRequest>(self, filter: F) -> AsyncFilter<Self, F> where Self: Sized, F: AsyncPredicate<NewRequest>

    fn then<F, Response, Error, Fut>(self, f: F) -> Then<Self, F>
       where Self: Sized, Error: From<Self::Error>,
          F: FnOnce(Result<Self::Response, Self::Error>) -> Fut + Clone, Fut:    Future<Output = Result<Response, Error>>

    fn map_future<F, Fut, Response, Error>(self, f: F) -> MapFuture<Self, F>
       where Self: Sized, F: FnMut(Self::Future) -> Fut, Error: From<Self::Error>, Fut: Future<Output = Result<Response, Error>>

    fn boxed(self) -> BoxService<Request, Self::Response, Self::Error>
       where Self: Sized + Send + 'static, Self::Future: Send + 'static
    fn boxed_clone( self ) -> BoxCloneService<Request, Self::Response, Self::Error>
       where Self: Clone + Sized + Send + 'static, Self::Future: Send + 'static
}

// Example
// A service returning Result<Record, _>
let service = DatabaseService::new("127.0.0.1:8080");

// Map the response into a new response
let mut new_service = service.map_response(|record| record.name);

// Call the new service
let id = 13;
let name = new_service
    .ready()
    .await? // call() is invoked only once the Service reports Ready
    .call(id)
    .await?;

// filter(): Composes this service with a Filter that conditionally accepts or rejects requests
// based on a predicate.

// Fallibly map the request to a new request
let mut new_service = service
    .filter(|id_str: &str| id_str.parse().map_err(DbError::Parse));

// Call the new service
let id = "13";
let response = new_service
    .ready()
    .await?
    .call(id)
    .await;

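service_fn<T>(f: T) creates a ServiceFn value, which implements the Service trait, from a function or closure T:

  • T is an FnMut(Request) -> Future<Output = Result<R, E>> closure, i.e. it must return a Result, while Request and the response type R are generic parameters with no constraints.
pub fn service_fn<T>(f: T) -> ServiceFn<T> // T is an async function or a closure returning a Future.

// ServiceFn<T> implements the Service trait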
impl<T, F, Request, R, E> Service<Request> for ServiceFn<T>
where
    T: FnMut(Request) -> F,
    F: Future<Output = Result<R, E>>,
{
    type Response = R;
    type Error = E;
    type Future = F;

    fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), E>> {
        Ok(()).into()
    }

    fn call(&mut self, req: Request) -> Self::Future {
        (self.f)(req)
    }
}

// Example:
use tower::{service_fn, Service, ServiceExt, BoxError};

// The function passed to service_fn takes a Request and must return a Result; failures are reported through the Error type.
async fn handle(request: Request) -> Result<Response, BoxError> {
    let response = Response::new("Hello, World!");
    Ok(response)
}

// Use service_fn to create a Service from a function or closure
let mut service = service_fn(handle);
let response = service
    .ready() // drives poll_ready() until the Service is ready
    .await?
    .call(Request::new())
    .await?;
assert_eq!("Hello, World!", response.into_body());

The tower_service::Service trait is used extensively in the axum crate:

  1. axum::Router: many of its methods take a Service;
  2. the tower_http::services module provides many HTTP-specific Service implementations;

axum Router's route_service() method takes a Service directly; the Service's associated Error must be Infallible, i.e. it cannot fail. There are two ways to create such a Service:

  1. wrap a closure with service_fn();
  2. use one of the Services provided by tower-http;
pub fn route_service<T>(self, path: &str, service: T) -> Self
where
    T: Service<Request, Error = Infallible> + Clone + Send + 'static,
    T::Response: IntoResponse,
    T::Future: Send + 'static,

// Example
use axum::{
    Router,
    body::Body,
    routing::{any_service, get_service},
    extract::Request,
    http::StatusCode,
    error_handling::HandleErrorLayer,
};
use tower_http::services::ServeFile;
use http::Response;
use std::{convert::Infallible, io};
use tower::service_fn;

let app = Router::new()
    .route(
        "/",
        any_service(service_fn(|_: Request| async {
            let res = Response::new(Body::from("Hi from `GET /`"));
            Ok::<_, Infallible>(res) // the service_fn closure must return a Result whose Error is Infallible
        }))
    )
    .route_service(
        "/foo",
        service_fn(|req: Request| async move {  // async move {} is an async block expression
            let body = Body::from(format!("Hi from `{} /foo`", req.method()));
            let res = Response::new(body);
            Ok::<_, Infallible>(res)
        })
    )
    .route_service(
        "/static/Cargo.toml",
        ServeFile::new("Cargo.toml"), // tower_http's ServeFile::new() returns a Service
    );

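The tower::MakeService trait is a Service factory: it creates new Service values. For example, a TCP/HTTP server needs a fresh Service for each connection it accepts in order to handle requests and produce responses; the into_service()/as_service() methods of tower::MakeService adapt the factory into a plain Service for that purpose.

  • into_service() and as_service() are similar: the former consumes the MakeService, the latter borrows it mutably (&mut);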
pub trait MakeService<Target, Request>: Sealed<(Target, Request)> {
    type Response;
    type Error;
    type Service: Service<Request, Response = Self::Response, Error = Self::Error>;
    type MakeError;
    type Future: Future<Output = Result<Self::Service, Self::MakeError>>; // resolves to another Service

    // Required methods
    fn poll_ready( &mut self, cx: &mut Context<'_> ) -> Poll<Result<(), Self::MakeError>>;
    fn make_service(&mut self, target: Target) -> Self::Future;

    // Provided methods
    fn into_service(self) -> IntoService<Self, Request> where Self: Sized, { ... }
    fn as_service(&mut self) -> AsService<'_, Self, Request> where Self: Sized, { ... }
}

// tower implements MakeService for every Service<Target> whose Response is itself a Service<Request>,
// i.e. a Service that creates another Service
impl<M, S, Target, Request> MakeService<Target, Request> for M
where
    M: Service<Target, Response = S>,
    S: Service<Request>
   type Response = <S as Service<Request>>::Response
   type Error = <S as Service<Request>>::Error
   type Service = S
   type MakeError = <M as Service<Target>>::Error
   type Future = <M as Service<Target>>::Future

Take fn into_service(self) -> IntoService<Self, Request> as an example: the IntoService value it returns is itself a Service that produces Services:

// The tower::make::IntoService<M, Request> struct implements the Service<Target> trait
impl<M, S, Target, Request> Service<Target> for IntoService<M, Request>
where
    M: Service<Target, Response = S>,
    S: Service<Request>
    type Response = <M as Service<Target>>::Response // Responses given by the service.
    type Error = <M as Service<Target>>::Error // Errors produced by the service.
    type Future = <M as Service<Target>>::Future // The future response value.

   // Returns Poll::Ready(Ok(())) when the service is able to process requests.
   fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>

   // Process the request and return the response asynchronously.
   fn call(&mut self, target: Target) -> Self::Future


// Example
use std::convert::Infallible;
use tower::Service;
use tower::make::MakeService;
use tower::service_fn;

// A `MakeService`; note that it is a nested Service.
let make_service = service_fn(|make_req: ()| async {
    // The outer Service takes a Target of type () and responds with
    // an inner Service, built by another service_fn(), whose Request type is String
    Ok::<_, Infallible>(
        service_fn(|req: String| async {
            Ok::<_, Infallible>(req)
        }
        )
    )
});

// Convert the `MakeService` into a `Service`. The result is an IntoService<M, Request>,
// which implements Service<Target>; here Target is () and Request is String
let mut svc = make_service.into_service();

// Make a new service. IntoService's call() takes the Target value () and returns
// a new Service
let mut new_svc = svc.call(()).await.unwrap();

// Call the service. Its Request type is String
let res = new_svc.call("foo".to_string()).await.unwrap();
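
The same "Service that returns a Service" shape can be sketched without tower or an async runtime. In the sketch below the Service trait is a local copy of the shape shown earlier, and MakeGreeter, Greeter, NoopWake, now, and serve_one are hypothetical names for illustration. The outer service receives a connection target (a peer address string) and responds with a fresh inner service for that connection:

```rust
use std::convert::Infallible;
use std::future::{ready, Future, Ready};
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Local copy of the `Service` shape; real code would import it from tower.
pub trait Service<Request> {
    type Response;
    type Error;
    type Future: Future<Output = Result<Self::Response, Self::Error>>;
    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>;
    fn call(&mut self, req: Request) -> Self::Future;
}

/// Inner service (hypothetical): answers requests for one connection.
pub struct Greeter {
    peer: String,
}

impl Service<&'static str> for Greeter {
    type Response = String;
    type Error = Infallible;
    type Future = Ready<Result<String, Infallible>>;

    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Infallible>> {
        Poll::Ready(Ok(()))
    }
    fn call(&mut self, name: &'static str) -> Self::Future {
        ready(Ok(format!("hello {} (via {})", name, self.peer)))
    }
}

/// Outer service (hypothetical): its "request" is the connection target and
/// its response is a new `Greeter`. This is the MakeService shape.
pub struct MakeGreeter;

impl Service<String> for MakeGreeter {
    type Response = Greeter;
    type Error = Infallible;
    type Future = Ready<Result<Greeter, Infallible>>;

    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Infallible>> {
        Poll::Ready(Ok(()))
    }
    fn call(&mut self, peer: String) -> Self::Future {
        ready(Ok(Greeter { peer }))
    }
}

/// Waker that does nothing; enough for futures that finish immediately.
struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Polls a future once and panics if it is not immediately ready.
fn now<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    let polled = fut.as_mut().poll(&mut cx);
    match polled {
        Poll::Ready(out) => out,
        Poll::Pending => panic!("future was not immediately ready"),
    }
}

/// One "connection": build a service for the peer, then send it one request.
/// poll_ready() is skipped only because both services here are always ready.
pub fn serve_one(peer: &str, name: &'static str) -> String {
    let mut make = MakeGreeter;
    let mut svc = now(make.call(peer.to_string())).unwrap();
    now(svc.call(name)).unwrap()
}
```

A server loop would call the outer service once per accepted connection and then feed every request on that connection to the inner service it got back.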

Example: the make_service argument of axum::serve() is a Service whose Request is an IncomingStream and whose response is another Service<Request, Response = Response, Error = Infallible>. Here Request and Response are http::request::Request and http::response::Response, with the struct axum::body::Body as the body type, which implements the http_body::Body trait with Data = bytes::Bytes.

  • IncomingStream is the struct axum::serve::IncomingStream

Note: M is not bounded by the tower::MakeService trait directly, but the effect and semantics are the same:

// Definition of axum::serve():
pub fn serve<M, S>(tcp_listener: TcpListener, make_service: M) -> Serve<M, S>
where
    // Outer Service: takes an IncomingStream and responds with another Service
    M: for<'a> Service<IncomingStream<'a>, Error = Infallible, Response = S>,

    // Inner Service
    // Request is the type alias axum::extract::Request<T = Body> = http::request::Request<T>
    // Response is the type alias axum::response::Response<T = Body> = http::response::Response<T>
    //
    // The Body in Request<T = Body> and Response<T = Body> is the struct axum::body::Body, which
    // implements the http_body::Body trait with Data = bytes::Bytes.
    S: Service<Request, Response = Response, Error = Infallible> + Clone + Send + 'static,
    S::Future: Send,

axum's Router, MethodRouter, and Handler-based services all work as a tower::MakeService:

// Router implements Service<IncomingStream<'_>>, and the Response is again a Router
impl Service<IncomingStream<'_>> for Router<()>
  type Response = Router
  type Error = Infallible
  type Future = Ready<Result<<Router as Service<IncomingStream<'_>>>::Response, <Router as Service<IncomingStream<'_>>>::Error>>

// Router implements Service<Request<B>>, where B implements http_body::Body<Data = bytes::Bytes>;
// the response is the struct http::response::Response<axum::body::Body>
impl<B> Service<Request<B>> for Router<()>
where
    B: HttpBody<Data = Bytes> + Send + 'static,
    B::Error: Into<BoxError>
  type Response = Response<Body>
  type Error = Infallible
  type Future = RouteFuture<Infallible>


// Example
use axum::{
    routing::get,
    Router,
};
// app is a Router, which satisfies MakeService, so it can be passed to axum::serve()
let app = Router::new().route("/", get(|| async { "Hi!" }));
let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await.unwrap();
axum::serve(listener, app).await.unwrap();

In summary: Router implements Service<IncomingStream<'_>> with Response = Router, and that Router in turn implements Service<Request<B>>, so Router satisfies the M bound on the make_service parameter of axum::serve().

Although Router implements Service<Request<B>>, i.e. Service<http::request::Request<B>> where B: http_body::Body<Data = bytes::Bytes>, calling tower::ServiceExt methods on it directly fails to compile because B cannot be inferred. The fix is Router's pub fn as_service<B>(&mut self) -> RouterAsService<'_, B, S> method:

use axum::{
    Router,
    routing::get,
    http::Request,
    body::Body,
};
use tower::{Service, ServiceExt};

let mut router = Router::new().route("/", get(|| async {}));
let request = Request::new(Body::empty());

// let response = router.ready().await?.call(request).await?;
//                       ^^^^^ cannot infer type for type parameter `B`

// OK
let response = router.as_service().ready().await?.call(request).await?;

In addition, Router provides into_make_service()/into_make_service_with_connect_info() and similar methods that create a type implementing the MakeService trait:

// Convert this router into a MakeService, that is a Service whose response is another service.
pub fn into_make_service(self) -> IntoMakeService<Self> // note: the generic parameter Self is Router
pub fn into_make_service_with_connect_info<C>(self) -> IntoMakeServiceWithConnectInfo<Self, C>

// When S is Router, the Response is also Router, and Router implements Service<Request<B>>,
// so IntoMakeService implements both Service and MakeService.
impl<S, T> Service<T> for IntoMakeService<S> where S: Clone
  type Response = S
  type Error = Infallible
  type Future = IntoMakeServiceFuture<S>


use axum::{
    routing::get,
    Router,
};
let app = Router::new().route("/", get(|| async { "Hi!" }));
let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await.unwrap();
axum::serve(listener, app.into_make_service()).await.unwrap();

use axum::{
    extract::ConnectInfo,
    routing::get,
    Router,
};
use std::net::SocketAddr;

let app = Router::new().route("/", get(handler));
// Extract ConnectInfo from the request extensions (it carries the client's remote address)
async fn handler(ConnectInfo(addr): ConnectInfo<SocketAddr>) -> String {
    format!("Hello {addr}")
}

let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await.unwrap();
// Stores ConnectInfo in the request extensions
axum::serve(listener, app.into_make_service_with_connect_info::<SocketAddr>()).await.unwrap();

2 Layer

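The Layer trait adds processing logic to the Service passed in and returns a new Service.

The main use case for Layer is implementing reusable middleware, HTTP middleware in particular.

pub trait Layer<S> {
    type Service; // the type of the returned Service
    // S can be any type; a new Service type is returned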
    fn layer(&self, inner: S) -> Self::Service;
}

tower::layer::layer_fn() creates a LayerFn<T> value, which implements the Layer trait, from a closure; the closure takes a Service and returns a Service:

pub fn layer_fn<T>(f: T) -> LayerFn<T>

// LayerFn<T> implements Layer<S>
impl<F, S, Out> Layer<S> for LayerFn<F> where F: Fn(S) -> Out
  type Service = Out


// A middleware that logs requests before forwarding them to another service
pub struct LogService<S> {
    target: &'static str,
    service: S,
}

impl<S, Request> Service<Request> for LogService<S>
where
    S: Service<Request>,
    Request: fmt::Debug,
{
    type Response = S::Response;
    type Error = S::Error;
    type Future = S::Future;

    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        self.service.poll_ready(cx)
    }

    fn call(&mut self, request: Request) -> Self::Future {
        // Log the request
        println!("request = {:?}, target = {:?}", request, self.target);

        self.service.call(request)
    }
}

// A `Layer` that wraps services in `LogService`
let log_layer = layer_fn(|service| { // the closure passed to layer_fn() takes a Service and returns a Service
    LogService {
        service,
        target: "tower-docs",
    }
});

// Create another Service with service_fn and a closure, then wrap it with Layer::layer() to get a new Service
let uppercase_service = tower::service_fn(|request: String| async move {
    Ok::<_, Infallible>(request.to_uppercase())
});

// Wrap our service in a `LogService` so requests are logged.
let wrapped_service = log_layer.layer(uppercase_service);

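Implementing the Layer trait usually also means implementing a Service to serve as the return type of Layer::layer(), so two custom types are needed:

  • one is the Layer type, the other is the Service type associated with that Layer;
// The ConcurrencyLimitLayer Layer returns its associated ConcurrencyLimit Service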
impl<S> Layer<S> for ConcurrencyLimitLayer
  type Service = ConcurrencyLimit<S>

Layer example:

pub struct LogLayer {
    target: &'static str,
}

impl<S> Layer<S> for LogLayer {
    type Service = LogService<S>;

    fn layer(&self, service: S) -> Self::Service {
        LogService {
            target: self.target,
            service
        }
    }
}

// This service implements the Log behavior
pub struct LogService<S> {
    target: &'static str,
    service: S,
}

impl<S, Request> Service<Request> for LogService<S>
   where S: Service<Request>, Request: fmt::Debug,
{
    type Response = S::Response;
    type Error = S::Error;
    type Future = S::Future;

    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        self.service.poll_ready(cx)
    }

    fn call(&mut self, request: Request) -> Self::Future {
        // Insert log statement here or other functionality
        println!("request = {:?}, target = {:?}", request, self.target);
        self.service.call(request)
    }
}

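The tower crate provides many Layer implementations together with their Services; each lives in its own module, for example:

  • balance: load balancing across multiple Services;
  • buffer: puts an mpsc channel in front of a Service to buffer requests;
  • builder: the builder type for composing Layers and Services;
  • discover: service discovery;
  • filter: conditionally dispatches requests to the inner Service based on a predicate;
  • hedge: pre-emptively retries a request that has been outstanding longer than a given latency percentile;
  • layer: a collection of Layer-based tower services;
  • limit: limits on requests (concurrency and rate);
  • load: measures Service load;
  • load_shed: sheds load by rejecting requests when the inner Service is not ready;
  • make: trait aliases for Services that produce specific types of Responses;
  • ready_cache: a cache of ready Services;
  • reconnect: automatically reconnects when the Service fails;
  • retry: retries failed requests;
  • spawn_ready: when the Service is not ready, spawns a background task that drives it to readiness;
  • steer: routes requests among Services;
  • timeout: applies a timeout to requests;
  • util: assorted utility types and functions used across Tower.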
use tower::balance::p2c::Balance;
use tower::load::Load;
use tower::{Service, ServiceExt};
use futures_util::pin_mut;
use futures_util::stream::{Stream, StreamExt}; // Stream for the bound below, StreamExt for next()

async fn spread<Req, S: Service<Req> + Load>(svc1: S, svc2: S, reqs: impl Stream<Item = Req>)
where
    S::Error: Into<tower::BoxError>,
    S::Metric: std::fmt::Debug,
{
    // Spread load evenly across the two services
    let p2c = Balance::new(tower::discover::ServiceList::new(vec![svc1, svc2]));

    // Issue all the requests that come in.
    // Some will go to svc1, some will go to svc2.
    pin_mut!(reqs);
    let mut responses = p2c.call_all(reqs);
    while let Some(rsp) = responses.next().await {
        // ...
    }
}

use tower::buffer::Buffer;
use tower::{Service, ServiceExt};

async fn mass_produce<S: Service<usize>>(svc: S)
where
  S: 'static + Send,
  S::Error: Send + Sync + std::error::Error,
  S::Future: Send
{
    let svc = Buffer::new(svc, 10 /* buffer length */);
    for _ in 0..10 {
        let mut svc = svc.clone();
        tokio::spawn(async move {
            for i in 0usize.. {
                svc.ready().await.expect("service crashed").call(i).await;
            }
        });
    }
}

3 ServiceBuilder

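The tower::ServiceBuilder struct builds a Service wrapped in multiple Layers; requests then pass through the Layers in the order they were added to the ServiceBuilder.

  • layer()/option_layer(): add a general-purpose layer;
  • layer_fn(): create a layer from the given closure and add it;
  • service()/service_fn(): finish by adding a Service, returning a new Service;

ServiceBuilder also provides methods such as buffer()/retry()/timeout() for quickly adding Layer middleware such as tower::buffer::BufferLayer:

Note: layer()/option_layer()/layer_fn() and the middleware methods all return a ServiceBuilder, which is itself a Layer, whereas service()/service_fn() return a Service.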

impl ServiceBuilder<Identity>
    pub fn new() -> Self

impl<L> ServiceBuilder<L>

// Add a general-purpose layer
pub fn layer<T>(self, layer: T) -> ServiceBuilder<Stack<T, L>>
pub fn option_layer<T>( self, layer: Option<T> ) -> ServiceBuilder<Stack<Either<T, Identity>, L>>
pub fn layer_fn<F>(self, f: F) -> ServiceBuilder<Stack<LayerFn<F>, L>>

// Add a Service and return the new Service
pub fn service<S>(&self, service: S) -> L::Service where L: Layer<S>
pub fn service_fn<F>(self, f: F) -> L::Service where L: Layer<ServiceFn<F>>

// Add middleware Layers provided by tower, such as tower::buffer::BufferLayer
pub fn buffer<Request>( self, bound: usize ) -> ServiceBuilder<Stack<BufferLayer<Request>, L>>
pub fn concurrency_limit( self, max: usize ) -> ServiceBuilder<Stack<ConcurrencyLimitLayer, L>>
pub fn load_shed(self) -> ServiceBuilder<Stack<LoadShedLayer, L>>
pub fn rate_limit( self, num: u64, per: Duration ) -> ServiceBuilder<Stack<RateLimitLayer, L>>
pub fn retry<P>(self, policy: P) -> ServiceBuilder<Stack<RetryLayer<P>, L>>
pub fn timeout( self, timeout: Duration) -> ServiceBuilder<Stack<TimeoutLayer, L>>
pub fn filter<P>(self, predicate: P) -> ServiceBuilder<Stack<FilterLayer<P>, L>>
pub fn filter_async<P>( self, predicate: P ) -> ServiceBuilder<Stack<AsyncFilterLayer<P>, L>>
pub fn map_request<F, R1, R2>( self, f: F ) -> ServiceBuilder<Stack<MapRequestLayer<F>, L>> where F: FnMut(R1) -> R2 + Clone
pub fn map_response<F>( self, f: F) -> ServiceBuilder<Stack<MapResponseLayer<F>, L>>
pub fn map_err<F>(self, f: F) -> ServiceBuilder<Stack<MapErrLayer<F>, L>>
pub fn map_future<F>(self, f: F) -> ServiceBuilder<Stack<MapFutureLayer<F>, L>>
pub fn then<F>(self, f: F) -> ServiceBuilder<Stack<ThenLayer<F>, L>>
pub fn and_then<F>(self, f: F) -> ServiceBuilder<Stack<AndThenLayer<F>, L>>
pub fn map_result<F>(self, f: F) -> ServiceBuilder<Stack<MapResultLayer<F>, L>>

//...
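
To see why the first layer added is the first to handle a request, here is a minimal sketch of the Stack<T, L> nesting visible in the signatures above. Everything in it is a local, simplified stand-in: Service is synchronous and fixed to String, and Builder, Tag, Tagged, Echo, and demo are hypothetical names. Only the way layers nest is meant to mirror ServiceBuilder:

```rust
// Simplified, synchronous stand-ins written only for this illustration.
pub trait Service {
    fn call(&mut self, req: String) -> String;
}

pub trait Layer<S> {
    type Service;
    fn layer(&self, inner: S) -> Self::Service;
}

/// A layer that does nothing; the starting point of the builder.
pub struct Identity;

impl<S> Layer<S> for Identity {
    type Service = S;
    fn layer(&self, inner: S) -> Self::Service {
        inner
    }
}

/// Two layers chained: `inner` wraps the service first, then `outer` wraps that.
pub struct Stack<Inner, Outer> {
    inner: Inner,
    outer: Outer,
}

impl<S, Inner, Outer> Layer<S> for Stack<Inner, Outer>
where
    Inner: Layer<S>,
    Outer: Layer<Inner::Service>,
{
    type Service = Outer::Service;
    fn layer(&self, service: S) -> Self::Service {
        self.outer.layer(self.inner.layer(service))
    }
}

pub struct Builder<L> {
    layer: L,
}

impl Builder<Identity> {
    pub fn new() -> Self {
        Builder { layer: Identity }
    }
}

impl<L> Builder<L> {
    /// Each newly added layer becomes the inner half of the stack, so layers
    /// added earlier end up further out.
    pub fn layer<T>(self, layer: T) -> Builder<Stack<T, L>> {
        Builder { layer: Stack { inner: layer, outer: self.layer } }
    }

    pub fn service<S>(&self, service: S) -> L::Service
    where
        L: Layer<S>,
    {
        self.layer.layer(service)
    }
}

/// Hypothetical middleware that appends its name to the request on the way in.
pub struct Tag(pub &'static str);

pub struct Tagged<S> {
    name: &'static str,
    inner: S,
}

impl<S> Layer<S> for Tag {
    type Service = Tagged<S>;
    fn layer(&self, inner: S) -> Self::Service {
        Tagged { name: self.0, inner }
    }
}

impl<S: Service> Service for Tagged<S> {
    fn call(&mut self, req: String) -> String {
        self.inner.call(format!("{}>{}", req, self.name))
    }
}

/// Innermost service: returns the request it received.
pub struct Echo;

impl Service for Echo {
    fn call(&mut self, req: String) -> String {
        req
    }
}

/// Builds Tag("a") then Tag("b") around Echo and sends one request through.
pub fn demo() -> String {
    let mut svc = Builder::new().layer(Tag("a")).layer(Tag("b")).service(Echo);
    svc.call("req".to_string())
}
```

demo() returns "req>a>b": the layer added first ("a") is the outermost wrapper and therefore sees the request before "b".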

Example:

use tower::ServiceBuilder;
use tower::ServiceExt;

// Suppose we have some `Service` whose request type is `String`:
let string_svc = tower::service_fn(|request: String| async move {
    println!("request: {}", request);
    Ok(())
});

// ...but we want to call that service with a `usize`. What do we do?

let usize_svc = ServiceBuilder::new()
     // Add a middleware that converts the request type to a `String`:
    .map_request(|request: usize| format!("{}", request)) // returns a ServiceBuilder (a Layer)
    // ...and wrap the string service with that middleware:
    .service(string_svc); // returns a Service

// Now, we can call that service with a `usize`:
usize_svc.oneshot(42).await?;

ServiceBuilder implements Layer, so it can combine multiple Layers into a single new Layer.

// To add several layer middleware to a Router, tower::ServiceBuilder is recommended
use axum::{
    routing::get,
    Extension,
    Router,
};
use tower_http::{trace::TraceLayer};
use tower::ServiceBuilder;

async fn handler() {}

#[derive(Clone)]
struct State {}

let app = Router::new()
    .route("/", get(handler))
    .layer(
        ServiceBuilder::new()
            .layer(TraceLayer::new_for_http())
            .layer(Extension(State {}))
    );

ServiceBuilder does not implement Service; call service()/service_fn() to create one:

use std::time::Duration;
use tower::{ServiceBuilder, ServiceExt, BoxError, service_fn};

async fn handle(request: &'static str) -> Result<&'static str, BoxError> {
    Ok(request)
}

// Returns a Service in which requests pass through the layers in order: buffer, then timeout
let svc = ServiceBuilder::new()
    .buffer(1024) // add a buffer layer
    .timeout(Duration::from_secs(10)) // add a timeout layer
    .service_fn(handle); // create a Service from the function, then return the wrapped Service

let response = svc.oneshot("foo").await?;

assert_eq!(response, "foo");

4 tower-http

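Layer middleware such as tower::buffer is independent of the Request/Response types. The tower-http crate, by contrast, provides HTTP-specific middleware and utilities, all of which use the HTTP Request/Response types defined in the http and http-body crates: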

  • add_extension : Middleware that clones a value into each request’s extensions.
  • auth : Authorization related middleware.
  • body : Body types.
  • catch_panic : Convert panics into responses.
  • classify : Tools for classifying responses as either success or failure.
  • compression : Middleware that compresses response bodies; enabled by the features compression-br, compression-deflate, compression-gzip, or compression-zstd.
  • cors : Middleware which adds headers for CORS.
  • decompression : Middleware that decompresses request and response bodies; enabled by the features decompression-br, decompression-deflate, decompression-gzip, or decompression-zstd.
  • follow_redirect : Middleware for following redirections.
  • limit : Middleware for limiting request bodies.
  • map_request_body : Apply a transformation to the request body.
  • map_response_body : Apply a transformation to the response body.
  • metrics : Middlewares for adding metrics to services.
  • normalize_path : Middleware that normalizes paths.
  • propagate_header : Propagate a header from the request to the response.
  • request_id : Set and propagate request ids.
  • sensitive_headers : Middlewares that mark headers as sensitive.
  • services : Services that return responses without wrapping other Services.
  • set_header : Middleware for setting headers on requests and responses.
  • set_status : Middleware to override status codes.
  • timeout : Middleware that applies a timeout to requests.
  • trace : Middleware that adds high level tracing to a Service.
  • validate_request : Middleware that validates requests.

Each of these middleware consists of two types, one implementing the Layer trait and one implementing the Service trait.

For example, S below is a Service<Request<ReqBody>, Response = Response<ResBody>>:

pub struct AddAuthorizationLayer { /* private fields */ }


impl<S> Layer<S> for AddAuthorizationLayer
  type Service = AddAuthorization<S> // the associated type is AddAuthorization, which implements Service

pub struct AddAuthorization<S> { /* private fields */ }

impl<S, ReqBody, ResBody> Service<Request<ReqBody>> for AddAuthorization<S>
  where S: Service<Request<ReqBody>, Response = Response<ResBody>>
  type Response = <S as Service<Request<ReqBody>>>::Response // Responses given by the service.
  type Error = <S as Service<Request<ReqBody>>>::Error // Errors produced by the service.
  type Future = <S as Service<Request<ReqBody>>>::Future // The future response value.
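
The Layer plus Service pairing can be sketched by hand. The sketch below does not use tower-http: Service, Layer, and Request are local stand-ins, and AddHeaderLayer, AddHeader, ShowHeaders, NoopWake, and demo are hypothetical names for illustration. As in the AddAuthorization listing above, the wrapper reuses the inner service's Future because it only edits the request:

```rust
use std::convert::Infallible;
use std::future::{ready, Future, Ready};
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Local stand-ins (assumptions for this sketch): real code would use
// tower::{Service, Layer} and http::Request instead.
pub trait Service<Request> {
    type Response;
    type Error;
    type Future: Future<Output = Result<Self::Response, Self::Error>>;
    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>;
    fn call(&mut self, req: Request) -> Self::Future;
}

pub trait Layer<S> {
    type Service;
    fn layer(&self, inner: S) -> Self::Service;
}

#[derive(Default)]
pub struct Request {
    pub headers: Vec<(String, String)>,
}

/// Layer half (hypothetical): stores configuration and builds the wrapper.
pub struct AddHeaderLayer {
    pub name: &'static str,
    pub value: &'static str,
}

impl<S> Layer<S> for AddHeaderLayer {
    type Service = AddHeader<S>;
    fn layer(&self, inner: S) -> Self::Service {
        AddHeader { inner, name: self.name, value: self.value }
    }
}

/// Service half (hypothetical): edits the request, then forwards it.
pub struct AddHeader<S> {
    inner: S,
    name: &'static str,
    value: &'static str,
}

impl<S: Service<Request>> Service<Request> for AddHeader<S> {
    type Response = S::Response;
    type Error = S::Error;
    type Future = S::Future; // only the request changes, so the inner future is reused

    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        self.inner.poll_ready(cx)
    }
    fn call(&mut self, mut req: Request) -> Self::Future {
        req.headers.push((self.name.to_string(), self.value.to_string()));
        self.inner.call(req)
    }
}

/// Inner service (hypothetical): reports the headers it received.
pub struct ShowHeaders;

impl Service<Request> for ShowHeaders {
    type Response = String;
    type Error = Infallible;
    type Future = Ready<Result<String, Infallible>>;

    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Infallible>> {
        Poll::Ready(Ok(()))
    }
    fn call(&mut self, req: Request) -> Self::Future {
        let parts: Vec<String> =
            req.headers.iter().map(|(k, v)| format!("{}={}", k, v)).collect();
        ready(Ok(parts.join(";")))
    }
}

/// Waker that does nothing; enough for futures that finish immediately.
struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Wraps `ShowHeaders` with the layer and sends one empty request through it.
pub fn demo() -> String {
    let layer = AddHeaderLayer { name: "authorization", value: "Bearer t0ken" };
    let mut svc = layer.layer(ShowHeaders);

    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    assert!(matches!(svc.poll_ready(&mut cx), Poll::Ready(Ok(()))));

    let mut fut = pin!(svc.call(Request::default()));
    let polled = fut.as_mut().poll(&mut cx);
    match polled {
        Poll::Ready(Ok(line)) => line,
        _ => String::new(),
    }
}
```

Keeping configuration in the Layer and per-service state in the wrapper is what lets one Layer value be applied to many different inner services.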

tower_http also provides the ServiceBuilderExt trait, which extends tower::ServiceBuilder with methods for quickly adding the middleware above:

pub trait ServiceBuilderExt<L>: Sealed<L> + Sized {
    // Required methods
    fn propagate_header( self, header: HeaderName ) -> ServiceBuilder<Stack<PropagateHeaderLayer, L>>;
    fn add_extension<T>( self, value: T ) -> ServiceBuilder<Stack<AddExtensionLayer<T>, L>>;
    fn map_request_body<F>( self, f: F ) -> ServiceBuilder<Stack<MapRequestBodyLayer<F>, L>>;
    fn map_response_body<F>( self, f: F ) -> ServiceBuilder<Stack<MapResponseBodyLayer<F>, L>>;
    fn compression(self) -> ServiceBuilder<Stack<CompressionLayer, L>>;
    fn decompression(self) -> ServiceBuilder<Stack<DecompressionLayer, L>>;
    fn trace_for_http( self ) -> ServiceBuilder<Stack<TraceLayer<HttpMakeClassifier>, L>>;
    fn trace_for_grpc( self ) -> ServiceBuilder<Stack<TraceLayer<GrpcMakeClassifier>, L>>;
    fn follow_redirects( self ) -> ServiceBuilder<Stack<FollowRedirectLayer<Standard>, L>>;
    // ...
}

Taking trace as an example: only after this middleware is added will tower/axum automatically emit trace logs for each request:

use http::{Request, Response};
use tower::{ServiceBuilder, ServiceExt, Service};
use tower_http::trace::TraceLayer;
use std::convert::Infallible;
use http_body_util::Full;
use bytes::Bytes;

// Request is http::Request, Response is http::Response, and Full<Bytes> implements the http_body::Body trait
async fn handle(request: Request<Full<Bytes>>) -> Result<Response<Full<Bytes>>, Infallible> {
    Ok(Response::new(Full::default()))
}

tracing_subscriber::fmt::init(); // install the default fmt subscriber globally

let mut service = ServiceBuilder::new()
    .layer(TraceLayer::new_for_http())
    // or use the Layer shortcut provided by ServiceBuilderExt
    // .trace_for_http()
    .service_fn(handle);

let request = Request::new(Full::from("foo"));
let response = service
    .ready()
    .await?
    .call(request)
    .await?;

// Run with: RUST_LOG=tower_http=trace cargo run

// Log output:
// Mar 05 20:50:28.523 DEBUG request{method=GET path="/foo"}: tower_http::trace::on_request: started processing request
// Mar 05 20:50:28.524 DEBUG request{method=GET path="/foo"}: tower_http::trace::on_response: finished processing request latency=1 ms status=200

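The tower_http and tower crates provide a large number of Layer implementations, which are used extensively in axum; for example, axum Router's layer() method adds a Layer. Layers added through repeated Router::layer() calls run in the reverse of the order in which they were added, whereas layers added to a tower::ServiceBuilder run in the order they were added, so ServiceBuilder is the recommended way to add several.

// The layer() method of axum::Router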
pub fn layer<L>(self, layer: L) -> Router<S>
where
    L: Layer<Route> + Clone + Send + 'static,
    L::Service: Service<Request> + Clone + Send + 'static,
    <L::Service as Service<Request>>::Response: IntoResponse + 'static,
    <L::Service as Service<Request>>::Error: Into<Infallible> + 'static,
    <L::Service as Service<Request>>::Future: Send + 'static,

use axum::{routing::get, Router};
use tower_http::{trace::TraceLayer, compression::CompressionLayer};

let with_tracing = Router::new()
    .route("/foo", get(|| async {}))
    .route("/bar", get(|| async {}))
    .layer(TraceLayer::new_for_http()); // applies to every route in with_tracing

let with_compression = Router::new()
    .route("/bar", get(|| async {}))
    .layer(CompressionLayer::new());

let app = Router::new()
    .merge(with_tracing)
    .merge(with_compression);


// A Layer can also be added to a MethodRouter, in which case it applies only to that method router
use axum::{routing::get, Router};
use tower::limit::ConcurrencyLimitLayer;
async fn handler() {}
let app = Router::new().route( "/", get(handler).layer(ConcurrencyLimitLayer::new(64)),);

// To add several layer middleware to a Router, tower::ServiceBuilder is recommended
use axum::{routing::get, Extension, Router,};
use tower_http::{trace::TraceLayer};
use tower::ServiceBuilder;
async fn handler() {}
#[derive(Clone)]
struct State {}
let app = Router::new()
    .route("/", get(handler))
    .layer(
        ServiceBuilder::new() // layers run in the order they are added
            .layer(TraceLayer::new_for_http())
            .layer(Extension(State {}))
    );