跳过正文

tracing

··26237 字
Rust Rust-Crate
目录
rust crate - 这篇文章属于一个选集。
§ 7: 本文

核心概念:

  1. Span :记录程序的执行流程,有 enter 和 exit 操作,当 enter 后,当前线程会关联一个 span context,所有该 context 中记录的 event 都属于该 span。

           use tracing::{span, Level};
           // 必选参数:Level 和 Span id(name)
           let span = span!(Level::TRACE, "my_span");
    
           // enter() 返回一个 RAII guard,当它被 drop 时,span 自动退出。
           let _enter = span.enter();
           // 在 span 上下文记录 event。
    
  2. Events :Event 表示特定时刻的事件,通常位于 span context 中(非必须)。

           use tracing::{event, span, Level};
    
           // Event 可以位于 span context 之外
           event!(Level::INFO, "something happened");
    
           let span = span!(Level::INFO, "my_span");
           let _guard = span.enter();
           // 在 my_span context 中记录一个 event。
           event!(Level::DEBUG, "something happened inside my_span");
    
  3. Subscribers :当 span 和 event 发生时,它们被实现 Subscriber trait 的对象所记录或聚合。Subscribers 提供了 enabled()/event()/enter()/exit() 等方法。

1 span!/event!()
#

使用 span!() 来创建特定 Level 和 id 的 span,然后调用它的 enter() 方法进入该 span 的 context;在 enter() 返回的 guard 被 drop 之前,当前线程记录的所有 event 都属于该 span。

  • span 的 enter() 可以嵌套, 子 span 打印 event 时会自动显示父 span;
  • span 的 fields 会自动被其中的 event 继承;
  • event 和 span 都需要指定 Level 级别;
use tracing::{span, Level};

let span = span!(Level::TRACE, "my span");
let _enter = span.enter();
// Any trace events that occur before the guard is dropped will occur
// within the span.  Dropping the guard will exit the span.

event 可以没有 span,例如在创建和 enter span 之前记录的 event。event 的 target 默认为所在的 module path(crate::module),可以使用 target: "my_target" 来自定义;event 的父 span 默认为当前 span,可以使用 parent: &span 来显式指定。

tracing::error_span!("myerrorspan", ?s);
tracing::error!(target: "myerrorspan", ?s, s.field_a, "just a debug message2");

对于自定义函数,可以使用 #[instrument] 属性宏来自动创建 span,它使用函数名作为 span name, 函数参数将作为 span 的 fields。函数内的 event 自动位于这个 span context 中。

  • skip():要去掉的参数名;
  • fields(): 指定要新加的 field=value;

由于 #[instrument] 默认将所有函数参数添加到日志输出,对于 HTTP Handler 函数来说,可能包含敏感信息(如用户提交的表单或 Body 数据等),可以使用 secrecy crate 来解决这个问题(它通过重新定义嵌入的字段类型的 Debug trait 来实现隐藏数据输出)。

use tracing::{Level, event, instrument};

/// Example of `#[instrument]`: the attribute creates a span named after the function and
/// records the function arguments as span fields.
#[instrument]
pub fn my_function(my_arg: usize) {
    // This event is recorded inside the `my_function` span, which carries the `my_arg` field.
    event!(Level::INFO, "inside my_function!");
    // ...
}

#[instrument(level="debug", target="this_crate::some_span", name="my_instrumented_span")]
async fn do_something_async() {
    // do some work
}

#[instrument(fields(http.uri = req.uri(), http.method = req.method()))]
pub fn handle_request<B>(req: http::Request<B>) -> http::Response<B> {
    // ... handle the request ...
}

// instrument 支持丰富的配置参数
impl Handler {
    #[instrument(
        name = "Handler::run",
        skip(self),
        fields(
            // `%` serializes the peer IP addr with `Display`
            peer_addr = %self.connection.peer_addr().unwrap()
        ),
    )]
    async fn run(&mut self) -> mini_redis::Result<()> {
        //...
    }
}

//! src/routes/subscriptions.rs
// [...]
/// HTTP handler that stores a new subscriber.
///
/// The span is named "Adding a new subscriber"; `form` and `pool` are skipped as span
/// fields, and a fresh request id plus the submitted email and name are recorded instead.
/// Responds with 200 when `insert_subscriber` succeeds and with 500 otherwise.
#[tracing::instrument(
    name = "Adding a new subscriber",
    skip(form, pool),
    fields(
        request_id = %Uuid::new_v4(),
        subscriber_email = %form.email,
        subscriber_name = %form.name
    )
)]
pub async fn subscribe(form: web::Form<FormData>, pool: web::Data<PgPool>) -> HttpResponse {
    // Only success/failure matters here; the payload of either variant is ignored.
    let saved = insert_subscriber(&pool, &form).await.is_ok();
    if saved {
        HttpResponse::Ok().finish()
    } else {
        HttpResponse::InternalServerError().finish()
    }
}

对于不能使用 #[instrument] 的第三方函数,可以使用 span 的 in_scope() 方法:

use tracing::info_span;
let json = info_span!("json.parse").in_scope(|| serde_json::from_slice(&buf))?;

使用 event!() 宏来记录 event,level 是必须的,但 message 是可选的:

use tracing::{event, Level};

// Level plus message: a bare string literal is treated as the `message` field,
// so the `message` key may be omitted.
event!(Level::INFO, "something has happened!");
// Equivalent to the call above, with the key spelled out.
event!(Level::INFO, message = "something has happened!");

// Only a custom field is recorded; no message is attached.
event!(Level::INFO, key = "value");

span 和 event 都需要指定 Level,其它可选:

  1. message:可选;
  2. name: 默认为 file:line(仅 event 可以省略,span 必须指定 name);
  3. target:默认为 module path,如 mycrate::my_module;
  4. parent:current span;
// span 必选: Level 和 span name/id
let span = span!(Level::TRACE, "my span");

// event:Level 必选, message 可选
event!(Level::INFO, "something has happened!");
event!(Level::INFO);  // 没有 message

// event 可选:parent span
event!(parent: &span, Level::INFO, "something has happened!");

// event 可选:target
event!(target: "app_events", Level::INFO, "something has happened!");

// event 可选:name,默认为 file:line
event!(name: "some_info", Level::INFO, "something has happened!");

span 和 event 还可以使用逗号分割的 field_name=field_value 来指定自定义属性:

  • span 的 field 都会被 event 继承;
// 对于 span, 必须在 span name/id 后添加 field=value
tracing::error_span!("myerrorspan", ?s);
// tracing::error_span!(?s, "myerrorspan"); // 错误

// 对于 event, 分两种情况:
// 1. 如果有 message, 自定义 field 都必须位于 message 之前;
// 2. 如果没有 message, 则可以使用 field=value 形式来定义任意 field.
tracing::error!(target: "myerrorspan", ?s, s.field_a, "just a debug message2");
tracing::error!(target: "myerrorspan", ?s, s.field_a, a = "b");  // 没有 message

// 在 enter _span drop 前, 后续的 event 都自动关联该 span
let _span = tracing::error_span!("my_enter_span", ?s).entered();
tracing::error!(?s, s.field_a, a = "c");

let user = "ferris";
// Shorthand: a bare local variable becomes a field with the same name.
span!(Level::TRACE, "login", user);
// Equivalent to:
span!(Level::TRACE, "login", user = user);

let user = "ferris";
let email = "ferris@rust-lang.org";
// Field names may contain dots.
span!(Level::TRACE, "login", user, user.email = email);

let user = User { name: "ferris", email: "ferris@rust-lang.org" };
// The span has two fields: `user.name = "ferris"` and `user.email = "ferris@rust-lang.org"`.
span!(Level::TRACE, "login", user.name, user.email);

value 名称前使用 ?和 % 来指定 Debug 或 Display 显示方式:

#[derive(Debug)]
struct MyStruct { field: &'static str, }
let my_struct = MyStruct { field: "Hello world!"};

// ?使用 Debug
event!(Level::TRACE, greeting = ?my_struct);
// 等效于
event!(Level::TRACE, greeting = tracing::field::debug(&my_struct));

// % 使用 Display
event!(Level::TRACE, greeting = %my_struct.field);
// 等效于
event!(Level::TRACE, greeting = tracing::field::display(&my_struct.field)

// ?和 % 也可以用在 key 名称前
event!(Level::TRACE, %my_struct.field);

如果为 span field 指定特殊的 tracing::field::Empty 值,则后续可以再设置:

use tracing::{trace_span, field};

// 创建一个 span, 具有两个 field,其中 greeting 具有值,而 parting
// 需要后续设置值
let span = trace_span!("my_span", greeting = "hello world", parting = field::Empty);

// 后续使用 span.record() 方法为 Empty field 指定具体的值
span.record("parting", &"goodbye world!");

event message 可以使用 format 字符串(span name/id 不支持格式化),这也是 event message 必须位于 key=value 后面的原因:后续的参数是 format 字符串的参数:

let question = "the ultimate question of life, the universe, and everything";
let answer = 42;
event!(
    Level::DEBUG,
    question.answer = answer,
    question.tricky = true,
    "the answer to {} is {}.", question, answer  // message
);

为了方便创建指定 Level 的 span 和 event, 可以使用带 level 的特殊宏, 如:

  • event : trace!, debug!, info!, warn!, error!
  • span : trace_span!, debug_span!, info_span!, warn_span!, error_span!

对于 span 的 enter/exit event,默认在 trace 级别才打印(-> 和 <-),可以设置 RUST_LOG=trace cargo run 来查看。

[.. INFO zero2prod] Adding a new subscriber.; request_id=f349b0fe..
subscriber_email=ursula_le_guin@gmail.com subscriber_name=le guin
[.. TRACE zero2prod] -> Adding a new subscriber.
[.. INFO zero2prod] request_id f349b0fe.. - Saving new subscriber details
in the database
[.. INFO zero2prod] request_id f349b0fe.. - New subscriber details have
been saved
[.. TRACE zero2prod] <- Adding a new subscriber.
[.. TRACE zero2prod] -- Adding a new subscriber.
[.. INFO actix_web] .. "POST /subscriptions HTTP/1.1" 200 .

2 metadata
#

span 和 event 都带有 Metadata 类型信息:

impl<'a> Metadata<'a>

// Constructor signature (declaration only, body omitted). Parameters are listed in the
// order used by tracing-core: source location comes before `fields` and `kind`.
pub const fn new(
    name: &'static str, // default: file:line
    target: &'a str, // default: module path
    level: Level,
    // Optional metadata describing the source location:
    file: Option<&'a str>,
    line: Option<u32>,
    module_path: Option<&'a str>,
    fields: FieldSet,
    kind: Kind, // one of EVENT, SPAN, HINT
) -> Metadata<'a>

Subscriber::enabled() 方法可以使用 span 或 event 关联的 Metadata 信息来对它们进行过滤。

3 log crate 互操作
#

创建 event 的 trace!, debug!, info! 等宏名称和 log crate 提供的记录日志的宏名称相同, 可以直接替换使用, tracing 的 event 包含了更丰富的结构化信息。

use std::{error::Error, io};
use tracing::{debug, error, info, span, warn, Level};

/// Tries to shave a single yak inside an instrumented span.
///
/// Yak number 3 can never be located: a warning is emitted and an `io::Error` is returned.
/// Every other yak is shaved successfully.
#[tracing::instrument]
pub fn shave(yak: usize) -> Result<(), Box<dyn Error + 'static>> {
    debug!(excitement = "yay!", "hello! I'm gonna shave a yak.");

    // Happy path first: anything other than yak 3 succeeds.
    if yak != 3 {
        debug!("yak shaved successfully");
        return Ok(());
    }

    warn!("could not locate yak!");
    Err(io::Error::new(io::ErrorKind::Other, "shaving yak failed!").into())
}

/// Shaves yaks `1..=yaks` inside a `shaving_yaks` span and returns how many succeeded.
pub fn shave_all(yaks: usize) -> usize {
    // The entered guard keeps the span active until the function returns.
    let _span = span!(Level::TRACE, "shaving_yaks", yaks).entered();

    info!("shaving yaks");

    let mut yaks_shaved = 0;
    for yak in 1..=yaks {
        let res = shave(yak);
        debug!(yak, shaved = res.is_ok());

        match res {
            Err(ref error) => {
                error!(yak, error = error.as_ref(), "failed to shave yak!");
            }
            Ok(()) => {
                yaks_shaved += 1;
            }
        }
        debug!(yaks_shaved);
    }
    yaks_shaved
}

tracing crate 支持与 log crate 互操作:

  1. tracing 可以 emit log crate 消费的 log records;
  2. tracing subscribers 也可以将 log crate emit 的 log records 当作 tracing events 来消费 (需要使用 tracing-log crate);

生成 log records:tracing 将 span 和 event 转换为 log record。需要配置如下两个 feature:

  • log feature: 在 没有激活 tracing Subscriber 的情况下将 tracing event/span 转换为 log record;
  • log-always feature: 即使激活了 tracing Subscriber, 也将 tracing event/span 转换为 log record;

生成的 log record 包含 span/event 的 fields 和 metadata(如 target、level、module path、file 和 line number 等)。而且 span 的 entered/exited/close() 也会创建 log record,它们的 log target 为 tracing::span。

消费 log records:tracing-log crate 提供了兼容层,它让 tracing subscriber 把 log records 当作 tracing event 来消费和记录。适合于依赖了没有使用 tracing event、而是使用 log crate 的 APIs 来记录日志的库(特别是一些同步库)的场景,否则这些库的日志记录不会被输出打印。

//! src/main.rs
//! [...]
use tracing::subscriber::set_global_default;
use tracing_bunyan_formatter::{BunyanFormattingLayer, JsonStorageLayer};

// path 是标识符,不支持短横杠,需要将 cargo package name 的短横杠转为下划线。
use tracing_subscriber::{layer::SubscriberExt, EnvFilter, Registry};
use tracing_log::LogTracer;

/// Entry point: forwards `log` records to `tracing`, then builds a layered subscriber on top
/// of a `Registry` and installs it as the global default.
#[tokio::main]
async fn main() -> std::io::Result<()> {
    // Redirect all `log`'s events to our subscriber
    LogTracer::init().expect("Failed to set logger");

    // Take the filter from the default environment variable; fall back to "info" on error.
    let env_filter = EnvFilter::try_from_default_env()
        .unwrap_or_else(|_| EnvFilter::new("info"));

    // Formatting layer labelled "zero2prod" whose output goes to `std::io::stdout`.
    let formatting_layer = BunyanFormattingLayer::new(
        "zero2prod".into(),
        std::io::stdout
    );

    // Stack the filter, storage and formatting layers on a `Registry`.
    let subscriber = Registry::default()
        .with(env_filter)
        .with(JsonStorageLayer)
        .with(formatting_layer);

    // Install the composed subscriber for the whole process.
    set_global_default(subscriber).expect("Failed to set subscriber");
    // [...]
}

4 subscriber
#

tracing::subscriber::Subscriber trait 定义了收集 trace/event 的函数接口:

  • enabled() : 根据 Metadata 来判断是否要记录该 record;
  • new_span(): 根据 Attributes 返回一个 span ID;
  • enter(): 开启一个新的 span;
  • exit(): 退出(完成)一个 span;

tracing crate 并没有提供该 trait 的实现,而是由其它 crate 来实现,如 tracing-subscriber crate。

pub trait Subscriber: 'static {
    // Required methods
    fn enabled(&self, metadata: &Metadata<'_>) -> bool;
    fn new_span(&self, span: &Attributes<'_>) -> Id;
    fn record(&self, span: &Id, values: &Record<'_>);
    fn record_follows_from(&self, span: &Id, follows: &Id);
    fn event(&self, event: &Event<'_>);
    fn enter(&self, span: &Id);
    fn exit(&self, span: &Id);

    // Provided methods
    fn on_register_dispatch(&self, subscriber: &Dispatch) { ... }
    fn register_callsite(&self, metadata: &'static Metadata<'static> ) -> Interest { ... }
    fn max_level_hint(&self) -> Option<LevelFilter> { ... }
    fn event_enabled(&self, event: &Event<'_>) -> bool { ... }
    fn clone_span(&self, id: &Id) -> Id { ... }
    fn drop_span(&self, _id: Id) { ... }
    fn try_close(&self, id: Id) -> bool { ... }
    fn current_span(&self) -> Current { ... }
    unsafe fn downcast_raw(&self, id: TypeId) -> Option<*const ()> { ... }
}

tracing::subscriber::set_default()/set_global_default()/with_default() 方法用于设置当前线程或全局的 subscriber(一般由 subscriber 的 init() 等方法自动设置):

  • set_default():为 当前线程 设置缺省的 Subscriber 实现;
  • set_global_default():为程序 所有线程 设置缺省的 Subscriber 实现;
  • with_default() :为闭包代码指定使用的缺省 Subscriber 实现;
// `FooSubscriber` implements `Subscriber`. Both functions below take the subscriber by
// value, so each call needs its own instance.
let global_subscriber = FooSubscriber::new();
let scoped_subscriber = FooSubscriber::new();

// Global subscriber.
tracing::subscriber::set_global_default(global_subscriber)
    .expect("setting tracing default failed");

// Closure-scoped subscriber: create as many subscribers as needed and use each separately.
tracing::subscriber::with_default(scoped_subscriber, || {
    // Spans and events emitted inside this closure are recorded by `scoped_subscriber`.
});

set_global_default() 的底层创建一个 tracing::dispatcher::Dispatch 对象,然后设置它为全局 dispatcher:

pub fn set_global_default<S>(subscriber: S) -> Result<(), SetGlobalDefaultError>
    where S: Subscriber + Send + Sync + 'static,
{
    crate::dispatcher::set_global_default(crate::Dispatch::new(subscriber))
}

tracing::dispatcher::Dispatch 对象负责发送 trace 数据给 Subscriber 实现

  • set_default():为 当前线程 设置缺省的 Dispatch;
  • set_global_default():为 程序所有线程 设置缺省的 Dispatch;
  • with_default() :为闭包代码指定使用的缺省 Dispatch;
  • get_default():返回当前线程使用的 Dispatch;
pub struct Dispatch { /* private fields */ }
// 从 Subscriber 实现来创建 Dispatch
pub fn new<S>(subscriber: S) -> Dispatch where S: Subscriber + Send + Sync + 'static,

// Create a dispatch.
use tracing::dispatcher::{self, Dispatch};
let my_subscriber = FooSubscriber::new();
let my_dispatch = Dispatch::new(my_subscriber);

// Usage 1: default subscriber for the duration of a closure (borrows the dispatch).
dispatcher::with_default(&my_dispatch, || {
    // my_subscriber is the default
});

// Usage 2: default subscriber for every thread. `set_global_default` consumes its
// argument, so pass a clone to keep `my_dispatch` usable afterwards.
dispatcher::set_global_default(my_dispatch.clone())
    .expect("global default was already set!");
// `my_subscriber` is now the default

// Usage 3: default subscriber for the current thread. `set_default` borrows the dispatch
// and returns a guard instead of a `Result`; the default stays in place while the guard lives.
let _guard = dispatcher::set_default(&my_dispatch);

5 tracing-subscriber crate
#

tracing-subscriber crate 的 Registry 和 fmt::Subscriber(别名 FmtSubscriber) struct 类型实现了 tracing::Subscriber trait

  • FmtSubscriber:提供通用的格式化、日志级别过滤输出机制,支持灵活的配置;
  • Registry:可以创建和组合各种 Layer 对象来实现更复杂、更灵活的定制,如 opentelemetry 集成;

5.1 fmt subscriber
#

fmt::Subscriber (别名:FmtSubscriber) 的 new() 函数返回默认配置的 fmt Subscriber:

  • 使用 Full 输出格式;
  • 支持基于环境变量的 RUST_LOG 的 LevelFilter
  • 默认输出到 stdout;
  • 默认输出 INFO 级别及以上的日志;

自定义日志级别:RUST_LOG 环境变量或 SubscriberBuilder::with_max_level() 方法。

// tracing-subscriber::fmt::Subscriber 类型实现了 tracing::Subscriber trait
pub struct Subscriber<N = DefaultFields, E = Format<Full>, F = LevelFilter,
    W = fn() -> Stdout> { /* private fields */ }

impl Subscriber {
    // ...
    pub fn new() -> Self {
        Default::default()
    }
}
impl Default for Subscriber {
    fn default() -> Self {
        SubscriberBuilder::default().finish()
    }
}

pub const DEFAULT_MAX_LEVEL: LevelFilter = LevelFilter::INFO;
pub struct SubscriberBuilder<
    N = format::DefaultFields,
    E = format::Format<format::Full>,
    F = LevelFilter,
    W = fn() -> io::Stdout,
    > {
        filter: F,
        inner: Layer<Registry, N, E, W>,
    }

impl Default for SubscriberBuilder {
    fn default() -> Self {
        SubscriberBuilder {
            filter: Subscriber::DEFAULT_MAX_LEVEL, // INFO
            inner: Default::default(),
        }
            .log_internal_errors(true)
    }
}

// 示例
use tracing_subscriber;

#[tokio::main]
pub async fn main() -> mini_redis::Result<()> {
    // 创建缺省的 fmt::Subscriber
    let subscriber = tracing_subscriber::FmtSubscriber::new();
    // 设置为全局 subscriber
    tracing::subscriber::set_global_default(subscriber)?;

    // 也可以调用 fmt::init() 一步完成。
    // tracing_subscriber::fmt::init();

    //...
}

tracing_subscriber::fmt() 返回 tracing_subscriber::fmt::SubscriberBuilder 类型对象,可以用来精细配置 fmt Subscriber 的参数,如过滤、格式化和输出。

// Fine-grained configuration through `tracing_subscriber::fmt::SubscriberBuilder`.
let subscriber = tracing_subscriber::fmt()
    // Use a more compact, abbreviated log format
    .compact()
    // Display source code file paths
    .with_file(true)
    // Display source code line numbers
    .with_line_number(true)
    // Display the thread ID an event was recorded on
    .with_thread_ids(true)
    // Don't display the event's target (module path)
    .with_target(false)
    .with_ansi(true)
    // Set the filter rules from a directive string (same syntax as the environment variable).
    .with_env_filter("tracing=trace,tokio=trace,runtime=trace")
    // Set the maximum verbosity level.
    // NOTE(review): `with_max_level` appears to replace the filter installed by
    // `with_env_filter` above, so only one of the two takes effect — confirm and keep one.
    .with_max_level(Level::DEBUG)
    //.pretty()
    .finish(); // returns a `tracing_subscriber::fmt::Subscriber`

// Install it as the global subscriber.
tracing::subscriber::set_global_default(subscriber).unwrap();

调用 SubscriberBuilder 的 init() 方法,在创建 fmt::Subscriber 的同时将其设置为 global Subscriber:

let subscriber = tracing_subscriber::fmt()
    // ... add configuration
    .init() // 内部调用 builder.finish() 然后设置为 global Subscriber

除了上面单独设置 fmt 输出格式外,还可以创建一个 tracing_subscriber::fmt::Format 对象来自定义事件输出格式。默认提供了如下 4 种输出格式:

  1. format::Full(缺省)
  2. format::Compact
  3. format::Pretty
  4. format::Json
pub fn format() -> Format

// Struct tracing_subscriber::fmt::format::Format 定义:
pub struct Format<F = Full, T = SystemTime> { /* private fields */ }

// 创建一个自定义输出格式
let format = tracing_subscriber::fmt::format()
    .without_time()         // Don't include timestamps
    .with_target(false)     // Don't include event targets.
    .with_level(false)      // Don't include event levels.
    .compact();             // Use a more compact, abbreviated format.

// 使用自定义输出格式
tracing_subscriber::fmt::fmt()
    .event_format(format)
    .init();

tracing_subscriber::fmt() 创建的缺省 fmt::Subscriber 和 fmt::SubscriberBuilder::init() 方法配置了 从环境变量 RUST_LOG 创建 EnvFilter 的功能,该变量语法和 env_logger crate 一致:

  • 需要为 tracing-subscriber 显式开启 env-filter feature,如果未开启则默认启用 TRACE 级别的日志(即不过滤);
  • Setting RUST_LOG=debug enables all Spans and Events set to the log level DEBUG or higher
  • Setting RUST_LOG=my_crate=trace enables Spans and Events in my_crate at all log levels
  • RUST_LOG 的值由逗号分隔的 directive 组成,如: "crate1::mod1=error,crate1::mod2=warn,crate2=debug,crate3::mod2::mod1=off"

5.2 registry
#

tracing_subscriber::registry::Registry 也实现了 tracing::Subscriber, 它的主要优点是可以将多个 Layer 组合起来, 实现灵活自定义 Subscriber:

  • Registry::default() 创建的 Registry 是空的,需要使用 with() 来关联至少一个 Layer 进行事件处理,如 fmt Layer;
        pub fn layer<S>() -> Layer<S>
    
        // Struct tracing_subscriber::fmt::Layer 定义:
        pub struct Layer<S, N = DefaultFields, E = Format<Full>, W = fn() -> Stdout> { /* private fields */ }
    
        // with() 方法是使用 Layer 配置 Registry 的核心方法
        impl<S> SubscriberExt for S where S: Subscriber,
            fn with<L>(self, layer: L) -> Layered<L, Self> where L: Layer<Self>, Self: Sized,
    
        // 将 Registry 作为 tracing crate 的全局 Subscriber
        impl<T> SubscriberInitExt for T where T: Into<Dispatch>
            fn set_default(self) -> DefaultGuard
            fn try_init(self) -> Result<(), TryInitError>
            fn init(self) // 等效于 set_global_default()
    

常使用 Fmt Layer 和 EnvFilter Layer, 同时 Layer 也是将其它 crate 如 opentelemetry 集成到 tracing 系统的主要方式。

  1. tracing_subscriber::fmt::layer() 返回一个 tracing_subscriber::fmt::Layer 对象, 使用 with_xx() 方法对它进行配置, 然后在 Registry::with() 中使用:

  • with_writer() 用于指定日志写入方式。
  // Build a custom fmt layer and an EnvFilter layer, then compose them on a Registry.
  use std::io;
  use tracing_subscriber::{fmt, EnvFilter, Registry};
  use tracing_subscriber::prelude::*;

  // Event formatter handed to `event_format` below.
  let event_fmt = fmt::format();

  let fmt_layer = fmt::layer() // customise the layer
      // Install the formatter first: `event_format` swaps the whole formatter, so the
      // per-format options that follow must be applied after it.
      .event_format(event_fmt)
      .with_target(false) // don't include event targets when logging
      .with_level(false) // don't include event levels when logging
      .with_writer(io::stderr); // write events to stderr

  let filter_layer = EnvFilter::try_from_default_env()
      .or_else(|_| EnvFilter::try_new("info"))
      .unwrap();

  // `init()` returns `()`, so the result is not bound to a variable.
  tracing_subscriber::registry() // equivalent to Registry::default()
      .with(filter_layer)
      .with(fmt_layer)
      .init();

  // A Registry from `Registry::default()` is empty; attach at least one layer via `with()`.
  let subscriber = Registry::default()
      .with(fmt::Layer::default());
  2. 创建 tracing_subscriber::filter::EnvFilter 类型对象,它实现了 Layer, 可以在 Registry 中使用,实现个性化的日志过滤:
         // Import the `fmt` module (not the `fmt::fmt` function) so `fmt::layer()` resolves.
         use tracing_subscriber::{fmt, prelude::*};
         use tracing_subscriber::filter::{Directive, EnvFilter, LevelFilter};

         // Read the filter configuration from the environment.
         let filter = EnvFilter::from_default_env();

         // Customise an EnvFilter; each directive is a string using the same syntax as the
         // RUST_LOG environment variable.
         let filter = EnvFilter::try_from_default_env()?
             .add_directive("my_crate::module=trace".parse()?)
             .add_directive("my_crate::my_other_module::something=info".parse()?)
             // Default level; `Level::INFO.into()` can be used here as well.
             .add_directive(LevelFilter::INFO.into());

         // Use an EnvFilter as a layer.
         tracing_subscriber::registry()
             .with(fmt::layer())
             .with(EnvFilter::from_default_env()) // honours the RUST_LOG environment variable
             .init();
    
         // 从 Level 创建 LevelFilter,当 Metadata Level 比传入的 Level 大时才输出。
         pub const fn from_level(level: Level) -> LevelFilter
    

使用 Registry 的 init() 方法来创建并设置为全局 Subscriber:

// `fmt` is imported exactly once; importing it from two `use` lines in one scope is an error.
use tracing_subscriber::fmt::{self, format, time};
use tracing_subscriber::{prelude::*, EnvFilter, Registry};

// The three variants below are alternatives: `init()` installs the global default
// subscriber, so pick one per program.

// `fmt::Layer::default()` is equivalent to `fmt::layer()`.
Registry::default().with(fmt::Layer::default()).init();

// With an EnvFilter layer that reads the default RUST_LOG variable.
tracing_subscriber::registry()
    .with(fmt::layer())
    .with(EnvFilter::from_default_env())
    .init();

// With an EnvFilter layer that reads a custom MYAPP_LOG variable.
tracing_subscriber::registry()
    .with(fmt::layer())
    .with(EnvFilter::from_env("MYAPP_LOG"))
    .init();

6 tracing-appender
#

该 crate 提供了终端和文件输出的 writer,文件支持轮转(如按时间周期、按文件大小等),需要和 tracing_subscriber::fmt::layer().with_writer(xxx) 来配合使用。

tracing_appender::non_blocking() 是非阻塞模式,内部创建一个 worker thread 来接受 log line 并写入 writer,log line 先被 enqueue,然后再被写入。

// 写文件:file_appender 实现了 std::io::Write
let file_appender = tracing_appender::rolling::hourly("/some/directory", "prefix.log");
tracing_subscriber::fmt()
    .with_writer(file_appender)
    .init();

// Non-Blocking Writer
// 1. 写 stdout
let (non_blocking, _guard) = tracing_appender::non_blocking(std::io::stdout());
tracing_subscriber::fmt()
    .with_writer(non_blocking)
    .init();

// 2. 自定义 writer
use std::io::Error;
/// Minimal custom writer: prints every buffer it receives to stdout in `Debug` form.
struct TestWriter;

impl std::io::Write for TestWriter {
    /// Prints `buf` and reports the whole buffer as written.
    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
        println!("{:?}", buf);
        Ok(buf.len())
    }

    /// Nothing is buffered, so flushing always succeeds.
    fn flush(&mut self) -> std::io::Result<()> {
        Ok(())
    }
}
let (non_blocking, _guard) = tracing_appender::non_blocking(TestWriter);
tracing_subscriber::fmt()
    .with_writer(non_blocking)
    .init();

// Non-Blocking Rolling File Appender
let file_appender = tracing_appender::rolling::hourly("/some/directory", "prefix.log");
let (non_blocking, _guard) = tracing_appender::non_blocking(file_appender);
tracing_subscriber::fmt()
    .with_writer(non_blocking)
    .init();

7 tracing_opentelemetry crate
#

opentelemetry 的 OTEL 是一个标准的 tracing 数据格式,被很多开源或商业系统所支持,如可以使用 jaeger 收集和展示 OTEL 格式的 tracing 数据。

tracing_opentelemetry crate 的核心是提供了一个实现 tracing_subscriber::layer::Layer trait 的 OpenTelemetryLayer 类型, 然后再使用 tracing_subscriber 的 Registry 来和 tracing crate 协作。

opentelemetry crate 定义了 Tracer、Metrics、Context 等 trait 接口。

先创建一个 tracer(取决于 tracing 系统类型, 如 jaeger, zipkin 等):

  1. 创建一个 TracerProvider;
  2. 从 TracerProvider 创建一个 tracer;

再使用 tracing_subscriber 从 tracer 创建 Layer 和 Registry:

  1. 使用 tracer 创建一个实现 tracing_subscriber::layer::Layer trait 的 Layer
  2. 使用该 Layer 来创建一个实现 tracing::Subscriber trait 的 Registry 对象;
  3. 使用该 Subscriber 对象来记录 span/event;
use tracing_opentelemetry::OpenTelemetryLayer;
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::Registry;
use opentelemetry_sdk::trace::TracerProvider;
use opentelemetry::trace::{Tracer, TracerProvider as _};
use tracing::{error, span};

// 1. 使用 jaeger exporter pipeline 创建一个名为 "trace_demo" 的 service
// Create a jaeger exporter pipeline for a `trace_demo` service.
let tracer = opentelemetry_jaeger::new_agent_pipeline()
    .with_service_name("trace_demo")
    .install_simple()
    .expect("Error initializing Jaeger exporter");
// 2. 或者使用 TracerProvider 创建一个 readme_example 的 service
// Create a new OpenTelemetry trace pipeline that prints to stdout
let provider = TracerProvider::builder()
    .with_simple_exporter(opentelemetry_stdout::SpanExporter::default())
    .build();
let tracer = provider.tracer("readme_example");

// Create a layer with the configured tracer
let otel_layer = OpenTelemetryLayer::new(tracer);
// 或者:
// let otel_layer = tracing_opentelemetry::layer().with_tracer(tracer);

// Use the tracing subscriber `Registry`, or any other subscriber
// that impls `LookupSpan`
let subscriber = Registry::default().with(otel_layer);

// Trace executed code
tracing::subscriber::with_default(subscriber, || {
    // Spans will be sent to the configured OpenTelemetry exporter
    let root = span!(tracing::Level::TRACE, "app_start", work_units = 2);
    let _enter = root.enter();

    error!("This event will be logged in the root span.");
});

8 opentelemetry-rust 项目
#

open-telemetry/opentelemetry-rust 项目提供了如下 crate:

  1. opentelemetry This is the OpenTelemetry API crate, and is the crate required to instrument libraries and applications. It contains Context API, Baggage API, Propagators API, Logging Bridge API, Metrics API, and Tracing API.
  2. opentelemetry-sdk This is the OpenTelemetry SDK crate, and contains the official OpenTelemetry SDK implementation. It contains Logging SDK, Metrics SDK, and Tracing SDK. It also contains propagator implementations.
  3. opentelemetry-otlp exporter to send telemetry (logs, metrics and traces) in the OTLP format to an endpoint accepting OTLP. This could be the OTel Collector, telemetry backends like Jaeger, Prometheus or vendor specific endpoints.
  4. opentelemetry-stdout exporter for sending logs, metrics and traces to stdout, for learning/debugging purposes.
  5. opentelemetry-http This crate contains utility functions to help with exporting telemetry, propagation, over http.
  6. opentelemetry-appender-log This crate provides logging appender to route logs emitted using the log crate to opentelemetry.
  7. opentelemetry-appender-tracing This crate provides logging appender to route logs emitted using the tracing crate to opentelemetry.
  8. opentelemetry-jaeger-propagator provides context propagation using jaeger propagation format.
  9. opentelemetry-prometheus provides a pipeline and exporter for sending metrics to Prometheus.
  10. opentelemetry-semantic-conventions provides standard names and semantic otel conventions.
  11. opentelemetry-zipkin provides a pipeline and exporter for sending traces to Zipkin.

分析版本: v0.27.0: https://docs.rs/opentelemetry/0.27.0/opentelemetry/

当前(2024.11.16)Rust 的 opentelemetry 版本状态(Signal/Component:Overall Status):

  • Logs-API:RC*
  • Logs-SDK:Beta
  • Logs-OTLP Exporter:Beta
  • Logs-Appender-Tracing:Beta
  • Metrics-API:RC
  • Metrics-SDK:Beta
  • Metrics-OTLP Exporter:Beta
  • Traces-API:Beta
  • Traces-SDK:Beta
  • Traces-OTLP Exporter:Beta

9 opentelemetry crate
#

分析版本: v0.27.0: https://docs.rs/opentelemetry/0.27.0/opentelemetry/

opentelemetry crate 提供了 global/logs/metrics/trace/context/propagation 等 module,它们分别定义了对应 trait 接口,如 LoggerProvider/MeterProvider/TracerProvider/TextMapPropagator trait, 但并没有提供它们的实现 , 而 opentelemetry_sdk crate 提供了它们的实现。

9.1 Context
#

opentelemetry 提供了 Context 类型:

  • Context 是执行上下文的 values 集合;
  • Context 为执行上下文中跨 API 边界的相关方提供了传播(propagate)和共享 values 集合的功能;

Context 是不可变类型,对它写入后返回一个包含原始 Context 内容和新值的新 Context (原来的 context 不变)。

Context 实现的方法:

  • 由于 Context 不可变,所以下面返回的 Self 对象都是新的 Context,而 self 不变;
pub struct Context { /* private fields */ }

impl Context
pub fn new() -> Self
pub fn current() -> Self
    use opentelemetry::Context;
    #[derive(Debug, PartialEq)]
    struct ValueA(&'static str);
    fn do_work() {
        assert_eq!(Context::current().get(), Some(&ValueA("a")));
    }
    let _guard = Context::new().with_value(ValueA("a")).attach();
    do_work()

pub fn map_current<T>(f: impl FnOnce(&Context) -> T) -> T

pub fn current_with_value<T: 'static + Send + Sync>(value: T) -> Self
    use opentelemetry::Context;

    // Given some value types defined in your application
    #[derive(Debug, PartialEq)]
    struct ValueA(&'static str);
    #[derive(Debug, PartialEq)]
    struct ValueB(u64);

    // You can create and attach context with the first value set to "a"
    let _guard = Context::new().with_value(ValueA("a")).attach();

    // And create another context based on the fist with a new value
    let all_current_and_b = Context::current_with_value(ValueB(42));

    // The second context now contains all the current values and the addition
    assert_eq!(all_current_and_b.get::<ValueA>(), Some(&ValueA("a")));
    assert_eq!(all_current_and_b.get::<ValueB>(), Some(&ValueB(42)));

pub fn get<T: 'static>(&self) -> Option<&T>
    use opentelemetry::Context;
    // Given some value types defined in your application
    #[derive(Debug, PartialEq)]
    struct ValueA(&'static str);
    #[derive(Debug, PartialEq)]
    struct MyUser();

    let cx = Context::new().with_value(ValueA("a"));

    // Values can be queried by type
    assert_eq!(cx.get::<ValueA>(), Some(&ValueA("a")));

    // And return none if not yet set
    assert_eq!(cx.get::<MyUser>(), None);

pub fn with_value<T: 'static + Send + Sync>(&self, value: T) -> Self
    use opentelemetry::Context;

    // Given some value types defined in your application
    #[derive(Debug, PartialEq)]
    struct ValueA(&'static str);
    #[derive(Debug, PartialEq)]
    struct ValueB(u64);

    // You can create a context with the first value set to "a"
    let cx_with_a = Context::new().with_value(ValueA("a"));

    // And create another context based on the fist with a new value
    let cx_with_a_and_b = cx_with_a.with_value(ValueB(42));

    // The first context is still available and unmodified
    assert_eq!(cx_with_a.get::<ValueA>(), Some(&ValueA("a")));
    assert_eq!(cx_with_a.get::<ValueB>(), None);

    // The second context now contains both values
    assert_eq!(cx_with_a_and_b.get::<ValueA>(), Some(&ValueA("a")));
    assert_eq!(cx_with_a_and_b.get::<ValueB>(), Some(&ValueB(42)));

pub fn attach(self) -> ContextGuard
    use opentelemetry::Context;

    #[derive(Debug, PartialEq)]
    struct ValueA(&'static str);

    let my_cx = Context::new().with_value(ValueA("a"));

    // Set the current thread context
    let cx_guard = my_cx.attach();
    assert_eq!(Context::current().get::<ValueA>(), Some(&ValueA("a")));

    // Drop the guard to restore the previous context
    drop(cx_guard);
    assert_eq!(Context::current().get::<ValueA>(), None);

获取和设置 context 对象的值:

  1. new() 函数:创建一个空 Context;
  2. with_value<T>() 方法:为 ctx 添加一个 T 类型值;
  3. get<T>() 方法:返回 ctx 中的 T 类型值;
  4. map_current() 方法: 对当前 Context 应用一个 FnOnce(&Context) -> T 闭包,返回 T 值;

使用 ctx.attach() 方法为当前线程设置一个 current Context,当返回的 ContextGuard 被 drop 时,线程的 current Context 被恢复为设置前的值。

  • 类似的,也可以为一个线程设置一个 active span,从而避免来回传递该 span;

使用 Context::current() 函数来获取线程的 current Context snapshot(Context 不可变)。

通过使用 Context::current_with_value(V).attach() 可以实现 Context 嵌套。

示例:

use opentelemetry::Context;

// Application-specific `a` and `b` values
#[derive(Debug, PartialEq)]
struct ValueA(&'static str);

#[derive(Debug, PartialEq)]
struct ValueB(u64);

// 创建一个 context,并设置为 current context
let _outer_guard = Context::new().with_value(ValueA("a")).attach();

// Only value a has been set
let current = Context::current();
assert_eq!(current.get::<ValueA>(), Some(&ValueA("a")));
assert_eq!(current.get::<ValueB>(), None);

{
    // 创建一个子 Context,并设置为 current context
    let _inner_guard = Context::current_with_value(ValueB(42)).attach();

    // Both values are set in inner context
    let current = Context::current();
    assert_eq!(current.get::<ValueA>(), Some(&ValueA("a")));
    assert_eq!(current.get::<ValueB>(), Some(&ValueB(42)));
} // block 结束后,_inner_guard 被 drop,current context 恢复为 _outer_guard 对应的 Context

// Resets to only the `a` value when inner guard is dropped
let current = Context::current();
assert_eq!(current.get::<ValueA>(), Some(&ValueA("a")));
assert_eq!(current.get::<ValueB>(), None);

Context 实现了 opentelemetry::trace::TraceContextExt trait, 可以用 Context 来保存和传递 trace span 数据(context 可以保存和传播任意类型数据):

  • current_with_span() 函数:返回一个包含 span 和 current context 信息的新 context(没有调用 context 的 attach() 方法,故不会改变当前线程的 active context);

  • with_span(span)方法: 为当前 context 关联一个 span,返回新的子 context;
  • span() 方法:Returns a reference to this context’s span, or the default no-op span if none has been set.
  • with_remote_span_context(SpanContext): Returns a copy of this context with the span context included.
pub trait TraceContextExt {
    // Required methods
    fn current_with_span<T: Span + Send + Sync + 'static>(span: T) -> Self;

        use opentelemetry::{global, trace::{TraceContextExt, Tracer}, Context};

        let tracer = global::tracer("example");
        // build a span
        let span = tracer.start("parent_span");

        // create a new context from the currently active context that includes this span
        let cx = Context::current_with_span(span);

        // create a child span by explicitly specifying the parent context
        let child = tracer.start_with_context("child_span", &cx);

    fn with_span<T: Span + Send + Sync + 'static>(&self, span: T) -> Self;

        use opentelemetry::{global, trace::{TraceContextExt, Tracer}, Context};

        fn fn_with_passed_in_context(cx: &Context) {
            let tracer = global::tracer("example");

            // build a span
            let span = tracer.start("parent_span");

            // create a new context from the given context that includes the span
            let cx_with_parent = cx.with_span(span);

            // create a child span by explicitly specifying the parent context
            let child = tracer.start_with_context("child_span", &cx_with_parent);
        }

    fn span(&self) -> SpanRef<'_>;

        use opentelemetry::{trace::TraceContextExt, Context};

        // Add an event to the currently active span
        Context::map_current(|cx| cx.span().add_event("An event!", vec![]));

    fn has_active_span(&self) -> bool;

        use opentelemetry::{trace::TraceContextExt, Context};
        assert!(!Context::map_current(|cx| cx.has_active_span()));

    fn with_remote_span_context(&self, span_context: SpanContext) -> Self;
}
impl TraceContextExt for Context

通过 span() 返回的 SpanRef 对象,可以为 span 添加 events、attributes 等。

9.2 Span
#

span 代表 trace 中的一个操作步骤。每个 span 有 0 或 1 个父 span,0 或多个子 span,没有 parent 的 span 称为 root span。

这些父子 span tree 组成了 trace, 每个 trace 只能有一个 root span。

当 end 或 drop 该 span 时,该 span 会被自动 export。

Span trait 定义:

  1. Span 有自己的 name,开始和结束 timestamp,可以关联 KV 格式的一个或多个 attribute;
  2. Span 可以关联一个或多个 event,每个 event 有自己的 name 和 attributes 集合;
    • span 可以实现结构化日志记录;
  3. Span 可以记录 error
  4. Span 可以通过 SpanContext 来和其它 tracer 的 span 发生关联;
  5. 当调用 end() 后,Span 就结束了,这时 is_recording() 返回 false;
pub trait Span {
    // Required methods
    fn add_event_with_timestamp<T>(&mut self, name: T, timestamp: SystemTime, attributes: Vec<KeyValue>, ) where T: Into<Cow<'static, str>>;
    fn span_context(&self) -> &SpanContext;
    fn is_recording(&self) -> bool;
    fn set_attribute(&mut self, attribute: KeyValue);
    fn set_status(&mut self, status: Status);
    fn update_name<T>(&mut self, new_name: T) where T: Into<Cow<'static, str>>;
    fn add_link(&mut self, span_context: SpanContext, attributes: Vec<KeyValue>);
    fn end_with_timestamp(&mut self, timestamp: SystemTime);

    // Provided methods
    fn add_event<T>(&mut self, name: T, attributes: Vec<KeyValue>) where T: Into<Cow<'static, str>> { ... }
    fn record_error(&mut self, err: &dyn Error) { ... }
    fn set_attributes(&mut self, attributes: impl IntoIterator<Item = KeyValue>) { ... }
    fn end(&mut self) { ... }
}

Span 是从 Tracer 对象创建出来的,如 tracer.start("name") 创建一个 span:

use opentelemetry::{global, trace::{Span, Status, Tracer}, KeyValue};

// Get a tracer
let tracer = global::tracer("my_tracer");

// Create a span; its parent is the currently active span, or none if no span is active.
// The binding must be `mut` because set_status()/add_event() take `&mut self`.
let mut span = tracer.start("parent_span");

span.set_status(Status::error("value too small"));
span.add_event("An event!", vec![KeyValue::new("happened", true)]);

和线程的 current Context 类似,也可以为线程设置一个 active span,这样后续创建(通过 Tracer 的 start() 等方法创建 span)的 span 均为它的子 span:

  • trace::mark_span_as_active(span): 将 span 设置为当前线程的 active span;
  • trace::get_active_span(): 获得当前线程的 active span;

使用线程 active span 的另一个好处是不需要来回传递该 span,而是使用 trace module 的 get_active_span() 函数来获取它。

use opentelemetry::{global, trace::{self, Span, Status, Tracer, TracerProvider}, KeyValue};

/// Records an error status and an event on the thread's currently active span
/// when `rand` is below 0.5; the span itself is never passed in explicitly.
fn may_error(rand: f32) {
    if rand < 0.5 {
        // Get the currently active span to record additional attributes, status, etc.
        trace::get_active_span(|span| {
            span.set_status(Status::error("value too small"));
            span.add_event("An event!", vec![KeyValue::new("happened", true)]);
        });
    }
}

// Get a tracer
let tracer = global::tracer("my_tracer");

// Create a span; no span is active at this point, so it has no parent.
let span = tracer.start("parent_span");

// Mark the span as active
let active = trace::mark_span_as_active(span);

// Any span created here will be a child of `parent_span`...

// Drop the guard and the span will no longer be active
drop(active);

Tracer::in_span() 方法可以简化上面的 active span 操作,in_span(name, f) 创建一个名为 name 的 span:

  1. 将它保存到传递给闭包的 context 中;
  2. 将该 span 设置为闭包函数的 active span;
  3. 闭包返回时 span 被 drop;
use opentelemetry::{global, trace::{Span, Tracer}, KeyValue};
use opentelemetry::trace::get_active_span;

fn my_function() {
    // start an active span in one function
    global::tracer("my-component").in_span("span-name", |_cx| {

        // in_span() 方法自动将创建的 子 span 作为闭包函数的 active span

        // anything happening in functions we call can still access the active span...
        my_other_function();
    })
}

fn my_other_function() {
    // call methods on the current span from
    get_active_span(|span| {
        span.add_event("An event!", vec![KeyValue::new("happened", true)]);
    })
}

9.3 Event
#

span 中可以关联的 Event 定义如下:

#[non_exhaustive]
pub struct Event {
    pub name: Cow<'static, str>,
    pub timestamp: SystemTime,
    pub attributes: Vec<KeyValue>,
    pub dropped_attributes_count: u32,
}

Event 用于在 span 的 lifetime 中记录结构化事件,可以用来替代 log。

fn my_other_function() {
    // call methods on the current span from
    get_active_span(|span| {
        span.add_event("An event!", vec![KeyValue::new("happened", true)]);
    })
}

9.4 SpanContext/W3C TraceContext Spec
#

SpanContext 是 Span 中不可变的部分,可以被序列化和传播,遵从 W3C TraceContext specification:

W3C TraceContext 使用 2 个 HTTP Headers 来传递 trace context 信息,实现分布式追踪。

  • traceparent:标准固定长度格式,代表请求在整个 trace graph 中的位置;
    • base16(十六进制)编码,短横杠风格的 4 部分信息:version、trace-id、parent-id、trace-flags
    • 当前,trace-flags 仅仅标准化了一个值为 1 的 FLAG_SAMPLED;
  • tracestate:扩展 traceparent,包含 vendor-specific 相关 data,使用 name/value 格式,是可选的;
traceparent: 00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01
    base16(version) = 00
    base16(trace-id) = 4bf92f3577b34da6a3ce929d0e0e4736
    base16(parent-id) = 00f067aa0ba902b7
    base16(trace-flags) = 01  // sampled
tracestate: rojo=00f067aa0ba902b7,congo=t61rcWkgMzE

Span 如果未设置 TraceFlags::SAMPLED,则会被大多数 tracing tools 所忽略。

pub struct SpanContext { /* private fields */ }

// SpanContext 实现的方法
pub const NONE: SpanContext = _
pub fn empty_context() -> Self
pub fn new(
    trace_id: TraceId,
    span_id: SpanId,
    trace_flags: TraceFlags,
    is_remote: bool,
    trace_state: TraceState,
) -> Self

pub fn trace_id(&self) -> TraceId
pub fn span_id(&self) -> SpanId
pub fn trace_flags(&self) -> TraceFlags
pub fn is_valid(&self) -> bool
pub fn is_remote(&self) -> bool
pub fn is_sampled(&self) -> bool
pub fn trace_state(&self) -> &TraceState

TraceFlags:

  • 当前 spec 仅支持一个 flag:TraceFlags::SAMPLED;如果未设置,则该 span 会被大多数 tracing 系统忽略。
pub struct TraceFlags(/* private fields */);

impl TraceFlags
pub const NOT_SAMPLED: TraceFlags = _
pub const SAMPLED: TraceFlags = _  // 当前 spec 仅支持的一个 flag

pub const fn new(flags: u8) -> Self
pub fn is_sampled(&self) -> bool
pub fn with_sampled(&self, sampled: bool) -> Self
pub fn to_u8(self) -> u8

TraceState:

  • 用于记录 system、vendor 相关的配置数据,使用 key-value 列表表示。
pub struct TraceState(/* private fields */);

impl TraceState
pub const NONE: TraceState = _  // The default TraceState, as a constant

pub fn from_key_value<T, K, V>(trace_state: T) -> TraceResult<Self>
where
    T: IntoIterator<Item = (K, V)>,
    K: ToString,
    V: ToString,

pub fn get(&self, key: &str) -> Option<&str>

pub fn insert<K, V>(&self, key: K, value: V) -> TraceResult<TraceState>
where
    K: Into<String>,
    V: Into<String>,

pub fn delete<K: Into<String>>(&self, key: K) -> TraceResult<TraceState>

pub fn header(&self) -> String

pub fn header_delimited(
    &self,
    entry_delimiter: &str,
    list_delimiter: &str,
) -> String

// 示例
use opentelemetry::trace::TraceState;

let kvs = vec![("foo", "bar"), ("apple", "banana")];
let trace_state = TraceState::from_key_value(kvs);

assert!(trace_state.is_ok());
assert_eq!(trace_state.unwrap().header(), String::from("foo=bar,apple=banana"))

9.5 Tracer/TracerProvider
#

trace API module 提供了如下三个主 traits:

  1. TracerProviders are the entry point of the API. They provide access to Tracers.
  2. Tracers are types responsible for creating Spans.
  3. Spans provide the API to trace an operation.

当 end 或 drop 该 span 时,该 span 会被自动 export。

对于 async runtime,会创建 async task 来 export 它们,为了提高效率应该使用 batch span processor。

trace 由 span tree 组成:

  1. 每个 span 有 0 或 1 个父 span, 0 或多个子 span;
    • root span:没有 parent 的 span;
  2. 这些父子 span tree 组成了 trace;
    • 每个 trace 只能有一个 root span;

Context 可以关联父 span,后续使用 Tracer::start_with_context() 方法从该 Context 创建子 span。

opentelemetry 为 async Future 实现了 FutureExt,从而为 Future 设置 active Context,后续在 async task 中可以使用 Context::current() 返回设置的 active Context,然后使用 context.span() 从中提取出 span 信息。

trace module 定义了 Tracer trait,用来创建新 Span:

  • build_with_context(): 使用 SpanBuilder 创建一个 span,并关联父 context;

  • start(name): 创建一个名称为 name 的新(子) span,它的父 span 为当前 active span;

  • start_with_context(name, context): 和 start 类似,但是给 span 关联一个 context;

    • 如果 context 包含 span B,则新创建的 span A 为 B 的子 span;
  • span_builder(name): 返回一个 SpanBuilder,可以用来指定 span 的所有属性;

  • build(SpanBuilder): 使用 SpanBuilder 构建一个 Span 并启动;

  • in_span(name, f): 创建一个名为 name 的 span:

    1. 将它保存到传递给闭包的 context 中;
    2. 将该 span 设置为闭包函数的 active span;
    3. 闭包返回时 span 被 drop;
pub trait Tracer {
    type Span: Span;

    // Required method
    fn build_with_context(&self, builder: SpanBuilder, parent_cx: &Context,) -> Self::Span;

    // Provided methods
    fn start<T>(&self, name: T) -> Self::Span where T: Into<Cow<'static, str>> { ... }
    fn start_with_context<T>(&self, name: T, parent_cx: &Context) -> Self::Span where T: Into<Cow<'static, str>> { ... }
    fn span_builder<T>(&self, name: T) -> SpanBuilder where T: Into<Cow<'static, str>> { ... }
    fn build(&self, builder: SpanBuilder) -> Self::Span { ... }
    fn in_span<T, F, N>(&self, name: N, f: F) -> T where F: FnOnce(Context) -> T, N: Into<Cow<'static, str>>, Self::Span: Send + Sync + 'static { ... }

    // in_span() 的实现方式
    fn in_span<T, F, N>(&self, name: N, f: F) -> T
    where
        F: FnOnce(Context) -> T,
        N: Into<Cow<'static, str>>,
        Self::Span: Send + Sync + 'static,
    {
        let span = self.start(name);
        let cx = Context::current_with_span(span);
        let _guard = cx.clone().attach();
        f(cx)
    }
}

使用 Tracer 的 build() 或 span_builder() 来自定义复杂的 span:

pub struct SpanBuilder {
    pub trace_id: Option<TraceId>,
    pub span_id: Option<SpanId>,
    pub span_kind: Option<SpanKind>,
    pub name: Cow<'static, str>,
    pub start_time: Option<SystemTime>,
    pub end_time: Option<SystemTime>,
    pub attributes: Option<Vec<KeyValue>>,
    pub events: Option<Vec<Event>>,
    pub links: Option<Vec<Link>>,
    pub status: Status,
    pub sampling_result: Option<SamplingResult>,
}

// 例子
use opentelemetry::{
    global,
    trace::{TracerProvider, SpanBuilder, SpanKind, Tracer},
};

let tracer = global::tracer("example-tracer");

// The builder can be used to create a span directly with the tracer
let _span = tracer.build(SpanBuilder {
    name: "example-span-name".into(),
    span_kind: Some(SpanKind::Server),
    ..Default::default()
});

// Or used with builder pattern
let _span = tracer
    .span_builder("example-span-name")
    .with_kind(SpanKind::Server)
    .start(&tracer);

TracerProvider 用于创建 Tracer 对象:

  1. tracer_with_scope():使用指定的 scope 来创建一个新的 Tracer 对象;

    • InstrumentationScope 一般用于标记 library 和 crate 信息,如 name、version、attributes;
  2. tracer(): 创建一个指定 name 的 Tracer 对象;

    • 更常用;
         pub trait TracerProvider {
    	 type Tracer: Tracer;
    
    	 // Required method
    	 fn tracer_with_scope(&self, scope: InstrumentationScope) -> Self::Tracer;
    
    	 // Provided method
    	 fn tracer(&self, name: impl Into<Cow<'static, str>>) -> Self::Tracer { ... }
         }
    
         impl TracerProvider for GlobalTracerProvider
         impl TracerProvider for NoopTracerProvider
    

GlobalTracerProvider 和 NoopTracerProvider 实现了 TracerProvider。

一般使用 global module 的 GlobalTracerProvider 来创建 Tracer:

use opentelemetry::{global, InstrumentationScope, trace::TracerProvider};

let provider = global::tracer_provider();

// tracer used in applications/binaries
let tracer = provider.tracer("my_app");

// tracer used in libraries/crates that optionally includes version and schema url
let scope =
    InstrumentationScope::builder(env!("CARGO_PKG_NAME"))
        .with_version(env!("CARGO_PKG_VERSION"))
        .with_schema_url("https://opentelemetry.io/schema/1.0.0")
        .build();

let tracer = provider.tracer_with_scope(scope);

创建 TracerProvider 并设置为 global tracer provider:

  1. main 或 app 启动设置:
use opentelemetry::trace::{Tracer, noop::NoopTracerProvider};
use opentelemetry::global;

fn init_tracer() {
    // Swap this no-op provider for your tracing service of choice (jaeger, zipkin, etc)
    let provider = NoopTracerProvider::new();

    // Configure the global `TracerProvider` singleton when your app starts
    // (there is a no-op default if this is not set by your application)
    let _ = global::set_tracer_provider(provider);
}

fn do_something_tracked() {
    // 从全局 trace provider 中获得一个 tracer
    // Then you can get a named tracer instance anywhere in your codebase.
    let tracer = global::tracer("my-component");

    tracer.in_span("doing_work", |cx| {
        // Traced app logic here...
    });
}

// in main or other app start
init_tracer();
do_something_tracked();
  2. lib 设置:不需要创建全局 tracer provider,而是使用 global::tracer_provider() 获得所在应用定义的 provider(上面的 init_tracer 函数创建的),然后创建 tracer:

use opentelemetry::{global, trace::{Span, Tracer, TracerProvider}};
use opentelemetry::InstrumentationScope;
use std::sync::Arc;

fn my_library_function() {
    // Use the global tracer provider to get access to the user-specified
    // tracer configuration
    let tracer_provider = global::tracer_provider();

    // Get a tracer for this library
    // 给 tracer 关联一些结构化信息
    let scope = InstrumentationScope::builder("my_name")
        .with_version(env!("CARGO_PKG_VERSION"))
        .with_schema_url("https://opentelemetry.io/schemas/1.17.0")
        .build();
    let tracer = tracer_provider.tracer_with_scope(scope);

    // 也可以使用一个简化版本来创建 tracer
    // let tracer = global::tracer("my-component");

    // Create spans
    let mut span = tracer.start("doing_work");

    // Do work...

    // End the span
    span.end();
}
  1. 同步代码模式

    context 用于在父子 span 间传递信息:context 可以关联父 span,后续通过该 context 可以创建子 span。

    • 从实现 Tracer 的 tracer 对象创建一个 span,作为父 span;
    • 为 父 span 关联 context;
    • 使用父 span 的 context 创建一个子 span;

    创建 tracer 和 span:

    use opentelemetry::{global, trace::{Span, Tracer, TraceContextExt}, Context, KeyValue};

    // Get a tracer from the global tracer provider
    let tracer = global::tracer("my-component");

    // Create the parent span; `mut` is required because set_attribute() takes `&mut self`
    let mut parent = tracer.start("foo");

    // Attach a key/value attribute to the span
    parent.set_attribute(KeyValue::new("http.client_ip", "83.164.160.102"));

    // Build a new context from the current one that carries the parent span
    // (the span is moved into the context)
    let parent_cx = Context::current_with_span(parent);

    // Create a child span whose parent is the span stored in parent_cx
    let child = tracer.start_with_context("bar", &parent_cx);

    // ...

    // Move the child span into a new context and attach it as the current context.
    // When _guard is dropped, the previous current context is restored.
    let child_cx = Context::current_with_span(child);
    let _guard = child_cx.clone().attach();

    // do work tracked by the now current span

    // A span is exported when it is ended or dropped. The child span now lives
    // inside child_cx, so it is ended through the context's SpanRef.
    child_cx.span().end(); // explicitly end
    drop(parent_cx); // or implicitly end on drop
    

    可以使用线程当前的 Context 来简化上面的步骤:

    use opentelemetry::{global, trace::{SpanKind, Tracer}};
    
    let tracer = global::tracer("my-component");
    
    // Create simple spans with `in_span`
    
    tracer.in_span("foo", |_foo_cx| {
        // _foo_cx 是 Context 类型,存有 span foo 对象;
        // 同时自动将 span foo 对象设置为闭包函数的 active span;
        // parent span is active
    
        tracer.in_span("bar", |_bar_cx| {
            // _bar_cx 是 Context 类型,存有子 span bar 对象;
            // 同时自动将 span bar 对象设置为闭包函数的 active span;
    
            // child span is now the active span and associated with the parent span
        });
        // child has ended, parent now the active span again
    });
    // parent has ended, no active spans
    
  1. 异步代码模式

    线程级别的 active span 不适合于 async task,因为它可能会被 async runtime 在多个线程间调度。

    对于 async task,需要使用 Context 来传递 span:

    • FutureExt:

      1. 提供 with_context/with_current_context() 方法;
      2. 所有 Future 均实现了 FutureExt trait,可以在创建 Future 时指定它的 Context;
      3. 后续当对应的 future task 被 poll 时,传入的 context 作为当前 context;
    • TraceContextExt:

      1. 提供 current_with_span()、with_span()、span() 等方法,用于在 Context 中保存和提取 span 对象(context 可以保存和传播任意类型数据)
      2. Context 类型实现了 TraceContextExt trait;

    FutureExt 例子:

    pub trait FutureExt: Sized {
        // Provided methods
        fn with_context(self, otel_cx: Context) -> WithContext<Self>  { ... }
        fn with_current_context(self) -> WithContext<Self>  { ... }
    }
    impl<T: Sized> FutureExt for T
    
    // 示例:
    use opentelemetry::{Context, global, trace::{FutureExt, TraceContextExt, Tracer}};
    
    async fn some_work() {
        // 返回 with_context() 设置的 current context,
        let context = Context::current();
    
        // 从 context 中提取 关联的 span
        let span = context.span();
    
    }
    
    // Get a tracer
    let tracer = global::tracer("my_tracer");
    
    // Start a span
    let span = tracer.start("my_span");
    
    // Perform some async work with this span as the currently active parent.
    
    // 基于当前 Context,创建一个带 span 的新 context
    let context = Context::current_with_span(span);
    
    // 为 async task 设置 Context,后续被 poll 时被设置为当前 context
    some_work().with_context(context).await;
    

    TraceContextExt 例子:

    use opentelemetry::{global, trace::{TraceContextExt, Tracer}, Context};
    
    let tracer = global::tracer("example");
    // build a span
    let span = tracer.start("parent_span");
    
    // create a new context from the currently active context that includes this span
    let cx = Context::current_with_span(span);
    
    // create a child span by explicitly specifying the parent context
    let child = tracer.start_with_context("child_span", &cx);
    
    // 另一个例子
    use opentelemetry::{global, trace::{TraceContextExt, Tracer}, Context};
    
    fn fn_with_passed_in_context(cx: &Context) {
        let tracer = global::tracer("example");
    
        // build a span
        let span = tracer.start("parent_span");
    
        // create a new context from the given context that includes the span
        let cx_with_parent = cx.with_span(span);
    
        // create a child span by explicitly specifying the parent context
        let child = tracer.start_with_context("child_span", &cx_with_parent);
    }
    

9.6 metrics module
#

metrics module 提供了 Counter/Gauge/Histogram/UpDownCounter/Observer/Meter 类型定义和 MeterProvider trait 定义。

  • opentelemetry_prometheus crate 提供了 meter provider 的实现。

metrics 包含两类 Instrument:

  1. Synchronous Instruments (e.g., Counter): These are used inline with your application’s processing logic. For example, you might use a Counter to record the number of HTTP requests received.

  2. Asynchronous Instruments (e.g., ObservableGauge): These allow you to register a callback function that is invoked during export. For instance, you could use an asynchronous gauge to monitor temperature from a sensor every time metrics are exported.

Synchronous Instruments 是在 app 的处理逻辑中直接调用的,而 Asynchronous Instruments 在创建时设置一个 callback,当后续 export 时会被自动调用(不能手动调用)。

Meter 类型:用于创建各种类型的 Instrument Builder。

#[non_exhaustive]
pub struct Meter { /* private fields */ }

impl Meter
pub fn u64_counter(&self, name: impl Into<Cow<'static, str>>,) -> InstrumentBuilder<'_, Counter<u64>>
pub fn f64_counter(&self, name: impl Into<Cow<'static, str>>,) -> InstrumentBuilder<'_, Counter<f64>>
pub fn i64_up_down_counter( &self, name: impl Into<Cow<'static, str>>,) -> InstrumentBuilder<'_, UpDownCounter<i64>>
pub fn f64_up_down_counter( &self, name: impl Into<Cow<'static, str>>,) -> InstrumentBuilder<'_, UpDownCounter<f64>>
pub fn u64_gauge( &self, name: impl Into<Cow<'static, str>>,) -> InstrumentBuilder<'_, Gauge<u64>>
pub fn f64_gauge( &self, name: impl Into<Cow<'static, str>>,) -> InstrumentBuilder<'_, Gauge<f64>>
pub fn i64_gauge( &self, name: impl Into<Cow<'static, str>>,) -> InstrumentBuilder<'_, Gauge<i64>>
pub fn f64_histogram( &self, name: impl Into<Cow<'static, str>>,) -> HistogramBuilder<'_, Histogram<f64>>
pub fn u64_histogram( &self, name: impl Into<Cow<'static, str>>,) -> HistogramBuilder<'_, Histogram<u64>>

pub fn u64_observable_counter( &self, name: impl Into<Cow<'static, str>>,) -> AsyncInstrumentBuilder<'_, ObservableCounter<u64>, u64>
pub fn f64_observable_counter( &self, name: impl Into<Cow<'static, str>>,) -> AsyncInstrumentBuilder<'_, ObservableCounter<f64>, f64>
pub fn i64_observable_up_down_counter( &self, name: impl Into<Cow<'static, str>>,) -> AsyncInstrumentBuilder<'_, ObservableUpDownCounter<i64>, i64>
pub fn f64_observable_up_down_counter( &self, name: impl Into<Cow<'static, str>>,) -> AsyncInstrumentBuilder<'_, ObservableUpDownCounter<f64>, f64>
pub fn u64_observable_gauge( &self, name: impl Into<Cow<'static, str>>,) -> AsyncInstrumentBuilder<'_, ObservableGauge<u64>, u64>
pub fn i64_observable_gauge( &self, name: impl Into<Cow<'static, str>>,) -> AsyncInstrumentBuilder<'_, ObservableGauge<i64>, i64>
pub fn f64_observable_gauge( &self, name: impl Into<Cow<'static, str>>,) -> AsyncInstrumentBuilder<'_, ObservableGauge<f64>, f64>

InstrumentBuilder、HistogramBuilder、 AsyncInstrumentBuilder 用于创建一个 Instrument:

  • with_description(): 设置描述信息;
  • with_unit() : 设置单位;
  • build() 方法: 返回一个 Counter/UpDownCounter/Gauge/Histogram/ObservableCounter/ObservableUpDownCounter/ObservableGauge 类型对象。
#[non_exhaustive]
pub struct InstrumentBuilder<'a, T> {
    pub instrument_provider: &'a dyn InstrumentProvider,
    pub name: Cow<'static, str>,
    pub description: Option<Cow<'static, str>>,
    pub unit: Option<Cow<'static, str>>,
    /* private fields */
}

#[non_exhaustive]
pub struct HistogramBuilder<'a, T> {
    pub instrument_provider: &'a dyn InstrumentProvider,
    pub name: Cow<'static, str>,
    pub description: Option<Cow<'static, str>>,
    pub unit: Option<Cow<'static, str>>,
    pub boundaries: Option<Vec<f64>>,
    /* private fields */
}

#[non_exhaustive]
pub struct AsyncInstrumentBuilder<'a, I, M> {
    pub instrument_provider: &'a dyn InstrumentProvider,
    pub name: Cow<'static, str>,
    pub description: Option<Cow<'static, str>>,
    pub unit: Option<Cow<'static, str>>,
    pub callbacks: Vec<Callback<M>>,
    /* private fields */
}

InstrumentBuilder、AsyncInstrumentBuilder 的 build() 返回的 Counter 等类型对象一般会被重用,所以一般设置为和 app lifetime 相同的全局单例对象。

use opentelemetry::metrics::{Meter};
use opentelemetry::{global, KeyValue};

   fn do_something_instrumented() {
    let meter = global::meter("my-component");
    // It is recommended to reuse the same counter instance for the
    // lifetime of the application
    let counter = meter.u64_counter("my_counter").build();

    // record measurements
    counter.add(1, &[KeyValue::new("mykey", "myvalue")]);
    }
}

对于 Counter/UpDownCounter/Gauge 等类型对象,一般提供 add/record 等方法,在调用时可以设置 attributes:

#[non_exhaustive]
pub struct Counter<T>(/* private fields */);
    pub fn add(&self, value: T, attributes: &[KeyValue])

#[non_exhaustive]
pub struct Gauge<T>(/* private fields */);
    pub fn record(&self, value: T, attributes: &[KeyValue])

示例:

use opentelemetry::{global, KeyValue};

let meter = global::meter("my-meter");

// Synchronous Instruments

// u64 Counter
let u64_counter = meter.u64_counter("my_u64_counter").build();
u64_counter.add(
    10,
    &[
        KeyValue::new("mykey1", "myvalue1"),
        KeyValue::new("mykey2", "myvalue2"),
    ],
);

// f64 Counter
let f64_counter = meter.f64_counter("my_f64_counter").build();
f64_counter.add(
    3.15,
    &[
        KeyValue::new("mykey1", "myvalue1"),
        KeyValue::new("mykey2", "myvalue2"),
    ],
);


// u64 Observable Counter
let _observable_u64_counter = meter
    .u64_observable_counter("my_observable_u64_counter")
    .with_description("My observable counter example")
    .with_unit("myunit")
    .with_callback(|observer| {
        observer.observe(
            100,
            &[
                KeyValue::new("mykey1", "myvalue1"),
                KeyValue::new("mykey2", "myvalue2"),
            ],
        )
    })
    .build();

// f64 Observable Counter
let _observable_f64_counter = meter
    .f64_observable_counter("my_observable_f64_counter")
    .with_description("My observable counter example")
    .with_unit("myunit")
    .with_callback(|observer| {
        observer.observe(
            100.0,
            &[
                KeyValue::new("mykey1", "myvalue1"),
                KeyValue::new("mykey2", "myvalue2"),
            ],
        )
    })
    .build();

// i64 UpDownCounter
let updown_i64_counter = meter.i64_up_down_counter("my_updown_i64_counter").build();
updown_i64_counter.add(
    -10,
    &[
        KeyValue::new("mykey1", "myvalue1"),
        KeyValue::new("mykey2", "myvalue2"),
    ],
);

// f64 UpDownCounter
let updown_f64_counter = meter.f64_up_down_counter("my_updown_f64_counter").build();
updown_f64_counter.add(
    -10.67,
    &[
        KeyValue::new("mykey1", "myvalue1"),
        KeyValue::new("mykey2", "myvalue2"),
    ],
);

// i64 Observable UpDownCounter
let _observable_updown_i64_counter = meter
    .i64_observable_up_down_counter("my_observable_i64_updown_counter")
    .with_description("My observable updown counter example")
    .with_unit("myunit")
    .with_callback(|observer| {
        observer.observe(
            100,
            &[
                KeyValue::new("mykey1", "myvalue1"),
                KeyValue::new("mykey2", "myvalue2"),
            ],
        )
    })
    .build();

// f64 Observable UpDownCounter
let _observable_updown_f64_counter = meter
    .f64_observable_up_down_counter("my_observable_f64_updown_counter")
    .with_description("My observable updown counter example")
    .with_unit("myunit")
    .with_callback(|observer| {
        observer.observe(
            100.0,
            &[
                KeyValue::new("mykey1", "myvalue1"),
                KeyValue::new("mykey2", "myvalue2"),
            ],
        )
    })
    .build();

// i64 Gauge
let gauge = meter.i64_gauge("my_gauge").build();
gauge.record(
-10,
&[
    KeyValue::new("mykey1", "myvalue1"),
    KeyValue::new("mykey2", "myvalue2"),
],
);

// u64 Gauge
let gauge = meter.u64_gauge("my_gauge").build();
gauge.record(
101,
&[
    KeyValue::new("mykey1", "myvalue1"),
    KeyValue::new("mykey2", "myvalue2"),
],
);

// f64 Gauge
let gauge = meter.f64_gauge("my_gauge").build();
gauge.record(
12.5,
&[
    KeyValue::new("mykey1", "myvalue1"),
    KeyValue::new("mykey2", "myvalue2"),
],
);

// u64 Observable Gauge
let _observable_u64_gauge = meter
    .u64_observable_gauge("my_u64_gauge")
    .with_description("An observable gauge set to 1")
    .with_unit("myunit")
    .with_callback(|observer| {
        observer.observe(
            1,
            &[
                KeyValue::new("mykey1", "myvalue1"),
                KeyValue::new("mykey2", "myvalue2"),
            ],
        )
    })
    .build();

// f64 Observable Gauge
let _observable_f64_gauge = meter
    .f64_observable_gauge("my_f64_gauge")
    .with_description("An observable gauge set to 1.0")
    .with_unit("myunit")
    .with_callback(|observer| {
        observer.observe(
            1.0,
            &[
                KeyValue::new("mykey1", "myvalue1"),
                KeyValue::new("mykey2", "myvalue2"),
            ],
        )
    })
    .build();

// i64 Observable Gauge
let _observable_i64_gauge = meter
    .i64_observable_gauge("my_i64_gauge")
    .with_description("An observable gauge set to 1")
    .with_unit("myunit")
    .with_callback(|observer| {
        observer.observe(
            1,
            &[
                KeyValue::new("mykey1", "myvalue1"),
                KeyValue::new("mykey2", "myvalue2"),
            ],
        )
    })
    .build();

// f64 Histogram
let f64_histogram = meter.f64_histogram("my_f64_histogram").build();
f64_histogram.record(
    10.5,
    &[
        KeyValue::new("mykey1", "myvalue1"),
        KeyValue::new("mykey2", "myvalue2"),
    ],
);

// u64 Histogram
let u64_histogram = meter.u64_histogram("my_u64_histogram").build();
u64_histogram.record(
    12,
    &[
        KeyValue::new("mykey1", "myvalue1"),
        KeyValue::new("mykey2", "myvalue2"),
    ],
);

MeterProvider trait 用于获得命名的 Meter 对象:

  • opentelemetry_prometheus crate 提供了 meter provider 的实现;
  • 一般使用 opentelemetry_sdk::metrics::SdkMeterProvider 来获得 Meter Provider,然后使用它创建一个 Meter。
pub trait MeterProvider {
    // Required method
    fn meter_with_scope(&self, scope: InstrumentationScope) -> Meter;

    // Provided method
    fn meter(&self, name: &'static str) -> Meter { ... }
}

// 应用示例:在 main 或 app 其他启动过程中定义全局 metrics provider
use opentelemetry::metrics::{Meter};
use opentelemetry::{global, KeyValue};
use opentelemetry_sdk::metrics::SdkMeterProvider;

fn init_meter() {
    let provider = SdkMeterProvider::default();

    // Configure the global `MeterProvider` singleton when your app starts
    // (there is a no-op default if this is not set by your application)
    global::set_meter_provider(provider)
}

fn do_something_instrumented() {
    // Then you can get a named meter instance anywhere in your codebase.
    let meter = global::meter("my-component");

    // 或者复杂的步骤:先获取 global 注册的 meter provider,再用它创建 meter
    let provider = global::meter_provider();
    // meter used in applications
    let meter = provider.meter("my_app");

    let counter = meter.u64_counter("my_counter").build();

    // record metrics
    counter.add(1, &[KeyValue::new("mykey", "myvalue")]);
}

// in main or other app start
init_meter();
do_something_instrumented();

// lib 示例
use opentelemetry::{global, KeyValue};
/// Library-side example: adds 1 to `my_counter` through the global meter.
pub fn my_traced_library_function() {
    // The application embedding this library configures the global meter provider,
    // so the library needs no setup of its own and simply asks for a meter by name.
    let counter = global::meter("my-library-name")
        .u64_counter("my_counter")
        .build();

    // Record one increment with a single attribute attached.
    let attributes = [KeyValue::new("mykey", "myvalue")];
    counter.add(1, &attributes);
}

opentelemetry_prometheus crate 提供了 meter provider 的实现:

use opentelemetry::{metrics::MeterProvider, KeyValue};
use opentelemetry_sdk::metrics::SdkMeterProvider;
use prometheus::{Encoder, TextEncoder};


// Prometheus registry that the exporter writes into and that is gathered below.
let registry = prometheus::Registry::new();

// OpenTelemetry exporter bound to a clone of that registry.
let exporter = opentelemetry_prometheus::exporter()
    .with_registry(registry.clone())
    .build()?;

// Meter provider reading through the exporter, and a meter named "my-app".
let provider = SdkMeterProvider::builder().with_reader(exporter).build();
let meter = provider.meter("my-app");

// Two instruments: a counter and a histogram, each with a description.
let counter = meter.u64_counter("a.counter").with_description("Counts things").build();
let histogram = meter.u64_histogram("a.histogram").with_description("Records values").build();

// One measurement on each instrument, sharing the same attribute set.
let attributes = [KeyValue::new("key", "value")];
counter.add(100, &attributes);
histogram.record(100, &attributes);

// Gather the registry's metric families and encode them with the text encoder.
let mut result = Vec::new();
TextEncoder::new().encode(&registry.gather(), &mut result)?;

// result now contains encoded metrics:
//
// # HELP a_counter_total Counts things
// # TYPE a_counter_total counter
// a_counter_total{key="value",otel_scope_name="my-app"} 100
// # HELP a_histogram Records values
// # TYPE a_histogram histogram
// a_histogram_bucket{key="value",otel_scope_name="my-app",le="0"} 0
// a_histogram_bucket{key="value",otel_scope_name="my-app",le="5"} 0
// a_histogram_bucket{key="value",otel_scope_name="my-app",le="10"} 0
// a_histogram_bucket{key="value",otel_scope_name="my-app",le="25"} 0
// a_histogram_bucket{key="value",otel_scope_name="my-app",le="50"} 0
// a_histogram_bucket{key="value",otel_scope_name="my-app",le="75"} 0
// a_histogram_bucket{key="value",otel_scope_name="my-app",le="100"} 1
// a_histogram_bucket{key="value",otel_scope_name="my-app",le="250"} 1
// a_histogram_bucket{key="value",otel_scope_name="my-app",le="500"} 1
// a_histogram_bucket{key="value",otel_scope_name="my-app",le="750"} 1
// a_histogram_bucket{key="value",otel_scope_name="my-app",le="1000"} 1
// a_histogram_bucket{key="value",otel_scope_name="my-app",le="2500"} 1
// a_histogram_bucket{key="value",otel_scope_name="my-app",le="5000"} 1
// a_histogram_bucket{key="value",otel_scope_name="my-app",le="7500"} 1
// a_histogram_bucket{key="value",otel_scope_name="my-app",le="10000"} 1
// a_histogram_bucket{key="value",otel_scope_name="my-app",le="+Inf"} 1
// a_histogram_sum{key="value",otel_scope_name="my-app"} 100
// a_histogram_count{key="value",otel_scope_name="my-app"} 1
// # HELP otel_scope_info Instrumentation Scope metadata
// # TYPE otel_scope_info gauge
// otel_scope_info{otel_scope_name="my-app"} 1
// # HELP target_info Target metadata
// # TYPE target_info gauge
// target_info{service_name="unknown_service"} 1

9.7 logs module
#

logs module 提供了 Logger/LogRecord/LoggerProvider trait 定义,以及 NoopLoggerProvider 类型。

  • LogRecord 可以定义 event name、timestamp、severity、body、attributes 和关联的 TraceContext;
  • emit() : Emit a LogRecord. If there is active current thread’s Context, the logger will set the record’s TraceContext to the active trace context,
// Signature listing only: a log record on which event name, target, timestamps,
// severity, body, attributes and trace context can be set.
pub trait LogRecord {
    // Required methods
    fn set_event_name(&mut self, name: &'static str);
    fn set_target<T>(&mut self, _target: T) where T: Into<Cow<'static, str>>;
    fn set_timestamp(&mut self, timestamp: SystemTime);
    fn set_observed_timestamp(&mut self, timestamp: SystemTime);
    fn set_severity_text(&mut self, text: &'static str);
    fn set_severity_number(&mut self, number: Severity);
    fn set_body(&mut self, body: AnyValue);
    // Adds several attributes at once from an iterator of (key, value) pairs.
    fn add_attributes<I, K, V>(&mut self, attributes: I) where I: IntoIterator<Item = (K, V)>, K: Into<Key>, V: Into<AnyValue>;
    // Adds a single attribute.
    fn add_attribute<K, V>(&mut self, key: K, value: V) where K: Into<Key>, V: Into<AnyValue>;

    // Provided method
    // Associates the record with a trace id, a span id and optional trace flags.
    fn set_trace_context(
        &mut self,
        trace_id: TraceId,
        span_id: SpanId,
        trace_flags: Option<TraceFlags>,
    ) { ... }
}

// Signature listing only: creates log records and emits them.
pub trait Logger {
    // The concrete record type this logger creates and accepts.
    type LogRecord: LogRecord;

    // Required methods
    // Creates a new record to be filled in by the caller.
    fn create_log_record(&self) -> Self::LogRecord;
    // Emits the given record.
    fn emit(&self, record: Self::LogRecord);
    // Reports whether an event with this severity and target is enabled.
    fn event_enabled(&self, level: Severity, target: &str) -> bool;
}

opentelemetry 提供了如下 log macro,它们只用于 opentelemetry 内部代码以及自定义的 exporter、processor,不用于应用的通用日志:

  • otel_debug:Macro for logging debug messages in OpenTelemetry.
  • otel_error:Macro for logging error messages in OpenTelemetry.
  • otel_info:Macro for logging informational messages in OpenTelemetry. Note: These macros (otel_info!, otel_warn!, otel_debug!, and otel_error!) are intended to be used internally within OpenTelemetry code or for custom exporters and processors. They are not designed for general application logging and should not be used for that purpose.
  • otel_warn:Macro for logging warning messages in OpenTelemetry.
use opentelemetry::otel_info;
otel_info!(name: "sdk_start", version = "1.0.0", schema_url = "http://example.com");

一般不建议直接调用 opentelemetry Logs API/SDK,而是把它作为 Logs Bridge API 来使用。即使用 opentelemetry_appender_tracing 或 opentelemetry_appender_log crate,将 tracing 或 log crate 写的日志条目发送到 opentelemetry。或者使用 tracing 来作为 logging API, tracing 支持结构化日志,而且是活跃维护的。

use opentelemetry::KeyValue;
use opentelemetry_appender_tracing::layer;
use opentelemetry_sdk::{logs::LoggerProvider, Resource};
use tracing::error;
use tracing_subscriber::prelude::*;

/// Example: forward `tracing` events to OpenTelemetry logs through the appender bridge layer.
fn main() {
    // 1. Create a LoggerProvider that exports through the stdout log exporter.
    let exporter = opentelemetry_stdout::LogExporter::default();
    let provider: LoggerProvider = LoggerProvider::builder()
        .with_resource(Resource::new(vec![KeyValue::new(
            "service.name",
            "log-appender-tracing-example",
        )]))
        .with_simple_exporter(exporter)
        .build();

    // 2. Use the LoggerProvider to create an
    // opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge,
    // which forwards tracing events to OpenTelemetry logs.
    let layer = layer::OpenTelemetryTracingBridge::new(&provider);


    // Install the bridge layer on the global tracing subscriber.
    tracing_subscriber::registry().with(layer).init();

    // NOTE(review): the `user_email` value looks like an e-mail-obfuscation placeholder
    // left by page extraction rather than a real address — confirm against the upstream example.
    error!(name: "my-event-name", target: "my-system", event_id = 20, user_name = "otel",
        user_email = "[email protected]", message = "This is an example message");
    // Shut the provider down before exiting; the result is deliberately ignored.
    let _ = provider.shutdown();
}

9.8 propagation module
#

Propagator 是服务间传递信息的一种机制,它从应用间传递的 message 中 read/write context data。

Propagator 使用 Context 来在服务间传递信息。

当前只定义了一种 TextMapPropagator 类型,它使用字符串 key/value pairs,在 Context 和 carriers 之间转换:

  • carriers 是实现 Injector 和 Extractor 的类型对象,一般为 HTTP Headers 或 HashMap;
  • Inject:Propagator 负责将 Context 信息编码,然后写入 Injector 中;
  • Extract: Propagator 负责从 Extractor 中获取信息,然后写入 Context;
// Signature listing only (provided bodies elided with `{ ... }`).
pub trait TextMapPropagator: Debug {
    // Required methods
    // Encodes the contents of `cx` as strings and writes them into the Injector.
    fn inject_context(&self, cx: &Context, injector: &mut dyn Injector);

    // Reads string key/value entries through the Extractor and stores them in the returned Context.
    // If extraction fails or no value is present, the passed-in `cx` is returned.
    fn extract_with_context( &self, cx: &Context, extractor: &dyn Extractor, ) -> Context;

    // Returns an iterator over fields.
    fn fields(&self) -> FieldIter<'_> ;

    // Provided methods
    // Encodes the current Context as strings and injects it into the Injector.
    fn inject(&self, injector: &mut dyn Injector) { ... }

    // Reads string key/value entries through the Extractor and stores them in the returned Context.
    // If extraction fails or no value is present, the current Context is returned.
    fn extract(&self, extractor: &dyn Extractor) -> Context { ... }
}

实现 TextMapPropagator 的类型:

  • opentelemetry::propagation 的 NoopTextMapPropagator 和 TextMapCompositePropagator;
  • opentelemetry_sdk::propagation 的 BaggagePropagator 和 TraceContextPropagator;
impl TextMapPropagator for NoopTextMapPropagator
impl TextMapPropagator for TextMapCompositePropagator

Baggage: 是 KV 字符串集合,每个 KV 可以可选的关联 metadata:

  • Baggage 是 A set of name/value pairs describing user-defined properties.
  • Baggage Name: ASCII strings according to the token format, defined in RFC2616, Section 2.2
  • Baggage Value:URL encoded UTF-8 strings.
  • Baggage Value Metadata: Additional metadata can be added to values in the form of a property set;

Baggage 的主要使用场景是 annotate telemetry(metrics、traces、logs),给它们添加 context 和 information。

  • Baggage 可以独立于 SpanContext/TraceContext 使用。
// Method listing only (signatures without bodies): a keyed collection of values,
// each stored together with a `BaggageMetadata`.
pub struct Baggage { /* private fields */ }
    pub fn new() -> Self
    // Lookup by key: the value alone, or the value together with its metadata.
    pub fn get<K: AsRef<str>>(&self, key: K) -> Option<&Value>
    pub fn get_with_metadata<K: AsRef<str>>(&self, key: K,) -> Option<&(Value, BaggageMetadata)>
    // Insertion, without or with metadata; the `Option` return carries a value
    // (presumably the one previously stored under the key — confirm in the crate docs).
    pub fn insert<K, V>(&mut self, key: K, value: V) -> Option<Value> where K: Into<Key>, V: Into<Value>,
    pub fn insert_with_metadata<K, V, S>( &mut self, key: K, value: V, metadata: S,) -> Option<(Value, BaggageMetadata)>
    where
        K: Into<Key>,
        V: Into<Value>,
        S: Into<BaggageMetadata>,
    // Removal by key, yielding the value and metadata if the `Option` is `Some`.
    pub fn remove<K: Into<Key>>( &mut self, key: K,) -> Option<(Value, BaggageMetadata)>
    pub fn len(&self) -> usize
    pub fn is_empty(&self) -> bool
    pub fn iter(&self) -> Iter<'_> 

// BaggageMetadata holds a semicolon-style list of k=v entries, e.g. ;k1=v1;k2;k3=v3
pub struct BaggageMetadata(/* private fields */);
    // Exposes the metadata as a string slice.
    pub fn as_str(&self) -> &str

opentelemetry 为 Context 实现了 BaggageExt trait,从而可以为 Context 添加和提取 Baggage 数据:

// Signature listing only: extension methods for attaching baggage to, and reading it from,
// the implementing type.
pub trait BaggageExt {
    // Required methods
    // Builds a new value from `self` plus the given baggage entries.
    fn with_baggage<T: IntoIterator<Item = I>, I: Into<KeyValueMetadata>>(
        &self,
        baggage: T,
    ) -> Self;

    // Same, but without a receiver (per its name, starting from the current context).
    fn current_with_baggage<T: IntoIterator<Item = I>, I: Into<KeyValueMetadata>>(
        baggage: T,
    ) -> Self;

    // Builds a new value from `self` with its baggage cleared.
    fn with_cleared_baggage(&self) -> Self;

    // Returns the Baggage associated with the Context.
    fn baggage(&self) -> &Baggage;
}

impl BaggageExt for Context


// 示例
use opentelemetry::{baggage::BaggageExt, Context, KeyValue, Value};

// Create a context that carries a single baggage entry.
let cx = Context::current_with_baggage(vec![KeyValue::new("my-name", "my-value")]);

// The entry can be read back from the context's baggage by name.
let expected = Value::from("my-value");
assert_eq!(cx.baggage().get("my-name"), Some(&expected))

BaggagePropagator:在 opentelemetry_sdk::propagation 中定义,它通过 Extractor 按照 W3C Baggage format 从 HTTP Headers 中提取 Baggage 并注入到 Context 中,通过 Injector 将 Context 中的 Baggage 按照 W3C Baggage format 注入到 HTTP Headers 中。

  • 使用 W3C Baggage format 来传播 name-value pairs 值: https://w3c.github.io/baggage/

    • 使用 baggage HTTP Header 来传播 name-value pairs 值,值是 UTF8 编码的字符串;
    • 多个 key=value 之间用逗号分隔,KV 关联的 metadata 也是 KV 格式,但用分号分隔;
    • 示例:

    baggage: key1=value1;property1;property2, key2 = value2, key3=value3; propertyKey=propertyValue

  • 实现了opentelemetry::propagation::TextMapPropagator trait:

    • 从 HTTP baggage Header 提取 name-value pairs 并注入到 Context 中;
      • Context 实现了 BaggageExt,使用它的 with_baggage() 方法来注入 Baggage;
    • 将 Context 中 name-value 值按照 W3C Baggage format 写入到 Headers 中;
pub struct BaggagePropagator { /* private fields */ }

// 示例
use opentelemetry::{baggage::BaggageExt, KeyValue, propagation::TextMapPropagator};
use opentelemetry_sdk::propagation::BaggagePropagator;
use std::collections::HashMap;

// Incoming HTTP headers that already carry one baggage entry.
let mut headers = HashMap::new();
headers.insert(String::from("baggage"), String::from("user_id=1"));

let propagator = BaggagePropagator::new();
// Extraction works on any type implementing `Extractor`; here the header map itself.
let cx = propagator.extract(&headers);

// Walk the extracted name-value pairs.
for (name, value) in cx.baggage() {
    // ...
}

// Derive a context carrying one additional baggage entry.
let enriched_cx = cx.with_baggage(vec![KeyValue::new("server_id", 42)]);

// Write the baggage back into the header map of an outgoing request.
propagator.inject_context(&enriched_cx, &mut headers);

let baggage_header = headers.get("baggage").expect("header is injected");
assert!(baggage_header.contains("user_id=1"), "still contains previous name-value");
assert!(baggage_header.contains("server_id=42"), "contains new name-value pair");

Context 同时实现了 opentelemetry::trace::TraceContextExt trait,用于为 Context 关联 span 和 SpanContext:

  • SpanContext 按照 W3C TraceContext format 标准进行序列化为 HTTP Header:traceparent 和 tracestate;
  • Span::span_context() 方法返回 &SpanContext;

所以,通过为 Context 实现 TraceContextExt,可以使用 Context 保存 SpanContext,同时也可以根据 Context 中保存的 Span 来返回对应的 SpanContext。

// Signature listing only: extension methods that associate a span or a remote
// `SpanContext` with the implementing type and give access to the stored span.
pub trait TraceContextExt {
    // Required methods
    // Constructors that attach `span`: without a receiver, or derived from `self`.
    fn current_with_span<T: Span + Send + Sync + 'static>(span: T) -> Self;
    fn with_span<T: Span + Send + Sync + 'static>(&self, span: T) -> Self;
    // Access to the stored span, and whether an active one is present.
    fn span(&self) -> SpanRef<'_>;
    fn has_active_span(&self) -> bool;
    // Derives a new value from `self` carrying the given remote span context.
    fn with_remote_span_context(&self, span_context: SpanContext) -> Self;
}
impl TraceContextExt for Context

pub trait Span {
    //...
    fn span_context(&self) -> &SpanContext;
    //...
}

pub struct SpanContext { /* private fields */ }

TraceContextPropagator:在 opentelemetry_sdk::propagation 中定义,它按照 W3C TraceContext format,通过 Extractor 从 HTTP traceparent 和 tracestate Header 提取 SpanContext 信息并注入到 Context 中,通过 Injector 将 Context 中的 Span 对应的 SpanContext 注入到 HTTP traceparent 和 tracestate Header:

  • traceparent: 00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01
  • tracestate: vendorname1=opaqueValue1,vendorname2=opaqueValue2
  • SpanContext 信息被序列化为 W3C TraceContext format;
pub struct TraceContextPropagator { /* private fields */ }

// https://docs.rs/opentelemetry_sdk/0.27.0/src/opentelemetry_sdk/propagation/trace_context.rs.html#119
impl TextMapPropagator for TraceContextPropagator {
    /// Encodes the `SpanContext` of the span stored in `cx` and writes it into the
    /// `Injector` as the trace-parent and trace-state headers.
    ///
    /// Nothing is written when the span context is not valid.
    fn inject_context(&self, cx: &Context, injector: &mut dyn Injector) {
        let span = cx.span();
        let span_context = span.span_context();
        if span_context.is_valid() {
            // Four dash-separated parts: version, trace id, span id, flags.
            // Only the SAMPLED bit of the trace flags is emitted.
            let header_value = format!(
                "{:02x}-{}-{}-{:02x}",
                SUPPORTED_VERSION,
                span_context.trace_id(),
                span_context.span_id(),
                span_context.trace_flags() & TraceFlags::SAMPLED
            );
            injector.set(TRACEPARENT_HEADER, header_value);
            injector.set(TRACESTATE_HEADER, span_context.trace_state().header());
        }
    }

    /// Retrieves an encoded `SpanContext` using the `Extractor` and returns a context
    /// derived from `cx` that carries it as the remote span context.
    ///
    /// If extraction returns an error, a clone of `cx` is returned unchanged.
    fn extract_with_context(&self, cx: &Context, extractor: &dyn Extractor) -> Context {
        self.extract_span_context(extractor)
            .map(|sc| cx.with_remote_span_context(sc))
            .unwrap_or_else(|_| cx.clone())
    }

    /// Returns an iterator over the header field names in `TRACE_CONTEXT_HEADER_FIELDS`.
    fn fields(&self) -> FieldIter<'_> {
        FieldIter::new(TRACE_CONTEXT_HEADER_FIELDS.as_ref())
    }
}

opentelemetry::propagation::composite::TextMapCompositePropagator 类型支持将多个 TextMapPropagator 组合在一起,实现将它们的所有的信息在 Context 和 carriers 中转换。

  • carriers 是实现 Injector 和 Extractor 的类型对象,一般为 HTTP Headers 或 HashMap;
    • 可以自定义 Injector 和 Extractor 实现,从而实现灵活的信息转换提取。
  • 为何要组合?
    • 不同 TextMapPropagator 按照不同的方式读取、写入 carriers 中的信息;
      • BaggagePropagator:使用 HTTP baggage Header 来传播 Baggage 信息;
      • TraceContextPropagator:使用 HTTP traceparent 和 tracestate Header 来传播 SpanContext 信息;
  • TextMapCompositePropagator 按照组合的顺序来从 carriers 中注入和提取信息。
pub struct TextMapCompositePropagator { /* private fields */ }
    pub fn new(propagators: Vec<Box<dyn TextMapPropagator + Send + Sync>>) -> Self

// 示例:
use opentelemetry::{
    baggage::BaggageExt,
    propagation::{TextMapPropagator, TextMapCompositePropagator},
    trace::{TraceContextExt, Tracer, TracerProvider},
    Context, KeyValue,
};
use opentelemetry_sdk::propagation::{
    BaggagePropagator, TraceContextPropagator,
};
use opentelemetry_sdk::trace as sdktrace;
use std::collections::HashMap;

// Wrap a baggage propagator and a trace-context propagator, in this order, in a composite.
let composite_propagator = TextMapCompositePropagator::new(vec![
    Box::new(BaggagePropagator::new()),
    Box::new(TraceContextPropagator::new()),
]);

// A plain HashMap acts as the carrier (the `Injector` implementation).
let mut carrier = HashMap::new();

// Start a span to propagate.
let example_span = sdktrace::TracerProvider::default()
    .tracer("example-component")
    .start("span-name");

// Build a context holding both the span and one baggage entry, then inject it.
let cx = Context::current_with_span(example_span)
    .with_baggage(vec![KeyValue::new("test", "example")]);
composite_propagator.inject_context(&cx, &mut carrier);

// The carrier now has both `baggage` and `traceparent` headers.
assert!(carrier.contains_key("baggage"));
assert!(carrier.contains_key("traceparent"));

为了方便使用,一般调用 opentelemetry::global::set_text_map_propagator() 来设置全局 TextMapPropagator,然后在其它函数中使用 opentelemetry::global::get_text_map_propagator() 来获取和使用它:

// https://medium.com/netwo/distributed-tracing-in-rust-b8eb2af3aff4

use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::util::SubscriberInitExt;
use tracing_subscriber::EnvFilter;

/// Sets the global text-map propagator to the Zipkin one, installs a batch Zipkin
/// tracer for `service_name`, and initialises the global `tracing` subscriber with a
/// JSON fmt layer, an `EnvFilter` built from `level`, and the OpenTelemetry layer.
///
/// Panics with "unable to install zipkin tracer" if the pipeline cannot be installed.
pub fn configure_tracing(level: String, service_name: String) {
    opentelemetry::global::set_text_map_propagator(opentelemetry_zipkin::Propagator::new());

    let zipkin_tracer = opentelemetry_zipkin::new_pipeline()
        .with_service_name(service_name)
        .install_batch(opentelemetry::runtime::Tokio)
        .expect("unable to install zipkin tracer");

    // The individual layers, named after what they are.
    let otel_layer = tracing_opentelemetry::layer().with_tracer(zipkin_tracer);
    let json_layer = tracing_subscriber::fmt::layer().json();
    let env_filter = EnvFilter::new(level);

    tracing_subscriber::registry()
        .with(json_layer)
        .with(env_filter)
        .with(otel_layer)
        .init();
}

/// Extracts a parent context from the delivery's headers with the global text-map
/// propagator, sets it as the parent of the current span, and returns the delivery.
fn correlate_trace_from_delivery(delivery: Delivery) -> Delivery {
    let span = Span::current();

    // Cloned copy of the delivery's header table; when the delivery has no headers,
    // the default value is used instead.
    let headers = &delivery
        .properties
        .headers()
        .clone()
        .unwrap_or_default()
        .inner()
        .clone();
    // `AmqpHeaderCarrier` is defined elsewhere; presumably it implements `Extractor`
    // over the header table — confirm at its definition.
    let parent_cx = opentelemetry::global::get_text_map_propagator(|propagator| {
        propagator.extract(&AmqpHeaderCarrier::new(headers))
    });

    span.set_parent(parent_cx);

    delivery
}

// 另一个实现 Injector/Extractor 的例子
//  https://www.hamzak.xyz/blog-posts/tracing-in-rust-a-comprehensive-guide
use opentelemetry::{
    global,
    propagation::{Extractor, Injector},
};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;

/// Serializable data structure holding the OpenTelemetry propagation context as a map
/// of string keys to string values.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PropagationContext(HashMap<String, String>);

impl PropagationContext {
    /// Creates a propagation context with no entries.
    fn empty() -> Self {
        Self(HashMap::new())
    }

    /// Captures `context` into a fresh `PropagationContext` using the global
    /// text-map propagator.
    pub fn inject(context: &opentelemetry::Context) -> Self {
        let mut carrier = Self::empty();
        global::get_text_map_propagator(|propagator| {
            propagator.inject_context(context, &mut carrier);
        });
        carrier
    }

    /// Rebuilds an `opentelemetry::Context` from the stored entries using the global
    /// text-map propagator.
    pub fn extract(&self) -> opentelemetry::Context {
        global::get_text_map_propagator(|propagator| {
            let restored = propagator.extract(self);
            restored
        })
    }
}

impl Injector for PropagationContext {
    /// Stores `value` under an owned copy of `key`, replacing any earlier entry.
    fn set(&mut self, key: &str, value: String) {
        let entries = &mut self.0;
        entries.insert(String::from(key), value);
    }
}

impl Extractor for PropagationContext {
    /// Returns the value stored under `key`, if any.
    ///
    /// The map is keyed by `String`, which can be looked up directly with a `&str`,
    /// so no temporary `String` is allocated for the lookup.
    fn get(&self, key: &str) -> Option<&str> {
        self.0.get(key).map(String::as_str)
    }

    /// Returns all stored keys, in the map's (unspecified) iteration order.
    fn keys(&self) -> Vec<&str> {
        self.0.keys().map(String::as_str).collect()
    }
}

另一个例子,HTTP 请求和响应的分布式追踪:

/// Extracts the propagated `Context` from the headers of `req` using the global
/// text-map propagator.
fn get_parent_context(req: Request<Body>) -> Context {
    let carrier = HeaderExtractor(req.headers());
    global::get_text_map_propagator(|propagator| propagator.extract(&carrier))
}

/// Scaffold for handling an incoming request inside a span whose parent context is
/// taken from the request headers. The body is an unfinished template: the actual
/// request handling (and the return value) still has to be filled in at the TODO.
async fn incoming_request(req: Request<Body>) -> Result<Response<Body>, Infallible> {
    // Obtain the propagated Context from the request.
    let parent_cx = get_parent_context(req);

    // Start a span with that Context as its parent context.
    let mut span = global::tracer("manual-server")
        .start_with_context("my-server-span", &parent_cx);

    span.set_attribute(KeyValue::new("my-server-key-1", "my-server-value-1"));

    // TODO Your incoming_request code goes here
}

// 向发出的 hyper 请求注入 context
async fn outgoing_request(context: Context-> std::result::Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
    let client = Client::new();

    // 创建一个包含 Context 的 Span
    let span = global::tracer("manual-client")
        .start_with_context("my-client-span", &context);

    // 从 Span 创建一个 Context
    let cx = Context::current_with_span(span);

    let mut req = hyper::Request::builder().uri("<HTTP_URL>");

    // 将当前 Context 注入到 req Headers,从而实现分布式追踪
    global::get_text_map_propagator(|propagator| {
        propagator.inject_context(&cx, &mut HeaderInjector(&mut req.headers_mut().unwrap()))
    });

    cx.span().set_attribute(KeyValue::new("my-client-key-1", "my-client-value-1"));

    // TODO Your outgoing_request code goes here
}

9.9 opentelemetry-http crate
#

opentelemetry-http crate 是和 opentelemetry crate 的 propagation module 联合使用的,实现了 propagation module 定义的 Injector、Extractor,它从 HTTP Header 中提取和注入相关信息,进而实现分布式追踪。

opentelemetry-http 示例:

// https://github.com/SigNoz/sample-rust-app/blob/main/src/main.rs
#![warn(rust_2018_idioms)]

use opentelemetry::global::shutdown_tracer_provider;
use opentelemetry::sdk::Resource;
use opentelemetry::trace::TraceError;
use opentelemetry::{global, sdk::trace as sdktrace};
use opentelemetry::{trace::Tracer};
use opentelemetry_otlp::WithExportConfig;
use std::error::Error;

use hyper::{body::Body, Method, Request, Response, Server, StatusCode};
use hyper::service::{make_service_fn, service_fn};

use opentelemetry_http::HeaderExtractor;
use std::collections::HashMap;
use url::form_urlencoded;

static INDEX: &[u8] = b"<html><body><form action=\"post\" method=\"post\">Name: <input type=\"text\" name=\"name\"><br>Number: <input type=\"text\" name=\"number\"><br><input type=\"submit\"></body></html>";
static MISSING: &[u8] = b"Missing field";
static NOTNUMERIC: &[u8] = b"Number field is not numeric";

/// Returns the `n`-th term of the sequence 1, 1, 2, 3, 5, ... where both
/// `fibonacci(0)` and `fibonacci(1)` are 1.
///
/// The value is computed iteratively with `n - 1` additions. The naive double
/// recursion needs an exponential number of calls, which is impractical for larger
/// `n` — and `n` is taken directly from a request parameter.
///
/// Results fit in `u64` up to `n = 92`. Beyond that the addition overflows: it
/// panics in builds with overflow checks and wraps otherwise, which is the same
/// arithmetic the recursive definition performs.
fn fibonacci(n: u8) -> u64 {
    let (mut prev, mut curr) = (1u64, 1u64);
    // Each iteration advances the pair by one term; terms 0 and 1 need no iteration.
    for _ in 1..n {
        let next = prev + curr;
        prev = curr;
        curr = next;
    }
    curr
}

/// Request handler; `service_fn` turns this function into a `Service`.
///
/// Routes:
/// - `GET /` and `GET /post`: serve the HTML form (`INDEX`).
/// - `POST /post`: parse the url-encoded body, validate the `name` and `number`
///   fields, and answer with the matching fibonacci number. The work runs inside a
///   "fibonacci" span whose parent context is extracted from the request headers.
/// - `GET /get`: echo the `page` query parameter.
/// - anything else: `404 Not Found` with an empty body.
///
/// Missing or non-numeric fields are answered with `422 Unprocessable Entity`.
async fn handle(req: Request<Body>) -> Result<Response<Body>, hyper::Error> {
    let tracer = global::tracer("global_tracer");

    match (req.method(), req.uri().path()) {
        (&Method::GET, "/") | (&Method::GET, "/post") => Ok(Response::new(INDEX.into())),
        (&Method::POST, "/post") => {

            // Extract the incoming context
            let parent_cx = global::get_text_map_propagator(|propagator| {
                propagator.extract(&HeaderExtractor(req.headers()))
            });
            // Keep the span bound until the end of this arm. Discarding the return
            // value would drop the span right away, so it would not cover the work below.
            let _span = tracer.start_with_context("fibonacci", &parent_cx);

            // Concatenate the body...
            let b = hyper::body::to_bytes(req).await?;

            // Parse the request body. form_urlencoded::parse
            // always succeeds, but in general parsing may
            // fail (for example, an invalid post of json), so
            // returning early with BadRequest may be
            // necessary.
            //
            // Warning: this is a simplified use case. In
            // principle names can appear multiple times in a
            // form, and the values should be rolled up into a
            // HashMap<String, Vec<String>>. However in this
            // example the simpler approach is sufficient.
            let params = form_urlencoded::parse(b.as_ref())
                .into_owned()
                .collect::<HashMap<String, String>>();

            // Validate the request parameters, returning
            // early if an invalid input is detected.
            let name = if let Some(n) = params.get("name") {
                n
            } else {
                return Ok(Response::builder()
                    .status(StatusCode::UNPROCESSABLE_ENTITY)
                    .body(MISSING.into())
                    .unwrap());
            };
            let number = if let Some(n) = params.get("number") {
                if let Ok(v) = n.parse::<u8>() {
                    v
                } else {
                    return Ok(Response::builder()
                        .status(StatusCode::UNPROCESSABLE_ENTITY)
                        .body(NOTNUMERIC.into())
                        .unwrap());
                }
            } else {
                return Ok(Response::builder()
                    .status(StatusCode::UNPROCESSABLE_ENTITY)
                    .body(MISSING.into())
                    .unwrap());
            };

            let nth_fib = fibonacci(number);

            // Render the response. This will often involve
            // calls to a database or web service, which will
            // require creating a new stream for the response
            // body. Since those may fail, other error
            // responses such as InternalServiceError may be
            // needed here, too.

            let body = format!(
                "Hello {}, {}th fibonacci number is {}",
                name, number, nth_fib
            );
            Ok(Response::new(body.into()))
        }
        (&Method::GET, "/get") => {
            // The query string is required; without it there is nothing to echo.
            let query = if let Some(q) = req.uri().query() {
                q
            } else {
                return Ok(Response::builder()
                    .status(StatusCode::UNPROCESSABLE_ENTITY)
                    .body(MISSING.into())
                    .unwrap());
            };
            let params = form_urlencoded::parse(query.as_bytes())
                .into_owned()
                .collect::<HashMap<String, String>>();
            let page = if let Some(p) = params.get("page") {
                p
            } else {
                return Ok(Response::builder()
                    .status(StatusCode::UNPROCESSABLE_ENTITY)
                    .body(MISSING.into())
                    .unwrap());
            };
            let body = format!("You requested {}", page);
            Ok(Response::new(body.into()))
        }
        _ => Ok(Response::builder()
            .status(StatusCode::NOT_FOUND)
            .body(Body::empty())
            .unwrap()),
    }
}

/// Builds and installs an OTLP tracing pipeline with the batch span processor on the
/// Tokio runtime and returns the resulting tracer.
///
/// The exporter uses tonic and takes its settings from the environment (`with_env`);
/// the trace config carries the default `Resource`.
fn init_tracer() -> Result<sdktrace::Tracer, TraceError> {
    let exporter = opentelemetry_otlp::new_exporter().tonic().with_env();
    let trace_config = sdktrace::config().with_resource(Resource::default());

    opentelemetry_otlp::new_pipeline()
        .tracing()
        .with_exporter(exporter)
        .with_trace_config(trace_config)
        .install_batch(opentelemetry::runtime::Tokio)
}

/// Entry point: installs the tracer pipeline, serves `handle` on 127.0.0.1:1337 until
/// the server stops, then shuts the tracer provider down.
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
    // The returned tracer is not kept; `handle` obtains its tracer via `global::tracer`.
    let _ = init_tracer()?;

    let addr = std::net::SocketAddr::from(([127, 0, 0, 1], 1337));
    // Every connection is served by the same `handle` function.
    let make_service = make_service_fn(|_| async {
        Ok::<_, hyper::Error>(service_fn(handle))
    });
    let server = Server::bind(&addr).serve(make_service);

    println!("Listening on {}", addr);
    // A server error is reported on stderr but does not fail `main`.
    if let Err(e) = server.await {
        eprintln!("server error: {}", e);
    }

    shutdown_tracer_provider();

    Ok(())
}
// https://docs.dynatrace.com/docs/extend-dynatrace/opentelemetry/walkthroughs/rust

use std::collections::HashMap;
use std::io::{BufRead, BufReader, Read};

use opentelemetry::{
    global,
    trace::{Span, TraceContextExt, TraceError, Tracer},
    Context, KeyValue,
};

use opentelemetry_sdk::{runtime, trace as sdktrace, Resource};
use opentelemetry_http::{HeaderExtractor, HeaderInjector};
use opentelemetry_otlp::WithExportConfig;
use opentelemetry_semantic_conventions as semcov;

fn init_opentelemetry() -> Result<sdktrace::Tracer, TraceError>  {
    // Helper function to read potentially available OneAgent data
    fn read_dt_metadata() -> Resource {
        fn read_single(path: &str, metadata: &mut Vec<KeyValue>) -> std::io::Result<()> {
            let mut file = std::fs::File::open(path)?;
            if path.starts_with("dt_metadata") {
                let mut name = String::new();
                file.read_to_string(&mut name)?;
                file = std::fs::File::open(name)?;
            }
            for line in BufReader::new(file).lines() {
                if let Some((k, v)) = line?.split_once('=') {
                    metadata.push(KeyValue::new(k.to_string(), v.to_string()))
                }
            }
            Ok(())
        }
        let mut metadata = Vec::new();
        for name in [
            "dt_metadata_e617c525669e072eebe3d0f08212e8f2.properties",
            "/var/lib/dynatrace/enrichment/dt_metadata.properties",
            "/var/lib/dynatrace/enrichment/dt_host_metadata.properties"
        ] {
            let _ = read_single(name, &mut metadata);
        }
        Resource::new(metadata)
    }

    // ===== GENERAL SETUP =====
    let DT_API_TOKEN = env::var("DT_API_TOKEN").unwrap(); // TODO: change
    let DT_API_URL = env::var("DT_API_URL").unwrap();

    let mut map = HashMap::new();
    map.insert("Authorization".to_string(), format!("Api-Token {}", DT_API_TOKEN));
    let mut resource = Resource::new([
     //TODO Replace with the name of your application
    KeyValue::new(semcov::resource::SERVICE_NAME, "rust-app")
    ]);
    resource = resource.merge(&read_dt_metadata());

    // ===== TRACING SETUP =====
    global::set_text_map_propagator(TraceContextPropagator::new());
    opentelemetry_otlp::new_pipeline()
        .tracing()
        .with_exporter(
            opentelemetry_otlp::new_exporter()
                .http()
                .with_endpoint(DT_API_URL)
                .with_headers(map),
        )
        .with_trace_config(
            sdktrace::config()
                .with_resource(resource)
                .with_sampler(sdktrace::Sampler::AlwaysOn),
        )
        // 设置 opentelemetry global 的 trace provider
        .install_batch(runtime::Tokio)
}

init_opentelemetry()

// Later, wherever it is needed, obtain a named tracer from the global provider.
let tracer = global::tracer("my-tracer");

// Create a span from the tracer and set two attributes on it.
let mut span = tracer.start("Call to /myendpoint");
span.set_attribute(KeyValue::new("http.method", "GET"));
span.set_attribute(KeyValue::new("net.protocol.version", "1.1"));
// TODO: Your code goes here
// End the span explicitly once the work is done.
span.end();

// Ensure context propagation optional
// Context propagation is particularly important when network calls (for example, REST) are involved.

/// Extracts the parent `Context` from the headers of `req` using the global
/// text-map propagator.
fn get_parent_context(req: Request<Body>) -> Context {
    let carrier = HeaderExtractor(req.headers());
    global::get_text_map_propagator(|propagator| propagator.extract(&carrier))
}

/// Scaffold for handling an incoming request inside a span whose parent context is
/// taken from the request headers. The body is an unfinished template: the actual
/// request handling (and the return value) still has to be filled in at the TODO.
async fn incoming_request(req: Request<Body>) -> Result<Response<Body>, Infallible> {
    // Parent context propagated by the caller through the request headers.
    let parent_cx = get_parent_context(req);
    // Start the server span with that context as its parent context.
    let mut span = global::tracer("manual-server")
        .start_with_context("my-server-span", &parent_cx); //TODO Replace with the name of your span
    span.set_attribute(KeyValue::new("my-server-key-1", "my-server-value-1")); //TODO Add attributes
    // TODO Your incoming_request code goes here
}

/// Scaffold for an outgoing hyper request that carries the trace context in its
/// headers. The body is an unfinished template: sending the request (and the return
/// value) still has to be filled in at the TODO.
async fn outgoing_request(
    context: Context,
) -> std::result::Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
    let client = Client::new();
    // Start the client span with `context` as its parent context.
    let span = global::tracer("manual-client")
        .start_with_context("my-client-span", &context); //TODO Replace with the name of your span
    // Context holding the new span; this is what gets injected below.
    let cx = Context::current_with_span(span);

    let mut req = hyper::Request::builder().uri("<HTTP_URL>");

    //Method to inject the current context in the request
    global::get_text_map_propagator(|propagator| {
        propagator.inject_context(&cx, &mut HeaderInjector(&mut req.headers_mut().unwrap()))
    });

    cx.span()
        .set_attribute(KeyValue::new("my-client-key-1", "my-client-value-1")); //TODO Add attributes

    // TODO Your outgoing_request code goes here
}

9.10 global module
#

global module 为 global telemetry primitives 提供了一些 utils:

  1. Global Trace API;
  2. Global Metrics API;

该 module 提供了设置 global meter provider、tracer provider 和 text map propagator 的函数,而 opentelemetry_sdk 提供了各种 Provider 的实现。

global tracer provider:

  • set_tracer_provider trace Sets the given TracerProvider instance as the current global provider.
  • shutdown_tracer_provider trace Shut down the current tracer provider. This will invoke the shutdown method on all span processors. span processors should export remaining spans before return
  • tracer trace Creates a named instance of Tracer via the configured GlobalTracerProvider.
  • tracer_provider trace Returns an instance of the currently configured global TracerProvider through GlobalTracerProvider.
  • tracer_with_scope trace Creates a Tracer with the given instrumentation scope via the configured GlobalTracerProvider.
          // Applications configure their tracer either by installing a trace pipeline, or calling set_tracer_provider.
          use opentelemetry::trace::{Tracer, noop::NoopTracerProvider};
          use opentelemetry::global;
    
          // Installs a tracer provider as the global default when the app starts.
          fn init_tracer() {
              // Swap this no-op provider for your tracing service of choice (jaeger, zipkin, etc).
              // Registering it configures the global `TracerProvider` singleton; if the
              // application never sets one, a no-op default is used.
              let _ = global::set_tracer_provider(NoopTracerProvider::new());
          }
    
          // Runs a piece of application logic inside a span named "doing_work".
          fn do_something_tracked() {
              // A named tracer instance can be fetched anywhere in the codebase.
              global::tracer("my-component").in_span("doing_work", |_cx| {
                  // Traced app logic here...
              });
          }
    
          // in main or other app start
          init_tracer();
          do_something_tracked();
    
    
          // Usage in Libraries
          use std::sync::Arc;
          use opentelemetry::trace::Tracer;
          use opentelemetry::global;
          use opentelemetry::InstrumentationScope;
    
          // Library function that traces its work through the global tracer provider.
          pub fn my_traced_library_function() {
              // End users of the library configure their own global tracer provider,
              // so the global tracer can be used here without any setup.
              // The instrumentation scope identifies this library by name, version and schema URL.
              let tracer = global::tracer_with_scope(
                  InstrumentationScope::builder("my_library-name")
                      .with_version(env!("CARGO_PKG_VERSION"))
                      .with_schema_url("https://opentelemetry.io/schemas/1.17.0")
                      .build(),
              );

              tracer.in_span("doing_library_work", |_cx| {
                  // Traced library logic here...
              });
          }
    

global meter provider:

  • set_meter_provider metrics Sets the given MeterProvider instance as the current global meter provider.
  • meter metrics Creates a named Meter via the currently configured global MeterProvider.
  • meter_provider metrics Returns an instance of the currently configured global MeterProvider.
  • meter_with_scope metrics Creates a Meter with the given instrumentation scope.
          // Usage in Applications and libraries
          //
          // Applications and libraries can obtain meter from the global meter provider, and use the meter to create instruments to emit measurements.
          use opentelemetry::metrics::{Meter};
          use opentelemetry::{global, KeyValue};
    
              fn do_something_instrumented() {
              let meter = global::meter("my-component");
    
              // It is recommended to reuse the same counter instance for the
              // lifetime of the application
              let counter = meter.u64_counter("my_counter").build();
    
              // record measurements
              counter.add(1, &[KeyValue::new("mykey", "myvalue")]);
              }
          }
    
          // Usage in Applications
          //
          // Application owners have the responsibility to set the global meter provider.
          use opentelemetry::{global, KeyValue};
    
          // Application entry point. The call that would install the global meter
          // provider is left commented out as a placeholder.
          fn main() {
              // Set the global meter provider
              // global::set_meter_provider(my_meter_provider().clone());
          }
    

global text map propagator:

  • set_text_map_propagator trace Sets the given TextMapPropagator propagator as the current global propagator.
  • get_text_map_propagator trace Executes a closure with a reference to the current global TextMapPropagator propagator.

10 opentelemetry_sdk crate
#

opentelemetry 定义了 LoggerProvider/MeterProvider/TracerProvider/TextMapPropagator trait, 是 opentelemetry 的 API,但并没有提供它们的实现 , 而 opentelemetry_sdk crate 提供了各种 Provider 的实现,所以 opentelemetry_sdk 是 opentelemetry 的 SDK。

opentelemetry_sdk 提供了如下 module:

  • error:Wrapper for error from trace, logs and metrics part of open telemetry.
  • export:Telemetry Export
  • logs logs:OpenTelemetry Log SDK
  • metrics metrics:The crust of the OpenTelemetry metrics SDK.
  • propagation trace:OpenTelemetry Propagators
  • resource :Representations of entities producing telemetry.
  • runtime:Provides an abstraction of several async runtimes
  • testing testing or test:In-Memory exporters for testing purpose.
  • trace trace:OpenTelemetry Trace SDK

Support for recording and exporting telemetry asynchronously and perform metrics aggregation can be added via the following flags:

  • rt-tokio: Spawn telemetry tasks using tokio’s multi-thread runtime.
  • rt-tokio-current-thread: Spawn telemetry tasks on a separate runtime so that the main runtime won’t be blocked.
  • rt-async-std: Spawn telemetry tasks using async-std’s runtime.

opentelemetry_sdk 的各种 Provider 的 Builder 需要传入 exporter 后才能向外输出数据, 而 exporter 是其它 crate 提供的,如:

  1. opentelemetry_otlp: 使用开源标准的 OTLP(OpenTelemetry Protocol)数据格式;
  2. opentelemetry_stdout:使用 stdout 显示 tracing 数据;

10.1 trace module
#

trace module: 定义了 Span/Tracer/TracerProvider 类型, 以及用于配置的 Config/Builder/BatchConfig/BatchConfigBuilder。

TracerProvider:

  • struct TracerProvider 用于创建和注册 Tracer 实例,它保存 SpanProcessor 的指针。
  • struct TracerProvider 实现了 opentelemetry::trace::TracerProvider trait
    • 关联类型是 opentelemetry_sdk::trace::Tracer struct 类型,它实现了 opentelemetry::trace::Tracer trait.
  • Clone 或 Drop TracerProvider 对象时,并不关闭 Span 的处理,需要调用 shutdown() 方法;
pub struct TracerProvider { /* private fields */ }

impl TracerProvider
pub fn builder() -> Builder
pub fn force_flush(&self) -> Vec<TraceResult<()>>
pub fn shutdown(&self) -> TraceResult<()>

// struct TracerProvider 实现了 opentelemetry::trace::TracerProvider trait
impl TracerProvider for TracerProvider
    type Tracer = Tracer
    fn tracer(&self, name: impl Into<Cow<'static, str>>) -> Self::Tracer
    fn tracer_with_scope(&self, scope: InstrumentationScope) -> Self::Tracer

使用 Builder 来设置 Provider 属性,然后创建 TracerProvider:

pub struct Builder { /* private fields */ }

impl Builder
    pub fn with_simple_exporter<T: SpanExporter + 'static>(self, exporter: T,) -> Self
    pub fn with_batch_exporter<T: SpanExporter + 'static, R: RuntimeChannel>(self, exporter: T, runtime: R,) -> Self
    pub fn with_span_processor<T: SpanProcessor + 'static>( self, processor: T,) -> Self
    pub fn with_config(self, config: Config) -> Self

    pub fn build(self) -> TracerProvider // 创建 TracerProvider

exporter 将一批 SpanData 发送给远程系统,可以是挨个发送,也可以是批量发送:

// SpanData 包含所有要发送的 span 数据。
pub struct SpanData {
    pub span_context: SpanContext,
    pub parent_span_id: SpanId,
    pub span_kind: SpanKind,
    pub name: Cow<'static, str>,
    pub start_time: SystemTime,
    pub end_time: SystemTime,
    pub attributes: Vec<KeyValue>,
    pub dropped_attributes_count: u32,
    pub events: SpanEvents,
    pub links: SpanLinks,
    pub status: Status,
    pub instrumentation_scope: InstrumentationScope,
}

// RuntimeChannel 为 log 和 span batch processors 提供了一定容量的 channel,用于发送和接受 Item
pub trait RuntimeChannel: Runtime {
    type Receiver<T: Debug + Send>: Stream<Item = T> + Send;
    type Sender<T: Debug + Send>: TrySend<Message = T> + Debug;

    // Required method
    fn batch_message_channel<T: Debug + Send>( &self, capacity: usize,) -> (Self::Sender<T>, Self::Receiver<T>);
}
impl RuntimeChannel for AsyncStd
impl RuntimeChannel for Tokio
impl RuntimeChannel for TokioCurrentThread

// SpanExporter 的 export() 方法返回的 ExportResult 类型
pub type ExportResult = Result<(), TraceError>;
#[non_exhaustive]
pub enum TraceError {
    ExportFailed(Box<dyn ExportError>),
    ExportTimedOut(Duration),
    TracerProviderAlreadyShutdown,
    Other(Box<dyn Error + Send + Sync + 'static>),
}

with_simple_exporter(exporter):设置 SpanExporter,挨个发送 log、span 记录;

  • SpanExporter 是协议相关的实现,用于将一批 SpanData 发送到远端系统;

with_batch_exporter(exporter,runtimeChannel): 设置 SpanExporter,批量发送 log、span 记录;

  • RuntimeChannel 为批量发送和接收 data 提供了一定容量的 channel;
  • 缺省实现:AsyncStd、Tokio 和 TokioCurrentThread

opentelemetry_sdk 提供了 SpanExporter 的缺省实现,但是都是测试目的,实际一般使用 opentelemetry_otlp、 opentelemetry_stdout 等 crate 来创建 exporter:

  • InMemorySpanExporter
  • NoopSpanExporter
  • TokioSpanExporter

with_span_processor() 设置 SpanProcessor;

  • SpanProcessor 为 span start、end 定义了 hooks 接口,当 is_recording 为 true 才会调用 span processors。
  • 缺省实现:SimpleSpanProcessor、BatchSpanProcessor
// SpanExporter 定义了如何将一批 SpanData 发送到远端系统,是协议相关的。
pub trait SpanExporter: Send + Sync + Debug {
    // Required method
    fn export( &mut self, batch: Vec<SpanData>, ) -> BoxFuture<'static, ExportResult>;

    // Provided methods
    fn shutdown(&mut self) { ... }
    fn force_flush(&mut self) -> BoxFuture<'static, ExportResult> { ... }
    fn set_resource(&mut self, _resource: &Resource) { ... }
}
// 以下三个 SpanExporter 的实现,都是测试目的,实际一般使用 opentelemetry_otlp 等来创建 exporter
impl SpanExporter for InMemorySpanExporter
impl SpanExporter for NoopSpanExporter
impl SpanExporter for TokioSpanExporter

// SpanProcessor 为 span start、end 定义了 hooks 接口,当 is_recording 为 true
// 才会调用 span processors。
pub trait SpanProcessor: Send + Sync + Debug {
    // Required methods

    // on_start is called when a Span is started.
    fn on_start(&self, span: &mut Span, cx: &Context);

    // on_end is called after a Span is ended (i.e., the end timestamp is already set).
    fn on_end(&self, span: SpanData);

    fn force_flush(&self) -> TraceResult<()>;
    fn shutdown(&self) -> TraceResult<()>;

    // Provided method
    fn set_resource(&mut self, _resource: &Resource) { ... }
}
impl SpanProcessor for SimpleSpanProcessor
impl<R: RuntimeChannel> SpanProcessor for BatchSpanProcessor<R>

SimpleSpanProcessor、BatchSpanProcessor 实现了 SpanProcessor trait,使用配置的 exporter 将 Span 发送出去:

  1. SimpleSpanProcessor 挨个将 finished 的 span 通过配置 SpanExporter 发送出去(不批量);

    • 有性能问题,一般用于 debug 和 test
  2. BatchSpanProcessor :异步的缓存 finished 的 span,然后按照配置的固定时间间隔发送出去;

    • BatchSpanProcessor 需要并发运行后台 task 来搜集和发送 span;
    • 支持 tokio、async-std 异步运行时
    • 使用 BatchConfigBuilder 来创建 BatchConfig,然后配置 BatchSpanProcessorBuilder;
// A builder for creating BatchSpanProcessor instances.
pub struct BatchSpanProcessorBuilder<E, R> { /* private fields */ }

impl<E, R> BatchSpanProcessorBuilder<E, R> where E: SpanExporter + 'static,  R: RuntimeChannel,
    pub fn with_batch_config(self, config: BatchConfig) -> Self // 配置
    pub fn build(self) -> BatchSpanProcessor<R> // 创建

// 其中的 BatchConfig 使用 opentelemetry_sdk::trace::BatchConfigBuilder 来创建
pub struct BatchConfigBuilder { /* private fields */ }
impl BatchConfigBuilder
    pub fn with_max_queue_size(self, max_queue_size: usize) -> Self
    pub fn with_max_export_batch_size(self, max_export_batch_size: usize) -> Self
    pub fn with_max_concurrent_exports(self, max_concurrent_exports: usize) -> Self
    pub fn with_scheduled_delay(self, scheduled_delay: Duration) -> Self
    pub fn with_max_export_timeout(self, max_export_timeout: Duration) -> Self

    pub fn build(self) -> BatchConfig // 创建 BatchConfig

Builder::with_config(Config) 使用 Config 来配置 SpanProcessor,包含:Sampler、IdGenerator、 SpanLimits、Resource 等:

  • 缺省的 Sampler 为 AlwaysOn,也可以使用 JaegerRemoteSampler,它使用 remote sampling protocol 来动态获取采样策略,支持三种:概率、 限速 和 基于 span operation;
  • IdGenerator 用来生成 Span 或 Trace ID,缺省为随机生成器;
  • Resource 是不可变的 telemetry attributes (KeyValue)集合,支持使用 Arc 进行高效的共享和 clone;

注意:TracerProvider 的 Builder 的 Config 和 with_config() 接口将被删除,后续使用 with_xx() 方法来配置:

// https://github.com/open-telemetry/opentelemetry-rust/pull/2303#issue-2666251181

// old
let tracer_provider: TracerProvider = TracerProvider::builder()
    .with_config(Config::default().with_resource(Resource::empty()))
    .build();

// new
let tracer_provider: TracerProvider = TracerProvider::builder()
    .with_resource(Resource::empty())
    .build();

// old
let tracer_provider = TracerProvider::builder()
    .with_config(
        Config::default()
            .with_resource(Resource::empty())
            .with_sampler(Sampler::AlwaysOn)
            .with_max_events_per_span(16),
    )
    .build();

// new
let tracer_provider = TracerProvider::builder()
    .with_resource(Resource::empty())
    .with_sampler(Sampler::AlwaysOn)
    .with_max_events_per_span(16)
    .build();
#[non_exhaustive]
pub struct Config {
    pub sampler: Box<dyn ShouldSample>, // 设置采样器,缺省为 AlwaysOn
    pub id_generator: Box<dyn IdGenerator>, // 设置 ID 生成器,缺省为随机生成器
    pub span_limits: SpanLimits,
    pub resource: Cow<'static, Resource>,
}
impl Config
    pub fn with_sampler<T: ShouldSample + 'static>(self, sampler: T) -> Self
    pub fn with_id_generator<T: IdGenerator + 'static>( self, id_generator: T,) -> Self
    pub fn with_max_events_per_span(self, max_events: u32) -> Self
    pub fn with_max_attributes_per_span(self, max_attributes: u32) -> Self
    pub fn with_max_links_per_span(self, max_links: u32) -> Self
    pub fn with_max_attributes_per_event(self, max_attributes: u32) -> Self
    pub fn with_max_attributes_per_link(self, max_attributes: u32) -> Self
    pub fn with_span_limits(self, span_limits: SpanLimits) -> Self
    pub fn with_resource(self, resource: Resource) -> Self

pub struct SpanLimits {
    pub max_events_per_span: u32,
    pub max_attributes_per_span: u32,
    pub max_links_per_span: u32,
    pub max_attributes_per_event: u32,
    pub max_attributes_per_link: u32,
}
pub trait ShouldSample: CloneShouldSample + Send + Sync + Debug {
    // Required method
    fn should_sample(
        &self,
        parent_context: Option<&Context>,
        trace_id: TraceId,
        name: &str,
        span_kind: &SpanKind,
        attributes: &[KeyValue],
        links: &[Link],
    ) -> SamplingResult;
}
pub struct SamplingResult {
    pub decision: SamplingDecision,
    pub attributes: Vec<KeyValue>,
    pub trace_state: TraceState,
}
pub enum SamplingDecision {
    Drop,
    RecordOnly,
    RecordAndSample,
}
impl ShouldSample for Sampler
impl ShouldSample for JaegerRemoteSampler

// opentelemetry_sdk::trace enum Sampler 是缺省的内置 ShouldSample 实现(默认为 AlwaysOn):
#[non_exhaustive]
pub enum Sampler {
    AlwaysOn,
    AlwaysOff,
    ParentBased(Box<dyn ShouldSample>),
    TraceIdRatioBased(f64),
    JaegerRemote(JaegerRemoteSampler),
}

// opentelemetry_sdk::trace::JaegerRemoteSampler 也实现了 ShouldSample
// Sampler that fetches the sampling configuration from remotes.
// It offers the following sampling strategies:
// 1. Probabilistic, fetch a probability between [0.0, 1.0] from remotes and use it to sample traces. If the probability is 0.0, it will never sample traces. If the probability is 1.0, it will always sample traces.
// 2. Rate limiting, uses a leaky bucket rate limiter to ensure that traces are sampled with a certain constant rate.
// 3. Per Operations, instead of sampling all traces, it samples traces based on the span name.
//  Only probabilistic sampling is supported at the moment.
//
// User can build a JaegerRemoteSampler by getting a JaegerRemoteSamplerBuilder from
// Sampler::jaeger_remote.
//
// Note that the backend doesn’t need to be Jaeger so long as it supports jaeger remote sampling protocol.
//
pub struct JaegerRemoteSampler { /* private fields */ }

pub trait IdGenerator: Send + Sync + Debug {
    // Required methods
    fn new_trace_id(&self) -> TraceId;
    fn new_span_id(&self) -> SpanId;
}
impl IdGenerator for RandomIdGenerator

示例:

use opentelemetry::global;
use opentelemetry_sdk::{runtime, testing::trace::NoopSpanExporter, trace};
use opentelemetry_sdk::trace::BatchConfigBuilder;
use std::time::Duration;

// Builds a batch span processor with a custom queue size, registers it on a
// tracer provider, and installs that provider as the global one.
#[tokio::main]
async fn main() {
    // Configure your preferred exporter
    let exporter = NoopSpanExporter::new();

    // Create a batch span processor using an exporter and a runtime;
    // the batch config raises the maximum queue size to 4096.
    let batch = trace::BatchSpanProcessor::builder(exporter, runtime::Tokio)
        .with_batch_config(BatchConfigBuilder::default().with_max_queue_size(4096).build())
        .build();

    // Then use the `with_span_processor` method to register the batch processor,
    // so the provider exports spans in batches.
    let provider = trace::TracerProvider::builder()
        .with_span_processor(batch)
        .build();

    // The value returned by `set_tracer_provider` is deliberately discarded.
    let _ = global::set_tracer_provider(provider);
}

示例:

use opentelemetry::{global, trace::{Tracer, TracerProvider as _}};
use opentelemetry_sdk::trace::TracerProvider;

// Sets up a tracer provider with a simple (non-batching) exporter, traces one
// unit of work, flushes pending spans and shuts the global provider down.
fn main() {
    // Choose an exporter like `opentelemetry_stdout::SpanExporter`
    let span_exporter = new_exporter();

    // Build the trace pipeline around the chosen exporter.
    let sdk_provider = TracerProvider::builder()
        .with_simple_exporter(span_exporter)
        .build();

    // Register a clone as the global tracer provider; the local handle is kept
    // for creating the tracer and flushing below.
    let _ = global::set_tracer_provider(sdk_provider.clone());

    let readme_tracer = sdk_provider.tracer("readme_example");
    readme_tracer.in_span("doing_work", |_cx| {
        // Traced app logic here...
    });

    // Force all spans to flush, visiting only the results that failed.
    for _flush_err in sdk_provider
        .force_flush()
        .into_iter()
        .filter_map(Result::err)
    {
        // .. handle flush error
    }

    // Shutdown trace pipeline
    global::shutdown_tracer_provider();
}

示例:

// https://github.com/open-telemetry/opentelemetry-rust/blob/main/examples/tracing-http-propagator/src/client.rs
use http_body_util::Full;
use hyper_util::{client::legacy::Client, rt::TokioExecutor};
use opentelemetry::{
    global,
    trace::{SpanKind, TraceContextExt, Tracer},
    Context, KeyValue,
};
use opentelemetry_http::{Bytes, HeaderInjector};
use opentelemetry_sdk::{propagation::TraceContextPropagator, trace::TracerProvider};
use opentelemetry_stdout::SpanExporter;

// Configures global tracing for the client: a trace-context propagator plus a
// tracer provider that exports spans through the stdout exporter.
fn init_tracer() {
    global::set_text_map_propagator(TraceContextPropagator::new());

    // The stdout exporter pipeline makes the collected spans retrievable
    // for the demonstration.
    let stdout_provider = TracerProvider::builder()
        .with_simple_exporter(SpanExporter::default())
        .build();
    global::set_tracer_provider(stdout_provider);
}

// Sends one HTTP request inside a client span and records the response status.
//
// `url` is the request target, `body_content` becomes the request body and
// `span_name` names the client span. The span's context is injected into the
// request headers before sending. Errors from building or sending the request
// are returned to the caller.
async fn send_request(
    url: &str,
    body_content: &str,
    span_name: &str,
) -> std::result::Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
    let http_client = Client::builder(TokioExecutor::new()).build_http();

    // Start a client-kind span and make it the current span of a new context.
    let tracer = global::tracer("example/client");
    let client_span = tracer
        .span_builder(String::from(span_name))
        .with_kind(SpanKind::Client)
        .start(&tracer);
    let cx = Context::current_with_span(client_span);

    // Write the trace context into the headers of the request being built.
    let mut builder = hyper::Request::builder().uri(url);
    global::get_text_map_propagator(|propagator| {
        let mut injector = HeaderInjector(builder.headers_mut().unwrap());
        propagator.inject_context(&cx, &mut injector)
    });

    let payload = Full::new(Bytes::from(body_content.to_string()));
    let request = builder.body(payload)?;
    let response = http_client.request(request).await?;

    // Attach the response status to the span as an event.
    let status_attr = KeyValue::new("status", response.status().to_string());
    cx.span().add_event("Got response!", vec![status_attr]);

    Ok(())
}

// Initializes tracing, then sends the health and echo requests one after
// another, stopping at the first error.
#[tokio::main]
async fn main() -> std::result::Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
    init_tracer();

    // (url, body, span name) for each request, in the order they are sent.
    let probes = [
        (
            "http://127.0.0.1:3000/health",
            "Health Request!",
            "server_health_check",
        ),
        (
            "http://127.0.0.1:3000/echo",
            "Echo Request!",
            "server_echo_check",
        ),
    ];
    for (url, body_content, span_name) in probes {
        send_request(url, body_content, span_name).await?;
    }

    Ok(())
}

示例:

// https://github.com/open-telemetry/opentelemetry-rust/blob/main/examples/tracing-jaeger/src/main.rs
use opentelemetry::global::shutdown_tracer_provider;
use opentelemetry::{
    global,
    trace::{TraceContextExt, TraceError, Tracer},
    KeyValue,
};
use opentelemetry_sdk::trace::TracerProvider;
use opentelemetry_sdk::{runtime, trace as sdktrace, Resource};
use opentelemetry_semantic_conventions::resource::SERVICE_NAME;

use std::error::Error;

// Builds an SDK `TracerProvider` that batches spans on the Tokio runtime and
// exports them through an OTLP span exporter built with `with_tonic()`.
//
// The service-name resource is set directly on the provider builder with
// `with_resource()` rather than through `with_config(Config)`, because the
// `Config`/`with_config()` interface is slated for removal.
//
// Returns the provider, or the `TraceError` produced while building the exporter.
fn init_tracer_provider() -> Result<opentelemetry_sdk::trace::TracerProvider, TraceError> {
    let exporter = opentelemetry_otlp::SpanExporter::builder()
        .with_tonic()
        .build()?;

    Ok(TracerProvider::builder()
        .with_batch_exporter(exporter, runtime::Tokio)
        .with_resource(Resource::new(vec![KeyValue::new(
            SERVICE_NAME,
            "tracing-jaeger",
        )]))
        .build())
}

// Installs the tracer provider globally, records a parent span with a nested
// child span (each carrying an event), then shuts the provider down.
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
    let provider = init_tracer_provider().expect("Failed to initialize tracer provider.");
    global::set_tracer_provider(provider.clone());

    let tracer = global::tracer("tracing-jaeger");
    tracer.in_span("main-operation", |parent_cx| {
        // Annotate the outer span with an attribute and an event.
        let parent = parent_cx.span();
        parent.set_attribute(KeyValue::new("my-attribute", "my-value"));
        let parent_event_attrs = vec![KeyValue::new("foo", "1")];
        parent.add_event("Main span event".to_string(), parent_event_attrs);

        // The nested span is started while the outer span is active.
        tracer.in_span("child-operation...", |child_cx| {
            let child = child_cx.span();
            child.add_event("Sub span event", vec![KeyValue::new("bar", "1")]);
        });
    });

    shutdown_tracer_provider();
    Ok(())
}

10.2 metrics module
#

metrics module 定义了 SdkMeterProvider 类型,它实现了 opentelemetry::metrics::MeterProvider trait:

// pub struct SdkMeterProvider { /* private fields */ }
pub trait MeterProvider {
    // Required method
    fn meter_with_scope(&self, scope: InstrumentationScope) -> Meter;

    // Provided method
    fn meter(&self, name: &'static str) -> Meter { ... }
}

pub struct SdkMeterProvider { /* private fields */ }
impl SdkMeterProvider
    pub fn builder() -> MeterProviderBuilder
    pub fn force_flush(&self) -> MetricResult<()>
    pub fn shutdown(&self) -> MetricResult<()>

使用 SdkMeterProvider::builder() 方法返回的 MeterProviderBuilder 类型来对 SdkMeterProvider 进行配置:

  • with_resource(self, resource: Resource):
    • Resource 是一组 KeyValue 集合,会作为 attributes 加到该 Provider 创建的所有 Metric 上;
  • with_reader<T: MetricReader>(self, reader: T)
    • 关键:MetricReader 用于采集、处理(聚合)和发送 Metrics;
    • 创建 MetricReader 时,需要使用协议(实现)相关的 opentelemetry_stdout 或 opentelemetry_otlp crate 创建的 MetricsExporter;
  • with_view<T: View>(self, view: T)
    • View 定义了一组对 Instrument 进行处理的逻辑(闭包实现),返回处理后的数据。
pub struct MeterProviderBuilder { /* private fields */ }

impl MeterProviderBuilder
    pub fn with_resource(self, resource: Resource) -> Self

    pub fn with_reader<T: MetricReader>(self, reader: T) -> Self

    pub fn with_view<T: View>(self, view: T) -> Self
    pub fn build(self) -> SdkMeterProvider
  1. MetricReader

    MetricReader:用于采集、处理和输出 Metrics,是连接 SDK 和 exporter 的对象。创建该对象时,需要传入 metrics push exporter,一般是其它 crate 如 opentelemetry_stdout 或 opentelemetry_otlp 提供的:

    1. push-based exporters:实现 PushMetricExporter,再用它创建 PeriodicReader,由 PeriodicReader 实现 MetricReader trait;
    2. pull-based exporters:自己直接实现 MetricReader,因为它自己按需 read;
    pub trait MetricReader: Debug + Send + Sync + 'static {
        // Required methods
        fn register_pipeline(&self, pipeline: Weak<Pipeline>);
    
        // collect() 搜集和处理 Metrics
        fn collect(&self, rm: &mut ResourceMetrics) -> MetricResult<()>;
    
        fn force_flush(&self) -> MetricResult<()>;
        fn shutdown(&self) -> MetricResult<()>;
    
        // The output temporality, a function of instrument kind.
        // This SHOULD be obtained from the exporter.
        // If not configured, the Cumulative temporality SHOULD be used.
        fn temporality(&self, kind: InstrumentKind) -> Temporality;
    }
    
    impl MetricReader for TestMetricReader
    impl MetricReader for ManualReader
    impl MetricReader for PeriodicReader
    impl MetricReader for PeriodicReaderWithOwnThread
    

    opentelemetry_sdk 提供了 3 种 MetricReader 实现:

    • ManualReader
    • PeriodicReader
    • PeriodicReaderWithOwnThread

    ManualReader: 实现了 MetricReader,app 可以按需读取 metrics 和处理。

    • 使用 MetricReader 的 collect() 方法来获取 ResourceMetrics 和自定义处理;
    pub struct ManualReader { /* private fields */ }
    impl ManualReader
        pub fn builder() -> ManualReaderBuilder
    
    pub struct ManualReaderBuilder { /* private fields */ }
    impl ManualReaderBuilder
        pub fn new() -> Self
        pub fn with_temporality(self, temporality: Temporality) -> Self
        pub fn build(self) -> ManualReader
    
    // Defines the window that an aggregation was calculated over.
    #[non_exhaustive]
    pub enum Temporality {
        Cumulative,
        Delta,
        LowMemory,
    }
    
    // 示例:
    use opentelemetry_sdk::metrics::ManualReader;
    // can specify additional reader configuration
    let reader = ManualReader::builder().build();
    

    PeriodicReader: 实现了 MetricReader,周期自动调用 collect() 来搜集数据,然后将数据发送到远端系统;

    • 传入 PushMetricExporter 和 Runtime,用来周期向远端发送 metric 数据;
    • 发送的 interval、timeout 是可以配置的;
    pub struct PeriodicReader { /* private fields */ }
    impl PeriodicReader
        // Runtime 可选实现:
        //   1. opentelemetry_sdk::runtime::TokioCurrentThread
        //   2. opentelemetry_sdk::runtime::Tokio
        //   3. opentelemetry_sdk::runtime::AsyncStd
        pub fn builder<E, RT>(exporter: E, runtime: RT) -> PeriodicReaderBuilder<E, RT>
            where E: PushMetricExporter, RT: Runtime,
    
    pub struct PeriodicReaderBuilder<E, RT> { /* private fields */ }
    impl<E, RT> PeriodicReaderBuilder<E, RT> where E: PushMetricExporter, RT: Runtime,
        pub fn with_interval(self, interval: Duration) -> Self
        pub fn with_timeout(self, timeout: Duration) -> Self
        pub fn build(self) -> PeriodicReader
    
    // 示例
    use opentelemetry_sdk::metrics::PeriodicReader;
    let exporter = get_exporter(); // set up a push exporter like OTLP
    let runtime = get_runtime(); // select runtime: e.g. opentelemetry_sdk:runtime::Tokio
    let reader = PeriodicReader::builder(exporter, runtime).build();
    

    PeriodicReaderWithOwnThread: 和 PeriodicReader 类似,但是由于使用自己的 background thread 来 push metrics,所以不需要任何 async runtime。不能和 OTLP exporter 联合使用,因为它要求 async runtime。

    pub struct PeriodicReaderWithOwnThread { /* private fields */ }
    impl PeriodicReaderWithOwnThread
        pub fn builder<E>(exporter: E) -> PeriodicReaderWithOwnThreadBuilder<E> where E: PushMetricExporter,
    
    pub struct PeriodicReaderWithOwnThreadBuilder<E> { /* private fields */ }
    impl<E> PeriodicReaderWithOwnThreadBuilder<E> where E: PushMetricExporter,
        pub fn with_interval(self, interval: Duration) -> Self
        pub fn with_timeout(self, timeout: Duration) -> Self
        pub fn build(self) -> PeriodicReaderWithOwnThread
    

    PeriodicReader/PeriodicReaderWithOwnThread 使用的 PushMetricExporter 是远端实现相关的:

    • 一般是其它 crate 如 opentelemetry_stdout/opentelemetry_otlp 提供的,
    • export(ResourceMetrics) 方法将 ResourceMetrics 中的一批 ScopeMetrics 发送到远端系统;
    • ScopeMetrics 是 InstrumentationScope + Vec<Metric>,前者一般是 Instrumentation Library 的元数据;
    • Metric 是一个 Aggregation 后的 time series,包含 name/description/unit/data;
    • Aggregation trait 定义了 as_any()/as_mut() 方法,Gauge、Sum、Histogram、ExponentialHistogram 都实现了它;
    pub trait PushMetricExporter: Send + Sync + 'static {
        // Required methods
        fn export<'life0, 'life1, 'async_trait>( &'life0 self, metrics: &'life1 mut ResourceMetrics,) -> Pin<Box<dyn Future<Output = MetricResult<()>> + Send + 'async_trait>>
           where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait;
    
        fn force_flush<'life0, 'async_trait>( &'life0 self, ) -> Pin<Box<dyn Future<Output = MetricResult<()>> + Send + 'async_trait>>
           where Self: 'async_trait, 'life0: 'async_trait;
        fn shutdown(&self) -> MetricResult<()>;
        fn temporality(&self) -> Temporality;
    }
    
    // A collection of ScopeMetrics and the associated Resource that created them.
    pub struct ResourceMetrics {
        pub resource: Resource,
        pub scope_metrics: Vec<ScopeMetrics>,
    }
    
    // A collection of metrics produced by a meter.
    pub struct ScopeMetrics {
        pub scope: InstrumentationScope,
        pub metrics: Vec<Metric>,
    }
    
    // opentelemetry::InstrumentationScope
    #[non_exhaustive]
    pub struct InstrumentationScope { /* private fields */ }
    impl InstrumentationScope
        pub fn builder<T: Into<Cow<'static, str>>>( name: T,) -> InstrumentationScopeBuilder
        pub fn name(&self) -> &str
        pub fn version(&self) -> Option<&str>
        pub fn schema_url(&self) -> Option<&str>
        pub fn attributes(&self) -> impl Iterator<Item = &KeyValue>
    
    // A collection of one or more aggregated time series from an Instrument.
    pub struct Metric {
        pub name: Cow<'static, str>,
        pub description: Cow<'static, str>,
        pub unit: Cow<'static, str>,
        pub data: Box<dyn Aggregation>,
    }
    
    // The store of data reported by an Instrument.
    // It will be one of: Gauge, Sum, or Histogram.
    pub trait Aggregation: Debug + Any + Send + Sync {
        // Required methods
        fn as_any(&self) -> &dyn Any;
        fn as_mut(&mut self) -> &mut dyn Any;
    }
    impl<T: Debug + Send + Sync + 'static> Aggregation for ExponentialHistogram<T>
    impl<T: Debug + Send + Sync + 'static> Aggregation for Gauge<T>
    impl<T: Debug + Send + Sync + 'static> Aggregation for Histogram<T>
    impl<T: Debug + Send + Sync + 'static> Aggregation for Sum<T>
    
  1. View

    MeterProviderBuilder 的 with_view(view) 方法用于给 MeterProvider 关联一个 View。

    • 可以调用多次 with_view() 方法,来按顺序关联多个 view;
    • 如果未调用和设置,使用 default view;

    View 的功能:对传入的 Instrument 进行匹配和处理,也即自定义处理,然后返回处理后的 Stream 对象。

    MeterProviderBuilder 内部关联的多个 View 的调用时机:

    • opentelemetry-sdk/src/metrics/pipeline.rs 的 Inserter 的 instrument() 方法,

    该方法的功能是 Inserts the provided instrument into a pipeline.

    • 所以 view 是在 SDK 收到数据之后、发送数据之前进行处理的。

    View 的使用场景:

    1. Customize which Instruments are to be processed/ignored. For example, an instrumented library can provide both temperature and humidity, but the application developer might only want temperature.
    2. Customize the aggregation - if the default aggregation associated with the Instrument does not meet the needs of the user. For example, an HTTP client library might expose HTTP client request duration as Histogram by default, but the application developer might only want the total count of outgoing requests.
    3. Customize which attribute(s) are to be reported on metrics. For example, an HTTP server library might expose HTTP verb (e.g. GET, POST) and HTTP status code (e.g. 200, 301, 404). The application developer might only care about HTTP status code (e.g. reporting the total count of HTTP requests for each HTTP status code). There could also be extreme scenarios in which the application developer does not need any attributes (e.g. just get the total count of all incoming requests).
    pub struct MeterProviderBuilder { /* private fields */ }
    impl MeterProviderBuilder
        pub fn with_view<T: View>(self, view: T) -> Self
    
    pub trait View: Send + Sync + 'static {
        // Required method
        fn match_inst(&self, inst: &Instrument) -> Option<Stream>;
    }
    

    闭包实现了 View, 可以来快速自定义采集数据的处理逻辑:

    impl<T> View for T where T: Fn(&Instrument) -> Option<Stream> + Send + Sync + 'static,
    
    // 示例:
    use opentelemetry_sdk::metrics::{Instrument, SdkMeterProvider, Stream};
    
    // return streams for the given instrument
    let my_view = |i: &Instrument| {
      // return Some(Stream) or
      None
    };
    
    let provider = SdkMeterProvider::builder().with_view(my_view).build();
    

    opentelemetry-sdk::metrics 提供了 new_view(criteria: Instrument, mask: Stream) 函数来创建 View 对象:

    • criteria 代表匹配条件,Instrument::name 字段值支持 wildcard pattern matching:
      • * 匹配零个或多个任意字符;
      • ? 匹配一个任意字符;
    • 只有当 criteria 的所有 non-empty fields 都与传入的 Instrument 匹配时,才应用 Stream mask;
    • Stream mask 对象用于对匹配的 Instrument 对应字段进行修改,返回修改后的 Stream 对象;
      • 使用传入 Stream 对象的非空 field 来修改传入 Instrument 对应字段;
    // opentelemetry_sdk::metrics::new_view
    pub fn new_view( criteria: Instrument, mask: Stream,) -> MetricResult<Box<dyn View>>
    
    #[non_exhaustive]
    pub struct Instrument {
        pub name: Cow<'static, str>,
        pub description: Cow<'static, str>,
        pub kind: Option<InstrumentKind>,
        pub unit: Cow<'static, str>,
        pub scope: InstrumentationScope,
    }
    
    // Describes the stream of data an instrument produces.
    #[non_exhaustive]
    pub struct Stream {
        pub name: Cow<'static, str>,
        pub description: Cow<'static, str>,
        pub unit: Cow<'static, str>,
        pub aggregation: Option<Aggregation>,
        pub allowed_attribute_keys: Option<Arc<HashSet<Key>>>,
    }
    impl Stream
        pub fn new() -> Self
        pub fn name(self, name: impl Into<Cow<'static, str>>) -> Self
        pub fn description(self, description: impl Into<Cow<'static, str>>) -> Self
        pub fn unit(self, unit: impl Into<Cow<'static, str>>) -> Self
        pub fn aggregation(self, aggregation: Aggregation) -> Self
        pub fn allowed_attribute_keys(self,attribute_keys: impl IntoIterator<Item = Key>,) -> Self
    
    // 示例
    use opentelemetry_sdk::metrics::{new_view, Aggregation, Instrument, Stream};
    let criteria = Instrument::new().name("counter_*");
    let mask = Stream::new().aggregation(Aggregation::Sum);
    let view = new_view(criteria, mask);
    
  1. SdkMeterProvider 示例

    // https://github.com/open-telemetry/opentelemetry-rust/blob/main/examples/metrics-basic/src/main.rs
    use opentelemetry::{global, KeyValue};
    use opentelemetry_sdk::metrics::{PeriodicReader, SdkMeterProvider};
    use opentelemetry_sdk::{runtime, Resource};
    use std::error::Error;
    use std::vec;
    
    /// Builds an `SdkMeterProvider` that exports through a periodic reader to stdout
    /// and registers a clone of it as the global meter provider.
    fn init_meter_provider() -> opentelemetry_sdk::metrics::SdkMeterProvider {
        // Temporality defaults to Cumulative. To export deltas instead, add
        // `.with_temporality(opentelemetry_sdk::metrics::Temporality::Delta)` before `build()`.
        let stdout_exporter = opentelemetry_stdout::MetricExporterBuilder::default().build();
        let periodic_reader = PeriodicReader::builder(stdout_exporter, runtime::Tokio).build();
        let resource = Resource::new([KeyValue::new("service.name", "metrics-basic-example")]);

        let meter_provider = SdkMeterProvider::builder()
            .with_reader(periodic_reader)
            .with_resource(resource)
            .build();
        // The global registry keeps its own handle; the caller gets the original to shut down later.
        global::set_meter_provider(meter_provider.clone());
        meter_provider
    }
    
    /// Exercises each instrument kind (counter, up-down counter, histogram, gauge and
    /// their observable variants) against the stdout-backed MeterProvider, then shuts
    /// the provider down so the recorded metrics are flushed before exit.
    #[tokio::main]
    async fn main() -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
        // Initialize the MeterProvider with the stdout Exporter.
        let meter_provider = init_meter_provider();
    
        // Create a meter from the above MeterProvider (looked up through the global registry).
        let meter = global::meter("mylibraryname");
    
        // Create a Counter instrument.
        let counter = meter.u64_counter("my_counter").build();
    
        // Record measurements using the Counter instrument.
        counter.add(
            10,
            &[
                KeyValue::new("mykey1", "myvalue1"),
                KeyValue::new("mykey2", "myvalue2"),
            ],
        );
    
        // Create an ObservableCounter instrument and register a callback that reports the measurement.
        let _observable_counter = meter
            .u64_observable_counter("my_observable_counter")
            .with_description("My observable counter example description")
            .with_unit("myunit")
            .with_callback(|observer| {
                observer.observe(
                    100,
                    &[
                        KeyValue::new("mykey1", "myvalue1"),
                        KeyValue::new("mykey2", "myvalue2"),
                    ],
                )
            })
            .build();
    
        // Create an UpDownCounter instrument.
        let updown_counter = meter.i64_up_down_counter("my_updown_counter").build();
    
        // Record measurements using the UpDownCounter instrument (negative deltas are allowed).
        updown_counter.add(
            -10,
            &[
                KeyValue::new("mykey1", "myvalue1"),
                KeyValue::new("mykey2", "myvalue2"),
            ],
        );
    
        // Create an ObservableUpDownCounter instrument and register a callback that reports the measurement.
        let _observable_up_down_counter = meter
            .i64_observable_up_down_counter("my_observable_updown_counter")
            .with_description("My observable updown counter example description")
            .with_unit("myunit")
            .with_callback(|observer| {
                observer.observe(
                    100,
                    &[
                        KeyValue::new("mykey1", "myvalue1"),
                        KeyValue::new("mykey2", "myvalue2"),
                    ],
                )
            })
            .build();
    
        // Create a Histogram instrument.
        let histogram = meter
            .f64_histogram("my_histogram")
            .with_description("My histogram example description")
            // Setting boundaries is optional. By default, the boundaries are set to
            // [0.0, 5.0, 10.0, 25.0, 50.0, 75.0, 100.0, 250.0, 500.0, 750.0, 1000.0, 2500.0, 5000.0, 7500.0, 10000.0]
            .with_boundaries(vec![0.0, 5.0, 10.0, 15.0, 20.0, 25.0])
            .build();
    
        // Record measurements using the Histogram instrument.
        histogram.record(
            10.5,
            &[
                KeyValue::new("mykey1", "myvalue1"),
                KeyValue::new("mykey2", "myvalue2"),
            ],
        );
    
        // Note that there is no ObservableHistogram instrument.
    
        // Create a Gauge instrument.
        let gauge = meter
            .f64_gauge("my_gauge")
            .with_description("A gauge set to 1.0")
            .with_unit("myunit")
            .build();
    
        // Record the current value using the Gauge instrument.
        gauge.record(
            1.0,
            &[
                KeyValue::new("mykey1", "myvalue1"),
                KeyValue::new("mykey2", "myvalue2"),
            ],
        );
    
        // Create an ObservableGauge instrument and register a callback that reports the measurement.
        let _observable_gauge = meter
            .f64_observable_gauge("my_observable_gauge")
            .with_description("An observable gauge set to 1.0")
            .with_unit("myunit")
            .with_callback(|observer| {
                observer.observe(
                    1.0,
                    &[
                        KeyValue::new("mykey1", "myvalue1"),
                        KeyValue::new("mykey2", "myvalue2"),
                    ],
                )
            })
            .build();
    
        // Metrics are otherwise exported only on the PeriodicReader's interval;
        // shutting down the MeterProvider here flushes them immediately instead of
        // waiting for the next export tick.
        // NOTE(review): the upstream comment states a 30 sec interval - confirm against the SDK default.
        meter_provider.shutdown()?;
        Ok(())
    }
    

10.3 propagation module
#

propagation module:提供了 BaggagePropagator 和 TraceContextPropagator struct,它们都实现了 opentelemetry::propagation::text_map_propagator::TextMapPropagator。

这两个 TextMapPropagator 实现在 opentelemetry crate 的 propagation module 中使用,用于在 carriers(如 HashMap 或 HTTP Header)和 Context 之间传递分布式追踪信息。

use opentelemetry::{baggage::BaggageExt, Key, propagation::TextMapPropagator};
use opentelemetry_sdk::propagation::BaggagePropagator;
use std::collections::HashMap;

// Example baggage value passed in externally via http headers
let mut headers = HashMap::new();
headers.insert("baggage".to_string(), "user_id=1".to_string());

let propagator = BaggagePropagator::new();
// can extract from any type that impls `Extractor`, usually an HTTP header map
let cx = propagator.extract(&headers);

// Iterate over extracted name-value pairs
for (name, value) in cx.baggage() {
    // ...
}

// Add new baggage
let cx_with_additions = cx.with_baggage(vec![Key::new("server_id").i64(42)]);

// Inject baggage into http request
propagator.inject_context(&cx_with_additions, &mut headers);

let header_value = headers.get("baggage").expect("header is injected");
assert!(header_value.contains("user_id=1"), "still contains previous name-value");
assert!(header_value.contains("server_id=42"), "contains new name-value pair");

10.4 logs module
#

opentelemetry_sdk::logs module 的接口和 opentelemetry_sdk::metrics modules 的接口非常像。

LoggerProvider 类型实现了 opentelemetry::logs::LoggerProvider trait。

LogExporter 和具体实现相关,由其它 crate 如 opentelemetry_otlp 和 opentelemetry_stdout 等提供。

pub struct LoggerProvider { /* private fields */ }
impl LoggerProvider
    pub fn builder() -> Builder
    pub fn force_flush(&self) -> Vec<LogResult<()>>
    pub fn shutdown(&self) -> LogResult<()>


pub struct Builder { /* private fields */ }
impl Builder
    pub fn with_simple_exporter<T: LogExporter + 'static>(self, exporter: T) -> Self
    pub fn with_batch_exporter<T: LogExporter + 'static, R: RuntimeChannel>( self, exporter: T, runtime: R,) -> Self
    pub fn with_log_processor<T: LogProcessor + 'static>(self, processor: T) -> Self
    pub fn with_resource(self, resource: Resource) -> Self
    pub fn build(self) -> LoggerProvider

pub trait LogExporter: Send + Sync + Debug {
    // Required method
    fn export<'life0, 'life1, 'async_trait>( &'life0 mut self, batch: LogBatch<'life1>, ) -> Pin<Box<dyn Future<Output = LogResult<()>> + Send + 'async_trait>>
        where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait;

    // Provided methods
    fn shutdown(&mut self) { ... }
    fn event_enabled(
        &self,
        _level: Severity,
        _target: &str,
        _name: &str,
    ) -> bool { ... }
    fn set_resource(&mut self, _resource: &Resource) { ... }
}
impl LogExporter for InMemoryLogExporter

pub trait LogProcessor: Send + Sync + Debug {
    // Required methods
    fn emit(&self, data: &mut LogRecord, instrumentation: &InstrumentationScope);
    fn force_flush(&self) -> LogResult<()>;
    fn shutdown(&self) -> LogResult<()>;

    // Provided methods
    fn event_enabled(
        &self,
        _level: Severity,
        _target: &str,
        _name: &str,
    ) -> bool { ... }
    fn set_resource(&self, _resource: &Resource) { ... }
}
impl LogProcessor for SimpleLogProcessor
impl<R: RuntimeChannel> LogProcessor for BatchLogProcessor<R>

pub struct BatchLogProcessor<R: RuntimeChannel> { /* private fields */ }
impl<R: RuntimeChannel> BatchLogProcessor<R>
    pub fn builder<E>(exporter: E, runtime: R) -> BatchLogProcessorBuilder<E, R> where   E: LogExporter,

pub struct BatchLogProcessorBuilder<E, R> { /* private fields */ }
impl<E, R> BatchLogProcessorBuilder<E, R> where     E: LogExporter + 'static,    R: RuntimeChannel,
    pub fn with_batch_config(self, config: BatchConfig) -> Self
    pub fn build(self) -> BatchLogProcessor<R>

pub struct BatchConfigBuilder { /* private fields */ }
impl BatchConfigBuilder
    pub fn with_max_queue_size(self, max_queue_size: usize) -> Self
    pub fn with_scheduled_delay(self, scheduled_delay: Duration) -> Self
    pub fn with_max_export_timeout(self, max_export_timeout: Duration) -> Self
    pub fn with_max_export_batch_size(self, max_export_batch_size: usize) -> Self
    pub fn build(self) -> BatchConfig

示例:

// https://github.com/open-telemetry/opentelemetry-rust/blob/main/examples/logs-basic/src/main.rs

use log::{error, Level};
use opentelemetry::KeyValue;
use opentelemetry_appender_log::OpenTelemetryLogBridge;
use opentelemetry_sdk::logs::LoggerProvider;
use opentelemetry_sdk::Resource;
use opentelemetry_semantic_conventions::resource::SERVICE_NAME;

/// Routes records emitted through the `log` crate into OpenTelemetry and prints them on stdout.
fn main() {
    // The stdout exporter is the LogExporter implementation that actually ships the records.
    let stdout_exporter = opentelemetry_stdout::LogExporter::default();
    let resource = Resource::new([KeyValue::new(SERVICE_NAME, "logs-basic-example")]);

    let logger_provider = LoggerProvider::builder()
        .with_resource(resource)
        .with_simple_exporter(stdout_exporter)
        .build();

    // Install the OpenTelemetry bridge as the global logger of the `log` crate,
    // letting only `Error`-level records through.
    let bridge = OpenTelemetryLogBridge::new(&logger_provider);
    log::set_boxed_logger(Box::new(bridge)).unwrap();
    log::set_max_level(Level::Error.to_level_filter());

    // Anything logged via the `log` macros now flows through the bridge to stdout.
    error!(target: "my-target", "hello from {}. My price is {}", "apple", 2.99);
}

11 opentelemetry_appender_log/opentelemetry_appender_tracing crate
#

一般不建议直接调用 opentelemetry Logs API/SDK,而是把它作为 Logs Bridge API 来使用。即使用 opentelemetry_appender_tracing 或 opentelemetry_appender_log crate,将 tracing 或 log crate 写的日志条目发送到 opentelemetry。或者使用 tracing 来作为 logging API, tracing 支持结构化日志,而且是活跃维护的。

use opentelemetry::KeyValue;
use opentelemetry_appender_tracing::layer;
use opentelemetry_sdk::{logs::LoggerProvider, Resource};
use tracing::error;
use tracing_subscriber::prelude::*;

/// Forwards `tracing` events to OpenTelemetry logs and prints them via the stdout exporter.
fn main() {
    // 1. Create a LoggerProvider backed by the stdout exporter.
    let exporter = opentelemetry_stdout::LogExporter::default();
    let provider: LoggerProvider = LoggerProvider::builder()
        .with_resource(Resource::new(vec![KeyValue::new(
            "service.name",
            "log-appender-tracing-example",
        )]))
        .with_simple_exporter(exporter)
        .build();

    // 2. Wrap the provider in an
    // opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge,
    // the layer that forwards tracing events to OpenTelemetry logs.
    let layer = layer::OpenTelemetryTracingBridge::new(&provider);

    tracing_subscriber::registry().with(layer).init();

    error!(name: "my-event-name", target: "my-system", event_id = 20, user_name = "otel",
        user_email = "otel@opentelemetry.io", message = "This is an example message");
    // Best-effort shutdown: the result is deliberately ignored in this example.
    let _ = provider.shutdown();
}

12 opentelemetry_otlp crate
#

opentelemetry_otlp 使用 OTLP 格式向 OpenTelemetry collector 或其它兼容的后端发送 tracer/metrics/logs 数据。例如可以直接发送到支持 OTLP 的 jaeger。

注意: v0.27.0 版本开始,不再使用 XXPipeline,而是切换到 XXExporter:

当前,该 crate 只支持使用 grpc 或 http (in binary format) 来发送 OTLP 格式的数据。

  • 默认使用的是 grpc;
  • grpc 默认使用的是 Tonic;

如果要使用 http 需要开启如下 features:

  • http-proto: Use http as transport layer, protobuf as body format.
  • reqwest-blocking-client: Use reqwest blocking http client.
  • reqwest-client: Use reqwest http client.
  • reqwest-rustls: Use reqwest with TLS with system trust roots via rustls-native-certs crate.
  • reqwest-rustls-webpki-roots: Use reqwest with TLS with Mozilla’s trust roots via webpki-roots crate.

正在研发的是 http-json 发送数据。

启动一个 collector:

$ docker run -p 4317:4317 otel/opentelemetry-collector:latest

使用 opentelemetry_otlp 来先创建 Exporter,一般通过对应类型的 Builder 来创建 Exporter:

  • SpanExporter::builder()
  • MetricExporter::builder()
  • LogExporter::builder()

然后使用 Exporter 创建 Provider:

  • TracerProvider::builder()
  • SdkMeterProvider::builder()
  • LoggerProvider::builder()

在创建 Provider 时可以选择两种 export 方式:

  1. 挨个发送: .with_simple_exporter()
  2. 批量发送:.with_batch_exporter()

示例:

use opentelemetry::global;
use opentelemetry::trace::Tracer;

/// Exports a single span to an OTLP collector through the tonic (gRPC) exporter.
fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
    // First, create a OTLP exporter builder. Configure it as you need.
    let otlp_exporter = opentelemetry_otlp::SpanExporter::builder().with_tonic().build()?;
    // Then pass it into the provider builder.
    let provider = opentelemetry_sdk::trace::TracerProvider::builder()
        .with_simple_exporter(otlp_exporter)
        .build();
    // Register the provider globally. Without this, `global::tracer` hands back a
    // no-op tracer and the span below is never exported.
    let _ = global::set_tracer_provider(provider);
    let tracer = global::tracer("my_tracer");
    tracer.in_span("doing_work", |_cx| {
        // Traced app logic here...
    });

    Ok(())
}

在开启 [rt-tokio]、[rt-tokio-current-thread] 或 [rt-async-std] feature 的情况下,可以使用高性能的 Batch Exporter:

let tracer = opentelemetry_sdk::trace::TracerProvider::builder()
       .with_batch_exporter(
           opentelemetry_otlp::SpanExporter::builder()
               .with_tonic()
               .build()?,
           opentelemetry_sdk::runtime::Tokio,
        )
       .build();

12.1 LogExporter 和 LogExporterBuilder
#

/// Builds a `LoggerProvider` that batches log records and sends them through the
/// tonic exporter to the collector at `http://localhost:4317`.
fn init_logs() -> Result<opentelemetry_sdk::logs::LoggerProvider, LogError> {
    let otlp_exporter = LogExporter::builder()
        .with_tonic()
        .with_endpoint("http://localhost:4317")
        .build()?;

    let provider = LoggerProvider::builder()
        .with_resource(RESOURCE.clone())
        .with_batch_exporter(otlp_exporter, runtime::Tokio)
        .build();
    Ok(provider)
}

12.2 MetricExporter/MetricExporterBuilder
#


// `with_tonic()` selects the gRPC transport, so target the collector's gRPC port (4317).
// Port 4318 with the `/v1/metrics` path is the OTLP/HTTP endpoint and does not speak gRPC.
let exporter = opentelemetry_otlp::MetricExporter::builder()
    .with_tonic()
    .with_endpoint("http://localhost:4317")
    .with_protocol(Protocol::Grpc)
    .with_timeout(Duration::from_secs(3))
    .build()
    .unwrap();

// Collect and export every 3 seconds; a single export may take up to 10 seconds.
let reader = opentelemetry_sdk::metrics::PeriodicReader::builder(exporter, opentelemetry_sdk::runtime::Tokio)
    .with_interval(std::time::Duration::from_secs(3))
    .with_timeout(Duration::from_secs(10))
    .build();

let provider = opentelemetry_sdk::metrics::SdkMeterProvider::builder()
    .with_reader(reader)
    .with_resource(Resource::new(vec![KeyValue::new("service.name", "example")]))
    .build();

tracer.in_span("doing_work", |cx| {
    // Traced app logic here...
});

12.3 SpanExporter/SpanExporterBuilder
#

pub struct SpanExporter(/* private fields */);
impl SpanExporter
    pub fn builder() -> SpanExporterBuilder<NoExporterBuilderSet>
    pub fn new(client: impl SpanExporter + 'static) -> Self


#[derive(Debug, Default, Clone)]
pub struct SpanExporterBuilder<C> {
    client: C,
}

impl SpanExporterBuilder<NoExporterBuilderSet> {}
    pub fn new() -> Self
    pub fn with_tonic(self) -> SpanExporterBuilder<TonicExporterBuilderSet>
    pub fn with_http(self) -> SpanExporterBuilder<HttpExporterBuilderSet>

impl SpanExporterBuilder<TonicExporterBuilderSet>
    pub fn build(self) -> Result<SpanExporter, opentelemetry::trace::TraceError>

impl SpanExporterBuilder<HttpExporterBuilderSet>
    pub fn build(self) -> Result<SpanExporter, opentelemetry::trace::TraceError>

SpanExporterBuilder<TonicExporterBuilderSet> 和 SpanExporterBuilder<HttpExporterBuilderSet> 实现了 HasExportConfig trait,进而实现了 WithExportConfig trait,可以调用 WithExportConfig 的方法来配置 Collector 的 endpoint、protocol、timeout 等参数。这些配置参数也可以通过环境变量进行配置:

  • OTEL_EXPORTER_OTLP_PROTOCOL_HTTP_PROTOBUF
  • OTEL_EXPORTER_OTLP_PROTOCOL_GRPC
  • OTEL_EXPORTER_OTLP_TIMEOUT
  • OTEL_EXPORTER_OTLP_TIMEOUT_DEFAULT
  • OTEL_EXPORTER_OTLP_GRPC_ENDPOINT_DEFAULT
  • OTEL_EXPORTER_OTLP_HTTP_ENDPOINT_DEFAULT
pub trait HasExportConfig {
    // Required method
    fn export_config(&mut self) -> &mut ExportConfig;
}
impl HasExportConfig for HttpExporterBuilder
impl HasExportConfig for TonicExporterBuilder
impl HasExportConfig for SpanExporterBuilder<TonicExporterBuilderSet>
impl HasExportConfig for SpanExporterBuilder<HttpExporterBuilderSet>

pub trait WithExportConfig {
    // Required methods
    fn with_endpoint<T: Into<String>>(self, endpoint: T) -> Self;
    fn with_protocol(self, protocol: Protocol) -> Self;
    fn with_timeout(self, timeout: Duration) -> Self;
    fn with_export_config(self, export_config: ExportConfig) -> Self;
}
impl<B: HasExportConfig> WithExportConfig for B

impl HasTonicConfig for SpanExporterBuilder<TonicExporterBuilderSet>
impl HasHttpConfig for SpanExporterBuilder<HttpExporterBuilderSet>

// 示例
use crate::opentelemetry_otlp::WithExportConfig;
let exporter_builder = opentelemetry_otlp::SpanExporter::builder()
    .with_tonic()
    .with_endpoint("http://localhost:7201");

13 opentelemetry_stdout crate
#

https://docs.rs/opentelemetry-stdout/0.27.0/opentelemetry_stdout/

该 crate 只用于测试目的,它提供了 opentelemetry_sdk 所需的各种 Exporter 实现。

  • SpanExporter
  • MetricExporter
  • LogExporter
{
use opentelemetry::metrics::MeterProvider;
use opentelemetry::trace::{Span, Tracer, TracerProvider as _};
use opentelemetry::{Context, KeyValue};

use opentelemetry_sdk::metrics::{SdkMeterProvider, PeriodicReader};
use opentelemetry_sdk::runtime;
use opentelemetry_sdk::trace::TracerProvider;

use opentelemetry_sdk::logs::LoggerProvider;

/// Creates a tracer provider whose spans are handed to the stdout exporter one at a time.
fn init_trace() -> TracerProvider {
    TracerProvider::builder()
        .with_simple_exporter(opentelemetry_stdout::SpanExporter::default())
        .build()
}

/// Creates a meter provider whose periodic reader writes metrics to stdout.
fn init_metrics() -> SdkMeterProvider {
    let stdout_reader = PeriodicReader::builder(
        opentelemetry_stdout::MetricExporter::default(),
        runtime::Tokio,
    )
    .build();

    SdkMeterProvider::builder().with_reader(stdout_reader).build()
}

/// Creates a logger provider whose records are handed to the stdout exporter one at a time.
fn init_logs() -> LoggerProvider {
    let stdout_exporter = opentelemetry_stdout::LogExporter::default();
    let builder = LoggerProvider::builder().with_simple_exporter(stdout_exporter);
    builder.build()
}

let tracer_provider = init_trace();
let meter_provider = init_metrics();
let logger_provider = init_logs();

// recorded traces, metrics and logs will now be sent to stdout:

14 opentelemetry-prometheus
#

opentelemetry-prometheus crate 提供了 opentelemetry meter exporter 功能:

  • opentelemetry_otlp 提供的 meter exporter 适用于 push 模式的 opentelemetry collector;而 opentelemetry-prometheus 把指标注册到 prometheus Registry,适用于 Prometheus 的 pull 模式;
use opentelemetry::{metrics::MeterProvider, KeyValue};
use opentelemetry_sdk::metrics::SdkMeterProvider;
use prometheus::{Encoder, TextEncoder};

// create a new prometheus registry
let registry = prometheus::Registry::new();

// configure OpenTelemetry to use this registry
let exporter = opentelemetry_prometheus::exporter()
    .with_registry(registry.clone())
    .build()?;

// set up a meter to create instruments
let provider = SdkMeterProvider::builder().with_reader(exporter).build();
let meter = provider.meter("my-app");

// Use two instruments
// Instrument builders are finished with `build()`, matching the other examples in this article.
let counter = meter
    .u64_counter("a.counter")
    .with_description("Counts things")
    .build();
let histogram = meter
    .u64_histogram("a.histogram")
    .with_description("Records values")
    .build();

counter.add(100, &[KeyValue::new("key", "value")]);
histogram.record(100, &[KeyValue::new("key", "value")]);

// Encode data as text or protobuf
let encoder = TextEncoder::new();
let metric_families = registry.gather();
let mut result = Vec::new();
encoder.encode(&metric_families, &mut result)?;

// result now contains encoded metrics:
//
// # HELP a_counter_total Counts things
// # TYPE a_counter_total counter
// a_counter_total{key="value",otel_scope_name="my-app"} 100
// # HELP a_histogram Records values
// # TYPE a_histogram histogram
// a_histogram_bucket{key="value",otel_scope_name="my-app",le="0"} 0
// a_histogram_bucket{key="value",otel_scope_name="my-app",le="5"} 0
// a_histogram_bucket{key="value",otel_scope_name="my-app",le="10"} 0
// a_histogram_bucket{key="value",otel_scope_name="my-app",le="25"} 0
// a_histogram_bucket{key="value",otel_scope_name="my-app",le="50"} 0
// a_histogram_bucket{key="value",otel_scope_name="my-app",le="75"} 0
// a_histogram_bucket{key="value",otel_scope_name="my-app",le="100"} 1
// a_histogram_bucket{key="value",otel_scope_name="my-app",le="250"} 1
// a_histogram_bucket{key="value",otel_scope_name="my-app",le="500"} 1
// a_histogram_bucket{key="value",otel_scope_name="my-app",le="750"} 1
// a_histogram_bucket{key="value",otel_scope_name="my-app",le="1000"} 1
// a_histogram_bucket{key="value",otel_scope_name="my-app",le="2500"} 1
// a_histogram_bucket{key="value",otel_scope_name="my-app",le="5000"} 1
// a_histogram_bucket{key="value",otel_scope_name="my-app",le="7500"} 1
// a_histogram_bucket{key="value",otel_scope_name="my-app",le="10000"} 1
// a_histogram_bucket{key="value",otel_scope_name="my-app",le="+Inf"} 1
// a_histogram_sum{key="value",otel_scope_name="my-app"} 100
// a_histogram_count{key="value",otel_scope_name="my-app"} 1
// # HELP otel_scope_info Instrumentation Scope metadata
// # TYPE otel_scope_info gauge
// otel_scope_info{otel_scope_name="my-app"} 1
// # HELP target_info Target metadata
// # TYPE target_info gauge
// target_info{service_name="unknown_service"} 1

15 opentelemetry-zipkin
#

opentelemetry-zipkin 提供了 Trace Span 的收集和 Exporter 能力。

$ docker run -d -p 9411:9411 openzipkin/zipkin

注:当前 (20241119) 该 Crate 还使用 Pipeline 模式,后续会切换到 Exporter 和 Builder 模式。

use opentelemetry::trace::{Tracer, TraceError};
use opentelemetry::global;

/// Installs the Zipkin pipeline, records one span, and shuts the global provider down before exiting.
fn main() -> Result<(), TraceError> {
    let zipkin_propagator = opentelemetry_zipkin::Propagator::new();
    global::set_text_map_propagator(zipkin_propagator);

    let tracer = opentelemetry_zipkin::new_pipeline().install_simple()?;
    tracer.in_span("doing_work", |_cx| {
        // Traced app logic here...
    });

    // Shutting down the global tracer provider sends the remaining spans.
    global::shutdown_tracer_provider();

    Ok(())
}

16 tracing-flame
#

tracing 数据也可以生成火焰图。先在项目中添加 tracing-flame crate: cargo add tracing-flame

tracing_flame::FlameLayer 提供了一个 Layer 实现,可用于配置 Registry 来作为全局 Subscriber:

use std::{fs::File, io::BufWriter};
use tracing_flame::FlameLayer;
use tracing_subscriber::{registry::Registry, prelude::*, fmt};

/// Installs a global subscriber that prints formatted events and also writes
/// folded stack samples to `./tracing.folded` for flamegraph generation.
fn main() {
    let fmt_layer = fmt::Layer::default();
    // Keep `_guard` bound for the rest of `main`; it is dropped only when `main` returns.
    let (flame_layer, _guard) = FlameLayer::with_file("./tracing.folded").unwrap();
    tracing_subscriber::registry()
        .with(fmt_layer)
        .with(flame_layer)
        .init();
    // ... the rest of your code
}

tracing 数据会被保存到 ./tracing.folded 文件,然后使用 inferno crate 来转换成 flamegraph:

cargo install inferno

# flamegraph
cat tracing.folded | inferno-flamegraph > tracing-flamegraph.svg

# flamechart
cat tracing.folded | inferno-flamegraph --flamechart > tracing-flamechart.svg

另外一种方式是使用 perf 来搜集数据,然后绘制火焰图:

#https://github.com/zed-industries/zed/pull/8972/files
echo "Running perf on collab, collecting 30s of data..."
kubectl -n $environment exec -it deployments/collab -- perf record -p 1 -g -m 64 --call-graph dwarf -- sleep 30
run="collab-$environment-$(date -Iseconds)"
echo "Processing data and downloading to '$run.perf'..."
kubectl -n $environment exec -it deployments/collab -- perf --no-pager script > "$run.perf"
which inferno-flamegraph 2>/dev/null || (echo "installing inferno..."; cargo install inferno)
inferno-collapse-perf "$run.perf" | inferno-flamegraph > "$run.svg"
open "$run.svg"

rustc 自带的 profile 能力

# https://github.com/zed-industries/zed/pull/194

# Install some rust profiling tools:
cargo install --git https://github.com/rust-lang/measureme crox flamegraph summarize
cargo install cargo-llvm-lines

# Build zed with profiling (generates a trace file) and summarize the trace:
cargo rustc -p zed --lib -- -Zself-profile
# summarize summarize zed-82037.mm_profdata

#Count the LLVM lines generated during compilation:
cargo llvm-lines -p zed --lib

17 tracing-console
#

为 tokio 开启 tracing feature:

[dependencies]
# ...
tokio = { version = "1.15", features = ["full", "tracing"] }

同时在构建项目时添加 tokio_unstable feature, 这需要在 .cargo/config.toml 中指定:

[build]
rustflags = ["--cfg", "tokio_unstable"]

The tokio and runtime tracing targets must be enabled at the TRACE level. RUST_LOG=trace ./path/to/app 或 .with_env_filter("tracing=trace,tokio=trace,runtime=trace")

初始化 console subscriber:

console_subscriber::init();

然后编译运行程序时, 自动在 6669 端口暴露 tokio-console 可以读取的数据.

启动 tokio-console 来连接 6669 端口:

tokio-console

18 tracing-tree
#

使用 tree 格式在终端层次化展示 span、event:

// omitted: other `use` directives

use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt, Registry};
use tracing_tree::HierarchicalLayer;

/// Installs an env-filtered registry with two outputs, a hierarchical tree layer and
/// a JSON layer writing to `/tmp/log.json`, then runs the server.
#[tokio::main(flavor = "current_thread")]
async fn main() -> Result<(), Box<dyn Error>> {
    // Tree-style layer: indent nested spans by 2 and show targets and bracketed fields.
    let tree_layer = HierarchicalLayer::new(2)
        .with_targets(true)
        .with_bracketed_fields(true);

    // JSON layer: the same events, serialized into a file.
    let json_file = File::create("/tmp/log.json").unwrap();
    let json_layer = tracing_subscriber::fmt::layer()
        .json()
        .with_writer(json_file);

    Registry::default()
        .with(EnvFilter::from_default_env())
        .with(tree_layer)
        .with(json_layer)
        .init();

    run_server().await
}
#+end

输出效果:

#+begin_src sh,
$ RUST_LOG=debug cargo run
   Compiling plaque v0.1.0 (/home/amos/bearcove/plaque)
    Finished dev [unoptimized + debuginfo] target(s) in 1.25s
     Running `target/debug/plaque`
plaque::run_server{}
  0ms  INFO plaque Listening on http://0.0.0.0:3779
  plaque::accept{}

  plaque::handle_connection{addr=127.0.0.1:34728}
    plaque::read_http_request{}

    0ms DEBUG plaque Got HTTP request, req=GET / HTTP/1.1
    Host: localhost:3779
    User-Agent: curl/7.79.1
    Accept: */*


    plaque::write_http_response{}


  plaque::accept{}

19 tracing 例子
#

Rust telemetry workshop:https://rust-exercises.com/telemetry/

https://www.shuttle.rs/blog/2024/01/09/getting-started-tracing-rust

https://tokio.rs/tokio/topics/tracing

https://burgers.io/custom-logging-in-rust-using-tracing

https://signoz.io/blog/opentelemetry-rust/

https://docs.dynatrace.com/docs/extend-dynatrace/opentelemetry/walkthroughs/rust

https://www.alibabacloud.com/help/en/sls/user-guide/import-trace-data-from-rust-applications-to-log-service-by-using-opentelemetry-sdk-for-rust

#![allow(dead_code)]

/// Returns `a + b`, emitting one info-level event first.
///
/// `#[tracing::instrument]` wraps each call in a span named after the function,
/// so the event is recorded inside the `trace_me` span together with `a` and `b`.
#[tracing::instrument]
fn trace_me(a: u32, b: u32) -> u32 {
    tracing::info!("trace_me info message");
    a + b
}

/// Sample value attached to spans and events in `main`.
/// `Debug` is derived so it can be recorded with the `?s` field syntax.
#[derive(Debug)]
struct MyStruct {
    // Also recorded on its own as the `s.field_a` field.
    field_a: u8,
    field_b: String,
}

/// Demo entry point: installs a compact fmt subscriber as the global default and
/// emits events outside spans, with an explicit `target:`, and inside nested entered spans.
#[tokio::main]
pub async fn main() {
    //let subscriber = tracing_subscriber::FmtSubscriber::new();

    let subscriber = tracing_subscriber::fmt()
        .compact()
        .with_file(true)
        .with_line_number(true)
        .with_thread_ids(true)
        .with_target(true)
        .with_ansi(true)
        // In `tracing=trace` the key is a crate or module name; here it sets the
        // level for the binary named `tracing` (run with `--bin tracing`).
        .with_env_filter("tracing=trace,tokio=trace,runtime=trace")
        //.pretty()
        .finish();

    tracing::subscriber::set_global_default(subscriber).unwrap();

    //console_subscriber::init();

    tracing::warn!("just a test");

    trace_me(2, 3);

    let s = MyStruct{field_a: 8, field_b: "fieldB value".to_string()};

    // For a span, key=value fields must come after the name/id.
    tracing::error_span!("myerrorspan", ?s);  // Prints nothing; the span is never entered, so it is not the context of later events.
    // tracing::error_span!(?s, "myerrorspan"); // compile error

    // For an event there are two cases:
    // 1. With a message, all custom fields must come before the message.
    // 2. Without a message, any fields can be given in field=value form.
    // `target: "myerrorspan"` only overrides the event's target; it does not attach the event to a span.
    tracing::error!(target: "myerrorspan", ?s, s.field_a, "just a debug message2"); // with a message
    tracing::error!(target: "myerrorspan", ?s, s.field_a, a = "b");  // without a message

    // Until the entered `_span` guard is dropped, later events are associated with this span.
    let _span = tracing::error_span!("my_enter_span", ?s).entered();
    tracing::error!(?s, s.field_a, a = "c");

    // Child span: shadowing `_span` does not drop the first guard, so this span nests
    // inside `my_enter_span` and events print the enclosing spans, outermost first.
    let _span = tracing::error_span!("my_enter_span2", ?s).entered();
    tracing::error!(?s, s.field_a, a = "d");

    //std::thread::sleep_ms(200000);
}

输出日志:

zj@a:~/codes/rust/mydemo$ cargo run --bin tracing
# 没有 span,tracing module
2024-03-07T11:54:16.136269Z  WARN ThreadId(01) tracing: src/bin/tracing.rs:42: just a test
# span trace_me 下的 tracing module
2024-03-07T11:54:16.136417Z  INFO ThreadId(01) trace_me: tracing: src/bin/tracing.rs:5: trace_me info message a=2 b=3
2024-03-07T11:54:16.136475Z ERROR ThreadId(01) tracing: src/bin/tracing.rs:49: just a debug message1 s=MyStruct { field_a: 8, field_b: "fieldB value" }
2024-03-07T11:54:16.136558Z ERROR ThreadId(01) myerrorspan: src/bin/tracing.rs:52: just a debug message2 s=MyStruct { field_a: 8, field_b: "fieldB value" } s.field_a=8
2024-03-07T11:54:16.136585Z ERROR ThreadId(01) myerrorspan: src/bin/tracing.rs:53: s=MyStruct { field_a: 8, field_b: "fieldB value" } s.field_a=8 a="b"
2024-03-07T11:54:16.136791Z ERROR ThreadId(01) my_enter_span: tracing: src/bin/tracing.rs:56: s=MyStruct { field_a: 8, field_b: "fieldB value" } s.field_a=8 a="c" s=MyStruct { field_a: 8, field_b: "fieldB value" }
# my_enter_span 下的 my_enter_span2 下的 tracing module
2024-03-07T11:54:16.136844Z ERROR ThreadId(01) my_enter_span:my_enter_span2: tracing: src/bin/tracing.rs:59: s=MyStruct { field_a: 8, field_b: "fieldB value" } s.field_a=8 a="d" s=MyStruct { field_a: 8, field_b: "fieldB value" } s=MyStruct { field_a: 8, field_b: "fieldB value" }

完整例子: Instrument your Rust application with OpenTelemetry:https://docs.dynatrace.com/docs/ingest-from/opentelemetry/walkthroughs/rust

opentelemetry = { version = "~0", features = ["trace", "metrics"] }
opentelemetry_sdk = { version = "~0" , features = ["rt-tokio"] }
opentelemetry-otlp = { version = "~0", features = ["http-proto", "reqwest-client", "reqwest-rustls"] }
opentelemetry-http = { version = "~0" }
opentelemetry-stdout = { version = "~0", features = ["trace"] }
opentelemetry-appender-log = { version = "~0" }
opentelemetry-semantic-conventions = { version = "~0" }
use std::{env, convert::Infallible, net::SocketAddr, collections::HashMap, io::{BufRead, BufReader, Read}};
use opentelemetry_sdk::{logs::LoggerProvider, metrics::{PeriodicReader, SdkMeterProvider}, propagation::TraceContextPropagator, runtime, trace::{self as sdktrace, Config}, Resource};
use opentelemetry_otlp::{LogExporter, MetricExporter, Protocol, SpanExporter, WithExportConfig, WithHttpConfig};
use opentelemetry_semantic_conventions as semcov;
use opentelemetry_semantic_conventions::trace;
use opentelemetry_http::{Bytes, HeaderExtractor, HeaderInjector};
use opentelemetry_appender_log::OpenTelemetryLogBridge;
use opentelemetry::{global, trace::{FutureExt, Span, SpanKind, TraceContextExt, Tracer}, Context, KeyValue};

/// Installs the global OpenTelemetry trace, metric and log pipelines.
///
/// All three signals are exported over OTLP/HTTP (binary protobuf) to the
/// base URL in `DT_API_URL`, authenticated with the token in `DT_API_TOKEN`.
/// Resource attributes are the service name plus whatever OneAgent
/// enrichment files could be read.
///
/// # Panics
///
/// Panics if either environment variable is unset, if an exporter cannot be
/// built, or if `log::set_boxed_logger` reports an error.
fn init_opentelemetry() {
    /// Collects resource attributes from the OneAgent enrichment files.
    ///
    /// Files that are missing or unreadable are skipped (best effort).
    fn read_dt_metadata() -> Resource {
        /// Appends every `key=value` line of the properties file at `path`
        /// to `metadata`; lines without `=` are ignored.
        fn read_single(path: &str, metadata: &mut Vec<KeyValue>) -> std::io::Result<()> {
            let file = if path.starts_with("dt_metadata") {
                // Indirection file: its content is the path of the real
                // properties file. Trim it so a trailing newline does not
                // become part of the path.
                let target = std::fs::read_to_string(path)?;
                std::fs::File::open(target.trim())?
            } else {
                std::fs::File::open(path)?
            };
            for line in BufReader::new(file).lines() {
                if let Some((k, v)) = line?.split_once('=') {
                    metadata.push(KeyValue::new(k.to_string(), v.to_string()));
                }
            }
            Ok(())
        }

        let mut metadata = Vec::new();
        for name in [
            "dt_metadata_e617c525669e072eebe3d0f08212e8f2.properties",
            "/var/lib/dynatrace/enrichment/dt_metadata.properties",
            "/var/lib/dynatrace/enrichment/dt_host_metadata.properties",
        ] {
            // Deliberately ignore errors: each file is optional.
            let _ = read_single(name, &mut metadata);
        }
        Resource::new(metadata)
    }

    // ===== GENERAL SETUP =====
    // TODO: change how the credentials are supplied if env vars do not fit.
    let dt_api_token = env::var("DT_API_TOKEN").expect("DT_API_TOKEN must be set");
    let dt_api_url = env::var("DT_API_URL").expect("DT_API_URL must be set");

    let mut map = HashMap::new();
    map.insert("Authorization".to_string(), format!("Api-Token {}", dt_api_token));

    let resource = Resource::new([
        // TODO Replace with the name of your application
        KeyValue::new(semcov::resource::SERVICE_NAME, "rust-manual-quickstart"),
    ])
    .merge(&read_dt_metadata());

    // ===== TRACING SETUP =====
    global::set_text_map_propagator(TraceContextPropagator::new());
    let tracer_exporter = SpanExporter::builder()
        .with_http()
        .with_headers(map.clone())
        .with_protocol(Protocol::HttpBinary)
        .with_endpoint(format!("{}/v1/traces", dt_api_url))
        .build()
        .expect("failed to build the OTLP span exporter");
    let tracer_provider = sdktrace::TracerProvider::builder()
        .with_config(Config::default().with_resource(resource.clone()))
        .with_batch_exporter(tracer_exporter, runtime::Tokio)
        .build();
    global::set_tracer_provider(tracer_provider);

    // ===== METRICS SETUP ======
    let metrics_exporter = MetricExporter::builder()
        .with_http()
        .with_headers(map.clone())
        .with_endpoint(format!("{}/v1/metrics", dt_api_url))
        .with_protocol(Protocol::HttpBinary)
        .build()
        .expect("failed to build the OTLP metric exporter");
    let meter_provider = SdkMeterProvider::builder()
        .with_reader(PeriodicReader::builder(metrics_exporter, runtime::Tokio).build())
        .with_resource(resource.clone())
        .build();
    global::set_meter_provider(meter_provider);

    // ===== LOGS SETUP ======
    // Last use of `map` and `resource`: move them instead of cloning.
    let logger_exporter = LogExporter::builder()
        .with_http()
        .with_headers(map)
        .with_endpoint(format!("{}/v1/logs", dt_api_url))
        .with_protocol(Protocol::HttpBinary)
        .build()
        .expect("failed to build the OTLP log exporter");
    let logger_provider = LoggerProvider::builder()
        .with_batch_exporter(logger_exporter, runtime::Tokio)
        .with_resource(resource)
        .build();
    // Bridge the `log` facade into OpenTelemetry so `error!`/`debug!` records
    // are exported as OTLP logs.
    let otel_log_appender = OpenTelemetryLogBridge::new(&logger_provider);
    log::set_boxed_logger(Box::new(otel_log_appender))
        .expect("failed to install the OpenTelemetry log bridge as global logger");
    // Fully qualified: `Level` is not among this snippet's imports.
    log::set_max_level(log::LevelFilter::Debug);
}


// --- Traces: create a span by hand, annotate it, and close it explicitly ---
let tracer = global::tracer("my-tracer");
let span_builder = tracer
    .span_builder("Call to /myendpoint")
    .with_kind(SpanKind::Internal);
let mut endpoint_span = span_builder.start(&tracer);
for (key, value) in [("http.method", "GET"), ("net.protocol.version", "1.1")] {
    endpoint_span.set_attribute(KeyValue::new(key, value));
}

// TODO: Your code goes here

endpoint_span.end();

// --- Metrics: bump an up/down counter by one, without attributes ---
let meter = global::meter("request_counter");
let request_counter = meter.i64_up_down_counter("request_counter").build();
request_counter.add(1, &[]);

// --- Logs: emitted through the `log` macros ---
error!("logging an error");
debug!("logging a debug message");


// Builds the OpenTelemetry `Context` carried by the headers of `req`, using
// the globally registered text-map propagator.
fn extract_context_from_request(req: &Request<Incoming>) -> Context {
    let carrier = HeaderExtractor(req.headers());
    global::get_text_map_propagator(|propagator| propagator.extract(&carrier))
}

// Skeleton of an HTTP handler that continues the caller's trace: it extracts
// the propagated context from the request headers and starts a server-kind
// span named "router" under it.
//
// NOTE(review): this is a template, not a finished handler. The `response`
// block below ends with a statement, so `response` is `()`, and nothing is
// returned yet; the request handling marked TODO still has to be filled in.
async fn router(req: Request<Incoming>) -> Result<Response<BoxBody<Bytes, hyper::Error>>, Infallible> {

    // Extract the context from the incoming request headers
    let parent_cx = extract_context_from_request(&req);

    let response = {
        // Create a span parenting the remote client span.
        let tracer = global::tracer("example/server");
        let mut span = tracer
            .span_builder("router")
            .with_kind(SpanKind::Server)
            .start_with_context(&tracer, &parent_cx);

        // Adding custom attributes
        span.set_attribute(KeyValue::new("my-server-key-1", "my-server-value-1"));
        // NOTE(review): `span` goes out of scope at the end of this block, so
        // the request should presumably be handled inside it — confirm.
    };

    // TODO Handle the HTTP request
}

/// Sends an HTTP request to `url` with `body_content` as its body, traced by
/// a client-kind span named `span_name`.
///
/// The span's context is injected into the outgoing headers through the
/// global text-map propagator, and the response status is recorded on the
/// span as a "Got response!" event.
///
/// # Errors
///
/// Returns an error if the request cannot be built (for example because
/// `url` is not a valid URI) or if sending it fails.
async fn send_request(url: &str, body_content: &str, span_name: &str) -> std::result::Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
    let client = Client::builder(TokioExecutor::new()).build_http();
    let tracer = global::tracer("example/client");
    let span = tracer
        .span_builder(String::from(span_name))
        .with_kind(SpanKind::Client)
        .start(&tracer);
    let cx = Context::current_with_span(span);

    // Build the request before touching its headers: a bad `url` then comes
    // back as `Err` via `?` instead of panicking on the builder's
    // `headers_mut().unwrap()`.
    let mut req = hyper::Request::builder()
        .uri(url)
        .body(Full::new(Bytes::from(body_content.to_string())))?;
    global::get_text_map_propagator(|propagator| {
        propagator.inject_context(&cx, &mut HeaderInjector(req.headers_mut()))
    });
    let res = client.request(req).await?;

    cx.span().add_event(
        "Got response!",
        vec![KeyValue::new("status", res.status().to_string())],
    );

    Ok(())
}

19.1 完整例子
#

use opentelemetry::{global, KeyValue, trace::Tracer};
use opentelemetry_sdk::{trace::{self, RandomIdGenerator, Sampler}, Resource};
use opentelemetry_sdk::metrics::Temporality;
use opentelemetry_otlp::{Protocol, WithExportConfig, WithTonicConfig};
use std::time::Duration;
use tonic::metadata::*;

/// Configures OTLP trace and metric export over gRPC (tonic), registers both
/// providers globally, and records one example span.
///
/// # Errors
///
/// Returns an error if a metadata value fails to parse or if either exporter
/// cannot be built.
//
// NOTE(review): both pipelines are handed `opentelemetry_sdk::runtime::Tokio`,
// which presumably needs an active Tokio runtime, while this `main` is
// synchronous — confirm it is run under one (e.g. `#[tokio::main]`).
fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
    // Extra gRPC metadata attached to the trace exporter.
    let mut map = MetadataMap::with_capacity(3);
    map.insert("x-host", "example.com".parse()?);
    map.insert("x-number", "123".parse()?);
    map.insert_bin("trace-proto-bin", MetadataValue::from_bytes(b"[binary data]"));

    // ===== TRACING =====
    let span_exporter = opentelemetry_otlp::SpanExporter::builder()
        .with_tonic()
        .with_endpoint("http://localhost:4317")
        .with_timeout(Duration::from_secs(3))
        .with_metadata(map)
        .build()?;

    let tracer_provider = opentelemetry_sdk::trace::TracerProvider::builder()
        .with_batch_exporter(span_exporter, opentelemetry_sdk::runtime::Tokio)
        .with_config(
            trace::Config::default()
                .with_sampler(Sampler::AlwaysOn)
                .with_id_generator(RandomIdGenerator::default())
                .with_max_attributes_per_span(16)
                // The original also set this to 64 first; only this later
                // value took effect, so the dead call was dropped.
                .with_max_events_per_span(16)
                .with_resource(Resource::new(vec![KeyValue::new("service.name", "example")])),
        )
        .build();
    global::set_tracer_provider(tracer_provider);
    let tracer = global::tracer("tracer-name");

    // ===== METRICS =====
    // NOTE(review): port 4318 and the `/v1/metrics` path look like the
    // OTLP/HTTP endpoint, yet this exporter uses tonic/gRPC — verify the URL
    // against the collector configuration.
    let metric_exporter = opentelemetry_otlp::MetricExporter::builder()
        .with_tonic()
        .with_endpoint("http://localhost:4318/v1/metrics")
        .with_protocol(Protocol::Grpc)
        .with_timeout(Duration::from_secs(3))
        .build()?;

    let reader = opentelemetry_sdk::metrics::PeriodicReader::builder(
        metric_exporter,
        opentelemetry_sdk::runtime::Tokio,
    )
    .with_interval(Duration::from_secs(3))
    .with_timeout(Duration::from_secs(10))
    .build();

    let meter_provider = opentelemetry_sdk::metrics::SdkMeterProvider::builder()
        .with_reader(reader)
        .with_resource(Resource::new(vec![KeyValue::new("service.name", "example")]))
        .build();
    // Register the provider; without this it was built but never used.
    global::set_meter_provider(meter_provider);

    tracer.in_span("doing_work", |_cx| {
        // Traced app logic here...
    });

    Ok(())
}
rust crate - 这篇文章属于一个选集。
§ 7: 本文

相关文章

anyhow
··1872 字
Rust Rust-Crate
anyhow crate 提供了自定义 Error 类型和 Result 类型,Error 类型自带 backtrace 和 context,支持用户友好的格式化信息输出。
bytes
··2900 字
Rust Rust-Crate
bytes 提供了高效的 zero-copy 连续内存区域的共享和读写能力。
chrono
··4003 字
Rust Rust-Crate
chrono 提供了丰富的 Date/Time 类型和相关操作。
hyper
··861 字
Rust Rust-Crate
hyper 是高性能的异步 HTTP 1/2 底层库。