LCOV - code coverage report
Current view: top level - proxy/src - logging.rs (source / functions) Coverage Total Hit
Test: 65975feb441523e1ae5866ecbec0610188cb8ce3.info Lines: 54.8 % 584 320
Test Date: 2025-02-12 20:31:43 Functions: 30.5 % 141 43

            Line data    Source code
       1              : use std::cell::{Cell, RefCell};
       2              : use std::collections::HashMap;
       3              : use std::hash::BuildHasher;
       4              : use std::{env, io};
       5              : 
       6              : use chrono::{DateTime, Utc};
       7              : use opentelemetry::trace::TraceContextExt;
       8              : use scopeguard::defer;
       9              : use serde::ser::{SerializeMap, Serializer};
      10              : use tracing::span;
      11              : use tracing::subscriber::Interest;
      12              : use tracing::{callsite, Event, Metadata, Span, Subscriber};
      13              : use tracing_opentelemetry::OpenTelemetrySpanExt;
      14              : use tracing_subscriber::filter::{EnvFilter, LevelFilter};
      15              : use tracing_subscriber::fmt::format::{Format, Full};
      16              : use tracing_subscriber::fmt::time::SystemTime;
      17              : use tracing_subscriber::fmt::{FormatEvent, FormatFields};
      18              : use tracing_subscriber::layer::{Context, Layer};
      19              : use tracing_subscriber::prelude::*;
      20              : use tracing_subscriber::registry::{LookupSpan, SpanRef};
      21              : 
      22              : /// Initialize logging and OpenTelemetry tracing and exporter.
      23              : ///
      24              : /// Logging can be configured using the `RUST_LOG` environment variable.
      25              : ///
      26              : /// OpenTelemetry is configured with an OTLP/HTTP exporter. It picks up
      27              : /// configuration from environment variables. For example, to change the
      28              : /// destination, set `OTEL_EXPORTER_OTLP_ENDPOINT=http://jaeger:4318`.
      29              : /// See <https://opentelemetry.io/docs/reference/specification/sdk-environment-variables>
      30            0 : pub async fn init() -> anyhow::Result<LoggingGuard> {
      31            0 :     let logfmt = LogFormat::from_env()?;
      32              : 
      33            0 :     let env_filter = EnvFilter::builder()
      34            0 :         .with_default_directive(LevelFilter::INFO.into())
      35            0 :         .from_env_lossy()
      36            0 :         .add_directive(
      37            0 :             "aws_config=info"
      38            0 :                 .parse()
      39            0 :                 .expect("this should be a valid filter directive"),
      40            0 :         )
      41            0 :         .add_directive(
      42            0 :             "azure_core::policies::transport=off"
      43            0 :                 .parse()
      44            0 :                 .expect("this should be a valid filter directive"),
      45            0 :         );
      46              : 
      47            0 :     let otlp_layer = tracing_utils::init_tracing("proxy").await;
      48              : 
      49            0 :     let json_log_layer = if logfmt == LogFormat::Json {
      50            0 :         Some(JsonLoggingLayer {
      51            0 :             clock: RealClock,
      52            0 :             skipped_field_indices: papaya::HashMap::default(),
      53            0 :             writer: StderrWriter {
      54            0 :                 stderr: std::io::stderr(),
      55            0 :             },
      56            0 :         })
      57              :     } else {
      58            0 :         None
      59              :     };
      60              : 
      61            0 :     let text_log_layer = if logfmt == LogFormat::Text {
      62            0 :         Some(
      63            0 :             tracing_subscriber::fmt::layer()
      64            0 :                 .with_ansi(false)
      65            0 :                 .with_writer(std::io::stderr)
      66            0 :                 .with_target(false),
      67            0 :         )
      68              :     } else {
      69            0 :         None
      70              :     };
      71              : 
      72            0 :     tracing_subscriber::registry()
      73            0 :         .with(env_filter)
      74            0 :         .with(otlp_layer)
      75            0 :         .with(json_log_layer)
      76            0 :         .with(text_log_layer)
      77            0 :         .try_init()?;
      78              : 
      79            0 :     Ok(LoggingGuard)
      80            0 : }
      81              : 
      82              : /// Initialize logging for local_proxy with log prefix and no opentelemetry.
      83              : ///
      84              : /// Logging can be configured using the `RUST_LOG` environment variable.
      85            0 : pub fn init_local_proxy() -> anyhow::Result<LoggingGuard> {
      86            0 :     let env_filter = EnvFilter::builder()
      87            0 :         .with_default_directive(LevelFilter::INFO.into())
      88            0 :         .from_env_lossy();
      89            0 : 
      90            0 :     let fmt_layer = tracing_subscriber::fmt::layer()
      91            0 :         .with_ansi(false)
      92            0 :         .with_writer(std::io::stderr)
      93            0 :         .event_format(LocalProxyFormatter(Format::default().with_target(false)));
      94            0 : 
      95            0 :     tracing_subscriber::registry()
      96            0 :         .with(env_filter)
      97            0 :         .with(fmt_layer)
      98            0 :         .try_init()?;
      99              : 
     100            0 :     Ok(LoggingGuard)
     101            0 : }
     102              : 
     103              : pub struct LocalProxyFormatter(Format<Full, SystemTime>);
     104              : 
     105              : impl<S, N> FormatEvent<S, N> for LocalProxyFormatter
     106              : where
     107              :     S: Subscriber + for<'a> LookupSpan<'a>,
     108              :     N: for<'a> FormatFields<'a> + 'static,
     109              : {
     110            0 :     fn format_event(
     111            0 :         &self,
     112            0 :         ctx: &tracing_subscriber::fmt::FmtContext<'_, S, N>,
     113            0 :         mut writer: tracing_subscriber::fmt::format::Writer<'_>,
     114            0 :         event: &tracing::Event<'_>,
     115            0 :     ) -> std::fmt::Result {
     116            0 :         writer.write_str("[local_proxy] ")?;
     117            0 :         self.0.format_event(ctx, writer, event)
     118            0 :     }
     119              : }
     120              : 
     121              : pub struct LoggingGuard;
     122              : 
     123              : impl Drop for LoggingGuard {
     124            0 :     fn drop(&mut self) {
     125            0 :         // Shutdown trace pipeline gracefully, so that it has a chance to send any
     126            0 :         // pending traces before we exit.
     127            0 :         tracing::info!("shutting down the tracing machinery");
     128            0 :         tracing_utils::shutdown_tracing();
     129            0 :     }
     130              : }
     131              : 
     132              : // TODO: make JSON the default
     133              : #[derive(Copy, Clone, PartialEq, Eq, Default, Debug)]
     134              : enum LogFormat {
     135              :     #[default]
     136              :     Text = 1,
     137              :     Json,
     138              : }
     139              : 
     140              : impl LogFormat {
     141            0 :     fn from_env() -> anyhow::Result<Self> {
     142            0 :         let logfmt = env::var("LOGFMT");
     143            0 :         Ok(match logfmt.as_deref() {
     144            0 :             Err(_) => LogFormat::default(),
     145            0 :             Ok("text") => LogFormat::Text,
     146            0 :             Ok("json") => LogFormat::Json,
     147            0 :             Ok(logfmt) => anyhow::bail!("unknown log format: {logfmt}"),
     148              :         })
     149            0 :     }
     150              : }
     151              : 
     152              : trait MakeWriter {
     153              :     fn make_writer(&self) -> impl io::Write;
     154              : }
     155              : 
     156              : struct StderrWriter {
     157              :     stderr: io::Stderr,
     158              : }
     159              : 
     160              : impl MakeWriter for StderrWriter {
     161              :     #[inline]
     162            0 :     fn make_writer(&self) -> impl io::Write {
     163            0 :         self.stderr.lock()
     164            0 :     }
     165              : }
     166              : 
     167              : // TODO: move into separate module or even separate crate.
     168              : trait Clock {
     169              :     fn now(&self) -> DateTime<Utc>;
     170              : }
     171              : 
     172              : struct RealClock;
     173              : 
     174              : impl Clock for RealClock {
     175              :     #[inline]
     176            0 :     fn now(&self) -> DateTime<Utc> {
     177            0 :         Utc::now()
     178            0 :     }
     179              : }
     180              : 
     181              : /// Name of the field used by tracing crate to store the event message.
     182              : const MESSAGE_FIELD: &str = "message";
     183              : 
     184              : thread_local! {
     185              :     /// Protects against deadlocks and double panics during log writing.
     186              :     /// The current panic handler will use tracing to log panic information.
     187              :     static REENTRANCY_GUARD: Cell<bool> = const { Cell::new(false) };
     188              :     /// Thread-local instance with per-thread buffer for log writing.
     189              :     static EVENT_FORMATTER: RefCell<EventFormatter> = RefCell::new(EventFormatter::new());
     190              :     /// Cached OS thread ID.
     191              :     static THREAD_ID: u64 = gettid::gettid();
     192              : }
     193              : 
     194              : /// Implements tracing layer to handle events specific to logging.
     195              : struct JsonLoggingLayer<C: Clock, W: MakeWriter> {
     196              :     clock: C,
     197              :     skipped_field_indices: papaya::HashMap<callsite::Identifier, SkippedFieldIndices>,
     198              :     writer: W,
     199              : }
     200              : 
     201              : impl<S, C: Clock + 'static, W: MakeWriter + 'static> Layer<S> for JsonLoggingLayer<C, W>
     202              : where
     203              :     S: Subscriber + for<'a> LookupSpan<'a>,
     204              : {
     205            1 :     fn on_event(&self, event: &Event<'_>, ctx: Context<'_, S>) {
     206              :         use std::io::Write;
     207              : 
     208              :         // TODO: consider special tracing subscriber to grab timestamp very
     209              :         //       early, before OTel machinery, and add as event extension.
     210            1 :         let now = self.clock.now();
     211            1 : 
     212            1 :         let res: io::Result<()> = REENTRANCY_GUARD.with(move |entered| {
     213            1 :             if entered.get() {
     214            0 :                 let mut formatter = EventFormatter::new();
     215            0 :                 formatter.format(now, event, &ctx, &self.skipped_field_indices)?;
     216            0 :                 self.writer.make_writer().write_all(formatter.buffer())
     217              :             } else {
     218            1 :                 entered.set(true);
     219            1 :                 defer!(entered.set(false););
     220            1 : 
     221            1 :                 EVENT_FORMATTER.with_borrow_mut(move |formatter| {
     222            1 :                     formatter.reset();
     223            1 :                     formatter.format(now, event, &ctx, &self.skipped_field_indices)?;
     224            1 :                     self.writer.make_writer().write_all(formatter.buffer())
     225            1 :                 })
     226              :             }
     227            1 :         });
     228              : 
     229              :         // In case logging fails we generate a simpler JSON object.
     230            1 :         if let Err(err) = res {
     231            0 :             if let Ok(mut line) = serde_json::to_vec(&serde_json::json!( {
     232            0 :                 "timestamp": now.to_rfc3339_opts(chrono::SecondsFormat::Micros, true),
     233            0 :                 "level": "ERROR",
     234            0 :                 "message": format_args!("cannot log event: {err:?}"),
     235            0 :                 "fields": {
     236            0 :                     "event": format_args!("{event:?}"),
     237            0 :                 },
     238            0 :             })) {
     239            0 :                 line.push(b'\n');
     240            0 :                 self.writer.make_writer().write_all(&line).ok();
     241            0 :             }
     242            1 :         }
     243            1 :     }
     244              : 
     245              :     /// Registers a `SpanFields` instance as a span extension.
     246            2 :     fn on_new_span(&self, attrs: &span::Attributes<'_>, id: &span::Id, ctx: Context<'_, S>) {
     247            2 :         let span = ctx.span(id).expect("span must exist");
     248            2 :         let fields = SpanFields::default();
     249            2 :         fields.record_fields(attrs);
     250            2 :         // This could deadlock when there's a panic somewhere in the tracing
     251            2 :         // event handling and a read or write guard is still held. This includes
     252            2 :         // the OTel subscriber.
     253            2 :         span.extensions_mut().insert(fields);
     254            2 :     }
     255              : 
     256            0 :     fn on_record(&self, id: &span::Id, values: &span::Record<'_>, ctx: Context<'_, S>) {
     257            0 :         let span = ctx.span(id).expect("span must exist");
     258            0 :         let ext = span.extensions();
     259            0 :         if let Some(data) = ext.get::<SpanFields>() {
     260            0 :             data.record_fields(values);
     261            0 :         }
     262            0 :     }
     263              : 
     264              :     /// Called (lazily) whenever a new log call is executed. We quickly check
     265              :     /// for duplicate field names and record duplicates as skippable. Last one
     266              :     /// wins.
     267            3 :     fn register_callsite(&self, metadata: &'static Metadata<'static>) -> Interest {
     268            3 :         if !metadata.is_event() {
     269              :             // Must not be `Interest::never()`, or we wouldn't get trace and span data.
     270            2 :             return Interest::always();
     271            1 :         }
     272            1 : 
     273            1 :         let mut field_indices = SkippedFieldIndices::default();
     274            1 :         let mut seen_fields = HashMap::<&'static str, usize>::new();
     275            5 :         for field in metadata.fields() {
     276              :             use std::collections::hash_map::Entry;
     277            5 :             match seen_fields.entry(field.name()) {
     278            2 :                 Entry::Vacant(entry) => {
     279            2 :                     // field not seen yet
     280            2 :                     entry.insert(field.index());
     281            2 :                 }
     282            3 :                 Entry::Occupied(mut entry) => {
     283            3 :                     // replace currently stored index
     284            3 :                     let old_index = entry.insert(field.index());
     285            3 :                     // ... and append it to list of skippable indices
     286            3 :                     field_indices.push(old_index);
     287            3 :                 }
     288              :             }
     289              :         }
     290              : 
     291            1 :         if !field_indices.is_empty() {
     292            1 :             self.skipped_field_indices
     293            1 :                 .pin()
     294            1 :                 .insert(metadata.callsite(), field_indices);
     295            1 :         }
     296              : 
     297            1 :         Interest::always()
     298            3 :     }
     299              : }
     300              : 
     301              : /// Stores span field values recorded during the span's lifetime.
     302              : #[derive(Default)]
     303              : struct SpanFields {
     304              :     // TODO: Switch to custom enum with lasso::Spur for Strings?
     305              :     fields: papaya::HashMap<&'static str, serde_json::Value>,
     306              : }
     307              : 
     308              : impl SpanFields {
     309              :     #[inline]
     310            2 :     fn record_fields<R: tracing_subscriber::field::RecordFields>(&self, fields: R) {
     311            2 :         fields.record(&mut SpanFieldsRecorder {
     312            2 :             fields: self.fields.pin(),
     313            2 :         });
     314            2 :     }
     315              : }
     316              : 
     317              : /// Implements a tracing field visitor to convert and store values.
     318              : struct SpanFieldsRecorder<'m, S, G> {
     319              :     fields: papaya::HashMapRef<'m, &'static str, serde_json::Value, S, G>,
     320              : }
     321              : 
     322              : impl<S: BuildHasher, G: papaya::Guard> tracing::field::Visit for SpanFieldsRecorder<'_, S, G> {
     323              :     #[inline]
     324            0 :     fn record_f64(&mut self, field: &tracing::field::Field, value: f64) {
     325            0 :         self.fields
     326            0 :             .insert(field.name(), serde_json::Value::from(value));
     327            0 :     }
     328              : 
     329              :     #[inline]
     330            3 :     fn record_i64(&mut self, field: &tracing::field::Field, value: i64) {
     331            3 :         self.fields
     332            3 :             .insert(field.name(), serde_json::Value::from(value));
     333            3 :     }
     334              : 
     335              :     #[inline]
     336            0 :     fn record_u64(&mut self, field: &tracing::field::Field, value: u64) {
     337            0 :         self.fields
     338            0 :             .insert(field.name(), serde_json::Value::from(value));
     339            0 :     }
     340              : 
     341              :     #[inline]
     342            0 :     fn record_i128(&mut self, field: &tracing::field::Field, value: i128) {
     343            0 :         if let Ok(value) = i64::try_from(value) {
     344            0 :             self.fields
     345            0 :                 .insert(field.name(), serde_json::Value::from(value));
     346            0 :         } else {
     347            0 :             self.fields
     348            0 :                 .insert(field.name(), serde_json::Value::from(format!("{value}")));
     349            0 :         }
     350            0 :     }
     351              : 
     352              :     #[inline]
     353            0 :     fn record_u128(&mut self, field: &tracing::field::Field, value: u128) {
     354            0 :         if let Ok(value) = u64::try_from(value) {
     355            0 :             self.fields
     356            0 :                 .insert(field.name(), serde_json::Value::from(value));
     357            0 :         } else {
     358            0 :             self.fields
     359            0 :                 .insert(field.name(), serde_json::Value::from(format!("{value}")));
     360            0 :         }
     361            0 :     }
     362              : 
     363              :     #[inline]
     364            0 :     fn record_bool(&mut self, field: &tracing::field::Field, value: bool) {
     365            0 :         self.fields
     366            0 :             .insert(field.name(), serde_json::Value::from(value));
     367            0 :     }
     368              : 
     369              :     #[inline]
     370            0 :     fn record_bytes(&mut self, field: &tracing::field::Field, value: &[u8]) {
     371            0 :         self.fields
     372            0 :             .insert(field.name(), serde_json::Value::from(value));
     373            0 :     }
     374              : 
     375              :     #[inline]
     376            0 :     fn record_str(&mut self, field: &tracing::field::Field, value: &str) {
     377            0 :         self.fields
     378            0 :             .insert(field.name(), serde_json::Value::from(value));
     379            0 :     }
     380              : 
     381              :     #[inline]
     382            0 :     fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) {
     383            0 :         self.fields
     384            0 :             .insert(field.name(), serde_json::Value::from(format!("{value:?}")));
     385            0 :     }
     386              : 
     387              :     #[inline]
     388            0 :     fn record_error(
     389            0 :         &mut self,
     390            0 :         field: &tracing::field::Field,
     391            0 :         value: &(dyn std::error::Error + 'static),
     392            0 :     ) {
     393            0 :         self.fields
     394            0 :             .insert(field.name(), serde_json::Value::from(format!("{value}")));
     395            0 :     }
     396              : }
     397              : 
     398              : /// List of field indices skipped during logging. Can list duplicate fields or
     399              : /// metafields not meant to be logged.
     400              : #[derive(Clone, Default)]
     401              : struct SkippedFieldIndices {
     402              :     bits: u64,
     403              : }
     404              : 
     405              : impl SkippedFieldIndices {
     406              :     #[inline]
     407            1 :     fn is_empty(&self) -> bool {
     408            1 :         self.bits == 0
     409            1 :     }
     410              : 
     411              :     #[inline]
     412            3 :     fn push(&mut self, index: usize) {
     413            3 :         self.bits |= 1u64
     414            3 :             .checked_shl(index as u32)
     415            3 :             .expect("field index too large");
     416            3 :     }
     417              : 
     418              :     #[inline]
     419           10 :     fn contains(&self, index: usize) -> bool {
     420           10 :         self.bits
     421           10 :             & 1u64
     422           10 :                 .checked_shl(index as u32)
     423           10 :                 .expect("field index too large")
     424           10 :             != 0
     425           10 :     }
     426              : }
     427              : 
     428              : /// Formats a tracing event and writes JSON to its internal buffer including a newline.
     429              : // TODO: buffer capacity management, truncate if too large
     430              : struct EventFormatter {
     431              :     logline_buffer: Vec<u8>,
     432              : }
     433              : 
     434              : impl EventFormatter {
     435              :     #[inline]
     436            1 :     fn new() -> Self {
     437            1 :         EventFormatter {
     438            1 :             logline_buffer: Vec::new(),
     439            1 :         }
     440            1 :     }
     441              : 
     442              :     #[inline]
     443            1 :     fn buffer(&self) -> &[u8] {
     444            1 :         &self.logline_buffer
     445            1 :     }
     446              : 
     447              :     #[inline]
     448            1 :     fn reset(&mut self) {
     449            1 :         self.logline_buffer.clear();
     450            1 :     }
     451              : 
     452            1 :     fn format<S>(
     453            1 :         &mut self,
     454            1 :         now: DateTime<Utc>,
     455            1 :         event: &Event<'_>,
     456            1 :         ctx: &Context<'_, S>,
     457            1 :         skipped_field_indices: &papaya::HashMap<callsite::Identifier, SkippedFieldIndices>,
     458            1 :     ) -> io::Result<()>
     459            1 :     where
     460            1 :         S: Subscriber + for<'a> LookupSpan<'a>,
     461            1 :     {
     462            1 :         let timestamp = now.to_rfc3339_opts(chrono::SecondsFormat::Micros, true);
     463              : 
     464              :         use tracing_log::NormalizeEvent;
     465            1 :         let normalized_meta = event.normalized_metadata();
     466            1 :         let meta = normalized_meta.as_ref().unwrap_or_else(|| event.metadata());
     467            1 : 
     468            1 :         let skipped_field_indices = skipped_field_indices.pin();
     469            1 :         let skipped_field_indices = skipped_field_indices.get(&meta.callsite());
     470            1 : 
     471            1 :         let mut serialize = || {
     472            1 :             let mut serializer = serde_json::Serializer::new(&mut self.logline_buffer);
     473              : 
     474            1 :             let mut serializer = serializer.serialize_map(None)?;
     475              : 
     476              :             // Timestamp comes first, so raw lines can be sorted by timestamp.
     477            1 :             serializer.serialize_entry("timestamp", &timestamp)?;
     478              : 
     479              :             // Level next.
     480            1 :             serializer.serialize_entry("level", &meta.level().as_str())?;
     481              : 
     482              :             // Message next.
     483            1 :             serializer.serialize_key("message")?;
     484            1 :             let mut message_extractor =
     485            1 :                 MessageFieldExtractor::new(serializer, skipped_field_indices);
     486            1 :             event.record(&mut message_extractor);
     487            1 :             let mut serializer = message_extractor.into_serializer()?;
     488              : 
     489            1 :             let mut fields_present = FieldsPresent(false, skipped_field_indices);
     490            1 :             event.record(&mut fields_present);
     491            1 :             if fields_present.0 {
     492            1 :                 serializer.serialize_entry(
     493            1 :                     "fields",
     494            1 :                     &SerializableEventFields(event, skipped_field_indices),
     495            1 :                 )?;
     496            0 :             }
     497              : 
     498            1 :             let pid = std::process::id();
     499            1 :             if pid != 1 {
     500            1 :                 serializer.serialize_entry("process_id", &pid)?;
     501            0 :             }
     502              : 
     503            1 :             THREAD_ID.with(|tid| serializer.serialize_entry("thread_id", tid))?;
     504              : 
     505              :             // TODO: tls cache? name could change
     506            1 :             if let Some(thread_name) = std::thread::current().name() {
     507            1 :                 if !thread_name.is_empty() && thread_name != "tokio-runtime-worker" {
     508            1 :                     serializer.serialize_entry("thread_name", thread_name)?;
     509            0 :                 }
     510            0 :             }
     511              : 
     512            1 :             if let Some(task_id) = tokio::task::try_id() {
     513            0 :                 serializer.serialize_entry("task_id", &format_args!("{task_id}"))?;
     514            1 :             }
     515              : 
     516            1 :             serializer.serialize_entry("target", meta.target())?;
     517              : 
     518            1 :             if let Some(module) = meta.module_path() {
     519            1 :                 if module != meta.target() {
     520            0 :                     serializer.serialize_entry("module", module)?;
     521            1 :                 }
     522            0 :             }
     523              : 
     524            1 :             if let Some(file) = meta.file() {
     525            1 :                 if let Some(line) = meta.line() {
     526            1 :                     serializer.serialize_entry("src", &format_args!("{file}:{line}"))?;
     527              :                 } else {
     528            0 :                     serializer.serialize_entry("src", file)?;
     529              :                 }
     530            0 :             }
     531              : 
     532              :             {
     533            1 :                 let otel_context = Span::current().context();
     534            1 :                 let otel_spanref = otel_context.span();
     535            1 :                 let span_context = otel_spanref.span_context();
     536            1 :                 if span_context.is_valid() {
     537            0 :                     serializer.serialize_entry(
     538            0 :                         "trace_id",
     539            0 :                         &format_args!("{}", span_context.trace_id()),
     540            0 :                     )?;
     541            1 :                 }
     542              :             }
     543              : 
     544            1 :             serializer.serialize_entry("spans", &SerializableSpanStack(ctx))?;
     545              : 
     546            1 :             serializer.end()
     547            1 :         };
     548              : 
     549            1 :         serialize().map_err(io::Error::other)?;
     550            1 :         self.logline_buffer.push(b'\n');
     551            1 :         Ok(())
     552            1 :     }
     553              : }
     554              : 
     555              : /// Extracts the message field that's mixed with other fields.
     556              : struct MessageFieldExtractor<'a, S: serde::ser::SerializeMap> {
     557              :     serializer: S,
     558              :     skipped_field_indices: Option<&'a SkippedFieldIndices>,
     559              :     state: Option<Result<(), S::Error>>,
     560              : }
     561              : 
     562              : impl<'a, S: serde::ser::SerializeMap> MessageFieldExtractor<'a, S> {
     563              :     #[inline]
     564            1 :     fn new(serializer: S, skipped_field_indices: Option<&'a SkippedFieldIndices>) -> Self {
     565            1 :         Self {
     566            1 :             serializer,
     567            1 :             skipped_field_indices,
     568            1 :             state: None,
     569            1 :         }
     570            1 :     }
     571              : 
     572              :     #[inline]
     573            1 :     fn into_serializer(mut self) -> Result<S, S::Error> {
     574            1 :         match self.state {
     575            1 :             Some(Ok(())) => {}
     576            0 :             Some(Err(err)) => return Err(err),
     577            0 :             None => self.serializer.serialize_value("")?,
     578              :         }
     579            1 :         Ok(self.serializer)
     580            1 :     }
     581              : 
     582              :     #[inline]
     583            5 :     fn accept_field(&self, field: &tracing::field::Field) -> bool {
     584            5 :         self.state.is_none()
     585            5 :             && field.name() == MESSAGE_FIELD
     586            2 :             && !self
     587            2 :                 .skipped_field_indices
     588            2 :                 .is_some_and(|i| i.contains(field.index()))
     589            5 :     }
     590              : }
     591              : 
     592              : impl<S: serde::ser::SerializeMap> tracing::field::Visit for MessageFieldExtractor<'_, S> {
     593              :     #[inline]
     594            0 :     fn record_f64(&mut self, field: &tracing::field::Field, value: f64) {
     595            0 :         if self.accept_field(field) {
     596            0 :             self.state = Some(self.serializer.serialize_value(&value));
     597            0 :         }
     598            0 :     }
     599              : 
     600              :     #[inline]
     601            3 :     fn record_i64(&mut self, field: &tracing::field::Field, value: i64) {
     602            3 :         if self.accept_field(field) {
     603            0 :             self.state = Some(self.serializer.serialize_value(&value));
     604            3 :         }
     605            3 :     }
     606              : 
     607              :     #[inline]
     608            0 :     fn record_u64(&mut self, field: &tracing::field::Field, value: u64) {
     609            0 :         if self.accept_field(field) {
     610            0 :             self.state = Some(self.serializer.serialize_value(&value));
     611            0 :         }
     612            0 :     }
     613              : 
     614              :     #[inline]
     615            0 :     fn record_i128(&mut self, field: &tracing::field::Field, value: i128) {
     616            0 :         if self.accept_field(field) {
     617            0 :             self.state = Some(self.serializer.serialize_value(&value));
     618            0 :         }
     619            0 :     }
     620              : 
     621              :     #[inline]
     622            0 :     fn record_u128(&mut self, field: &tracing::field::Field, value: u128) {
     623            0 :         if self.accept_field(field) {
     624            0 :             self.state = Some(self.serializer.serialize_value(&value));
     625            0 :         }
     626            0 :     }
     627              : 
     628              :     #[inline]
     629            0 :     fn record_bool(&mut self, field: &tracing::field::Field, value: bool) {
     630            0 :         if self.accept_field(field) {
     631            0 :             self.state = Some(self.serializer.serialize_value(&value));
     632            0 :         }
     633            0 :     }
     634              : 
     635              :     #[inline]
     636            0 :     fn record_bytes(&mut self, field: &tracing::field::Field, value: &[u8]) {
     637            0 :         if self.accept_field(field) {
     638            0 :             self.state = Some(self.serializer.serialize_value(&format_args!("{value:x?}")));
     639            0 :         }
     640            0 :     }
     641              : 
     642              :     #[inline]
     643            1 :     fn record_str(&mut self, field: &tracing::field::Field, value: &str) {
     644            1 :         if self.accept_field(field) {
     645            1 :             self.state = Some(self.serializer.serialize_value(&value));
     646            1 :         }
     647            1 :     }
     648              : 
     649              :     #[inline]
     650            1 :     fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) {
     651            1 :         if self.accept_field(field) {
     652            0 :             self.state = Some(self.serializer.serialize_value(&format_args!("{value:?}")));
     653            1 :         }
     654            1 :     }
     655              : 
     656              :     #[inline]
     657            0 :     fn record_error(
     658            0 :         &mut self,
     659            0 :         field: &tracing::field::Field,
     660            0 :         value: &(dyn std::error::Error + 'static),
     661            0 :     ) {
     662            0 :         if self.accept_field(field) {
     663            0 :             self.state = Some(self.serializer.serialize_value(&format_args!("{value}")));
     664            0 :         }
     665            0 :     }
     666              : }
     667              : 
     668              : /// Checks if there's any fields and field values present. If not, the JSON subobject
     669              : /// can be skipped.
     670              : // This is entirely optional and only cosmetic, though maybe helps a
     671              : // bit during log parsing in dashboards when there's no field with empty object.
     672              : struct FieldsPresent<'a>(pub bool, Option<&'a SkippedFieldIndices>);
     673              : 
     674              : // Even though some methods have an overhead (error, bytes) it is assumed the
     675              : // compiler won't include this since we ignore the value entirely.
     676              : impl tracing::field::Visit for FieldsPresent<'_> {
     677              :     #[inline]
     678            5 :     fn record_debug(&mut self, field: &tracing::field::Field, _: &dyn std::fmt::Debug) {
     679            5 :         if !self.1.is_some_and(|i| i.contains(field.index()))
     680            2 :             && field.name() != MESSAGE_FIELD
     681            1 :             && !field.name().starts_with("log.")
     682            1 :         {
     683            1 :             self.0 |= true;
     684            4 :         }
     685            5 :     }
     686              : }
     687              : 
     688              : /// Serializes the fields directly supplied with a log event.
     689              : struct SerializableEventFields<'a, 'event>(
     690              :     &'a tracing::Event<'event>,
     691              :     Option<&'a SkippedFieldIndices>,
     692              : );
     693              : 
     694              : impl serde::ser::Serialize for SerializableEventFields<'_, '_> {
     695            1 :     fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
     696            1 :     where
     697            1 :         S: Serializer,
     698            1 :     {
     699              :         use serde::ser::SerializeMap;
     700            1 :         let serializer = serializer.serialize_map(None)?;
     701            1 :         let mut message_skipper = MessageFieldSkipper::new(serializer, self.1);
     702            1 :         self.0.record(&mut message_skipper);
     703            1 :         let serializer = message_skipper.into_serializer()?;
     704            1 :         serializer.end()
     705            1 :     }
     706              : }
     707              : 
     708              : /// A tracing field visitor that skips the message field.
     709              : struct MessageFieldSkipper<'a, S: serde::ser::SerializeMap> {
     710              :     serializer: S,
     711              :     skipped_field_indices: Option<&'a SkippedFieldIndices>,
     712              :     state: Result<(), S::Error>,
     713              : }
     714              : 
     715              : impl<'a, S: serde::ser::SerializeMap> MessageFieldSkipper<'a, S> {
     716              :     #[inline]
     717            1 :     fn new(serializer: S, skipped_field_indices: Option<&'a SkippedFieldIndices>) -> Self {
     718            1 :         Self {
     719            1 :             serializer,
     720            1 :             skipped_field_indices,
     721            1 :             state: Ok(()),
     722            1 :         }
     723            1 :     }
     724              : 
     725              :     #[inline]
     726            5 :     fn accept_field(&self, field: &tracing::field::Field) -> bool {
     727            5 :         self.state.is_ok()
     728            5 :             && field.name() != MESSAGE_FIELD
     729            3 :             && !field.name().starts_with("log.")
     730            3 :             && !self
     731            3 :                 .skipped_field_indices
     732            3 :                 .is_some_and(|i| i.contains(field.index()))
     733            5 :     }
     734              : 
     735              :     #[inline]
     736            1 :     fn into_serializer(self) -> Result<S, S::Error> {
     737            1 :         self.state?;
     738            1 :         Ok(self.serializer)
     739            1 :     }
     740              : }
     741              : 
     742              : impl<S: serde::ser::SerializeMap> tracing::field::Visit for MessageFieldSkipper<'_, S> {
     743              :     #[inline]
     744            0 :     fn record_f64(&mut self, field: &tracing::field::Field, value: f64) {
     745            0 :         if self.accept_field(field) {
     746            0 :             self.state = self.serializer.serialize_entry(field.name(), &value);
     747            0 :         }
     748            0 :     }
     749              : 
     750              :     #[inline]
     751            3 :     fn record_i64(&mut self, field: &tracing::field::Field, value: i64) {
     752            3 :         if self.accept_field(field) {
     753            1 :             self.state = self.serializer.serialize_entry(field.name(), &value);
     754            2 :         }
     755            3 :     }
     756              : 
     757              :     #[inline]
     758            0 :     fn record_u64(&mut self, field: &tracing::field::Field, value: u64) {
     759            0 :         if self.accept_field(field) {
     760            0 :             self.state = self.serializer.serialize_entry(field.name(), &value);
     761            0 :         }
     762            0 :     }
     763              : 
     764              :     #[inline]
     765            0 :     fn record_i128(&mut self, field: &tracing::field::Field, value: i128) {
     766            0 :         if self.accept_field(field) {
     767            0 :             self.state = self.serializer.serialize_entry(field.name(), &value);
     768            0 :         }
     769            0 :     }
     770              : 
     771              :     #[inline]
     772            0 :     fn record_u128(&mut self, field: &tracing::field::Field, value: u128) {
     773            0 :         if self.accept_field(field) {
     774            0 :             self.state = self.serializer.serialize_entry(field.name(), &value);
     775            0 :         }
     776            0 :     }
     777              : 
     778              :     #[inline]
     779            0 :     fn record_bool(&mut self, field: &tracing::field::Field, value: bool) {
     780            0 :         if self.accept_field(field) {
     781            0 :             self.state = self.serializer.serialize_entry(field.name(), &value);
     782            0 :         }
     783            0 :     }
     784              : 
     785              :     #[inline]
     786            0 :     fn record_bytes(&mut self, field: &tracing::field::Field, value: &[u8]) {
     787            0 :         if self.accept_field(field) {
     788            0 :             self.state = self
     789            0 :                 .serializer
     790            0 :                 .serialize_entry(field.name(), &format_args!("{value:x?}"));
     791            0 :         }
     792            0 :     }
     793              : 
     794              :     #[inline]
     795            1 :     fn record_str(&mut self, field: &tracing::field::Field, value: &str) {
     796            1 :         if self.accept_field(field) {
     797            0 :             self.state = self.serializer.serialize_entry(field.name(), &value);
     798            1 :         }
     799            1 :     }
     800              : 
     801              :     #[inline]
     802            1 :     fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) {
     803            1 :         if self.accept_field(field) {
     804            0 :             self.state = self
     805            0 :                 .serializer
     806            0 :                 .serialize_entry(field.name(), &format_args!("{value:?}"));
     807            1 :         }
     808            1 :     }
     809              : 
     810              :     #[inline]
     811            0 :     fn record_error(
     812            0 :         &mut self,
     813            0 :         field: &tracing::field::Field,
     814            0 :         value: &(dyn std::error::Error + 'static),
     815            0 :     ) {
     816            0 :         if self.accept_field(field) {
     817            0 :             self.state = self.serializer.serialize_value(&format_args!("{value}"));
     818            0 :         }
     819            0 :     }
     820              : }
     821              : 
     822              : /// Serializes the span stack from root to leaf (parent of event) enumerated
     823              : /// inside an object where the keys are just the number padded with zeroes
     824              : /// to retain sorting order.
     825              : // The object is necessary because Loki cannot flatten arrays.
     826              : struct SerializableSpanStack<'a, 'b, Span>(&'b Context<'a, Span>)
     827              : where
     828              :     Span: Subscriber + for<'lookup> LookupSpan<'lookup>;
     829              : 
     830              : impl<Span> serde::ser::Serialize for SerializableSpanStack<'_, '_, Span>
     831              : where
     832              :     Span: Subscriber + for<'lookup> LookupSpan<'lookup>,
     833              : {
     834            1 :     fn serialize<Ser>(&self, serializer: Ser) -> Result<Ser::Ok, Ser::Error>
     835            1 :     where
     836            1 :         Ser: serde::ser::Serializer,
     837            1 :     {
     838            1 :         let mut serializer = serializer.serialize_map(None)?;
     839              : 
     840            1 :         if let Some(leaf_span) = self.0.lookup_current() {
     841            2 :             for (i, span) in leaf_span.scope().from_root().enumerate() {
     842            2 :                 serializer.serialize_entry(&format_args!("{i:02}"), &SerializableSpan(&span))?;
     843              :             }
     844            0 :         }
     845              : 
     846            1 :         serializer.end()
     847            1 :     }
     848              : }
     849              : 
     850              : /// Serializes a single span. Include the span ID, name and its fields as
     851              : /// recorded up to this point.
     852              : struct SerializableSpan<'a, 'b, Span>(&'b SpanRef<'a, Span>)
     853              : where
     854              :     Span: for<'lookup> LookupSpan<'lookup>;
     855              : 
     856              : impl<Span> serde::ser::Serialize for SerializableSpan<'_, '_, Span>
     857              : where
     858              :     Span: for<'lookup> LookupSpan<'lookup>,
     859              : {
     860            2 :     fn serialize<Ser>(&self, serializer: Ser) -> Result<Ser::Ok, Ser::Error>
     861            2 :     where
     862            2 :         Ser: serde::ser::Serializer,
     863            2 :     {
     864            2 :         let mut serializer = serializer.serialize_map(None)?;
     865              :         // TODO: the span ID is probably only useful for debugging tracing.
     866            2 :         serializer.serialize_entry("span_id", &format_args!("{:016x}", self.0.id().into_u64()))?;
     867            2 :         serializer.serialize_entry("span_name", self.0.metadata().name())?;
     868              : 
     869            2 :         let ext = self.0.extensions();
     870            2 :         if let Some(data) = ext.get::<SpanFields>() {
     871            2 :             for (key, value) in &data.fields.pin() {
     872            1 :                 serializer.serialize_entry(key, value)?;
     873              :             }
     874            0 :         }
     875              : 
     876            2 :         serializer.end()
     877            2 :     }
     878              : }
     879              : 
     880              : #[cfg(test)]
     881              : #[allow(clippy::unwrap_used)]
     882              : mod tests {
     883              :     use std::sync::{Arc, Mutex, MutexGuard};
     884              : 
     885              :     use assert_json_diff::assert_json_eq;
     886              :     use tracing::info_span;
     887              : 
     888              :     use super::*;
     889              : 
     890              :     struct TestClock {
     891              :         current_time: Mutex<DateTime<Utc>>,
     892              :     }
     893              : 
     894              :     impl Clock for Arc<TestClock> {
     895            2 :         fn now(&self) -> DateTime<Utc> {
     896            2 :             *self.current_time.lock().expect("poisoned")
     897            2 :         }
     898              :     }
     899              : 
     900              :     struct VecWriter<'a> {
     901              :         buffer: MutexGuard<'a, Vec<u8>>,
     902              :     }
     903              : 
     904              :     impl MakeWriter for Arc<Mutex<Vec<u8>>> {
     905            1 :         fn make_writer(&self) -> impl io::Write {
     906            1 :             VecWriter {
     907            1 :                 buffer: self.lock().expect("poisoned"),
     908            1 :             }
     909            1 :         }
     910              :     }
     911              : 
     912              :     impl io::Write for VecWriter<'_> {
     913            1 :         fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
     914            1 :             self.buffer.write(buf)
     915            1 :         }
     916              : 
     917            0 :         fn flush(&mut self) -> io::Result<()> {
     918            0 :             Ok(())
     919            0 :         }
     920              :     }
     921              : 
     922              :     #[test]
     923            1 :     fn test_field_collection() {
     924            1 :         let clock = Arc::new(TestClock {
     925            1 :             current_time: Mutex::new(Utc::now()),
     926            1 :         });
     927            1 :         let buffer = Arc::new(Mutex::new(Vec::new()));
     928            1 :         let log_layer = JsonLoggingLayer {
     929            1 :             clock: clock.clone(),
     930            1 :             skipped_field_indices: papaya::HashMap::default(),
     931            1 :             writer: buffer.clone(),
     932            1 :         };
     933            1 : 
     934            1 :         let registry = tracing_subscriber::Registry::default().with(log_layer);
     935            1 : 
     936            1 :         tracing::subscriber::with_default(registry, || {
     937            1 :             info_span!("span1", x = 40, x = 41, x = 42).in_scope(|| {
     938            1 :                 info_span!("span2").in_scope(|| {
     939            1 :                     tracing::error!(
     940              :                         a = 1,
     941              :                         a = 2,
     942              :                         a = 3,
     943              :                         message = "explicit message field",
     944            0 :                         "implicit message field"
     945              :                     );
     946            1 :                 });
     947            1 :             });
     948            1 :         });
     949            1 : 
     950            1 :         let buffer = Arc::try_unwrap(buffer)
     951            1 :             .expect("no other reference")
     952            1 :             .into_inner()
     953            1 :             .expect("poisoned");
     954            1 :         let actual: serde_json::Value = serde_json::from_slice(&buffer).expect("valid JSON");
     955            1 :         let expected: serde_json::Value = serde_json::json!(
     956            1 :             {
     957            1 :                 "timestamp": clock.now().to_rfc3339_opts(chrono::SecondsFormat::Micros, true),
     958            1 :                 "level": "ERROR",
     959            1 :                 "message": "explicit message field",
     960            1 :                 "fields": {
     961            1 :                     "a": 3,
     962            1 :                 },
     963            1 :                 "spans": {
     964            1 :                     "00":{
     965            1 :                         "span_id": "0000000000000001",
     966            1 :                         "span_name": "span1",
     967            1 :                         "x": 42,
     968            1 :                     },
     969            1 :                     "01": {
     970            1 :                         "span_id": "0000000000000002",
     971            1 :                         "span_name": "span2",
     972            1 :                     }
     973            1 :                 },
     974            1 :                 "src": actual.as_object().unwrap().get("src").unwrap().as_str().unwrap(),
     975            1 :                 "target": "proxy::logging::tests",
     976            1 :                 "process_id": actual.as_object().unwrap().get("process_id").unwrap().as_number().unwrap(),
     977            1 :                 "thread_id": actual.as_object().unwrap().get("thread_id").unwrap().as_number().unwrap(),
     978            1 :                 "thread_name": "logging::tests::test_field_collection",
     979            1 :             }
     980            1 :         );
     981            1 : 
     982            1 :         assert_json_eq!(actual, expected);
     983            1 :     }
     984              : }
        

Generated by: LCOV version 2.1-beta