LCOV - code coverage report
Current view: top level - proxy/src - logging.rs (source / functions) Coverage Total Hit
Test: 07bee600374ccd486c69370d0972d9035964fe68.info Lines: 54.8 % 584 320
Test Date: 2025-02-20 13:11:02 Functions: 30.5 % 141 43

            Line data    Source code
       1              : use std::cell::{Cell, RefCell};
       2              : use std::collections::HashMap;
       3              : use std::hash::BuildHasher;
       4              : use std::{env, io};
       5              : 
       6              : use chrono::{DateTime, Utc};
       7              : use opentelemetry::trace::TraceContextExt;
       8              : use scopeguard::defer;
       9              : use serde::ser::{SerializeMap, Serializer};
      10              : use tracing::subscriber::Interest;
      11              : use tracing::{callsite, span, Event, Metadata, Span, Subscriber};
      12              : use tracing_opentelemetry::OpenTelemetrySpanExt;
      13              : use tracing_subscriber::filter::{EnvFilter, LevelFilter};
      14              : use tracing_subscriber::fmt::format::{Format, Full};
      15              : use tracing_subscriber::fmt::time::SystemTime;
      16              : use tracing_subscriber::fmt::{FormatEvent, FormatFields};
      17              : use tracing_subscriber::layer::{Context, Layer};
      18              : use tracing_subscriber::prelude::*;
      19              : use tracing_subscriber::registry::{LookupSpan, SpanRef};
      20              : 
      21              : /// Initialize logging and OpenTelemetry tracing and exporter.
      22              : ///
      23              : /// Logging can be configured using `RUST_LOG` environment variable.
      24              : ///
      25              : /// OpenTelemetry is configured with OTLP/HTTP exporter. It picks up
      26              : /// configuration from environment variables. For example, to change the
      27              : /// destination, set `OTEL_EXPORTER_OTLP_ENDPOINT=http://jaeger:4318`.
      28              : /// See <https://opentelemetry.io/docs/reference/specification/sdk-environment-variables>
      29            0 : pub async fn init() -> anyhow::Result<LoggingGuard> {
      30            0 :     let logfmt = LogFormat::from_env()?;
      31              : 
      32            0 :     let env_filter = EnvFilter::builder()
      33            0 :         .with_default_directive(LevelFilter::INFO.into())
      34            0 :         .from_env_lossy()
      35            0 :         .add_directive(
      36            0 :             "aws_config=info"
      37            0 :                 .parse()
      38            0 :                 .expect("this should be a valid filter directive"),
      39            0 :         )
      40            0 :         .add_directive(
      41            0 :             "azure_core::policies::transport=off"
      42            0 :                 .parse()
      43            0 :                 .expect("this should be a valid filter directive"),
      44            0 :         );
      45              : 
      46            0 :     let otlp_layer = tracing_utils::init_tracing("proxy").await;
      47              : 
      48            0 :     let json_log_layer = if logfmt == LogFormat::Json {
      49            0 :         Some(JsonLoggingLayer {
      50            0 :             clock: RealClock,
      51            0 :             skipped_field_indices: papaya::HashMap::default(),
      52            0 :             writer: StderrWriter {
      53            0 :                 stderr: std::io::stderr(),
      54            0 :             },
      55            0 :         })
      56              :     } else {
      57            0 :         None
      58              :     };
      59              : 
      60            0 :     let text_log_layer = if logfmt == LogFormat::Text {
      61            0 :         Some(
      62            0 :             tracing_subscriber::fmt::layer()
      63            0 :                 .with_ansi(false)
      64            0 :                 .with_writer(std::io::stderr)
      65            0 :                 .with_target(false),
      66            0 :         )
      67              :     } else {
      68            0 :         None
      69              :     };
      70              : 
      71            0 :     tracing_subscriber::registry()
      72            0 :         .with(env_filter)
      73            0 :         .with(otlp_layer)
      74            0 :         .with(json_log_layer)
      75            0 :         .with(text_log_layer)
      76            0 :         .try_init()?;
      77              : 
      78            0 :     Ok(LoggingGuard)
      79            0 : }
      80              : 
      81              : /// Initialize logging for local_proxy with log prefix and no opentelemetry.
      82              : ///
      83              : /// Logging can be configured using `RUST_LOG` environment variable.
      84            0 : pub fn init_local_proxy() -> anyhow::Result<LoggingGuard> {
      85            0 :     let env_filter = EnvFilter::builder()
      86            0 :         .with_default_directive(LevelFilter::INFO.into())
      87            0 :         .from_env_lossy();
      88            0 : 
      89            0 :     let fmt_layer = tracing_subscriber::fmt::layer()
      90            0 :         .with_ansi(false)
      91            0 :         .with_writer(std::io::stderr)
      92            0 :         .event_format(LocalProxyFormatter(Format::default().with_target(false)));
      93            0 : 
      94            0 :     tracing_subscriber::registry()
      95            0 :         .with(env_filter)
      96            0 :         .with(fmt_layer)
      97            0 :         .try_init()?;
      98              : 
      99            0 :     Ok(LoggingGuard)
     100            0 : }
     101              : 
     102              : pub struct LocalProxyFormatter(Format<Full, SystemTime>);
     103              : 
     104              : impl<S, N> FormatEvent<S, N> for LocalProxyFormatter
     105              : where
     106              :     S: Subscriber + for<'a> LookupSpan<'a>,
     107              :     N: for<'a> FormatFields<'a> + 'static,
     108              : {
     109            0 :     fn format_event(
     110            0 :         &self,
     111            0 :         ctx: &tracing_subscriber::fmt::FmtContext<'_, S, N>,
     112            0 :         mut writer: tracing_subscriber::fmt::format::Writer<'_>,
     113            0 :         event: &tracing::Event<'_>,
     114            0 :     ) -> std::fmt::Result {
     115            0 :         writer.write_str("[local_proxy] ")?;
     116            0 :         self.0.format_event(ctx, writer, event)
     117            0 :     }
     118              : }
     119              : 
     120              : pub struct LoggingGuard;
     121              : 
     122              : impl Drop for LoggingGuard {
     123            0 :     fn drop(&mut self) {
     124            0 :         // Shutdown trace pipeline gracefully, so that it has a chance to send any
     125            0 :         // pending traces before we exit.
     126            0 :         tracing::info!("shutting down the tracing machinery");
     127            0 :         tracing_utils::shutdown_tracing();
     128            0 :     }
     129              : }
     130              : 
     131              : // TODO: make JSON the default
     132              : #[derive(Copy, Clone, PartialEq, Eq, Default, Debug)]
     133              : enum LogFormat {
     134              :     #[default]
     135              :     Text = 1,
     136              :     Json,
     137              : }
     138              : 
     139              : impl LogFormat {
     140            0 :     fn from_env() -> anyhow::Result<Self> {
     141            0 :         let logfmt = env::var("LOGFMT");
     142            0 :         Ok(match logfmt.as_deref() {
     143            0 :             Err(_) => LogFormat::default(),
     144            0 :             Ok("text") => LogFormat::Text,
     145            0 :             Ok("json") => LogFormat::Json,
     146            0 :             Ok(logfmt) => anyhow::bail!("unknown log format: {logfmt}"),
     147              :         })
     148            0 :     }
     149              : }
     150              : 
     151              : trait MakeWriter {
     152              :     fn make_writer(&self) -> impl io::Write;
     153              : }
     154              : 
     155              : struct StderrWriter {
     156              :     stderr: io::Stderr,
     157              : }
     158              : 
     159              : impl MakeWriter for StderrWriter {
     160              :     #[inline]
     161            0 :     fn make_writer(&self) -> impl io::Write {
     162            0 :         self.stderr.lock()
     163            0 :     }
     164              : }
     165              : 
// TODO: move into separate module or even separate crate.
/// Source of the timestamps attached to log events.
///
/// `JsonLoggingLayer` is generic over this trait, so the time source can be
/// swapped for something other than [`RealClock`].
trait Clock {
    /// Returns the current time in UTC.
    fn now(&self) -> DateTime<Utc>;
}

/// [`Clock`] implementation that forwards to `Utc::now()`.
struct RealClock;

impl Clock for RealClock {
    #[inline]
    fn now(&self) -> DateTime<Utc> {
        Utc::now()
    }
}
     179              : 
/// Name of the field used by tracing crate to store the event message.
/// `MessageFieldExtractor` compares field names against it to pick the
/// message out of an event's fields.
const MESSAGE_FIELD: &str = "message";

thread_local! {
    /// Protects against deadlocks and double panics during log writing.
    /// The current panic handler will use tracing to log panic information.
    ///
    /// `JsonLoggingLayer::on_event` sets the flag while it formats an event on
    /// this thread; an event arriving while the flag is set is formatted with
    /// a fresh `EventFormatter` instead of the thread-local one.
    static REENTRANCY_GUARD: Cell<bool> = const { Cell::new(false) };
    /// Thread-local instance with per-thread buffer for log writing.
    /// It is reset before each use, so the buffer allocation is reused.
    static EVENT_FORMATTER: RefCell<EventFormatter> = RefCell::new(EventFormatter::new());
    /// Cached OS thread ID, emitted as the `thread_id` entry of each log line.
    static THREAD_ID: u64 = gettid::gettid();
}
     192              : 
     193              : /// Implements tracing layer to handle events specific to logging.
     194              : struct JsonLoggingLayer<C: Clock, W: MakeWriter> {
     195              :     clock: C,
     196              :     skipped_field_indices: papaya::HashMap<callsite::Identifier, SkippedFieldIndices>,
     197              :     writer: W,
     198              : }
     199              : 
     200              : impl<S, C: Clock + 'static, W: MakeWriter + 'static> Layer<S> for JsonLoggingLayer<C, W>
     201              : where
     202              :     S: Subscriber + for<'a> LookupSpan<'a>,
     203              : {
     204            1 :     fn on_event(&self, event: &Event<'_>, ctx: Context<'_, S>) {
     205              :         use std::io::Write;
     206              : 
     207              :         // TODO: consider special tracing subscriber to grab timestamp very
     208              :         //       early, before OTel machinery, and add as event extension.
     209            1 :         let now = self.clock.now();
     210            1 : 
     211            1 :         let res: io::Result<()> = REENTRANCY_GUARD.with(move |entered| {
     212            1 :             if entered.get() {
     213            0 :                 let mut formatter = EventFormatter::new();
     214            0 :                 formatter.format(now, event, &ctx, &self.skipped_field_indices)?;
     215            0 :                 self.writer.make_writer().write_all(formatter.buffer())
     216              :             } else {
     217            1 :                 entered.set(true);
     218            1 :                 defer!(entered.set(false););
     219            1 : 
     220            1 :                 EVENT_FORMATTER.with_borrow_mut(move |formatter| {
     221            1 :                     formatter.reset();
     222            1 :                     formatter.format(now, event, &ctx, &self.skipped_field_indices)?;
     223            1 :                     self.writer.make_writer().write_all(formatter.buffer())
     224            1 :                 })
     225              :             }
     226            1 :         });
     227              : 
     228              :         // In case logging fails we generate a simpler JSON object.
     229            1 :         if let Err(err) = res {
     230            0 :             if let Ok(mut line) = serde_json::to_vec(&serde_json::json!( {
     231            0 :                 "timestamp": now.to_rfc3339_opts(chrono::SecondsFormat::Micros, true),
     232            0 :                 "level": "ERROR",
     233            0 :                 "message": format_args!("cannot log event: {err:?}"),
     234            0 :                 "fields": {
     235            0 :                     "event": format_args!("{event:?}"),
     236            0 :                 },
     237            0 :             })) {
     238            0 :                 line.push(b'\n');
     239            0 :                 self.writer.make_writer().write_all(&line).ok();
     240            0 :             }
     241            1 :         }
     242            1 :     }
     243              : 
     244              :     /// Registers a SpanFields instance as span extension.
     245            2 :     fn on_new_span(&self, attrs: &span::Attributes<'_>, id: &span::Id, ctx: Context<'_, S>) {
     246            2 :         let span = ctx.span(id).expect("span must exist");
     247            2 :         let fields = SpanFields::default();
     248            2 :         fields.record_fields(attrs);
     249            2 :         // This could deadlock when there's a panic somewhere in the tracing
     250            2 :         // event handling and a read or write guard is still held. This includes
     251            2 :         // the OTel subscriber.
     252            2 :         span.extensions_mut().insert(fields);
     253            2 :     }
     254              : 
     255            0 :     fn on_record(&self, id: &span::Id, values: &span::Record<'_>, ctx: Context<'_, S>) {
     256            0 :         let span = ctx.span(id).expect("span must exist");
     257            0 :         let ext = span.extensions();
     258            0 :         if let Some(data) = ext.get::<SpanFields>() {
     259            0 :             data.record_fields(values);
     260            0 :         }
     261            0 :     }
     262              : 
     263              :     /// Called (lazily) whenever a new log call is executed. We quickly check
     264              :     /// for duplicate field names and record duplicates as skippable. Last one
     265              :     /// wins.
     266            3 :     fn register_callsite(&self, metadata: &'static Metadata<'static>) -> Interest {
     267            3 :         if !metadata.is_event() {
     268              :             // Must not be never because we wouldn't get trace and span data.
     269            2 :             return Interest::always();
     270            1 :         }
     271            1 : 
     272            1 :         let mut field_indices = SkippedFieldIndices::default();
     273            1 :         let mut seen_fields = HashMap::<&'static str, usize>::new();
     274            5 :         for field in metadata.fields() {
     275              :             use std::collections::hash_map::Entry;
     276            5 :             match seen_fields.entry(field.name()) {
     277            2 :                 Entry::Vacant(entry) => {
     278            2 :                     // field not seen yet
     279            2 :                     entry.insert(field.index());
     280            2 :                 }
     281            3 :                 Entry::Occupied(mut entry) => {
     282            3 :                     // replace currently stored index
     283            3 :                     let old_index = entry.insert(field.index());
     284            3 :                     // ... and append it to list of skippable indices
     285            3 :                     field_indices.push(old_index);
     286            3 :                 }
     287              :             }
     288              :         }
     289              : 
     290            1 :         if !field_indices.is_empty() {
     291            1 :             self.skipped_field_indices
     292            1 :                 .pin()
     293            1 :                 .insert(metadata.callsite(), field_indices);
     294            1 :         }
     295              : 
     296            1 :         Interest::always()
     297            3 :     }
     298              : }
     299              : 
     300              : /// Stores span field values recorded during the spans lifetime.
     301              : #[derive(Default)]
     302              : struct SpanFields {
     303              :     // TODO: Switch to custom enum with lasso::Spur for Strings?
     304              :     fields: papaya::HashMap<&'static str, serde_json::Value>,
     305              : }
     306              : 
     307              : impl SpanFields {
     308              :     #[inline]
     309            2 :     fn record_fields<R: tracing_subscriber::field::RecordFields>(&self, fields: R) {
     310            2 :         fields.record(&mut SpanFieldsRecorder {
     311            2 :             fields: self.fields.pin(),
     312            2 :         });
     313            2 :     }
     314              : }
     315              : 
     316              : /// Implements a tracing field visitor to convert and store values.
     317              : struct SpanFieldsRecorder<'m, S, G> {
     318              :     fields: papaya::HashMapRef<'m, &'static str, serde_json::Value, S, G>,
     319              : }
     320              : 
     321              : impl<S: BuildHasher, G: papaya::Guard> tracing::field::Visit for SpanFieldsRecorder<'_, S, G> {
     322              :     #[inline]
     323            0 :     fn record_f64(&mut self, field: &tracing::field::Field, value: f64) {
     324            0 :         self.fields
     325            0 :             .insert(field.name(), serde_json::Value::from(value));
     326            0 :     }
     327              : 
     328              :     #[inline]
     329            3 :     fn record_i64(&mut self, field: &tracing::field::Field, value: i64) {
     330            3 :         self.fields
     331            3 :             .insert(field.name(), serde_json::Value::from(value));
     332            3 :     }
     333              : 
     334              :     #[inline]
     335            0 :     fn record_u64(&mut self, field: &tracing::field::Field, value: u64) {
     336            0 :         self.fields
     337            0 :             .insert(field.name(), serde_json::Value::from(value));
     338            0 :     }
     339              : 
     340              :     #[inline]
     341            0 :     fn record_i128(&mut self, field: &tracing::field::Field, value: i128) {
     342            0 :         if let Ok(value) = i64::try_from(value) {
     343            0 :             self.fields
     344            0 :                 .insert(field.name(), serde_json::Value::from(value));
     345            0 :         } else {
     346            0 :             self.fields
     347            0 :                 .insert(field.name(), serde_json::Value::from(format!("{value}")));
     348            0 :         }
     349            0 :     }
     350              : 
     351              :     #[inline]
     352            0 :     fn record_u128(&mut self, field: &tracing::field::Field, value: u128) {
     353            0 :         if let Ok(value) = u64::try_from(value) {
     354            0 :             self.fields
     355            0 :                 .insert(field.name(), serde_json::Value::from(value));
     356            0 :         } else {
     357            0 :             self.fields
     358            0 :                 .insert(field.name(), serde_json::Value::from(format!("{value}")));
     359            0 :         }
     360            0 :     }
     361              : 
     362              :     #[inline]
     363            0 :     fn record_bool(&mut self, field: &tracing::field::Field, value: bool) {
     364            0 :         self.fields
     365            0 :             .insert(field.name(), serde_json::Value::from(value));
     366            0 :     }
     367              : 
     368              :     #[inline]
     369            0 :     fn record_bytes(&mut self, field: &tracing::field::Field, value: &[u8]) {
     370            0 :         self.fields
     371            0 :             .insert(field.name(), serde_json::Value::from(value));
     372            0 :     }
     373              : 
     374              :     #[inline]
     375            0 :     fn record_str(&mut self, field: &tracing::field::Field, value: &str) {
     376            0 :         self.fields
     377            0 :             .insert(field.name(), serde_json::Value::from(value));
     378            0 :     }
     379              : 
     380              :     #[inline]
     381            0 :     fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) {
     382            0 :         self.fields
     383            0 :             .insert(field.name(), serde_json::Value::from(format!("{value:?}")));
     384            0 :     }
     385              : 
     386              :     #[inline]
     387            0 :     fn record_error(
     388            0 :         &mut self,
     389            0 :         field: &tracing::field::Field,
     390            0 :         value: &(dyn std::error::Error + 'static),
     391            0 :     ) {
     392            0 :         self.fields
     393            0 :             .insert(field.name(), serde_json::Value::from(format!("{value}")));
     394            0 :     }
     395              : }
     396              : 
     397              : /// List of field indices skipped during logging. Can list duplicate fields or
     398              : /// metafields not meant to be logged.
     399              : #[derive(Clone, Default)]
     400              : struct SkippedFieldIndices {
     401              :     bits: u64,
     402              : }
     403              : 
     404              : impl SkippedFieldIndices {
     405              :     #[inline]
     406            1 :     fn is_empty(&self) -> bool {
     407            1 :         self.bits == 0
     408            1 :     }
     409              : 
     410              :     #[inline]
     411            3 :     fn push(&mut self, index: usize) {
     412            3 :         self.bits |= 1u64
     413            3 :             .checked_shl(index as u32)
     414            3 :             .expect("field index too large");
     415            3 :     }
     416              : 
     417              :     #[inline]
     418           10 :     fn contains(&self, index: usize) -> bool {
     419           10 :         self.bits
     420           10 :             & 1u64
     421           10 :                 .checked_shl(index as u32)
     422           10 :                 .expect("field index too large")
     423           10 :             != 0
     424           10 :     }
     425              : }
     426              : 
     427              : /// Formats a tracing event and writes JSON to its internal buffer including a newline.
     428              : // TODO: buffer capacity management, truncate if too large
     429              : struct EventFormatter {
     430              :     logline_buffer: Vec<u8>,
     431              : }
     432              : 
     433              : impl EventFormatter {
     434              :     #[inline]
     435            1 :     fn new() -> Self {
     436            1 :         EventFormatter {
     437            1 :             logline_buffer: Vec::new(),
     438            1 :         }
     439            1 :     }
     440              : 
     441              :     #[inline]
     442            1 :     fn buffer(&self) -> &[u8] {
     443            1 :         &self.logline_buffer
     444            1 :     }
     445              : 
     446              :     #[inline]
     447            1 :     fn reset(&mut self) {
     448            1 :         self.logline_buffer.clear();
     449            1 :     }
     450              : 
     451            1 :     fn format<S>(
     452            1 :         &mut self,
     453            1 :         now: DateTime<Utc>,
     454            1 :         event: &Event<'_>,
     455            1 :         ctx: &Context<'_, S>,
     456            1 :         skipped_field_indices: &papaya::HashMap<callsite::Identifier, SkippedFieldIndices>,
     457            1 :     ) -> io::Result<()>
     458            1 :     where
     459            1 :         S: Subscriber + for<'a> LookupSpan<'a>,
     460            1 :     {
     461            1 :         let timestamp = now.to_rfc3339_opts(chrono::SecondsFormat::Micros, true);
     462              : 
     463              :         use tracing_log::NormalizeEvent;
     464            1 :         let normalized_meta = event.normalized_metadata();
     465            1 :         let meta = normalized_meta.as_ref().unwrap_or_else(|| event.metadata());
     466            1 : 
     467            1 :         let skipped_field_indices = skipped_field_indices.pin();
     468            1 :         let skipped_field_indices = skipped_field_indices.get(&meta.callsite());
     469            1 : 
     470            1 :         let mut serialize = || {
     471            1 :             let mut serializer = serde_json::Serializer::new(&mut self.logline_buffer);
     472              : 
     473            1 :             let mut serializer = serializer.serialize_map(None)?;
     474              : 
     475              :             // Timestamp comes first, so raw lines can be sorted by timestamp.
     476            1 :             serializer.serialize_entry("timestamp", &timestamp)?;
     477              : 
     478              :             // Level next.
     479            1 :             serializer.serialize_entry("level", &meta.level().as_str())?;
     480              : 
     481              :             // Message next.
     482            1 :             serializer.serialize_key("message")?;
     483            1 :             let mut message_extractor =
     484            1 :                 MessageFieldExtractor::new(serializer, skipped_field_indices);
     485            1 :             event.record(&mut message_extractor);
     486            1 :             let mut serializer = message_extractor.into_serializer()?;
     487              : 
     488            1 :             let mut fields_present = FieldsPresent(false, skipped_field_indices);
     489            1 :             event.record(&mut fields_present);
     490            1 :             if fields_present.0 {
     491            1 :                 serializer.serialize_entry(
     492            1 :                     "fields",
     493            1 :                     &SerializableEventFields(event, skipped_field_indices),
     494            1 :                 )?;
     495            0 :             }
     496              : 
     497            1 :             let pid = std::process::id();
     498            1 :             if pid != 1 {
     499            1 :                 serializer.serialize_entry("process_id", &pid)?;
     500            0 :             }
     501              : 
     502            1 :             THREAD_ID.with(|tid| serializer.serialize_entry("thread_id", tid))?;
     503              : 
     504              :             // TODO: tls cache? name could change
     505            1 :             if let Some(thread_name) = std::thread::current().name() {
     506            1 :                 if !thread_name.is_empty() && thread_name != "tokio-runtime-worker" {
     507            1 :                     serializer.serialize_entry("thread_name", thread_name)?;
     508            0 :                 }
     509            0 :             }
     510              : 
     511            1 :             if let Some(task_id) = tokio::task::try_id() {
     512            0 :                 serializer.serialize_entry("task_id", &format_args!("{task_id}"))?;
     513            1 :             }
     514              : 
     515            1 :             serializer.serialize_entry("target", meta.target())?;
     516              : 
     517            1 :             if let Some(module) = meta.module_path() {
     518            1 :                 if module != meta.target() {
     519            0 :                     serializer.serialize_entry("module", module)?;
     520            1 :                 }
     521            0 :             }
     522              : 
     523            1 :             if let Some(file) = meta.file() {
     524            1 :                 if let Some(line) = meta.line() {
     525            1 :                     serializer.serialize_entry("src", &format_args!("{file}:{line}"))?;
     526              :                 } else {
     527            0 :                     serializer.serialize_entry("src", file)?;
     528              :                 }
     529            0 :             }
     530              : 
     531              :             {
     532            1 :                 let otel_context = Span::current().context();
     533            1 :                 let otel_spanref = otel_context.span();
     534            1 :                 let span_context = otel_spanref.span_context();
     535            1 :                 if span_context.is_valid() {
     536            0 :                     serializer.serialize_entry(
     537            0 :                         "trace_id",
     538            0 :                         &format_args!("{}", span_context.trace_id()),
     539            0 :                     )?;
     540            1 :                 }
     541              :             }
     542              : 
     543            1 :             serializer.serialize_entry("spans", &SerializableSpanStack(ctx))?;
     544              : 
     545            1 :             serializer.end()
     546            1 :         };
     547              : 
     548            1 :         serialize().map_err(io::Error::other)?;
     549            1 :         self.logline_buffer.push(b'\n');
     550            1 :         Ok(())
     551            1 :     }
     552              : }
     553              : 
     554              : /// Extracts the message field that's mixed will other fields.
     555              : struct MessageFieldExtractor<'a, S: serde::ser::SerializeMap> {
     556              :     serializer: S,
     557              :     skipped_field_indices: Option<&'a SkippedFieldIndices>,
     558              :     state: Option<Result<(), S::Error>>,
     559              : }
     560              : 
     561              : impl<'a, S: serde::ser::SerializeMap> MessageFieldExtractor<'a, S> {
     562              :     #[inline]
     563            1 :     fn new(serializer: S, skipped_field_indices: Option<&'a SkippedFieldIndices>) -> Self {
     564            1 :         Self {
     565            1 :             serializer,
     566            1 :             skipped_field_indices,
     567            1 :             state: None, // nothing has been written yet
     568            1 :         }
     569            1 :     }
     570              : 
     571              :     #[inline]
     572            1 :     fn into_serializer(mut self) -> Result<S, S::Error> {
     573            1 :         match self.state {
     574            1 :             Some(Ok(())) => {} // a message value was written successfully
     575            0 :             Some(Err(err)) => return Err(err), // surface the stored serialization failure
     576            0 :             None => self.serializer.serialize_value("")?, // no message field was accepted: write an empty string
     577              :         }
     578            1 :         Ok(self.serializer)
     579            1 :     }
     580              : 
     581              :     #[inline]
     582            5 :     fn accept_field(&self, field: &tracing::field::Field) -> bool {
     583            5 :         self.state.is_none() // once a value has been recorded, later fields are rejected
     584            5 :             && field.name() == MESSAGE_FIELD
     585            2 :             && !self
     586            2 :                 .skipped_field_indices
     587            2 :                 .is_some_and(|i| i.contains(field.index()))
     588            5 :     }
     589              : }
     590              : 
     591              : impl<S: serde::ser::SerializeMap> tracing::field::Visit for MessageFieldExtractor<'_, S> { // every `record_*` writes the value only when `accept_field` allows it
     592              :     #[inline]
     593            0 :     fn record_f64(&mut self, field: &tracing::field::Field, value: f64) {
     594            0 :         if self.accept_field(field) {
     595            0 :             self.state = Some(self.serializer.serialize_value(&value));
     596            0 :         }
     597            0 :     }
     598              : 
     599              :     #[inline]
     600            3 :     fn record_i64(&mut self, field: &tracing::field::Field, value: i64) {
     601            3 :         if self.accept_field(field) {
     602            0 :             self.state = Some(self.serializer.serialize_value(&value));
     603            3 :         }
     604            3 :     }
     605              : 
     606              :     #[inline]
     607            0 :     fn record_u64(&mut self, field: &tracing::field::Field, value: u64) {
     608            0 :         if self.accept_field(field) {
     609            0 :             self.state = Some(self.serializer.serialize_value(&value));
     610            0 :         }
     611            0 :     }
     612              : 
     613              :     #[inline]
     614            0 :     fn record_i128(&mut self, field: &tracing::field::Field, value: i128) {
     615            0 :         if self.accept_field(field) {
     616            0 :             self.state = Some(self.serializer.serialize_value(&value));
     617            0 :         }
     618            0 :     }
     619              : 
     620              :     #[inline]
     621            0 :     fn record_u128(&mut self, field: &tracing::field::Field, value: u128) {
     622            0 :         if self.accept_field(field) {
     623            0 :             self.state = Some(self.serializer.serialize_value(&value));
     624            0 :         }
     625            0 :     }
     626              : 
     627              :     #[inline]
     628            0 :     fn record_bool(&mut self, field: &tracing::field::Field, value: bool) {
     629            0 :         if self.accept_field(field) {
     630            0 :             self.state = Some(self.serializer.serialize_value(&value));
     631            0 :         }
     632            0 :     }
     633              : 
     634              :     #[inline]
     635            0 :     fn record_bytes(&mut self, field: &tracing::field::Field, value: &[u8]) {
     636            0 :         if self.accept_field(field) {
     637            0 :             self.state = Some(self.serializer.serialize_value(&format_args!("{value:x?}"))); // bytes go out in `{:x?}` (hex debug) form
     638            0 :         }
     639            0 :     }
     640              : 
     641              :     #[inline]
     642            1 :     fn record_str(&mut self, field: &tracing::field::Field, value: &str) {
     643            1 :         if self.accept_field(field) {
     644            1 :             self.state = Some(self.serializer.serialize_value(&value));
     645            1 :         }
     646            1 :     }
     647              : 
     648              :     #[inline]
     649            1 :     fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) {
     650            1 :         if self.accept_field(field) {
     651            0 :             self.state = Some(self.serializer.serialize_value(&format_args!("{value:?}"))); // written via the `Debug` rendering
     652            1 :         }
     653            1 :     }
     654              : 
     655              :     #[inline]
     656            0 :     fn record_error(
     657            0 :         &mut self,
     658            0 :         field: &tracing::field::Field,
     659            0 :         value: &(dyn std::error::Error + 'static),
     660            0 :     ) {
     661            0 :         if self.accept_field(field) {
     662            0 :             self.state = Some(self.serializer.serialize_value(&format_args!("{value}"))); // written via the `Display` rendering
     663            0 :         }
     664            0 :     }
     665              : }
     666              : 
     667              : /// Checks whether any fields and field values are present. If not, the JSON subobject
     668              : /// can be skipped.
     669              : // This is entirely optional and only cosmetic, though it may help a
     670              : // bit during log parsing in dashboards when no field holds an empty object.
     671              : struct FieldsPresent<'a>(pub bool, Option<&'a SkippedFieldIndices>); // (.0: "a field was found" flag, .1: field indices to ignore)
     672              : 
     673              : // Even though some methods have an overhead (error, bytes) it is assumed the
     674              : // compiler won't include this since we ignore the value entirely.
     675              : impl tracing::field::Visit for FieldsPresent<'_> {
     676              :     #[inline]
     677            5 :     fn record_debug(&mut self, field: &tracing::field::Field, _: &dyn std::fmt::Debug) {
     678            5 :         if !self.1.is_some_and(|i| i.contains(field.index())) // index is not in the skip set
     679            2 :             && field.name() != MESSAGE_FIELD
     680            1 :             && !field.name().starts_with("log.")
     681            1 :         {
     682            1 :             self.0 |= true; // found a field that is neither skipped, the message, nor `log.`-prefixed
     683            4 :         }
     684            5 :     }
     685              : }
     686              : 
     687              : /// Serializes the fields directly supplied with a log event.
     688              : struct SerializableEventFields<'a, 'event>(
     689              :     &'a tracing::Event<'event>, // the event whose fields are visited
     690              :     Option<&'a SkippedFieldIndices>, // field indices to leave out, if any
     691              : );
     692              : 
     693              : impl serde::ser::Serialize for SerializableEventFields<'_, '_> {
     694            1 :     fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
     695            1 :     where
     696            1 :         S: Serializer,
     697            1 :     {
     698              :         use serde::ser::SerializeMap;
     699            1 :         let serializer = serializer.serialize_map(None)?; // map with no length hint
     700            1 :         let mut message_skipper = MessageFieldSkipper::new(serializer, self.1);
     701            1 :         self.0.record(&mut message_skipper); // run the visitor over the event's fields
     702            1 :         let serializer = message_skipper.into_serializer()?; // propagates an error stored by the visitor
     703            1 :         serializer.end()
     704            1 :     }
     705              : }
     706              : 
     707              : /// A tracing field visitor that skips the message field.
     708              : struct MessageFieldSkipper<'a, S: serde::ser::SerializeMap> {
     709              :     serializer: S, // map serializer that receives one entry per accepted field
     710              :     skipped_field_indices: Option<&'a SkippedFieldIndices>, // field indices to ignore, if any
     711              :     state: Result<(), S::Error>, // result of the most recent write; an `Err` stops further writes
     712              : }
     713              : 
     714              : impl<'a, S: serde::ser::SerializeMap> MessageFieldSkipper<'a, S> {
     715              :     #[inline]
     716            1 :     fn new(serializer: S, skipped_field_indices: Option<&'a SkippedFieldIndices>) -> Self {
     717            1 :         Self {
     718            1 :             serializer,
     719            1 :             skipped_field_indices,
     720            1 :             state: Ok(()), // no failure so far
     721            1 :         }
     722            1 :     }
     723              : 
     724              :     #[inline]
     725            5 :     fn accept_field(&self, field: &tracing::field::Field) -> bool {
     726            5 :         self.state.is_ok() // after a failed write every later field is rejected
     727            5 :             && field.name() != MESSAGE_FIELD
     728            3 :             && !field.name().starts_with("log.")
     729            3 :             && !self
     730            3 :                 .skipped_field_indices
     731            3 :                 .is_some_and(|i| i.contains(field.index()))
     732            5 :     }
     733              : 
     734              :     #[inline]
     735            1 :     fn into_serializer(self) -> Result<S, S::Error> {
     736            1 :         self.state?; // return the stored error, if any
     737            1 :         Ok(self.serializer)
     738            1 :     }
     739              : }
     740              : 
     741              : impl<S: serde::ser::SerializeMap> tracing::field::Visit for MessageFieldSkipper<'_, S> { // every `record_*` writes a `field name -> value` entry when `accept_field` allows it
     742              :     #[inline]
     743            0 :     fn record_f64(&mut self, field: &tracing::field::Field, value: f64) {
     744            0 :         if self.accept_field(field) {
     745            0 :             self.state = self.serializer.serialize_entry(field.name(), &value);
     746            0 :         }
     747            0 :     }
     748              : 
     749              :     #[inline]
     750            3 :     fn record_i64(&mut self, field: &tracing::field::Field, value: i64) {
     751            3 :         if self.accept_field(field) {
     752            1 :             self.state = self.serializer.serialize_entry(field.name(), &value);
     753            2 :         }
     754            3 :     }
     755              : 
     756              :     #[inline]
     757            0 :     fn record_u64(&mut self, field: &tracing::field::Field, value: u64) {
     758            0 :         if self.accept_field(field) {
     759            0 :             self.state = self.serializer.serialize_entry(field.name(), &value);
     760            0 :         }
     761            0 :     }
     762              : 
     763              :     #[inline]
     764            0 :     fn record_i128(&mut self, field: &tracing::field::Field, value: i128) {
     765            0 :         if self.accept_field(field) {
     766            0 :             self.state = self.serializer.serialize_entry(field.name(), &value);
     767            0 :         }
     768            0 :     }
     769              : 
     770              :     #[inline]
     771            0 :     fn record_u128(&mut self, field: &tracing::field::Field, value: u128) {
     772            0 :         if self.accept_field(field) {
     773            0 :             self.state = self.serializer.serialize_entry(field.name(), &value);
     774            0 :         }
     775            0 :     }
     776              : 
     777              :     #[inline]
     778            0 :     fn record_bool(&mut self, field: &tracing::field::Field, value: bool) {
     779            0 :         if self.accept_field(field) {
     780            0 :             self.state = self.serializer.serialize_entry(field.name(), &value);
     781            0 :         }
     782            0 :     }
     783              : 
     784              :     #[inline]
     785            0 :     fn record_bytes(&mut self, field: &tracing::field::Field, value: &[u8]) {
     786            0 :         if self.accept_field(field) {
     787            0 :             self.state = self
     788            0 :                 .serializer
     789            0 :                 .serialize_entry(field.name(), &format_args!("{value:x?}")); // bytes go out in `{:x?}` (hex debug) form
     790            0 :         }
     791            0 :     }
     792              : 
     793              :     #[inline]
     794            1 :     fn record_str(&mut self, field: &tracing::field::Field, value: &str) {
     795            1 :         if self.accept_field(field) {
     796            0 :             self.state = self.serializer.serialize_entry(field.name(), &value);
     797            1 :         }
     798            1 :     }
     799              : 
     800              :     #[inline]
     801            1 :     fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) {
     802            1 :         if self.accept_field(field) {
     803            0 :             self.state = self
     804            0 :                 .serializer
     805            0 :                 .serialize_entry(field.name(), &format_args!("{value:?}")); // written via the `Debug` rendering
     806            1 :         }
     807            1 :     }
     808              : 
     809              :     #[inline]
     810            0 :     fn record_error(
     811            0 :         &mut self,
     812            0 :         field: &tracing::field::Field,
     813            0 :         value: &(dyn std::error::Error + 'static),
     814            0 :     ) {
     815            0 :         if self.accept_field(field) {
     816            0 :             self.state = self.serializer.serialize_entry(field.name(), &format_args!("{value}")); // keyed entry like every other `record_*`; a bare `serialize_value` would drop the field name
     817            0 :         }
     818            0 :     }
     819              : }
     820              : 
     821              : /// Serializes the span stack from root to leaf (parent of event) enumerated
     822              : /// inside an object where the keys are just the index, zero-padded to two
     823              : /// digits to retain sorting order.
     824              : // The object is necessary because Loki cannot flatten arrays.
     825              : struct SerializableSpanStack<'a, 'b, Span>(&'b Context<'a, Span>)
     826              : where
     827              :     Span: Subscriber + for<'lookup> LookupSpan<'lookup>;
     828              : 
     829              : impl<Span> serde::ser::Serialize for SerializableSpanStack<'_, '_, Span>
     830              : where
     831              :     Span: Subscriber + for<'lookup> LookupSpan<'lookup>,
     832              : {
     833            1 :     fn serialize<Ser>(&self, serializer: Ser) -> Result<Ser::Ok, Ser::Error>
     834            1 :     where
     835            1 :         Ser: serde::ser::Serializer,
     836            1 :     {
     837            1 :         let mut serializer = serializer.serialize_map(None)?;
     838              : 
     839            1 :         if let Some(leaf_span) = self.0.lookup_current() { // with no current span the map stays empty
     840            2 :             for (i, span) in leaf_span.scope().from_root().enumerate() { // walk from the root span down to the leaf
     841            2 :                 serializer.serialize_entry(&format_args!("{i:02}"), &SerializableSpan(&span))?; // key is the position, zero-padded to two digits
     842              :             }
     843            0 :         }
     844              : 
     845            1 :         serializer.end()
     846            1 :     }
     847              : }
     848              : 
     849              : /// Serializes a single span. Includes the span ID, the span name and its
     850              : /// fields as recorded up to this point.
     851              : struct SerializableSpan<'a, 'b, Span>(&'b SpanRef<'a, Span>)
     852              : where
     853              :     Span: for<'lookup> LookupSpan<'lookup>;
     854              : 
     855              : impl<Span> serde::ser::Serialize for SerializableSpan<'_, '_, Span>
     856              : where
     857              :     Span: for<'lookup> LookupSpan<'lookup>,
     858              : {
     859            2 :     fn serialize<Ser>(&self, serializer: Ser) -> Result<Ser::Ok, Ser::Error>
     860            2 :     where
     861            2 :         Ser: serde::ser::Serializer,
     862            2 :     {
     863            2 :         let mut serializer = serializer.serialize_map(None)?;
     864              :         // TODO: the span ID is probably only useful for debugging tracing.
     865            2 :         serializer.serialize_entry("span_id", &format_args!("{:016x}", self.0.id().into_u64()))?; // 16 zero-padded lowercase hex digits
     866            2 :         serializer.serialize_entry("span_name", self.0.metadata().name())?;
     867              : 
     868            2 :         let ext = self.0.extensions();
     869            2 :         if let Some(data) = ext.get::<SpanFields>() { // fields are emitted only when a `SpanFields` extension is attached
     870            2 :             for (key, value) in &data.fields.pin() {
     871            1 :                 serializer.serialize_entry(key, value)?; // one map entry per stored span field
     872              :             }
     873            0 :         }
     874              : 
     875            2 :         serializer.end()
     876            2 :     }
     877              : }
     878              : 
     879              : #[cfg(test)]
     880              : #[allow(clippy::unwrap_used)]
     881              : mod tests {
     882              :     use std::sync::{Arc, Mutex, MutexGuard};
     883              : 
     884              :     use assert_json_diff::assert_json_eq;
     885              :     use tracing::info_span;
     886              : 
     887              :     use super::*;
     888              : 
     889              :     struct TestClock { // clock that returns whatever time is stored in the mutex
     890              :         current_time: Mutex<DateTime<Utc>>,
     891              :     }
     892              : 
     893              :     impl Clock for Arc<TestClock> {
     894            2 :         fn now(&self) -> DateTime<Utc> {
     895            2 :             *self.current_time.lock().expect("poisoned")
     896            2 :         }
     897              :     }
     898              : 
     899              :     struct VecWriter<'a> { // writer that appends to the locked shared byte buffer
     900              :         buffer: MutexGuard<'a, Vec<u8>>,
     901              :     }
     902              : 
     903              :     impl MakeWriter for Arc<Mutex<Vec<u8>>> {
     904            1 :         fn make_writer(&self) -> impl io::Write {
     905            1 :             VecWriter {
     906            1 :                 buffer: self.lock().expect("poisoned"), // the lock is held for the lifetime of the writer
     907            1 :             }
     908            1 :         }
     909              :     }
     910              : 
     911              :     impl io::Write for VecWriter<'_> {
     912            1 :         fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
     913            1 :             self.buffer.write(buf)
     914            1 :         }
     915              : 
     916            0 :         fn flush(&mut self) -> io::Result<()> {
     917            0 :             Ok(()) // nothing to flush
     918            0 :         }
     919              :     }
     920              : 
     921              :     #[test]
     922            1 :     fn test_field_collection() { // logs one event inside two nested spans and compares the emitted JSON
     923            1 :         let clock = Arc::new(TestClock {
     924            1 :             current_time: Mutex::new(Utc::now()),
     925            1 :         });
     926            1 :         let buffer = Arc::new(Mutex::new(Vec::new()));
     927            1 :         let log_layer = JsonLoggingLayer {
     928            1 :             clock: clock.clone(),
     929            1 :             skipped_field_indices: papaya::HashMap::default(),
     930            1 :             writer: buffer.clone(),
     931            1 :         };
     932            1 : 
     933            1 :         let registry = tracing_subscriber::Registry::default().with(log_layer);
     934            1 : 
     935            1 :         tracing::subscriber::with_default(registry, || {
     936            1 :             info_span!("span1", x = 40, x = 41, x = 42).in_scope(|| { // span field `x` is given three times
     937            1 :                 info_span!("span2").in_scope(|| {
     938            1 :                     tracing::error!(
     939              :                         a = 1,
     940              :                         a = 2,
     941              :                         a = 3,
     942              :                         message = "explicit message field",
     943            0 :                         "implicit message field"
     944              :                     );
     945            1 :                 });
     946            1 :             });
     947            1 :         });
     948            1 : 
     949            1 :         let buffer = Arc::try_unwrap(buffer)
     950            1 :             .expect("no other reference")
     951            1 :             .into_inner()
     952            1 :             .expect("poisoned");
     953            1 :         let actual: serde_json::Value = serde_json::from_slice(&buffer).expect("valid JSON");
     954            1 :         let expected: serde_json::Value = serde_json::json!(
     955            1 :             {
     956            1 :                 "timestamp": clock.now().to_rfc3339_opts(chrono::SecondsFormat::Micros, true),
     957            1 :                 "level": "ERROR",
     958            1 :                 "message": "explicit message field", // the explicit `message = ...` value is the one expected
     959            1 :                 "fields": {
     960            1 :                     "a": 3, // only the last of the repeated `a` values is expected
     961            1 :                 },
     962            1 :                 "spans": {
     963            1 :                     "00":{
     964            1 :                         "span_id": "0000000000000001",
     965            1 :                         "span_name": "span1",
     966            1 :                         "x": 42, // only the last of the repeated `x` values is expected
     967            1 :                     },
     968            1 :                     "01": {
     969            1 :                         "span_id": "0000000000000002",
     970            1 :                         "span_name": "span2",
     971            1 :                     }
     972            1 :                 },
     973            1 :                 "src": actual.as_object().unwrap().get("src").unwrap().as_str().unwrap(), // copied from `actual`: checks presence and type only
     974            1 :                 "target": "proxy::logging::tests",
     975            1 :                 "process_id": actual.as_object().unwrap().get("process_id").unwrap().as_number().unwrap(), // copied from `actual`
     976            1 :                 "thread_id": actual.as_object().unwrap().get("thread_id").unwrap().as_number().unwrap(), // copied from `actual`
     977            1 :                 "thread_name": "logging::tests::test_field_collection",
     978            1 :             }
     979            1 :         );
     980            1 : 
     981            1 :         assert_json_eq!(actual, expected);
     982            1 :     }
     983              : }
        

Generated by: LCOV version 2.1-beta