LCOV - code coverage report
Current view: top level - proxy/src - logging.rs (source / functions) Coverage Total Hit
Test: 4be46b1c0003aa3bbac9ade362c676b419df4c20.info Lines: 59.7 % 521 311
Test Date: 2025-07-22 17:50:06 Functions: 45.0 % 111 50

            Line data    Source code
       1              : use std::cell::RefCell;
       2              : use std::collections::HashMap;
       3              : use std::sync::Arc;
       4              : use std::{env, io};
       5              : 
       6              : use chrono::{DateTime, Utc};
       7              : use opentelemetry::trace::TraceContextExt;
       8              : use tracing::subscriber::Interest;
       9              : use tracing::{Event, Metadata, Span, Subscriber, callsite, span};
      10              : use tracing_opentelemetry::OpenTelemetrySpanExt;
      11              : use tracing_subscriber::filter::{EnvFilter, LevelFilter};
      12              : use tracing_subscriber::fmt::format::{Format, Full};
      13              : use tracing_subscriber::fmt::time::SystemTime;
      14              : use tracing_subscriber::fmt::{FormatEvent, FormatFields};
      15              : use tracing_subscriber::layer::{Context, Layer};
      16              : use tracing_subscriber::prelude::*;
      17              : use tracing_subscriber::registry::LookupSpan;
      18              : 
      19              : use crate::metrics::Metrics;
      20              : 
      21              : /// Initialize logging and OpenTelemetry tracing and exporter.
      22              : ///
      23              : /// Logging can be configured using `RUST_LOG` environment variable.
      24              : ///
      25              : /// OpenTelemetry is configured with OTLP/HTTP exporter. It picks up
      26              : /// configuration from environment variables. For example, to change the
      27              : /// destination, set `OTEL_EXPORTER_OTLP_ENDPOINT=http://jaeger:4318`.
      28              : /// See <https://opentelemetry.io/docs/reference/specification/sdk-environment-variables>
      29            0 : pub fn init() -> anyhow::Result<LoggingGuard> {
      30            0 :     let logfmt = LogFormat::from_env()?;
      31              : 
      32            0 :     let env_filter = EnvFilter::builder()
      33            0 :         .with_default_directive(LevelFilter::INFO.into())
      34            0 :         .from_env_lossy()
      35            0 :         .add_directive(
      36            0 :             "aws_config=info"
      37            0 :                 .parse()
      38            0 :                 .expect("this should be a valid filter directive"),
      39              :         )
      40            0 :         .add_directive(
      41            0 :             "azure_core::policies::transport=off"
      42            0 :                 .parse()
      43            0 :                 .expect("this should be a valid filter directive"),
      44              :         );
      45              : 
      46            0 :     let provider = tracing_utils::init_tracing("proxy", tracing_utils::ExportConfig::default());
      47            0 :     let otlp_layer = provider.as_ref().map(tracing_utils::layer);
      48              : 
      49            0 :     let json_log_layer = if logfmt == LogFormat::Json {
      50            0 :         Some(JsonLoggingLayer::new(
      51            0 :             RealClock,
      52            0 :             StderrWriter {
      53            0 :                 stderr: std::io::stderr(),
      54            0 :             },
      55            0 :             &["conn_id", "ep", "query_id", "request_id", "session_id"],
      56            0 :         ))
      57              :     } else {
      58            0 :         None
      59              :     };
      60              : 
      61            0 :     let text_log_layer = if logfmt == LogFormat::Text {
      62            0 :         Some(
      63            0 :             tracing_subscriber::fmt::layer()
      64            0 :                 .with_ansi(false)
      65            0 :                 .with_writer(std::io::stderr)
      66            0 :                 .with_target(false),
      67            0 :         )
      68              :     } else {
      69            0 :         None
      70              :     };
      71              : 
      72            0 :     tracing_subscriber::registry()
      73            0 :         .with(env_filter)
      74            0 :         .with(otlp_layer)
      75            0 :         .with(json_log_layer)
      76            0 :         .with(text_log_layer)
      77            0 :         .try_init()?;
      78              : 
      79            0 :     Ok(LoggingGuard(provider))
      80            0 : }
      81              : 
      82              : /// Initialize logging for local_proxy with log prefix and no opentelemetry.
      83              : ///
      84              : /// Logging can be configured using `RUST_LOG` environment variable.
      85            0 : pub fn init_local_proxy() -> anyhow::Result<LoggingGuard> {
      86            0 :     let env_filter = EnvFilter::builder()
      87            0 :         .with_default_directive(LevelFilter::INFO.into())
      88            0 :         .from_env_lossy();
      89              : 
      90            0 :     let fmt_layer = tracing_subscriber::fmt::layer()
      91            0 :         .with_ansi(false)
      92            0 :         .with_writer(std::io::stderr)
      93            0 :         .event_format(LocalProxyFormatter(Format::default().with_target(false)));
      94              : 
      95            0 :     tracing_subscriber::registry()
      96            0 :         .with(env_filter)
      97            0 :         .with(fmt_layer)
      98            0 :         .try_init()?;
      99              : 
     100            0 :     Ok(LoggingGuard(None))
     101            0 : }
     102              : 
     103              : pub struct LocalProxyFormatter(Format<Full, SystemTime>);
     104              : 
     105              : impl<S, N> FormatEvent<S, N> for LocalProxyFormatter
     106              : where
     107              :     S: Subscriber + for<'a> LookupSpan<'a>,
     108              :     N: for<'a> FormatFields<'a> + 'static,
     109              : {
     110            0 :     fn format_event(
     111            0 :         &self,
     112            0 :         ctx: &tracing_subscriber::fmt::FmtContext<'_, S, N>,
     113            0 :         mut writer: tracing_subscriber::fmt::format::Writer<'_>,
     114            0 :         event: &tracing::Event<'_>,
     115            0 :     ) -> std::fmt::Result {
     116            0 :         writer.write_str("[local_proxy] ")?;
     117            0 :         self.0.format_event(ctx, writer, event)
     118            0 :     }
     119              : }
     120              : 
     121              : pub struct LoggingGuard(Option<tracing_utils::Provider>);
     122              : 
     123              : impl Drop for LoggingGuard {
     124            0 :     fn drop(&mut self) {
     125            0 :         if let Some(p) = &self.0 {
     126              :             // Shutdown trace pipeline gracefully, so that it has a chance to send any
     127              :             // pending traces before we exit.
     128            0 :             tracing::info!("shutting down the tracing machinery");
     129            0 :             drop(p.shutdown());
     130            0 :         }
     131            0 :     }
     132              : }
     133              : 
     134              : #[derive(Copy, Clone, PartialEq, Eq, Default, Debug)]
     135              : enum LogFormat {
     136              :     Text,
     137              :     #[default]
     138              :     Json,
     139              : }
     140              : 
     141              : impl LogFormat {
     142            0 :     fn from_env() -> anyhow::Result<Self> {
     143            0 :         let logfmt = env::var("LOGFMT");
     144            0 :         Ok(match logfmt.as_deref() {
     145            0 :             Err(_) => LogFormat::default(),
     146            0 :             Ok("text") => LogFormat::Text,
     147            0 :             Ok("json") => LogFormat::Json,
     148            0 :             Ok(logfmt) => anyhow::bail!("unknown log format: {logfmt}"),
     149              :         })
     150            0 :     }
     151              : }
     152              : 
     153              : trait MakeWriter {
     154              :     fn make_writer(&self) -> impl io::Write;
     155              : }
     156              : 
     157              : struct StderrWriter {
     158              :     stderr: io::Stderr,
     159              : }
     160              : 
     161              : impl MakeWriter for StderrWriter {
     162              :     #[inline]
     163            0 :     fn make_writer(&self) -> impl io::Write {
     164            0 :         self.stderr.lock()
     165            0 :     }
     166              : }
     167              : 
     168              : // TODO: move into separate module or even separate crate.
     169              : trait Clock {
     170              :     fn now(&self) -> DateTime<Utc>;
     171              : }
     172              : 
     173              : struct RealClock;
     174              : 
     175              : impl Clock for RealClock {
     176              :     #[inline]
     177            0 :     fn now(&self) -> DateTime<Utc> {
     178            0 :         Utc::now()
     179            0 :     }
     180              : }
     181              : 
     182              : /// Name of the field used by tracing crate to store the event message.
     183              : const MESSAGE_FIELD: &str = "message";
     184              : 
     185              : /// Tracing used to enforce that spans/events have no more than 32 fields.
     186              : /// It seems this is no longer the case, but it's still documented in some places.
     187              : /// Generally, we shouldn't expect more than 32 fields anyway, so we can try and
     188              : /// rely on it for some (minor) performance gains.
     189              : const MAX_TRACING_FIELDS: usize = 32;
     190              : 
     191              : thread_local! {
     192              :     /// Thread-local instance with per-thread buffer for log writing.
     193              :     static EVENT_FORMATTER: RefCell<EventFormatter> = const { RefCell::new(EventFormatter::new()) };
     194              :     /// Cached OS thread ID.
     195              :     static THREAD_ID: u64 = gettid::gettid();
     196              : }
     197              : 
/// Map for values fixed at callsite registration.
///
/// Keyed by the tracing callsite identifier. `T` is the per-callsite value
/// stored by the logging layer (skipped-field indices for events, span info for spans).
// We use papaya here because registration rarely happens post-startup.
// papaya is good for read-heavy workloads.
//
// We use rustc_hash here because callsite::Identifier will always be an integer with low-bit entropy,
// since it's always a pointer to static mutable data. rustc_hash was designed for low-bit entropy.
type CallsiteMap<T> =
    papaya::HashMap<callsite::Identifier, T, std::hash::BuildHasherDefault<rustc_hash::FxHasher>>;
     206              : 
     207              : /// Implements tracing layer to handle events specific to logging.
     208              : struct JsonLoggingLayer<C: Clock, W: MakeWriter> {
     209              :     clock: C,
     210              :     writer: W,
     211              : 
     212              :     /// tracks which fields of each **event** are duplicates
     213              :     skipped_field_indices: CallsiteMap<SkippedFieldIndices>,
     214              : 
     215              :     /// tracks callsite names to an ID.
     216              :     callsite_name_ids: papaya::HashMap<&'static str, u32, ahash::RandomState>,
     217              : 
     218              :     span_info: CallsiteMap<CallsiteSpanInfo>,
     219              : 
     220              :     /// Fields we want to keep track of in a separate json object.
     221              :     extract_fields: &'static [&'static str],
     222              : }
     223              : 
     224              : impl<C: Clock, W: MakeWriter> JsonLoggingLayer<C, W> {
     225            0 :     fn new(clock: C, writer: W, extract_fields: &'static [&'static str]) -> Self {
     226            0 :         JsonLoggingLayer {
     227            0 :             clock,
     228            0 :             skipped_field_indices: CallsiteMap::default(),
     229            0 :             span_info: CallsiteMap::default(),
     230            0 :             callsite_name_ids: papaya::HashMap::default(),
     231            0 :             writer,
     232            0 :             extract_fields,
     233            0 :         }
     234            0 :     }
     235              : 
     236              :     #[inline]
     237            6 :     fn span_info(&self, metadata: &'static Metadata<'static>) -> CallsiteSpanInfo {
     238            6 :         self.span_info
     239            6 :             .pin()
     240            6 :             .get_or_insert_with(metadata.callsite(), || {
     241            3 :                 CallsiteSpanInfo::new(&self.callsite_name_ids, metadata, self.extract_fields)
     242            3 :             })
     243            6 :             .clone()
     244            6 :     }
     245              : }
     246              : 
/// `tracing_subscriber` layer hooks: format events as JSON lines, and keep the
/// recorded span field values in a `SpanFields` span extension.
impl<S, C: Clock + 'static, W: MakeWriter + 'static> Layer<S> for JsonLoggingLayer<C, W>
where
    S: Subscriber + for<'a> LookupSpan<'a>,
{
    /// Formats `event` into the thread-local `EventFormatter` buffer and writes
    /// it with a single `write_all` call. A write failure does not propagate;
    /// it only increments the `logging_errors_count` metric.
    fn on_event(&self, event: &Event<'_>, ctx: Context<'_, S>) {
        use std::io::Write;

        // TODO: consider special tracing subscriber to grab timestamp very
        //       early, before OTel machinery, and add as event extension.
        let now = self.clock.now();

        EVENT_FORMATTER.with(|f| {
            let mut borrow = f.try_borrow_mut();
            let formatter = match borrow.as_deref_mut() {
                Ok(formatter) => formatter,
                // If the thread local formatter is borrowed,
                // then we likely hit an edge case where we panicked during formatting.
                // We allow the logging to proceed with an uncached formatter.
                Err(_) => &mut EventFormatter::new(),
            };

            formatter.format(
                now,
                event,
                &ctx,
                &self.skipped_field_indices,
                self.extract_fields,
            );

            let mut writer = self.writer.make_writer();
            if writer.write_all(formatter.buffer()).is_err() {
                Metrics::get().proxy.logging_errors_count.inc();
            }
        });
    }

    /// Registers a SpanFields instance as span extension.
    ///
    /// The extension starts from the callsite's cached span info and holds the
    /// field values given at span creation.
    fn on_new_span(&self, attrs: &span::Attributes<'_>, id: &span::Id, ctx: Context<'_, S>) {
        let span = ctx.span(id).expect("span must exist");

        let mut fields = SpanFields::new(self.span_info(span.metadata()));
        attrs.record(&mut fields);

        // This is a new span: the extensions should not be locked
        // unless some layer spawned a thread to process this span.
        // I don't think any layers do that.
        span.extensions_mut().insert(fields);
    }

    /// Applies values recorded after span creation (`Span::record`) to the
    /// span's `SpanFields` extension, if it has one.
    fn on_record(&self, id: &span::Id, values: &span::Record<'_>, ctx: Context<'_, S>) {
        let span = ctx.span(id).expect("span must exist");

        // assumption: `on_record` is rarely called.
        // assumption: a span being updated by one thread,
        //             and formatted by another thread is even rarer.
        let mut ext = span.extensions_mut();
        if let Some(fields) = ext.get_mut::<SpanFields>() {
            values.record(fields);
        }
    }

    /// Called (lazily) roughly once per callsite, not per event/span instance.
    ///
    /// For span callsites, this pre-computes and caches the span info. For event
    /// callsites, it checks for duplicate field names and records the earlier
    /// duplicates as skippable, so the last field wins. Always returns
    /// `Interest::always()`.
    fn register_callsite(&self, metadata: &'static Metadata<'static>) -> Interest {
        debug_assert!(
            metadata.fields().len() <= MAX_TRACING_FIELDS,
            "callsite {metadata:?} has too many fields."
        );

        if !metadata.is_event() {
            // register the span info.
            self.span_info(metadata);
            // Must not be never because we wouldn't get trace and span data.
            return Interest::always();
        }

        let mut field_indices = SkippedFieldIndices::default();
        // name -> index of the latest field seen with that name; an overwritten
        // (earlier) index is marked as skipped.
        let mut seen_fields = HashMap::new();
        for field in metadata.fields() {
            if let Some(old_index) = seen_fields.insert(field.name(), field.index()) {
                field_indices.set(old_index);
            }
        }

        // Only callsites with duplicates get an entry; lookups fall back to "nothing skipped".
        if !field_indices.is_empty() {
            self.skipped_field_indices
                .pin()
                .insert(metadata.callsite(), field_indices);
        }

        Interest::always()
    }
}
     340              : 
     341              : /// Any span info that is fixed to a particular callsite. Not variable between span instances.
     342              : #[derive(Clone)]
     343              : struct CallsiteSpanInfo {
     344              :     /// index of each field to extract. usize::MAX if not found.
     345              :     extract: Arc<[usize]>,
     346              : 
     347              :     /// tracks the fixed "callsite ID" for each span.
     348              :     /// note: this is not stable between runs.
     349              :     normalized_name: Arc<str>,
     350              : }
     351              : 
     352              : impl CallsiteSpanInfo {
     353            3 :     fn new(
     354            3 :         callsite_name_ids: &papaya::HashMap<&'static str, u32, ahash::RandomState>,
     355            3 :         metadata: &'static Metadata<'static>,
     356            3 :         extract_fields: &[&'static str],
     357            3 :     ) -> Self {
     358            5 :         let names: Vec<&'static str> = metadata.fields().iter().map(|f| f.name()).collect();
     359              : 
     360              :         // get all the indices of span fields we want to focus
     361            3 :         let extract = extract_fields
     362            3 :             .iter()
     363              :             // use rposition, since we want last match wins.
     364            3 :             .map(|f1| names.iter().rposition(|f2| f1 == f2).unwrap_or(usize::MAX))
     365            3 :             .collect();
     366              : 
     367              :         // normalized_name is unique for each callsite, but it is not
     368              :         // unified across separate proxy instances.
     369              :         // todo: can we do better here?
     370            3 :         let cid = *callsite_name_ids
     371            3 :             .pin()
     372            3 :             .update_or_insert(metadata.name(), |&cid| cid + 1, 0);
     373              : 
     374              :         // we hope that most span names are unique, in which case this will always be 0
     375            3 :         let normalized_name = if cid == 0 {
     376            2 :             metadata.name().into()
     377              :         } else {
     378              :             // if the span name is not unique, add the numeric ID to span name to distinguish it.
     379              :             // sadly this is non-determinstic, across restarts but we should fix it by disambiguating re-used span names instead.
     380            1 :             format!("{}#{cid}", metadata.name()).into()
     381              :         };
     382              : 
     383            3 :         Self {
     384            3 :             extract,
     385            3 :             normalized_name,
     386            3 :         }
     387            3 :     }
     388              : }
     389              : 
     390              : #[derive(Clone)]
     391              : struct RawValue(Box<[u8]>);
     392              : 
     393              : impl RawValue {
     394            5 :     fn new(v: impl json::ValueEncoder) -> Self {
     395            5 :         Self(json::value_to_vec!(|val| v.encode(val)).into_boxed_slice())
     396            5 :     }
     397              : }
     398              : 
     399              : impl json::ValueEncoder for &RawValue {
     400            6 :     fn encode(self, v: json::ValueSer<'_>) {
     401            6 :         v.write_raw_json(&self.0);
     402            6 :     }
     403              : }
     404              : 
     405              : /// Stores span field values recorded during the spans lifetime.
     406              : struct SpanFields {
     407              :     values: [Option<RawValue>; MAX_TRACING_FIELDS],
     408              : 
     409              :     /// cached span info so we can avoid extra hashmap lookups in the hot path.
     410              :     span_info: CallsiteSpanInfo,
     411              : }
     412              : 
     413              : impl SpanFields {
     414            3 :     fn new(span_info: CallsiteSpanInfo) -> Self {
     415              :         Self {
     416            3 :             span_info,
     417              :             values: [const { None }; MAX_TRACING_FIELDS],
     418              :         }
     419            3 :     }
     420              : }
     421              : 
     422              : impl tracing::field::Visit for SpanFields {
     423              :     #[inline]
     424            0 :     fn record_f64(&mut self, field: &tracing::field::Field, value: f64) {
     425            0 :         self.values[field.index()] = Some(RawValue::new(value));
     426            0 :     }
     427              : 
     428              :     #[inline]
     429            5 :     fn record_i64(&mut self, field: &tracing::field::Field, value: i64) {
     430            5 :         self.values[field.index()] = Some(RawValue::new(value));
     431            5 :     }
     432              : 
     433              :     #[inline]
     434            0 :     fn record_u64(&mut self, field: &tracing::field::Field, value: u64) {
     435            0 :         self.values[field.index()] = Some(RawValue::new(value));
     436            0 :     }
     437              : 
     438              :     #[inline]
     439            0 :     fn record_i128(&mut self, field: &tracing::field::Field, value: i128) {
     440            0 :         if let Ok(value) = i64::try_from(value) {
     441            0 :             self.values[field.index()] = Some(RawValue::new(value));
     442            0 :         } else {
     443            0 :             self.values[field.index()] = Some(RawValue::new(format_args!("{value}")));
     444            0 :         }
     445            0 :     }
     446              : 
     447              :     #[inline]
     448            0 :     fn record_u128(&mut self, field: &tracing::field::Field, value: u128) {
     449            0 :         if let Ok(value) = u64::try_from(value) {
     450            0 :             self.values[field.index()] = Some(RawValue::new(value));
     451            0 :         } else {
     452            0 :             self.values[field.index()] = Some(RawValue::new(format_args!("{value}")));
     453            0 :         }
     454            0 :     }
     455              : 
     456              :     #[inline]
     457            0 :     fn record_bool(&mut self, field: &tracing::field::Field, value: bool) {
     458            0 :         self.values[field.index()] = Some(RawValue::new(value));
     459            0 :     }
     460              : 
     461              :     #[inline]
     462            0 :     fn record_bytes(&mut self, field: &tracing::field::Field, value: &[u8]) {
     463            0 :         self.values[field.index()] = Some(RawValue::new(value));
     464            0 :     }
     465              : 
     466              :     #[inline]
     467            0 :     fn record_str(&mut self, field: &tracing::field::Field, value: &str) {
     468            0 :         self.values[field.index()] = Some(RawValue::new(value));
     469            0 :     }
     470              : 
     471              :     #[inline]
     472            0 :     fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) {
     473            0 :         self.values[field.index()] = Some(RawValue::new(format_args!("{value:?}")));
     474            0 :     }
     475              : 
     476              :     #[inline]
     477            0 :     fn record_error(
     478            0 :         &mut self,
     479            0 :         field: &tracing::field::Field,
     480            0 :         value: &(dyn std::error::Error + 'static),
     481            0 :     ) {
     482            0 :         self.values[field.index()] = Some(RawValue::new(format_args!("{value}")));
     483            0 :     }
     484              : }
     485              : 
     486              : /// List of field indices skipped during logging. Can list duplicate fields or
     487              : /// metafields not meant to be logged.
     488              : #[derive(Copy, Clone, Default)]
     489              : struct SkippedFieldIndices {
     490              :     // 32-bits is large enough for `MAX_TRACING_FIELDS`
     491              :     bits: u32,
     492              : }
     493              : 
     494              : impl SkippedFieldIndices {
     495              :     #[inline]
     496            1 :     fn is_empty(self) -> bool {
     497            1 :         self.bits == 0
     498            1 :     }
     499              : 
     500              :     #[inline]
     501            3 :     fn set(&mut self, index: usize) {
     502            3 :         debug_assert!(index <= 32, "index out of bounds of 32-bit set");
     503            3 :         self.bits |= 1 << index;
     504            3 :     }
     505              : 
     506              :     #[inline]
     507            5 :     fn contains(self, index: usize) -> bool {
     508            5 :         self.bits & (1 << index) != 0
     509            5 :     }
     510              : }
     511              : 
/// Formats a tracing event and writes JSON to its internal buffer including a newline.
// TODO: buffer capacity management, truncate if too large
struct EventFormatter {
    /// Reused output buffer. It is cleared at the start of each `format` call,
    /// so it holds the most recently formatted event.
    logline_buffer: Vec<u8>,
}
     517              : 
     518              : impl EventFormatter {
     519              :     #[inline]
     520            0 :     const fn new() -> Self {
     521            0 :         EventFormatter {
     522            0 :             logline_buffer: Vec::new(),
     523            0 :         }
     524            0 :     }
     525              : 
     526              :     #[inline]
     527            1 :     fn buffer(&self) -> &[u8] {
     528            1 :         &self.logline_buffer
     529            1 :     }
     530              : 
     531            1 :     fn format<S>(
     532            1 :         &mut self,
     533            1 :         now: DateTime<Utc>,
     534            1 :         event: &Event<'_>,
     535            1 :         ctx: &Context<'_, S>,
     536            1 :         skipped_field_indices: &CallsiteMap<SkippedFieldIndices>,
     537            1 :         extract_fields: &'static [&'static str],
     538            1 :     ) where
     539            1 :         S: Subscriber + for<'a> LookupSpan<'a>,
     540              :     {
     541            1 :         let timestamp = now.to_rfc3339_opts(chrono::SecondsFormat::Micros, true);
     542              : 
     543              :         use tracing_log::NormalizeEvent;
     544            1 :         let normalized_meta = event.normalized_metadata();
     545            1 :         let meta = normalized_meta.as_ref().unwrap_or_else(|| event.metadata());
     546              : 
     547            1 :         let skipped_field_indices = skipped_field_indices
     548            1 :             .pin()
     549            1 :             .get(&meta.callsite())
     550            1 :             .copied()
     551            1 :             .unwrap_or_default();
     552              : 
     553            1 :         self.logline_buffer.clear();
     554            1 :         let serializer = json::ValueSer::new(&mut self.logline_buffer);
     555            1 :         json::value_as_object!(|serializer| {
     556              :             // Timestamp comes first, so raw lines can be sorted by timestamp.
     557            1 :             serializer.entry("timestamp", &*timestamp);
     558              : 
     559              :             // Level next.
     560            1 :             serializer.entry("level", meta.level().as_str());
     561              : 
     562              :             // Message next.
     563            1 :             let mut message_extractor =
     564            1 :                 MessageFieldExtractor::new(serializer.key("message"), skipped_field_indices);
     565            1 :             event.record(&mut message_extractor);
     566            1 :             message_extractor.finish();
     567              : 
     568              :             // Direct message fields.
     569              :             {
     570            1 :                 let mut message_skipper = MessageFieldSkipper::new(
     571            1 :                     serializer.key("fields").object(),
     572            1 :                     skipped_field_indices,
     573              :                 );
     574            1 :                 event.record(&mut message_skipper);
     575              : 
     576              :                 // rollback if no fields are present.
     577            1 :                 if message_skipper.present {
     578            1 :                     message_skipper.serializer.finish();
     579            1 :                 }
     580              :             }
     581              : 
     582            1 :             let mut extracted = ExtractedSpanFields::new(extract_fields);
     583              : 
     584            1 :             let spans = serializer.key("spans");
     585            1 :             json::value_as_object!(|spans| {
     586            1 :                 let parent_spans = ctx
     587            1 :                     .event_span(event)
     588            1 :                     .map_or(vec![], |parent| parent.scope().collect());
     589              : 
     590            3 :                 for span in parent_spans.iter().rev() {
     591            3 :                     let ext = span.extensions();
     592              : 
     593              :                     // all spans should have this extension.
     594            3 :                     let Some(fields) = ext.get() else { continue };
     595              : 
     596            3 :                     extracted.layer_span(fields);
     597              : 
     598            3 :                     let SpanFields { values, span_info } = fields;
     599              : 
     600            3 :                     let span_fields = spans.key(&*span_info.normalized_name);
     601            3 :                     json::value_as_object!(|span_fields| {
     602            5 :                         for (field, value) in std::iter::zip(span.metadata().fields(), values) {
     603            5 :                             if let Some(value) = value {
     604            5 :                                 span_fields.entry(field.name(), value);
     605            5 :                             }
     606              :                         }
     607              :                     });
     608              :                 }
     609              :             });
     610              : 
     611              :             // TODO: thread-local cache?
     612            1 :             let pid = std::process::id();
     613              :             // Skip adding pid 1 to reduce noise for services running in containers.
     614            1 :             if pid != 1 {
     615            1 :                 serializer.entry("process_id", pid);
     616            1 :             }
     617              : 
     618            1 :             THREAD_ID.with(|tid| serializer.entry("thread_id", tid));
     619              : 
     620              :             // TODO: tls cache? name could change
     621            1 :             if let Some(thread_name) = std::thread::current().name()
     622            1 :                 && !thread_name.is_empty()
     623            1 :                 && thread_name != "tokio-runtime-worker"
     624            1 :             {
     625            1 :                 serializer.entry("thread_name", thread_name);
     626            1 :             }
     627              : 
     628            1 :             if let Some(task_id) = tokio::task::try_id() {
     629            0 :                 serializer.entry("task_id", format_args!("{task_id}"));
     630            1 :             }
     631              : 
     632            1 :             serializer.entry("target", meta.target());
     633              : 
     634              :             // Skip adding module if it's the same as target.
     635            1 :             if let Some(module) = meta.module_path()
     636            1 :                 && module != meta.target()
     637            0 :             {
     638            0 :                 serializer.entry("module", module);
     639            1 :             }
     640              : 
     641            1 :             if let Some(file) = meta.file() {
     642            1 :                 if let Some(line) = meta.line() {
     643            1 :                     serializer.entry("src", format_args!("{file}:{line}"));
     644            1 :                 } else {
     645            0 :                     serializer.entry("src", file);
     646            0 :                 }
     647            0 :             }
     648              : 
     649              :             {
     650            1 :                 let otel_context = Span::current().context();
     651            1 :                 let otel_spanref = otel_context.span();
     652            1 :                 let span_context = otel_spanref.span_context();
     653            1 :                 if span_context.is_valid() {
     654            0 :                     serializer.entry("trace_id", format_args!("{}", span_context.trace_id()));
     655            1 :                 }
     656              :             }
     657              : 
     658            1 :             if extracted.has_values() {
     659              :                 // TODO: add fields from event, too?
     660            1 :                 let extract = serializer.key("extract");
     661            1 :                 json::value_as_object!(|extract| {
     662            1 :                     for (key, value) in std::iter::zip(extracted.names, extracted.values) {
     663            1 :                         if let Some(value) = value {
     664            1 :                             extract.entry(*key, &value);
     665            1 :                         }
     666              :                     }
     667              :                 });
     668            0 :             }
     669              :         });
     670              : 
     671            1 :         self.logline_buffer.push(b'\n');
     672            1 :     }
     673              : }
     674              : 
     675              : /// Extracts the message field that's mixed will other fields.
     676              : struct MessageFieldExtractor<'buf> {
     677              :     serializer: Option<json::ValueSer<'buf>>,
     678              :     skipped_field_indices: SkippedFieldIndices,
     679              : }
     680              : 
     681              : impl<'buf> MessageFieldExtractor<'buf> {
     682              :     #[inline]
     683            1 :     fn new(serializer: json::ValueSer<'buf>, skipped_field_indices: SkippedFieldIndices) -> Self {
     684            1 :         Self {
     685            1 :             serializer: Some(serializer),
     686            1 :             skipped_field_indices,
     687            1 :         }
     688            1 :     }
     689              : 
     690              :     #[inline]
     691            1 :     fn finish(self) {
     692            1 :         if let Some(ser) = self.serializer {
     693            0 :             ser.value("");
     694            1 :         }
     695            1 :     }
     696              : 
     697              :     #[inline]
     698            5 :     fn record_field(&mut self, field: &tracing::field::Field, v: impl json::ValueEncoder) {
     699            5 :         if field.name() == MESSAGE_FIELD
     700            2 :             && !self.skipped_field_indices.contains(field.index())
     701            1 :             && let Some(ser) = self.serializer.take()
     702            1 :         {
     703            1 :             ser.value(v);
     704            4 :         }
     705            5 :     }
     706              : }
     707              : 
     708              : impl tracing::field::Visit for MessageFieldExtractor<'_> {
     709              :     #[inline]
     710            0 :     fn record_f64(&mut self, field: &tracing::field::Field, value: f64) {
     711            0 :         self.record_field(field, value);
     712            0 :     }
     713              : 
     714              :     #[inline]
     715            3 :     fn record_i64(&mut self, field: &tracing::field::Field, value: i64) {
     716            3 :         self.record_field(field, value);
     717            3 :     }
     718              : 
     719              :     #[inline]
     720            0 :     fn record_u64(&mut self, field: &tracing::field::Field, value: u64) {
     721            0 :         self.record_field(field, value);
     722            0 :     }
     723              : 
     724              :     #[inline]
     725            0 :     fn record_i128(&mut self, field: &tracing::field::Field, value: i128) {
     726            0 :         self.record_field(field, value);
     727            0 :     }
     728              : 
     729              :     #[inline]
     730            0 :     fn record_u128(&mut self, field: &tracing::field::Field, value: u128) {
     731            0 :         self.record_field(field, value);
     732            0 :     }
     733              : 
     734              :     #[inline]
     735            0 :     fn record_bool(&mut self, field: &tracing::field::Field, value: bool) {
     736            0 :         self.record_field(field, value);
     737            0 :     }
     738              : 
     739              :     #[inline]
     740            0 :     fn record_bytes(&mut self, field: &tracing::field::Field, value: &[u8]) {
     741            0 :         self.record_field(field, format_args!("{value:x?}"));
     742            0 :     }
     743              : 
     744              :     #[inline]
     745            1 :     fn record_str(&mut self, field: &tracing::field::Field, value: &str) {
     746            1 :         self.record_field(field, value);
     747            1 :     }
     748              : 
     749              :     #[inline]
     750            1 :     fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) {
     751            1 :         self.record_field(field, format_args!("{value:?}"));
     752            1 :     }
     753              : 
     754              :     #[inline]
     755            0 :     fn record_error(
     756            0 :         &mut self,
     757            0 :         field: &tracing::field::Field,
     758            0 :         value: &(dyn std::error::Error + 'static),
     759            0 :     ) {
     760            0 :         self.record_field(field, format_args!("{value}"));
     761            0 :     }
     762              : }
     763              : 
     764              : /// A tracing field visitor that skips the message field.
     765              : struct MessageFieldSkipper<'buf> {
     766              :     serializer: json::ObjectSer<'buf>,
     767              :     skipped_field_indices: SkippedFieldIndices,
     768              :     present: bool,
     769              : }
     770              : 
     771              : impl<'buf> MessageFieldSkipper<'buf> {
     772              :     #[inline]
     773            1 :     fn new(serializer: json::ObjectSer<'buf>, skipped_field_indices: SkippedFieldIndices) -> Self {
     774            1 :         Self {
     775            1 :             serializer,
     776            1 :             skipped_field_indices,
     777            1 :             present: false,
     778            1 :         }
     779            1 :     }
     780              : 
     781              :     #[inline]
     782            5 :     fn record_field(&mut self, field: &tracing::field::Field, v: impl json::ValueEncoder) {
     783            5 :         if field.name() != MESSAGE_FIELD
     784            3 :             && !field.name().starts_with("log.")
     785            3 :             && !self.skipped_field_indices.contains(field.index())
     786            1 :         {
     787            1 :             self.serializer.entry(field.name(), v);
     788            1 :             self.present |= true;
     789            4 :         }
     790            5 :     }
     791              : }
     792              : 
     793              : impl tracing::field::Visit for MessageFieldSkipper<'_> {
     794              :     #[inline]
     795            0 :     fn record_f64(&mut self, field: &tracing::field::Field, value: f64) {
     796            0 :         self.record_field(field, value);
     797            0 :     }
     798              : 
     799              :     #[inline]
     800            3 :     fn record_i64(&mut self, field: &tracing::field::Field, value: i64) {
     801            3 :         self.record_field(field, value);
     802            3 :     }
     803              : 
     804              :     #[inline]
     805            0 :     fn record_u64(&mut self, field: &tracing::field::Field, value: u64) {
     806            0 :         self.record_field(field, value);
     807            0 :     }
     808              : 
     809              :     #[inline]
     810            0 :     fn record_i128(&mut self, field: &tracing::field::Field, value: i128) {
     811            0 :         self.record_field(field, value);
     812            0 :     }
     813              : 
     814              :     #[inline]
     815            0 :     fn record_u128(&mut self, field: &tracing::field::Field, value: u128) {
     816            0 :         self.record_field(field, value);
     817            0 :     }
     818              : 
     819              :     #[inline]
     820            0 :     fn record_bool(&mut self, field: &tracing::field::Field, value: bool) {
     821            0 :         self.record_field(field, value);
     822            0 :     }
     823              : 
     824              :     #[inline]
     825            0 :     fn record_bytes(&mut self, field: &tracing::field::Field, value: &[u8]) {
     826            0 :         self.record_field(field, format_args!("{value:x?}"));
     827            0 :     }
     828              : 
     829              :     #[inline]
     830            1 :     fn record_str(&mut self, field: &tracing::field::Field, value: &str) {
     831            1 :         self.record_field(field, value);
     832            1 :     }
     833              : 
     834              :     #[inline]
     835            1 :     fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) {
     836            1 :         self.record_field(field, format_args!("{value:?}"));
     837            1 :     }
     838              : 
     839              :     #[inline]
     840            0 :     fn record_error(
     841            0 :         &mut self,
     842            0 :         field: &tracing::field::Field,
     843            0 :         value: &(dyn std::error::Error + 'static),
     844            0 :     ) {
     845            0 :         self.record_field(field, format_args!("{value}"));
     846            0 :     }
     847              : }
     848              : 
     849              : struct ExtractedSpanFields {
     850              :     names: &'static [&'static str],
     851              :     values: Vec<Option<RawValue>>,
     852              : }
     853              : 
     854              : impl ExtractedSpanFields {
     855            1 :     fn new(names: &'static [&'static str]) -> Self {
     856            1 :         ExtractedSpanFields {
     857            1 :             names,
     858            1 :             values: vec![None; names.len()],
     859            1 :         }
     860            1 :     }
     861              : 
     862            3 :     fn layer_span(&mut self, fields: &SpanFields) {
     863            3 :         let SpanFields { values, span_info } = fields;
     864              : 
     865              :         // extract the fields
     866            3 :         for (i, &j) in span_info.extract.iter().enumerate() {
     867            3 :             let Some(Some(value)) = values.get(j) else {
     868            1 :                 continue;
     869              :             };
     870              : 
     871              :             // TODO: replace clone with reference, if possible.
     872            2 :             self.values[i] = Some(value.clone());
     873              :         }
     874            3 :     }
     875              : 
     876              :     #[inline]
     877            1 :     fn has_values(&self) -> bool {
     878            1 :         self.values.iter().any(|v| v.is_some())
     879            1 :     }
     880              : }
     881              : 
     882              : #[cfg(test)]
     883              : mod tests {
     884              :     use std::sync::{Arc, Mutex, MutexGuard};
     885              : 
     886              :     use assert_json_diff::assert_json_eq;
     887              :     use tracing::info_span;
     888              : 
     889              :     use super::*;
     890              : 
     891              :     struct TestClock {
     892              :         current_time: Mutex<DateTime<Utc>>,
     893              :     }
     894              : 
     895              :     impl Clock for Arc<TestClock> {
     896            2 :         fn now(&self) -> DateTime<Utc> {
     897            2 :             *self.current_time.lock().expect("poisoned")
     898            2 :         }
     899              :     }
     900              : 
     901              :     struct VecWriter<'a> {
     902              :         buffer: MutexGuard<'a, Vec<u8>>,
     903              :     }
     904              : 
     905              :     impl MakeWriter for Arc<Mutex<Vec<u8>>> {
     906            1 :         fn make_writer(&self) -> impl io::Write {
     907            1 :             VecWriter {
     908            1 :                 buffer: self.lock().expect("poisoned"),
     909            1 :             }
     910            1 :         }
     911              :     }
     912              : 
     913              :     impl io::Write for VecWriter<'_> {
     914            1 :         fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
     915            1 :             self.buffer.write(buf)
     916            1 :         }
     917              : 
     918            0 :         fn flush(&mut self) -> io::Result<()> {
     919            0 :             Ok(())
     920            0 :         }
     921              :     }
     922              : 
     923              :     #[test]
     924            1 :     fn test_field_collection() {
     925            1 :         let clock = Arc::new(TestClock {
     926            1 :             current_time: Mutex::new(Utc::now()),
     927            1 :         });
     928            1 :         let buffer = Arc::new(Mutex::new(Vec::new()));
     929            1 :         let log_layer = JsonLoggingLayer {
     930            1 :             clock: clock.clone(),
     931            1 :             skipped_field_indices: papaya::HashMap::default(),
     932            1 :             span_info: papaya::HashMap::default(),
     933            1 :             callsite_name_ids: papaya::HashMap::default(),
     934            1 :             writer: buffer.clone(),
     935            1 :             extract_fields: &["x"],
     936            1 :         };
     937              : 
     938            1 :         let registry = tracing_subscriber::Registry::default().with(log_layer);
     939              : 
     940            1 :         tracing::subscriber::with_default(registry, || {
     941            1 :             info_span!("some_span", x = 24).in_scope(|| {
     942            1 :                 info_span!("some_other_span", y = 30).in_scope(|| {
     943            1 :                     info_span!("some_span", x = 40, x = 41, x = 42).in_scope(|| {
     944            1 :                         tracing::error!(
     945              :                             a = 1,
     946              :                             a = 2,
     947              :                             a = 3,
     948              :                             message = "explicit message field",
     949            0 :                             "implicit message field"
     950              :                         );
     951            1 :                     });
     952            1 :                 });
     953            1 :             });
     954            1 :         });
     955              : 
     956            1 :         let buffer = Arc::try_unwrap(buffer)
     957            1 :             .expect("no other reference")
     958            1 :             .into_inner()
     959            1 :             .expect("poisoned");
     960            1 :         let actual: serde_json::Value = serde_json::from_slice(&buffer).expect("valid JSON");
     961            1 :         let expected: serde_json::Value = serde_json::json!(
     962              :             {
     963            1 :                 "timestamp": clock.now().to_rfc3339_opts(chrono::SecondsFormat::Micros, true),
     964            1 :                 "level": "ERROR",
     965            1 :                 "message": "explicit message field",
     966            1 :                 "fields": {
     967            1 :                     "a": 3,
     968              :                 },
     969            1 :                 "spans": {
     970            1 :                     "some_span":{
     971            1 :                         "x": 24,
     972              :                     },
     973            1 :                     "some_other_span": {
     974            1 :                         "y": 30,
     975              :                     },
     976            1 :                     "some_span#1": {
     977            1 :                         "x": 42,
     978              :                     },
     979              :                 },
     980            1 :                 "extract": {
     981            1 :                     "x": 42,
     982              :                 },
     983            1 :                 "src": actual.as_object().unwrap().get("src").unwrap().as_str().unwrap(),
     984            1 :                 "target": "proxy::logging::tests",
     985            1 :                 "process_id": actual.as_object().unwrap().get("process_id").unwrap().as_number().unwrap(),
     986            1 :                 "thread_id": actual.as_object().unwrap().get("thread_id").unwrap().as_number().unwrap(),
     987            1 :                 "thread_name": "logging::tests::test_field_collection",
     988              :             }
     989              :         );
     990              : 
     991            1 :         assert_json_eq!(actual, expected);
     992            1 :     }
     993              : }
        

Generated by: LCOV version 2.1-beta