Line data Source code
1 : use std::cell::{Cell, RefCell};
2 : use std::collections::HashMap;
3 : use std::hash::BuildHasher;
4 : use std::sync::atomic::{AtomicU32, Ordering};
5 : use std::{array, env, fmt, io};
6 :
7 : use chrono::{DateTime, Utc};
8 : use indexmap::IndexSet;
9 : use opentelemetry::trace::TraceContextExt;
10 : use scopeguard::defer;
11 : use serde::ser::{SerializeMap, Serializer};
12 : use tracing::subscriber::Interest;
13 : use tracing::{Event, Metadata, Span, Subscriber, callsite, span};
14 : use tracing_opentelemetry::OpenTelemetrySpanExt;
15 : use tracing_subscriber::filter::{EnvFilter, LevelFilter};
16 : use tracing_subscriber::fmt::format::{Format, Full};
17 : use tracing_subscriber::fmt::time::SystemTime;
18 : use tracing_subscriber::fmt::{FormatEvent, FormatFields};
19 : use tracing_subscriber::layer::{Context, Layer};
20 : use tracing_subscriber::prelude::*;
21 : use tracing_subscriber::registry::{LookupSpan, SpanRef};
22 : use try_lock::TryLock;
23 :
24 : /// Initialize logging and OpenTelemetry tracing and exporter.
25 : ///
26 : /// Logging can be configured using `RUST_LOG` environment variable.
27 : ///
28 : /// OpenTelemetry is configured with OTLP/HTTP exporter. It picks up
29 : /// configuration from environment variables. For example, to change the
30 : /// destination, set `OTEL_EXPORTER_OTLP_ENDPOINT=http://jaeger:4318`.
31 : /// See <https://opentelemetry.io/docs/reference/specification/sdk-environment-variables>
32 0 : pub fn init() -> anyhow::Result<LoggingGuard> {
33 0 : let logfmt = LogFormat::from_env()?;
34 :
35 0 : let env_filter = EnvFilter::builder()
36 0 : .with_default_directive(LevelFilter::INFO.into())
37 0 : .from_env_lossy()
38 0 : .add_directive(
39 0 : "aws_config=info"
40 0 : .parse()
41 0 : .expect("this should be a valid filter directive"),
42 0 : )
43 0 : .add_directive(
44 0 : "azure_core::policies::transport=off"
45 0 : .parse()
46 0 : .expect("this should be a valid filter directive"),
47 0 : );
48 0 :
49 0 : let provider = tracing_utils::init_tracing("proxy", tracing_utils::ExportConfig::default());
50 0 : let otlp_layer = provider.as_ref().map(tracing_utils::layer);
51 :
52 0 : let json_log_layer = if logfmt == LogFormat::Json {
53 0 : Some(JsonLoggingLayer::new(
54 0 : RealClock,
55 0 : StderrWriter {
56 0 : stderr: std::io::stderr(),
57 0 : },
58 0 : ["request_id", "session_id", "conn_id"],
59 0 : ))
60 : } else {
61 0 : None
62 : };
63 :
64 0 : let text_log_layer = if logfmt == LogFormat::Text {
65 0 : Some(
66 0 : tracing_subscriber::fmt::layer()
67 0 : .with_ansi(false)
68 0 : .with_writer(std::io::stderr)
69 0 : .with_target(false),
70 0 : )
71 : } else {
72 0 : None
73 : };
74 :
75 0 : tracing_subscriber::registry()
76 0 : .with(env_filter)
77 0 : .with(otlp_layer)
78 0 : .with(json_log_layer)
79 0 : .with(text_log_layer)
80 0 : .try_init()?;
81 :
82 0 : Ok(LoggingGuard(provider))
83 0 : }
84 :
85 : /// Initialize logging for local_proxy with log prefix and no opentelemetry.
86 : ///
87 : /// Logging can be configured using `RUST_LOG` environment variable.
88 0 : pub fn init_local_proxy() -> anyhow::Result<LoggingGuard> {
89 0 : let env_filter = EnvFilter::builder()
90 0 : .with_default_directive(LevelFilter::INFO.into())
91 0 : .from_env_lossy();
92 0 :
93 0 : let fmt_layer = tracing_subscriber::fmt::layer()
94 0 : .with_ansi(false)
95 0 : .with_writer(std::io::stderr)
96 0 : .event_format(LocalProxyFormatter(Format::default().with_target(false)));
97 0 :
98 0 : tracing_subscriber::registry()
99 0 : .with(env_filter)
100 0 : .with(fmt_layer)
101 0 : .try_init()?;
102 :
103 0 : Ok(LoggingGuard(None))
104 0 : }
105 :
106 : pub struct LocalProxyFormatter(Format<Full, SystemTime>);
107 :
108 : impl<S, N> FormatEvent<S, N> for LocalProxyFormatter
109 : where
110 : S: Subscriber + for<'a> LookupSpan<'a>,
111 : N: for<'a> FormatFields<'a> + 'static,
112 : {
113 0 : fn format_event(
114 0 : &self,
115 0 : ctx: &tracing_subscriber::fmt::FmtContext<'_, S, N>,
116 0 : mut writer: tracing_subscriber::fmt::format::Writer<'_>,
117 0 : event: &tracing::Event<'_>,
118 0 : ) -> std::fmt::Result {
119 0 : writer.write_str("[local_proxy] ")?;
120 0 : self.0.format_event(ctx, writer, event)
121 0 : }
122 : }
123 :
124 : pub struct LoggingGuard(Option<tracing_utils::Provider>);
125 :
126 : impl Drop for LoggingGuard {
127 0 : fn drop(&mut self) {
128 0 : if let Some(p) = &self.0 {
129 : // Shutdown trace pipeline gracefully, so that it has a chance to send any
130 : // pending traces before we exit.
131 0 : tracing::info!("shutting down the tracing machinery");
132 0 : drop(p.shutdown());
133 0 : }
134 0 : }
135 : }
136 :
// TODO: make JSON the default
/// Format of emitted log lines, selected via the `LOGFMT` environment variable.
#[derive(Copy, Clone, PartialEq, Eq, Default, Debug)]
enum LogFormat {
    /// Plain text lines (`LOGFMT=text`); also the default.
    #[default]
    Text = 1,
    /// One JSON object per line (`LOGFMT=json`).
    Json = 2,
}
144 :
145 : impl LogFormat {
146 0 : fn from_env() -> anyhow::Result<Self> {
147 0 : let logfmt = env::var("LOGFMT");
148 0 : Ok(match logfmt.as_deref() {
149 0 : Err(_) => LogFormat::default(),
150 0 : Ok("text") => LogFormat::Text,
151 0 : Ok("json") => LogFormat::Json,
152 0 : Ok(logfmt) => anyhow::bail!("unknown log format: {logfmt}"),
153 : })
154 0 : }
155 : }
156 :
/// Hands out an [`io::Write`] handle for writing one log line.
trait MakeWriter {
    fn make_writer(&self) -> impl io::Write;
}

/// [`MakeWriter`] that writes to the process's standard error.
struct StderrWriter {
    stderr: io::Stderr,
}

impl MakeWriter for StderrWriter {
    /// Returns a locked stderr handle. The lock is released when the handle
    /// is dropped.
    #[inline]
    fn make_writer(&self) -> impl io::Write {
        io::Stderr::lock(&self.stderr)
    }
}
171 :
172 : // TODO: move into separate module or even separate crate.
173 : trait Clock {
174 : fn now(&self) -> DateTime<Utc>;
175 : }
176 :
177 : struct RealClock;
178 :
179 : impl Clock for RealClock {
180 : #[inline]
181 0 : fn now(&self) -> DateTime<Utc> {
182 0 : Utc::now()
183 0 : }
184 : }
185 :
/// Field name under which the `tracing` crate records an event's message
/// text; it is serialized as the top-level `message` key, not under `fields`.
const MESSAGE_FIELD: &str = "message";
188 :
189 : thread_local! {
190 : /// Protects against deadlocks and double panics during log writing.
191 : /// The current panic handler will use tracing to log panic information.
192 : static REENTRANCY_GUARD: Cell<bool> = const { Cell::new(false) };
193 : /// Thread-local instance with per-thread buffer for log writing.
194 : static EVENT_FORMATTER: RefCell<EventFormatter> = RefCell::new(EventFormatter::new());
195 : /// Cached OS thread ID.
196 : static THREAD_ID: u64 = gettid::gettid();
197 : }
198 :
199 : /// Implements tracing layer to handle events specific to logging.
200 : struct JsonLoggingLayer<C: Clock, W: MakeWriter, const F: usize> {
201 : clock: C,
202 : skipped_field_indices: papaya::HashMap<callsite::Identifier, SkippedFieldIndices>,
203 : callsite_ids: papaya::HashMap<callsite::Identifier, CallsiteId>,
204 : writer: W,
205 : // We use a const generic and arrays to bypass one heap allocation.
206 : extract_fields: IndexSet<&'static str>,
207 : _marker: std::marker::PhantomData<[&'static str; F]>,
208 : }
209 :
210 : impl<C: Clock, W: MakeWriter, const F: usize> JsonLoggingLayer<C, W, F> {
211 0 : fn new(clock: C, writer: W, extract_fields: [&'static str; F]) -> Self {
212 0 : JsonLoggingLayer {
213 0 : clock,
214 0 : skipped_field_indices: papaya::HashMap::default(),
215 0 : callsite_ids: papaya::HashMap::default(),
216 0 : writer,
217 0 : extract_fields: IndexSet::from_iter(extract_fields),
218 0 : _marker: std::marker::PhantomData,
219 0 : }
220 0 : }
221 :
222 : #[inline]
223 2 : fn callsite_id(&self, cs: callsite::Identifier) -> CallsiteId {
224 2 : *self
225 2 : .callsite_ids
226 2 : .pin()
227 2 : .get_or_insert_with(cs, CallsiteId::next)
228 2 : }
229 : }
230 :
231 : impl<S, C: Clock + 'static, W: MakeWriter + 'static, const F: usize> Layer<S>
232 : for JsonLoggingLayer<C, W, F>
233 : where
234 : S: Subscriber + for<'a> LookupSpan<'a>,
235 : {
236 1 : fn on_event(&self, event: &Event<'_>, ctx: Context<'_, S>) {
237 : use std::io::Write;
238 :
239 : // TODO: consider special tracing subscriber to grab timestamp very
240 : // early, before OTel machinery, and add as event extension.
241 1 : let now = self.clock.now();
242 1 :
243 1 : let res: io::Result<()> = REENTRANCY_GUARD.with(move |entered| {
244 1 : if entered.get() {
245 0 : let mut formatter = EventFormatter::new();
246 0 : formatter.format::<S, F>(
247 0 : now,
248 0 : event,
249 0 : &ctx,
250 0 : &self.skipped_field_indices,
251 0 : &self.callsite_ids,
252 0 : &self.extract_fields,
253 0 : )?;
254 0 : self.writer.make_writer().write_all(formatter.buffer())
255 : } else {
256 1 : entered.set(true);
257 1 : defer!(entered.set(false););
258 1 :
259 1 : EVENT_FORMATTER.with_borrow_mut(move |formatter| {
260 1 : formatter.reset();
261 1 : formatter.format::<S, F>(
262 1 : now,
263 1 : event,
264 1 : &ctx,
265 1 : &self.skipped_field_indices,
266 1 : &self.callsite_ids,
267 1 : &self.extract_fields,
268 1 : )?;
269 1 : self.writer.make_writer().write_all(formatter.buffer())
270 1 : })
271 : }
272 1 : });
273 :
274 : // In case logging fails we generate a simpler JSON object.
275 1 : if let Err(err) = res {
276 0 : if let Ok(mut line) = serde_json::to_vec(&serde_json::json!( {
277 0 : "timestamp": now.to_rfc3339_opts(chrono::SecondsFormat::Micros, true),
278 0 : "level": "ERROR",
279 0 : "message": format_args!("cannot log event: {err:?}"),
280 0 : "fields": {
281 0 : "event": format_args!("{event:?}"),
282 0 : },
283 0 : })) {
284 0 : line.push(b'\n');
285 0 : self.writer.make_writer().write_all(&line).ok();
286 0 : }
287 1 : }
288 1 : }
289 :
290 : /// Registers a SpanFields instance as span extension.
291 2 : fn on_new_span(&self, attrs: &span::Attributes<'_>, id: &span::Id, ctx: Context<'_, S>) {
292 2 : let span = ctx.span(id).expect("span must exist");
293 2 : let fields = SpanFields::default();
294 2 : fields.record_fields(attrs);
295 2 :
296 2 : // This could deadlock when there's a panic somewhere in the tracing
297 2 : // event handling and a read or write guard is still held. This includes
298 2 : // the OTel subscriber.
299 2 : let mut exts = span.extensions_mut();
300 2 :
301 2 : exts.insert(fields);
302 2 : }
303 :
304 0 : fn on_record(&self, id: &span::Id, values: &span::Record<'_>, ctx: Context<'_, S>) {
305 0 : let span = ctx.span(id).expect("span must exist");
306 0 : let ext = span.extensions();
307 0 : if let Some(data) = ext.get::<SpanFields>() {
308 0 : data.record_fields(values);
309 0 : }
310 0 : }
311 :
312 : /// Called (lazily) whenever a new log call is executed. We quickly check
313 : /// for duplicate field names and record duplicates as skippable. Last one
314 : /// wins.
315 3 : fn register_callsite(&self, metadata: &'static Metadata<'static>) -> Interest {
316 3 : if !metadata.is_event() {
317 2 : self.callsite_id(metadata.callsite());
318 2 : // Must not be never because we wouldn't get trace and span data.
319 2 : return Interest::always();
320 1 : }
321 1 :
322 1 : let mut field_indices = SkippedFieldIndices::default();
323 1 : let mut seen_fields = HashMap::<&'static str, usize>::new();
324 5 : for field in metadata.fields() {
325 : use std::collections::hash_map::Entry;
326 5 : match seen_fields.entry(field.name()) {
327 2 : Entry::Vacant(entry) => {
328 2 : // field not seen yet
329 2 : entry.insert(field.index());
330 2 : }
331 3 : Entry::Occupied(mut entry) => {
332 3 : // replace currently stored index
333 3 : let old_index = entry.insert(field.index());
334 3 : // ... and append it to list of skippable indices
335 3 : field_indices.push(old_index);
336 3 : }
337 : }
338 : }
339 :
340 1 : if !field_indices.is_empty() {
341 1 : self.skipped_field_indices
342 1 : .pin()
343 1 : .insert(metadata.callsite(), field_indices);
344 1 : }
345 :
346 1 : Interest::always()
347 3 : }
348 : }
349 :
/// Numeric identifier for a span callsite, used to keep span names unique in
/// the JSON output. `0` (the `Default`) is never handed out by `next`.
#[derive(Copy, Clone, Debug, Default)]
#[repr(transparent)]
struct CallsiteId(u32);

impl CallsiteId {
    /// Allocates the next process-wide ID. Allocation starts at 1.
    #[inline]
    fn next() -> Self {
        // Start at 1 to reserve 0 for default.
        static NEXT_ID: AtomicU32 = AtomicU32::new(1);
        let id = NEXT_ID.fetch_add(1, Ordering::Relaxed);
        Self(id)
    }
}

impl fmt::Display for CallsiteId {
    #[inline]
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        fmt::Display::fmt(&self.0, f)
    }
}
369 :
370 : /// Stores span field values recorded during the spans lifetime.
371 : #[derive(Default)]
372 : struct SpanFields {
373 : // TODO: Switch to custom enum with lasso::Spur for Strings?
374 : fields: papaya::HashMap<&'static str, serde_json::Value>,
375 : }
376 :
377 : impl SpanFields {
378 : #[inline]
379 2 : fn record_fields<R: tracing_subscriber::field::RecordFields>(&self, fields: R) {
380 2 : fields.record(&mut SpanFieldsRecorder {
381 2 : fields: self.fields.pin(),
382 2 : });
383 2 : }
384 : }
385 :
386 : /// Implements a tracing field visitor to convert and store values.
387 : struct SpanFieldsRecorder<'m, S, G> {
388 : fields: papaya::HashMapRef<'m, &'static str, serde_json::Value, S, G>,
389 : }
390 :
391 : impl<S: BuildHasher, G: papaya::Guard> tracing::field::Visit for SpanFieldsRecorder<'_, S, G> {
392 : #[inline]
393 0 : fn record_f64(&mut self, field: &tracing::field::Field, value: f64) {
394 0 : self.fields
395 0 : .insert(field.name(), serde_json::Value::from(value));
396 0 : }
397 :
398 : #[inline]
399 4 : fn record_i64(&mut self, field: &tracing::field::Field, value: i64) {
400 4 : self.fields
401 4 : .insert(field.name(), serde_json::Value::from(value));
402 4 : }
403 :
404 : #[inline]
405 0 : fn record_u64(&mut self, field: &tracing::field::Field, value: u64) {
406 0 : self.fields
407 0 : .insert(field.name(), serde_json::Value::from(value));
408 0 : }
409 :
410 : #[inline]
411 0 : fn record_i128(&mut self, field: &tracing::field::Field, value: i128) {
412 0 : if let Ok(value) = i64::try_from(value) {
413 0 : self.fields
414 0 : .insert(field.name(), serde_json::Value::from(value));
415 0 : } else {
416 0 : self.fields
417 0 : .insert(field.name(), serde_json::Value::from(format!("{value}")));
418 0 : }
419 0 : }
420 :
421 : #[inline]
422 0 : fn record_u128(&mut self, field: &tracing::field::Field, value: u128) {
423 0 : if let Ok(value) = u64::try_from(value) {
424 0 : self.fields
425 0 : .insert(field.name(), serde_json::Value::from(value));
426 0 : } else {
427 0 : self.fields
428 0 : .insert(field.name(), serde_json::Value::from(format!("{value}")));
429 0 : }
430 0 : }
431 :
432 : #[inline]
433 0 : fn record_bool(&mut self, field: &tracing::field::Field, value: bool) {
434 0 : self.fields
435 0 : .insert(field.name(), serde_json::Value::from(value));
436 0 : }
437 :
438 : #[inline]
439 0 : fn record_bytes(&mut self, field: &tracing::field::Field, value: &[u8]) {
440 0 : self.fields
441 0 : .insert(field.name(), serde_json::Value::from(value));
442 0 : }
443 :
444 : #[inline]
445 0 : fn record_str(&mut self, field: &tracing::field::Field, value: &str) {
446 0 : self.fields
447 0 : .insert(field.name(), serde_json::Value::from(value));
448 0 : }
449 :
450 : #[inline]
451 0 : fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) {
452 0 : self.fields
453 0 : .insert(field.name(), serde_json::Value::from(format!("{value:?}")));
454 0 : }
455 :
456 : #[inline]
457 0 : fn record_error(
458 0 : &mut self,
459 0 : field: &tracing::field::Field,
460 0 : value: &(dyn std::error::Error + 'static),
461 0 : ) {
462 0 : self.fields
463 0 : .insert(field.name(), serde_json::Value::from(format!("{value}")));
464 0 : }
465 : }
466 :
/// List of field indices skipped during logging. Can list duplicate fields or
/// metafields not meant to be logged.
///
/// Stored as a 64-bit bitset, so only indices `0..64` can be represented.
#[derive(Clone, Default)]
struct SkippedFieldIndices {
    bits: u64,
}

impl SkippedFieldIndices {
    /// Single-bit mask for `index`, or `None` if `index` does not fit in the
    /// bitset. Uses a checked conversion rather than `as u32`, so huge indices
    /// cannot wrap around onto a low bit.
    #[inline]
    fn mask(index: usize) -> Option<u64> {
        u32::try_from(index)
            .ok()
            .and_then(|shift| 1u64.checked_shl(shift))
    }

    /// Returns `true` if no index has been recorded.
    #[inline]
    fn is_empty(&self) -> bool {
        self.bits == 0
    }

    /// Marks `index` as skipped.
    ///
    /// # Panics
    ///
    /// Panics with "field index too large" if `index >= 64`.
    #[inline]
    fn push(&mut self, index: usize) {
        self.bits |= Self::mask(index).expect("field index too large");
    }

    /// Returns `true` if `index` was marked as skipped.
    ///
    /// An index that cannot be represented can never have been pushed, so it
    /// reports `false` instead of panicking in the event-formatting path.
    #[inline]
    fn contains(&self, index: usize) -> bool {
        Self::mask(index).is_some_and(|mask| self.bits & mask != 0)
    }
}
496 :
497 : /// Formats a tracing event and writes JSON to its internal buffer including a newline.
498 : // TODO: buffer capacity management, truncate if too large
499 : struct EventFormatter {
500 : logline_buffer: Vec<u8>,
501 : }
502 :
503 : impl EventFormatter {
504 : #[inline]
505 1 : fn new() -> Self {
506 1 : EventFormatter {
507 1 : logline_buffer: Vec::new(),
508 1 : }
509 1 : }
510 :
511 : #[inline]
512 1 : fn buffer(&self) -> &[u8] {
513 1 : &self.logline_buffer
514 1 : }
515 :
516 : #[inline]
517 1 : fn reset(&mut self) {
518 1 : self.logline_buffer.clear();
519 1 : }
520 :
521 1 : fn format<S, const F: usize>(
522 1 : &mut self,
523 1 : now: DateTime<Utc>,
524 1 : event: &Event<'_>,
525 1 : ctx: &Context<'_, S>,
526 1 : skipped_field_indices: &papaya::HashMap<callsite::Identifier, SkippedFieldIndices>,
527 1 : callsite_ids: &papaya::HashMap<callsite::Identifier, CallsiteId>,
528 1 : extract_fields: &IndexSet<&'static str>,
529 1 : ) -> io::Result<()>
530 1 : where
531 1 : S: Subscriber + for<'a> LookupSpan<'a>,
532 1 : {
533 1 : let timestamp = now.to_rfc3339_opts(chrono::SecondsFormat::Micros, true);
534 :
535 : use tracing_log::NormalizeEvent;
536 1 : let normalized_meta = event.normalized_metadata();
537 1 : let meta = normalized_meta.as_ref().unwrap_or_else(|| event.metadata());
538 1 :
539 1 : let skipped_field_indices = skipped_field_indices.pin();
540 1 : let skipped_field_indices = skipped_field_indices.get(&meta.callsite());
541 1 :
542 1 : let mut serialize = || {
543 1 : let mut serializer = serde_json::Serializer::new(&mut self.logline_buffer);
544 :
545 1 : let mut serializer = serializer.serialize_map(None)?;
546 :
547 : // Timestamp comes first, so raw lines can be sorted by timestamp.
548 1 : serializer.serialize_entry("timestamp", ×tamp)?;
549 :
550 : // Level next.
551 1 : serializer.serialize_entry("level", &meta.level().as_str())?;
552 :
553 : // Message next.
554 1 : serializer.serialize_key("message")?;
555 1 : let mut message_extractor =
556 1 : MessageFieldExtractor::new(serializer, skipped_field_indices);
557 1 : event.record(&mut message_extractor);
558 1 : let mut serializer = message_extractor.into_serializer()?;
559 :
560 : // Direct message fields.
561 1 : let mut fields_present = FieldsPresent(false, skipped_field_indices);
562 1 : event.record(&mut fields_present);
563 1 : if fields_present.0 {
564 1 : serializer.serialize_entry(
565 1 : "fields",
566 1 : &SerializableEventFields(event, skipped_field_indices),
567 1 : )?;
568 0 : }
569 :
570 1 : let spans = SerializableSpans {
571 1 : ctx,
572 1 : callsite_ids,
573 1 : extract: ExtractedSpanFields::<'_, F>::new(extract_fields),
574 1 : };
575 1 : serializer.serialize_entry("spans", &spans)?;
576 :
577 : // TODO: thread-local cache?
578 1 : let pid = std::process::id();
579 1 : // Skip adding pid 1 to reduce noise for services running in containers.
580 1 : if pid != 1 {
581 1 : serializer.serialize_entry("process_id", &pid)?;
582 0 : }
583 :
584 1 : THREAD_ID.with(|tid| serializer.serialize_entry("thread_id", tid))?;
585 :
586 : // TODO: tls cache? name could change
587 1 : if let Some(thread_name) = std::thread::current().name() {
588 1 : if !thread_name.is_empty() && thread_name != "tokio-runtime-worker" {
589 1 : serializer.serialize_entry("thread_name", thread_name)?;
590 0 : }
591 0 : }
592 :
593 1 : if let Some(task_id) = tokio::task::try_id() {
594 0 : serializer.serialize_entry("task_id", &format_args!("{task_id}"))?;
595 1 : }
596 :
597 1 : serializer.serialize_entry("target", meta.target())?;
598 :
599 : // Skip adding module if it's the same as target.
600 1 : if let Some(module) = meta.module_path() {
601 1 : if module != meta.target() {
602 0 : serializer.serialize_entry("module", module)?;
603 1 : }
604 0 : }
605 :
606 1 : if let Some(file) = meta.file() {
607 1 : if let Some(line) = meta.line() {
608 1 : serializer.serialize_entry("src", &format_args!("{file}:{line}"))?;
609 : } else {
610 0 : serializer.serialize_entry("src", file)?;
611 : }
612 0 : }
613 :
614 : {
615 1 : let otel_context = Span::current().context();
616 1 : let otel_spanref = otel_context.span();
617 1 : let span_context = otel_spanref.span_context();
618 1 : if span_context.is_valid() {
619 0 : serializer.serialize_entry(
620 0 : "trace_id",
621 0 : &format_args!("{}", span_context.trace_id()),
622 0 : )?;
623 1 : }
624 : }
625 :
626 1 : if spans.extract.has_values() {
627 : // TODO: add fields from event, too?
628 1 : serializer.serialize_entry("extract", &spans.extract)?;
629 0 : }
630 :
631 1 : serializer.end()
632 1 : };
633 :
634 1 : serialize().map_err(io::Error::other)?;
635 1 : self.logline_buffer.push(b'\n');
636 1 : Ok(())
637 1 : }
638 : }
639 :
640 : /// Extracts the message field that's mixed will other fields.
641 : struct MessageFieldExtractor<'a, S: serde::ser::SerializeMap> {
642 : serializer: S,
643 : skipped_field_indices: Option<&'a SkippedFieldIndices>,
644 : state: Option<Result<(), S::Error>>,
645 : }
646 :
647 : impl<'a, S: serde::ser::SerializeMap> MessageFieldExtractor<'a, S> {
648 : #[inline]
649 1 : fn new(serializer: S, skipped_field_indices: Option<&'a SkippedFieldIndices>) -> Self {
650 1 : Self {
651 1 : serializer,
652 1 : skipped_field_indices,
653 1 : state: None,
654 1 : }
655 1 : }
656 :
657 : #[inline]
658 1 : fn into_serializer(mut self) -> Result<S, S::Error> {
659 1 : match self.state {
660 1 : Some(Ok(())) => {}
661 0 : Some(Err(err)) => return Err(err),
662 0 : None => self.serializer.serialize_value("")?,
663 : }
664 1 : Ok(self.serializer)
665 1 : }
666 :
667 : #[inline]
668 5 : fn accept_field(&self, field: &tracing::field::Field) -> bool {
669 5 : self.state.is_none()
670 5 : && field.name() == MESSAGE_FIELD
671 2 : && !self
672 2 : .skipped_field_indices
673 2 : .is_some_and(|i| i.contains(field.index()))
674 5 : }
675 : }
676 :
677 : impl<S: serde::ser::SerializeMap> tracing::field::Visit for MessageFieldExtractor<'_, S> {
678 : #[inline]
679 0 : fn record_f64(&mut self, field: &tracing::field::Field, value: f64) {
680 0 : if self.accept_field(field) {
681 0 : self.state = Some(self.serializer.serialize_value(&value));
682 0 : }
683 0 : }
684 :
685 : #[inline]
686 3 : fn record_i64(&mut self, field: &tracing::field::Field, value: i64) {
687 3 : if self.accept_field(field) {
688 0 : self.state = Some(self.serializer.serialize_value(&value));
689 3 : }
690 3 : }
691 :
692 : #[inline]
693 0 : fn record_u64(&mut self, field: &tracing::field::Field, value: u64) {
694 0 : if self.accept_field(field) {
695 0 : self.state = Some(self.serializer.serialize_value(&value));
696 0 : }
697 0 : }
698 :
699 : #[inline]
700 0 : fn record_i128(&mut self, field: &tracing::field::Field, value: i128) {
701 0 : if self.accept_field(field) {
702 0 : self.state = Some(self.serializer.serialize_value(&value));
703 0 : }
704 0 : }
705 :
706 : #[inline]
707 0 : fn record_u128(&mut self, field: &tracing::field::Field, value: u128) {
708 0 : if self.accept_field(field) {
709 0 : self.state = Some(self.serializer.serialize_value(&value));
710 0 : }
711 0 : }
712 :
713 : #[inline]
714 0 : fn record_bool(&mut self, field: &tracing::field::Field, value: bool) {
715 0 : if self.accept_field(field) {
716 0 : self.state = Some(self.serializer.serialize_value(&value));
717 0 : }
718 0 : }
719 :
720 : #[inline]
721 0 : fn record_bytes(&mut self, field: &tracing::field::Field, value: &[u8]) {
722 0 : if self.accept_field(field) {
723 0 : self.state = Some(self.serializer.serialize_value(&format_args!("{value:x?}")));
724 0 : }
725 0 : }
726 :
727 : #[inline]
728 1 : fn record_str(&mut self, field: &tracing::field::Field, value: &str) {
729 1 : if self.accept_field(field) {
730 1 : self.state = Some(self.serializer.serialize_value(&value));
731 1 : }
732 1 : }
733 :
734 : #[inline]
735 1 : fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) {
736 1 : if self.accept_field(field) {
737 0 : self.state = Some(self.serializer.serialize_value(&format_args!("{value:?}")));
738 1 : }
739 1 : }
740 :
741 : #[inline]
742 0 : fn record_error(
743 0 : &mut self,
744 0 : field: &tracing::field::Field,
745 0 : value: &(dyn std::error::Error + 'static),
746 0 : ) {
747 0 : if self.accept_field(field) {
748 0 : self.state = Some(self.serializer.serialize_value(&format_args!("{value}")));
749 0 : }
750 0 : }
751 : }
752 :
753 : /// Checks if there's any fields and field values present. If not, the JSON subobject
754 : /// can be skipped.
755 : // This is entirely optional and only cosmetic, though maybe helps a
756 : // bit during log parsing in dashboards when there's no field with empty object.
757 : struct FieldsPresent<'a>(pub bool, Option<&'a SkippedFieldIndices>);
758 :
759 : // Even though some methods have an overhead (error, bytes) it is assumed the
760 : // compiler won't include this since we ignore the value entirely.
761 : impl tracing::field::Visit for FieldsPresent<'_> {
762 : #[inline]
763 5 : fn record_debug(&mut self, field: &tracing::field::Field, _: &dyn std::fmt::Debug) {
764 5 : if !self.1.is_some_and(|i| i.contains(field.index()))
765 2 : && field.name() != MESSAGE_FIELD
766 1 : && !field.name().starts_with("log.")
767 1 : {
768 1 : self.0 |= true;
769 4 : }
770 5 : }
771 : }
772 :
773 : /// Serializes the fields directly supplied with a log event.
774 : struct SerializableEventFields<'a, 'event>(
775 : &'a tracing::Event<'event>,
776 : Option<&'a SkippedFieldIndices>,
777 : );
778 :
779 : impl serde::ser::Serialize for SerializableEventFields<'_, '_> {
780 1 : fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
781 1 : where
782 1 : S: Serializer,
783 1 : {
784 : use serde::ser::SerializeMap;
785 1 : let serializer = serializer.serialize_map(None)?;
786 1 : let mut message_skipper = MessageFieldSkipper::new(serializer, self.1);
787 1 : self.0.record(&mut message_skipper);
788 1 : let serializer = message_skipper.into_serializer()?;
789 1 : serializer.end()
790 1 : }
791 : }
792 :
793 : /// A tracing field visitor that skips the message field.
794 : struct MessageFieldSkipper<'a, S: serde::ser::SerializeMap> {
795 : serializer: S,
796 : skipped_field_indices: Option<&'a SkippedFieldIndices>,
797 : state: Result<(), S::Error>,
798 : }
799 :
800 : impl<'a, S: serde::ser::SerializeMap> MessageFieldSkipper<'a, S> {
801 : #[inline]
802 1 : fn new(serializer: S, skipped_field_indices: Option<&'a SkippedFieldIndices>) -> Self {
803 1 : Self {
804 1 : serializer,
805 1 : skipped_field_indices,
806 1 : state: Ok(()),
807 1 : }
808 1 : }
809 :
810 : #[inline]
811 5 : fn accept_field(&self, field: &tracing::field::Field) -> bool {
812 5 : self.state.is_ok()
813 5 : && field.name() != MESSAGE_FIELD
814 3 : && !field.name().starts_with("log.")
815 3 : && !self
816 3 : .skipped_field_indices
817 3 : .is_some_and(|i| i.contains(field.index()))
818 5 : }
819 :
820 : #[inline]
821 1 : fn into_serializer(self) -> Result<S, S::Error> {
822 1 : self.state?;
823 1 : Ok(self.serializer)
824 1 : }
825 : }
826 :
827 : impl<S: serde::ser::SerializeMap> tracing::field::Visit for MessageFieldSkipper<'_, S> {
828 : #[inline]
829 0 : fn record_f64(&mut self, field: &tracing::field::Field, value: f64) {
830 0 : if self.accept_field(field) {
831 0 : self.state = self.serializer.serialize_entry(field.name(), &value);
832 0 : }
833 0 : }
834 :
835 : #[inline]
836 3 : fn record_i64(&mut self, field: &tracing::field::Field, value: i64) {
837 3 : if self.accept_field(field) {
838 1 : self.state = self.serializer.serialize_entry(field.name(), &value);
839 2 : }
840 3 : }
841 :
842 : #[inline]
843 0 : fn record_u64(&mut self, field: &tracing::field::Field, value: u64) {
844 0 : if self.accept_field(field) {
845 0 : self.state = self.serializer.serialize_entry(field.name(), &value);
846 0 : }
847 0 : }
848 :
849 : #[inline]
850 0 : fn record_i128(&mut self, field: &tracing::field::Field, value: i128) {
851 0 : if self.accept_field(field) {
852 0 : self.state = self.serializer.serialize_entry(field.name(), &value);
853 0 : }
854 0 : }
855 :
856 : #[inline]
857 0 : fn record_u128(&mut self, field: &tracing::field::Field, value: u128) {
858 0 : if self.accept_field(field) {
859 0 : self.state = self.serializer.serialize_entry(field.name(), &value);
860 0 : }
861 0 : }
862 :
863 : #[inline]
864 0 : fn record_bool(&mut self, field: &tracing::field::Field, value: bool) {
865 0 : if self.accept_field(field) {
866 0 : self.state = self.serializer.serialize_entry(field.name(), &value);
867 0 : }
868 0 : }
869 :
870 : #[inline]
871 0 : fn record_bytes(&mut self, field: &tracing::field::Field, value: &[u8]) {
872 0 : if self.accept_field(field) {
873 0 : self.state = self
874 0 : .serializer
875 0 : .serialize_entry(field.name(), &format_args!("{value:x?}"));
876 0 : }
877 0 : }
878 :
879 : #[inline]
880 1 : fn record_str(&mut self, field: &tracing::field::Field, value: &str) {
881 1 : if self.accept_field(field) {
882 0 : self.state = self.serializer.serialize_entry(field.name(), &value);
883 1 : }
884 1 : }
885 :
886 : #[inline]
887 1 : fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) {
888 1 : if self.accept_field(field) {
889 0 : self.state = self
890 0 : .serializer
891 0 : .serialize_entry(field.name(), &format_args!("{value:?}"));
892 1 : }
893 1 : }
894 :
895 : #[inline]
896 0 : fn record_error(
897 0 : &mut self,
898 0 : field: &tracing::field::Field,
899 0 : value: &(dyn std::error::Error + 'static),
900 0 : ) {
901 0 : if self.accept_field(field) {
902 0 : self.state = self.serializer.serialize_value(&format_args!("{value}"));
903 0 : }
904 0 : }
905 : }
906 :
907 : /// Serializes the span stack from root to leaf (parent of event) as object
908 : /// with the span names as keys. To prevent collision we append a numberic value
909 : /// to the name. Also, collects any span fields we're interested in. Last one
910 : /// wins.
911 : struct SerializableSpans<'a, 'ctx, Span, const F: usize>
912 : where
913 : Span: Subscriber + for<'lookup> LookupSpan<'lookup>,
914 : {
915 : ctx: &'a Context<'ctx, Span>,
916 : callsite_ids: &'a papaya::HashMap<callsite::Identifier, CallsiteId>,
917 : extract: ExtractedSpanFields<'a, F>,
918 : }
919 :
920 : impl<Span, const F: usize> serde::ser::Serialize for SerializableSpans<'_, '_, Span, F>
921 : where
922 : Span: Subscriber + for<'lookup> LookupSpan<'lookup>,
923 : {
924 1 : fn serialize<Ser>(&self, serializer: Ser) -> Result<Ser::Ok, Ser::Error>
925 1 : where
926 1 : Ser: serde::ser::Serializer,
927 1 : {
928 1 : let mut serializer = serializer.serialize_map(None)?;
929 :
930 1 : if let Some(leaf_span) = self.ctx.lookup_current() {
931 2 : for span in leaf_span.scope().from_root() {
932 : // Append a numeric callsite ID to the span name to keep the name unique
933 : // in the JSON object.
934 2 : let cid = self
935 2 : .callsite_ids
936 2 : .pin()
937 2 : .get(&span.metadata().callsite())
938 2 : .copied()
939 2 : .unwrap_or_default();
940 2 :
941 2 : // Loki turns the # into an underscore during field name concatenation.
942 2 : serializer.serialize_key(&format_args!("{}#{}", span.metadata().name(), &cid))?;
943 :
944 2 : serializer.serialize_value(&SerializableSpanFields {
945 2 : span: &span,
946 2 : extract: &self.extract,
947 2 : })?;
948 : }
949 0 : }
950 :
951 1 : serializer.end()
952 1 : }
953 : }
954 :
955 : /// Serializes the span fields as object.
956 : struct SerializableSpanFields<'a, 'span, Span, const F: usize>
957 : where
958 : Span: for<'lookup> LookupSpan<'lookup>,
959 : {
960 : span: &'a SpanRef<'span, Span>,
961 : extract: &'a ExtractedSpanFields<'a, F>,
962 : }
963 :
964 : impl<Span, const F: usize> serde::ser::Serialize for SerializableSpanFields<'_, '_, Span, F>
965 : where
966 : Span: for<'lookup> LookupSpan<'lookup>,
967 : {
968 2 : fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
969 2 : where
970 2 : S: serde::ser::Serializer,
971 2 : {
972 2 : let mut serializer = serializer.serialize_map(None)?;
973 :
974 2 : let ext = self.span.extensions();
975 2 : if let Some(data) = ext.get::<SpanFields>() {
976 2 : for (name, value) in &data.fields.pin() {
977 2 : serializer.serialize_entry(name, value)?;
978 : // TODO: replace clone with reference, if possible.
979 2 : self.extract.set(name, value.clone());
980 : }
981 0 : }
982 :
983 2 : serializer.end()
984 2 : }
985 : }
986 :
987 : struct ExtractedSpanFields<'a, const F: usize> {
988 : names: &'a IndexSet<&'static str>,
989 : // TODO: replace TryLock with something local thread and interior mutability.
990 : // serde API doesn't let us use `mut`.
991 : values: TryLock<([Option<serde_json::Value>; F], bool)>,
992 : }
993 :
994 : impl<'a, const F: usize> ExtractedSpanFields<'a, F> {
995 1 : fn new(names: &'a IndexSet<&'static str>) -> Self {
996 1 : ExtractedSpanFields {
997 1 : names,
998 1 : values: TryLock::new((array::from_fn(|_| Option::default()), false)),
999 1 : }
1000 1 : }
1001 :
1002 : #[inline]
1003 2 : fn set(&self, name: &'static str, value: serde_json::Value) {
1004 2 : if let Some((index, _)) = self.names.get_full(name) {
1005 2 : let mut fields = self.values.try_lock().expect("thread-local use");
1006 2 : fields.0[index] = Some(value);
1007 2 : fields.1 = true;
1008 2 : }
1009 2 : }
1010 :
1011 : #[inline]
1012 1 : fn has_values(&self) -> bool {
1013 1 : self.values.try_lock().expect("thread-local use").1
1014 1 : }
1015 : }
1016 :
1017 : impl<const F: usize> serde::ser::Serialize for ExtractedSpanFields<'_, F> {
1018 1 : fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
1019 1 : where
1020 1 : S: serde::ser::Serializer,
1021 1 : {
1022 1 : let mut serializer = serializer.serialize_map(None)?;
1023 :
1024 1 : let values = self.values.try_lock().expect("thread-local use");
1025 1 : for (i, value) in values.0.iter().enumerate() {
1026 1 : if let Some(value) = value {
1027 1 : let key = self.names[i];
1028 1 : serializer.serialize_entry(key, value)?;
1029 0 : }
1030 : }
1031 :
1032 1 : serializer.end()
1033 1 : }
1034 : }
1035 :
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
    use std::marker::PhantomData;
    use std::sync::{Arc, Mutex, MutexGuard};

    use assert_json_diff::assert_json_eq;
    use tracing::info_span;

    use super::*;

    /// Clock that always reports the instant stored in `current_time`.
    struct TestClock {
        current_time: Mutex<DateTime<Utc>>,
    }

    impl Clock for Arc<TestClock> {
        fn now(&self) -> DateTime<Utc> {
            let frozen = self.current_time.lock().expect("poisoned");
            *frozen
        }
    }

    /// Writer that appends to the shared test buffer while holding its lock.
    struct VecWriter<'a> {
        buffer: MutexGuard<'a, Vec<u8>>,
    }

    impl MakeWriter for Arc<Mutex<Vec<u8>>> {
        fn make_writer(&self) -> impl io::Write {
            let buffer = self.lock().expect("poisoned");
            VecWriter { buffer }
        }
    }

    impl io::Write for VecWriter<'_> {
        fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
            io::Write::write(&mut *self.buffer, buf)
        }

        fn flush(&mut self) -> io::Result<()> {
            Ok(())
        }
    }

    /// Logs one event inside two nested spans and checks the full JSON line:
    /// duplicated fields keep their last value, span keys carry callsite IDs,
    /// and the extracted field comes from the innermost span.
    #[test]
    fn test_field_collection() {
        let clock = Arc::new(TestClock {
            current_time: Mutex::new(Utc::now()),
        });
        let sink = Arc::new(Mutex::new(Vec::new()));

        let log_layer = JsonLoggingLayer {
            clock: Arc::clone(&clock),
            skipped_field_indices: papaya::HashMap::default(),
            callsite_ids: papaya::HashMap::default(),
            writer: Arc::clone(&sink),
            extract_fields: IndexSet::from_iter(["x"]),
            _marker: PhantomData::<[&'static str; 1]>,
        };
        let subscriber = tracing_subscriber::Registry::default().with(log_layer);

        tracing::subscriber::with_default(subscriber, || {
            let outer = info_span!("some_span", x = 24);
            outer.in_scope(|| {
                // Created while `outer` is entered so that it becomes its child.
                let inner = info_span!("some_span", x = 40, x = 41, x = 42);
                inner.in_scope(|| {
                    tracing::error!(
                        a = 1,
                        a = 2,
                        a = 3,
                        message = "explicit message field",
                        "implicit message field"
                    );
                });
            });
        });

        // The subscriber (and the layer's clone of `sink`) is gone by now.
        let written = Arc::try_unwrap(sink)
            .expect("no other reference")
            .into_inner()
            .expect("poisoned");
        let actual: serde_json::Value = serde_json::from_slice(&written).expect("valid JSON");
        let actual_fields = actual.as_object().unwrap();

        let expected: serde_json::Value = serde_json::json!(
            {
                "timestamp": clock.now().to_rfc3339_opts(chrono::SecondsFormat::Micros, true),
                "level": "ERROR",
                "message": "explicit message field",
                "fields": {
                    "a": 3,
                },
                "spans": {
                    "some_span#1":{
                        "x": 24,
                    },
                    "some_span#2": {
                        "x": 42,
                    }
                },
                "extract": {
                    "x": 42,
                },
                "src": actual_fields.get("src").unwrap().as_str().unwrap(),
                "target": "proxy::logging::tests",
                "process_id": actual_fields.get("process_id").unwrap().as_number().unwrap(),
                "thread_id": actual_fields.get("thread_id").unwrap().as_number().unwrap(),
                "thread_name": "logging::tests::test_field_collection",
            }
        );

        assert_json_eq!(actual, expected);
    }
}
|