tower 提供了构建 middleware 的三个核心抽象:
- Service trait:从一个 Request 对象产生一个 Response 对象的异步函数;
- Request/Response 是协议无关的(没有 trait bound),tower_http crate 提供了 HTTP Request/Response 相关的 Service 实现;
- service_fn: 从闭包或函数创建一个 Service;
- Layer trait:封装一个 Service,添加自定义逻辑,返回另一个 Service;
- layer_fn: 从闭包或函数创建一个 Layer
- ServiceBuilder:将一个 Service 附加多个 Layer trait 逻辑,返回一个新的 Service 或 Layer;
- ServiceBuilder 对象实现了 Layer trait,而且使用更方便;
为了保证 API 版本兼容性,Service trait 单独在 tower-service crate
中定义,Layer trait 单独在
tower-layer crate
中定义,然后在 tower crate 中 pub 导入和导出。
Service trait 是一个异步接口,核心是调用 call() 方法将 Request 转为 Respone:
- Service 又被称为 middleware;
// Request/Response 都没有 trait bound,故是通用类型;
pub trait Service<Request> {
type Response;
type Error; // 自定义 Error 类型
type Future: Future
where
// 异步函数返回 Result,包含 Error
<Self::Future as Future>::Output == Result<Self::Response, Self::Error>;
// 用于 Backpressure:调用方应先调用该函数,返回 Ready 时再调用 call
fn poll_ready(
&mut self,
cx: &mut Context<'_>
) -> Poll<Result<(), Self::Error>>;
fn call(&mut self, req: Request) -> Self::Future;
}
使用 service_fn 从一个函数或闭包 T 来创建一个实现了 Service 的 ServiceFn 对象。
- fn 的输入是 Request, 返回 Result<Response, Error>,通过 Error 来报错。
pub fn service_fn<T>(f: T) -> ServiceFn<T>
// 示例:
use tower::{service_fn, Service, ServiceExt, BoxError};
// service fn 闭包函数的输入是 Request, 返回 Result<Response, Error>,通过 Error 来报错。
async fn handle(request: Request) -> Result<Response, BoxError> {
let response = Response::new("Hello, World!");
Ok(response)
}
let mut service = service_fn(handle); // 使用 service_fn 从函数或闭包创建一个 Service
let response = service
.ready() // 调用 poll_ready()
.await?
.call(Request::new())
.await?;
assert_eq!("Hello, World!", response.into_body());
tower_service::Service trait 在 axum crate 中得到了广泛使用,比如 axum::Router 有很多 Service 相关的方法:
// Router 的 route_service() 方法直接传入一个 Service 对象:
pub fn route_service<T>(self, path: &str, service: T) -> Self
where
T: Service<Request, Error = Infallible> + Clone + Send + 'static,
T::Response: IntoResponse,
T::Future: Send + 'static,
// 示例
use axum::{
Router,
body::Body,
routing::{any_service, get_service},
extract::Request,
http::StatusCode,
error_handling::HandleErrorLayer,
};
use tower_http::services::ServeFile;
use http::Response;
use std::{convert::Infallible, io};
use tower::service_fn;
let app = Router::new()
.route(
// Any request to `/` goes to a service
"/",
// Services whose response body is not `axum::body::BoxBody`
// can be wrapped in `axum::routing::any_service` (or one of the other routing filters)
// to have the response body mapped
any_service(service_fn(|_: Request| async {
let res = Response::new(Body::from("Hi from `GET /`"));
Ok::<_, Infallible>(res)
}))
)
.route_service(
"/foo",
// This service's response body is `axum::body::BoxBody` so
// it can be routed to directly.
service_fn(|req: Request| async move {
let body = Body::from(format!("Hi from `{} /foo`", req.method()));
let res = Response::new(body);
Ok::<_, Infallible>(res)
})
)
.route_service(
// GET `/static/Cargo.toml` goes to a service from tower-http
"/static/Cargo.toml",
ServeFile::new("Cargo.toml"),
);
tower::MakeService trait 用于创建 Service,定义如下:
- tower::MakeService 是 service 工厂,用于创建新的 Service 对象。例如 TCP/HTTP Server,在接收一个新连接时,需要使用 tower::MakeService 来创建一个新 Serivce 对象,进而使用该对象来处理请求和产生响应;
- axum crate 的 Router/MethodRouter/Handler 都实现了 tower::MakeService trait。
pub trait MakeService<Target, Request>: Sealed<(Target, Request)> {
type Response;
type Error;
type Service: Service<Request, Response = Self::Response, Error = Self::Error>;
type MakeError;
type Future: Future<Output = Result<Self::Service, Self::MakeError>>;
fn poll_ready( &mut self, cx: &mut Context<'_> ) -> Poll<Result<(), Self::MakeError>>;
fn make_service(&mut self, target: Target) -> Self::Future;
fn into_service(self) -> IntoService<Self, Request> where Self: Sized, { ... }
fn as_service(&mut self) -> AsService<'_, Self, Request> where Self: Sized, { ... }
}
// axum Router 示例
impl Router
pub fn into_make_service(self) -> IntoMakeService<Self>
// Struct axum::routing::IntoMakeService 实现了 MakeService
impl<M, S, Target, Request> MakeService<Target, Request> for M
where
M: Service<Target, Response = S>,
S: Service<Request>,
type Response = <S as Service<Request>>::Response
// Responses given by the service
type Error = <S as Service<Request>>::Error
// Errors produced by the service
type Service = S
// The Service value created by this factory
type MakeError = <M as Service<Target>>::Error
// Errors produced while building a service.
type Future = <M as Service<Target>>::Future
// The future of the Service instance.
// 示例:
// Convert this router into a MakeService, that is a Service whose response is another service.
use axum::{
routing::get,
Router,
};
// app 实现了 MakeService trait,故可以作为 axum::serve() 的参数
let app = Router::new().route("/", get(|| async { "Hi!" }));
let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await.unwrap();
axum::serve(listener, app).await.unwrap();
Layer trait 为传入的 Service 添加处理逻辑,返回一个 新 Service
:
pub trait Layer<S> { // S 是封装的内部 Service
type Service;
fn layer(&self, inner: S) -> Self::Service; // 返回一个新的 Service
}
pub struct LogLayer {
target: &'static str,
}
impl<S> Layer<S> for LogLayer {
type Service = LogService<S>;
fn layer(&self, service: S) -> Self::Service {
LogService {
target: self.target,
service
}
}
}
// This service implements the Log behavior
pub struct LogService<S> {
target: &'static str,
service: S,
}
impl<S, Request> Service<Request> for LogService<S> where S: Service<Request>, Request: fmt::Debug,
{
type Response = S::Response;
type Error = S::Error;
type Future = S::Future;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.service.poll_ready(cx)
}
fn call(&mut self, request: Request) -> Self::Future {
// Insert log statement here or other functionality
println!("request = {:?}, target = {:?}", request, self.target);
self.service.call(request)
}
}
towner::layer::layer_fn
函数返回一个实现 Layer trait 的 LayerFn<T> 对象,可以使用一个闭包或函数来创建一个 Layer,闭包函数的输入输出都是一个 Service:
pub fn layer_fn<T>(f: T) -> LayerFn<T>
// A middleware that logs requests before forwarding them to another service
pub struct LogService<S> {
target: &'static str,
service: S,
}
impl<S, Request> Service<Request> for LogService<S>
where
S: Service<Request>,
Request: fmt::Debug,
{
type Response = S::Response;
type Error = S::Error;
type Future = S::Future;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.service.poll_ready(cx)
}
fn call(&mut self, request: Request) -> Self::Future {
// Log the request
println!("request = {:?}, target = {:?}", request, self.target);
self.service.call(request)
}
}
// A `Layer` that wraps services in `LogService`
let log_layer = layer_fn(|service| { // 闭包的输入和输出都是 Service
LogService {
service,
target: "tower-docs",
}
});
// 使用 service_fn 和闭包,创建一个 Service,然后使用 Layer::layer() 封装返回一个新 Service
// An example service. This one uppercases strings
let uppercase_service = tower::service_fn(|request: String| async move {
Ok::<_, Infallible>(request.to_uppercase())
});
// Wrap our service in a `LogService` so requests are logged.
let wrapped_service = log_layer.layer(uppercase_service);
Layer 在 axum crate 中得到广泛应用:
- axum 使用的 Layer 实现大部分是由 tower_http crate 或 towner crate 提供。
- 如果要给 Router 添加多个 layer middleware,推荐使用 tower::ServiceBuilder;
- axum Router 等添加的多个 layer,按照添加的逆序先后执行。但是 ServiceBuidler 添加的多个 layer,按照添加的顺序先后执行。
// auxm::router::Router 的 layer() 方法
pub fn layer<L>(self, layer: L) -> Router<S>
where
L: Layer<Route> + Clone + Send + 'static,
L::Service: Service<Request> + Clone + Send + 'static,
<L::Service as Service<Request>>::Response: IntoResponse + 'static,
<L::Service as Service<Request>>::Error: Into<Infallible> + 'static,
<L::Service as Service<Request>>::Future: Send + 'static,
// layer() 用于给 Router 的所有 route 添加 layer middleware 处理逻辑。
// tower_http crate 提供了许多 HTTP 相关的 Layer middleware,可以作为 layer() 的参数:
use axum::{routing::get, Router};
use tower_http::trace::TraceLayer;
let app = Router::new()
.route("/foo", get(|| async {}))
.route("/bar", get(|| async {}))
.layer(TraceLayer::new_for_http());
use axum::{routing::get, Router};
use tower_http::{trace::TraceLayer, compression::CompressionLayer};
let with_tracing = Router::new()
.route("/foo", get(|| async {}))
.layer(TraceLayer::new_for_http());
let with_compression = Router::new()
.route("/bar", get(|| async {}))
.layer(CompressionLayer::new());
// Merge everything into one `Router`
let app = Router::new()
.merge(with_tracing)
.merge(with_compression);
// 如果要给 Router 添加多个 layer middleware,推荐使用 tower::ServiceBuilder
use axum::{
routing::get,
Extension,
Router,
};
use tower_http::{trace::TraceLayer};
use tower::ServiceBuilder;
async fn handler() {}
#[derive(Clone)]
struct State {}
let app = Router::new()
.route("/", get(handler))
.layer(
ServiceBuilder::new()
.layer(TraceLayer::new_for_http())
.layer(Extension(State {}))
);
// MethodRouter 也可以添加 Layer
use axum::{routing::get, Router};
use tower::limit::ConcurrencyLimitLayer;
async fn handler() {}
let app = Router::new().route(
"/",
// All requests to `GET /` will be sent through `ConcurrencyLimitLayer`
// 由于 tower 提供的 Layer 实现是 Request/Response 类型无关的,所以也可以在 axum 中使用。
get(handler).layer(ConcurrencyLimitLayer::new(64)),
);
使用 ServiceBuilder
可以将多个 layer middleware 组合到一个 service 中, 返回一个新的 Service:
- Layer trait 的 layer() 返回 Serivce,所以不支持 Service 串联,但是 ServiceBuilder 可以为 Service添加多个 Layer;
- Req 按照 layer 添加的顺序被依次处理,最后是被 service 处理;
use std::time::Duration;
use tower::{ServiceBuilder, ServiceExt, BoxError, service_fn};
async fn handle(request: &'static str) -> Result<&'static str, BoxError> {
Ok(request)
}
// 为 闭包 Service 添加两个 layer:buffer 和 timeout
let svc = ServiceBuilder::new()
.buffer(1024) // 添加 buffer layer
.timeout(Duration::from_secs(10)) // 添加 timeout layer
.service_fn(handle); // 通过闭包函数创建一个 Service,然后返回一个新的 Service
let response = svc.oneshot("foo").await?;
assert_eq!(response, "foo");
ServiceBuilder 提供了各种方法来添加 layer 和 service:
- layer()/option_layer(): 添加一个通用的 layer
- layer_fn() 使用传入的闭包创建和添加一个 layer;
- buff()/retry()/timeout() 等:添加对应类型的 layer
- 这些 layer 是 Request/Response
类型无关的通用 layer
- 这些 layer 是 Request/Response
- service()/service_fn(): 最后添加一个 Service,返回一个新 Service
impl ServiceBuilder<Identity>
pub fn new() -> Self
impl<L> ServiceBuilder<L>
// 添加通用 layer
pub fn layer<T>(self, layer: T) -> ServiceBuilder<Stack<T, L>>
pub fn option_layer<T>( self, layer: Option<T> ) -> ServiceBuilder<Stack<Either<T, Identity>, L>>
pub fn layer_fn<F>(self, f: F) -> ServiceBuilder<Stack<LayerFn<F>, L>>
// 添加特定类型 layer
pub fn buffer<Request>( self, bound: usize ) -> ServiceBuilder<Stack<BufferLayer<Request>, L>>
pub fn concurrency_limit( self, max: usize ) -> ServiceBuilder<Stack<ConcurrencyLimitLayer, L>>
pub fn load_shed(self) -> ServiceBuilder<Stack<LoadShedLayer, L>>
pub fn rate_limit( self, num: u64, per: Duration ) -> ServiceBuilder<Stack<RateLimitLayer, L>>
pub fn retry<P>(self, policy: P) -> ServiceBuilder<Stack<RetryLayer<P>, L>>
pub fn timeout( self, timeout: Duration) -> ServiceBuilder<Stack<TimeoutLayer, L>>
pub fn filter<P>(self, predicate: P) -> ServiceBuilder<Stack<FilterLayer<P>, L>>
pub fn filter_async<P>( self, predicate: P ) -> ServiceBuilder<Stack<AsyncFilterLayer<P>, L>>
pub fn map_request<F, R1, R2>( self, f: F ) -> ServiceBuilder<Stack<MapRequestLayer<F>, L>> where F: FnMut(R1) -> R2 + Clone,
pub fn map_response<F>( self, f: F) -> ServiceBuilder<Stack<MapResponseLayer<F>, L>>
pub fn map_err<F>(self, f: F) -> ServiceBuilder<Stack<MapErrLayer<F>, L>>
pub fn map_future<F>(self, f: F) -> ServiceBuilder<Stack<MapFutureLayer<F>, L>>
pub fn then<F>(self, f: F) -> ServiceBuilder<Stack<ThenLayer<F>, L>>
pub fn and_then<F>(self, f: F) -> ServiceBuilder<Stack<AndThenLayer<F>, L>>
pub fn map_result<F>(self, f: F) -> ServiceBuilder<Stack<MapResultLayer<F>, L>>
pub fn into_inner(self) -> L
// 添加 Service,返回新的 Service
pub fn service<S>(&self, service: S) -> L::Service where L: Layer<S>,
pub fn service_fn<F>(self, f: F) -> L::Service where L: Layer<ServiceFn<F>>,
pub fn check_clone(self) -> Self where Self: Clone,
pub fn check_service_clone<S>(self) -> Self where L: Layer<S>, L::Service: Clone,
pub fn check_service<S, T, U, E>(self) -> Self where L: Layer<S>, L::Service: Service<T, Response = U, Error = E>,
pub fn boxed<S, R>( self ) -> ServiceBuilder<Stack<LayerFn<fn(_: L::Service) -> BoxService<R, <L::Service as Service<R>>::Response, <L::Service as Service<R>>::Error>>, L>> where
L: Layer<S>, L::Service: Service<R> + Send + 'static, <L::Service as Service<R>>::Future: Send + 'static,
pub fn boxed_clone<S, R>( self) -> ServiceBuilder<Stack<LayerFn<fn(_: L::Service) -> BoxCloneService<R, <L::Service as Service<R>>::Response, <L::Service as Service<R>>::Error>>, L>> where
L: Layer<S>,
L::Service: Service<R> + Clone + Send + 'static,
<L::Service as Service<R>>::Future: Send + 'static,
// axum 示例:如果要给 Router 添加多个 layer middleware,推荐使用 tower::ServiceBuilder
use axum::{
routing::get,
Extension,
Router,
};
use tower_http::{trace::TraceLayer};
use tower::ServiceBuilder;
async fn handler() {}
#[derive(Clone)]
struct State {}
let app = Router::new()
.route("/", get(handler))
.layer(
ServiceBuilder::new()
.layer(TraceLayer::new_for_http())
.layer(Extension(State {}))
);
tower 标准中间件 layer module:这些 module 包含 Service 和 Layer 实现,被上面的 ServiceBuilder 的对应方法使用,是 Request/Response 类型无关的通用中间件:
- balancebalance:Middleware that allows balancing load among multiple services.
- bufferbuffer:Middleware that provides a buffered mpsc channel to a service.
- builder:Builder types to compose layers and services
- discoverdiscover:Service discovery
- filterfilter:Conditionally dispatch requests to the inner service based on the result of a predicate.
- hedgehedge:Pre-emptively retry requests which have been outstanding for longer than a given latency percentile.
- layer:A collection of Layer based tower services
- limitlimit:Tower middleware for limiting requests.
- loadload:Service load measurement
- load_shedload-shed:Middleware for shedding load when inner services aren’t ready.
- makemake:Trait aliases for Services that produce specific types of Responses.
- ready_cacheready-cache:A cache of services
- reconnectreconnect:Reconnect services when they fail.
- retryretry:Middleware for retrying “failed” requests.
- spawn_readyspawn-ready:When an underlying service is not ready, drive it to readiness on a background task.
- steersteer:This module provides functionality to aid managing routing requests between Services.
- timeouttimeout:Middleware that applies a timeout to requests.
- utilutil:Various utility types and functions that are generally used with Tower.
tower-http crate 提供了 HTTP 相关的中间件和工具,使用 http 和 http-body crate 中的 HTTP Request/Response 定义;
- add_extensionadd-extension : Middleware that clones a value into each request’s extensions.
- authauth : Authorization related middleware.
- body : Body types.
- catch_paniccatch-panic: Convert panics into responses.
- classify: Tools for classifying responses as either success or failure.
- compressioncompression-br or compression-deflate or compression-gzip or compression-zstd: Middleware that compresses response bodies.
- corscors: Middleware which adds headers for CORS.
- decompressiondecompression-br or decompression-deflate or decompression-gzip or decompression-zstd: Middleware that decompresses request and response bodies.
- follow_redirectfollow-redirect: Middleware for following redirections.
- limitlimit: Middleware for limiting request bodies.
- map_request_bodymap-request-body: Apply a transformation to the request body.
- map_response_bodymap-response-body: Apply a transformation to the response body.
- metricsmetrics: Middlewares for adding metrics to services.
- normalize_pathnormalize-path: Middleware that normalizes paths.
- propagate_headerpropagate-header: Propagate a header from the request to the response.
- request_idrequest-id: Set and propagate request ids.
- sensitive_headerssensitive-headers: Middlewares that mark headers as sensitive.
- services: Services that return responses without wrapping other Services.
- set_headerset-header: Middleware for setting headers on requests and responses.
- set_statusset-status: Middleware to override status codes.
- timeouttimeout: Middleware that applies a timeout to requests.
- tracetrace: Middleware that adds high level tracing to a Service.
- validate_requestvalidate-request: Middleware that validates requests.
以 trace 为例,添加了该 middleware 后,tower/axum 等才会自动打印请求相关的 trace log:
use http::{Request, Response};
use tower::{ServiceBuilder, ServiceExt, Service};
use tower_http::trace::TraceLayer;
use std::convert::Infallible;
use http_body_util::Full;
use bytes::Bytes;
async fn handle(request: Request<Full<Bytes>>) -> Result<Response<Full<Bytes>>, Infallible> {
Ok(Response::new(Full::default()))
}
// Setup tracing
tracing_subscriber::fmt::init();
let mut service = ServiceBuilder::new()
.layer(TraceLayer::new_for_http())
.service_fn(handle);
let request = Request::new(Full::from("foo"));
let response = service
.ready()
.await?
.call(request)
.await?;
// 运行:RUST_LOG=tower_http=trace cargo run
// 打印的日志:
// Mar 05 20:50:28.523 DEBUG request{method=GET path="/foo"}: tower_http::trace::on_request: started processing request
// Mar 05 20:50:28.524 DEBUG request{method=GET path="/foo"}: tower_http::trace::on_response: finished processing request latency=1 ms status=200