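tower provides four core abstractions for building middleware:
- Service trait: an asynchronous function that produces a Response from a Request;
  - Request/Response are protocol-agnostic (no trait bounds);
  - the tower_http crate provides Service implementations for HTTP Request/Response;
  - service_fn: creates a Service from a closure or function;
- ServiceExt trait: an extension of the Service trait that provides convenience methods such as ready()/and_then()/map_response()/filter();
- Layer trait: wraps a Service, adds custom logic, and returns another Service;
  - layer_fn: creates a Layer from a closure or function;
- ServiceBuilder: applies multiple Layers to a Service and returns a new Service or Layer;
  - ServiceBuilder itself implements the Layer trait and is more convenient to use;
To keep the API stable across versions, the Service trait is defined separately in the tower-service crate and the Layer trait in the tower-layer crate; both are re-exported (pub use) by the tower crate.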
1 Service
The Service trait provides the method call(), which turns a Request into a Future that resolves to Result<Response, Error>, similar to: async fn(Request) -> Result<Response, Error>
// Request is a generic parameter and Response an associated type; neither is tied to the http::Request/Response structs.
pub trait Service<Request> {
type Response;
type Error;
// The Future's Output is a Result
type Future: Future<Output = Result<Self::Response, Self::Error>>;
// Used for backpressure: callers should poll this first and invoke call() only after it returns Ready
fn poll_ready(
&mut self,
cx: &mut Context<'_>
) -> Poll<Result<(), Self::Error>>;
fn call(&mut self, req: Request) -> Self::Future;
}
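The poll_ready-then-call protocol can be sketched with the standard library alone. This is a minimal sketch, not tower's implementation: the `Service` trait below is a local stand-in that mirrors the signature shown above, and `Upper`, its `remaining` budget, and `call_blocking` are hypothetical names introduced only for illustration.

```rust
use std::future::{ready, Future, Ready};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Local stand-in mirroring the shape of `tower::Service` (assumption: simplified copy).
pub trait Service<Request> {
    type Response;
    type Error;
    type Future: Future<Output = Result<Self::Response, Self::Error>>;

    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>;
    fn call(&mut self, req: Request) -> Self::Future;
}

/// Hypothetical service: uppercases a string while it still has call budget left.
pub struct Upper {
    pub remaining: usize,
}

impl Service<String> for Upper {
    type Response = String;
    type Error = &'static str;
    type Future = Ready<Result<String, &'static str>>;

    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        // Readiness is where a service reports that it cannot accept more work.
        if self.remaining == 0 {
            Poll::Ready(Err("budget exhausted"))
        } else {
            Poll::Ready(Ok(()))
        }
    }

    fn call(&mut self, req: String) -> Self::Future {
        self.remaining -= 1;
        ready(Ok(req.to_uppercase()))
    }
}

/// Waker that does nothing; enough for the busy-polling driver below.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Hypothetical blocking driver: wait for readiness, then call and poll the future to completion.
pub fn call_blocking<S, R>(svc: &mut S, req: R) -> Result<S::Response, S::Error>
where
    S: Service<R>,
{
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    // Step 1: poll_ready() until the service accepts a request (or fails).
    loop {
        match svc.poll_ready(&mut cx) {
            Poll::Ready(Ok(())) => break,
            Poll::Ready(Err(e)) => return Err(e),
            Poll::Pending => std::thread::yield_now(),
        }
    }
    // Step 2: call() and drive the returned future.
    let mut fut = Box::pin(svc.call(req));
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(out) => return out,
            Poll::Pending => std::thread::yield_now(),
        }
    }
}
```

With `Upper { remaining: 1 }`, the first `call_blocking` succeeds and the second is rejected in `poll_ready` before `call` is ever invoked, which is the point of checking readiness first.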
tower::ServiceExt is an extension of the Service trait that provides convenience methods such as ready()/and_then()/map_response()/filter().
Typical usage pattern: let response = my_service.ready().await?.call(req).await;
pub trait ServiceExt<Request>: Service<Request> {
fn ready(&mut self) -> Ready<'_, Self, Request> where Self: Sized
fn ready_and(&mut self) -> ReadyAnd<'_, Self, Request> where Self: Sized
fn ready_oneshot(self) -> ReadyOneshot<Self, Request> where Self: Sized
fn oneshot(self, req: Request) -> Oneshot<Self, Request> where Self: Sized
fn call_all<S>(self, reqs: S) -> CallAll<Self, S> where Self: Sized, Self::Error: Into<BoxError>, S: Stream<Item = Request>
fn and_then<F>(self, f: F) -> AndThen<Self, F> where Self: Sized, F: Clone
fn map_response<F, Response>(self, f: F) -> MapResponse<Self, F>
where Self: Sized, F: FnOnce(Self::Response) -> Response + Clone
fn map_err<F, Error>(self, f: F) -> MapErr<Self, F> where Self: Sized, F: FnOnce(Self::Error) -> Error + Clone
fn map_result<F, Response, Error>(self, f: F) -> MapResult<Self, F>
where Self: Sized, Error: From<Self::Error>,
F: FnOnce(Result<Self::Response, Self::Error>) -> Result<Response, Error> + Clone
fn map_request<F, NewRequest>(self, f: F) -> MapRequest<Self, F> where Self: Sized, F: FnMut(NewRequest) -> Request
fn filter<F, NewRequest>(self, filter: F) -> Filter<Self, F> where Self: Sized, F: Predicate<NewRequest>
fn filter_async<F, NewRequest>(self, filter: F) -> AsyncFilter<Self, F> where Self: Sized, F: AsyncPredicate<NewRequest>
fn then<F, Response, Error, Fut>(self, f: F) -> Then<Self, F>
where Self: Sized, Error: From<Self::Error>,
F: FnOnce(Result<Self::Response, Self::Error>) -> Fut + Clone, Fut: Future<Output = Result<Response, Error>>
fn map_future<F, Fut, Response, Error>(self, f: F) -> MapFuture<Self, F>
where Self: Sized, F: FnMut(Self::Future) -> Fut, Error: From<Self::Error>, Fut: Future<Output = Result<Response, Error>>
fn boxed(self) -> BoxService<Request, Self::Response, Self::Error>
where Self: Sized + Send + 'static, Self::Future: Send + 'static
fn boxed_clone( self ) -> BoxCloneService<Request, Self::Response, Self::Error>
where Self: Clone + Sized + Send + 'static, Self::Future: Send + 'static
}
// Example
// A service returning Result<Record, _>
let service = DatabaseService::new("127.0.0.1:8080");
// Map the response into a new response
let mut new_service = service.map_response(|record| record.name);
// Call the new service
let id = 13;
let name = new_service
.ready()
.await? // call() is invoked only once the Service reports Ready
.call(id)
.await?;
// filter(): Composes this service with a Filter that conditionally accepts or rejects requests
// based on a predicate.
// Fallibly map the request to a new request
let mut new_service = service
.filter(|id_str: &str| id_str.parse().map_err(DbError::Parse));
// Call the new service
let id = "13";
let response = new_service
.ready()
.await?
.call(id)
.await;
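service_fn<T>(f: T) creates a ServiceFn object, which implements the Service trait, from a function or closure T:
- T is a closure of the form FnMut(Request) -> impl Future<Output = Result<R, E>>, i.e. it must return a Result, while Request and the response type R are generic (unconstrained).
pub fn service_fn<T>(f: T) -> ServiceFn<T> // T is an async function, i.e. one that returns a Future.
// ServiceFn<T> implements the Service trait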
impl<T, F, Request, R, E> Service<Request> for ServiceFn<T>
where
T: FnMut(Request) -> F,
F: Future<Output = Result<R, E>>,
{
type Response = R;
type Error = E;
type Future = F;
fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), E>> {
Ok(()).into()
}
fn call(&mut self, req: Request) -> Self::Future {
(self.f)(req)
}
}
// Example:
use tower::{service_fn, Service, ServiceExt, BoxError};
// The function passed to service_fn takes a Request and must return a Result; failures are reported through its Error.
async fn handle(request: Request) -> Result<Response, BoxError> {
let response = Response::new("Hello, World!");
Ok(response)
}
// Use service_fn to create a Service from a function or closure
let mut service = service_fn(handle);
let response = service
.ready() // polls poll_ready() until the Service is ready
.await?
.call(Request::new())
.await?;
assert_eq!("Hello, World!", response.into_body());
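The tower_service::Service trait is used extensively in the axum crate:
- axum::Router has many Service-related methods;
- the tower_http::services module provides many HTTP-related Service implementations;
Router::route_service() takes a Service object directly; its associated Error type must be Infallible, i.e. the Service cannot fail. There are two ways to create such a Service:
- build it from a closure with service_fn();
- use one of the Services provided by tower-http;
// Signature of Router::route_service():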
pub fn route_service<T>(self, path: &str, service: T) -> Self
where
T: Service<Request, Error = Infallible> + Clone + Send + 'static,
T::Response: IntoResponse,
T::Future: Send + 'static,
// Example
use axum::{
Router,
body::Body,
routing::{any_service, get_service},
extract::Request,
http::StatusCode,
error_handling::HandleErrorLayer,
};
use tower_http::services::ServeFile;
use http::Response;
use std::{convert::Infallible, io};
use tower::service_fn;
let app = Router::new()
.route(
"/",
any_service(service_fn(|_: Request| async {
let res = Response::new(Body::from("Hi from `GET /`"));
Ok::<_, Infallible>(res) // the service_fn closure must return a Result, here with Infallible as the Error type
}))
)
.route_service(
"/foo",
service_fn(|req: Request| async move { // async move {} is an async block expression
let body = Body::from(format!("Hi from `{} /foo`", req.method()));
let res = Response::new(body);
Ok::<_, Infallible>(res)
})
)
.route_service(
"/static/Cargo.toml",
ServeFile::new("Cargo.toml"), // tower_http::services::ServeFile::new() returns a Service
);
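The tower::MakeService trait is a Service factory used to create new Service objects. For example, when a TCP/HTTP server accepts a new connection, it creates a new Service to handle that connection's requests and produce responses; into_service()/as_service() expose the factory itself as a Service.
- into_service() and as_service() are similar; the former consumes the MakeService object, while the latter borrows it mutably (&mut);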
pub trait MakeService<Target, Request>: Sealed<(Target, Request)> {
type Response;
type Error;
type Service: Service<Request, Response = Self::Response, Error = Self::Error>;
type MakeError;
type Future: Future<Output = Result<Self::Service, Self::MakeError>>; // resolves asynchronously to another Service
// Required methods
fn poll_ready( &mut self, cx: &mut Context<'_> ) -> Poll<Result<(), Self::MakeError>>;
fn make_service(&mut self, target: Target) -> Self::Future;
// Provided methods
fn into_service(self) -> IntoService<Self, Request> where Self: Sized, { ... }
fn as_service(&mut self) -> AsService<'_, Self, Request> where Self: Sized, { ... }
}
// tower implements the MakeService trait for every Service<Target> whose Response is itself a
// Service<Request>, i.e. a Service that creates another Service
impl<M, S, Target, Request> MakeService<Target, Request> for M
where
M: Service<Target, Response = S>,
S: Service<Request>
type Response = <S as Service<Request>>::Response
type Error = <S as Service<Request>>::Error
type Service = S
type MakeError = <M as Service<Target>>::Error
type Future = <M as Service<Target>>::Future
Take fn into_service(self) -> IntoService<Self, Request> as an example: the IntoService object it returns is a Service of Services:
// The tower::make::IntoService<M, Request> struct implements the Service<Target> trait
impl<M, S, Target, Request> Service<Target> for IntoService<M, Request>
where
M: Service<Target, Response = S>,
S: Service<Request>
type Response = <M as Service<Target>>::Response // Responses given by the service.
type Error = <M as Service<Target>>::Error // Errors produced by the service.
type Future = <M as Service<Target>>::Future // The future response value.
// Returns Poll::Ready(Ok(())) when the service is able to process requests. Read more
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>
// Process the request and return the response asynchronously. Read more
fn call(&mut self, target: Target) -> Self::Future
// Example
use std::convert::Infallible;
use tower::Service;
use tower::make::MakeService;
use tower::service_fn;
// A `MakeService`; note that the two Services are nested.
let make_service = service_fn(|make_req: ()| async {
// The outer Service takes () as its Request.
// Its response is another Service, created by the inner service_fn(), whose Request is a String.
Ok::<_, Infallible>(
service_fn(|req: String| async {
Ok::<_, Infallible>(req)
}
)
)
});
// Convert the `MakeService` into a `Service`. The result is an IntoService<M, Request>,
// which implements Service<()> because the Target here is ().
let mut svc = make_service.into_service();
// Make a new service. IntoService::call() takes the Target value () and resolves to the new
// Service.
let mut new_svc = svc.call(()).await.unwrap();
// Call the service. The Request type of this Service is String.
let res = new_svc.call("foo".to_string()).await.unwrap();
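The "Service that returns a Service" idea can also be sketched without tower. The following is a rough illustration under stated assumptions: `Service` is a local stand-in for tower's trait, and `MakeGreeter`, `Greeter`, `block_on`, and `demo` are hypothetical names. The outer service takes a connection target and produces a per-connection inner service.

```rust
use std::convert::Infallible;
use std::future::{ready, Future, Ready};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Local stand-in mirroring the shape of `tower::Service` (assumption: simplified copy).
pub trait Service<Request> {
    type Response;
    type Error;
    type Future: Future<Output = Result<Self::Response, Self::Error>>;
    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>;
    fn call(&mut self, req: Request) -> Self::Future;
}

/// Inner, per-connection service: remembers the peer it was created for.
pub struct Greeter {
    peer: String,
}

impl Service<String> for Greeter {
    type Response = String;
    type Error = Infallible;
    type Future = Ready<Result<String, Infallible>>;
    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Infallible>> {
        Poll::Ready(Ok(()))
    }
    fn call(&mut self, req: String) -> Self::Future {
        ready(Ok(format!("{} -> {}", self.peer, req)))
    }
}

/// Outer factory: its "request" is the connection target, its response a fresh `Greeter`.
pub struct MakeGreeter;

impl Service<&'static str> for MakeGreeter {
    type Response = Greeter;
    type Error = Infallible;
    type Future = Ready<Result<Greeter, Infallible>>;
    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Infallible>> {
        Poll::Ready(Ok(()))
    }
    fn call(&mut self, target: &'static str) -> Self::Future {
        ready(Ok(Greeter { peer: target.to_string() }))
    }
}

struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Minimal busy-polling executor, sufficient for the always-ready futures used here.
fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

/// One "connection": the factory produces a dedicated service, which then handles a request.
/// poll_ready() is skipped only because both services here are always ready.
pub fn demo() -> String {
    let mut maker = MakeGreeter;
    let mut svc = block_on(maker.call("10.0.0.1")).unwrap();
    block_on(svc.call("ping".to_string())).unwrap()
}
```

A server would call the factory once per accepted connection and keep the returned service for the lifetime of that connection.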
Example: the make_service parameter of axum::serve() takes a Service whose Request is IncomingStream (struct axum::serve::IncomingStream) and whose Response is another Service<Request, Response = Response, Error = Infallible>. Here Request and Response are http::request::Request and http::response::Response, whose body type is the struct axum::body::Body, which implements the http_body::Body trait with Data = bytes::Bytes.
Note: M is not bounded by the tower::MakeService trait directly, but the effect and semantics are the same:
// Definition of axum::serve():
pub fn serve<M, S>(tcp_listener: TcpListener, make_service: M) -> Serve<M, S>
where
// Outer Service: takes an IncomingStream and responds with another Service
M: for<'a> Service<IncomingStream<'a>, Error = Infallible, Response = S>,
// Inner Service:
// its Request is the type alias axum::extract::Request<T = Body> = http::request::Request<T>;
// its Response is the type alias axum::response::Response<T = Body> = http::response::Response<T>;
//
// Body in Request<T = Body> and Response<T = Body> is the struct axum::body::Body, which implements
// the http_body::Body trait with Data = bytes::Bytes.
S: Service<Request, Response = Response, Error = Infallible> + Clone + Send + 'static,
S::Future: Send,
In the axum crate, Router, MethodRouter and Handler (through the Service obtained from it) all satisfy the tower::MakeService trait:
// Router implements Service<IncomingStream<'_>>, and the corresponding Response is again a Router
impl Service<IncomingStream<'_>> for Router<()>
type Response = Router
type Error = Infallible
type Future = Ready<Result<<Router as Service<IncomingStream<'_>>>::Response, <Router as Service<IncomingStream<'_>>>::Error>>
// Router also implements Service<Request<B>>, where the B in Request<B> implements http_body::Body<Data = bytes::Bytes>;
// the response is the struct http::response::Response<axum::body::Body>
impl<B> Service<Request<B>> for Router<()>
where
B: HttpBody<Data = Bytes> + Send + 'static,
B::Error: Into<BoxError>
type Response = Response<Body>
type Error = Infallible
type Future = RouteFuture<Infallible>
// Example
use axum::{
routing::get,
Router,
};
// app is a Router, which satisfies MakeService, so it can be passed to axum::serve() directly
let app = Router::new().route("/", get(|| async { "Hi!" }));
let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await.unwrap();
axum::serve(listener, app).await.unwrap();
To summarize: Router implements Service<IncomingStream<'_>> with Router as its Response type, and that Router in turn implements Service<Request<B>>, so Router satisfies the M bound on the make_service parameter of axum::serve().
Although Router implements Service<Request<B>>, i.e. Service<http::request::Request<B>> with B: http_body::Body<Data = bytes::Bytes>, calling tower::ServiceExt methods on it directly fails to compile because B cannot be inferred. The fix is to use Router's pub fn as_service<B>(&mut self) -> RouterAsService<'_, B, S> method:
use axum::{
Router,
routing::get,
http::Request,
body::Body,
};
use tower::{Service, ServiceExt};
let mut router = Router::new().route("/", get(|| async {}));
let request = Request::new(Body::empty());
// let response = router.ready().await?.call(request).await?;
// ^^^^^ cannot infer type for type parameter `B`
// OK
let response = router.as_service().ready().await?.call(request).await?;
In addition, Router provides methods such as into_make_service()/into_make_service_with_connect_info() that create a type implementing the MakeService trait:
// Convert this router into a MakeService, that is a Service whose response is another service.
pub fn into_make_service(self) -> IntoMakeService<Self> // note that the type argument Self is Router
pub fn into_make_service_with_connect_info<C>(self) -> IntoMakeServiceWithConnectInfo<Self, C>
// When S is Router, the Response is also Router, and Router is itself a Service,
// so IntoMakeService implements both Service and MakeService.
impl<S, T> Service<T> for IntoMakeService<S> where S: Clone
type Response = S
type Error = Infallible
type Future = IntoMakeServiceFuture<S>
use axum::{
routing::get,
Router,
};
let app = Router::new().route("/", get(|| async { "Hi!" }));
let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await.unwrap();
axum::serve(listener, app.into_make_service()).await.unwrap();
use axum::{
extract::ConnectInfo,
routing::get,
Router,
};
use std::net::SocketAddr;
let app = Router::new().route("/", get(handler));
// Extract ConnectInfo (by default the client's remote address) from the request extensions
async fn handler(ConnectInfo(addr): ConnectInfo<SocketAddr>) -> String {
format!("Hello {addr}")
}
let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await.unwrap();
// Makes ConnectInfo available in the request extensions
axum::serve(listener, app.into_make_service_with_connect_info::<SocketAddr>()).await.unwrap();
2 Layer
The Layer trait adds processing logic to the Service passed in and returns a new Service.
The main use case for Layer is implementing reusable middleware such as HTTP middleware.
pub trait Layer<S> {
type Service; // the type of the Service that is returned
// S can be any type; layer() wraps it and returns a new Service
fn layer(&self, inner: S) -> Self::Service;
}
tower::layer::layer_fn() creates a LayerFn<T> object, which implements the Layer trait, from a closure; the closure takes a Service and returns a Service:
pub fn layer_fn<T>(f: T) -> LayerFn<T>
// LayerFn<T> implements Layer<S>
impl<F, S, Out> Layer<S> for LayerFn<F> where F: Fn(S) -> Out
type Service = Out
// A middleware that logs requests before forwarding them to another service
pub struct LogService<S> {
target: &'static str,
service: S,
}
impl<S, Request> Service<Request> for LogService<S>
where
S: Service<Request>,
Request: fmt::Debug,
{
type Response = S::Response;
type Error = S::Error;
type Future = S::Future;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.service.poll_ready(cx)
}
fn call(&mut self, request: Request) -> Self::Future {
// Log the request
println!("request = {:?}, target = {:?}", request, self.target);
self.service.call(request)
}
}
// A `Layer` that wraps services in `LogService`
let log_layer = layer_fn(|service| { // the closure given to layer_fn() takes a Service and returns a Service
LogService {
service,
target: "tower-docs",
}
});
// Create another Service from a closure with service_fn, then wrap it with Layer::layer() to get a new Service
let uppercase_service = tower::service_fn(|request: String| async move {
Ok::<_, Infallible>(request.to_uppercase())
});
// Wrap our service in a `LogService` so requests are logged.
let wrapped_service = log_layer.layer(uppercase_service);
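Implementing the Layer trait usually also means implementing a Service to serve as the return type of Layer::layer(), so two custom types are needed:
- one is the Layer type, the other is the Service type associated with that Layer;
// The ConcurrencyLimitLayer Layer returns its associated ConcurrencyLimit Service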
impl<S> Layer<S> for ConcurrencyLimitLayer
type Service = ConcurrencyLimit<S>
Layer example:
pub struct LogLayer {
target: &'static str,
}
impl<S> Layer<S> for LogLayer {
type Service = LogService<S>;
fn layer(&self, service: S) -> Self::Service {
LogService {
target: self.target,
service
}
}
}
// This service implements the Log behavior
pub struct LogService<S> {
target: &'static str,
service: S,
}
impl<S, Request> Service<Request> for LogService<S>
where S: Service<Request>, Request: fmt::Debug,
{
type Response = S::Response;
type Error = S::Error;
type Future = S::Future;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.service.poll_ready(cx)
}
fn call(&mut self, request: Request) -> Self::Future {
// Insert log statement here or other functionality
println!("request = {:?}, target = {:?}", request, self.target);
self.service.call(request)
}
}
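The tower crate provides many Layer implementations together with their Service implementations, each in its own module, for example:
- balance: load balancing across multiple Services;
- buffer: puts a buffered mpsc channel in front of a Service;
- builder: the Builder type for composing Layers and Services;
- discover: service discovery;
- filter: conditionally dispatches requests to the inner Service based on a predicate;
- hedge: pre-emptively retries a request when its latency exceeds a percentile threshold;
- layer: a collection of Layer-based tower services;
- limit: limits on concurrency and request rate;
- load: Service load measurement;
- load_shed: sheds load when the Service is not ready;
- make: trait aliases for Services that produce specific types of Responses;
- ready_cache: a cache of Services;
- reconnect: reconnects automatically when the Service fails;
- retry: retries failed requests;
- spawn_ready: drives a Service to readiness on a background task when it is not ready;
- steer: routes requests among Services;
- timeout: adds a timeout to requests;
- util: assorted utility types and functions used in Tower.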
use tower::balance::p2c::Balance;
use tower::load::Load;
use tower::{Service, ServiceExt};
use futures_util::{pin_mut, Stream, StreamExt}; // Stream is used in the signature, StreamExt provides next()
async fn spread<Req, S: Service<Req> + Load>(svc1: S, svc2: S, reqs: impl Stream<Item = Req>)
where
S::Error: Into<tower::BoxError>,
S::Metric: std::fmt::Debug,
{
// Spread load evenly across the two services
let p2c = Balance::new(tower::discover::ServiceList::new(vec![svc1, svc2]));
// Issue all the requests that come in.
// Some will go to svc1, some will go to svc2.
pin_mut!(reqs);
let mut responses = p2c.call_all(reqs);
while let Some(rsp) = responses.next().await {
// ...
}
}
use tower::buffer::Buffer;
use tower::{Service, ServiceExt};
async fn mass_produce<S: Service<usize>>(svc: S)
where
S: 'static + Send,
S::Error: Send + Sync + std::error::Error,
S::Future: Send
{
let svc = Buffer::new(svc, 10 /* buffer length */);
for _ in 0..10 {
let mut svc = svc.clone();
tokio::spawn(async move {
for i in 0usize.. {
svc.ready().await.expect("service crashed").call(i).await;
}
});
}
}
3 ServiceBuilder
The tower::ServiceBuilder struct builds a Service out of multiple Layers; requests then pass through the Layers in the order in which they were added to the ServiceBuilder.
- layer()/option_layer(): add a generic layer;
- layer_fn(): create a layer from the given closure and add it;
- service()/service_fn(): add the final Service and return a new Service;
ServiceBuilder also provides methods such as buffer()/retry()/timeout() for quickly integrating Layer middleware such as tower::buffer::BufferLayer.
Note: layer()/option_layer()/layer_fn() and the middleware methods all return a ServiceBuilder, which is itself a Layer, whereas service()/service_fn() return a Service.
impl ServiceBuilder<Identity>
pub fn new() -> Self
impl<L> ServiceBuilder<L>
// Add a generic layer
pub fn layer<T>(self, layer: T) -> ServiceBuilder<Stack<T, L>>
pub fn option_layer<T>( self, layer: Option<T> ) -> ServiceBuilder<Stack<Either<T, Identity>, L>>
pub fn layer_fn<F>(self, f: F) -> ServiceBuilder<Stack<LayerFn<F>, L>>
// Add a Service and return a new Service
pub fn service<S>(&self, service: S) -> L::Service where L: Layer<S>
pub fn service_fn<F>(self, f: F) -> L::Service where L: Layer<ServiceFn<F>>
// Integrate the middleware Layers shipped with tower, such as tower::buffer::BufferLayer
pub fn buffer<Request>( self, bound: usize ) -> ServiceBuilder<Stack<BufferLayer<Request>, L>>
pub fn concurrency_limit( self, max: usize ) -> ServiceBuilder<Stack<ConcurrencyLimitLayer, L>>
pub fn load_shed(self) -> ServiceBuilder<Stack<LoadShedLayer, L>>
pub fn rate_limit( self, num: u64, per: Duration ) -> ServiceBuilder<Stack<RateLimitLayer, L>>
pub fn retry<P>(self, policy: P) -> ServiceBuilder<Stack<RetryLayer<P>, L>>
pub fn timeout( self, timeout: Duration) -> ServiceBuilder<Stack<TimeoutLayer, L>>
pub fn filter<P>(self, predicate: P) -> ServiceBuilder<Stack<FilterLayer<P>, L>>
pub fn filter_async<P>( self, predicate: P ) -> ServiceBuilder<Stack<AsyncFilterLayer<P>, L>>
pub fn map_request<F, R1, R2>( self, f: F ) -> ServiceBuilder<Stack<MapRequestLayer<F>, L>> where F: FnMut(R1) -> R2 + Clone
pub fn map_response<F>( self, f: F) -> ServiceBuilder<Stack<MapResponseLayer<F>, L>>
pub fn map_err<F>(self, f: F) -> ServiceBuilder<Stack<MapErrLayer<F>, L>>
pub fn map_future<F>(self, f: F) -> ServiceBuilder<Stack<MapFutureLayer<F>, L>>
pub fn then<F>(self, f: F) -> ServiceBuilder<Stack<ThenLayer<F>, L>>
pub fn and_then<F>(self, f: F) -> ServiceBuilder<Stack<AndThenLayer<F>, L>>
pub fn map_result<F>(self, f: F) -> ServiceBuilder<Stack<MapResultLayer<F>, L>>
//...
Example:
use tower::ServiceBuilder;
use tower::ServiceExt;
// Suppose we have some `Service` whose request type is `String`:
let string_svc = tower::service_fn(|request: String| async move {
println!("request: {}", request);
Ok(())
});
// ...but we want to call that service with a `usize`. What do we do?
let usize_svc = ServiceBuilder::new()
// Add a middleware that converts the request type to a `String`:
.map_request(|request: usize| format!("{}", request)) // returns a Layer
// ...and wrap the string service with that middleware:
.service(string_svc); // returns a Service
// Now, we can call that service with a `usize`:
usize_svc.oneshot(42).await?;
ServiceBuilder implements Layer, so it can combine multiple Layers into a single new Layer:
// To add multiple layer middleware to a Router, tower::ServiceBuilder is recommended
use axum::{
routing::get,
Extension,
Router,
};
use tower_http::{trace::TraceLayer};
use tower::ServiceBuilder;
async fn handler() {}
#[derive(Clone)]
struct State {}
let app = Router::new()
.route("/", get(handler))
.layer(
ServiceBuilder::new()
.layer(TraceLayer::new_for_http())
.layer(Extension(State {}))
);
ServiceBuilder does not implement Service; a Service has to be created through service()/service_fn():
use std::time::Duration;
use tower::{ServiceBuilder, ServiceExt, BoxError, service_fn};
async fn handle(request: &'static str) -> Result<&'static str, BoxError> {
Ok(request)
}
// Returns a Service; requests pass through the layers in order: buffer, then timeout
let svc = ServiceBuilder::new()
.buffer(1024) // add a buffer layer
.timeout(Duration::from_secs(10)) // add a timeout layer
.service_fn(handle); // create a Service from the function, wrap it in the layers, and return the new Service
let response = svc.oneshot("foo").await?;
assert_eq!(response, "foo");
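4 tower-http
Layer middleware such as tower::buffer is independent of the Request/Response types. The tower-http crate, in contrast, provides HTTP-specific middleware and utilities, all of which use the HTTP Request/Response types defined in the http and http-body crates: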
- add_extension: Middleware that clones a value into each request's extensions.
- auth: Authorization related middleware.
- body: Body types.
- catch_panic: Convert panics into responses.
- classify: Tools for classifying responses as either success or failure.
- compression (requires one of the compression-br, compression-deflate, compression-gzip or compression-zstd features): Middleware that compresses response bodies.
- cors: Middleware which adds headers for CORS.
- decompression (requires one of the decompression-br, decompression-deflate, decompression-gzip or decompression-zstd features): Middleware that decompresses request and response bodies.
- follow_redirect: Middleware for following redirections.
- limit: Middleware for limiting request bodies.
- map_request_body: Apply a transformation to the request body.
- map_response_body: Apply a transformation to the response body.
- metrics: Middlewares for adding metrics to services.
- normalize_path: Middleware that normalizes paths.
- propagate_header: Propagate a header from the request to the response.
- request_id: Set and propagate request ids.
- sensitive_headers: Middlewares that mark headers as sensitive.
- services: Services that return responses without wrapping other Services.
- set_header: Middleware for setting headers on requests and responses.
- set_status: Middleware to override status codes.
- timeout: Middleware that applies a timeout to requests.
- trace: Middleware that adds high level tracing to a Service.
- validate_request: Middleware that validates requests.
Each of these middleware consists of two types, which implement the Layer and Service traits respectively.
In the example below, S is a Service<Request<ReqBody>, Response = Response<ResBody>>, that is, a Service<http::request::Request<ReqBody>, Response = http::response::Response<ResBody>>:
- both the input and the output of the Service are the Request/Response types defined by the http crate;
pub struct AddAuthorizationLayer { /* private fields */ }
impl<S> Layer<S> for AddAuthorizationLayer
type Service = AddAuthorization<S> // the associated type is AddAuthorization, which implements Service
pub struct AddAuthorization<S> { /* private fields */ }
impl<S, ReqBody, ResBody> Service<Request<ReqBody>> for AddAuthorization<S>
where S: Service<Request<ReqBody>, Response = Response<ResBody>>
type Response = <S as Service<Request<ReqBody>>>::Response // Responses given by the service.
type Error = <S as Service<Request<ReqBody>>>::Error // Errors produced by the service.
type Future = <S as Service<Request<ReqBody>>>::Future // The future response value.
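The Layer/Service pairing above can be illustrated with a small std-only sketch. It is not tower-http's code: `Service` and `Layer` are local stand-ins for the tower traits, `Request` is a hypothetical replacement for `http::Request` that only carries headers, and `AddAuthLayer`, `AddAuth`, `EchoAuth`, and `demo` are made-up names.

```rust
use std::convert::Infallible;
use std::future::{ready, Future, Ready};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Local stand-ins mirroring the shapes of tower's `Service` and `Layer` traits.
pub trait Service<Request> {
    type Response;
    type Error;
    type Future: Future<Output = Result<Self::Response, Self::Error>>;
    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>;
    fn call(&mut self, req: Request) -> Self::Future;
}
pub trait Layer<S> {
    type Service;
    fn layer(&self, inner: S) -> Self::Service;
}

/// Hypothetical stand-in for `http::Request`: just a list of headers.
#[derive(Default)]
pub struct Request {
    pub headers: Vec<(String, String)>,
}

/// Layer half: holds the configuration and builds the middleware Service.
pub struct AddAuthLayer {
    value: String,
}
impl AddAuthLayer {
    pub fn bearer(token: &str) -> Self {
        Self { value: format!("Bearer {token}") }
    }
}
impl<S> Layer<S> for AddAuthLayer {
    type Service = AddAuth<S>;
    fn layer(&self, inner: S) -> Self::Service {
        AddAuth { inner, value: self.value.clone() }
    }
}

/// Service half: adds the header, then delegates to the wrapped Service.
pub struct AddAuth<S> {
    inner: S,
    value: String,
}
impl<S: Service<Request>> Service<Request> for AddAuth<S> {
    type Response = S::Response;
    type Error = S::Error;
    type Future = S::Future;
    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        self.inner.poll_ready(cx)
    }
    fn call(&mut self, mut req: Request) -> Self::Future {
        req.headers.push(("authorization".to_string(), self.value.clone()));
        self.inner.call(req)
    }
}

/// Hypothetical inner Service: returns the authorization header it received, if any.
pub struct EchoAuth;
impl Service<Request> for EchoAuth {
    type Response = Option<String>;
    type Error = Infallible;
    type Future = Ready<Result<Option<String>, Infallible>>;
    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Infallible>> {
        Poll::Ready(Ok(()))
    }
    fn call(&mut self, req: Request) -> Self::Future {
        let found = req.headers.into_iter().find(|(k, _)| k.as_str() == "authorization");
        ready(Ok(found.map(|(_, v)| v)))
    }
}

struct NoopWaker;
impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}
/// Minimal busy-polling executor, sufficient for the always-ready futures used here.
fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

/// Wrap `EchoAuth` with the layer and send one request through the stack.
pub fn demo() -> Option<String> {
    let mut svc = AddAuthLayer::bearer("secret").layer(EchoAuth);
    block_on(svc.call(Request::default())).unwrap()
}
```

Because the middleware forwards `Response`, `Error`, and `Future` from the inner service unchanged, it adds no boxing or extra future type; it only touches the request on the way in.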
tower_http also provides the ServiceBuilderExt trait, which extends tower::ServiceBuilder with methods for quickly adding the middleware above:
pub trait ServiceBuilderExt<L>: Sealed<L> + Sized {
// Required methods
fn propagate_header( self, header: HeaderName ) -> ServiceBuilder<Stack<PropagateHeaderLayer, L>>;
fn add_extension<T>( self, value: T ) -> ServiceBuilder<Stack<AddExtensionLayer<T>, L>>;
fn map_request_body<F>( self, f: F ) -> ServiceBuilder<Stack<MapRequestBodyLayer<F>, L>>;
fn map_response_body<F>( self, f: F ) -> ServiceBuilder<Stack<MapResponseBodyLayer<F>, L>>;
fn compression(self) -> ServiceBuilder<Stack<CompressionLayer, L>>;
fn decompression(self) -> ServiceBuilder<Stack<DecompressionLayer, L>>;
fn trace_for_http( self ) -> ServiceBuilder<Stack<TraceLayer<HttpMakeClassifier>, L>>;
fn trace_for_grpc( self ) -> ServiceBuilder<Stack<TraceLayer<GrpcMakeClassifier>, L>>;
fn follow_redirects( self ) -> ServiceBuilder<Stack<FollowRedirectLayer<Standard>, L>>;
// ...
}
Take trace as an example: only after this middleware is added do tower/axum services emit request-related trace logs automatically:
use http::{Request, Response};
use tower::{ServiceBuilder, ServiceExt, Service};
use tower_http::trace::TraceLayer;
use std::convert::Infallible;
use http_body_util::Full;
use bytes::Bytes;
// Request is http::Request, Response is http::Response, and Full<Bytes> implements the http_body::Body trait
async fn handle(request: Request<Full<Bytes>>) -> Result<Response<Full<Bytes>>, Infallible> {
Ok(Response::new(Full::default()))
}
tracing_subscriber::fmt::init(); // install the default fmt subscriber globally
let mut service = ServiceBuilder::new()
.layer(TraceLayer::new_for_http())
// Alternatively, use the Layer integration method provided by ServiceBuilderExt
// .trace_for_http()
.service_fn(handle);
let request = Request::new(Full::from("foo"));
let response = service
.ready()
.await?
.call(request)
.await?;
// Run with: RUST_LOG=tower_http=trace cargo run
// Log output:
// Mar 05 20:50:28.523 DEBUG request{method=GET path="/foo"}: tower_http::trace::on_request: started processing request
// Mar 05 20:50:28.524 DEBUG request{method=GET path="/foo"}: tower_http::trace::on_response: finished processing request latency=1 ms status=200
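The tower_http and tower crates provide a large number of Layer implementations, which are used widely in the axum crate; for example, Router::layer() adds a Layer to an axum Router. Layers added through successive Router::layer() calls run in the reverse of the order in which they were added, whereas layers added to a tower::ServiceBuilder run in the order in which they were added, so ServiceBuilder is the recommended choice.
// The layer() method of axum::Router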
pub fn layer<L>(self, layer: L) -> Router<S>
where
L: Layer<Route> + Clone + Send + 'static,
L::Service: Service<Request> + Clone + Send + 'static,
<L::Service as Service<Request>>::Response: IntoResponse + 'static,
<L::Service as Service<Request>>::Error: Into<Infallible> + 'static,
<L::Service as Service<Request>>::Future: Send + 'static,
use axum::{routing::get, Router};
use tower_http::{trace::TraceLayer, compression::CompressionLayer};
let with_tracing = Router::new()
.route("/foo", get(|| async {}))
.route("/bar", get(|| async {}))
.layer(TraceLayer::new_for_http()); // applies to all routes of this Router
let with_compression = Router::new()
.route("/bar", get(|| async {}))
.layer(CompressionLayer::new());
let app = Router::new()
.merge(with_tracing)
.merge(with_compression);
// A Layer can also be added to a MethodRouter, in which case it applies only to that method's handler
use axum::{routing::get, Router};
use tower::limit::ConcurrencyLimitLayer;
async fn handler() {}
let app = Router::new().route( "/", get(handler).layer(ConcurrencyLimitLayer::new(64)),);
// To add multiple layer middleware to a Router, tower::ServiceBuilder is recommended
use axum::{routing::get, Extension, Router,};
use tower_http::{trace::TraceLayer};
use tower::ServiceBuilder;
async fn handler() {}
#[derive(Clone)]
struct State {}
let app = Router::new()
.route("/", get(handler))
.layer(
ServiceBuilder::new() // layers run in the order in which they are added
.layer(TraceLayer::new_for_http())
.layer(Extension(State {}))
);
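The difference in execution order between wrapping one layer at a time and collecting layers in a builder can be shown with a simplified synchronous model. This is only a sketch of the ordering rule, not axum or tower code: `Svc`, `tag`, `handler`, `wrap_one_by_one`, and `wrap_builder_style` are hypothetical names, and a "service" here is just a closure that records who ran.

```rust
/// Simplified synchronous stand-in for a service: records which middleware ran, in order.
type Svc = Box<dyn Fn(&mut Vec<&'static str>)>;

/// Hypothetical "layer": wraps `inner` so that `name` is logged before the inner service runs.
fn tag(name: &'static str, inner: Svc) -> Svc {
    Box::new(move |log: &mut Vec<&'static str>| {
        log.push(name);
        inner(log);
    })
}

/// Innermost service, standing in for the route handler.
fn handler() -> Svc {
    Box::new(|log: &mut Vec<&'static str>| log.push("handler"))
}

/// Mimics repeated `.layer(a).layer(b)` calls: each call wraps everything added before it,
/// so the layer added last ends up outermost and sees the request first.
pub fn wrap_one_by_one(names: &[&'static str]) -> Vec<&'static str> {
    let mut svc = handler();
    for &name in names {
        svc = tag(name, svc);
    }
    let mut log = Vec::new();
    svc(&mut log);
    log
}

/// Mimics a builder that collects layers first and applies them so that the layer
/// added first ends up outermost and sees the request first.
pub fn wrap_builder_style(names: &[&'static str]) -> Vec<&'static str> {
    let mut svc = handler();
    for &name in names.iter().rev() {
        svc = tag(name, svc);
    }
    let mut log = Vec::new();
    svc(&mut log);
    log
}
```

For the input `["trace", "auth"]`, the first function records `auth, trace, handler` and the second records `trace, auth, handler`, which matches the reading order of a ServiceBuilder chain.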