LCOV - code coverage report
Current view: top level - proxy/src - logging.rs (source / functions) Coverage Total Hit
Test: 9ea6db2ee0fa9d49a3b75230199d5aaf7b855d49.info Lines: 59.9 % 519 311
Test Date: 2025-07-18 08:04:09 Functions: 44.6 % 112 50

            Line data    Source code
       1              : use std::cell::RefCell;
       2              : use std::collections::HashMap;
       3              : use std::sync::Arc;
       4              : use std::{env, io};
       5              : 
       6              : use chrono::{DateTime, Utc};
       7              : use opentelemetry::trace::TraceContextExt;
       8              : use tracing::subscriber::Interest;
       9              : use tracing::{Event, Metadata, Span, Subscriber, callsite, span};
      10              : use tracing_opentelemetry::OpenTelemetrySpanExt;
      11              : use tracing_subscriber::filter::{EnvFilter, LevelFilter};
      12              : use tracing_subscriber::fmt::format::{Format, Full};
      13              : use tracing_subscriber::fmt::time::SystemTime;
      14              : use tracing_subscriber::fmt::{FormatEvent, FormatFields};
      15              : use tracing_subscriber::layer::{Context, Layer};
      16              : use tracing_subscriber::prelude::*;
      17              : use tracing_subscriber::registry::LookupSpan;
      18              : 
      19              : use crate::metrics::Metrics;
      20              : 
      21              : /// Initialize logging and OpenTelemetry tracing and exporter.
      22              : ///
      23              : /// Logging can be configured using `RUST_LOG` environment variable.
      24              : ///
      25              : /// OpenTelemetry is configured with OTLP/HTTP exporter. It picks up
      26              : /// configuration from environment variables. For example, to change the
      27              : /// destination, set `OTEL_EXPORTER_OTLP_ENDPOINT=http://jaeger:4318`.
      28              : /// See <https://opentelemetry.io/docs/reference/specification/sdk-environment-variables>
      29            0 : pub async fn init() -> anyhow::Result<LoggingGuard> {
      30            0 :     let logfmt = LogFormat::from_env()?;
      31              : 
      32            0 :     let env_filter = EnvFilter::builder()
      33            0 :         .with_default_directive(LevelFilter::INFO.into())
      34            0 :         .from_env_lossy()
      35            0 :         .add_directive(
      36            0 :             "aws_config=info"
      37            0 :                 .parse()
      38            0 :                 .expect("this should be a valid filter directive"),
      39              :         )
      40            0 :         .add_directive(
      41            0 :             "azure_core::policies::transport=off"
      42            0 :                 .parse()
      43            0 :                 .expect("this should be a valid filter directive"),
      44              :         );
      45              : 
      46            0 :     let otlp_layer =
      47            0 :         tracing_utils::init_tracing("proxy", tracing_utils::ExportConfig::default()).await;
      48              : 
      49            0 :     let json_log_layer = if logfmt == LogFormat::Json {
      50            0 :         Some(JsonLoggingLayer::new(
      51            0 :             RealClock,
      52            0 :             StderrWriter {
      53            0 :                 stderr: std::io::stderr(),
      54            0 :             },
      55            0 :             &["conn_id", "ep", "query_id", "request_id", "session_id"],
      56            0 :         ))
      57              :     } else {
      58            0 :         None
      59              :     };
      60              : 
      61            0 :     let text_log_layer = if logfmt == LogFormat::Text {
      62            0 :         Some(
      63            0 :             tracing_subscriber::fmt::layer()
      64            0 :                 .with_ansi(false)
      65            0 :                 .with_writer(std::io::stderr)
      66            0 :                 .with_target(false),
      67            0 :         )
      68              :     } else {
      69            0 :         None
      70              :     };
      71              : 
      72            0 :     tracing_subscriber::registry()
      73            0 :         .with(env_filter)
      74            0 :         .with(otlp_layer)
      75            0 :         .with(json_log_layer)
      76            0 :         .with(text_log_layer)
      77            0 :         .try_init()?;
      78              : 
      79            0 :     Ok(LoggingGuard)
      80            0 : }
      81              : 
      82              : /// Initialize logging for local_proxy with log prefix and no opentelemetry.
      83              : ///
      84              : /// Logging can be configured using `RUST_LOG` environment variable.
      85            0 : pub fn init_local_proxy() -> anyhow::Result<LoggingGuard> {
      86            0 :     let env_filter = EnvFilter::builder()
      87            0 :         .with_default_directive(LevelFilter::INFO.into())
      88            0 :         .from_env_lossy();
      89              : 
      90            0 :     let fmt_layer = tracing_subscriber::fmt::layer()
      91            0 :         .with_ansi(false)
      92            0 :         .with_writer(std::io::stderr)
      93            0 :         .event_format(LocalProxyFormatter(Format::default().with_target(false)));
      94              : 
      95            0 :     tracing_subscriber::registry()
      96            0 :         .with(env_filter)
      97            0 :         .with(fmt_layer)
      98            0 :         .try_init()?;
      99              : 
     100            0 :     Ok(LoggingGuard)
     101            0 : }
     102              : 
     103              : pub struct LocalProxyFormatter(Format<Full, SystemTime>);
     104              : 
     105              : impl<S, N> FormatEvent<S, N> for LocalProxyFormatter
     106              : where
     107              :     S: Subscriber + for<'a> LookupSpan<'a>,
     108              :     N: for<'a> FormatFields<'a> + 'static,
     109              : {
     110            0 :     fn format_event(
     111            0 :         &self,
     112            0 :         ctx: &tracing_subscriber::fmt::FmtContext<'_, S, N>,
     113            0 :         mut writer: tracing_subscriber::fmt::format::Writer<'_>,
     114            0 :         event: &tracing::Event<'_>,
     115            0 :     ) -> std::fmt::Result {
     116            0 :         writer.write_str("[local_proxy] ")?;
     117            0 :         self.0.format_event(ctx, writer, event)
     118            0 :     }
     119              : }
     120              : 
     121              : pub struct LoggingGuard;
     122              : 
     123              : impl Drop for LoggingGuard {
     124            0 :     fn drop(&mut self) {
     125              :         // Shutdown trace pipeline gracefully, so that it has a chance to send any
     126              :         // pending traces before we exit.
     127            0 :         tracing::info!("shutting down the tracing machinery");
     128            0 :         tracing_utils::shutdown_tracing();
     129            0 :     }
     130              : }
     131              : 
     132              : #[derive(Copy, Clone, PartialEq, Eq, Default, Debug)]
     133              : enum LogFormat {
     134              :     Text,
     135              :     #[default]
     136              :     Json,
     137              : }
     138              : 
     139              : impl LogFormat {
     140            0 :     fn from_env() -> anyhow::Result<Self> {
     141            0 :         let logfmt = env::var("LOGFMT");
     142            0 :         Ok(match logfmt.as_deref() {
     143            0 :             Err(_) => LogFormat::default(),
     144            0 :             Ok("text") => LogFormat::Text,
     145            0 :             Ok("json") => LogFormat::Json,
     146            0 :             Ok(logfmt) => anyhow::bail!("unknown log format: {logfmt}"),
     147              :         })
     148            0 :     }
     149              : }
     150              : 
     151              : trait MakeWriter {
     152              :     fn make_writer(&self) -> impl io::Write;
     153              : }
     154              : 
     155              : struct StderrWriter {
     156              :     stderr: io::Stderr,
     157              : }
     158              : 
     159              : impl MakeWriter for StderrWriter {
     160              :     #[inline]
     161            0 :     fn make_writer(&self) -> impl io::Write {
     162            0 :         self.stderr.lock()
     163            0 :     }
     164              : }
     165              : 
     166              : // TODO: move into separate module or even separate crate.
     167              : trait Clock {
     168              :     fn now(&self) -> DateTime<Utc>;
     169              : }
     170              : 
     171              : struct RealClock;
     172              : 
     173              : impl Clock for RealClock {
     174              :     #[inline]
     175            0 :     fn now(&self) -> DateTime<Utc> {
     176            0 :         Utc::now()
     177            0 :     }
     178              : }
     179              : 
/// Name of the field used by tracing crate to store the event message.
const MESSAGE_FIELD: &str = "message";

/// Upper bound on the number of fields per span/event that this module is sized for.
///
/// Tracing used to enforce that spans/events have no more than 32 fields.
/// It seems this is no longer the case, but it's still documented in some places.
/// Generally, we shouldn't expect more than 32 fields anyway, so we can try and
/// rely on it for some (minor) performance gains.
const MAX_TRACING_FIELDS: usize = 32;

thread_local! {
    /// Thread-local instance with per-thread buffer for log writing.
    static EVENT_FORMATTER: RefCell<EventFormatter> = const { RefCell::new(EventFormatter::new()) };
    /// Cached OS thread ID.
    static THREAD_ID: u64 = gettid::gettid();
}

/// Map for values fixed at callsite registration, keyed by callsite identifier.
// We use papaya here because registration rarely happens post-startup.
// papaya is good for read-heavy workloads.
//
// We use rustc_hash here because callsite::Identifier will always be an integer with low-bit entropy,
// since it's always a pointer to static mutable data. rustc_hash was designed for low-bit entropy.
type CallsiteMap<T> =
    papaya::HashMap<callsite::Identifier, T, std::hash::BuildHasherDefault<rustc_hash::FxHasher>>;
     204              : 
     205              : /// Implements tracing layer to handle events specific to logging.
     206              : struct JsonLoggingLayer<C: Clock, W: MakeWriter> {
     207              :     clock: C,
     208              :     writer: W,
     209              : 
     210              :     /// tracks which fields of each **event** are duplicates
     211              :     skipped_field_indices: CallsiteMap<SkippedFieldIndices>,
     212              : 
     213              :     /// tracks callsite names to an ID.
     214              :     callsite_name_ids: papaya::HashMap<&'static str, u32, ahash::RandomState>,
     215              : 
     216              :     span_info: CallsiteMap<CallsiteSpanInfo>,
     217              : 
     218              :     /// Fields we want to keep track of in a separate json object.
     219              :     extract_fields: &'static [&'static str],
     220              : }
     221              : 
     222              : impl<C: Clock, W: MakeWriter> JsonLoggingLayer<C, W> {
     223            0 :     fn new(clock: C, writer: W, extract_fields: &'static [&'static str]) -> Self {
     224            0 :         JsonLoggingLayer {
     225            0 :             clock,
     226            0 :             skipped_field_indices: CallsiteMap::default(),
     227            0 :             span_info: CallsiteMap::default(),
     228            0 :             callsite_name_ids: papaya::HashMap::default(),
     229            0 :             writer,
     230            0 :             extract_fields,
     231            0 :         }
     232            0 :     }
     233              : 
     234              :     #[inline]
     235            6 :     fn span_info(&self, metadata: &'static Metadata<'static>) -> CallsiteSpanInfo {
     236            6 :         self.span_info
     237            6 :             .pin()
     238            6 :             .get_or_insert_with(metadata.callsite(), || {
     239            3 :                 CallsiteSpanInfo::new(&self.callsite_name_ids, metadata, self.extract_fields)
     240            3 :             })
     241            6 :             .clone()
     242            6 :     }
     243              : }
     244              : 
     245              : impl<S, C: Clock + 'static, W: MakeWriter + 'static> Layer<S> for JsonLoggingLayer<C, W>
     246              : where
     247              :     S: Subscriber + for<'a> LookupSpan<'a>,
     248              : {
     249            1 :     fn on_event(&self, event: &Event<'_>, ctx: Context<'_, S>) {
     250              :         use std::io::Write;
     251              : 
     252              :         // TODO: consider special tracing subscriber to grab timestamp very
     253              :         //       early, before OTel machinery, and add as event extension.
     254            1 :         let now = self.clock.now();
     255              : 
     256            1 :         EVENT_FORMATTER.with(|f| {
     257            1 :             let mut borrow = f.try_borrow_mut();
     258            1 :             let formatter = match borrow.as_deref_mut() {
     259            1 :                 Ok(formatter) => formatter,
     260              :                 // If the thread local formatter is borrowed,
     261              :                 // then we likely hit an edge case were we panicked during formatting.
     262              :                 // We allow the logging to proceed with an uncached formatter.
     263            0 :                 Err(_) => &mut EventFormatter::new(),
     264              :             };
     265              : 
     266            1 :             formatter.format(
     267            1 :                 now,
     268            1 :                 event,
     269            1 :                 &ctx,
     270            1 :                 &self.skipped_field_indices,
     271            1 :                 self.extract_fields,
     272              :             );
     273              : 
     274            1 :             let mut writer = self.writer.make_writer();
     275            1 :             if writer.write_all(formatter.buffer()).is_err() {
     276            0 :                 Metrics::get().proxy.logging_errors_count.inc();
     277            1 :             }
     278            1 :         });
     279            1 :     }
     280              : 
     281              :     /// Registers a SpanFields instance as span extension.
     282            3 :     fn on_new_span(&self, attrs: &span::Attributes<'_>, id: &span::Id, ctx: Context<'_, S>) {
     283            3 :         let span = ctx.span(id).expect("span must exist");
     284              : 
     285            3 :         let mut fields = SpanFields::new(self.span_info(span.metadata()));
     286            3 :         attrs.record(&mut fields);
     287              : 
     288              :         // This is a new span: the extensions should not be locked
     289              :         // unless some layer spawned a thread to process this span.
     290              :         // I don't think any layers do that.
     291            3 :         span.extensions_mut().insert(fields);
     292            3 :     }
     293              : 
     294            0 :     fn on_record(&self, id: &span::Id, values: &span::Record<'_>, ctx: Context<'_, S>) {
     295            0 :         let span = ctx.span(id).expect("span must exist");
     296              : 
     297              :         // assumption: `on_record` is rarely called.
     298              :         // assumption: a span being updated by one thread,
     299              :         //             and formatted by another thread is even rarer.
     300            0 :         let mut ext = span.extensions_mut();
     301            0 :         if let Some(fields) = ext.get_mut::<SpanFields>() {
     302            0 :             values.record(fields);
     303            0 :         }
     304            0 :     }
     305              : 
     306              :     /// Called (lazily) roughly once per event/span instance. We quickly check
     307              :     /// for duplicate field names and record duplicates as skippable. Last field wins.
     308            4 :     fn register_callsite(&self, metadata: &'static Metadata<'static>) -> Interest {
     309            4 :         debug_assert!(
     310            4 :             metadata.fields().len() <= MAX_TRACING_FIELDS,
     311            0 :             "callsite {metadata:?} has too many fields."
     312              :         );
     313              : 
     314            4 :         if !metadata.is_event() {
     315              :             // register the span info.
     316            3 :             self.span_info(metadata);
     317              :             // Must not be never because we wouldn't get trace and span data.
     318            3 :             return Interest::always();
     319            1 :         }
     320              : 
     321            1 :         let mut field_indices = SkippedFieldIndices::default();
     322            1 :         let mut seen_fields = HashMap::new();
     323            5 :         for field in metadata.fields() {
     324            5 :             if let Some(old_index) = seen_fields.insert(field.name(), field.index()) {
     325            3 :                 field_indices.set(old_index);
     326            3 :             }
     327              :         }
     328              : 
     329            1 :         if !field_indices.is_empty() {
     330            1 :             self.skipped_field_indices
     331            1 :                 .pin()
     332            1 :                 .insert(metadata.callsite(), field_indices);
     333            1 :         }
     334              : 
     335            1 :         Interest::always()
     336            4 :     }
     337              : }
     338              : 
     339              : /// Any span info that is fixed to a particular callsite. Not variable between span instances.
     340              : #[derive(Clone)]
     341              : struct CallsiteSpanInfo {
     342              :     /// index of each field to extract. usize::MAX if not found.
     343              :     extract: Arc<[usize]>,
     344              : 
     345              :     /// tracks the fixed "callsite ID" for each span.
     346              :     /// note: this is not stable between runs.
     347              :     normalized_name: Arc<str>,
     348              : }
     349              : 
     350              : impl CallsiteSpanInfo {
     351            3 :     fn new(
     352            3 :         callsite_name_ids: &papaya::HashMap<&'static str, u32, ahash::RandomState>,
     353            3 :         metadata: &'static Metadata<'static>,
     354            3 :         extract_fields: &[&'static str],
     355            3 :     ) -> Self {
     356            5 :         let names: Vec<&'static str> = metadata.fields().iter().map(|f| f.name()).collect();
     357              : 
     358              :         // get all the indices of span fields we want to focus
     359            3 :         let extract = extract_fields
     360            3 :             .iter()
     361              :             // use rposition, since we want last match wins.
     362            3 :             .map(|f1| names.iter().rposition(|f2| f1 == f2).unwrap_or(usize::MAX))
     363            3 :             .collect();
     364              : 
     365              :         // normalized_name is unique for each callsite, but it is not
     366              :         // unified across separate proxy instances.
     367              :         // todo: can we do better here?
     368            3 :         let cid = *callsite_name_ids
     369            3 :             .pin()
     370            3 :             .update_or_insert(metadata.name(), |&cid| cid + 1, 0);
     371              : 
     372              :         // we hope that most span names are unique, in which case this will always be 0
     373            3 :         let normalized_name = if cid == 0 {
     374            2 :             metadata.name().into()
     375              :         } else {
     376              :             // if the span name is not unique, add the numeric ID to span name to distinguish it.
     377              :             // sadly this is non-determinstic, across restarts but we should fix it by disambiguating re-used span names instead.
     378            1 :             format!("{}#{cid}", metadata.name()).into()
     379              :         };
     380              : 
     381            3 :         Self {
     382            3 :             extract,
     383            3 :             normalized_name,
     384            3 :         }
     385            3 :     }
     386              : }
     387              : 
     388              : #[derive(Clone)]
     389              : struct RawValue(Box<[u8]>);
     390              : 
     391              : impl RawValue {
     392            5 :     fn new(v: impl json::ValueEncoder) -> Self {
     393            5 :         Self(json::value_to_vec!(|val| v.encode(val)).into_boxed_slice())
     394            5 :     }
     395              : }
     396              : 
     397              : impl json::ValueEncoder for &RawValue {
     398            6 :     fn encode(self, v: json::ValueSer<'_>) {
     399            6 :         v.write_raw_json(&self.0);
     400            6 :     }
     401              : }
     402              : 
     403              : /// Stores span field values recorded during the spans lifetime.
     404              : struct SpanFields {
     405              :     values: [Option<RawValue>; MAX_TRACING_FIELDS],
     406              : 
     407              :     /// cached span info so we can avoid extra hashmap lookups in the hot path.
     408              :     span_info: CallsiteSpanInfo,
     409              : }
     410              : 
     411              : impl SpanFields {
     412            3 :     fn new(span_info: CallsiteSpanInfo) -> Self {
     413              :         Self {
     414            3 :             span_info,
     415              :             values: [const { None }; MAX_TRACING_FIELDS],
     416              :         }
     417            3 :     }
     418              : }
     419              : 
     420              : impl tracing::field::Visit for SpanFields {
     421              :     #[inline]
     422            0 :     fn record_f64(&mut self, field: &tracing::field::Field, value: f64) {
     423            0 :         self.values[field.index()] = Some(RawValue::new(value));
     424            0 :     }
     425              : 
     426              :     #[inline]
     427            5 :     fn record_i64(&mut self, field: &tracing::field::Field, value: i64) {
     428            5 :         self.values[field.index()] = Some(RawValue::new(value));
     429            5 :     }
     430              : 
     431              :     #[inline]
     432            0 :     fn record_u64(&mut self, field: &tracing::field::Field, value: u64) {
     433            0 :         self.values[field.index()] = Some(RawValue::new(value));
     434            0 :     }
     435              : 
     436              :     #[inline]
     437            0 :     fn record_i128(&mut self, field: &tracing::field::Field, value: i128) {
     438            0 :         if let Ok(value) = i64::try_from(value) {
     439            0 :             self.values[field.index()] = Some(RawValue::new(value));
     440            0 :         } else {
     441            0 :             self.values[field.index()] = Some(RawValue::new(format_args!("{value}")));
     442            0 :         }
     443            0 :     }
     444              : 
     445              :     #[inline]
     446            0 :     fn record_u128(&mut self, field: &tracing::field::Field, value: u128) {
     447            0 :         if let Ok(value) = u64::try_from(value) {
     448            0 :             self.values[field.index()] = Some(RawValue::new(value));
     449            0 :         } else {
     450            0 :             self.values[field.index()] = Some(RawValue::new(format_args!("{value}")));
     451            0 :         }
     452            0 :     }
     453              : 
     454              :     #[inline]
     455            0 :     fn record_bool(&mut self, field: &tracing::field::Field, value: bool) {
     456            0 :         self.values[field.index()] = Some(RawValue::new(value));
     457            0 :     }
     458              : 
     459              :     #[inline]
     460            0 :     fn record_bytes(&mut self, field: &tracing::field::Field, value: &[u8]) {
     461            0 :         self.values[field.index()] = Some(RawValue::new(value));
     462            0 :     }
     463              : 
     464              :     #[inline]
     465            0 :     fn record_str(&mut self, field: &tracing::field::Field, value: &str) {
     466            0 :         self.values[field.index()] = Some(RawValue::new(value));
     467            0 :     }
     468              : 
     469              :     #[inline]
     470            0 :     fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) {
     471            0 :         self.values[field.index()] = Some(RawValue::new(format_args!("{value:?}")));
     472            0 :     }
     473              : 
     474              :     #[inline]
     475            0 :     fn record_error(
     476            0 :         &mut self,
     477            0 :         field: &tracing::field::Field,
     478            0 :         value: &(dyn std::error::Error + 'static),
     479            0 :     ) {
     480            0 :         self.values[field.index()] = Some(RawValue::new(format_args!("{value}")));
     481            0 :     }
     482              : }
     483              : 
     484              : /// List of field indices skipped during logging. Can list duplicate fields or
     485              : /// metafields not meant to be logged.
     486              : #[derive(Copy, Clone, Default)]
     487              : struct SkippedFieldIndices {
     488              :     // 32-bits is large enough for `MAX_TRACING_FIELDS`
     489              :     bits: u32,
     490              : }
     491              : 
     492              : impl SkippedFieldIndices {
     493              :     #[inline]
     494            1 :     fn is_empty(self) -> bool {
     495            1 :         self.bits == 0
     496            1 :     }
     497              : 
     498              :     #[inline]
     499            3 :     fn set(&mut self, index: usize) {
     500            3 :         debug_assert!(index <= 32, "index out of bounds of 32-bit set");
     501            3 :         self.bits |= 1 << index;
     502            3 :     }
     503              : 
     504              :     #[inline]
     505            5 :     fn contains(self, index: usize) -> bool {
     506            5 :         self.bits & (1 << index) != 0
     507            5 :     }
     508              : }
     509              : 
     510              : /// Formats a tracing event and writes JSON to its internal buffer including a newline.
     511              : // TODO: buffer capacity management, truncate if too large
     512              : struct EventFormatter {
     513              :     logline_buffer: Vec<u8>,
     514              : }
     515              : 
     516              : impl EventFormatter {
     517              :     #[inline]
     518            0 :     const fn new() -> Self {
     519            0 :         EventFormatter {
     520            0 :             logline_buffer: Vec::new(),
     521            0 :         }
     522            0 :     }
     523              : 
     524              :     #[inline]
     525            1 :     fn buffer(&self) -> &[u8] {
     526            1 :         &self.logline_buffer
     527            1 :     }
     528              : 
     529            1 :     fn format<S>(
     530            1 :         &mut self,
     531            1 :         now: DateTime<Utc>,
     532            1 :         event: &Event<'_>,
     533            1 :         ctx: &Context<'_, S>,
     534            1 :         skipped_field_indices: &CallsiteMap<SkippedFieldIndices>,
     535            1 :         extract_fields: &'static [&'static str],
     536            1 :     ) where
     537            1 :         S: Subscriber + for<'a> LookupSpan<'a>,
     538              :     {
     539            1 :         let timestamp = now.to_rfc3339_opts(chrono::SecondsFormat::Micros, true);
     540              : 
     541              :         use tracing_log::NormalizeEvent;
     542            1 :         let normalized_meta = event.normalized_metadata();
     543            1 :         let meta = normalized_meta.as_ref().unwrap_or_else(|| event.metadata());
     544              : 
     545            1 :         let skipped_field_indices = skipped_field_indices
     546            1 :             .pin()
     547            1 :             .get(&meta.callsite())
     548            1 :             .copied()
     549            1 :             .unwrap_or_default();
     550              : 
     551            1 :         self.logline_buffer.clear();
     552            1 :         let serializer = json::ValueSer::new(&mut self.logline_buffer);
     553            1 :         json::value_as_object!(|serializer| {
     554              :             // Timestamp comes first, so raw lines can be sorted by timestamp.
     555            1 :             serializer.entry("timestamp", &*timestamp);
     556              : 
     557              :             // Level next.
     558            1 :             serializer.entry("level", meta.level().as_str());
     559              : 
     560              :             // Message next.
     561            1 :             let mut message_extractor =
     562            1 :                 MessageFieldExtractor::new(serializer.key("message"), skipped_field_indices);
     563            1 :             event.record(&mut message_extractor);
     564            1 :             message_extractor.finish();
     565              : 
     566              :             // Direct message fields.
     567              :             {
     568            1 :                 let mut message_skipper = MessageFieldSkipper::new(
     569            1 :                     serializer.key("fields").object(),
     570            1 :                     skipped_field_indices,
     571              :                 );
     572            1 :                 event.record(&mut message_skipper);
     573              : 
     574              :                 // rollback if no fields are present.
     575            1 :                 if message_skipper.present {
     576            1 :                     message_skipper.serializer.finish();
     577            1 :                 }
     578              :             }
     579              : 
     580            1 :             let mut extracted = ExtractedSpanFields::new(extract_fields);
     581              : 
     582            1 :             let spans = serializer.key("spans");
     583            1 :             json::value_as_object!(|spans| {
     584            1 :                 let parent_spans = ctx
     585            1 :                     .event_span(event)
     586            1 :                     .map_or(vec![], |parent| parent.scope().collect());
     587              : 
     588            3 :                 for span in parent_spans.iter().rev() {
     589            3 :                     let ext = span.extensions();
     590              : 
     591              :                     // all spans should have this extension.
     592            3 :                     let Some(fields) = ext.get() else { continue };
     593              : 
     594            3 :                     extracted.layer_span(fields);
     595              : 
     596            3 :                     let SpanFields { values, span_info } = fields;
     597              : 
     598            3 :                     let span_fields = spans.key(&*span_info.normalized_name);
     599            3 :                     json::value_as_object!(|span_fields| {
     600            5 :                         for (field, value) in std::iter::zip(span.metadata().fields(), values) {
     601            5 :                             if let Some(value) = value {
     602            5 :                                 span_fields.entry(field.name(), value);
     603            5 :                             }
     604              :                         }
     605              :                     });
     606              :                 }
     607              :             });
     608              : 
     609              :             // TODO: thread-local cache?
     610            1 :             let pid = std::process::id();
     611              :             // Skip adding pid 1 to reduce noise for services running in containers.
     612            1 :             if pid != 1 {
     613            1 :                 serializer.entry("process_id", pid);
     614            1 :             }
     615              : 
     616            1 :             THREAD_ID.with(|tid| serializer.entry("thread_id", tid));
     617              : 
     618              :             // TODO: tls cache? name could change
     619            1 :             if let Some(thread_name) = std::thread::current().name()
     620            1 :                 && !thread_name.is_empty()
     621            1 :                 && thread_name != "tokio-runtime-worker"
     622            1 :             {
     623            1 :                 serializer.entry("thread_name", thread_name);
     624            1 :             }
     625              : 
     626            1 :             if let Some(task_id) = tokio::task::try_id() {
     627            0 :                 serializer.entry("task_id", format_args!("{task_id}"));
     628            1 :             }
     629              : 
     630            1 :             serializer.entry("target", meta.target());
     631              : 
     632              :             // Skip adding module if it's the same as target.
     633            1 :             if let Some(module) = meta.module_path()
     634            1 :                 && module != meta.target()
     635            0 :             {
     636            0 :                 serializer.entry("module", module);
     637            1 :             }
     638              : 
     639            1 :             if let Some(file) = meta.file() {
     640            1 :                 if let Some(line) = meta.line() {
     641            1 :                     serializer.entry("src", format_args!("{file}:{line}"));
     642            1 :                 } else {
     643            0 :                     serializer.entry("src", file);
     644            0 :                 }
     645            0 :             }
     646              : 
     647              :             {
     648            1 :                 let otel_context = Span::current().context();
     649            1 :                 let otel_spanref = otel_context.span();
     650            1 :                 let span_context = otel_spanref.span_context();
     651            1 :                 if span_context.is_valid() {
     652            0 :                     serializer.entry("trace_id", format_args!("{}", span_context.trace_id()));
     653            1 :                 }
     654              :             }
     655              : 
     656            1 :             if extracted.has_values() {
     657              :                 // TODO: add fields from event, too?
     658            1 :                 let extract = serializer.key("extract");
     659            1 :                 json::value_as_object!(|extract| {
     660            1 :                     for (key, value) in std::iter::zip(extracted.names, extracted.values) {
     661            1 :                         if let Some(value) = value {
     662            1 :                             extract.entry(*key, &value);
     663            1 :                         }
     664              :                     }
     665              :                 });
     666            0 :             }
     667              :         });
     668              : 
     669            1 :         self.logline_buffer.push(b'\n');
     670            1 :     }
     671              : }
     672              : 
     673              : /// Extracts the message field that's mixed with other fields.
     674              : struct MessageFieldExtractor<'buf> {
     675              :     serializer: Option<json::ValueSer<'buf>>,
     676              :     skipped_field_indices: SkippedFieldIndices,
     677              : }
     678              : 
     679              : impl<'buf> MessageFieldExtractor<'buf> {
     680              :     #[inline]
     681            1 :     fn new(serializer: json::ValueSer<'buf>, skipped_field_indices: SkippedFieldIndices) -> Self {
     682            1 :         Self {
     683            1 :             serializer: Some(serializer),
     684            1 :             skipped_field_indices,
     685            1 :         }
     686            1 :     }
     687              : 
     688              :     #[inline]
     689            1 :     fn finish(self) {
     690            1 :         if let Some(ser) = self.serializer {
     691            0 :             ser.value("");
     692            1 :         }
     693            1 :     }
     694              : 
     695              :     #[inline]
     696            5 :     fn record_field(&mut self, field: &tracing::field::Field, v: impl json::ValueEncoder) {
     697            5 :         if field.name() == MESSAGE_FIELD
     698            2 :             && !self.skipped_field_indices.contains(field.index())
     699            1 :             && let Some(ser) = self.serializer.take()
     700            1 :         {
     701            1 :             ser.value(v);
     702            4 :         }
     703            5 :     }
     704              : }
     705              : 
     706              : impl tracing::field::Visit for MessageFieldExtractor<'_> {
     707              :     #[inline]
     708            0 :     fn record_f64(&mut self, field: &tracing::field::Field, value: f64) {
     709            0 :         self.record_field(field, value);
     710            0 :     }
     711              : 
     712              :     #[inline]
     713            3 :     fn record_i64(&mut self, field: &tracing::field::Field, value: i64) {
     714            3 :         self.record_field(field, value);
     715            3 :     }
     716              : 
     717              :     #[inline]
     718            0 :     fn record_u64(&mut self, field: &tracing::field::Field, value: u64) {
     719            0 :         self.record_field(field, value);
     720            0 :     }
     721              : 
     722              :     #[inline]
     723            0 :     fn record_i128(&mut self, field: &tracing::field::Field, value: i128) {
     724            0 :         self.record_field(field, value);
     725            0 :     }
     726              : 
     727              :     #[inline]
     728            0 :     fn record_u128(&mut self, field: &tracing::field::Field, value: u128) {
     729            0 :         self.record_field(field, value);
     730            0 :     }
     731              : 
     732              :     #[inline]
     733            0 :     fn record_bool(&mut self, field: &tracing::field::Field, value: bool) {
     734            0 :         self.record_field(field, value);
     735            0 :     }
     736              : 
     737              :     #[inline]
     738            0 :     fn record_bytes(&mut self, field: &tracing::field::Field, value: &[u8]) {
     739            0 :         self.record_field(field, format_args!("{value:x?}"));
     740            0 :     }
     741              : 
     742              :     #[inline]
     743            1 :     fn record_str(&mut self, field: &tracing::field::Field, value: &str) {
     744            1 :         self.record_field(field, value);
     745            1 :     }
     746              : 
     747              :     #[inline]
     748            1 :     fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) {
     749            1 :         self.record_field(field, format_args!("{value:?}"));
     750            1 :     }
     751              : 
     752              :     #[inline]
     753            0 :     fn record_error(
     754            0 :         &mut self,
     755            0 :         field: &tracing::field::Field,
     756            0 :         value: &(dyn std::error::Error + 'static),
     757            0 :     ) {
     758            0 :         self.record_field(field, format_args!("{value}"));
     759            0 :     }
     760              : }
     761              : 
     762              : /// A tracing field visitor that skips the message field.
     763              : struct MessageFieldSkipper<'buf> {
     764              :     serializer: json::ObjectSer<'buf>,
     765              :     skipped_field_indices: SkippedFieldIndices,
     766              :     present: bool,
     767              : }
     768              : 
     769              : impl<'buf> MessageFieldSkipper<'buf> {
     770              :     #[inline]
     771            1 :     fn new(serializer: json::ObjectSer<'buf>, skipped_field_indices: SkippedFieldIndices) -> Self {
     772            1 :         Self {
     773            1 :             serializer,
     774            1 :             skipped_field_indices,
     775            1 :             present: false,
     776            1 :         }
     777            1 :     }
     778              : 
     779              :     #[inline]
     780            5 :     fn record_field(&mut self, field: &tracing::field::Field, v: impl json::ValueEncoder) {
     781            5 :         if field.name() != MESSAGE_FIELD
     782            3 :             && !field.name().starts_with("log.")
     783            3 :             && !self.skipped_field_indices.contains(field.index())
     784            1 :         {
     785            1 :             self.serializer.entry(field.name(), v);
     786            1 :             self.present |= true;
     787            4 :         }
     788            5 :     }
     789              : }
     790              : 
     791              : impl tracing::field::Visit for MessageFieldSkipper<'_> {
     792              :     #[inline]
     793            0 :     fn record_f64(&mut self, field: &tracing::field::Field, value: f64) {
     794            0 :         self.record_field(field, value);
     795            0 :     }
     796              : 
     797              :     #[inline]
     798            3 :     fn record_i64(&mut self, field: &tracing::field::Field, value: i64) {
     799            3 :         self.record_field(field, value);
     800            3 :     }
     801              : 
     802              :     #[inline]
     803            0 :     fn record_u64(&mut self, field: &tracing::field::Field, value: u64) {
     804            0 :         self.record_field(field, value);
     805            0 :     }
     806              : 
     807              :     #[inline]
     808            0 :     fn record_i128(&mut self, field: &tracing::field::Field, value: i128) {
     809            0 :         self.record_field(field, value);
     810            0 :     }
     811              : 
     812              :     #[inline]
     813            0 :     fn record_u128(&mut self, field: &tracing::field::Field, value: u128) {
     814            0 :         self.record_field(field, value);
     815            0 :     }
     816              : 
     817              :     #[inline]
     818            0 :     fn record_bool(&mut self, field: &tracing::field::Field, value: bool) {
     819            0 :         self.record_field(field, value);
     820            0 :     }
     821              : 
     822              :     #[inline]
     823            0 :     fn record_bytes(&mut self, field: &tracing::field::Field, value: &[u8]) {
     824            0 :         self.record_field(field, format_args!("{value:x?}"));
     825            0 :     }
     826              : 
     827              :     #[inline]
     828            1 :     fn record_str(&mut self, field: &tracing::field::Field, value: &str) {
     829            1 :         self.record_field(field, value);
     830            1 :     }
     831              : 
     832              :     #[inline]
     833            1 :     fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) {
     834            1 :         self.record_field(field, format_args!("{value:?}"));
     835            1 :     }
     836              : 
     837              :     #[inline]
     838            0 :     fn record_error(
     839            0 :         &mut self,
     840            0 :         field: &tracing::field::Field,
     841            0 :         value: &(dyn std::error::Error + 'static),
     842            0 :     ) {
     843            0 :         self.record_field(field, format_args!("{value}"));
     844            0 :     }
     845              : }
     846              : 
     847              : struct ExtractedSpanFields {
     848              :     names: &'static [&'static str],
     849              :     values: Vec<Option<RawValue>>,
     850              : }
     851              : 
     852              : impl ExtractedSpanFields {
     853            1 :     fn new(names: &'static [&'static str]) -> Self {
     854            1 :         ExtractedSpanFields {
     855            1 :             names,
     856            1 :             values: vec![None; names.len()],
     857            1 :         }
     858            1 :     }
     859              : 
     860            3 :     fn layer_span(&mut self, fields: &SpanFields) {
     861            3 :         let SpanFields { values, span_info } = fields;
     862              : 
     863              :         // extract the fields
     864            3 :         for (i, &j) in span_info.extract.iter().enumerate() {
     865            3 :             let Some(Some(value)) = values.get(j) else {
     866            1 :                 continue;
     867              :             };
     868              : 
     869              :             // TODO: replace clone with reference, if possible.
     870            2 :             self.values[i] = Some(value.clone());
     871              :         }
     872            3 :     }
     873              : 
     874              :     #[inline]
     875            1 :     fn has_values(&self) -> bool {
     876            1 :         self.values.iter().any(|v| v.is_some())
     877            1 :     }
     878              : }
     879              : 
     880              : #[cfg(test)]
     881              : mod tests {
     882              :     use std::sync::{Arc, Mutex, MutexGuard};
     883              : 
     884              :     use assert_json_diff::assert_json_eq;
     885              :     use tracing::info_span;
     886              : 
     887              :     use super::*;
     888              : 
     889              :     struct TestClock {
     890              :         current_time: Mutex<DateTime<Utc>>,
     891              :     }
     892              : 
     893              :     impl Clock for Arc<TestClock> {
     894            2 :         fn now(&self) -> DateTime<Utc> {
     895            2 :             *self.current_time.lock().expect("poisoned")
     896            2 :         }
     897              :     }
     898              : 
     899              :     struct VecWriter<'a> {
     900              :         buffer: MutexGuard<'a, Vec<u8>>,
     901              :     }
     902              : 
     903              :     impl MakeWriter for Arc<Mutex<Vec<u8>>> {
     904            1 :         fn make_writer(&self) -> impl io::Write {
     905            1 :             VecWriter {
     906            1 :                 buffer: self.lock().expect("poisoned"),
     907            1 :             }
     908            1 :         }
     909              :     }
     910              : 
     911              :     impl io::Write for VecWriter<'_> {
     912            1 :         fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
     913            1 :             self.buffer.write(buf)
     914            1 :         }
     915              : 
     916            0 :         fn flush(&mut self) -> io::Result<()> {
     917            0 :             Ok(())
     918            0 :         }
     919              :     }
     920              : 
     921              :     #[test]
     922            1 :     fn test_field_collection() {
     923            1 :         let clock = Arc::new(TestClock {
     924            1 :             current_time: Mutex::new(Utc::now()),
     925            1 :         });
     926            1 :         let buffer = Arc::new(Mutex::new(Vec::new()));
     927            1 :         let log_layer = JsonLoggingLayer {
     928            1 :             clock: clock.clone(),
     929            1 :             skipped_field_indices: papaya::HashMap::default(),
     930            1 :             span_info: papaya::HashMap::default(),
     931            1 :             callsite_name_ids: papaya::HashMap::default(),
     932            1 :             writer: buffer.clone(),
     933            1 :             extract_fields: &["x"],
     934            1 :         };
     935              : 
     936            1 :         let registry = tracing_subscriber::Registry::default().with(log_layer);
     937              : 
     938            1 :         tracing::subscriber::with_default(registry, || {
     939            1 :             info_span!("some_span", x = 24).in_scope(|| {
     940            1 :                 info_span!("some_other_span", y = 30).in_scope(|| {
     941            1 :                     info_span!("some_span", x = 40, x = 41, x = 42).in_scope(|| {
     942            1 :                         tracing::error!(
     943              :                             a = 1,
     944              :                             a = 2,
     945              :                             a = 3,
     946              :                             message = "explicit message field",
     947            0 :                             "implicit message field"
     948              :                         );
     949            1 :                     });
     950            1 :                 });
     951            1 :             });
     952            1 :         });
     953              : 
     954            1 :         let buffer = Arc::try_unwrap(buffer)
     955            1 :             .expect("no other reference")
     956            1 :             .into_inner()
     957            1 :             .expect("poisoned");
     958            1 :         let actual: serde_json::Value = serde_json::from_slice(&buffer).expect("valid JSON");
     959            1 :         let expected: serde_json::Value = serde_json::json!(
     960              :             {
     961            1 :                 "timestamp": clock.now().to_rfc3339_opts(chrono::SecondsFormat::Micros, true),
     962            1 :                 "level": "ERROR",
     963            1 :                 "message": "explicit message field",
     964            1 :                 "fields": {
     965            1 :                     "a": 3,
     966              :                 },
     967            1 :                 "spans": {
     968            1 :                     "some_span":{
     969            1 :                         "x": 24,
     970              :                     },
     971            1 :                     "some_other_span": {
     972            1 :                         "y": 30,
     973              :                     },
     974            1 :                     "some_span#1": {
     975            1 :                         "x": 42,
     976              :                     },
     977              :                 },
     978            1 :                 "extract": {
     979            1 :                     "x": 42,
     980              :                 },
     981            1 :                 "src": actual.as_object().unwrap().get("src").unwrap().as_str().unwrap(),
     982            1 :                 "target": "proxy::logging::tests",
     983            1 :                 "process_id": actual.as_object().unwrap().get("process_id").unwrap().as_number().unwrap(),
     984            1 :                 "thread_id": actual.as_object().unwrap().get("thread_id").unwrap().as_number().unwrap(),
     985            1 :                 "thread_name": "logging::tests::test_field_collection",
     986              :             }
     987              :         );
     988              : 
     989            1 :         assert_json_eq!(actual, expected);
     990            1 :     }
     991              : }
        

Generated by: LCOV version 2.1-beta