LCOV - code coverage report
Current view: top level - proxy/src - logging.rs (source / functions) Coverage Total Hit
Test: 6c6fe25ecc82be7eef3e957667d85acf2b969737.info Lines: 58.1 % 681 396
Test Date: 2025-04-16 17:36:39 Functions: 49.5 % 103 51

            Line data    Source code
       1              : use std::cell::{Cell, RefCell};
       2              : use std::collections::HashMap;
       3              : use std::hash::BuildHasher;
       4              : use std::sync::atomic::{AtomicU32, Ordering};
       5              : use std::{array, env, fmt, io};
       6              : 
       7              : use chrono::{DateTime, Utc};
       8              : use indexmap::IndexSet;
       9              : use opentelemetry::trace::TraceContextExt;
      10              : use scopeguard::defer;
      11              : use serde::ser::{SerializeMap, Serializer};
      12              : use tracing::subscriber::Interest;
      13              : use tracing::{Event, Metadata, Span, Subscriber, callsite, span};
      14              : use tracing_opentelemetry::OpenTelemetrySpanExt;
      15              : use tracing_subscriber::filter::{EnvFilter, LevelFilter};
      16              : use tracing_subscriber::fmt::format::{Format, Full};
      17              : use tracing_subscriber::fmt::time::SystemTime;
      18              : use tracing_subscriber::fmt::{FormatEvent, FormatFields};
      19              : use tracing_subscriber::layer::{Context, Layer};
      20              : use tracing_subscriber::prelude::*;
      21              : use tracing_subscriber::registry::{LookupSpan, SpanRef};
      22              : use try_lock::TryLock;
      23              : 
      24              : /// Initialize logging and OpenTelemetry tracing and exporter.
      25              : ///
      26              : /// Logging can be configured using `RUST_LOG` environment variable.
      27              : ///
      28              : /// OpenTelemetry is configured with OTLP/HTTP exporter. It picks up
      29              : /// configuration from environment variables. For example, to change the
      30              : /// destination, set `OTEL_EXPORTER_OTLP_ENDPOINT=http://jaeger:4318`.
      31              : /// See <https://opentelemetry.io/docs/reference/specification/sdk-environment-variables>
      32            0 : pub fn init() -> anyhow::Result<LoggingGuard> {
      33            0 :     let logfmt = LogFormat::from_env()?;
      34              : 
      35            0 :     let env_filter = EnvFilter::builder()
      36            0 :         .with_default_directive(LevelFilter::INFO.into())
      37            0 :         .from_env_lossy()
      38            0 :         .add_directive(
      39            0 :             "aws_config=info"
      40            0 :                 .parse()
      41            0 :                 .expect("this should be a valid filter directive"),
      42            0 :         )
      43            0 :         .add_directive(
      44            0 :             "azure_core::policies::transport=off"
      45            0 :                 .parse()
      46            0 :                 .expect("this should be a valid filter directive"),
      47            0 :         );
      48            0 : 
      49            0 :     let provider = tracing_utils::init_tracing("proxy", tracing_utils::ExportConfig::default());
      50            0 :     let otlp_layer = provider.as_ref().map(tracing_utils::layer);
      51              : 
      52            0 :     let json_log_layer = if logfmt == LogFormat::Json {
      53            0 :         Some(JsonLoggingLayer::new(
      54            0 :             RealClock,
      55            0 :             StderrWriter {
      56            0 :                 stderr: std::io::stderr(),
      57            0 :             },
      58            0 :             ["request_id", "session_id", "conn_id"],
      59            0 :         ))
      60              :     } else {
      61            0 :         None
      62              :     };
      63              : 
      64            0 :     let text_log_layer = if logfmt == LogFormat::Text {
      65            0 :         Some(
      66            0 :             tracing_subscriber::fmt::layer()
      67            0 :                 .with_ansi(false)
      68            0 :                 .with_writer(std::io::stderr)
      69            0 :                 .with_target(false),
      70            0 :         )
      71              :     } else {
      72            0 :         None
      73              :     };
      74              : 
      75            0 :     tracing_subscriber::registry()
      76            0 :         .with(env_filter)
      77            0 :         .with(otlp_layer)
      78            0 :         .with(json_log_layer)
      79            0 :         .with(text_log_layer)
      80            0 :         .try_init()?;
      81              : 
      82            0 :     Ok(LoggingGuard(provider))
      83            0 : }
      84              : 
      85              : /// Initialize logging for local_proxy with log prefix and no opentelemetry.
      86              : ///
      87              : /// Logging can be configured using `RUST_LOG` environment variable.
      88            0 : pub fn init_local_proxy() -> anyhow::Result<LoggingGuard> {
      89            0 :     let env_filter = EnvFilter::builder()
      90            0 :         .with_default_directive(LevelFilter::INFO.into())
      91            0 :         .from_env_lossy();
      92            0 : 
      93            0 :     let fmt_layer = tracing_subscriber::fmt::layer()
      94            0 :         .with_ansi(false)
      95            0 :         .with_writer(std::io::stderr)
      96            0 :         .event_format(LocalProxyFormatter(Format::default().with_target(false)));
      97            0 : 
      98            0 :     tracing_subscriber::registry()
      99            0 :         .with(env_filter)
     100            0 :         .with(fmt_layer)
     101            0 :         .try_init()?;
     102              : 
     103            0 :     Ok(LoggingGuard(None))
     104            0 : }
     105              : 
     106              : pub struct LocalProxyFormatter(Format<Full, SystemTime>);
     107              : 
     108              : impl<S, N> FormatEvent<S, N> for LocalProxyFormatter
     109              : where
     110              :     S: Subscriber + for<'a> LookupSpan<'a>,
     111              :     N: for<'a> FormatFields<'a> + 'static,
     112              : {
     113            0 :     fn format_event(
     114            0 :         &self,
     115            0 :         ctx: &tracing_subscriber::fmt::FmtContext<'_, S, N>,
     116            0 :         mut writer: tracing_subscriber::fmt::format::Writer<'_>,
     117            0 :         event: &tracing::Event<'_>,
     118            0 :     ) -> std::fmt::Result {
     119            0 :         writer.write_str("[local_proxy] ")?;
     120            0 :         self.0.format_event(ctx, writer, event)
     121            0 :     }
     122              : }
     123              : 
     124              : pub struct LoggingGuard(Option<tracing_utils::Provider>);
     125              : 
     126              : impl Drop for LoggingGuard {
     127            0 :     fn drop(&mut self) {
     128            0 :         if let Some(p) = &self.0 {
     129              :             // Shutdown trace pipeline gracefully, so that it has a chance to send any
     130              :             // pending traces before we exit.
     131            0 :             tracing::info!("shutting down the tracing machinery");
     132            0 :             drop(p.shutdown());
     133            0 :         }
     134            0 :     }
     135              : }
     136              : 
     137              : // TODO: make JSON the default
     138              : #[derive(Copy, Clone, PartialEq, Eq, Default, Debug)]
     139              : enum LogFormat {
     140              :     #[default]
     141              :     Text = 1,
     142              :     Json,
     143              : }
     144              : 
     145              : impl LogFormat {
     146            0 :     fn from_env() -> anyhow::Result<Self> {
     147            0 :         let logfmt = env::var("LOGFMT");
     148            0 :         Ok(match logfmt.as_deref() {
     149            0 :             Err(_) => LogFormat::default(),
     150            0 :             Ok("text") => LogFormat::Text,
     151            0 :             Ok("json") => LogFormat::Json,
     152            0 :             Ok(logfmt) => anyhow::bail!("unknown log format: {logfmt}"),
     153              :         })
     154            0 :     }
     155              : }
     156              : 
     157              : trait MakeWriter {
     158              :     fn make_writer(&self) -> impl io::Write;
     159              : }
     160              : 
     161              : struct StderrWriter {
     162              :     stderr: io::Stderr,
     163              : }
     164              : 
     165              : impl MakeWriter for StderrWriter {
     166              :     #[inline]
     167            0 :     fn make_writer(&self) -> impl io::Write {
     168            0 :         self.stderr.lock()
     169            0 :     }
     170              : }
     171              : 
     172              : // TODO: move into separate module or even separate crate.
     173              : trait Clock {
     174              :     fn now(&self) -> DateTime<Utc>;
     175              : }
     176              : 
     177              : struct RealClock;
     178              : 
     179              : impl Clock for RealClock {
     180              :     #[inline]
     181            0 :     fn now(&self) -> DateTime<Utc> {
     182            0 :         Utc::now()
     183            0 :     }
     184              : }
     185              : 
     186              : /// Name of the field used by tracing crate to store the event message.
     187              : const MESSAGE_FIELD: &str = "message";
     188              : 
     189              : thread_local! {
     190              :     /// Protects against deadlocks and double panics during log writing.
     191              :     /// The current panic handler will use tracing to log panic information.
     192              :     static REENTRANCY_GUARD: Cell<bool> = const { Cell::new(false) };
     193              :     /// Thread-local instance with per-thread buffer for log writing.
     194              :     static EVENT_FORMATTER: RefCell<EventFormatter> = RefCell::new(EventFormatter::new());
     195              :     /// Cached OS thread ID.
     196              :     static THREAD_ID: u64 = gettid::gettid();
     197              : }
     198              : 
     199              : /// Implements tracing layer to handle events specific to logging.
     200              : struct JsonLoggingLayer<C: Clock, W: MakeWriter, const F: usize> {
     201              :     clock: C,
     202              :     skipped_field_indices: papaya::HashMap<callsite::Identifier, SkippedFieldIndices>,
     203              :     callsite_ids: papaya::HashMap<callsite::Identifier, CallsiteId>,
     204              :     writer: W,
     205              :     // We use a const generic and arrays to bypass one heap allocation.
     206              :     extract_fields: IndexSet<&'static str>,
     207              :     _marker: std::marker::PhantomData<[&'static str; F]>,
     208              : }
     209              : 
     210              : impl<C: Clock, W: MakeWriter, const F: usize> JsonLoggingLayer<C, W, F> {
     211            0 :     fn new(clock: C, writer: W, extract_fields: [&'static str; F]) -> Self {
     212            0 :         JsonLoggingLayer {
     213            0 :             clock,
     214            0 :             skipped_field_indices: papaya::HashMap::default(),
     215            0 :             callsite_ids: papaya::HashMap::default(),
     216            0 :             writer,
     217            0 :             extract_fields: IndexSet::from_iter(extract_fields),
     218            0 :             _marker: std::marker::PhantomData,
     219            0 :         }
     220            0 :     }
     221              : 
     222              :     #[inline]
     223            2 :     fn callsite_id(&self, cs: callsite::Identifier) -> CallsiteId {
     224            2 :         *self
     225            2 :             .callsite_ids
     226            2 :             .pin()
     227            2 :             .get_or_insert_with(cs, CallsiteId::next)
     228            2 :     }
     229              : }
     230              : 
     231              : impl<S, C: Clock + 'static, W: MakeWriter + 'static, const F: usize> Layer<S>
     232              :     for JsonLoggingLayer<C, W, F>
     233              : where
     234              :     S: Subscriber + for<'a> LookupSpan<'a>,
     235              : {
     236            1 :     fn on_event(&self, event: &Event<'_>, ctx: Context<'_, S>) {
     237              :         use std::io::Write;
     238              : 
     239              :         // TODO: consider special tracing subscriber to grab timestamp very
     240              :         //       early, before OTel machinery, and add as event extension.
     241            1 :         let now = self.clock.now();
     242            1 : 
     243            1 :         let res: io::Result<()> = REENTRANCY_GUARD.with(move |entered| {
     244            1 :             if entered.get() {
     245            0 :                 let mut formatter = EventFormatter::new();
     246            0 :                 formatter.format::<S, F>(
     247            0 :                     now,
     248            0 :                     event,
     249            0 :                     &ctx,
     250            0 :                     &self.skipped_field_indices,
     251            0 :                     &self.callsite_ids,
     252            0 :                     &self.extract_fields,
     253            0 :                 )?;
     254            0 :                 self.writer.make_writer().write_all(formatter.buffer())
     255              :             } else {
     256            1 :                 entered.set(true);
     257            1 :                 defer!(entered.set(false););
     258            1 : 
     259            1 :                 EVENT_FORMATTER.with_borrow_mut(move |formatter| {
     260            1 :                     formatter.reset();
     261            1 :                     formatter.format::<S, F>(
     262            1 :                         now,
     263            1 :                         event,
     264            1 :                         &ctx,
     265            1 :                         &self.skipped_field_indices,
     266            1 :                         &self.callsite_ids,
     267            1 :                         &self.extract_fields,
     268            1 :                     )?;
     269            1 :                     self.writer.make_writer().write_all(formatter.buffer())
     270            1 :                 })
     271              :             }
     272            1 :         });
     273              : 
     274              :         // In case logging fails we generate a simpler JSON object.
     275            1 :         if let Err(err) = res {
     276            0 :             if let Ok(mut line) = serde_json::to_vec(&serde_json::json!( {
     277            0 :                 "timestamp": now.to_rfc3339_opts(chrono::SecondsFormat::Micros, true),
     278            0 :                 "level": "ERROR",
     279            0 :                 "message": format_args!("cannot log event: {err:?}"),
     280            0 :                 "fields": {
     281            0 :                     "event": format_args!("{event:?}"),
     282            0 :                 },
     283            0 :             })) {
     284            0 :                 line.push(b'\n');
     285            0 :                 self.writer.make_writer().write_all(&line).ok();
     286            0 :             }
     287            1 :         }
     288            1 :     }
     289              : 
     290              :     /// Registers a SpanFields instance as span extension.
     291            2 :     fn on_new_span(&self, attrs: &span::Attributes<'_>, id: &span::Id, ctx: Context<'_, S>) {
     292            2 :         let span = ctx.span(id).expect("span must exist");
     293            2 :         let fields = SpanFields::default();
     294            2 :         fields.record_fields(attrs);
     295            2 : 
     296            2 :         // This could deadlock when there's a panic somewhere in the tracing
     297            2 :         // event handling and a read or write guard is still held. This includes
     298            2 :         // the OTel subscriber.
     299            2 :         let mut exts = span.extensions_mut();
     300            2 : 
     301            2 :         exts.insert(fields);
     302            2 :     }
     303              : 
     304            0 :     fn on_record(&self, id: &span::Id, values: &span::Record<'_>, ctx: Context<'_, S>) {
     305            0 :         let span = ctx.span(id).expect("span must exist");
     306            0 :         let ext = span.extensions();
     307            0 :         if let Some(data) = ext.get::<SpanFields>() {
     308            0 :             data.record_fields(values);
     309            0 :         }
     310            0 :     }
     311              : 
     312              :     /// Called (lazily) whenever a new log call is executed. We quickly check
     313              :     /// for duplicate field names and record duplicates as skippable. Last one
     314              :     /// wins.
     315            3 :     fn register_callsite(&self, metadata: &'static Metadata<'static>) -> Interest {
     316            3 :         if !metadata.is_event() {
     317            2 :             self.callsite_id(metadata.callsite());
     318            2 :             // Must not be never because we wouldn't get trace and span data.
     319            2 :             return Interest::always();
     320            1 :         }
     321            1 : 
     322            1 :         let mut field_indices = SkippedFieldIndices::default();
     323            1 :         let mut seen_fields = HashMap::<&'static str, usize>::new();
     324            5 :         for field in metadata.fields() {
     325              :             use std::collections::hash_map::Entry;
     326            5 :             match seen_fields.entry(field.name()) {
     327            2 :                 Entry::Vacant(entry) => {
     328            2 :                     // field not seen yet
     329            2 :                     entry.insert(field.index());
     330            2 :                 }
     331            3 :                 Entry::Occupied(mut entry) => {
     332            3 :                     // replace currently stored index
     333            3 :                     let old_index = entry.insert(field.index());
     334            3 :                     // ... and append it to list of skippable indices
     335            3 :                     field_indices.push(old_index);
     336            3 :                 }
     337              :             }
     338              :         }
     339              : 
     340            1 :         if !field_indices.is_empty() {
     341            1 :             self.skipped_field_indices
     342            1 :                 .pin()
     343            1 :                 .insert(metadata.callsite(), field_indices);
     344            1 :         }
     345              : 
     346            1 :         Interest::always()
     347            3 :     }
     348              : }
     349              : 
     350              : #[derive(Copy, Clone, Debug, Default)]
     351              : #[repr(transparent)]
     352              : struct CallsiteId(u32);
     353              : 
     354              : impl CallsiteId {
     355              :     #[inline]
     356            2 :     fn next() -> Self {
     357              :         // Start at 1 to reserve 0 for default.
     358              :         static COUNTER: AtomicU32 = AtomicU32::new(1);
     359            2 :         CallsiteId(COUNTER.fetch_add(1, Ordering::Relaxed))
     360            2 :     }
     361              : }
     362              : 
     363              : impl fmt::Display for CallsiteId {
     364              :     #[inline]
     365            2 :     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
     366            2 :         self.0.fmt(f)
     367            2 :     }
     368              : }
     369              : 
     370              : /// Stores span field values recorded during the spans lifetime.
     371              : #[derive(Default)]
     372              : struct SpanFields {
     373              :     // TODO: Switch to custom enum with lasso::Spur for Strings?
     374              :     fields: papaya::HashMap<&'static str, serde_json::Value>,
     375              : }
     376              : 
     377              : impl SpanFields {
     378              :     #[inline]
     379            2 :     fn record_fields<R: tracing_subscriber::field::RecordFields>(&self, fields: R) {
     380            2 :         fields.record(&mut SpanFieldsRecorder {
     381            2 :             fields: self.fields.pin(),
     382            2 :         });
     383            2 :     }
     384              : }
     385              : 
     386              : /// Implements a tracing field visitor to convert and store values.
     387              : struct SpanFieldsRecorder<'m, S, G> {
     388              :     fields: papaya::HashMapRef<'m, &'static str, serde_json::Value, S, G>,
     389              : }
     390              : 
     391              : impl<S: BuildHasher, G: papaya::Guard> tracing::field::Visit for SpanFieldsRecorder<'_, S, G> {
     392              :     #[inline]
     393            0 :     fn record_f64(&mut self, field: &tracing::field::Field, value: f64) {
     394            0 :         self.fields
     395            0 :             .insert(field.name(), serde_json::Value::from(value));
     396            0 :     }
     397              : 
     398              :     #[inline]
     399            4 :     fn record_i64(&mut self, field: &tracing::field::Field, value: i64) {
     400            4 :         self.fields
     401            4 :             .insert(field.name(), serde_json::Value::from(value));
     402            4 :     }
     403              : 
     404              :     #[inline]
     405            0 :     fn record_u64(&mut self, field: &tracing::field::Field, value: u64) {
     406            0 :         self.fields
     407            0 :             .insert(field.name(), serde_json::Value::from(value));
     408            0 :     }
     409              : 
     410              :     #[inline]
     411            0 :     fn record_i128(&mut self, field: &tracing::field::Field, value: i128) {
     412            0 :         if let Ok(value) = i64::try_from(value) {
     413            0 :             self.fields
     414            0 :                 .insert(field.name(), serde_json::Value::from(value));
     415            0 :         } else {
     416            0 :             self.fields
     417            0 :                 .insert(field.name(), serde_json::Value::from(format!("{value}")));
     418            0 :         }
     419            0 :     }
     420              : 
     421              :     #[inline]
     422            0 :     fn record_u128(&mut self, field: &tracing::field::Field, value: u128) {
     423            0 :         if let Ok(value) = u64::try_from(value) {
     424            0 :             self.fields
     425            0 :                 .insert(field.name(), serde_json::Value::from(value));
     426            0 :         } else {
     427            0 :             self.fields
     428            0 :                 .insert(field.name(), serde_json::Value::from(format!("{value}")));
     429            0 :         }
     430            0 :     }
     431              : 
     432              :     #[inline]
     433            0 :     fn record_bool(&mut self, field: &tracing::field::Field, value: bool) {
     434            0 :         self.fields
     435            0 :             .insert(field.name(), serde_json::Value::from(value));
     436            0 :     }
     437              : 
     438              :     #[inline]
     439            0 :     fn record_bytes(&mut self, field: &tracing::field::Field, value: &[u8]) {
     440            0 :         self.fields
     441            0 :             .insert(field.name(), serde_json::Value::from(value));
     442            0 :     }
     443              : 
     444              :     #[inline]
     445            0 :     fn record_str(&mut self, field: &tracing::field::Field, value: &str) {
     446            0 :         self.fields
     447            0 :             .insert(field.name(), serde_json::Value::from(value));
     448            0 :     }
     449              : 
     450              :     #[inline]
     451            0 :     fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) {
     452            0 :         self.fields
     453            0 :             .insert(field.name(), serde_json::Value::from(format!("{value:?}")));
     454            0 :     }
     455              : 
     456              :     #[inline]
     457            0 :     fn record_error(
     458            0 :         &mut self,
     459            0 :         field: &tracing::field::Field,
     460            0 :         value: &(dyn std::error::Error + 'static),
     461            0 :     ) {
     462            0 :         self.fields
     463            0 :             .insert(field.name(), serde_json::Value::from(format!("{value}")));
     464            0 :     }
     465              : }
     466              : 
     467              : /// List of field indices skipped during logging. Can list duplicate fields or
     468              : /// metafields not meant to be logged.
     469              : #[derive(Clone, Default)]
     470              : struct SkippedFieldIndices {
     471              :     bits: u64,
     472              : }
     473              : 
     474              : impl SkippedFieldIndices {
     475              :     #[inline]
     476            1 :     fn is_empty(&self) -> bool {
     477            1 :         self.bits == 0
     478            1 :     }
     479              : 
     480              :     #[inline]
     481            3 :     fn push(&mut self, index: usize) {
     482            3 :         self.bits |= 1u64
     483            3 :             .checked_shl(index as u32)
     484            3 :             .expect("field index too large");
     485            3 :     }
     486              : 
     487              :     #[inline]
     488           10 :     fn contains(&self, index: usize) -> bool {
     489           10 :         self.bits
     490           10 :             & 1u64
     491           10 :                 .checked_shl(index as u32)
     492           10 :                 .expect("field index too large")
     493           10 :             != 0
     494           10 :     }
     495              : }
     496              : 
     497              : /// Formats a tracing event and writes JSON to its internal buffer including a newline.
     498              : // TODO: buffer capacity management, truncate if too large
     499              : struct EventFormatter {
     500              :     logline_buffer: Vec<u8>,
     501              : }
     502              : 
     503              : impl EventFormatter {
     504              :     #[inline]
     505            1 :     fn new() -> Self {
     506            1 :         EventFormatter {
     507            1 :             logline_buffer: Vec::new(),
     508            1 :         }
     509            1 :     }
     510              : 
     511              :     #[inline]
     512            1 :     fn buffer(&self) -> &[u8] {
     513            1 :         &self.logline_buffer
     514            1 :     }
     515              : 
     516              :     #[inline]
     517            1 :     fn reset(&mut self) {
     518            1 :         self.logline_buffer.clear();
     519            1 :     }
     520              : 
     521            1 :     fn format<S, const F: usize>(
     522            1 :         &mut self,
     523            1 :         now: DateTime<Utc>,
     524            1 :         event: &Event<'_>,
     525            1 :         ctx: &Context<'_, S>,
     526            1 :         skipped_field_indices: &papaya::HashMap<callsite::Identifier, SkippedFieldIndices>,
     527            1 :         callsite_ids: &papaya::HashMap<callsite::Identifier, CallsiteId>,
     528            1 :         extract_fields: &IndexSet<&'static str>,
     529            1 :     ) -> io::Result<()>
     530            1 :     where
     531            1 :         S: Subscriber + for<'a> LookupSpan<'a>,
     532            1 :     {
     533            1 :         let timestamp = now.to_rfc3339_opts(chrono::SecondsFormat::Micros, true);
     534              : 
     535              :         use tracing_log::NormalizeEvent;
     536            1 :         let normalized_meta = event.normalized_metadata();
     537            1 :         let meta = normalized_meta.as_ref().unwrap_or_else(|| event.metadata());
     538            1 : 
     539            1 :         let skipped_field_indices = skipped_field_indices.pin();
     540            1 :         let skipped_field_indices = skipped_field_indices.get(&meta.callsite());
     541            1 : 
     542            1 :         let mut serialize = || {
     543            1 :             let mut serializer = serde_json::Serializer::new(&mut self.logline_buffer);
     544              : 
     545            1 :             let mut serializer = serializer.serialize_map(None)?;
     546              : 
     547              :             // Timestamp comes first, so raw lines can be sorted by timestamp.
     548            1 :             serializer.serialize_entry("timestamp", &timestamp)?;
     549              : 
     550              :             // Level next.
     551            1 :             serializer.serialize_entry("level", &meta.level().as_str())?;
     552              : 
     553              :             // Message next.
     554            1 :             serializer.serialize_key("message")?;
     555            1 :             let mut message_extractor =
     556            1 :                 MessageFieldExtractor::new(serializer, skipped_field_indices);
     557            1 :             event.record(&mut message_extractor);
     558            1 :             let mut serializer = message_extractor.into_serializer()?;
     559              : 
     560              :             // Direct message fields.
     561            1 :             let mut fields_present = FieldsPresent(false, skipped_field_indices);
     562            1 :             event.record(&mut fields_present);
     563            1 :             if fields_present.0 {
     564            1 :                 serializer.serialize_entry(
     565            1 :                     "fields",
     566            1 :                     &SerializableEventFields(event, skipped_field_indices),
     567            1 :                 )?;
     568            0 :             }
     569              : 
     570            1 :             let spans = SerializableSpans {
     571            1 :                 ctx,
     572            1 :                 callsite_ids,
     573            1 :                 extract: ExtractedSpanFields::<'_, F>::new(extract_fields),
     574            1 :             };
     575            1 :             serializer.serialize_entry("spans", &spans)?;
     576              : 
     577              :             // TODO: thread-local cache?
     578            1 :             let pid = std::process::id();
     579            1 :             // Skip adding pid 1 to reduce noise for services running in containers.
     580            1 :             if pid != 1 {
     581            1 :                 serializer.serialize_entry("process_id", &pid)?;
     582            0 :             }
     583              : 
     584            1 :             THREAD_ID.with(|tid| serializer.serialize_entry("thread_id", tid))?;
     585              : 
     586              :             // TODO: tls cache? name could change
     587            1 :             if let Some(thread_name) = std::thread::current().name() {
     588            1 :                 if !thread_name.is_empty() && thread_name != "tokio-runtime-worker" {
     589            1 :                     serializer.serialize_entry("thread_name", thread_name)?;
     590            0 :                 }
     591            0 :             }
     592              : 
     593            1 :             if let Some(task_id) = tokio::task::try_id() {
     594            0 :                 serializer.serialize_entry("task_id", &format_args!("{task_id}"))?;
     595            1 :             }
     596              : 
     597            1 :             serializer.serialize_entry("target", meta.target())?;
     598              : 
     599              :             // Skip adding module if it's the same as target.
     600            1 :             if let Some(module) = meta.module_path() {
     601            1 :                 if module != meta.target() {
     602            0 :                     serializer.serialize_entry("module", module)?;
     603            1 :                 }
     604            0 :             }
     605              : 
     606            1 :             if let Some(file) = meta.file() {
     607            1 :                 if let Some(line) = meta.line() {
     608            1 :                     serializer.serialize_entry("src", &format_args!("{file}:{line}"))?;
     609              :                 } else {
     610            0 :                     serializer.serialize_entry("src", file)?;
     611              :                 }
     612            0 :             }
     613              : 
     614              :             {
     615            1 :                 let otel_context = Span::current().context();
     616            1 :                 let otel_spanref = otel_context.span();
     617            1 :                 let span_context = otel_spanref.span_context();
     618            1 :                 if span_context.is_valid() {
     619            0 :                     serializer.serialize_entry(
     620            0 :                         "trace_id",
     621            0 :                         &format_args!("{}", span_context.trace_id()),
     622            0 :                     )?;
     623            1 :                 }
     624              :             }
     625              : 
     626            1 :             if spans.extract.has_values() {
     627              :                 // TODO: add fields from event, too?
     628            1 :                 serializer.serialize_entry("extract", &spans.extract)?;
     629            0 :             }
     630              : 
     631            1 :             serializer.end()
     632            1 :         };
     633              : 
     634            1 :         serialize().map_err(io::Error::other)?;
     635            1 :         self.logline_buffer.push(b'\n');
     636            1 :         Ok(())
     637            1 :     }
     638              : }
     639              : 
     640              : /// Extracts the message field that's mixed will other fields.
     641              : struct MessageFieldExtractor<'a, S: serde::ser::SerializeMap> {
     642              :     serializer: S,
     643              :     skipped_field_indices: Option<&'a SkippedFieldIndices>,
     644              :     state: Option<Result<(), S::Error>>,
     645              : }
     646              : 
     647              : impl<'a, S: serde::ser::SerializeMap> MessageFieldExtractor<'a, S> {
     648              :     #[inline]
     649            1 :     fn new(serializer: S, skipped_field_indices: Option<&'a SkippedFieldIndices>) -> Self {
     650            1 :         Self {
     651            1 :             serializer,
     652            1 :             skipped_field_indices,
     653            1 :             state: None,
     654            1 :         }
     655            1 :     }
     656              : 
     657              :     #[inline]
     658            1 :     fn into_serializer(mut self) -> Result<S, S::Error> {
     659            1 :         match self.state {
     660            1 :             Some(Ok(())) => {}
     661            0 :             Some(Err(err)) => return Err(err),
     662            0 :             None => self.serializer.serialize_value("")?,
     663              :         }
     664            1 :         Ok(self.serializer)
     665            1 :     }
     666              : 
     667              :     #[inline]
     668            5 :     fn accept_field(&self, field: &tracing::field::Field) -> bool {
     669            5 :         self.state.is_none()
     670            5 :             && field.name() == MESSAGE_FIELD
     671            2 :             && !self
     672            2 :                 .skipped_field_indices
     673            2 :                 .is_some_and(|i| i.contains(field.index()))
     674            5 :     }
     675              : }
     676              : 
     677              : impl<S: serde::ser::SerializeMap> tracing::field::Visit for MessageFieldExtractor<'_, S> {
     678              :     #[inline]
     679            0 :     fn record_f64(&mut self, field: &tracing::field::Field, value: f64) {
     680            0 :         if self.accept_field(field) {
     681            0 :             self.state = Some(self.serializer.serialize_value(&value));
     682            0 :         }
     683            0 :     }
     684              : 
     685              :     #[inline]
     686            3 :     fn record_i64(&mut self, field: &tracing::field::Field, value: i64) {
     687            3 :         if self.accept_field(field) {
     688            0 :             self.state = Some(self.serializer.serialize_value(&value));
     689            3 :         }
     690            3 :     }
     691              : 
     692              :     #[inline]
     693            0 :     fn record_u64(&mut self, field: &tracing::field::Field, value: u64) {
     694            0 :         if self.accept_field(field) {
     695            0 :             self.state = Some(self.serializer.serialize_value(&value));
     696            0 :         }
     697            0 :     }
     698              : 
     699              :     #[inline]
     700            0 :     fn record_i128(&mut self, field: &tracing::field::Field, value: i128) {
     701            0 :         if self.accept_field(field) {
     702            0 :             self.state = Some(self.serializer.serialize_value(&value));
     703            0 :         }
     704            0 :     }
     705              : 
     706              :     #[inline]
     707            0 :     fn record_u128(&mut self, field: &tracing::field::Field, value: u128) {
     708            0 :         if self.accept_field(field) {
     709            0 :             self.state = Some(self.serializer.serialize_value(&value));
     710            0 :         }
     711            0 :     }
     712              : 
     713              :     #[inline]
     714            0 :     fn record_bool(&mut self, field: &tracing::field::Field, value: bool) {
     715            0 :         if self.accept_field(field) {
     716            0 :             self.state = Some(self.serializer.serialize_value(&value));
     717            0 :         }
     718            0 :     }
     719              : 
     720              :     #[inline]
     721            0 :     fn record_bytes(&mut self, field: &tracing::field::Field, value: &[u8]) {
     722            0 :         if self.accept_field(field) {
     723            0 :             self.state = Some(self.serializer.serialize_value(&format_args!("{value:x?}")));
     724            0 :         }
     725            0 :     }
     726              : 
     727              :     #[inline]
     728            1 :     fn record_str(&mut self, field: &tracing::field::Field, value: &str) {
     729            1 :         if self.accept_field(field) {
     730            1 :             self.state = Some(self.serializer.serialize_value(&value));
     731            1 :         }
     732            1 :     }
     733              : 
     734              :     #[inline]
     735            1 :     fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) {
     736            1 :         if self.accept_field(field) {
     737            0 :             self.state = Some(self.serializer.serialize_value(&format_args!("{value:?}")));
     738            1 :         }
     739            1 :     }
     740              : 
     741              :     #[inline]
     742            0 :     fn record_error(
     743            0 :         &mut self,
     744            0 :         field: &tracing::field::Field,
     745            0 :         value: &(dyn std::error::Error + 'static),
     746            0 :     ) {
     747            0 :         if self.accept_field(field) {
     748            0 :             self.state = Some(self.serializer.serialize_value(&format_args!("{value}")));
     749            0 :         }
     750            0 :     }
     751              : }
     752              : 
     753              : /// Checks if there's any fields and field values present. If not, the JSON subobject
     754              : /// can be skipped.
     755              : // This is entirely optional and only cosmetic, though maybe helps a
     756              : // bit during log parsing in dashboards when there's no field with empty object.
     757              : struct FieldsPresent<'a>(pub bool, Option<&'a SkippedFieldIndices>);
     758              : 
     759              : // Even though some methods have an overhead (error, bytes) it is assumed the
     760              : // compiler won't include this since we ignore the value entirely.
     761              : impl tracing::field::Visit for FieldsPresent<'_> {
     762              :     #[inline]
     763            5 :     fn record_debug(&mut self, field: &tracing::field::Field, _: &dyn std::fmt::Debug) {
     764            5 :         if !self.1.is_some_and(|i| i.contains(field.index()))
     765            2 :             && field.name() != MESSAGE_FIELD
     766            1 :             && !field.name().starts_with("log.")
     767            1 :         {
     768            1 :             self.0 |= true;
     769            4 :         }
     770            5 :     }
     771              : }
     772              : 
     773              : /// Serializes the fields directly supplied with a log event.
     774              : struct SerializableEventFields<'a, 'event>(
     775              :     &'a tracing::Event<'event>,
     776              :     Option<&'a SkippedFieldIndices>,
     777              : );
     778              : 
     779              : impl serde::ser::Serialize for SerializableEventFields<'_, '_> {
     780            1 :     fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
     781            1 :     where
     782            1 :         S: Serializer,
     783            1 :     {
     784              :         use serde::ser::SerializeMap;
     785            1 :         let serializer = serializer.serialize_map(None)?;
     786            1 :         let mut message_skipper = MessageFieldSkipper::new(serializer, self.1);
     787            1 :         self.0.record(&mut message_skipper);
     788            1 :         let serializer = message_skipper.into_serializer()?;
     789            1 :         serializer.end()
     790            1 :     }
     791              : }
     792              : 
     793              : /// A tracing field visitor that skips the message field.
     794              : struct MessageFieldSkipper<'a, S: serde::ser::SerializeMap> {
     795              :     serializer: S,
     796              :     skipped_field_indices: Option<&'a SkippedFieldIndices>,
     797              :     state: Result<(), S::Error>,
     798              : }
     799              : 
     800              : impl<'a, S: serde::ser::SerializeMap> MessageFieldSkipper<'a, S> {
     801              :     #[inline]
     802            1 :     fn new(serializer: S, skipped_field_indices: Option<&'a SkippedFieldIndices>) -> Self {
     803            1 :         Self {
     804            1 :             serializer,
     805            1 :             skipped_field_indices,
     806            1 :             state: Ok(()),
     807            1 :         }
     808            1 :     }
     809              : 
     810              :     #[inline]
     811            5 :     fn accept_field(&self, field: &tracing::field::Field) -> bool {
     812            5 :         self.state.is_ok()
     813            5 :             && field.name() != MESSAGE_FIELD
     814            3 :             && !field.name().starts_with("log.")
     815            3 :             && !self
     816            3 :                 .skipped_field_indices
     817            3 :                 .is_some_and(|i| i.contains(field.index()))
     818            5 :     }
     819              : 
     820              :     #[inline]
     821            1 :     fn into_serializer(self) -> Result<S, S::Error> {
     822            1 :         self.state?;
     823            1 :         Ok(self.serializer)
     824            1 :     }
     825              : }
     826              : 
     827              : impl<S: serde::ser::SerializeMap> tracing::field::Visit for MessageFieldSkipper<'_, S> {
     828              :     #[inline]
     829            0 :     fn record_f64(&mut self, field: &tracing::field::Field, value: f64) {
     830            0 :         if self.accept_field(field) {
     831            0 :             self.state = self.serializer.serialize_entry(field.name(), &value);
     832            0 :         }
     833            0 :     }
     834              : 
     835              :     #[inline]
     836            3 :     fn record_i64(&mut self, field: &tracing::field::Field, value: i64) {
     837            3 :         if self.accept_field(field) {
     838            1 :             self.state = self.serializer.serialize_entry(field.name(), &value);
     839            2 :         }
     840            3 :     }
     841              : 
     842              :     #[inline]
     843            0 :     fn record_u64(&mut self, field: &tracing::field::Field, value: u64) {
     844            0 :         if self.accept_field(field) {
     845            0 :             self.state = self.serializer.serialize_entry(field.name(), &value);
     846            0 :         }
     847            0 :     }
     848              : 
     849              :     #[inline]
     850            0 :     fn record_i128(&mut self, field: &tracing::field::Field, value: i128) {
     851            0 :         if self.accept_field(field) {
     852            0 :             self.state = self.serializer.serialize_entry(field.name(), &value);
     853            0 :         }
     854            0 :     }
     855              : 
     856              :     #[inline]
     857            0 :     fn record_u128(&mut self, field: &tracing::field::Field, value: u128) {
     858            0 :         if self.accept_field(field) {
     859            0 :             self.state = self.serializer.serialize_entry(field.name(), &value);
     860            0 :         }
     861            0 :     }
     862              : 
     863              :     #[inline]
     864            0 :     fn record_bool(&mut self, field: &tracing::field::Field, value: bool) {
     865            0 :         if self.accept_field(field) {
     866            0 :             self.state = self.serializer.serialize_entry(field.name(), &value);
     867            0 :         }
     868            0 :     }
     869              : 
     870              :     #[inline]
     871            0 :     fn record_bytes(&mut self, field: &tracing::field::Field, value: &[u8]) {
     872            0 :         if self.accept_field(field) {
     873            0 :             self.state = self
     874            0 :                 .serializer
     875            0 :                 .serialize_entry(field.name(), &format_args!("{value:x?}"));
     876            0 :         }
     877            0 :     }
     878              : 
     879              :     #[inline]
     880            1 :     fn record_str(&mut self, field: &tracing::field::Field, value: &str) {
     881            1 :         if self.accept_field(field) {
     882            0 :             self.state = self.serializer.serialize_entry(field.name(), &value);
     883            1 :         }
     884            1 :     }
     885              : 
     886              :     #[inline]
     887            1 :     fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) {
     888            1 :         if self.accept_field(field) {
     889            0 :             self.state = self
     890            0 :                 .serializer
     891            0 :                 .serialize_entry(field.name(), &format_args!("{value:?}"));
     892            1 :         }
     893            1 :     }
     894              : 
     895              :     #[inline]
     896            0 :     fn record_error(
     897            0 :         &mut self,
     898            0 :         field: &tracing::field::Field,
     899            0 :         value: &(dyn std::error::Error + 'static),
     900            0 :     ) {
     901            0 :         if self.accept_field(field) {
     902            0 :             self.state = self.serializer.serialize_value(&format_args!("{value}"));
     903            0 :         }
     904            0 :     }
     905              : }
     906              : 
     907              : /// Serializes the span stack from root to leaf (parent of event) as object
     908              : /// with the span names as keys. To prevent collision we append a numberic value
     909              : /// to the name. Also, collects any span fields we're interested in. Last one
     910              : /// wins.
     911              : struct SerializableSpans<'a, 'ctx, Span, const F: usize>
     912              : where
     913              :     Span: Subscriber + for<'lookup> LookupSpan<'lookup>,
     914              : {
     915              :     ctx: &'a Context<'ctx, Span>,
     916              :     callsite_ids: &'a papaya::HashMap<callsite::Identifier, CallsiteId>,
     917              :     extract: ExtractedSpanFields<'a, F>,
     918              : }
     919              : 
     920              : impl<Span, const F: usize> serde::ser::Serialize for SerializableSpans<'_, '_, Span, F>
     921              : where
     922              :     Span: Subscriber + for<'lookup> LookupSpan<'lookup>,
     923              : {
     924            1 :     fn serialize<Ser>(&self, serializer: Ser) -> Result<Ser::Ok, Ser::Error>
     925            1 :     where
     926            1 :         Ser: serde::ser::Serializer,
     927            1 :     {
     928            1 :         let mut serializer = serializer.serialize_map(None)?;
     929              : 
     930            1 :         if let Some(leaf_span) = self.ctx.lookup_current() {
     931            2 :             for span in leaf_span.scope().from_root() {
     932              :                 // Append a numeric callsite ID to the span name to keep the name unique
     933              :                 // in the JSON object.
     934            2 :                 let cid = self
     935            2 :                     .callsite_ids
     936            2 :                     .pin()
     937            2 :                     .get(&span.metadata().callsite())
     938            2 :                     .copied()
     939            2 :                     .unwrap_or_default();
     940            2 : 
     941            2 :                 // Loki turns the # into an underscore during field name concatenation.
     942            2 :                 serializer.serialize_key(&format_args!("{}#{}", span.metadata().name(), &cid))?;
     943              : 
     944            2 :                 serializer.serialize_value(&SerializableSpanFields {
     945            2 :                     span: &span,
     946            2 :                     extract: &self.extract,
     947            2 :                 })?;
     948              :             }
     949            0 :         }
     950              : 
     951            1 :         serializer.end()
     952            1 :     }
     953              : }
     954              : 
     955              : /// Serializes the span fields as object.
     956              : struct SerializableSpanFields<'a, 'span, Span, const F: usize>
     957              : where
     958              :     Span: for<'lookup> LookupSpan<'lookup>,
     959              : {
     960              :     span: &'a SpanRef<'span, Span>,
     961              :     extract: &'a ExtractedSpanFields<'a, F>,
     962              : }
     963              : 
     964              : impl<Span, const F: usize> serde::ser::Serialize for SerializableSpanFields<'_, '_, Span, F>
     965              : where
     966              :     Span: for<'lookup> LookupSpan<'lookup>,
     967              : {
     968            2 :     fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
     969            2 :     where
     970            2 :         S: serde::ser::Serializer,
     971            2 :     {
     972            2 :         let mut serializer = serializer.serialize_map(None)?;
     973              : 
     974            2 :         let ext = self.span.extensions();
     975            2 :         if let Some(data) = ext.get::<SpanFields>() {
     976            2 :             for (name, value) in &data.fields.pin() {
     977            2 :                 serializer.serialize_entry(name, value)?;
     978              :                 // TODO: replace clone with reference, if possible.
     979            2 :                 self.extract.set(name, value.clone());
     980              :             }
     981            0 :         }
     982              : 
     983            2 :         serializer.end()
     984            2 :     }
     985              : }
     986              : 
     987              : struct ExtractedSpanFields<'a, const F: usize> {
     988              :     names: &'a IndexSet<&'static str>,
     989              :     // TODO: replace TryLock with something local thread and interior mutability.
     990              :     //       serde API doesn't let us use `mut`.
     991              :     values: TryLock<([Option<serde_json::Value>; F], bool)>,
     992              : }
     993              : 
     994              : impl<'a, const F: usize> ExtractedSpanFields<'a, F> {
     995            1 :     fn new(names: &'a IndexSet<&'static str>) -> Self {
     996            1 :         ExtractedSpanFields {
     997            1 :             names,
     998            1 :             values: TryLock::new((array::from_fn(|_| Option::default()), false)),
     999            1 :         }
    1000            1 :     }
    1001              : 
    1002              :     #[inline]
    1003            2 :     fn set(&self, name: &'static str, value: serde_json::Value) {
    1004            2 :         if let Some((index, _)) = self.names.get_full(name) {
    1005            2 :             let mut fields = self.values.try_lock().expect("thread-local use");
    1006            2 :             fields.0[index] = Some(value);
    1007            2 :             fields.1 = true;
    1008            2 :         }
    1009            2 :     }
    1010              : 
    1011              :     #[inline]
    1012            1 :     fn has_values(&self) -> bool {
    1013            1 :         self.values.try_lock().expect("thread-local use").1
    1014            1 :     }
    1015              : }
    1016              : 
    1017              : impl<const F: usize> serde::ser::Serialize for ExtractedSpanFields<'_, F> {
    1018            1 :     fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    1019            1 :     where
    1020            1 :         S: serde::ser::Serializer,
    1021            1 :     {
    1022            1 :         let mut serializer = serializer.serialize_map(None)?;
    1023              : 
    1024            1 :         let values = self.values.try_lock().expect("thread-local use");
    1025            1 :         for (i, value) in values.0.iter().enumerate() {
    1026            1 :             if let Some(value) = value {
    1027            1 :                 let key = self.names[i];
    1028            1 :                 serializer.serialize_entry(key, value)?;
    1029            0 :             }
    1030              :         }
    1031              : 
    1032            1 :         serializer.end()
    1033            1 :     }
    1034              : }
    1035              : 
    1036              : #[cfg(test)]
    1037              : #[allow(clippy::unwrap_used)]
    1038              : mod tests {
    1039              :     use std::marker::PhantomData;
    1040              :     use std::sync::{Arc, Mutex, MutexGuard};
    1041              : 
    1042              :     use assert_json_diff::assert_json_eq;
    1043              :     use tracing::info_span;
    1044              : 
    1045              :     use super::*;
    1046              : 
    1047              :     struct TestClock {
    1048              :         current_time: Mutex<DateTime<Utc>>,
    1049              :     }
    1050              : 
    1051              :     impl Clock for Arc<TestClock> {
    1052            2 :         fn now(&self) -> DateTime<Utc> {
    1053            2 :             *self.current_time.lock().expect("poisoned")
    1054            2 :         }
    1055              :     }
    1056              : 
    1057              :     struct VecWriter<'a> {
    1058              :         buffer: MutexGuard<'a, Vec<u8>>,
    1059              :     }
    1060              : 
    1061              :     impl MakeWriter for Arc<Mutex<Vec<u8>>> {
    1062            1 :         fn make_writer(&self) -> impl io::Write {
    1063            1 :             VecWriter {
    1064            1 :                 buffer: self.lock().expect("poisoned"),
    1065            1 :             }
    1066            1 :         }
    1067              :     }
    1068              : 
    1069              :     impl io::Write for VecWriter<'_> {
    1070            1 :         fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
    1071            1 :             self.buffer.write(buf)
    1072            1 :         }
    1073              : 
    1074            0 :         fn flush(&mut self) -> io::Result<()> {
    1075            0 :             Ok(())
    1076            0 :         }
    1077              :     }
    1078              : 
    1079              :     #[test]
    1080            1 :     fn test_field_collection() {
    1081            1 :         let clock = Arc::new(TestClock {
    1082            1 :             current_time: Mutex::new(Utc::now()),
    1083            1 :         });
    1084            1 :         let buffer = Arc::new(Mutex::new(Vec::new()));
    1085            1 :         let log_layer = JsonLoggingLayer {
    1086            1 :             clock: clock.clone(),
    1087            1 :             skipped_field_indices: papaya::HashMap::default(),
    1088            1 :             callsite_ids: papaya::HashMap::default(),
    1089            1 :             writer: buffer.clone(),
    1090            1 :             extract_fields: IndexSet::from_iter(["x"]),
    1091            1 :             _marker: PhantomData::<[&'static str; 1]>,
    1092            1 :         };
    1093            1 : 
    1094            1 :         let registry = tracing_subscriber::Registry::default().with(log_layer);
    1095            1 : 
    1096            1 :         tracing::subscriber::with_default(registry, || {
    1097            1 :             info_span!("some_span", x = 24).in_scope(|| {
    1098            1 :                 info_span!("some_span", x = 40, x = 41, x = 42).in_scope(|| {
    1099            1 :                     tracing::error!(
    1100              :                         a = 1,
    1101              :                         a = 2,
    1102              :                         a = 3,
    1103              :                         message = "explicit message field",
    1104            0 :                         "implicit message field"
    1105              :                     );
    1106            1 :                 });
    1107            1 :             });
    1108            1 :         });
    1109            1 : 
    1110            1 :         let buffer = Arc::try_unwrap(buffer)
    1111            1 :             .expect("no other reference")
    1112            1 :             .into_inner()
    1113            1 :             .expect("poisoned");
    1114            1 :         let actual: serde_json::Value = serde_json::from_slice(&buffer).expect("valid JSON");
    1115            1 :         let expected: serde_json::Value = serde_json::json!(
    1116            1 :             {
    1117            1 :                 "timestamp": clock.now().to_rfc3339_opts(chrono::SecondsFormat::Micros, true),
    1118            1 :                 "level": "ERROR",
    1119            1 :                 "message": "explicit message field",
    1120            1 :                 "fields": {
    1121            1 :                     "a": 3,
    1122            1 :                 },
    1123            1 :                 "spans": {
    1124            1 :                     "some_span#1":{
    1125            1 :                         "x": 24,
    1126            1 :                     },
    1127            1 :                     "some_span#2": {
    1128            1 :                         "x": 42,
    1129            1 :                     }
    1130            1 :                 },
    1131            1 :                 "extract": {
    1132            1 :                     "x": 42,
    1133            1 :                 },
    1134            1 :                 "src": actual.as_object().unwrap().get("src").unwrap().as_str().unwrap(),
    1135            1 :                 "target": "proxy::logging::tests",
    1136            1 :                 "process_id": actual.as_object().unwrap().get("process_id").unwrap().as_number().unwrap(),
    1137            1 :                 "thread_id": actual.as_object().unwrap().get("thread_id").unwrap().as_number().unwrap(),
    1138            1 :                 "thread_name": "logging::tests::test_field_collection",
    1139            1 :             }
    1140            1 :         );
    1141            1 : 
    1142            1 :         assert_json_eq!(actual, expected);
    1143            1 :     }
    1144              : }
        

Generated by: LCOV version 2.1-beta