LCOV - code coverage report
Current view: top level - proxy/src - logging.rs (source / functions) Coverage Total Hit
Test: 5fe7fa8d483b39476409aee736d6d5e32728bfac.info Lines: 58.2 % 680 396
Test Date: 2025-03-12 16:10:49 Functions: 31.3 % 163 51

            Line data    Source code
       1              : use std::cell::{Cell, RefCell};
       2              : use std::collections::HashMap;
       3              : use std::hash::BuildHasher;
       4              : use std::sync::atomic::{AtomicU32, Ordering};
       5              : use std::{array, env, fmt, io};
       6              : 
       7              : use chrono::{DateTime, Utc};
       8              : use indexmap::IndexSet;
       9              : use opentelemetry::trace::TraceContextExt;
      10              : use scopeguard::defer;
      11              : use serde::ser::{SerializeMap, Serializer};
      12              : use tracing::subscriber::Interest;
      13              : use tracing::{Event, Metadata, Span, Subscriber, callsite, span};
      14              : use tracing_opentelemetry::OpenTelemetrySpanExt;
      15              : use tracing_subscriber::filter::{EnvFilter, LevelFilter};
      16              : use tracing_subscriber::fmt::format::{Format, Full};
      17              : use tracing_subscriber::fmt::time::SystemTime;
      18              : use tracing_subscriber::fmt::{FormatEvent, FormatFields};
      19              : use tracing_subscriber::layer::{Context, Layer};
      20              : use tracing_subscriber::prelude::*;
      21              : use tracing_subscriber::registry::{LookupSpan, SpanRef};
      22              : use try_lock::TryLock;
      23              : 
      24              : /// Initialize logging and OpenTelemetry tracing and exporter.
      25              : ///
      26              : /// Logging can be configured using `RUST_LOG` environment variable.
      27              : ///
      28              : /// OpenTelemetry is configured with OTLP/HTTP exporter. It picks up
      29              : /// configuration from environment variables. For example, to change the
      30              : /// destination, set `OTEL_EXPORTER_OTLP_ENDPOINT=http://jaeger:4318`.
      31              : /// See <https://opentelemetry.io/docs/reference/specification/sdk-environment-variables>
      32            0 : pub async fn init() -> anyhow::Result<LoggingGuard> {
      33            0 :     let logfmt = LogFormat::from_env()?;
      34              : 
      35            0 :     let env_filter = EnvFilter::builder()
      36            0 :         .with_default_directive(LevelFilter::INFO.into())
      37            0 :         .from_env_lossy()
      38            0 :         .add_directive(
      39            0 :             "aws_config=info"
      40            0 :                 .parse()
      41            0 :                 .expect("this should be a valid filter directive"),
      42            0 :         )
      43            0 :         .add_directive(
      44            0 :             "azure_core::policies::transport=off"
      45            0 :                 .parse()
      46            0 :                 .expect("this should be a valid filter directive"),
      47            0 :         );
      48              : 
      49            0 :     let otlp_layer =
      50            0 :         tracing_utils::init_tracing("proxy", tracing_utils::ExportConfig::default()).await;
      51              : 
      52            0 :     let json_log_layer = if logfmt == LogFormat::Json {
      53            0 :         Some(JsonLoggingLayer::new(
      54            0 :             RealClock,
      55            0 :             StderrWriter {
      56            0 :                 stderr: std::io::stderr(),
      57            0 :             },
      58            0 :             ["request_id", "session_id", "conn_id"],
      59            0 :         ))
      60              :     } else {
      61            0 :         None
      62              :     };
      63              : 
      64            0 :     let text_log_layer = if logfmt == LogFormat::Text {
      65            0 :         Some(
      66            0 :             tracing_subscriber::fmt::layer()
      67            0 :                 .with_ansi(false)
      68            0 :                 .with_writer(std::io::stderr)
      69            0 :                 .with_target(false),
      70            0 :         )
      71              :     } else {
      72            0 :         None
      73              :     };
      74              : 
      75            0 :     tracing_subscriber::registry()
      76            0 :         .with(env_filter)
      77            0 :         .with(otlp_layer)
      78            0 :         .with(json_log_layer)
      79            0 :         .with(text_log_layer)
      80            0 :         .try_init()?;
      81              : 
      82            0 :     Ok(LoggingGuard)
      83            0 : }
      84              : 
      85              : /// Initialize logging for local_proxy with log prefix and no opentelemetry.
      86              : ///
      87              : /// Logging can be configured using `RUST_LOG` environment variable.
      88            0 : pub fn init_local_proxy() -> anyhow::Result<LoggingGuard> {
      89            0 :     let env_filter = EnvFilter::builder()
      90            0 :         .with_default_directive(LevelFilter::INFO.into())
      91            0 :         .from_env_lossy();
      92            0 : 
      93            0 :     let fmt_layer = tracing_subscriber::fmt::layer()
      94            0 :         .with_ansi(false)
      95            0 :         .with_writer(std::io::stderr)
      96            0 :         .event_format(LocalProxyFormatter(Format::default().with_target(false)));
      97            0 : 
      98            0 :     tracing_subscriber::registry()
      99            0 :         .with(env_filter)
     100            0 :         .with(fmt_layer)
     101            0 :         .try_init()?;
     102              : 
     103            0 :     Ok(LoggingGuard)
     104            0 : }
     105              : 
     106              : pub struct LocalProxyFormatter(Format<Full, SystemTime>);
     107              : 
     108              : impl<S, N> FormatEvent<S, N> for LocalProxyFormatter
     109              : where
     110              :     S: Subscriber + for<'a> LookupSpan<'a>,
     111              :     N: for<'a> FormatFields<'a> + 'static,
     112              : {
     113            0 :     fn format_event(
     114            0 :         &self,
     115            0 :         ctx: &tracing_subscriber::fmt::FmtContext<'_, S, N>,
     116            0 :         mut writer: tracing_subscriber::fmt::format::Writer<'_>,
     117            0 :         event: &tracing::Event<'_>,
     118            0 :     ) -> std::fmt::Result {
     119            0 :         writer.write_str("[local_proxy] ")?;
     120            0 :         self.0.format_event(ctx, writer, event)
     121            0 :     }
     122              : }
     123              : 
     124              : pub struct LoggingGuard;
     125              : 
     126              : impl Drop for LoggingGuard {
     127            0 :     fn drop(&mut self) {
     128            0 :         // Shutdown trace pipeline gracefully, so that it has a chance to send any
     129            0 :         // pending traces before we exit.
     130            0 :         tracing::info!("shutting down the tracing machinery");
     131            0 :         tracing_utils::shutdown_tracing();
     132            0 :     }
     133              : }
     134              : 
     135              : // TODO: make JSON the default
     136              : #[derive(Copy, Clone, PartialEq, Eq, Default, Debug)]
     137              : enum LogFormat {
     138              :     #[default]
     139              :     Text = 1,
     140              :     Json,
     141              : }
     142              : 
     143              : impl LogFormat {
     144            0 :     fn from_env() -> anyhow::Result<Self> {
     145            0 :         let logfmt = env::var("LOGFMT");
     146            0 :         Ok(match logfmt.as_deref() {
     147            0 :             Err(_) => LogFormat::default(),
     148            0 :             Ok("text") => LogFormat::Text,
     149            0 :             Ok("json") => LogFormat::Json,
     150            0 :             Ok(logfmt) => anyhow::bail!("unknown log format: {logfmt}"),
     151              :         })
     152            0 :     }
     153              : }
     154              : 
     155              : trait MakeWriter {
     156              :     fn make_writer(&self) -> impl io::Write;
     157              : }
     158              : 
     159              : struct StderrWriter {
     160              :     stderr: io::Stderr,
     161              : }
     162              : 
     163              : impl MakeWriter for StderrWriter {
     164              :     #[inline]
     165            0 :     fn make_writer(&self) -> impl io::Write {
     166            0 :         self.stderr.lock()
     167            0 :     }
     168              : }
     169              : 
     170              : // TODO: move into separate module or even separate crate.
     171              : trait Clock {
     172              :     fn now(&self) -> DateTime<Utc>;
     173              : }
     174              : 
     175              : struct RealClock;
     176              : 
     177              : impl Clock for RealClock {
     178              :     #[inline]
     179            0 :     fn now(&self) -> DateTime<Utc> {
     180            0 :         Utc::now()
     181            0 :     }
     182              : }
     183              : 
     184              : /// Name of the field used by tracing crate to store the event message.
     185              : const MESSAGE_FIELD: &str = "message";
     186              : 
     187              : thread_local! {
     188              :     /// Protects against deadlocks and double panics during log writing.
     189              :     /// The current panic handler will use tracing to log panic information.
     190              :     static REENTRANCY_GUARD: Cell<bool> = const { Cell::new(false) };
     191              :     /// Thread-local instance with per-thread buffer for log writing.
     192              :     static EVENT_FORMATTER: RefCell<EventFormatter> = RefCell::new(EventFormatter::new());
     193              :     /// Cached OS thread ID.
     194              :     static THREAD_ID: u64 = gettid::gettid();
     195              : }
     196              : 
     197              : /// Implements tracing layer to handle events specific to logging.
     198              : struct JsonLoggingLayer<C: Clock, W: MakeWriter, const F: usize> {
     199              :     clock: C,
     200              :     skipped_field_indices: papaya::HashMap<callsite::Identifier, SkippedFieldIndices>,
     201              :     callsite_ids: papaya::HashMap<callsite::Identifier, CallsiteId>,
     202              :     writer: W,
     203              :     // We use a const generic and arrays to bypass one heap allocation.
     204              :     extract_fields: IndexSet<&'static str>,
     205              :     _marker: std::marker::PhantomData<[&'static str; F]>,
     206              : }
     207              : 
     208              : impl<C: Clock, W: MakeWriter, const F: usize> JsonLoggingLayer<C, W, F> {
     209            0 :     fn new(clock: C, writer: W, extract_fields: [&'static str; F]) -> Self {
     210            0 :         JsonLoggingLayer {
     211            0 :             clock,
     212            0 :             skipped_field_indices: papaya::HashMap::default(),
     213            0 :             callsite_ids: papaya::HashMap::default(),
     214            0 :             writer,
     215            0 :             extract_fields: IndexSet::from_iter(extract_fields),
     216            0 :             _marker: std::marker::PhantomData,
     217            0 :         }
     218            0 :     }
     219              : 
     220              :     #[inline]
     221            2 :     fn callsite_id(&self, cs: callsite::Identifier) -> CallsiteId {
     222            2 :         *self
     223            2 :             .callsite_ids
     224            2 :             .pin()
     225            2 :             .get_or_insert_with(cs, CallsiteId::next)
     226            2 :     }
     227              : }
     228              : 
     229              : impl<S, C: Clock + 'static, W: MakeWriter + 'static, const F: usize> Layer<S>
     230              :     for JsonLoggingLayer<C, W, F>
     231              : where
     232              :     S: Subscriber + for<'a> LookupSpan<'a>,
     233              : {
     234            1 :     fn on_event(&self, event: &Event<'_>, ctx: Context<'_, S>) {
     235              :         use std::io::Write;
     236              : 
     237              :         // TODO: consider special tracing subscriber to grab timestamp very
     238              :         //       early, before OTel machinery, and add as event extension.
     239            1 :         let now = self.clock.now();
     240            1 : 
     241            1 :         let res: io::Result<()> = REENTRANCY_GUARD.with(move |entered| {
     242            1 :             if entered.get() {
     243            0 :                 let mut formatter = EventFormatter::new();
     244            0 :                 formatter.format::<S, F>(
     245            0 :                     now,
     246            0 :                     event,
     247            0 :                     &ctx,
     248            0 :                     &self.skipped_field_indices,
     249            0 :                     &self.callsite_ids,
     250            0 :                     &self.extract_fields,
     251            0 :                 )?;
     252            0 :                 self.writer.make_writer().write_all(formatter.buffer())
     253              :             } else {
     254            1 :                 entered.set(true);
     255            1 :                 defer!(entered.set(false););
     256            1 : 
     257            1 :                 EVENT_FORMATTER.with_borrow_mut(move |formatter| {
     258            1 :                     formatter.reset();
     259            1 :                     formatter.format::<S, F>(
     260            1 :                         now,
     261            1 :                         event,
     262            1 :                         &ctx,
     263            1 :                         &self.skipped_field_indices,
     264            1 :                         &self.callsite_ids,
     265            1 :                         &self.extract_fields,
     266            1 :                     )?;
     267            1 :                     self.writer.make_writer().write_all(formatter.buffer())
     268            1 :                 })
     269              :             }
     270            1 :         });
     271              : 
     272              :         // In case logging fails we generate a simpler JSON object.
     273            1 :         if let Err(err) = res {
     274            0 :             if let Ok(mut line) = serde_json::to_vec(&serde_json::json!( {
     275            0 :                 "timestamp": now.to_rfc3339_opts(chrono::SecondsFormat::Micros, true),
     276            0 :                 "level": "ERROR",
     277            0 :                 "message": format_args!("cannot log event: {err:?}"),
     278            0 :                 "fields": {
     279            0 :                     "event": format_args!("{event:?}"),
     280            0 :                 },
     281            0 :             })) {
     282            0 :                 line.push(b'\n');
     283            0 :                 self.writer.make_writer().write_all(&line).ok();
     284            0 :             }
     285            1 :         }
     286            1 :     }
     287              : 
     288              :     /// Registers a SpanFields instance as span extension.
     289            2 :     fn on_new_span(&self, attrs: &span::Attributes<'_>, id: &span::Id, ctx: Context<'_, S>) {
     290            2 :         let span = ctx.span(id).expect("span must exist");
     291            2 :         let fields = SpanFields::default();
     292            2 :         fields.record_fields(attrs);
     293            2 : 
     294            2 :         // This could deadlock when there's a panic somewhere in the tracing
     295            2 :         // event handling and a read or write guard is still held. This includes
     296            2 :         // the OTel subscriber.
     297            2 :         let mut exts = span.extensions_mut();
     298            2 : 
     299            2 :         exts.insert(fields);
     300            2 :     }
     301              : 
     302            0 :     fn on_record(&self, id: &span::Id, values: &span::Record<'_>, ctx: Context<'_, S>) {
     303            0 :         let span = ctx.span(id).expect("span must exist");
     304            0 :         let ext = span.extensions();
     305            0 :         if let Some(data) = ext.get::<SpanFields>() {
     306            0 :             data.record_fields(values);
     307            0 :         }
     308            0 :     }
     309              : 
     310              :     /// Called (lazily) whenever a new log call is executed. We quickly check
     311              :     /// for duplicate field names and record duplicates as skippable. Last one
     312              :     /// wins.
     313            3 :     fn register_callsite(&self, metadata: &'static Metadata<'static>) -> Interest {
     314            3 :         if !metadata.is_event() {
     315            2 :             self.callsite_id(metadata.callsite());
     316            2 :             // Must not be never because we wouldn't get trace and span data.
     317            2 :             return Interest::always();
     318            1 :         }
     319            1 : 
     320            1 :         let mut field_indices = SkippedFieldIndices::default();
     321            1 :         let mut seen_fields = HashMap::<&'static str, usize>::new();
     322            5 :         for field in metadata.fields() {
     323              :             use std::collections::hash_map::Entry;
     324            5 :             match seen_fields.entry(field.name()) {
     325            2 :                 Entry::Vacant(entry) => {
     326            2 :                     // field not seen yet
     327            2 :                     entry.insert(field.index());
     328            2 :                 }
     329            3 :                 Entry::Occupied(mut entry) => {
     330            3 :                     // replace currently stored index
     331            3 :                     let old_index = entry.insert(field.index());
     332            3 :                     // ... and append it to list of skippable indices
     333            3 :                     field_indices.push(old_index);
     334            3 :                 }
     335              :             }
     336              :         }
     337              : 
     338            1 :         if !field_indices.is_empty() {
     339            1 :             self.skipped_field_indices
     340            1 :                 .pin()
     341            1 :                 .insert(metadata.callsite(), field_indices);
     342            1 :         }
     343              : 
     344            1 :         Interest::always()
     345            3 :     }
     346              : }
     347              : 
     348              : #[derive(Copy, Clone, Debug, Default)]
     349              : #[repr(transparent)]
     350              : struct CallsiteId(u32);
     351              : 
     352              : impl CallsiteId {
     353              :     #[inline]
     354            2 :     fn next() -> Self {
     355              :         // Start at 1 to reserve 0 for default.
     356              :         static COUNTER: AtomicU32 = AtomicU32::new(1);
     357            2 :         CallsiteId(COUNTER.fetch_add(1, Ordering::Relaxed))
     358            2 :     }
     359              : }
     360              : 
     361              : impl fmt::Display for CallsiteId {
     362              :     #[inline]
     363            2 :     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
     364            2 :         self.0.fmt(f)
     365            2 :     }
     366              : }
     367              : 
     368              : /// Stores span field values recorded during the spans lifetime.
     369              : #[derive(Default)]
     370              : struct SpanFields {
     371              :     // TODO: Switch to custom enum with lasso::Spur for Strings?
     372              :     fields: papaya::HashMap<&'static str, serde_json::Value>,
     373              : }
     374              : 
     375              : impl SpanFields {
     376              :     #[inline]
     377            2 :     fn record_fields<R: tracing_subscriber::field::RecordFields>(&self, fields: R) {
     378            2 :         fields.record(&mut SpanFieldsRecorder {
     379            2 :             fields: self.fields.pin(),
     380            2 :         });
     381            2 :     }
     382              : }
     383              : 
     384              : /// Implements a tracing field visitor to convert and store values.
     385              : struct SpanFieldsRecorder<'m, S, G> {
     386              :     fields: papaya::HashMapRef<'m, &'static str, serde_json::Value, S, G>,
     387              : }
     388              : 
     389              : impl<S: BuildHasher, G: papaya::Guard> tracing::field::Visit for SpanFieldsRecorder<'_, S, G> {
     390              :     #[inline]
     391            0 :     fn record_f64(&mut self, field: &tracing::field::Field, value: f64) {
     392            0 :         self.fields
     393            0 :             .insert(field.name(), serde_json::Value::from(value));
     394            0 :     }
     395              : 
     396              :     #[inline]
     397            4 :     fn record_i64(&mut self, field: &tracing::field::Field, value: i64) {
     398            4 :         self.fields
     399            4 :             .insert(field.name(), serde_json::Value::from(value));
     400            4 :     }
     401              : 
     402              :     #[inline]
     403            0 :     fn record_u64(&mut self, field: &tracing::field::Field, value: u64) {
     404            0 :         self.fields
     405            0 :             .insert(field.name(), serde_json::Value::from(value));
     406            0 :     }
     407              : 
     408              :     #[inline]
     409            0 :     fn record_i128(&mut self, field: &tracing::field::Field, value: i128) {
     410            0 :         if let Ok(value) = i64::try_from(value) {
     411            0 :             self.fields
     412            0 :                 .insert(field.name(), serde_json::Value::from(value));
     413            0 :         } else {
     414            0 :             self.fields
     415            0 :                 .insert(field.name(), serde_json::Value::from(format!("{value}")));
     416            0 :         }
     417            0 :     }
     418              : 
     419              :     #[inline]
     420            0 :     fn record_u128(&mut self, field: &tracing::field::Field, value: u128) {
     421            0 :         if let Ok(value) = u64::try_from(value) {
     422            0 :             self.fields
     423            0 :                 .insert(field.name(), serde_json::Value::from(value));
     424            0 :         } else {
     425            0 :             self.fields
     426            0 :                 .insert(field.name(), serde_json::Value::from(format!("{value}")));
     427            0 :         }
     428            0 :     }
     429              : 
     430              :     #[inline]
     431            0 :     fn record_bool(&mut self, field: &tracing::field::Field, value: bool) {
     432            0 :         self.fields
     433            0 :             .insert(field.name(), serde_json::Value::from(value));
     434            0 :     }
     435              : 
     436              :     #[inline]
     437            0 :     fn record_bytes(&mut self, field: &tracing::field::Field, value: &[u8]) {
     438            0 :         self.fields
     439            0 :             .insert(field.name(), serde_json::Value::from(value));
     440            0 :     }
     441              : 
     442              :     #[inline]
     443            0 :     fn record_str(&mut self, field: &tracing::field::Field, value: &str) {
     444            0 :         self.fields
     445            0 :             .insert(field.name(), serde_json::Value::from(value));
     446            0 :     }
     447              : 
     448              :     #[inline]
     449            0 :     fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) {
     450            0 :         self.fields
     451            0 :             .insert(field.name(), serde_json::Value::from(format!("{value:?}")));
     452            0 :     }
     453              : 
     454              :     #[inline]
     455            0 :     fn record_error(
     456            0 :         &mut self,
     457            0 :         field: &tracing::field::Field,
     458            0 :         value: &(dyn std::error::Error + 'static),
     459            0 :     ) {
     460            0 :         self.fields
     461            0 :             .insert(field.name(), serde_json::Value::from(format!("{value}")));
     462            0 :     }
     463              : }
     464              : 
     465              : /// List of field indices skipped during logging. Can list duplicate fields or
     466              : /// metafields not meant to be logged.
     467              : #[derive(Clone, Default)]
     468              : struct SkippedFieldIndices {
     469              :     bits: u64,
     470              : }
     471              : 
     472              : impl SkippedFieldIndices {
     473              :     #[inline]
     474            1 :     fn is_empty(&self) -> bool {
     475            1 :         self.bits == 0
     476            1 :     }
     477              : 
     478              :     #[inline]
     479            3 :     fn push(&mut self, index: usize) {
     480            3 :         self.bits |= 1u64
     481            3 :             .checked_shl(index as u32)
     482            3 :             .expect("field index too large");
     483            3 :     }
     484              : 
     485              :     #[inline]
     486           10 :     fn contains(&self, index: usize) -> bool {
     487           10 :         self.bits
     488           10 :             & 1u64
     489           10 :                 .checked_shl(index as u32)
     490           10 :                 .expect("field index too large")
     491           10 :             != 0
     492           10 :     }
     493              : }
     494              : 
     495              : /// Formats a tracing event and writes JSON to its internal buffer including a newline.
     496              : // TODO: buffer capacity management, truncate if too large
     497              : struct EventFormatter {
     498              :     logline_buffer: Vec<u8>,
     499              : }
     500              : 
     501              : impl EventFormatter {
     502              :     #[inline]
     503            1 :     fn new() -> Self {
     504            1 :         EventFormatter {
     505            1 :             logline_buffer: Vec::new(),
     506            1 :         }
     507            1 :     }
     508              : 
     509              :     #[inline]
     510            1 :     fn buffer(&self) -> &[u8] {
     511            1 :         &self.logline_buffer
     512            1 :     }
     513              : 
     514              :     #[inline]
     515            1 :     fn reset(&mut self) {
     516            1 :         self.logline_buffer.clear();
     517            1 :     }
     518              : 
     519            1 :     fn format<S, const F: usize>(
     520            1 :         &mut self,
     521            1 :         now: DateTime<Utc>,
     522            1 :         event: &Event<'_>,
     523            1 :         ctx: &Context<'_, S>,
     524            1 :         skipped_field_indices: &papaya::HashMap<callsite::Identifier, SkippedFieldIndices>,
     525            1 :         callsite_ids: &papaya::HashMap<callsite::Identifier, CallsiteId>,
     526            1 :         extract_fields: &IndexSet<&'static str>,
     527            1 :     ) -> io::Result<()>
     528            1 :     where
     529            1 :         S: Subscriber + for<'a> LookupSpan<'a>,
     530            1 :     {
     531            1 :         let timestamp = now.to_rfc3339_opts(chrono::SecondsFormat::Micros, true);
     532              : 
     533              :         use tracing_log::NormalizeEvent;
     534            1 :         let normalized_meta = event.normalized_metadata();
     535            1 :         let meta = normalized_meta.as_ref().unwrap_or_else(|| event.metadata());
     536            1 : 
     537            1 :         let skipped_field_indices = skipped_field_indices.pin();
     538            1 :         let skipped_field_indices = skipped_field_indices.get(&meta.callsite());
     539            1 : 
     540            1 :         let mut serialize = || {
     541            1 :             let mut serializer = serde_json::Serializer::new(&mut self.logline_buffer);
     542              : 
     543            1 :             let mut serializer = serializer.serialize_map(None)?;
     544              : 
     545              :             // Timestamp comes first, so raw lines can be sorted by timestamp.
     546            1 :             serializer.serialize_entry("timestamp", &timestamp)?;
     547              : 
     548              :             // Level next.
     549            1 :             serializer.serialize_entry("level", &meta.level().as_str())?;
     550              : 
     551              :             // Message next.
     552            1 :             serializer.serialize_key("message")?;
     553            1 :             let mut message_extractor =
     554            1 :                 MessageFieldExtractor::new(serializer, skipped_field_indices);
     555            1 :             event.record(&mut message_extractor);
     556            1 :             let mut serializer = message_extractor.into_serializer()?;
     557              : 
     558              :             // Direct message fields.
     559            1 :             let mut fields_present = FieldsPresent(false, skipped_field_indices);
     560            1 :             event.record(&mut fields_present);
     561            1 :             if fields_present.0 {
     562            1 :                 serializer.serialize_entry(
     563            1 :                     "fields",
     564            1 :                     &SerializableEventFields(event, skipped_field_indices),
     565            1 :                 )?;
     566            0 :             }
     567              : 
     568            1 :             let spans = SerializableSpans {
     569            1 :                 ctx,
     570            1 :                 callsite_ids,
     571            1 :                 extract: ExtractedSpanFields::<'_, F>::new(extract_fields),
     572            1 :             };
     573            1 :             serializer.serialize_entry("spans", &spans)?;
     574              : 
     575              :             // TODO: thread-local cache?
     576            1 :             let pid = std::process::id();
     577            1 :             // Skip adding pid 1 to reduce noise for services running in containers.
     578            1 :             if pid != 1 {
     579            1 :                 serializer.serialize_entry("process_id", &pid)?;
     580            0 :             }
     581              : 
     582            1 :             THREAD_ID.with(|tid| serializer.serialize_entry("thread_id", tid))?;
     583              : 
     584              :             // TODO: tls cache? name could change
     585            1 :             if let Some(thread_name) = std::thread::current().name() {
     586            1 :                 if !thread_name.is_empty() && thread_name != "tokio-runtime-worker" {
     587            1 :                     serializer.serialize_entry("thread_name", thread_name)?;
     588            0 :                 }
     589            0 :             }
     590              : 
     591            1 :             if let Some(task_id) = tokio::task::try_id() {
     592            0 :                 serializer.serialize_entry("task_id", &format_args!("{task_id}"))?;
     593            1 :             }
     594              : 
     595            1 :             serializer.serialize_entry("target", meta.target())?;
     596              : 
     597              :             // Skip adding module if it's the same as target.
     598            1 :             if let Some(module) = meta.module_path() {
     599            1 :                 if module != meta.target() {
     600            0 :                     serializer.serialize_entry("module", module)?;
     601            1 :                 }
     602            0 :             }
     603              : 
     604            1 :             if let Some(file) = meta.file() {
     605            1 :                 if let Some(line) = meta.line() {
     606            1 :                     serializer.serialize_entry("src", &format_args!("{file}:{line}"))?;
     607              :                 } else {
     608            0 :                     serializer.serialize_entry("src", file)?;
     609              :                 }
     610            0 :             }
     611              : 
     612              :             {
     613            1 :                 let otel_context = Span::current().context();
     614            1 :                 let otel_spanref = otel_context.span();
     615            1 :                 let span_context = otel_spanref.span_context();
     616            1 :                 if span_context.is_valid() {
     617            0 :                     serializer.serialize_entry(
     618            0 :                         "trace_id",
     619            0 :                         &format_args!("{}", span_context.trace_id()),
     620            0 :                     )?;
     621            1 :                 }
     622              :             }
     623              : 
     624            1 :             if spans.extract.has_values() {
     625              :                 // TODO: add fields from event, too?
     626            1 :                 serializer.serialize_entry("extract", &spans.extract)?;
     627            0 :             }
     628              : 
     629            1 :             serializer.end()
     630            1 :         };
     631              : 
     632            1 :         serialize().map_err(io::Error::other)?;
     633            1 :         self.logline_buffer.push(b'\n');
     634            1 :         Ok(())
     635            1 :     }
     636              : }
     637              : 
     638              : /// Extracts the message field that's mixed with other fields.
                        ///
                        /// Used as a field visitor: the value of the first accepted `MESSAGE_FIELD`
                        /// field is written through `serializer`; every other field is ignored.
     639              : struct MessageFieldExtractor<'a, S: serde::ser::SerializeMap> {
                            // Map serializer that receives the message value.
     640              :     serializer: S,
                            // Indices of fields that must be ignored, when provided.
     641              :     skipped_field_indices: Option<&'a SkippedFieldIndices>,
                            // `None` until a message value has been written, then the result of that write.
     642              :     state: Option<Result<(), S::Error>>,
     643              : }
     644              : 
     645              : impl<'a, S: serde::ser::SerializeMap> MessageFieldExtractor<'a, S> {
                            /// Creates an extractor that has not written a message value yet.
     646              :     #[inline]
     647            1 :     fn new(serializer: S, skipped_field_indices: Option<&'a SkippedFieldIndices>) -> Self {
     648            1 :         Self {
     649            1 :             serializer,
     650            1 :             skipped_field_indices,
     651            1 :             state: None,
     652            1 :         }
     653            1 :     }
     654              : 
                            /// Hands the map serializer back to the caller.
                            ///
                            /// Returns the error of the message write if that write failed. If no message
                            /// value was written at all, an empty string is written as the value first.
     655              :     #[inline]
     656            1 :     fn into_serializer(mut self) -> Result<S, S::Error> {
     657            1 :         match self.state {
     658            1 :             Some(Ok(())) => {}
     659            0 :             Some(Err(err)) => return Err(err),
                                    // No message field was accepted: still emit a value.
     660            0 :             None => self.serializer.serialize_value("")?,
     661              :         }
     662            1 :         Ok(self.serializer)
     663            1 :     }
     664              : 
                            /// Returns `true` only while no message value has been written yet, and only
                            /// for a field named `MESSAGE_FIELD` whose index is not among the skipped ones.
     665              :     #[inline]
     666            5 :     fn accept_field(&self, field: &tracing::field::Field) -> bool {
     667            5 :         self.state.is_none()
     668            5 :             && field.name() == MESSAGE_FIELD
     669            2 :             && !self
     670            2 :                 .skipped_field_indices
     671            2 :                 .is_some_and(|i| i.contains(field.index()))
     672            5 :     }
     673              : }
     674              : 
                        // Every callback writes the value of an accepted field via `serialize_value`
                        // and stores the outcome in `state`; fields that are not accepted are ignored.
     675              : impl<S: serde::ser::SerializeMap> tracing::field::Visit for MessageFieldExtractor<'_, S> {
     676              :     #[inline]
     677            0 :     fn record_f64(&mut self, field: &tracing::field::Field, value: f64) {
     678            0 :         if self.accept_field(field) {
     679            0 :             self.state = Some(self.serializer.serialize_value(&value));
     680            0 :         }
     681            0 :     }
     682              : 
     683              :     #[inline]
     684            3 :     fn record_i64(&mut self, field: &tracing::field::Field, value: i64) {
     685            3 :         if self.accept_field(field) {
     686            0 :             self.state = Some(self.serializer.serialize_value(&value));
     687            3 :         }
     688            3 :     }
     689              : 
     690              :     #[inline]
     691            0 :     fn record_u64(&mut self, field: &tracing::field::Field, value: u64) {
     692            0 :         if self.accept_field(field) {
     693            0 :             self.state = Some(self.serializer.serialize_value(&value));
     694            0 :         }
     695            0 :     }
     696              : 
     697              :     #[inline]
     698            0 :     fn record_i128(&mut self, field: &tracing::field::Field, value: i128) {
     699            0 :         if self.accept_field(field) {
     700            0 :             self.state = Some(self.serializer.serialize_value(&value));
     701            0 :         }
     702            0 :     }
     703              : 
     704              :     #[inline]
     705            0 :     fn record_u128(&mut self, field: &tracing::field::Field, value: u128) {
     706            0 :         if self.accept_field(field) {
     707            0 :             self.state = Some(self.serializer.serialize_value(&value));
     708            0 :         }
     709            0 :     }
     710              : 
     711              :     #[inline]
     712            0 :     fn record_bool(&mut self, field: &tracing::field::Field, value: bool) {
     713            0 :         if self.accept_field(field) {
     714            0 :             self.state = Some(self.serializer.serialize_value(&value));
     715            0 :         }
     716            0 :     }
     717              : 
     718              :     #[inline]
     719            0 :     fn record_bytes(&mut self, field: &tracing::field::Field, value: &[u8]) {
     720            0 :         if self.accept_field(field) {
                                    // Bytes are written in their `{:x?}` (hex debug) rendering.
     721            0 :             self.state = Some(self.serializer.serialize_value(&format_args!("{value:x?}")));
     722            0 :         }
     723            0 :     }
     724              : 
     725              :     #[inline]
     726            1 :     fn record_str(&mut self, field: &tracing::field::Field, value: &str) {
     727            1 :         if self.accept_field(field) {
     728            1 :             self.state = Some(self.serializer.serialize_value(&value));
     729            1 :         }
     730            1 :     }
     731              : 
     732              :     #[inline]
     733            1 :     fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) {
     734            1 :         if self.accept_field(field) {
                                    // Arbitrary values are written in their `Debug` rendering.
     735            0 :             self.state = Some(self.serializer.serialize_value(&format_args!("{value:?}")));
     736            1 :         }
     737            1 :     }
     738              : 
     739              :     #[inline]
     740            0 :     fn record_error(
     741            0 :         &mut self,
     742            0 :         field: &tracing::field::Field,
     743            0 :         value: &(dyn std::error::Error + 'static),
     744            0 :     ) {
     745            0 :         if self.accept_field(field) {
                                    // Errors are written in their `Display` rendering.
     746            0 :             self.state = Some(self.serializer.serialize_value(&format_args!("{value}")));
     747            0 :         }
     748            0 :     }
     749              : }
     750              : 
     751              : /// Checks whether any fields and field values are present. If not, the JSON subobject
     752              : /// can be skipped.
                        ///
                        /// `.0` is the "at least one field found" flag, `.1` the optional set of field
                        /// indices to ignore.
     753              : // This is entirely optional and only cosmetic, though it may help a
     754              : // bit during log parsing in dashboards when no field holds an empty object.
     755              : struct FieldsPresent<'a>(pub bool, Option<&'a SkippedFieldIndices>);
     756              : 
     757              : // Even though some methods have an overhead (error, bytes), it is assumed the
     758              : // compiler won't include it since we ignore the value entirely.
                        // NOTE(review): only `record_debug` is overridden; this relies on the other
                        // `Visit` methods forwarding to it by default — confirm against the tracing docs.
     759              : impl tracing::field::Visit for FieldsPresent<'_> {
                            /// Sets the flag for any field that is not skipped, is not named
                            /// `MESSAGE_FIELD`, and whose name does not start with `log.`.
     760              :     #[inline]
     761            5 :     fn record_debug(&mut self, field: &tracing::field::Field, _: &dyn std::fmt::Debug) {
     762            5 :         if !self.1.is_some_and(|i| i.contains(field.index()))
     763            2 :             && field.name() != MESSAGE_FIELD
     764            1 :             && !field.name().starts_with("log.")
     765            1 :         {
                                    // OR-ing with `true` simply sets the flag.
     766            1 :             self.0 |= true;
     767            4 :         }
     768            5 :     }
     769              : }
     770              : 
     771              : /// Serializes the fields directly supplied with a log event.
                        ///
                        /// `.0` is the event whose fields are written, `.1` the optional set of field
                        /// indices to leave out.
     772              : struct SerializableEventFields<'a, 'event>(
     773              :     &'a tracing::Event<'event>,
     774              :     Option<&'a SkippedFieldIndices>,
     775              : );
     776              : 
     777              : impl serde::ser::Serialize for SerializableEventFields<'_, '_> {
                            /// Writes the event's fields as a map by recording the event into a
                            /// `MessageFieldSkipper`, then closes the map.
                            ///
                            /// Returns the first serialization error reported by the skipper, if any.
     778            1 :     fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
     779            1 :     where
     780            1 :         S: Serializer,
     781            1 :     {
     782              :         use serde::ser::SerializeMap;
                                // Map length is not known up front, hence `None`.
     783            1 :         let serializer = serializer.serialize_map(None)?;
     784            1 :         let mut message_skipper = MessageFieldSkipper::new(serializer, self.1);
     785            1 :         self.0.record(&mut message_skipper);
     786            1 :         let serializer = message_skipper.into_serializer()?;
     787            1 :         serializer.end()
     788            1 :     }
     789              : }
     790              : 
     791              : /// A tracing field visitor that skips the message field.
                        ///
                        /// Accepted fields are written as map entries through `serializer`.
     792              : struct MessageFieldSkipper<'a, S: serde::ser::SerializeMap> {
                            // Map serializer that receives one entry per accepted field.
     793              :     serializer: S,
                            // Indices of fields that must be ignored, when provided.
     794              :     skipped_field_indices: Option<&'a SkippedFieldIndices>,
                            // Result of the most recent write; once it is `Err`, no further field is accepted.
     795              :     state: Result<(), S::Error>,
     796              : }
     797              : 
     798              : impl<'a, S: serde::ser::SerializeMap> MessageFieldSkipper<'a, S> {
                            /// Creates a skipper in the non-failed state.
     799              :     #[inline]
     800            1 :     fn new(serializer: S, skipped_field_indices: Option<&'a SkippedFieldIndices>) -> Self {
     801            1 :         Self {
     802            1 :             serializer,
     803            1 :             skipped_field_indices,
     804            1 :             state: Ok(()),
     805            1 :         }
     806            1 :     }
     807              : 
                            /// Returns `true` when no write has failed so far and the field is not named
                            /// `MESSAGE_FIELD`, does not start with `log.`, and is not among the skipped
                            /// indices.
     808              :     #[inline]
     809            5 :     fn accept_field(&self, field: &tracing::field::Field) -> bool {
     810            5 :         self.state.is_ok()
     811            5 :             && field.name() != MESSAGE_FIELD
     812            3 :             && !field.name().starts_with("log.")
     813            3 :             && !self
     814            3 :                 .skipped_field_indices
     815            3 :                 .is_some_and(|i| i.contains(field.index()))
     816            5 :     }
     817              : 
                            /// Hands the map serializer back, or the stored error if a write failed.
     818              :     #[inline]
     819            1 :     fn into_serializer(self) -> Result<S, S::Error> {
     820            1 :         self.state?;
     821            1 :         Ok(self.serializer)
     822            1 :     }
     823              : }
     824              : 
     825              : impl<S: serde::ser::SerializeMap> tracing::field::Visit for MessageFieldSkipper<'_, S> {
     826              :     #[inline]
     827            0 :     fn record_f64(&mut self, field: &tracing::field::Field, value: f64) {
     828            0 :         if self.accept_field(field) {
     829            0 :             self.state = self.serializer.serialize_entry(field.name(), &value);
     830            0 :         }
     831            0 :     }
     832              : 
     833              :     #[inline]
     834            3 :     fn record_i64(&mut self, field: &tracing::field::Field, value: i64) {
     835            3 :         if self.accept_field(field) {
     836            1 :             self.state = self.serializer.serialize_entry(field.name(), &value);
     837            2 :         }
     838            3 :     }
     839              : 
     840              :     #[inline]
     841            0 :     fn record_u64(&mut self, field: &tracing::field::Field, value: u64) {
     842            0 :         if self.accept_field(field) {
     843            0 :             self.state = self.serializer.serialize_entry(field.name(), &value);
     844            0 :         }
     845            0 :     }
     846              : 
     847              :     #[inline]
     848            0 :     fn record_i128(&mut self, field: &tracing::field::Field, value: i128) {
     849            0 :         if self.accept_field(field) {
     850            0 :             self.state = self.serializer.serialize_entry(field.name(), &value);
     851            0 :         }
     852            0 :     }
     853              : 
     854              :     #[inline]
     855            0 :     fn record_u128(&mut self, field: &tracing::field::Field, value: u128) {
     856            0 :         if self.accept_field(field) {
     857            0 :             self.state = self.serializer.serialize_entry(field.name(), &value);
     858            0 :         }
     859            0 :     }
     860              : 
     861              :     #[inline]
     862            0 :     fn record_bool(&mut self, field: &tracing::field::Field, value: bool) {
     863            0 :         if self.accept_field(field) {
     864            0 :             self.state = self.serializer.serialize_entry(field.name(), &value);
     865            0 :         }
     866            0 :     }
     867              : 
     868              :     #[inline]
     869            0 :     fn record_bytes(&mut self, field: &tracing::field::Field, value: &[u8]) {
     870            0 :         if self.accept_field(field) {
     871            0 :             self.state = self
     872            0 :                 .serializer
     873            0 :                 .serialize_entry(field.name(), &format_args!("{value:x?}"));
     874            0 :         }
     875            0 :     }
     876              : 
     877              :     #[inline]
     878            1 :     fn record_str(&mut self, field: &tracing::field::Field, value: &str) {
     879            1 :         if self.accept_field(field) {
     880            0 :             self.state = self.serializer.serialize_entry(field.name(), &value);
     881            1 :         }
     882            1 :     }
     883              : 
     884              :     #[inline]
     885            1 :     fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) {
     886            1 :         if self.accept_field(field) {
     887            0 :             self.state = self
     888            0 :                 .serializer
     889            0 :                 .serialize_entry(field.name(), &format_args!("{value:?}"));
     890            1 :         }
     891            1 :     }
     892              : 
     893              :     #[inline]
     894            0 :     fn record_error(
     895            0 :         &mut self,
     896            0 :         field: &tracing::field::Field,
     897            0 :         value: &(dyn std::error::Error + 'static),
     898            0 :     ) {
     899            0 :         if self.accept_field(field) {
     900            0 :             self.state = self.serializer.serialize_value(&format_args!("{value}"));
     901            0 :         }
     902            0 :     }
     903              : }
     904              : 
     905              : /// Serializes the span stack from root to leaf (parent of event) as an object
     906              : /// with the span names as keys. To prevent collisions we append a numeric value
     907              : /// to the name. Also, collects any span fields we're interested in. Last one
     908              : /// wins.
     909              : struct SerializableSpans<'a, 'ctx, Span, const F: usize>
     910              : where
     911              :     Span: Subscriber + for<'lookup> LookupSpan<'lookup>,
     912              : {
                            // Layer context used to look up the current span.
     913              :     ctx: &'a Context<'ctx, Span>,
                            // Numeric IDs per callsite; appended to the span name in the output key.
     914              :     callsite_ids: &'a papaya::HashMap<callsite::Identifier, CallsiteId>,
                            // Collector for the span fields that are of interest, filled during serialization.
     915              :     extract: ExtractedSpanFields<'a, F>,
     916              : }
     917              : 
     918              : impl<Span, const F: usize> serde::ser::Serialize for SerializableSpans<'_, '_, Span, F>
     919              : where
     920              :     Span: Subscriber + for<'lookup> LookupSpan<'lookup>,
     921              : {
                            /// Writes one map entry per span in the current scope, walking from the root
                            /// span down to the current one. The map stays empty when there is no
                            /// current span.
     922            1 :     fn serialize<Ser>(&self, serializer: Ser) -> Result<Ser::Ok, Ser::Error>
     923            1 :     where
     924            1 :         Ser: serde::ser::Serializer,
     925            1 :     {
     926            1 :         let mut serializer = serializer.serialize_map(None)?;
     927              : 
     928            1 :         if let Some(leaf_span) = self.ctx.lookup_current() {
     929            2 :             for span in leaf_span.scope().from_root() {
     930              :                 // Append a numeric callsite ID to the span name to keep the name unique
     931              :                 // in the JSON object.
                                        // A callsite without a registered ID falls back to the default ID.
     932            2 :                 let cid = self
     933            2 :                     .callsite_ids
     934            2 :                     .pin()
     935            2 :                     .get(&span.metadata().callsite())
     936            2 :                     .copied()
     937            2 :                     .unwrap_or_default();
     938            2 : 
     939            2 :                 // Loki turns the # into an underscore during field name concatenation.
     940            2 :                 serializer.serialize_key(&format_args!("{}#{}", span.metadata().name(), &cid))?;
     941              : 
                                        // Serializing the fields also feeds `self.extract` as a side effect.
     942            2 :                 serializer.serialize_value(&SerializableSpanFields {
     943            2 :                     span: &span,
     944            2 :                     extract: &self.extract,
     945            2 :                 })?;
     946              :             }
     947            0 :         }
     948              : 
     949            1 :         serializer.end()
     950            1 :     }
     951              : }
     952              : 
     953              : /// Serializes the span fields as object.
                        ///
                        /// While doing so, every field is also offered to `extract`.
     954              : struct SerializableSpanFields<'a, 'span, Span, const F: usize>
     955              : where
     956              :     Span: for<'lookup> LookupSpan<'lookup>,
     957              : {
                            // Span whose stored fields are written.
     958              :     span: &'a SpanRef<'span, Span>,
                            // Collector that receives a copy of each field it is interested in.
     959              :     extract: &'a ExtractedSpanFields<'a, F>,
     960              : }
     961              : 
     962              : impl<Span, const F: usize> serde::ser::Serialize for SerializableSpanFields<'_, '_, Span, F>
     963              : where
     964              :     Span: for<'lookup> LookupSpan<'lookup>,
     965              : {
                            /// Writes the fields stored in the span's `SpanFields` extension as a map and
                            /// passes each of them to `self.extract`. The map stays empty when the span
                            /// has no such extension.
     966            2 :     fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
     967            2 :     where
     968            2 :         S: serde::ser::Serializer,
     969            2 :     {
     970            2 :         let mut serializer = serializer.serialize_map(None)?;
     971              : 
     972            2 :         let ext = self.span.extensions();
     973            2 :         if let Some(data) = ext.get::<SpanFields>() {
     974            2 :             for (name, value) in &data.fields.pin() {
     975            2 :                 serializer.serialize_entry(name, value)?;
     976              :                 // TODO: replace clone with reference, if possible.
                                        // `set` ignores names that are not in the extraction list.
     977            2 :                 self.extract.set(name, value.clone());
     978              :             }
     979            0 :         }
     980              : 
     981            2 :         serializer.end()
     982            2 :     }
     983              : }
     984              : 
                        /// Collects the values of selected span fields so they can be serialized as
                        /// their own object. A later value for the same name overwrites an earlier one.
     985              : struct ExtractedSpanFields<'a, const F: usize> {
                            // Names of the fields to collect; a name's position is its slot in `values`.
     986              :     names: &'a IndexSet<&'static str>,
     987              :     // TODO: replace TryLock with something local thread and interior mutability.
     988              :     //       serde API doesn't let us use `mut`.
                            // One optional value per slot, plus a flag that is set once any value was stored.
     989              :     values: TryLock<([Option<serde_json::Value>; F], bool)>,
     990              : }
     991              : 
     992              : impl<'a, const F: usize> ExtractedSpanFields<'a, F> {
                            /// Creates an empty collector for the given field names.
     993            1 :     fn new(names: &'a IndexSet<&'static str>) -> Self {
     994            1 :         ExtractedSpanFields {
     995            1 :             names,
     996            1 :             values: TryLock::new((array::from_fn(|_| Option::default()), false)),
     997            1 :         }
     998            1 :     }
     999              : 
                            /// Stores `value` in the slot belonging to `name` and marks the collector as
                            /// non-empty. Names that are not in `names` are ignored.
                            ///
                            /// # Panics
                            ///
                            /// Panics if the lock cannot be acquired, or if the slot index is not below `F`.
    1000              :     #[inline]
    1001            2 :     fn set(&self, name: &'static str, value: serde_json::Value) {
    1002            2 :         if let Some((index, _)) = self.names.get_full(name) {
    1003            2 :             let mut fields = self.values.try_lock().expect("thread-local use");
    1004            2 :             fields.0[index] = Some(value);
    1005            2 :             fields.1 = true;
    1006            2 :         }
    1007            2 :     }
    1008              : 
                            /// Returns whether any value has been stored via `set`.
                            ///
                            /// # Panics
                            ///
                            /// Panics if the lock cannot be acquired.
    1009              :     #[inline]
    1010            1 :     fn has_values(&self) -> bool {
    1011            1 :         self.values.try_lock().expect("thread-local use").1
    1012            1 :     }
    1013              : }
    1014              : 
    1015              : impl<const F: usize> serde::ser::Serialize for ExtractedSpanFields<'_, F> {
                            /// Writes every slot that holds a value as a map entry, keyed by the field
                            /// name at the same position in `names`. Empty slots are left out.
                            ///
                            /// # Panics
                            ///
                            /// Panics if the lock cannot be acquired.
    1016            1 :     fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    1017            1 :     where
    1018            1 :         S: serde::ser::Serializer,
    1019            1 :     {
    1020            1 :         let mut serializer = serializer.serialize_map(None)?;
    1021              : 
    1022            1 :         let values = self.values.try_lock().expect("thread-local use");
    1023            1 :         for (i, value) in values.0.iter().enumerate() {
    1024            1 :             if let Some(value) = value {
                                        // Slot index and name index line up, see `set`.
    1025            1 :                 let key = self.names[i];
    1026            1 :                 serializer.serialize_entry(key, value)?;
    1027            0 :             }
    1028              :         }
    1029              : 
    1030            1 :         serializer.end()
    1031            1 :     }
    1032              : }
    1033              : 
    1034              : #[cfg(test)]
    1035              : #[allow(clippy::unwrap_used)]
    1036              : mod tests {
    1037              :     use std::marker::PhantomData;
    1038              :     use std::sync::{Arc, Mutex, MutexGuard};
    1039              : 
    1040              :     use assert_json_diff::assert_json_eq;
    1041              :     use tracing::info_span;
    1042              : 
    1043              :     use super::*;
    1044              : 
                            /// Clock for tests that always reports the stored time.
    1045              :     struct TestClock {
    1046              :         current_time: Mutex<DateTime<Utc>>,
    1047              :     }
    1048              : 
    1049              :     impl Clock for Arc<TestClock> {
    1050            2 :         fn now(&self) -> DateTime<Utc> {
    1051            2 :             *self.current_time.lock().expect("poisoned")
    1052            2 :         }
    1053              :     }
    1054              : 
                            /// Writer that appends to a shared byte buffer while holding its lock.
    1055              :     struct VecWriter<'a> {
    1056              :         buffer: MutexGuard<'a, Vec<u8>>,
    1057              :     }
    1058              : 
    1059              :     impl MakeWriter for Arc<Mutex<Vec<u8>>> {
    1060            1 :         fn make_writer(&self) -> impl io::Write {
    1061            1 :             VecWriter {
    1062            1 :                 buffer: self.lock().expect("poisoned"),
    1063            1 :             }
    1064            1 :         }
    1065              :     }
    1066              : 
    1067              :     impl io::Write for VecWriter<'_> {
    1068            1 :         fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
    1069            1 :             self.buffer.write(buf)
    1070            1 :         }
    1071              : 
                                // Nothing to flush: writes go straight into the in-memory buffer.
    1072            0 :         fn flush(&mut self) -> io::Result<()> {
    1073            0 :             Ok(())
    1074            0 :         }
    1075              :     }
    1076              : 
                            /// Logs one event with duplicate field names inside two nested spans and
                            /// compares the emitted JSON line against the expected document.
    1077              :     #[test]
    1078            1 :     fn test_field_collection() {
    1079            1 :         let clock = Arc::new(TestClock {
    1080            1 :             current_time: Mutex::new(Utc::now()),
    1081            1 :         });
    1082            1 :         let buffer = Arc::new(Mutex::new(Vec::new()));
    1083            1 :         let log_layer = JsonLoggingLayer {
    1084            1 :             clock: clock.clone(),
    1085            1 :             skipped_field_indices: papaya::HashMap::default(),
    1086            1 :             callsite_ids: papaya::HashMap::default(),
    1087            1 :             writer: buffer.clone(),
    1088            1 :             extract_fields: IndexSet::from_iter(["x"]),
    1089            1 :             _marker: PhantomData::<[&'static str; 1]>,
    1090            1 :         };
    1091            1 : 
    1092            1 :         let registry = tracing_subscriber::Registry::default().with(log_layer);
    1093            1 : 
    1094            1 :         tracing::subscriber::with_default(registry, || {
    1095            1 :             info_span!("some_span", x = 24).in_scope(|| {
    1096            1 :                 info_span!("some_span", x = 40, x = 41, x = 42).in_scope(|| {
    1097            1 :                     tracing::error!(
    1098              :                         a = 1,
    1099              :                         a = 2,
    1100              :                         a = 3,
    1101              :                         message = "explicit message field",
    1102            0 :                         "implicit message field"
    1103              :                     );
    1104            1 :                 });
    1105            1 :             });
    1106            1 :         });
    1107            1 : 
                                // The registry (and with it the layer's clone of the buffer) is gone by now.
    1108            1 :         let buffer = Arc::try_unwrap(buffer)
    1109            1 :             .expect("no other reference")
    1110            1 :             .into_inner()
    1111            1 :             .expect("poisoned");
    1112            1 :         let actual: serde_json::Value = serde_json::from_slice(&buffer).expect("valid JSON");
                                // Expected: the last duplicate wins (`a: 3`, `x: 42`) and the explicit
                                // message field is the one reported. Values that vary per run (`src`,
                                // `process_id`, `thread_id`) are copied from the actual output.
    1113            1 :         let expected: serde_json::Value = serde_json::json!(
    1114            1 :             {
    1115            1 :                 "timestamp": clock.now().to_rfc3339_opts(chrono::SecondsFormat::Micros, true),
    1116            1 :                 "level": "ERROR",
    1117            1 :                 "message": "explicit message field",
    1118            1 :                 "fields": {
    1119            1 :                     "a": 3,
    1120            1 :                 },
    1121            1 :                 "spans": {
    1122            1 :                     "some_span#1":{
    1123            1 :                         "x": 24,
    1124            1 :                     },
    1125            1 :                     "some_span#2": {
    1126            1 :                         "x": 42,
    1127            1 :                     }
    1128            1 :                 },
    1129            1 :                 "extract": {
    1130            1 :                     "x": 42,
    1131            1 :                 },
    1132            1 :                 "src": actual.as_object().unwrap().get("src").unwrap().as_str().unwrap(),
    1133            1 :                 "target": "proxy::logging::tests",
    1134            1 :                 "process_id": actual.as_object().unwrap().get("process_id").unwrap().as_number().unwrap(),
    1135            1 :                 "thread_id": actual.as_object().unwrap().get("thread_id").unwrap().as_number().unwrap(),
    1136            1 :                 "thread_name": "logging::tests::test_field_collection",
    1137            1 :             }
    1138            1 :         );
    1139            1 : 
    1140            1 :         assert_json_eq!(actual, expected);
    1141            1 :     }
    1142              : }
        

Generated by: LCOV version 2.1-beta