Line data Source code
1 : use std::cell::{Cell, RefCell};
2 : use std::collections::HashMap;
3 : use std::hash::BuildHasher;
4 : use std::{env, io};
5 :
6 : use chrono::{DateTime, Utc};
7 : use opentelemetry::trace::TraceContextExt;
8 : use scopeguard::defer;
9 : use serde::ser::{SerializeMap, Serializer};
10 : use tracing::span;
11 : use tracing::subscriber::Interest;
12 : use tracing::{callsite, Event, Metadata, Span, Subscriber};
13 : use tracing_opentelemetry::OpenTelemetrySpanExt;
14 : use tracing_subscriber::filter::{EnvFilter, LevelFilter};
15 : use tracing_subscriber::fmt::format::{Format, Full};
16 : use tracing_subscriber::fmt::time::SystemTime;
17 : use tracing_subscriber::fmt::{FormatEvent, FormatFields};
18 : use tracing_subscriber::layer::{Context, Layer};
19 : use tracing_subscriber::prelude::*;
20 : use tracing_subscriber::registry::{LookupSpan, SpanRef};
21 :
22 : /// Initialize logging and OpenTelemetry tracing and exporter.
23 : ///
24 : /// Logging can be configured using `RUST_LOG` environment variable.
25 : ///
26 : /// OpenTelemetry is configured with OTLP/HTTP exporter. It picks up
27 : /// configuration from environment variables. For example, to change the
28 : /// destination, set `OTEL_EXPORTER_OTLP_ENDPOINT=http://jaeger:4318`.
29 : /// See <https://opentelemetry.io/docs/reference/specification/sdk-environment-variables>
30 0 : pub async fn init() -> anyhow::Result<LoggingGuard> {
31 0 : let logfmt = LogFormat::from_env()?;
32 :
33 0 : let env_filter = EnvFilter::builder()
34 0 : .with_default_directive(LevelFilter::INFO.into())
35 0 : .from_env_lossy()
36 0 : .add_directive(
37 0 : "aws_config=info"
38 0 : .parse()
39 0 : .expect("this should be a valid filter directive"),
40 0 : )
41 0 : .add_directive(
42 0 : "azure_core::policies::transport=off"
43 0 : .parse()
44 0 : .expect("this should be a valid filter directive"),
45 0 : );
46 :
47 0 : let otlp_layer = tracing_utils::init_tracing("proxy").await;
48 :
49 0 : let json_log_layer = if logfmt == LogFormat::Json {
50 0 : Some(JsonLoggingLayer {
51 0 : clock: RealClock,
52 0 : skipped_field_indices: papaya::HashMap::default(),
53 0 : writer: StderrWriter {
54 0 : stderr: std::io::stderr(),
55 0 : },
56 0 : })
57 : } else {
58 0 : None
59 : };
60 :
61 0 : let text_log_layer = if logfmt == LogFormat::Text {
62 0 : Some(
63 0 : tracing_subscriber::fmt::layer()
64 0 : .with_ansi(false)
65 0 : .with_writer(std::io::stderr)
66 0 : .with_target(false),
67 0 : )
68 : } else {
69 0 : None
70 : };
71 :
72 0 : tracing_subscriber::registry()
73 0 : .with(env_filter)
74 0 : .with(otlp_layer)
75 0 : .with(json_log_layer)
76 0 : .with(text_log_layer)
77 0 : .try_init()?;
78 :
79 0 : Ok(LoggingGuard)
80 0 : }
81 :
82 : /// Initialize logging for local_proxy with log prefix and no opentelemetry.
83 : ///
84 : /// Logging can be configured using `RUST_LOG` environment variable.
85 0 : pub fn init_local_proxy() -> anyhow::Result<LoggingGuard> {
86 0 : let env_filter = EnvFilter::builder()
87 0 : .with_default_directive(LevelFilter::INFO.into())
88 0 : .from_env_lossy();
89 0 :
90 0 : let fmt_layer = tracing_subscriber::fmt::layer()
91 0 : .with_ansi(false)
92 0 : .with_writer(std::io::stderr)
93 0 : .event_format(LocalProxyFormatter(Format::default().with_target(false)));
94 0 :
95 0 : tracing_subscriber::registry()
96 0 : .with(env_filter)
97 0 : .with(fmt_layer)
98 0 : .try_init()?;
99 :
100 0 : Ok(LoggingGuard)
101 0 : }
102 :
103 : pub struct LocalProxyFormatter(Format<Full, SystemTime>);
104 :
105 : impl<S, N> FormatEvent<S, N> for LocalProxyFormatter
106 : where
107 : S: Subscriber + for<'a> LookupSpan<'a>,
108 : N: for<'a> FormatFields<'a> + 'static,
109 : {
110 0 : fn format_event(
111 0 : &self,
112 0 : ctx: &tracing_subscriber::fmt::FmtContext<'_, S, N>,
113 0 : mut writer: tracing_subscriber::fmt::format::Writer<'_>,
114 0 : event: &tracing::Event<'_>,
115 0 : ) -> std::fmt::Result {
116 0 : writer.write_str("[local_proxy] ")?;
117 0 : self.0.format_event(ctx, writer, event)
118 0 : }
119 : }
120 :
121 : pub struct LoggingGuard;
122 :
123 : impl Drop for LoggingGuard {
124 0 : fn drop(&mut self) {
125 0 : // Shutdown trace pipeline gracefully, so that it has a chance to send any
126 0 : // pending traces before we exit.
127 0 : tracing::info!("shutting down the tracing machinery");
128 0 : tracing_utils::shutdown_tracing();
129 0 : }
130 : }
131 :
// TODO: make JSON the default
/// Log output format, chosen via the `LOGFMT` environment variable.
#[derive(Copy, Clone, PartialEq, Eq, Default, Debug)]
enum LogFormat {
    /// Plain text lines (the current default).
    #[default]
    Text = 1,
    /// One JSON object per line.
    Json,
}
139 :
140 : impl LogFormat {
141 0 : fn from_env() -> anyhow::Result<Self> {
142 0 : let logfmt = env::var("LOGFMT");
143 0 : Ok(match logfmt.as_deref() {
144 0 : Err(_) => LogFormat::default(),
145 0 : Ok("text") => LogFormat::Text,
146 0 : Ok("json") => LogFormat::Json,
147 0 : Ok(logfmt) => anyhow::bail!("unknown log format: {logfmt}"),
148 : })
149 0 : }
150 : }
151 :
/// Source of a writer for each formatted log line.
trait MakeWriter {
    fn make_writer(&self) -> impl io::Write;
}

/// [`MakeWriter`] that hands out a locked handle to the process's stderr.
struct StderrWriter {
    stderr: io::Stderr,
}

impl MakeWriter for StderrWriter {
    #[inline]
    fn make_writer(&self) -> impl io::Write {
        let handle = &self.stderr;
        handle.lock()
    }
}
166 :
167 : // TODO: move into separate module or even separate crate.
168 : trait Clock {
169 : fn now(&self) -> DateTime<Utc>;
170 : }
171 :
172 : struct RealClock;
173 :
174 : impl Clock for RealClock {
175 : #[inline]
176 0 : fn now(&self) -> DateTime<Utc> {
177 0 : Utc::now()
178 0 : }
179 : }
180 :
181 : /// Name of the field used by tracing crate to store the event message.
182 : const MESSAGE_FIELD: &str = "message";
183 :
184 : thread_local! {
185 : /// Protects against deadlocks and double panics during log writing.
186 : /// The current panic handler will use tracing to log panic information.
187 : static REENTRANCY_GUARD: Cell<bool> = const { Cell::new(false) };
188 : /// Thread-local instance with per-thread buffer for log writing.
189 : static EVENT_FORMATTER: RefCell<EventFormatter> = RefCell::new(EventFormatter::new());
190 : /// Cached OS thread ID.
191 : static THREAD_ID: u64 = gettid::gettid();
192 : }
193 :
194 : /// Implements tracing layer to handle events specific to logging.
195 : struct JsonLoggingLayer<C: Clock, W: MakeWriter> {
196 : clock: C,
197 : skipped_field_indices: papaya::HashMap<callsite::Identifier, SkippedFieldIndices>,
198 : writer: W,
199 : }
200 :
impl<S, C: Clock + 'static, W: MakeWriter + 'static> Layer<S> for JsonLoggingLayer<C, W>
where
    S: Subscriber + for<'a> LookupSpan<'a>,
{
    /// Formats `event` as one JSON line and writes it through `self.writer`.
    ///
    /// Normally the thread-local `EVENT_FORMATTER` buffer is reused. If this
    /// method is re-entered on the same thread while that buffer is borrowed
    /// (`REENTRANCY_GUARD` is set), a one-off `EventFormatter` is used instead
    /// so the `RefCell` is never borrowed twice. If formatting or writing
    /// fails, a simpler fallback JSON object describing the failure is written;
    /// errors from that fallback write are ignored.
    fn on_event(&self, event: &Event<'_>, ctx: Context<'_, S>) {
        use std::io::Write;

        // TODO: consider special tracing subscriber to grab timestamp very
        // early, before OTel machinery, and add as event extension.
        let now = self.clock.now();

        let res: io::Result<()> = REENTRANCY_GUARD.with(move |entered| {
            if entered.get() {
                // Nested call: EVENT_FORMATTER is already mutably borrowed further
                // up this thread's stack, so format into a temporary buffer.
                let mut formatter = EventFormatter::new();
                formatter.format(now, event, &ctx, &self.skipped_field_indices)?;
                self.writer.make_writer().write_all(formatter.buffer())
            } else {
                entered.set(true);
                // The guard clears the flag when it is dropped, i.e. on every
                // exit path from this branch, including `?` early returns.
                defer!(entered.set(false););

                EVENT_FORMATTER.with_borrow_mut(move |formatter| {
                    formatter.reset();
                    formatter.format(now, event, &ctx, &self.skipped_field_indices)?;
                    self.writer.make_writer().write_all(formatter.buffer())
                })
            }
        });

        // In case logging fails we generate a simpler JSON object.
        if let Err(err) = res {
            if let Ok(mut line) = serde_json::to_vec(&serde_json::json!( {
                "timestamp": now.to_rfc3339_opts(chrono::SecondsFormat::Micros, true),
                "level": "ERROR",
                "message": format_args!("cannot log event: {err:?}"),
                "fields": {
                    "event": format_args!("{event:?}"),
                },
            })) {
                line.push(b'\n');
                // Best effort: there is nowhere left to report a failure here.
                self.writer.make_writer().write_all(&line).ok();
            }
        }
    }

    /// Registers a SpanFields instance as span extension, pre-filled with the
    /// span's initial attribute values.
    fn on_new_span(&self, attrs: &span::Attributes<'_>, id: &span::Id, ctx: Context<'_, S>) {
        let span = ctx.span(id).expect("span must exist");
        let fields = SpanFields::default();
        fields.record_fields(attrs);
        // This could deadlock when there's a panic somewhere in the tracing
        // event handling and a read or write guard is still held. This includes
        // the OTel subscriber.
        span.extensions_mut().insert(fields);
    }

    /// Records values reported after span creation into the span's
    /// `SpanFields` extension, if present. Only a shared extensions guard is
    /// needed because `SpanFields` uses a concurrent map.
    fn on_record(&self, id: &span::Id, values: &span::Record<'_>, ctx: Context<'_, S>) {
        let span = ctx.span(id).expect("span must exist");
        let ext = span.extensions();
        if let Some(data) = ext.get::<SpanFields>() {
            data.record_fields(values);
        }
    }

    /// Called (lazily) whenever a new log call is executed. We quickly check
    /// for duplicate field names and record duplicates as skippable. Last one
    /// wins.
    ///
    /// Always returns `Interest::always()`. Skip indices are stored only for
    /// event callsites that actually have duplicate field names.
    fn register_callsite(&self, metadata: &'static Metadata<'static>) -> Interest {
        if !metadata.is_event() {
            // Must not be never because we wouldn't get trace and span data.
            return Interest::always();
        }

        let mut field_indices = SkippedFieldIndices::default();
        // Maps each field name to the index of its latest occurrence so far.
        let mut seen_fields = HashMap::<&'static str, usize>::new();
        for field in metadata.fields() {
            use std::collections::hash_map::Entry;
            match seen_fields.entry(field.name()) {
                Entry::Vacant(entry) => {
                    // field not seen yet
                    entry.insert(field.index());
                }
                Entry::Occupied(mut entry) => {
                    // replace currently stored index
                    let old_index = entry.insert(field.index());
                    // ... and append it to list of skippable indices
                    field_indices.push(old_index);
                }
            }
        }

        if !field_indices.is_empty() {
            self.skipped_field_indices
                .pin()
                .insert(metadata.callsite(), field_indices);
        }

        Interest::always()
    }
}
300 :
301 : /// Stores span field values recorded during the spans lifetime.
302 : #[derive(Default)]
303 : struct SpanFields {
304 : // TODO: Switch to custom enum with lasso::Spur for Strings?
305 : fields: papaya::HashMap<&'static str, serde_json::Value>,
306 : }
307 :
308 : impl SpanFields {
309 : #[inline]
310 2 : fn record_fields<R: tracing_subscriber::field::RecordFields>(&self, fields: R) {
311 2 : fields.record(&mut SpanFieldsRecorder {
312 2 : fields: self.fields.pin(),
313 2 : });
314 2 : }
315 : }
316 :
317 : /// Implements a tracing field visitor to convert and store values.
318 : struct SpanFieldsRecorder<'m, S, G> {
319 : fields: papaya::HashMapRef<'m, &'static str, serde_json::Value, S, G>,
320 : }
321 :
322 : impl<S: BuildHasher, G: papaya::Guard> tracing::field::Visit for SpanFieldsRecorder<'_, S, G> {
323 : #[inline]
324 0 : fn record_f64(&mut self, field: &tracing::field::Field, value: f64) {
325 0 : self.fields
326 0 : .insert(field.name(), serde_json::Value::from(value));
327 0 : }
328 :
329 : #[inline]
330 3 : fn record_i64(&mut self, field: &tracing::field::Field, value: i64) {
331 3 : self.fields
332 3 : .insert(field.name(), serde_json::Value::from(value));
333 3 : }
334 :
335 : #[inline]
336 0 : fn record_u64(&mut self, field: &tracing::field::Field, value: u64) {
337 0 : self.fields
338 0 : .insert(field.name(), serde_json::Value::from(value));
339 0 : }
340 :
341 : #[inline]
342 0 : fn record_i128(&mut self, field: &tracing::field::Field, value: i128) {
343 0 : if let Ok(value) = i64::try_from(value) {
344 0 : self.fields
345 0 : .insert(field.name(), serde_json::Value::from(value));
346 0 : } else {
347 0 : self.fields
348 0 : .insert(field.name(), serde_json::Value::from(format!("{value}")));
349 0 : }
350 0 : }
351 :
352 : #[inline]
353 0 : fn record_u128(&mut self, field: &tracing::field::Field, value: u128) {
354 0 : if let Ok(value) = u64::try_from(value) {
355 0 : self.fields
356 0 : .insert(field.name(), serde_json::Value::from(value));
357 0 : } else {
358 0 : self.fields
359 0 : .insert(field.name(), serde_json::Value::from(format!("{value}")));
360 0 : }
361 0 : }
362 :
363 : #[inline]
364 0 : fn record_bool(&mut self, field: &tracing::field::Field, value: bool) {
365 0 : self.fields
366 0 : .insert(field.name(), serde_json::Value::from(value));
367 0 : }
368 :
369 : #[inline]
370 0 : fn record_bytes(&mut self, field: &tracing::field::Field, value: &[u8]) {
371 0 : self.fields
372 0 : .insert(field.name(), serde_json::Value::from(value));
373 0 : }
374 :
375 : #[inline]
376 0 : fn record_str(&mut self, field: &tracing::field::Field, value: &str) {
377 0 : self.fields
378 0 : .insert(field.name(), serde_json::Value::from(value));
379 0 : }
380 :
381 : #[inline]
382 0 : fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) {
383 0 : self.fields
384 0 : .insert(field.name(), serde_json::Value::from(format!("{value:?}")));
385 0 : }
386 :
387 : #[inline]
388 0 : fn record_error(
389 0 : &mut self,
390 0 : field: &tracing::field::Field,
391 0 : value: &(dyn std::error::Error + 'static),
392 0 : ) {
393 0 : self.fields
394 0 : .insert(field.name(), serde_json::Value::from(format!("{value}")));
395 0 : }
396 : }
397 :
/// List of field indices skipped during logging. Can list duplicate fields or
/// metafields not meant to be logged.
///
/// Stored as a 64-bit bitset, so only indices `0..64` can be recorded.
#[derive(Clone, Default)]
struct SkippedFieldIndices {
    bits: u64,
}

impl SkippedFieldIndices {
    /// Single-bit mask for `index`, or `None` when the index does not fit into
    /// the 64-bit set. Unlike an `index as u32` cast, this never truncates
    /// large indices onto small ones.
    #[inline]
    fn mask(index: usize) -> Option<u64> {
        // `then` (not `then_some`) so the shift is only evaluated in range.
        (index < u64::BITS as usize).then(|| 1u64 << index)
    }

    /// Returns `true` when no index has been recorded.
    #[inline]
    fn is_empty(&self) -> bool {
        self.bits == 0
    }

    /// Marks `index` as skipped.
    ///
    /// # Panics
    /// Panics with "field index too large" if `index >= 64`.
    #[inline]
    fn push(&mut self, index: usize) {
        self.bits |= Self::mask(index).expect("field index too large");
    }

    /// Returns `true` if `index` was marked as skipped.
    ///
    /// Indices `>= 64` can never have been recorded, so they are reported as
    /// not skipped instead of panicking in the logging hot path.
    #[inline]
    fn contains(&self, index: usize) -> bool {
        Self::mask(index).is_some_and(|mask| self.bits & mask != 0)
    }
}
427 :
428 : /// Formats a tracing event and writes JSON to its internal buffer including a newline.
429 : // TODO: buffer capacity management, truncate if too large
430 : struct EventFormatter {
431 : logline_buffer: Vec<u8>,
432 : }
433 :
434 : impl EventFormatter {
435 : #[inline]
436 1 : fn new() -> Self {
437 1 : EventFormatter {
438 1 : logline_buffer: Vec::new(),
439 1 : }
440 1 : }
441 :
442 : #[inline]
443 1 : fn buffer(&self) -> &[u8] {
444 1 : &self.logline_buffer
445 1 : }
446 :
447 : #[inline]
448 1 : fn reset(&mut self) {
449 1 : self.logline_buffer.clear();
450 1 : }
451 :
452 1 : fn format<S>(
453 1 : &mut self,
454 1 : now: DateTime<Utc>,
455 1 : event: &Event<'_>,
456 1 : ctx: &Context<'_, S>,
457 1 : skipped_field_indices: &papaya::HashMap<callsite::Identifier, SkippedFieldIndices>,
458 1 : ) -> io::Result<()>
459 1 : where
460 1 : S: Subscriber + for<'a> LookupSpan<'a>,
461 1 : {
462 1 : let timestamp = now.to_rfc3339_opts(chrono::SecondsFormat::Micros, true);
463 :
464 : use tracing_log::NormalizeEvent;
465 1 : let normalized_meta = event.normalized_metadata();
466 1 : let meta = normalized_meta.as_ref().unwrap_or_else(|| event.metadata());
467 1 :
468 1 : let skipped_field_indices = skipped_field_indices.pin();
469 1 : let skipped_field_indices = skipped_field_indices.get(&meta.callsite());
470 1 :
471 1 : let mut serialize = || {
472 1 : let mut serializer = serde_json::Serializer::new(&mut self.logline_buffer);
473 :
474 1 : let mut serializer = serializer.serialize_map(None)?;
475 :
476 : // Timestamp comes first, so raw lines can be sorted by timestamp.
477 1 : serializer.serialize_entry("timestamp", ×tamp)?;
478 :
479 : // Level next.
480 1 : serializer.serialize_entry("level", &meta.level().as_str())?;
481 :
482 : // Message next.
483 1 : serializer.serialize_key("message")?;
484 1 : let mut message_extractor =
485 1 : MessageFieldExtractor::new(serializer, skipped_field_indices);
486 1 : event.record(&mut message_extractor);
487 1 : let mut serializer = message_extractor.into_serializer()?;
488 :
489 1 : let mut fields_present = FieldsPresent(false, skipped_field_indices);
490 1 : event.record(&mut fields_present);
491 1 : if fields_present.0 {
492 1 : serializer.serialize_entry(
493 1 : "fields",
494 1 : &SerializableEventFields(event, skipped_field_indices),
495 1 : )?;
496 0 : }
497 :
498 1 : let pid = std::process::id();
499 1 : if pid != 1 {
500 1 : serializer.serialize_entry("process_id", &pid)?;
501 0 : }
502 :
503 1 : THREAD_ID.with(|tid| serializer.serialize_entry("thread_id", tid))?;
504 :
505 : // TODO: tls cache? name could change
506 1 : if let Some(thread_name) = std::thread::current().name() {
507 1 : if !thread_name.is_empty() && thread_name != "tokio-runtime-worker" {
508 1 : serializer.serialize_entry("thread_name", thread_name)?;
509 0 : }
510 0 : }
511 :
512 1 : if let Some(task_id) = tokio::task::try_id() {
513 0 : serializer.serialize_entry("task_id", &format_args!("{task_id}"))?;
514 1 : }
515 :
516 1 : serializer.serialize_entry("target", meta.target())?;
517 :
518 1 : if let Some(module) = meta.module_path() {
519 1 : if module != meta.target() {
520 0 : serializer.serialize_entry("module", module)?;
521 1 : }
522 0 : }
523 :
524 1 : if let Some(file) = meta.file() {
525 1 : if let Some(line) = meta.line() {
526 1 : serializer.serialize_entry("src", &format_args!("{file}:{line}"))?;
527 : } else {
528 0 : serializer.serialize_entry("src", file)?;
529 : }
530 0 : }
531 :
532 : {
533 1 : let otel_context = Span::current().context();
534 1 : let otel_spanref = otel_context.span();
535 1 : let span_context = otel_spanref.span_context();
536 1 : if span_context.is_valid() {
537 0 : serializer.serialize_entry(
538 0 : "trace_id",
539 0 : &format_args!("{}", span_context.trace_id()),
540 0 : )?;
541 1 : }
542 : }
543 :
544 1 : serializer.serialize_entry("spans", &SerializableSpanStack(ctx))?;
545 :
546 1 : serializer.end()
547 1 : };
548 :
549 1 : serialize().map_err(io::Error::other)?;
550 1 : self.logline_buffer.push(b'\n');
551 1 : Ok(())
552 1 : }
553 : }
554 :
555 : /// Extracts the message field that's mixed will other fields.
556 : struct MessageFieldExtractor<'a, S: serde::ser::SerializeMap> {
557 : serializer: S,
558 : skipped_field_indices: Option<&'a SkippedFieldIndices>,
559 : state: Option<Result<(), S::Error>>,
560 : }
561 :
562 : impl<'a, S: serde::ser::SerializeMap> MessageFieldExtractor<'a, S> {
563 : #[inline]
564 1 : fn new(serializer: S, skipped_field_indices: Option<&'a SkippedFieldIndices>) -> Self {
565 1 : Self {
566 1 : serializer,
567 1 : skipped_field_indices,
568 1 : state: None,
569 1 : }
570 1 : }
571 :
572 : #[inline]
573 1 : fn into_serializer(mut self) -> Result<S, S::Error> {
574 1 : match self.state {
575 1 : Some(Ok(())) => {}
576 0 : Some(Err(err)) => return Err(err),
577 0 : None => self.serializer.serialize_value("")?,
578 : }
579 1 : Ok(self.serializer)
580 1 : }
581 :
582 : #[inline]
583 5 : fn accept_field(&self, field: &tracing::field::Field) -> bool {
584 5 : self.state.is_none()
585 5 : && field.name() == MESSAGE_FIELD
586 2 : && !self
587 2 : .skipped_field_indices
588 2 : .is_some_and(|i| i.contains(field.index()))
589 5 : }
590 : }
591 :
592 : impl<S: serde::ser::SerializeMap> tracing::field::Visit for MessageFieldExtractor<'_, S> {
593 : #[inline]
594 0 : fn record_f64(&mut self, field: &tracing::field::Field, value: f64) {
595 0 : if self.accept_field(field) {
596 0 : self.state = Some(self.serializer.serialize_value(&value));
597 0 : }
598 0 : }
599 :
600 : #[inline]
601 3 : fn record_i64(&mut self, field: &tracing::field::Field, value: i64) {
602 3 : if self.accept_field(field) {
603 0 : self.state = Some(self.serializer.serialize_value(&value));
604 3 : }
605 3 : }
606 :
607 : #[inline]
608 0 : fn record_u64(&mut self, field: &tracing::field::Field, value: u64) {
609 0 : if self.accept_field(field) {
610 0 : self.state = Some(self.serializer.serialize_value(&value));
611 0 : }
612 0 : }
613 :
614 : #[inline]
615 0 : fn record_i128(&mut self, field: &tracing::field::Field, value: i128) {
616 0 : if self.accept_field(field) {
617 0 : self.state = Some(self.serializer.serialize_value(&value));
618 0 : }
619 0 : }
620 :
621 : #[inline]
622 0 : fn record_u128(&mut self, field: &tracing::field::Field, value: u128) {
623 0 : if self.accept_field(field) {
624 0 : self.state = Some(self.serializer.serialize_value(&value));
625 0 : }
626 0 : }
627 :
628 : #[inline]
629 0 : fn record_bool(&mut self, field: &tracing::field::Field, value: bool) {
630 0 : if self.accept_field(field) {
631 0 : self.state = Some(self.serializer.serialize_value(&value));
632 0 : }
633 0 : }
634 :
635 : #[inline]
636 0 : fn record_bytes(&mut self, field: &tracing::field::Field, value: &[u8]) {
637 0 : if self.accept_field(field) {
638 0 : self.state = Some(self.serializer.serialize_value(&format_args!("{value:x?}")));
639 0 : }
640 0 : }
641 :
642 : #[inline]
643 1 : fn record_str(&mut self, field: &tracing::field::Field, value: &str) {
644 1 : if self.accept_field(field) {
645 1 : self.state = Some(self.serializer.serialize_value(&value));
646 1 : }
647 1 : }
648 :
649 : #[inline]
650 1 : fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) {
651 1 : if self.accept_field(field) {
652 0 : self.state = Some(self.serializer.serialize_value(&format_args!("{value:?}")));
653 1 : }
654 1 : }
655 :
656 : #[inline]
657 0 : fn record_error(
658 0 : &mut self,
659 0 : field: &tracing::field::Field,
660 0 : value: &(dyn std::error::Error + 'static),
661 0 : ) {
662 0 : if self.accept_field(field) {
663 0 : self.state = Some(self.serializer.serialize_value(&format_args!("{value}")));
664 0 : }
665 0 : }
666 : }
667 :
668 : /// Checks if there's any fields and field values present. If not, the JSON subobject
669 : /// can be skipped.
670 : // This is entirely optional and only cosmetic, though maybe helps a
671 : // bit during log parsing in dashboards when there's no field with empty object.
672 : struct FieldsPresent<'a>(pub bool, Option<&'a SkippedFieldIndices>);
673 :
674 : // Even though some methods have an overhead (error, bytes) it is assumed the
675 : // compiler won't include this since we ignore the value entirely.
676 : impl tracing::field::Visit for FieldsPresent<'_> {
677 : #[inline]
678 5 : fn record_debug(&mut self, field: &tracing::field::Field, _: &dyn std::fmt::Debug) {
679 5 : if !self.1.is_some_and(|i| i.contains(field.index()))
680 2 : && field.name() != MESSAGE_FIELD
681 1 : && !field.name().starts_with("log.")
682 1 : {
683 1 : self.0 |= true;
684 4 : }
685 5 : }
686 : }
687 :
688 : /// Serializes the fields directly supplied with a log event.
689 : struct SerializableEventFields<'a, 'event>(
690 : &'a tracing::Event<'event>,
691 : Option<&'a SkippedFieldIndices>,
692 : );
693 :
694 : impl serde::ser::Serialize for SerializableEventFields<'_, '_> {
695 1 : fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
696 1 : where
697 1 : S: Serializer,
698 1 : {
699 : use serde::ser::SerializeMap;
700 1 : let serializer = serializer.serialize_map(None)?;
701 1 : let mut message_skipper = MessageFieldSkipper::new(serializer, self.1);
702 1 : self.0.record(&mut message_skipper);
703 1 : let serializer = message_skipper.into_serializer()?;
704 1 : serializer.end()
705 1 : }
706 : }
707 :
708 : /// A tracing field visitor that skips the message field.
709 : struct MessageFieldSkipper<'a, S: serde::ser::SerializeMap> {
710 : serializer: S,
711 : skipped_field_indices: Option<&'a SkippedFieldIndices>,
712 : state: Result<(), S::Error>,
713 : }
714 :
715 : impl<'a, S: serde::ser::SerializeMap> MessageFieldSkipper<'a, S> {
716 : #[inline]
717 1 : fn new(serializer: S, skipped_field_indices: Option<&'a SkippedFieldIndices>) -> Self {
718 1 : Self {
719 1 : serializer,
720 1 : skipped_field_indices,
721 1 : state: Ok(()),
722 1 : }
723 1 : }
724 :
725 : #[inline]
726 5 : fn accept_field(&self, field: &tracing::field::Field) -> bool {
727 5 : self.state.is_ok()
728 5 : && field.name() != MESSAGE_FIELD
729 3 : && !field.name().starts_with("log.")
730 3 : && !self
731 3 : .skipped_field_indices
732 3 : .is_some_and(|i| i.contains(field.index()))
733 5 : }
734 :
735 : #[inline]
736 1 : fn into_serializer(self) -> Result<S, S::Error> {
737 1 : self.state?;
738 1 : Ok(self.serializer)
739 1 : }
740 : }
741 :
742 : impl<S: serde::ser::SerializeMap> tracing::field::Visit for MessageFieldSkipper<'_, S> {
743 : #[inline]
744 0 : fn record_f64(&mut self, field: &tracing::field::Field, value: f64) {
745 0 : if self.accept_field(field) {
746 0 : self.state = self.serializer.serialize_entry(field.name(), &value);
747 0 : }
748 0 : }
749 :
750 : #[inline]
751 3 : fn record_i64(&mut self, field: &tracing::field::Field, value: i64) {
752 3 : if self.accept_field(field) {
753 1 : self.state = self.serializer.serialize_entry(field.name(), &value);
754 2 : }
755 3 : }
756 :
757 : #[inline]
758 0 : fn record_u64(&mut self, field: &tracing::field::Field, value: u64) {
759 0 : if self.accept_field(field) {
760 0 : self.state = self.serializer.serialize_entry(field.name(), &value);
761 0 : }
762 0 : }
763 :
764 : #[inline]
765 0 : fn record_i128(&mut self, field: &tracing::field::Field, value: i128) {
766 0 : if self.accept_field(field) {
767 0 : self.state = self.serializer.serialize_entry(field.name(), &value);
768 0 : }
769 0 : }
770 :
771 : #[inline]
772 0 : fn record_u128(&mut self, field: &tracing::field::Field, value: u128) {
773 0 : if self.accept_field(field) {
774 0 : self.state = self.serializer.serialize_entry(field.name(), &value);
775 0 : }
776 0 : }
777 :
778 : #[inline]
779 0 : fn record_bool(&mut self, field: &tracing::field::Field, value: bool) {
780 0 : if self.accept_field(field) {
781 0 : self.state = self.serializer.serialize_entry(field.name(), &value);
782 0 : }
783 0 : }
784 :
785 : #[inline]
786 0 : fn record_bytes(&mut self, field: &tracing::field::Field, value: &[u8]) {
787 0 : if self.accept_field(field) {
788 0 : self.state = self
789 0 : .serializer
790 0 : .serialize_entry(field.name(), &format_args!("{value:x?}"));
791 0 : }
792 0 : }
793 :
794 : #[inline]
795 1 : fn record_str(&mut self, field: &tracing::field::Field, value: &str) {
796 1 : if self.accept_field(field) {
797 0 : self.state = self.serializer.serialize_entry(field.name(), &value);
798 1 : }
799 1 : }
800 :
801 : #[inline]
802 1 : fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) {
803 1 : if self.accept_field(field) {
804 0 : self.state = self
805 0 : .serializer
806 0 : .serialize_entry(field.name(), &format_args!("{value:?}"));
807 1 : }
808 1 : }
809 :
810 : #[inline]
811 0 : fn record_error(
812 0 : &mut self,
813 0 : field: &tracing::field::Field,
814 0 : value: &(dyn std::error::Error + 'static),
815 0 : ) {
816 0 : if self.accept_field(field) {
817 0 : self.state = self.serializer.serialize_value(&format_args!("{value}"));
818 0 : }
819 0 : }
820 : }
821 :
822 : /// Serializes the span stack from root to leaf (parent of event) enumerated
823 : /// inside an object where the keys are just the number padded with zeroes
824 : /// to retain sorting order.
825 : // The object is necessary because Loki cannot flatten arrays.
826 : struct SerializableSpanStack<'a, 'b, Span>(&'b Context<'a, Span>)
827 : where
828 : Span: Subscriber + for<'lookup> LookupSpan<'lookup>;
829 :
830 : impl<Span> serde::ser::Serialize for SerializableSpanStack<'_, '_, Span>
831 : where
832 : Span: Subscriber + for<'lookup> LookupSpan<'lookup>,
833 : {
834 1 : fn serialize<Ser>(&self, serializer: Ser) -> Result<Ser::Ok, Ser::Error>
835 1 : where
836 1 : Ser: serde::ser::Serializer,
837 1 : {
838 1 : let mut serializer = serializer.serialize_map(None)?;
839 :
840 1 : if let Some(leaf_span) = self.0.lookup_current() {
841 2 : for (i, span) in leaf_span.scope().from_root().enumerate() {
842 2 : serializer.serialize_entry(&format_args!("{i:02}"), &SerializableSpan(&span))?;
843 : }
844 0 : }
845 :
846 1 : serializer.end()
847 1 : }
848 : }
849 :
850 : /// Serializes a single span. Include the span ID, name and its fields as
851 : /// recorded up to this point.
852 : struct SerializableSpan<'a, 'b, Span>(&'b SpanRef<'a, Span>)
853 : where
854 : Span: for<'lookup> LookupSpan<'lookup>;
855 :
856 : impl<Span> serde::ser::Serialize for SerializableSpan<'_, '_, Span>
857 : where
858 : Span: for<'lookup> LookupSpan<'lookup>,
859 : {
860 2 : fn serialize<Ser>(&self, serializer: Ser) -> Result<Ser::Ok, Ser::Error>
861 2 : where
862 2 : Ser: serde::ser::Serializer,
863 2 : {
864 2 : let mut serializer = serializer.serialize_map(None)?;
865 : // TODO: the span ID is probably only useful for debugging tracing.
866 2 : serializer.serialize_entry("span_id", &format_args!("{:016x}", self.0.id().into_u64()))?;
867 2 : serializer.serialize_entry("span_name", self.0.metadata().name())?;
868 :
869 2 : let ext = self.0.extensions();
870 2 : if let Some(data) = ext.get::<SpanFields>() {
871 2 : for (key, value) in &data.fields.pin() {
872 1 : serializer.serialize_entry(key, value)?;
873 : }
874 0 : }
875 :
876 2 : serializer.end()
877 2 : }
878 : }
879 :
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
    use std::sync::{Arc, Mutex, MutexGuard};

    use assert_json_diff::assert_json_eq;
    use tracing::info_span;

    use super::*;

    /// Clock that always reports the time stored in `current_time`, so the
    /// expected timestamp can be computed from the same clock.
    struct TestClock {
        current_time: Mutex<DateTime<Utc>>,
    }

    impl Clock for Arc<TestClock> {
        fn now(&self) -> DateTime<Utc> {
            *self.current_time.lock().expect("poisoned")
        }
    }

    /// Writer that appends to a shared byte buffer while holding its lock.
    struct VecWriter<'a> {
        buffer: MutexGuard<'a, Vec<u8>>,
    }

    impl MakeWriter for Arc<Mutex<Vec<u8>>> {
        fn make_writer(&self) -> impl io::Write {
            VecWriter {
                buffer: self.lock().expect("poisoned"),
            }
        }
    }

    impl io::Write for VecWriter<'_> {
        fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
            self.buffer.write(buf)
        }

        fn flush(&mut self) -> io::Result<()> {
            Ok(())
        }
    }

    /// Checks several behaviours of the JSON layer:
    /// - duplicate span and event fields keep only the last value;
    /// - the message is extracted;
    /// - the overall layout of one JSON log line.
    #[test]
    fn test_field_collection() {
        let clock = Arc::new(TestClock {
            current_time: Mutex::new(Utc::now()),
        });
        let buffer = Arc::new(Mutex::new(Vec::new()));
        let log_layer = JsonLoggingLayer {
            clock: clock.clone(),
            skipped_field_indices: papaya::HashMap::default(),
            writer: buffer.clone(),
        };

        let registry = tracing_subscriber::Registry::default().with(log_layer);

        tracing::subscriber::with_default(registry, || {
            // Duplicate `x` on a span: the expected output keeps only `x = 42`.
            info_span!("span1", x = 40, x = 41, x = 42).in_scope(|| {
                info_span!("span2").in_scope(|| {
                    // Duplicate `a` and two `message` fields (explicit, plus the
                    // implicit format-string one). The expected output below
                    // keeps `a = 3` and the explicit message.
                    tracing::error!(
                        a = 1,
                        a = 2,
                        a = 3,
                        message = "explicit message field",
                        "implicit message field"
                    );
                });
            });
        });

        let buffer = Arc::try_unwrap(buffer)
            .expect("no other reference")
            .into_inner()
            .expect("poisoned");
        let actual: serde_json::Value = serde_json::from_slice(&buffer).expect("valid JSON");
        // `src`, `process_id` and `thread_id` depend on the environment, so they
        // are copied from the actual output rather than hard-coded.
        let expected: serde_json::Value = serde_json::json!(
            {
                "timestamp": clock.now().to_rfc3339_opts(chrono::SecondsFormat::Micros, true),
                "level": "ERROR",
                "message": "explicit message field",
                "fields": {
                    "a": 3,
                },
                "spans": {
                    "00":{
                        "span_id": "0000000000000001",
                        "span_name": "span1",
                        "x": 42,
                    },
                    "01": {
                        "span_id": "0000000000000002",
                        "span_name": "span2",
                    }
                },
                "src": actual.as_object().unwrap().get("src").unwrap().as_str().unwrap(),
                "target": "proxy::logging::tests",
                "process_id": actual.as_object().unwrap().get("process_id").unwrap().as_number().unwrap(),
                "thread_id": actual.as_object().unwrap().get("thread_id").unwrap().as_number().unwrap(),
                "thread_name": "logging::tests::test_field_collection",
            }
        );

        assert_json_eq!(actual, expected);
    }
}
|