跳过正文

tower

··3948 字
Rust Rust-Crate
rust crate - 这篇文章属于一个选集。
§ 9: 本文

tower 提供了构建 middleware 的三个核心抽象:

  1. Service trait:从一个 Request 对象产生一个 Response 对象的异步函数;
    • Request/Response 是协议无关的(没有 trait bound),tower_http crate 提供了 HTTP Request/Response 相关的 Service 实现;
    • service_fn: 从闭包或函数创建一个 Service;
  2. Layer trait:封装一个 Service,添加自定义逻辑,返回另一个 Service;
    • layer_fn: 从闭包或函数创建一个 Layer
  3. ServiceBuilder:将一个 Service 附加多个 Layer trait 逻辑,返回一个新的 Service 或 Layer;
    • ServiceBuilder 对象实现了 Layer trait,而且使用更方便;

为了保证 API 版本兼容性,Service trait 单独在 tower-service crate 中定义,Layer trait 单独在 tower-layer crate 中定义,然后在 tower crate 中 pub 导入和导出。

Service trait 是一个异步接口,核心是调用 call() 方法将 Request 转为 Respone:

  • Service 又被称为 middleware;
// Request/Response 都没有 trait bound,故是通用类型;
pub trait Service<Request> {
    type Response;
    type Error;  // 自定义 Error 类型
    type Future: Future
    where
        // 异步函数返回 Result,包含 Error
        <Self::Future as Future>::Output == Result<Self::Response, Self::Error>;

    // 用于 Backpressure:调用方应先调用该函数,返回 Ready 时再调用 call
    fn poll_ready(
        &mut self,
        cx: &mut Context<'_>
    ) -> Poll<Result<(), Self::Error>>;

    fn call(&mut self, req: Request) -> Self::Future;
}

使用 service_fn 从一个函数或闭包 T 来创建一个实现了 Service 的 ServiceFn 对象。

  • fn 的输入是 Request, 返回 Result<Response, Error>,通过 Error 来报错。
pub fn service_fn<T>(f: T) -> ServiceFn<T>

// 示例:
use tower::{service_fn, Service, ServiceExt, BoxError};
// service fn 闭包函数的输入是 Request, 返回 Result<Response, Error>,通过 Error 来报错。
async fn handle(request: Request) -> Result<Response, BoxError> {
    let response = Response::new("Hello, World!");
    Ok(response)
}
let mut service = service_fn(handle); // 使用 service_fn 从函数或闭包创建一个 Service
let response = service
    .ready() // 调用 poll_ready()
    .await?
    .call(Request::new())
    .await?;
assert_eq!("Hello, World!", response.into_body());

tower_service::Service trait 在 axum crate 中得到了广泛使用,比如 axum::Router 有很多 Service 相关的方法:

// Router 的 route_service() 方法直接传入一个 Service 对象:
pub fn route_service<T>(self, path: &str, service: T) -> Self
where
    T: Service<Request, Error = Infallible> + Clone + Send + 'static,
    T::Response: IntoResponse,
    T::Future: Send + 'static,

// 示例
use axum::{
    Router,
    body::Body,
    routing::{any_service, get_service},
    extract::Request,
    http::StatusCode,
    error_handling::HandleErrorLayer,
};
use tower_http::services::ServeFile;
use http::Response;
use std::{convert::Infallible, io};
use tower::service_fn;
let app = Router::new()
    .route(
        // Any request to `/` goes to a service
        "/",
        // Services whose response body is not `axum::body::BoxBody`
        // can be wrapped in `axum::routing::any_service` (or one of the other routing filters)
        // to have the response body mapped
        any_service(service_fn(|_: Request| async {
            let res = Response::new(Body::from("Hi from `GET /`"));
            Ok::<_, Infallible>(res)
        }))
    )
    .route_service(
        "/foo",
        // This service's response body is `axum::body::BoxBody` so
        // it can be routed to directly.
        service_fn(|req: Request| async move {
            let body = Body::from(format!("Hi from `{} /foo`", req.method()));
            let res = Response::new(body);
            Ok::<_, Infallible>(res)
        })
    )
    .route_service(
        // GET `/static/Cargo.toml` goes to a service from tower-http
        "/static/Cargo.toml",
        ServeFile::new("Cargo.toml"),
    );

tower::MakeService trait 用于创建 Service,定义如下:

  • tower::MakeService 是 service 工厂,用于创建新的 Service 对象。例如 TCP/HTTP Server,在接收一个新连接时,需要使用 tower::MakeService 来创建一个新 Serivce 对象,进而使用该对象来处理请求和产生响应;
  • axum crate 的 Router/MethodRouter/Handler 都实现了 tower::MakeService trait。
pub trait MakeService<Target, Request>: Sealed<(Target, Request)> {
    type Response;
    type Error;
    type Service: Service<Request, Response = Self::Response, Error = Self::Error>;
    type MakeError;
    type Future: Future<Output = Result<Self::Service, Self::MakeError>>;

    fn poll_ready( &mut self, cx: &mut Context<'_> ) -> Poll<Result<(), Self::MakeError>>;

    fn make_service(&mut self, target: Target) -> Self::Future;

    fn into_service(self) -> IntoService<Self, Request> where Self: Sized, { ... }

    fn as_service(&mut self) -> AsService<'_, Self, Request> where Self: Sized, { ... }
}


// axum Router 示例
impl Router
pub fn into_make_service(self) -> IntoMakeService<Self>

// Struct axum::routing::IntoMakeService 实现了 MakeService
impl<M, S, Target, Request> MakeService<Target, Request> for M
where
    M: Service<Target, Response = S>,
    S: Service<Request>,
type Response = <S as Service<Request>>::Response
// Responses given by the service
type Error = <S as Service<Request>>::Error
// Errors produced by the service
type Service = S
// The Service value created by this factory
type MakeError = <M as Service<Target>>::Error
// Errors produced while building a service.
type Future = <M as Service<Target>>::Future
// The future of the Service instance.


// 示例:
// Convert this router into a MakeService, that is a Service whose response is another service.
use axum::{
    routing::get,
    Router,
};
// app 实现了 MakeService trait,故可以作为  axum::serve() 的参数
let app = Router::new().route("/", get(|| async { "Hi!" }));
let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await.unwrap();
axum::serve(listener, app).await.unwrap();

Layer trait 为传入的 Service 添加处理逻辑,返回一个 新 Service

pub trait Layer<S> { // S 是封装的内部 Service
    type Service;
    fn layer(&self, inner: S) -> Self::Service; // 返回一个新的 Service
}


pub struct LogLayer {
    target: &'static str,
}
impl<S> Layer<S> for LogLayer {
    type Service = LogService<S>;
    fn layer(&self, service: S) -> Self::Service {
        LogService {
            target: self.target,
            service
        }
    }
}

// This service implements the Log behavior
pub struct LogService<S> {
    target: &'static str,
    service: S,
}
impl<S, Request> Service<Request> for LogService<S> where S: Service<Request>, Request: fmt::Debug,
{
    type Response = S::Response;
    type Error = S::Error;
    type Future = S::Future;

    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        self.service.poll_ready(cx)
    }

    fn call(&mut self, request: Request) -> Self::Future {
        // Insert log statement here or other functionality
        println!("request = {:?}, target = {:?}", request, self.target);
        self.service.call(request)
    }
}

towner::layer::layer_fn 函数返回一个实现 Layer trait 的 LayerFn<T> 对象,可以使用一个闭包或函数来创建一个 Layer,闭包函数的输入输出都是一个 Service:

pub fn layer_fn<T>(f: T) -> LayerFn<T>

// A middleware that logs requests before forwarding them to another service
pub struct LogService<S> {
    target: &'static str,
    service: S,
}

impl<S, Request> Service<Request> for LogService<S>
where
    S: Service<Request>,
    Request: fmt::Debug,
{
    type Response = S::Response;
    type Error = S::Error;
    type Future = S::Future;

    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        self.service.poll_ready(cx)
    }

    fn call(&mut self, request: Request) -> Self::Future {
        // Log the request
        println!("request = {:?}, target = {:?}", request, self.target);

        self.service.call(request)
    }
}

// A `Layer` that wraps services in `LogService`
let log_layer = layer_fn(|service| { // 闭包的输入和输出都是 Service
    LogService {
        service,
        target: "tower-docs",
    }
});

// 使用 service_fn 和闭包,创建一个 Service,然后使用 Layer::layer() 封装返回一个新 Service
// An example service. This one uppercases strings
let uppercase_service = tower::service_fn(|request: String| async move {
    Ok::<_, Infallible>(request.to_uppercase())
});
// Wrap our service in a `LogService` so requests are logged.
let wrapped_service = log_layer.layer(uppercase_service);

Layer 在 axum crate 中得到广泛应用:

  • axum 使用的 Layer 实现大部分是由 tower_http crate 或 towner crate 提供。
  • 如果要给 Router 添加多个 layer middleware,推荐使用 tower::ServiceBuilder;
    • axum Router 等添加的多个 layer,按照添加的逆序先后执行。但是 ServiceBuidler 添加的多个 layer,按照添加的顺序先后执行。
// auxm::router::Router 的 layer() 方法
pub fn layer<L>(self, layer: L) -> Router<S>
where
    L: Layer<Route> + Clone + Send + 'static,
    L::Service: Service<Request> + Clone + Send + 'static,
    <L::Service as Service<Request>>::Response: IntoResponse + 'static,
    <L::Service as Service<Request>>::Error: Into<Infallible> + 'static,
    <L::Service as Service<Request>>::Future: Send + 'static,

// layer() 用于给 Router 的所有 route 添加 layer middleware 处理逻辑。
// tower_http crate 提供了许多 HTTP 相关的 Layer middleware,可以作为 layer() 的参数:
use axum::{routing::get, Router};
use tower_http::trace::TraceLayer;
let app = Router::new()
    .route("/foo", get(|| async {}))
    .route("/bar", get(|| async {}))
    .layer(TraceLayer::new_for_http());

use axum::{routing::get, Router};
use tower_http::{trace::TraceLayer, compression::CompressionLayer};
let with_tracing = Router::new()
    .route("/foo", get(|| async {}))
    .layer(TraceLayer::new_for_http());
let with_compression = Router::new()
    .route("/bar", get(|| async {}))
    .layer(CompressionLayer::new());
// Merge everything into one `Router`
let app = Router::new()
    .merge(with_tracing)
    .merge(with_compression);

// 如果要给 Router 添加多个 layer middleware,推荐使用 tower::ServiceBuilder
use axum::{
    routing::get,
    Extension,
    Router,
};
use tower_http::{trace::TraceLayer};
use tower::ServiceBuilder;
async fn handler() {}
#[derive(Clone)]
struct State {}
let app = Router::new()
    .route("/", get(handler))
    .layer(
        ServiceBuilder::new()
            .layer(TraceLayer::new_for_http())
            .layer(Extension(State {}))
    );


// MethodRouter 也可以添加 Layer
use axum::{routing::get, Router};
use tower::limit::ConcurrencyLimitLayer;
async fn handler() {}
let app = Router::new().route(
    "/",
    // All requests to `GET /` will be sent through `ConcurrencyLimitLayer`
    // 由于 tower 提供的 Layer 实现是 Request/Response 类型无关的,所以也可以在 axum 中使用。
    get(handler).layer(ConcurrencyLimitLayer::new(64)),
);

使用 ServiceBuilder 可以将多个 layer middleware 组合到一个 service 中, 返回一个新的 Service:

  • Layer trait 的 layer() 返回 Serivce,所以不支持 Service 串联,但是 ServiceBuilder 可以为 Service添加多个 Layer;
  • Req 按照 layer 添加的顺序被依次处理,最后是被 service 处理;
use std::time::Duration;
use tower::{ServiceBuilder, ServiceExt, BoxError, service_fn};

async fn handle(request: &'static str) -> Result<&'static str, BoxError> {
    Ok(request)
}

// 为 闭包 Service 添加两个 layer:buffer 和 timeout
let svc = ServiceBuilder::new()
    .buffer(1024) // 添加 buffer layer
    .timeout(Duration::from_secs(10)) // 添加 timeout layer
    .service_fn(handle); // 通过闭包函数创建一个 Service,然后返回一个新的 Service

let response = svc.oneshot("foo").await?;

assert_eq!(response, "foo");

ServiceBuilder 提供了各种方法来添加 layer 和 service:

  • layer()/option_layer(): 添加一个通用的 layer
  • layer_fn() 使用传入的闭包创建和添加一个 layer;
  • buff()/retry()/timeout() 等:添加对应类型的 layer
    • 这些 layer 是 Request/Response 类型无关的通用 layer
  • service()/service_fn(): 最后添加一个 Service,返回一个新 Service
impl ServiceBuilder<Identity>
pub fn new() -> Self

impl<L> ServiceBuilder<L>

// 添加通用 layer
pub fn layer<T>(self, layer: T) -> ServiceBuilder<Stack<T, L>>
pub fn option_layer<T>( self, layer: Option<T> ) -> ServiceBuilder<Stack<Either<T, Identity>, L>>
pub fn layer_fn<F>(self, f: F) -> ServiceBuilder<Stack<LayerFn<F>, L>>

// 添加特定类型 layer
pub fn buffer<Request>( self, bound: usize ) -> ServiceBuilder<Stack<BufferLayer<Request>, L>>
pub fn concurrency_limit( self, max: usize ) -> ServiceBuilder<Stack<ConcurrencyLimitLayer, L>>
pub fn load_shed(self) -> ServiceBuilder<Stack<LoadShedLayer, L>>
pub fn rate_limit( self, num: u64, per: Duration ) -> ServiceBuilder<Stack<RateLimitLayer, L>>
pub fn retry<P>(self, policy: P) -> ServiceBuilder<Stack<RetryLayer<P>, L>>
pub fn timeout( self, timeout: Duration) -> ServiceBuilder<Stack<TimeoutLayer, L>>
pub fn filter<P>(self, predicate: P) -> ServiceBuilder<Stack<FilterLayer<P>, L>>
pub fn filter_async<P>( self, predicate: P ) -> ServiceBuilder<Stack<AsyncFilterLayer<P>, L>>
pub fn map_request<F, R1, R2>( self, f: F ) -> ServiceBuilder<Stack<MapRequestLayer<F>, L>> where F: FnMut(R1) -> R2 + Clone,
pub fn map_response<F>( self, f: F) -> ServiceBuilder<Stack<MapResponseLayer<F>, L>>
pub fn map_err<F>(self, f: F) -> ServiceBuilder<Stack<MapErrLayer<F>, L>>
pub fn map_future<F>(self, f: F) -> ServiceBuilder<Stack<MapFutureLayer<F>, L>>
pub fn then<F>(self, f: F) -> ServiceBuilder<Stack<ThenLayer<F>, L>>
pub fn and_then<F>(self, f: F) -> ServiceBuilder<Stack<AndThenLayer<F>, L>>
pub fn map_result<F>(self, f: F) -> ServiceBuilder<Stack<MapResultLayer<F>, L>>

pub fn into_inner(self) -> L

// 添加 Service,返回新的 Service
pub fn service<S>(&self, service: S) -> L::Service where L: Layer<S>,
pub fn service_fn<F>(self, f: F) -> L::Service where L: Layer<ServiceFn<F>>,

pub fn check_clone(self) -> Self where Self: Clone,
pub fn check_service_clone<S>(self) -> Self where L: Layer<S>, L::Service: Clone,
pub fn check_service<S, T, U, E>(self) -> Self where L: Layer<S>, L::Service: Service<T, Response = U, Error = E>,
pub fn boxed<S, R>( self ) -> ServiceBuilder<Stack<LayerFn<fn(_: L::Service) -> BoxService<R, <L::Service as Service<R>>::Response, <L::Service as Service<R>>::Error>>, L>> where
    L: Layer<S>, L::Service: Service<R> + Send + 'static, <L::Service as Service<R>>::Future: Send + 'static,
pub fn boxed_clone<S, R>( self) -> ServiceBuilder<Stack<LayerFn<fn(_: L::Service) -> BoxCloneService<R, <L::Service as Service<R>>::Response, <L::Service as Service<R>>::Error>>, L>> where
    L: Layer<S>,
    L::Service: Service<R> + Clone + Send + 'static,
    <L::Service as Service<R>>::Future: Send + 'static,

// axum 示例:如果要给 Router 添加多个 layer middleware,推荐使用 tower::ServiceBuilder
use axum::{
    routing::get,
    Extension,
    Router,
};
use tower_http::{trace::TraceLayer};
use tower::ServiceBuilder;
async fn handler() {}
#[derive(Clone)]
struct State {}
let app = Router::new()
    .route("/", get(handler))
    .layer(
        ServiceBuilder::new()
            .layer(TraceLayer::new_for_http())
            .layer(Extension(State {}))
    );

tower 标准中间件 layer module:这些 module 包含 Service 和 Layer 实现,被上面的 ServiceBuilder 的对应方法使用,是 Request/Response 类型无关的通用中间件:

  • balancebalance:Middleware that allows balancing load among multiple services.
  • bufferbuffer:Middleware that provides a buffered mpsc channel to a service.
  • builder:Builder types to compose layers and services
  • discoverdiscover:Service discovery
  • filterfilter:Conditionally dispatch requests to the inner service based on the result of a predicate.
  • hedgehedge:Pre-emptively retry requests which have been outstanding for longer than a given latency percentile.
  • layer:A collection of Layer based tower services
  • limitlimit:Tower middleware for limiting requests.
  • loadload:Service load measurement
  • load_shedload-shed:Middleware for shedding load when inner services aren’t ready.
  • makemake:Trait aliases for Services that produce specific types of Responses.
  • ready_cacheready-cache:A cache of services
  • reconnectreconnect:Reconnect services when they fail.
  • retryretry:Middleware for retrying “failed” requests.
  • spawn_readyspawn-ready:When an underlying service is not ready, drive it to readiness on a background task.
  • steersteer:This module provides functionality to aid managing routing requests between Services.
  • timeouttimeout:Middleware that applies a timeout to requests.
  • utilutil:Various utility types and functions that are generally used with Tower.

tower-http crate 提供了 HTTP 相关的中间件和工具,使用 http 和 http-body crate 中的 HTTP Request/Response 定义;

  • add_extensionadd-extension : Middleware that clones a value into each request’s extensions.
  • authauth : Authorization related middleware.
  • body : Body types.
  • catch_paniccatch-panic: Convert panics into responses.
  • classify: Tools for classifying responses as either success or failure.
  • compressioncompression-br or compression-deflate or compression-gzip or compression-zstd: Middleware that compresses response bodies.
  • corscors: Middleware which adds headers for CORS.
  • decompressiondecompression-br or decompression-deflate or decompression-gzip or decompression-zstd: Middleware that decompresses request and response bodies.
  • follow_redirectfollow-redirect: Middleware for following redirections.
  • limitlimit: Middleware for limiting request bodies.
  • map_request_bodymap-request-body: Apply a transformation to the request body.
  • map_response_bodymap-response-body: Apply a transformation to the response body.
  • metricsmetrics: Middlewares for adding metrics to services.
  • normalize_pathnormalize-path: Middleware that normalizes paths.
  • propagate_headerpropagate-header: Propagate a header from the request to the response.
  • request_idrequest-id: Set and propagate request ids.
  • sensitive_headerssensitive-headers: Middlewares that mark headers as sensitive.
  • services: Services that return responses without wrapping other Services.
  • set_headerset-header: Middleware for setting headers on requests and responses.
  • set_statusset-status: Middleware to override status codes.
  • timeouttimeout: Middleware that applies a timeout to requests.
  • tracetrace: Middleware that adds high level tracing to a Service.
  • validate_requestvalidate-request: Middleware that validates requests.

以 trace 为例,添加了该 middleware 后,tower/axum 等才会自动打印请求相关的 trace log:

use http::{Request, Response};
use tower::{ServiceBuilder, ServiceExt, Service};
use tower_http::trace::TraceLayer;
use std::convert::Infallible;
use http_body_util::Full;
use bytes::Bytes;

async fn handle(request: Request<Full<Bytes>>) -> Result<Response<Full<Bytes>>, Infallible> {
    Ok(Response::new(Full::default()))
}

// Setup tracing
tracing_subscriber::fmt::init();

let mut service = ServiceBuilder::new()
    .layer(TraceLayer::new_for_http())
    .service_fn(handle);

let request = Request::new(Full::from("foo"));

let response = service
    .ready()
    .await?
    .call(request)
    .await?;

// 运行:RUST_LOG=tower_http=trace cargo run

// 打印的日志:
// Mar 05 20:50:28.523 DEBUG request{method=GET path="/foo"}: tower_http::trace::on_request: started processing request
// Mar 05 20:50:28.524 DEBUG request{method=GET path="/foo"}: tower_http::trace::on_response: finished processing request latency=1 ms status=200
rust crate - 这篇文章属于一个选集。
§ 9: 本文

相关文章

axum
··9673 字
Rust Rust-Crate
clap
··5086 字
Rust Rust-Crate
diesel
··33229 字
Rust Rust-Crate
http/http_body
··3080 字
Rust Rust-Crate