Line data Source code
1 : use std::cell::{Cell, RefCell};
2 : use std::collections::HashMap;
3 : use std::hash::BuildHasher;
4 : use std::{env, io};
5 :
6 : use chrono::{DateTime, Utc};
7 : use opentelemetry::trace::TraceContextExt;
8 : use scopeguard::defer;
9 : use serde::ser::{SerializeMap, Serializer};
10 : use tracing::subscriber::Interest;
11 : use tracing::{callsite, span, Event, Metadata, Span, Subscriber};
12 : use tracing_opentelemetry::OpenTelemetrySpanExt;
13 : use tracing_subscriber::filter::{EnvFilter, LevelFilter};
14 : use tracing_subscriber::fmt::format::{Format, Full};
15 : use tracing_subscriber::fmt::time::SystemTime;
16 : use tracing_subscriber::fmt::{FormatEvent, FormatFields};
17 : use tracing_subscriber::layer::{Context, Layer};
18 : use tracing_subscriber::prelude::*;
19 : use tracing_subscriber::registry::{LookupSpan, SpanRef};
20 :
21 : /// Initialize logging and OpenTelemetry tracing and exporter.
22 : ///
23 : /// Logging can be configured using `RUST_LOG` environment variable.
24 : ///
25 : /// OpenTelemetry is configured with OTLP/HTTP exporter. It picks up
26 : /// configuration from environment variables. For example, to change the
27 : /// destination, set `OTEL_EXPORTER_OTLP_ENDPOINT=http://jaeger:4318`.
28 : /// See <https://opentelemetry.io/docs/reference/specification/sdk-environment-variables>
29 0 : pub async fn init() -> anyhow::Result<LoggingGuard> {
30 0 : let logfmt = LogFormat::from_env()?;
31 :
32 0 : let env_filter = EnvFilter::builder()
33 0 : .with_default_directive(LevelFilter::INFO.into())
34 0 : .from_env_lossy()
35 0 : .add_directive(
36 0 : "aws_config=info"
37 0 : .parse()
38 0 : .expect("this should be a valid filter directive"),
39 0 : )
40 0 : .add_directive(
41 0 : "azure_core::policies::transport=off"
42 0 : .parse()
43 0 : .expect("this should be a valid filter directive"),
44 0 : );
45 :
46 0 : let otlp_layer = tracing_utils::init_tracing("proxy").await;
47 :
48 0 : let json_log_layer = if logfmt == LogFormat::Json {
49 0 : Some(JsonLoggingLayer {
50 0 : clock: RealClock,
51 0 : skipped_field_indices: papaya::HashMap::default(),
52 0 : writer: StderrWriter {
53 0 : stderr: std::io::stderr(),
54 0 : },
55 0 : })
56 : } else {
57 0 : None
58 : };
59 :
60 0 : let text_log_layer = if logfmt == LogFormat::Text {
61 0 : Some(
62 0 : tracing_subscriber::fmt::layer()
63 0 : .with_ansi(false)
64 0 : .with_writer(std::io::stderr)
65 0 : .with_target(false),
66 0 : )
67 : } else {
68 0 : None
69 : };
70 :
71 0 : tracing_subscriber::registry()
72 0 : .with(env_filter)
73 0 : .with(otlp_layer)
74 0 : .with(json_log_layer)
75 0 : .with(text_log_layer)
76 0 : .try_init()?;
77 :
78 0 : Ok(LoggingGuard)
79 0 : }
80 :
81 : /// Initialize logging for local_proxy with log prefix and no opentelemetry.
82 : ///
83 : /// Logging can be configured using `RUST_LOG` environment variable.
84 0 : pub fn init_local_proxy() -> anyhow::Result<LoggingGuard> {
85 0 : let env_filter = EnvFilter::builder()
86 0 : .with_default_directive(LevelFilter::INFO.into())
87 0 : .from_env_lossy();
88 0 :
89 0 : let fmt_layer = tracing_subscriber::fmt::layer()
90 0 : .with_ansi(false)
91 0 : .with_writer(std::io::stderr)
92 0 : .event_format(LocalProxyFormatter(Format::default().with_target(false)));
93 0 :
94 0 : tracing_subscriber::registry()
95 0 : .with(env_filter)
96 0 : .with(fmt_layer)
97 0 : .try_init()?;
98 :
99 0 : Ok(LoggingGuard)
100 0 : }
101 :
102 : pub struct LocalProxyFormatter(Format<Full, SystemTime>);
103 :
104 : impl<S, N> FormatEvent<S, N> for LocalProxyFormatter
105 : where
106 : S: Subscriber + for<'a> LookupSpan<'a>,
107 : N: for<'a> FormatFields<'a> + 'static,
108 : {
109 0 : fn format_event(
110 0 : &self,
111 0 : ctx: &tracing_subscriber::fmt::FmtContext<'_, S, N>,
112 0 : mut writer: tracing_subscriber::fmt::format::Writer<'_>,
113 0 : event: &tracing::Event<'_>,
114 0 : ) -> std::fmt::Result {
115 0 : writer.write_str("[local_proxy] ")?;
116 0 : self.0.format_event(ctx, writer, event)
117 0 : }
118 : }
119 :
120 : pub struct LoggingGuard;
121 :
122 : impl Drop for LoggingGuard {
123 0 : fn drop(&mut self) {
124 0 : // Shutdown trace pipeline gracefully, so that it has a chance to send any
125 0 : // pending traces before we exit.
126 0 : tracing::info!("shutting down the tracing machinery");
127 0 : tracing_utils::shutdown_tracing();
128 0 : }
129 : }
130 :
131 : // TODO: make JSON the default
132 : #[derive(Copy, Clone, PartialEq, Eq, Default, Debug)]
133 : enum LogFormat {
134 : #[default]
135 : Text = 1,
136 : Json,
137 : }
138 :
139 : impl LogFormat {
140 0 : fn from_env() -> anyhow::Result<Self> {
141 0 : let logfmt = env::var("LOGFMT");
142 0 : Ok(match logfmt.as_deref() {
143 0 : Err(_) => LogFormat::default(),
144 0 : Ok("text") => LogFormat::Text,
145 0 : Ok("json") => LogFormat::Json,
146 0 : Ok(logfmt) => anyhow::bail!("unknown log format: {logfmt}"),
147 : })
148 0 : }
149 : }
150 :
/// Hands out a writer for a single log line.
trait MakeWriter {
    /// Returns a writer that the caller uses to emit one formatted line.
    fn make_writer(&self) -> impl io::Write;
}

/// [`MakeWriter`] backed by the process' standard error stream.
struct StderrWriter {
    stderr: io::Stderr,
}

impl MakeWriter for StderrWriter {
    /// Locks stderr and returns the lock as the writer.
    #[inline]
    fn make_writer(&self) -> impl io::Write {
        io::Stderr::lock(&self.stderr)
    }
}
165 :
166 : // TODO: move into separate module or even separate crate.
167 : trait Clock {
168 : fn now(&self) -> DateTime<Utc>;
169 : }
170 :
171 : struct RealClock;
172 :
173 : impl Clock for RealClock {
174 : #[inline]
175 0 : fn now(&self) -> DateTime<Utc> {
176 0 : Utc::now()
177 0 : }
178 : }
179 :
180 : /// Name of the field used by tracing crate to store the event message.
181 : const MESSAGE_FIELD: &str = "message";
182 :
183 : thread_local! {
184 : /// Protects against deadlocks and double panics during log writing.
185 : /// The current panic handler will use tracing to log panic information.
186 : static REENTRANCY_GUARD: Cell<bool> = const { Cell::new(false) };
187 : /// Thread-local instance with per-thread buffer for log writing.
188 : static EVENT_FORMATTER: RefCell<EventFormatter> = RefCell::new(EventFormatter::new());
189 : /// Cached OS thread ID.
190 : static THREAD_ID: u64 = gettid::gettid();
191 : }
192 :
193 : /// Implements tracing layer to handle events specific to logging.
194 : struct JsonLoggingLayer<C: Clock, W: MakeWriter> {
195 : clock: C,
196 : skipped_field_indices: papaya::HashMap<callsite::Identifier, SkippedFieldIndices>,
197 : writer: W,
198 : }
199 :
200 : impl<S, C: Clock + 'static, W: MakeWriter + 'static> Layer<S> for JsonLoggingLayer<C, W>
201 : where
202 : S: Subscriber + for<'a> LookupSpan<'a>,
203 : {
204 1 : fn on_event(&self, event: &Event<'_>, ctx: Context<'_, S>) {
205 : use std::io::Write;
206 :
207 : // TODO: consider special tracing subscriber to grab timestamp very
208 : // early, before OTel machinery, and add as event extension.
209 1 : let now = self.clock.now();
210 1 :
211 1 : let res: io::Result<()> = REENTRANCY_GUARD.with(move |entered| {
212 1 : if entered.get() {
213 0 : let mut formatter = EventFormatter::new();
214 0 : formatter.format(now, event, &ctx, &self.skipped_field_indices)?;
215 0 : self.writer.make_writer().write_all(formatter.buffer())
216 : } else {
217 1 : entered.set(true);
218 1 : defer!(entered.set(false););
219 1 :
220 1 : EVENT_FORMATTER.with_borrow_mut(move |formatter| {
221 1 : formatter.reset();
222 1 : formatter.format(now, event, &ctx, &self.skipped_field_indices)?;
223 1 : self.writer.make_writer().write_all(formatter.buffer())
224 1 : })
225 : }
226 1 : });
227 :
228 : // In case logging fails we generate a simpler JSON object.
229 1 : if let Err(err) = res {
230 0 : if let Ok(mut line) = serde_json::to_vec(&serde_json::json!( {
231 0 : "timestamp": now.to_rfc3339_opts(chrono::SecondsFormat::Micros, true),
232 0 : "level": "ERROR",
233 0 : "message": format_args!("cannot log event: {err:?}"),
234 0 : "fields": {
235 0 : "event": format_args!("{event:?}"),
236 0 : },
237 0 : })) {
238 0 : line.push(b'\n');
239 0 : self.writer.make_writer().write_all(&line).ok();
240 0 : }
241 1 : }
242 1 : }
243 :
244 : /// Registers a SpanFields instance as span extension.
245 2 : fn on_new_span(&self, attrs: &span::Attributes<'_>, id: &span::Id, ctx: Context<'_, S>) {
246 2 : let span = ctx.span(id).expect("span must exist");
247 2 : let fields = SpanFields::default();
248 2 : fields.record_fields(attrs);
249 2 : // This could deadlock when there's a panic somewhere in the tracing
250 2 : // event handling and a read or write guard is still held. This includes
251 2 : // the OTel subscriber.
252 2 : span.extensions_mut().insert(fields);
253 2 : }
254 :
255 0 : fn on_record(&self, id: &span::Id, values: &span::Record<'_>, ctx: Context<'_, S>) {
256 0 : let span = ctx.span(id).expect("span must exist");
257 0 : let ext = span.extensions();
258 0 : if let Some(data) = ext.get::<SpanFields>() {
259 0 : data.record_fields(values);
260 0 : }
261 0 : }
262 :
263 : /// Called (lazily) whenever a new log call is executed. We quickly check
264 : /// for duplicate field names and record duplicates as skippable. Last one
265 : /// wins.
266 3 : fn register_callsite(&self, metadata: &'static Metadata<'static>) -> Interest {
267 3 : if !metadata.is_event() {
268 : // Must not be never because we wouldn't get trace and span data.
269 2 : return Interest::always();
270 1 : }
271 1 :
272 1 : let mut field_indices = SkippedFieldIndices::default();
273 1 : let mut seen_fields = HashMap::<&'static str, usize>::new();
274 5 : for field in metadata.fields() {
275 : use std::collections::hash_map::Entry;
276 5 : match seen_fields.entry(field.name()) {
277 2 : Entry::Vacant(entry) => {
278 2 : // field not seen yet
279 2 : entry.insert(field.index());
280 2 : }
281 3 : Entry::Occupied(mut entry) => {
282 3 : // replace currently stored index
283 3 : let old_index = entry.insert(field.index());
284 3 : // ... and append it to list of skippable indices
285 3 : field_indices.push(old_index);
286 3 : }
287 : }
288 : }
289 :
290 1 : if !field_indices.is_empty() {
291 1 : self.skipped_field_indices
292 1 : .pin()
293 1 : .insert(metadata.callsite(), field_indices);
294 1 : }
295 :
296 1 : Interest::always()
297 3 : }
298 : }
299 :
300 : /// Stores span field values recorded during the spans lifetime.
301 : #[derive(Default)]
302 : struct SpanFields {
303 : // TODO: Switch to custom enum with lasso::Spur for Strings?
304 : fields: papaya::HashMap<&'static str, serde_json::Value>,
305 : }
306 :
307 : impl SpanFields {
308 : #[inline]
309 2 : fn record_fields<R: tracing_subscriber::field::RecordFields>(&self, fields: R) {
310 2 : fields.record(&mut SpanFieldsRecorder {
311 2 : fields: self.fields.pin(),
312 2 : });
313 2 : }
314 : }
315 :
316 : /// Implements a tracing field visitor to convert and store values.
317 : struct SpanFieldsRecorder<'m, S, G> {
318 : fields: papaya::HashMapRef<'m, &'static str, serde_json::Value, S, G>,
319 : }
320 :
321 : impl<S: BuildHasher, G: papaya::Guard> tracing::field::Visit for SpanFieldsRecorder<'_, S, G> {
322 : #[inline]
323 0 : fn record_f64(&mut self, field: &tracing::field::Field, value: f64) {
324 0 : self.fields
325 0 : .insert(field.name(), serde_json::Value::from(value));
326 0 : }
327 :
328 : #[inline]
329 3 : fn record_i64(&mut self, field: &tracing::field::Field, value: i64) {
330 3 : self.fields
331 3 : .insert(field.name(), serde_json::Value::from(value));
332 3 : }
333 :
334 : #[inline]
335 0 : fn record_u64(&mut self, field: &tracing::field::Field, value: u64) {
336 0 : self.fields
337 0 : .insert(field.name(), serde_json::Value::from(value));
338 0 : }
339 :
340 : #[inline]
341 0 : fn record_i128(&mut self, field: &tracing::field::Field, value: i128) {
342 0 : if let Ok(value) = i64::try_from(value) {
343 0 : self.fields
344 0 : .insert(field.name(), serde_json::Value::from(value));
345 0 : } else {
346 0 : self.fields
347 0 : .insert(field.name(), serde_json::Value::from(format!("{value}")));
348 0 : }
349 0 : }
350 :
351 : #[inline]
352 0 : fn record_u128(&mut self, field: &tracing::field::Field, value: u128) {
353 0 : if let Ok(value) = u64::try_from(value) {
354 0 : self.fields
355 0 : .insert(field.name(), serde_json::Value::from(value));
356 0 : } else {
357 0 : self.fields
358 0 : .insert(field.name(), serde_json::Value::from(format!("{value}")));
359 0 : }
360 0 : }
361 :
362 : #[inline]
363 0 : fn record_bool(&mut self, field: &tracing::field::Field, value: bool) {
364 0 : self.fields
365 0 : .insert(field.name(), serde_json::Value::from(value));
366 0 : }
367 :
368 : #[inline]
369 0 : fn record_bytes(&mut self, field: &tracing::field::Field, value: &[u8]) {
370 0 : self.fields
371 0 : .insert(field.name(), serde_json::Value::from(value));
372 0 : }
373 :
374 : #[inline]
375 0 : fn record_str(&mut self, field: &tracing::field::Field, value: &str) {
376 0 : self.fields
377 0 : .insert(field.name(), serde_json::Value::from(value));
378 0 : }
379 :
380 : #[inline]
381 0 : fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) {
382 0 : self.fields
383 0 : .insert(field.name(), serde_json::Value::from(format!("{value:?}")));
384 0 : }
385 :
386 : #[inline]
387 0 : fn record_error(
388 0 : &mut self,
389 0 : field: &tracing::field::Field,
390 0 : value: &(dyn std::error::Error + 'static),
391 0 : ) {
392 0 : self.fields
393 0 : .insert(field.name(), serde_json::Value::from(format!("{value}")));
394 0 : }
395 : }
396 :
/// List of field indices skipped during logging. Can list duplicate fields or
/// metafields not meant to be logged.
///
/// The indices are kept as a bit set in a single `u64`, so only indices
/// `0..=63` can be represented.
#[derive(Clone, Default)]
struct SkippedFieldIndices {
    /// Bit `i` is set when field index `i` is skipped.
    bits: u64,
}

impl SkippedFieldIndices {
    /// Returns the single-bit mask for `index`.
    ///
    /// # Panics
    ///
    /// Panics with "field index too large" if `index` is 64 or greater. The
    /// index is converted with a checked conversion rather than `as u32`, so
    /// very large indices are rejected instead of wrapping around to a small
    /// shift amount.
    #[inline]
    fn mask(index: usize) -> u64 {
        u32::try_from(index)
            .ok()
            .and_then(|shift| 1u64.checked_shl(shift))
            .expect("field index too large")
    }

    /// Returns `true` if no index has been added.
    #[inline]
    fn is_empty(&self) -> bool {
        self.bits == 0
    }

    /// Marks `index` as skipped.
    ///
    /// # Panics
    ///
    /// Panics if `index` is 64 or greater.
    #[inline]
    fn push(&mut self, index: usize) {
        self.bits |= Self::mask(index);
    }

    /// Returns `true` if `index` has been marked as skipped.
    ///
    /// # Panics
    ///
    /// Panics if `index` is 64 or greater.
    #[inline]
    fn contains(&self, index: usize) -> bool {
        self.bits & Self::mask(index) != 0
    }
}
426 :
427 : /// Formats a tracing event and writes JSON to its internal buffer including a newline.
428 : // TODO: buffer capacity management, truncate if too large
429 : struct EventFormatter {
430 : logline_buffer: Vec<u8>,
431 : }
432 :
433 : impl EventFormatter {
434 : #[inline]
435 1 : fn new() -> Self {
436 1 : EventFormatter {
437 1 : logline_buffer: Vec::new(),
438 1 : }
439 1 : }
440 :
441 : #[inline]
442 1 : fn buffer(&self) -> &[u8] {
443 1 : &self.logline_buffer
444 1 : }
445 :
446 : #[inline]
447 1 : fn reset(&mut self) {
448 1 : self.logline_buffer.clear();
449 1 : }
450 :
451 1 : fn format<S>(
452 1 : &mut self,
453 1 : now: DateTime<Utc>,
454 1 : event: &Event<'_>,
455 1 : ctx: &Context<'_, S>,
456 1 : skipped_field_indices: &papaya::HashMap<callsite::Identifier, SkippedFieldIndices>,
457 1 : ) -> io::Result<()>
458 1 : where
459 1 : S: Subscriber + for<'a> LookupSpan<'a>,
460 1 : {
461 1 : let timestamp = now.to_rfc3339_opts(chrono::SecondsFormat::Micros, true);
462 :
463 : use tracing_log::NormalizeEvent;
464 1 : let normalized_meta = event.normalized_metadata();
465 1 : let meta = normalized_meta.as_ref().unwrap_or_else(|| event.metadata());
466 1 :
467 1 : let skipped_field_indices = skipped_field_indices.pin();
468 1 : let skipped_field_indices = skipped_field_indices.get(&meta.callsite());
469 1 :
470 1 : let mut serialize = || {
471 1 : let mut serializer = serde_json::Serializer::new(&mut self.logline_buffer);
472 :
473 1 : let mut serializer = serializer.serialize_map(None)?;
474 :
475 : // Timestamp comes first, so raw lines can be sorted by timestamp.
476 1 : serializer.serialize_entry("timestamp", ×tamp)?;
477 :
478 : // Level next.
479 1 : serializer.serialize_entry("level", &meta.level().as_str())?;
480 :
481 : // Message next.
482 1 : serializer.serialize_key("message")?;
483 1 : let mut message_extractor =
484 1 : MessageFieldExtractor::new(serializer, skipped_field_indices);
485 1 : event.record(&mut message_extractor);
486 1 : let mut serializer = message_extractor.into_serializer()?;
487 :
488 1 : let mut fields_present = FieldsPresent(false, skipped_field_indices);
489 1 : event.record(&mut fields_present);
490 1 : if fields_present.0 {
491 1 : serializer.serialize_entry(
492 1 : "fields",
493 1 : &SerializableEventFields(event, skipped_field_indices),
494 1 : )?;
495 0 : }
496 :
497 1 : let pid = std::process::id();
498 1 : if pid != 1 {
499 1 : serializer.serialize_entry("process_id", &pid)?;
500 0 : }
501 :
502 1 : THREAD_ID.with(|tid| serializer.serialize_entry("thread_id", tid))?;
503 :
504 : // TODO: tls cache? name could change
505 1 : if let Some(thread_name) = std::thread::current().name() {
506 1 : if !thread_name.is_empty() && thread_name != "tokio-runtime-worker" {
507 1 : serializer.serialize_entry("thread_name", thread_name)?;
508 0 : }
509 0 : }
510 :
511 1 : if let Some(task_id) = tokio::task::try_id() {
512 0 : serializer.serialize_entry("task_id", &format_args!("{task_id}"))?;
513 1 : }
514 :
515 1 : serializer.serialize_entry("target", meta.target())?;
516 :
517 1 : if let Some(module) = meta.module_path() {
518 1 : if module != meta.target() {
519 0 : serializer.serialize_entry("module", module)?;
520 1 : }
521 0 : }
522 :
523 1 : if let Some(file) = meta.file() {
524 1 : if let Some(line) = meta.line() {
525 1 : serializer.serialize_entry("src", &format_args!("{file}:{line}"))?;
526 : } else {
527 0 : serializer.serialize_entry("src", file)?;
528 : }
529 0 : }
530 :
531 : {
532 1 : let otel_context = Span::current().context();
533 1 : let otel_spanref = otel_context.span();
534 1 : let span_context = otel_spanref.span_context();
535 1 : if span_context.is_valid() {
536 0 : serializer.serialize_entry(
537 0 : "trace_id",
538 0 : &format_args!("{}", span_context.trace_id()),
539 0 : )?;
540 1 : }
541 : }
542 :
543 1 : serializer.serialize_entry("spans", &SerializableSpanStack(ctx))?;
544 :
545 1 : serializer.end()
546 1 : };
547 :
548 1 : serialize().map_err(io::Error::other)?;
549 1 : self.logline_buffer.push(b'\n');
550 1 : Ok(())
551 1 : }
552 : }
553 :
554 : /// Extracts the message field that's mixed will other fields.
555 : struct MessageFieldExtractor<'a, S: serde::ser::SerializeMap> {
556 : serializer: S,
557 : skipped_field_indices: Option<&'a SkippedFieldIndices>,
558 : state: Option<Result<(), S::Error>>,
559 : }
560 :
561 : impl<'a, S: serde::ser::SerializeMap> MessageFieldExtractor<'a, S> {
562 : #[inline]
563 1 : fn new(serializer: S, skipped_field_indices: Option<&'a SkippedFieldIndices>) -> Self {
564 1 : Self {
565 1 : serializer,
566 1 : skipped_field_indices,
567 1 : state: None,
568 1 : }
569 1 : }
570 :
571 : #[inline]
572 1 : fn into_serializer(mut self) -> Result<S, S::Error> {
573 1 : match self.state {
574 1 : Some(Ok(())) => {}
575 0 : Some(Err(err)) => return Err(err),
576 0 : None => self.serializer.serialize_value("")?,
577 : }
578 1 : Ok(self.serializer)
579 1 : }
580 :
581 : #[inline]
582 5 : fn accept_field(&self, field: &tracing::field::Field) -> bool {
583 5 : self.state.is_none()
584 5 : && field.name() == MESSAGE_FIELD
585 2 : && !self
586 2 : .skipped_field_indices
587 2 : .is_some_and(|i| i.contains(field.index()))
588 5 : }
589 : }
590 :
591 : impl<S: serde::ser::SerializeMap> tracing::field::Visit for MessageFieldExtractor<'_, S> {
592 : #[inline]
593 0 : fn record_f64(&mut self, field: &tracing::field::Field, value: f64) {
594 0 : if self.accept_field(field) {
595 0 : self.state = Some(self.serializer.serialize_value(&value));
596 0 : }
597 0 : }
598 :
599 : #[inline]
600 3 : fn record_i64(&mut self, field: &tracing::field::Field, value: i64) {
601 3 : if self.accept_field(field) {
602 0 : self.state = Some(self.serializer.serialize_value(&value));
603 3 : }
604 3 : }
605 :
606 : #[inline]
607 0 : fn record_u64(&mut self, field: &tracing::field::Field, value: u64) {
608 0 : if self.accept_field(field) {
609 0 : self.state = Some(self.serializer.serialize_value(&value));
610 0 : }
611 0 : }
612 :
613 : #[inline]
614 0 : fn record_i128(&mut self, field: &tracing::field::Field, value: i128) {
615 0 : if self.accept_field(field) {
616 0 : self.state = Some(self.serializer.serialize_value(&value));
617 0 : }
618 0 : }
619 :
620 : #[inline]
621 0 : fn record_u128(&mut self, field: &tracing::field::Field, value: u128) {
622 0 : if self.accept_field(field) {
623 0 : self.state = Some(self.serializer.serialize_value(&value));
624 0 : }
625 0 : }
626 :
627 : #[inline]
628 0 : fn record_bool(&mut self, field: &tracing::field::Field, value: bool) {
629 0 : if self.accept_field(field) {
630 0 : self.state = Some(self.serializer.serialize_value(&value));
631 0 : }
632 0 : }
633 :
634 : #[inline]
635 0 : fn record_bytes(&mut self, field: &tracing::field::Field, value: &[u8]) {
636 0 : if self.accept_field(field) {
637 0 : self.state = Some(self.serializer.serialize_value(&format_args!("{value:x?}")));
638 0 : }
639 0 : }
640 :
641 : #[inline]
642 1 : fn record_str(&mut self, field: &tracing::field::Field, value: &str) {
643 1 : if self.accept_field(field) {
644 1 : self.state = Some(self.serializer.serialize_value(&value));
645 1 : }
646 1 : }
647 :
648 : #[inline]
649 1 : fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) {
650 1 : if self.accept_field(field) {
651 0 : self.state = Some(self.serializer.serialize_value(&format_args!("{value:?}")));
652 1 : }
653 1 : }
654 :
655 : #[inline]
656 0 : fn record_error(
657 0 : &mut self,
658 0 : field: &tracing::field::Field,
659 0 : value: &(dyn std::error::Error + 'static),
660 0 : ) {
661 0 : if self.accept_field(field) {
662 0 : self.state = Some(self.serializer.serialize_value(&format_args!("{value}")));
663 0 : }
664 0 : }
665 : }
666 :
667 : /// Checks if there's any fields and field values present. If not, the JSON subobject
668 : /// can be skipped.
669 : // This is entirely optional and only cosmetic, though maybe helps a
670 : // bit during log parsing in dashboards when there's no field with empty object.
671 : struct FieldsPresent<'a>(pub bool, Option<&'a SkippedFieldIndices>);
672 :
673 : // Even though some methods have an overhead (error, bytes) it is assumed the
674 : // compiler won't include this since we ignore the value entirely.
675 : impl tracing::field::Visit for FieldsPresent<'_> {
676 : #[inline]
677 5 : fn record_debug(&mut self, field: &tracing::field::Field, _: &dyn std::fmt::Debug) {
678 5 : if !self.1.is_some_and(|i| i.contains(field.index()))
679 2 : && field.name() != MESSAGE_FIELD
680 1 : && !field.name().starts_with("log.")
681 1 : {
682 1 : self.0 |= true;
683 4 : }
684 5 : }
685 : }
686 :
687 : /// Serializes the fields directly supplied with a log event.
688 : struct SerializableEventFields<'a, 'event>(
689 : &'a tracing::Event<'event>,
690 : Option<&'a SkippedFieldIndices>,
691 : );
692 :
693 : impl serde::ser::Serialize for SerializableEventFields<'_, '_> {
694 1 : fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
695 1 : where
696 1 : S: Serializer,
697 1 : {
698 : use serde::ser::SerializeMap;
699 1 : let serializer = serializer.serialize_map(None)?;
700 1 : let mut message_skipper = MessageFieldSkipper::new(serializer, self.1);
701 1 : self.0.record(&mut message_skipper);
702 1 : let serializer = message_skipper.into_serializer()?;
703 1 : serializer.end()
704 1 : }
705 : }
706 :
707 : /// A tracing field visitor that skips the message field.
708 : struct MessageFieldSkipper<'a, S: serde::ser::SerializeMap> {
709 : serializer: S,
710 : skipped_field_indices: Option<&'a SkippedFieldIndices>,
711 : state: Result<(), S::Error>,
712 : }
713 :
714 : impl<'a, S: serde::ser::SerializeMap> MessageFieldSkipper<'a, S> {
715 : #[inline]
716 1 : fn new(serializer: S, skipped_field_indices: Option<&'a SkippedFieldIndices>) -> Self {
717 1 : Self {
718 1 : serializer,
719 1 : skipped_field_indices,
720 1 : state: Ok(()),
721 1 : }
722 1 : }
723 :
724 : #[inline]
725 5 : fn accept_field(&self, field: &tracing::field::Field) -> bool {
726 5 : self.state.is_ok()
727 5 : && field.name() != MESSAGE_FIELD
728 3 : && !field.name().starts_with("log.")
729 3 : && !self
730 3 : .skipped_field_indices
731 3 : .is_some_and(|i| i.contains(field.index()))
732 5 : }
733 :
734 : #[inline]
735 1 : fn into_serializer(self) -> Result<S, S::Error> {
736 1 : self.state?;
737 1 : Ok(self.serializer)
738 1 : }
739 : }
740 :
741 : impl<S: serde::ser::SerializeMap> tracing::field::Visit for MessageFieldSkipper<'_, S> {
742 : #[inline]
743 0 : fn record_f64(&mut self, field: &tracing::field::Field, value: f64) {
744 0 : if self.accept_field(field) {
745 0 : self.state = self.serializer.serialize_entry(field.name(), &value);
746 0 : }
747 0 : }
748 :
749 : #[inline]
750 3 : fn record_i64(&mut self, field: &tracing::field::Field, value: i64) {
751 3 : if self.accept_field(field) {
752 1 : self.state = self.serializer.serialize_entry(field.name(), &value);
753 2 : }
754 3 : }
755 :
756 : #[inline]
757 0 : fn record_u64(&mut self, field: &tracing::field::Field, value: u64) {
758 0 : if self.accept_field(field) {
759 0 : self.state = self.serializer.serialize_entry(field.name(), &value);
760 0 : }
761 0 : }
762 :
763 : #[inline]
764 0 : fn record_i128(&mut self, field: &tracing::field::Field, value: i128) {
765 0 : if self.accept_field(field) {
766 0 : self.state = self.serializer.serialize_entry(field.name(), &value);
767 0 : }
768 0 : }
769 :
770 : #[inline]
771 0 : fn record_u128(&mut self, field: &tracing::field::Field, value: u128) {
772 0 : if self.accept_field(field) {
773 0 : self.state = self.serializer.serialize_entry(field.name(), &value);
774 0 : }
775 0 : }
776 :
777 : #[inline]
778 0 : fn record_bool(&mut self, field: &tracing::field::Field, value: bool) {
779 0 : if self.accept_field(field) {
780 0 : self.state = self.serializer.serialize_entry(field.name(), &value);
781 0 : }
782 0 : }
783 :
784 : #[inline]
785 0 : fn record_bytes(&mut self, field: &tracing::field::Field, value: &[u8]) {
786 0 : if self.accept_field(field) {
787 0 : self.state = self
788 0 : .serializer
789 0 : .serialize_entry(field.name(), &format_args!("{value:x?}"));
790 0 : }
791 0 : }
792 :
793 : #[inline]
794 1 : fn record_str(&mut self, field: &tracing::field::Field, value: &str) {
795 1 : if self.accept_field(field) {
796 0 : self.state = self.serializer.serialize_entry(field.name(), &value);
797 1 : }
798 1 : }
799 :
800 : #[inline]
801 1 : fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) {
802 1 : if self.accept_field(field) {
803 0 : self.state = self
804 0 : .serializer
805 0 : .serialize_entry(field.name(), &format_args!("{value:?}"));
806 1 : }
807 1 : }
808 :
809 : #[inline]
810 0 : fn record_error(
811 0 : &mut self,
812 0 : field: &tracing::field::Field,
813 0 : value: &(dyn std::error::Error + 'static),
814 0 : ) {
815 0 : if self.accept_field(field) {
816 0 : self.state = self.serializer.serialize_value(&format_args!("{value}"));
817 0 : }
818 0 : }
819 : }
820 :
821 : /// Serializes the span stack from root to leaf (parent of event) enumerated
822 : /// inside an object where the keys are just the number padded with zeroes
823 : /// to retain sorting order.
824 : // The object is necessary because Loki cannot flatten arrays.
825 : struct SerializableSpanStack<'a, 'b, Span>(&'b Context<'a, Span>)
826 : where
827 : Span: Subscriber + for<'lookup> LookupSpan<'lookup>;
828 :
829 : impl<Span> serde::ser::Serialize for SerializableSpanStack<'_, '_, Span>
830 : where
831 : Span: Subscriber + for<'lookup> LookupSpan<'lookup>,
832 : {
833 1 : fn serialize<Ser>(&self, serializer: Ser) -> Result<Ser::Ok, Ser::Error>
834 1 : where
835 1 : Ser: serde::ser::Serializer,
836 1 : {
837 1 : let mut serializer = serializer.serialize_map(None)?;
838 :
839 1 : if let Some(leaf_span) = self.0.lookup_current() {
840 2 : for (i, span) in leaf_span.scope().from_root().enumerate() {
841 2 : serializer.serialize_entry(&format_args!("{i:02}"), &SerializableSpan(&span))?;
842 : }
843 0 : }
844 :
845 1 : serializer.end()
846 1 : }
847 : }
848 :
849 : /// Serializes a single span. Include the span ID, name and its fields as
850 : /// recorded up to this point.
851 : struct SerializableSpan<'a, 'b, Span>(&'b SpanRef<'a, Span>)
852 : where
853 : Span: for<'lookup> LookupSpan<'lookup>;
854 :
855 : impl<Span> serde::ser::Serialize for SerializableSpan<'_, '_, Span>
856 : where
857 : Span: for<'lookup> LookupSpan<'lookup>,
858 : {
859 2 : fn serialize<Ser>(&self, serializer: Ser) -> Result<Ser::Ok, Ser::Error>
860 2 : where
861 2 : Ser: serde::ser::Serializer,
862 2 : {
863 2 : let mut serializer = serializer.serialize_map(None)?;
864 : // TODO: the span ID is probably only useful for debugging tracing.
865 2 : serializer.serialize_entry("span_id", &format_args!("{:016x}", self.0.id().into_u64()))?;
866 2 : serializer.serialize_entry("span_name", self.0.metadata().name())?;
867 :
868 2 : let ext = self.0.extensions();
869 2 : if let Some(data) = ext.get::<SpanFields>() {
870 2 : for (key, value) in &data.fields.pin() {
871 1 : serializer.serialize_entry(key, value)?;
872 : }
873 0 : }
874 :
875 2 : serializer.end()
876 2 : }
877 : }
878 :
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
    use std::sync::{Arc, Mutex, MutexGuard};

    use assert_json_diff::assert_json_eq;
    use tracing::info_span;

    use super::*;

    /// Clock that reports whatever instant is stored in it.
    struct TestClock {
        current_time: Mutex<DateTime<Utc>>,
    }

    impl Clock for Arc<TestClock> {
        fn now(&self) -> DateTime<Utc> {
            let current = self.current_time.lock().expect("poisoned");
            *current
        }
    }

    /// Writer that appends to a locked, shared byte vector.
    struct VecWriter<'a> {
        buffer: MutexGuard<'a, Vec<u8>>,
    }

    impl MakeWriter for Arc<Mutex<Vec<u8>>> {
        fn make_writer(&self) -> impl io::Write {
            let buffer = self.lock().expect("poisoned");
            VecWriter { buffer }
        }
    }

    impl io::Write for VecWriter<'_> {
        fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
            // Appending to a `Vec` always takes the whole input.
            self.buffer.extend_from_slice(buf);
            Ok(buf.len())
        }

        fn flush(&mut self) -> io::Result<()> {
            Ok(())
        }
    }

    /// Logs one event with duplicated fields inside two nested spans and
    /// checks the resulting JSON line: for duplicates the last value wins,
    /// and the explicit `message` field takes precedence.
    #[test]
    fn test_field_collection() {
        let clock = Arc::new(TestClock {
            current_time: Mutex::new(Utc::now()),
        });
        let buffer = Arc::new(Mutex::new(Vec::new()));

        let registry = tracing_subscriber::Registry::default().with(JsonLoggingLayer {
            clock: clock.clone(),
            skipped_field_indices: papaya::HashMap::default(),
            writer: buffer.clone(),
        });

        tracing::subscriber::with_default(registry, || {
            let outer = info_span!("span1", x = 40, x = 41, x = 42);
            outer.in_scope(|| {
                let inner = info_span!("span2");
                inner.in_scope(|| {
                    tracing::error!(
                        a = 1,
                        a = 2,
                        a = 3,
                        message = "explicit message field",
                        "implicit message field"
                    );
                });
            });
        });

        let written = Arc::try_unwrap(buffer)
            .expect("no other reference")
            .into_inner()
            .expect("poisoned");
        let actual: serde_json::Value = serde_json::from_slice(&written).expect("valid JSON");

        // Values that depend on the environment are copied from the actual
        // output; only their presence and type are checked.
        let actual_obj = actual.as_object().unwrap();
        let expected: serde_json::Value = serde_json::json!(
            {
                "timestamp": clock.now().to_rfc3339_opts(chrono::SecondsFormat::Micros, true),
                "level": "ERROR",
                "message": "explicit message field",
                "fields": {
                    "a": 3,
                },
                "spans": {
                    "00":{
                        "span_id": "0000000000000001",
                        "span_name": "span1",
                        "x": 42,
                    },
                    "01": {
                        "span_id": "0000000000000002",
                        "span_name": "span2",
                    }
                },
                "src": actual_obj.get("src").unwrap().as_str().unwrap(),
                "target": "proxy::logging::tests",
                "process_id": actual_obj.get("process_id").unwrap().as_number().unwrap(),
                "thread_id": actual_obj.get("thread_id").unwrap().as_number().unwrap(),
                "thread_name": "logging::tests::test_field_collection",
            }
        );

        assert_json_eq!(actual, expected);
    }
}
|