Line data Source code
1 : use std::cell::{Cell, RefCell};
2 : use std::collections::HashMap;
3 : use std::hash::BuildHasher;
4 : use std::sync::atomic::{AtomicU32, Ordering};
5 : use std::{array, env, fmt, io};
6 :
7 : use chrono::{DateTime, Utc};
8 : use indexmap::IndexSet;
9 : use opentelemetry::trace::TraceContextExt;
10 : use scopeguard::defer;
11 : use serde::ser::{SerializeMap, Serializer};
12 : use tracing::subscriber::Interest;
13 : use tracing::{Event, Metadata, Span, Subscriber, callsite, span};
14 : use tracing_opentelemetry::OpenTelemetrySpanExt;
15 : use tracing_subscriber::filter::{EnvFilter, LevelFilter};
16 : use tracing_subscriber::fmt::format::{Format, Full};
17 : use tracing_subscriber::fmt::time::SystemTime;
18 : use tracing_subscriber::fmt::{FormatEvent, FormatFields};
19 : use tracing_subscriber::layer::{Context, Layer};
20 : use tracing_subscriber::prelude::*;
21 : use tracing_subscriber::registry::{LookupSpan, SpanRef};
22 : use try_lock::TryLock;
23 :
24 : /// Initialize logging and OpenTelemetry tracing and exporter.
25 : ///
26 : /// Logging can be configured using `RUST_LOG` environment variable.
27 : ///
28 : /// OpenTelemetry is configured with OTLP/HTTP exporter. It picks up
29 : /// configuration from environment variables. For example, to change the
30 : /// destination, set `OTEL_EXPORTER_OTLP_ENDPOINT=http://jaeger:4318`.
31 : /// See <https://opentelemetry.io/docs/reference/specification/sdk-environment-variables>
32 0 : pub async fn init() -> anyhow::Result<LoggingGuard> {
33 0 : let logfmt = LogFormat::from_env()?;
34 :
35 0 : let env_filter = EnvFilter::builder()
36 0 : .with_default_directive(LevelFilter::INFO.into())
37 0 : .from_env_lossy()
38 0 : .add_directive(
39 0 : "aws_config=info"
40 0 : .parse()
41 0 : .expect("this should be a valid filter directive"),
42 0 : )
43 0 : .add_directive(
44 0 : "azure_core::policies::transport=off"
45 0 : .parse()
46 0 : .expect("this should be a valid filter directive"),
47 0 : );
48 :
49 0 : let otlp_layer =
50 0 : tracing_utils::init_tracing("proxy", tracing_utils::ExportConfig::default()).await;
51 :
52 0 : let json_log_layer = if logfmt == LogFormat::Json {
53 0 : Some(JsonLoggingLayer::new(
54 0 : RealClock,
55 0 : StderrWriter {
56 0 : stderr: std::io::stderr(),
57 0 : },
58 0 : ["request_id", "session_id", "conn_id"],
59 0 : ))
60 : } else {
61 0 : None
62 : };
63 :
64 0 : let text_log_layer = if logfmt == LogFormat::Text {
65 0 : Some(
66 0 : tracing_subscriber::fmt::layer()
67 0 : .with_ansi(false)
68 0 : .with_writer(std::io::stderr)
69 0 : .with_target(false),
70 0 : )
71 : } else {
72 0 : None
73 : };
74 :
75 0 : tracing_subscriber::registry()
76 0 : .with(env_filter)
77 0 : .with(otlp_layer)
78 0 : .with(json_log_layer)
79 0 : .with(text_log_layer)
80 0 : .try_init()?;
81 :
82 0 : Ok(LoggingGuard)
83 0 : }
84 :
85 : /// Initialize logging for local_proxy with log prefix and no opentelemetry.
86 : ///
87 : /// Logging can be configured using `RUST_LOG` environment variable.
88 0 : pub fn init_local_proxy() -> anyhow::Result<LoggingGuard> {
89 0 : let env_filter = EnvFilter::builder()
90 0 : .with_default_directive(LevelFilter::INFO.into())
91 0 : .from_env_lossy();
92 0 :
93 0 : let fmt_layer = tracing_subscriber::fmt::layer()
94 0 : .with_ansi(false)
95 0 : .with_writer(std::io::stderr)
96 0 : .event_format(LocalProxyFormatter(Format::default().with_target(false)));
97 0 :
98 0 : tracing_subscriber::registry()
99 0 : .with(env_filter)
100 0 : .with(fmt_layer)
101 0 : .try_init()?;
102 :
103 0 : Ok(LoggingGuard)
104 0 : }
105 :
106 : pub struct LocalProxyFormatter(Format<Full, SystemTime>);
107 :
108 : impl<S, N> FormatEvent<S, N> for LocalProxyFormatter
109 : where
110 : S: Subscriber + for<'a> LookupSpan<'a>,
111 : N: for<'a> FormatFields<'a> + 'static,
112 : {
113 0 : fn format_event(
114 0 : &self,
115 0 : ctx: &tracing_subscriber::fmt::FmtContext<'_, S, N>,
116 0 : mut writer: tracing_subscriber::fmt::format::Writer<'_>,
117 0 : event: &tracing::Event<'_>,
118 0 : ) -> std::fmt::Result {
119 0 : writer.write_str("[local_proxy] ")?;
120 0 : self.0.format_event(ctx, writer, event)
121 0 : }
122 : }
123 :
124 : pub struct LoggingGuard;
125 :
126 : impl Drop for LoggingGuard {
127 0 : fn drop(&mut self) {
128 0 : // Shutdown trace pipeline gracefully, so that it has a chance to send any
129 0 : // pending traces before we exit.
130 0 : tracing::info!("shutting down the tracing machinery");
131 0 : tracing_utils::shutdown_tracing();
132 0 : }
133 : }
134 :
135 : // TODO: make JSON the default
136 : #[derive(Copy, Clone, PartialEq, Eq, Default, Debug)]
137 : enum LogFormat {
138 : #[default]
139 : Text = 1,
140 : Json,
141 : }
142 :
143 : impl LogFormat {
144 0 : fn from_env() -> anyhow::Result<Self> {
145 0 : let logfmt = env::var("LOGFMT");
146 0 : Ok(match logfmt.as_deref() {
147 0 : Err(_) => LogFormat::default(),
148 0 : Ok("text") => LogFormat::Text,
149 0 : Ok("json") => LogFormat::Json,
150 0 : Ok(logfmt) => anyhow::bail!("unknown log format: {logfmt}"),
151 : })
152 0 : }
153 : }
154 :
/// Factory for the sink that finished log lines are written to.
trait MakeWriter {
    /// Returns a writer to be used for writing a log line.
    fn make_writer(&self) -> impl io::Write;
}

/// [`MakeWriter`] that writes to the standard error stream.
struct StderrWriter {
    stderr: io::Stderr,
}

impl MakeWriter for StderrWriter {
    /// Hands out a locked handle to stderr.
    #[inline]
    fn make_writer(&self) -> impl io::Write {
        let Self { stderr } = self;
        stderr.lock()
    }
}
169 :
170 : // TODO: move into separate module or even separate crate.
171 : trait Clock {
172 : fn now(&self) -> DateTime<Utc>;
173 : }
174 :
175 : struct RealClock;
176 :
177 : impl Clock for RealClock {
178 : #[inline]
179 0 : fn now(&self) -> DateTime<Utc> {
180 0 : Utc::now()
181 0 : }
182 : }
183 :
184 : /// Name of the field used by tracing crate to store the event message.
185 : const MESSAGE_FIELD: &str = "message";
186 :
187 : thread_local! {
188 : /// Protects against deadlocks and double panics during log writing.
189 : /// The current panic handler will use tracing to log panic information.
190 : static REENTRANCY_GUARD: Cell<bool> = const { Cell::new(false) };
191 : /// Thread-local instance with per-thread buffer for log writing.
192 : static EVENT_FORMATTER: RefCell<EventFormatter> = RefCell::new(EventFormatter::new());
193 : /// Cached OS thread ID.
194 : static THREAD_ID: u64 = gettid::gettid();
195 : }
196 :
197 : /// Implements tracing layer to handle events specific to logging.
198 : struct JsonLoggingLayer<C: Clock, W: MakeWriter, const F: usize> {
199 : clock: C,
200 : skipped_field_indices: papaya::HashMap<callsite::Identifier, SkippedFieldIndices>,
201 : callsite_ids: papaya::HashMap<callsite::Identifier, CallsiteId>,
202 : writer: W,
203 : // We use a const generic and arrays to bypass one heap allocation.
204 : extract_fields: IndexSet<&'static str>,
205 : _marker: std::marker::PhantomData<[&'static str; F]>,
206 : }
207 :
208 : impl<C: Clock, W: MakeWriter, const F: usize> JsonLoggingLayer<C, W, F> {
209 0 : fn new(clock: C, writer: W, extract_fields: [&'static str; F]) -> Self {
210 0 : JsonLoggingLayer {
211 0 : clock,
212 0 : skipped_field_indices: papaya::HashMap::default(),
213 0 : callsite_ids: papaya::HashMap::default(),
214 0 : writer,
215 0 : extract_fields: IndexSet::from_iter(extract_fields),
216 0 : _marker: std::marker::PhantomData,
217 0 : }
218 0 : }
219 :
220 : #[inline]
221 2 : fn callsite_id(&self, cs: callsite::Identifier) -> CallsiteId {
222 2 : *self
223 2 : .callsite_ids
224 2 : .pin()
225 2 : .get_or_insert_with(cs, CallsiteId::next)
226 2 : }
227 : }
228 :
229 : impl<S, C: Clock + 'static, W: MakeWriter + 'static, const F: usize> Layer<S>
230 : for JsonLoggingLayer<C, W, F>
231 : where
232 : S: Subscriber + for<'a> LookupSpan<'a>,
233 : {
234 1 : fn on_event(&self, event: &Event<'_>, ctx: Context<'_, S>) {
235 : use std::io::Write;
236 :
237 : // TODO: consider special tracing subscriber to grab timestamp very
238 : // early, before OTel machinery, and add as event extension.
239 1 : let now = self.clock.now();
240 1 :
241 1 : let res: io::Result<()> = REENTRANCY_GUARD.with(move |entered| {
242 1 : if entered.get() {
243 0 : let mut formatter = EventFormatter::new();
244 0 : formatter.format::<S, F>(
245 0 : now,
246 0 : event,
247 0 : &ctx,
248 0 : &self.skipped_field_indices,
249 0 : &self.callsite_ids,
250 0 : &self.extract_fields,
251 0 : )?;
252 0 : self.writer.make_writer().write_all(formatter.buffer())
253 : } else {
254 1 : entered.set(true);
255 1 : defer!(entered.set(false););
256 1 :
257 1 : EVENT_FORMATTER.with_borrow_mut(move |formatter| {
258 1 : formatter.reset();
259 1 : formatter.format::<S, F>(
260 1 : now,
261 1 : event,
262 1 : &ctx,
263 1 : &self.skipped_field_indices,
264 1 : &self.callsite_ids,
265 1 : &self.extract_fields,
266 1 : )?;
267 1 : self.writer.make_writer().write_all(formatter.buffer())
268 1 : })
269 : }
270 1 : });
271 :
272 : // In case logging fails we generate a simpler JSON object.
273 1 : if let Err(err) = res {
274 0 : if let Ok(mut line) = serde_json::to_vec(&serde_json::json!( {
275 0 : "timestamp": now.to_rfc3339_opts(chrono::SecondsFormat::Micros, true),
276 0 : "level": "ERROR",
277 0 : "message": format_args!("cannot log event: {err:?}"),
278 0 : "fields": {
279 0 : "event": format_args!("{event:?}"),
280 0 : },
281 0 : })) {
282 0 : line.push(b'\n');
283 0 : self.writer.make_writer().write_all(&line).ok();
284 0 : }
285 1 : }
286 1 : }
287 :
288 : /// Registers a SpanFields instance as span extension.
289 2 : fn on_new_span(&self, attrs: &span::Attributes<'_>, id: &span::Id, ctx: Context<'_, S>) {
290 2 : let span = ctx.span(id).expect("span must exist");
291 2 : let fields = SpanFields::default();
292 2 : fields.record_fields(attrs);
293 2 :
294 2 : // This could deadlock when there's a panic somewhere in the tracing
295 2 : // event handling and a read or write guard is still held. This includes
296 2 : // the OTel subscriber.
297 2 : let mut exts = span.extensions_mut();
298 2 :
299 2 : exts.insert(fields);
300 2 : }
301 :
302 0 : fn on_record(&self, id: &span::Id, values: &span::Record<'_>, ctx: Context<'_, S>) {
303 0 : let span = ctx.span(id).expect("span must exist");
304 0 : let ext = span.extensions();
305 0 : if let Some(data) = ext.get::<SpanFields>() {
306 0 : data.record_fields(values);
307 0 : }
308 0 : }
309 :
310 : /// Called (lazily) whenever a new log call is executed. We quickly check
311 : /// for duplicate field names and record duplicates as skippable. Last one
312 : /// wins.
313 3 : fn register_callsite(&self, metadata: &'static Metadata<'static>) -> Interest {
314 3 : if !metadata.is_event() {
315 2 : self.callsite_id(metadata.callsite());
316 2 : // Must not be never because we wouldn't get trace and span data.
317 2 : return Interest::always();
318 1 : }
319 1 :
320 1 : let mut field_indices = SkippedFieldIndices::default();
321 1 : let mut seen_fields = HashMap::<&'static str, usize>::new();
322 5 : for field in metadata.fields() {
323 : use std::collections::hash_map::Entry;
324 5 : match seen_fields.entry(field.name()) {
325 2 : Entry::Vacant(entry) => {
326 2 : // field not seen yet
327 2 : entry.insert(field.index());
328 2 : }
329 3 : Entry::Occupied(mut entry) => {
330 3 : // replace currently stored index
331 3 : let old_index = entry.insert(field.index());
332 3 : // ... and append it to list of skippable indices
333 3 : field_indices.push(old_index);
334 3 : }
335 : }
336 : }
337 :
338 1 : if !field_indices.is_empty() {
339 1 : self.skipped_field_indices
340 1 : .pin()
341 1 : .insert(metadata.callsite(), field_indices);
342 1 : }
343 :
344 1 : Interest::always()
345 3 : }
346 : }
347 :
/// Process-wide numeric identifier of a callsite. The default value is 0,
/// which is never handed out by [`CallsiteId::next`].
#[derive(Copy, Clone, Debug, Default)]
#[repr(transparent)]
struct CallsiteId(u32);

impl CallsiteId {
    /// Allocates the next ID from a global counter.
    #[inline]
    fn next() -> Self {
        // Start at 1 to reserve 0 for default.
        static COUNTER: AtomicU32 = AtomicU32::new(1);
        let id = COUNTER.fetch_add(1, Ordering::Relaxed);
        Self(id)
    }
}

impl fmt::Display for CallsiteId {
    /// Formats the ID exactly like the wrapped integer.
    #[inline]
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let Self(raw) = self;
        fmt::Display::fmt(raw, f)
    }
}
367 :
368 : /// Stores span field values recorded during the spans lifetime.
369 : #[derive(Default)]
370 : struct SpanFields {
371 : // TODO: Switch to custom enum with lasso::Spur for Strings?
372 : fields: papaya::HashMap<&'static str, serde_json::Value>,
373 : }
374 :
375 : impl SpanFields {
376 : #[inline]
377 2 : fn record_fields<R: tracing_subscriber::field::RecordFields>(&self, fields: R) {
378 2 : fields.record(&mut SpanFieldsRecorder {
379 2 : fields: self.fields.pin(),
380 2 : });
381 2 : }
382 : }
383 :
384 : /// Implements a tracing field visitor to convert and store values.
385 : struct SpanFieldsRecorder<'m, S, G> {
386 : fields: papaya::HashMapRef<'m, &'static str, serde_json::Value, S, G>,
387 : }
388 :
389 : impl<S: BuildHasher, G: papaya::Guard> tracing::field::Visit for SpanFieldsRecorder<'_, S, G> {
390 : #[inline]
391 0 : fn record_f64(&mut self, field: &tracing::field::Field, value: f64) {
392 0 : self.fields
393 0 : .insert(field.name(), serde_json::Value::from(value));
394 0 : }
395 :
396 : #[inline]
397 4 : fn record_i64(&mut self, field: &tracing::field::Field, value: i64) {
398 4 : self.fields
399 4 : .insert(field.name(), serde_json::Value::from(value));
400 4 : }
401 :
402 : #[inline]
403 0 : fn record_u64(&mut self, field: &tracing::field::Field, value: u64) {
404 0 : self.fields
405 0 : .insert(field.name(), serde_json::Value::from(value));
406 0 : }
407 :
408 : #[inline]
409 0 : fn record_i128(&mut self, field: &tracing::field::Field, value: i128) {
410 0 : if let Ok(value) = i64::try_from(value) {
411 0 : self.fields
412 0 : .insert(field.name(), serde_json::Value::from(value));
413 0 : } else {
414 0 : self.fields
415 0 : .insert(field.name(), serde_json::Value::from(format!("{value}")));
416 0 : }
417 0 : }
418 :
419 : #[inline]
420 0 : fn record_u128(&mut self, field: &tracing::field::Field, value: u128) {
421 0 : if let Ok(value) = u64::try_from(value) {
422 0 : self.fields
423 0 : .insert(field.name(), serde_json::Value::from(value));
424 0 : } else {
425 0 : self.fields
426 0 : .insert(field.name(), serde_json::Value::from(format!("{value}")));
427 0 : }
428 0 : }
429 :
430 : #[inline]
431 0 : fn record_bool(&mut self, field: &tracing::field::Field, value: bool) {
432 0 : self.fields
433 0 : .insert(field.name(), serde_json::Value::from(value));
434 0 : }
435 :
436 : #[inline]
437 0 : fn record_bytes(&mut self, field: &tracing::field::Field, value: &[u8]) {
438 0 : self.fields
439 0 : .insert(field.name(), serde_json::Value::from(value));
440 0 : }
441 :
442 : #[inline]
443 0 : fn record_str(&mut self, field: &tracing::field::Field, value: &str) {
444 0 : self.fields
445 0 : .insert(field.name(), serde_json::Value::from(value));
446 0 : }
447 :
448 : #[inline]
449 0 : fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) {
450 0 : self.fields
451 0 : .insert(field.name(), serde_json::Value::from(format!("{value:?}")));
452 0 : }
453 :
454 : #[inline]
455 0 : fn record_error(
456 0 : &mut self,
457 0 : field: &tracing::field::Field,
458 0 : value: &(dyn std::error::Error + 'static),
459 0 : ) {
460 0 : self.fields
461 0 : .insert(field.name(), serde_json::Value::from(format!("{value}")));
462 0 : }
463 : }
464 :
/// List of field indices skipped during logging. Can list duplicate fields or
/// metafields not meant to be logged.
///
/// Stored as a bit set, so only indices `0..64` can be represented.
#[derive(Clone, Default)]
struct SkippedFieldIndices {
    bits: u64,
}

impl SkippedFieldIndices {
    /// Returns the mask with only the bit for `index` set.
    ///
    /// The index is converted with a checked conversion rather than `as`, so
    /// an index that does not fit into `u32` cannot wrap around and alias a
    /// valid bit.
    ///
    /// # Panics
    ///
    /// Panics if `index` is 64 or larger.
    #[inline]
    fn mask(index: usize) -> u64 {
        u32::try_from(index)
            .ok()
            .and_then(|shift| 1u64.checked_shl(shift))
            .expect("field index too large")
    }

    /// Returns `true` if no index has been added.
    #[inline]
    fn is_empty(&self) -> bool {
        self.bits == 0
    }

    /// Marks `index` as skipped.
    ///
    /// # Panics
    ///
    /// Panics if `index` is 64 or larger.
    #[inline]
    fn push(&mut self, index: usize) {
        self.bits |= Self::mask(index);
    }

    /// Returns `true` if `index` was marked as skipped.
    ///
    /// # Panics
    ///
    /// Panics if `index` is 64 or larger.
    #[inline]
    fn contains(&self, index: usize) -> bool {
        self.bits & Self::mask(index) != 0
    }
}
494 :
495 : /// Formats a tracing event and writes JSON to its internal buffer including a newline.
496 : // TODO: buffer capacity management, truncate if too large
497 : struct EventFormatter {
498 : logline_buffer: Vec<u8>,
499 : }
500 :
501 : impl EventFormatter {
502 : #[inline]
503 1 : fn new() -> Self {
504 1 : EventFormatter {
505 1 : logline_buffer: Vec::new(),
506 1 : }
507 1 : }
508 :
509 : #[inline]
510 1 : fn buffer(&self) -> &[u8] {
511 1 : &self.logline_buffer
512 1 : }
513 :
514 : #[inline]
515 1 : fn reset(&mut self) {
516 1 : self.logline_buffer.clear();
517 1 : }
518 :
519 1 : fn format<S, const F: usize>(
520 1 : &mut self,
521 1 : now: DateTime<Utc>,
522 1 : event: &Event<'_>,
523 1 : ctx: &Context<'_, S>,
524 1 : skipped_field_indices: &papaya::HashMap<callsite::Identifier, SkippedFieldIndices>,
525 1 : callsite_ids: &papaya::HashMap<callsite::Identifier, CallsiteId>,
526 1 : extract_fields: &IndexSet<&'static str>,
527 1 : ) -> io::Result<()>
528 1 : where
529 1 : S: Subscriber + for<'a> LookupSpan<'a>,
530 1 : {
531 1 : let timestamp = now.to_rfc3339_opts(chrono::SecondsFormat::Micros, true);
532 :
533 : use tracing_log::NormalizeEvent;
534 1 : let normalized_meta = event.normalized_metadata();
535 1 : let meta = normalized_meta.as_ref().unwrap_or_else(|| event.metadata());
536 1 :
537 1 : let skipped_field_indices = skipped_field_indices.pin();
538 1 : let skipped_field_indices = skipped_field_indices.get(&meta.callsite());
539 1 :
540 1 : let mut serialize = || {
541 1 : let mut serializer = serde_json::Serializer::new(&mut self.logline_buffer);
542 :
543 1 : let mut serializer = serializer.serialize_map(None)?;
544 :
545 : // Timestamp comes first, so raw lines can be sorted by timestamp.
546 1 : serializer.serialize_entry("timestamp", ×tamp)?;
547 :
548 : // Level next.
549 1 : serializer.serialize_entry("level", &meta.level().as_str())?;
550 :
551 : // Message next.
552 1 : serializer.serialize_key("message")?;
553 1 : let mut message_extractor =
554 1 : MessageFieldExtractor::new(serializer, skipped_field_indices);
555 1 : event.record(&mut message_extractor);
556 1 : let mut serializer = message_extractor.into_serializer()?;
557 :
558 : // Direct message fields.
559 1 : let mut fields_present = FieldsPresent(false, skipped_field_indices);
560 1 : event.record(&mut fields_present);
561 1 : if fields_present.0 {
562 1 : serializer.serialize_entry(
563 1 : "fields",
564 1 : &SerializableEventFields(event, skipped_field_indices),
565 1 : )?;
566 0 : }
567 :
568 1 : let spans = SerializableSpans {
569 1 : ctx,
570 1 : callsite_ids,
571 1 : extract: ExtractedSpanFields::<'_, F>::new(extract_fields),
572 1 : };
573 1 : serializer.serialize_entry("spans", &spans)?;
574 :
575 : // TODO: thread-local cache?
576 1 : let pid = std::process::id();
577 1 : // Skip adding pid 1 to reduce noise for services running in containers.
578 1 : if pid != 1 {
579 1 : serializer.serialize_entry("process_id", &pid)?;
580 0 : }
581 :
582 1 : THREAD_ID.with(|tid| serializer.serialize_entry("thread_id", tid))?;
583 :
584 : // TODO: tls cache? name could change
585 1 : if let Some(thread_name) = std::thread::current().name() {
586 1 : if !thread_name.is_empty() && thread_name != "tokio-runtime-worker" {
587 1 : serializer.serialize_entry("thread_name", thread_name)?;
588 0 : }
589 0 : }
590 :
591 1 : if let Some(task_id) = tokio::task::try_id() {
592 0 : serializer.serialize_entry("task_id", &format_args!("{task_id}"))?;
593 1 : }
594 :
595 1 : serializer.serialize_entry("target", meta.target())?;
596 :
597 : // Skip adding module if it's the same as target.
598 1 : if let Some(module) = meta.module_path() {
599 1 : if module != meta.target() {
600 0 : serializer.serialize_entry("module", module)?;
601 1 : }
602 0 : }
603 :
604 1 : if let Some(file) = meta.file() {
605 1 : if let Some(line) = meta.line() {
606 1 : serializer.serialize_entry("src", &format_args!("{file}:{line}"))?;
607 : } else {
608 0 : serializer.serialize_entry("src", file)?;
609 : }
610 0 : }
611 :
612 : {
613 1 : let otel_context = Span::current().context();
614 1 : let otel_spanref = otel_context.span();
615 1 : let span_context = otel_spanref.span_context();
616 1 : if span_context.is_valid() {
617 0 : serializer.serialize_entry(
618 0 : "trace_id",
619 0 : &format_args!("{}", span_context.trace_id()),
620 0 : )?;
621 1 : }
622 : }
623 :
624 1 : if spans.extract.has_values() {
625 : // TODO: add fields from event, too?
626 1 : serializer.serialize_entry("extract", &spans.extract)?;
627 0 : }
628 :
629 1 : serializer.end()
630 1 : };
631 :
632 1 : serialize().map_err(io::Error::other)?;
633 1 : self.logline_buffer.push(b'\n');
634 1 : Ok(())
635 1 : }
636 : }
637 :
638 : /// Extracts the message field that's mixed will other fields.
639 : struct MessageFieldExtractor<'a, S: serde::ser::SerializeMap> {
640 : serializer: S,
641 : skipped_field_indices: Option<&'a SkippedFieldIndices>,
642 : state: Option<Result<(), S::Error>>,
643 : }
644 :
645 : impl<'a, S: serde::ser::SerializeMap> MessageFieldExtractor<'a, S> {
646 : #[inline]
647 1 : fn new(serializer: S, skipped_field_indices: Option<&'a SkippedFieldIndices>) -> Self {
648 1 : Self {
649 1 : serializer,
650 1 : skipped_field_indices,
651 1 : state: None,
652 1 : }
653 1 : }
654 :
655 : #[inline]
656 1 : fn into_serializer(mut self) -> Result<S, S::Error> {
657 1 : match self.state {
658 1 : Some(Ok(())) => {}
659 0 : Some(Err(err)) => return Err(err),
660 0 : None => self.serializer.serialize_value("")?,
661 : }
662 1 : Ok(self.serializer)
663 1 : }
664 :
665 : #[inline]
666 5 : fn accept_field(&self, field: &tracing::field::Field) -> bool {
667 5 : self.state.is_none()
668 5 : && field.name() == MESSAGE_FIELD
669 2 : && !self
670 2 : .skipped_field_indices
671 2 : .is_some_and(|i| i.contains(field.index()))
672 5 : }
673 : }
674 :
675 : impl<S: serde::ser::SerializeMap> tracing::field::Visit for MessageFieldExtractor<'_, S> {
676 : #[inline]
677 0 : fn record_f64(&mut self, field: &tracing::field::Field, value: f64) {
678 0 : if self.accept_field(field) {
679 0 : self.state = Some(self.serializer.serialize_value(&value));
680 0 : }
681 0 : }
682 :
683 : #[inline]
684 3 : fn record_i64(&mut self, field: &tracing::field::Field, value: i64) {
685 3 : if self.accept_field(field) {
686 0 : self.state = Some(self.serializer.serialize_value(&value));
687 3 : }
688 3 : }
689 :
690 : #[inline]
691 0 : fn record_u64(&mut self, field: &tracing::field::Field, value: u64) {
692 0 : if self.accept_field(field) {
693 0 : self.state = Some(self.serializer.serialize_value(&value));
694 0 : }
695 0 : }
696 :
697 : #[inline]
698 0 : fn record_i128(&mut self, field: &tracing::field::Field, value: i128) {
699 0 : if self.accept_field(field) {
700 0 : self.state = Some(self.serializer.serialize_value(&value));
701 0 : }
702 0 : }
703 :
704 : #[inline]
705 0 : fn record_u128(&mut self, field: &tracing::field::Field, value: u128) {
706 0 : if self.accept_field(field) {
707 0 : self.state = Some(self.serializer.serialize_value(&value));
708 0 : }
709 0 : }
710 :
711 : #[inline]
712 0 : fn record_bool(&mut self, field: &tracing::field::Field, value: bool) {
713 0 : if self.accept_field(field) {
714 0 : self.state = Some(self.serializer.serialize_value(&value));
715 0 : }
716 0 : }
717 :
718 : #[inline]
719 0 : fn record_bytes(&mut self, field: &tracing::field::Field, value: &[u8]) {
720 0 : if self.accept_field(field) {
721 0 : self.state = Some(self.serializer.serialize_value(&format_args!("{value:x?}")));
722 0 : }
723 0 : }
724 :
725 : #[inline]
726 1 : fn record_str(&mut self, field: &tracing::field::Field, value: &str) {
727 1 : if self.accept_field(field) {
728 1 : self.state = Some(self.serializer.serialize_value(&value));
729 1 : }
730 1 : }
731 :
732 : #[inline]
733 1 : fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) {
734 1 : if self.accept_field(field) {
735 0 : self.state = Some(self.serializer.serialize_value(&format_args!("{value:?}")));
736 1 : }
737 1 : }
738 :
739 : #[inline]
740 0 : fn record_error(
741 0 : &mut self,
742 0 : field: &tracing::field::Field,
743 0 : value: &(dyn std::error::Error + 'static),
744 0 : ) {
745 0 : if self.accept_field(field) {
746 0 : self.state = Some(self.serializer.serialize_value(&format_args!("{value}")));
747 0 : }
748 0 : }
749 : }
750 :
751 : /// Checks if there's any fields and field values present. If not, the JSON subobject
752 : /// can be skipped.
753 : // This is entirely optional and only cosmetic, though maybe helps a
754 : // bit during log parsing in dashboards when there's no field with empty object.
755 : struct FieldsPresent<'a>(pub bool, Option<&'a SkippedFieldIndices>);
756 :
757 : // Even though some methods have an overhead (error, bytes) it is assumed the
758 : // compiler won't include this since we ignore the value entirely.
759 : impl tracing::field::Visit for FieldsPresent<'_> {
760 : #[inline]
761 5 : fn record_debug(&mut self, field: &tracing::field::Field, _: &dyn std::fmt::Debug) {
762 5 : if !self.1.is_some_and(|i| i.contains(field.index()))
763 2 : && field.name() != MESSAGE_FIELD
764 1 : && !field.name().starts_with("log.")
765 1 : {
766 1 : self.0 |= true;
767 4 : }
768 5 : }
769 : }
770 :
771 : /// Serializes the fields directly supplied with a log event.
772 : struct SerializableEventFields<'a, 'event>(
773 : &'a tracing::Event<'event>,
774 : Option<&'a SkippedFieldIndices>,
775 : );
776 :
777 : impl serde::ser::Serialize for SerializableEventFields<'_, '_> {
778 1 : fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
779 1 : where
780 1 : S: Serializer,
781 1 : {
782 : use serde::ser::SerializeMap;
783 1 : let serializer = serializer.serialize_map(None)?;
784 1 : let mut message_skipper = MessageFieldSkipper::new(serializer, self.1);
785 1 : self.0.record(&mut message_skipper);
786 1 : let serializer = message_skipper.into_serializer()?;
787 1 : serializer.end()
788 1 : }
789 : }
790 :
791 : /// A tracing field visitor that skips the message field.
792 : struct MessageFieldSkipper<'a, S: serde::ser::SerializeMap> {
793 : serializer: S,
794 : skipped_field_indices: Option<&'a SkippedFieldIndices>,
795 : state: Result<(), S::Error>,
796 : }
797 :
798 : impl<'a, S: serde::ser::SerializeMap> MessageFieldSkipper<'a, S> {
799 : #[inline]
800 1 : fn new(serializer: S, skipped_field_indices: Option<&'a SkippedFieldIndices>) -> Self {
801 1 : Self {
802 1 : serializer,
803 1 : skipped_field_indices,
804 1 : state: Ok(()),
805 1 : }
806 1 : }
807 :
808 : #[inline]
809 5 : fn accept_field(&self, field: &tracing::field::Field) -> bool {
810 5 : self.state.is_ok()
811 5 : && field.name() != MESSAGE_FIELD
812 3 : && !field.name().starts_with("log.")
813 3 : && !self
814 3 : .skipped_field_indices
815 3 : .is_some_and(|i| i.contains(field.index()))
816 5 : }
817 :
818 : #[inline]
819 1 : fn into_serializer(self) -> Result<S, S::Error> {
820 1 : self.state?;
821 1 : Ok(self.serializer)
822 1 : }
823 : }
824 :
825 : impl<S: serde::ser::SerializeMap> tracing::field::Visit for MessageFieldSkipper<'_, S> {
826 : #[inline]
827 0 : fn record_f64(&mut self, field: &tracing::field::Field, value: f64) {
828 0 : if self.accept_field(field) {
829 0 : self.state = self.serializer.serialize_entry(field.name(), &value);
830 0 : }
831 0 : }
832 :
833 : #[inline]
834 3 : fn record_i64(&mut self, field: &tracing::field::Field, value: i64) {
835 3 : if self.accept_field(field) {
836 1 : self.state = self.serializer.serialize_entry(field.name(), &value);
837 2 : }
838 3 : }
839 :
840 : #[inline]
841 0 : fn record_u64(&mut self, field: &tracing::field::Field, value: u64) {
842 0 : if self.accept_field(field) {
843 0 : self.state = self.serializer.serialize_entry(field.name(), &value);
844 0 : }
845 0 : }
846 :
847 : #[inline]
848 0 : fn record_i128(&mut self, field: &tracing::field::Field, value: i128) {
849 0 : if self.accept_field(field) {
850 0 : self.state = self.serializer.serialize_entry(field.name(), &value);
851 0 : }
852 0 : }
853 :
854 : #[inline]
855 0 : fn record_u128(&mut self, field: &tracing::field::Field, value: u128) {
856 0 : if self.accept_field(field) {
857 0 : self.state = self.serializer.serialize_entry(field.name(), &value);
858 0 : }
859 0 : }
860 :
861 : #[inline]
862 0 : fn record_bool(&mut self, field: &tracing::field::Field, value: bool) {
863 0 : if self.accept_field(field) {
864 0 : self.state = self.serializer.serialize_entry(field.name(), &value);
865 0 : }
866 0 : }
867 :
868 : #[inline]
869 0 : fn record_bytes(&mut self, field: &tracing::field::Field, value: &[u8]) {
870 0 : if self.accept_field(field) {
871 0 : self.state = self
872 0 : .serializer
873 0 : .serialize_entry(field.name(), &format_args!("{value:x?}"));
874 0 : }
875 0 : }
876 :
877 : #[inline]
878 1 : fn record_str(&mut self, field: &tracing::field::Field, value: &str) {
879 1 : if self.accept_field(field) {
880 0 : self.state = self.serializer.serialize_entry(field.name(), &value);
881 1 : }
882 1 : }
883 :
884 : #[inline]
885 1 : fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) {
886 1 : if self.accept_field(field) {
887 0 : self.state = self
888 0 : .serializer
889 0 : .serialize_entry(field.name(), &format_args!("{value:?}"));
890 1 : }
891 1 : }
892 :
893 : #[inline]
894 0 : fn record_error(
895 0 : &mut self,
896 0 : field: &tracing::field::Field,
897 0 : value: &(dyn std::error::Error + 'static),
898 0 : ) {
899 0 : if self.accept_field(field) {
900 0 : self.state = self.serializer.serialize_value(&format_args!("{value}"));
901 0 : }
902 0 : }
903 : }
904 :
905 : /// Serializes the span stack from root to leaf (parent of event) as object
906 : /// with the span names as keys. To prevent collision we append a numberic value
907 : /// to the name. Also, collects any span fields we're interested in. Last one
908 : /// wins.
909 : struct SerializableSpans<'a, 'ctx, Span, const F: usize>
910 : where
911 : Span: Subscriber + for<'lookup> LookupSpan<'lookup>,
912 : {
913 : ctx: &'a Context<'ctx, Span>,
914 : callsite_ids: &'a papaya::HashMap<callsite::Identifier, CallsiteId>,
915 : extract: ExtractedSpanFields<'a, F>,
916 : }
917 :
918 : impl<Span, const F: usize> serde::ser::Serialize for SerializableSpans<'_, '_, Span, F>
919 : where
920 : Span: Subscriber + for<'lookup> LookupSpan<'lookup>,
921 : {
922 1 : fn serialize<Ser>(&self, serializer: Ser) -> Result<Ser::Ok, Ser::Error>
923 1 : where
924 1 : Ser: serde::ser::Serializer,
925 1 : {
926 1 : let mut serializer = serializer.serialize_map(None)?;
927 :
928 1 : if let Some(leaf_span) = self.ctx.lookup_current() {
929 2 : for span in leaf_span.scope().from_root() {
930 : // Append a numeric callsite ID to the span name to keep the name unique
931 : // in the JSON object.
932 2 : let cid = self
933 2 : .callsite_ids
934 2 : .pin()
935 2 : .get(&span.metadata().callsite())
936 2 : .copied()
937 2 : .unwrap_or_default();
938 2 :
939 2 : // Loki turns the # into an underscore during field name concatenation.
940 2 : serializer.serialize_key(&format_args!("{}#{}", span.metadata().name(), &cid))?;
941 :
942 2 : serializer.serialize_value(&SerializableSpanFields {
943 2 : span: &span,
944 2 : extract: &self.extract,
945 2 : })?;
946 : }
947 0 : }
948 :
949 1 : serializer.end()
950 1 : }
951 : }
952 :
953 : /// Serializes the span fields as object.
954 : struct SerializableSpanFields<'a, 'span, Span, const F: usize>
955 : where
956 : Span: for<'lookup> LookupSpan<'lookup>,
957 : {
958 : span: &'a SpanRef<'span, Span>,
959 : extract: &'a ExtractedSpanFields<'a, F>,
960 : }
961 :
962 : impl<Span, const F: usize> serde::ser::Serialize for SerializableSpanFields<'_, '_, Span, F>
963 : where
964 : Span: for<'lookup> LookupSpan<'lookup>,
965 : {
966 2 : fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
967 2 : where
968 2 : S: serde::ser::Serializer,
969 2 : {
970 2 : let mut serializer = serializer.serialize_map(None)?;
971 :
972 2 : let ext = self.span.extensions();
973 2 : if let Some(data) = ext.get::<SpanFields>() {
974 2 : for (name, value) in &data.fields.pin() {
975 2 : serializer.serialize_entry(name, value)?;
976 : // TODO: replace clone with reference, if possible.
977 2 : self.extract.set(name, value.clone());
978 : }
979 0 : }
980 :
981 2 : serializer.end()
982 2 : }
983 : }
984 :
985 : struct ExtractedSpanFields<'a, const F: usize> {
986 : names: &'a IndexSet<&'static str>,
987 : // TODO: replace TryLock with something local thread and interior mutability.
988 : // serde API doesn't let us use `mut`.
989 : values: TryLock<([Option<serde_json::Value>; F], bool)>,
990 : }
991 :
992 : impl<'a, const F: usize> ExtractedSpanFields<'a, F> {
993 1 : fn new(names: &'a IndexSet<&'static str>) -> Self {
994 1 : ExtractedSpanFields {
995 1 : names,
996 1 : values: TryLock::new((array::from_fn(|_| Option::default()), false)),
997 1 : }
998 1 : }
999 :
1000 : #[inline]
1001 2 : fn set(&self, name: &'static str, value: serde_json::Value) {
1002 2 : if let Some((index, _)) = self.names.get_full(name) {
1003 2 : let mut fields = self.values.try_lock().expect("thread-local use");
1004 2 : fields.0[index] = Some(value);
1005 2 : fields.1 = true;
1006 2 : }
1007 2 : }
1008 :
1009 : #[inline]
1010 1 : fn has_values(&self) -> bool {
1011 1 : self.values.try_lock().expect("thread-local use").1
1012 1 : }
1013 : }
1014 :
1015 : impl<const F: usize> serde::ser::Serialize for ExtractedSpanFields<'_, F> {
1016 1 : fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
1017 1 : where
1018 1 : S: serde::ser::Serializer,
1019 1 : {
1020 1 : let mut serializer = serializer.serialize_map(None)?;
1021 :
1022 1 : let values = self.values.try_lock().expect("thread-local use");
1023 1 : for (i, value) in values.0.iter().enumerate() {
1024 1 : if let Some(value) = value {
1025 1 : let key = self.names[i];
1026 1 : serializer.serialize_entry(key, value)?;
1027 0 : }
1028 : }
1029 :
1030 1 : serializer.end()
1031 1 : }
1032 : }
1033 :
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
    use std::marker::PhantomData;
    use std::sync::{Arc, Mutex, MutexGuard};

    use assert_json_diff::assert_json_eq;
    use tracing::info_span;

    use super::*;

    /// Clock that always reports the instant stored in `current_time`.
    struct TestClock {
        current_time: Mutex<DateTime<Utc>>,
    }

    impl Clock for Arc<TestClock> {
        fn now(&self) -> DateTime<Utc> {
            let current = self.current_time.lock().expect("poisoned");
            *current
        }
    }

    /// Writer that appends to a shared byte vector for as long as it holds
    /// the vector's mutex guard.
    struct VecWriter<'a> {
        buffer: MutexGuard<'a, Vec<u8>>,
    }

    impl MakeWriter for Arc<Mutex<Vec<u8>>> {
        fn make_writer(&self) -> impl io::Write {
            let buffer = self.lock().expect("poisoned");
            VecWriter { buffer }
        }
    }

    impl io::Write for VecWriter<'_> {
        fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
            // Delegate to the `Write` impl of the underlying `Vec<u8>`.
            <Vec<u8> as io::Write>::write(&mut self.buffer, buf)
        }

        fn flush(&mut self) -> io::Result<()> {
            Ok(())
        }
    }

    /// Logs one event inside two nested spans that share a name and checks
    /// the emitted JSON: duplicate fields resolve to the last value, the span
    /// keys are disambiguated by callsite ID, and the extracted `x` comes
    /// from the innermost span.
    #[test]
    fn test_field_collection() {
        let clock = Arc::new(TestClock {
            current_time: Mutex::new(Utc::now()),
        });
        let buffer = Arc::new(Mutex::new(Vec::new()));

        let log_layer = JsonLoggingLayer {
            clock: Arc::clone(&clock),
            skipped_field_indices: papaya::HashMap::default(),
            callsite_ids: papaya::HashMap::default(),
            writer: Arc::clone(&buffer),
            extract_fields: IndexSet::from_iter(["x"]),
            _marker: PhantomData::<[&'static str; 1]>,
        };
        let registry = tracing_subscriber::Registry::default().with(log_layer);

        tracing::subscriber::with_default(registry, || {
            // Guards are dropped in reverse order, so the inner span exits first.
            let _outer = info_span!("some_span", x = 24).entered();
            let _inner = info_span!("some_span", x = 40, x = 41, x = 42).entered();
            tracing::error!(
                a = 1,
                a = 2,
                a = 3,
                message = "explicit message field",
                "implicit message field"
            );
        });

        // The registry (and with it the layer's clone of the buffer) is gone
        // by now, so this is the only remaining reference.
        let written = Arc::try_unwrap(buffer)
            .expect("no other reference")
            .into_inner()
            .expect("poisoned");
        let actual: serde_json::Value = serde_json::from_slice(&written).expect("valid JSON");

        // Values that vary per run/environment are echoed back from the
        // actual output; this still asserts their presence and JSON type.
        let actual_object = actual.as_object().unwrap();
        let src = actual_object.get("src").unwrap().as_str().unwrap();
        let process_id = actual_object.get("process_id").unwrap().as_number().unwrap();
        let thread_id = actual_object.get("thread_id").unwrap().as_number().unwrap();

        let expected: serde_json::Value = serde_json::json!(
            {
                "timestamp": clock.now().to_rfc3339_opts(chrono::SecondsFormat::Micros, true),
                "level": "ERROR",
                "message": "explicit message field",
                "fields": {
                    "a": 3,
                },
                "spans": {
                    "some_span#1":{
                        "x": 24,
                    },
                    "some_span#2": {
                        "x": 42,
                    }
                },
                "extract": {
                    "x": 42,
                },
                "src": src,
                "target": "proxy::logging::tests",
                "process_id": process_id,
                "thread_id": thread_id,
                "thread_name": "logging::tests::test_field_collection",
            }
        );

        assert_json_eq!(actual, expected);
    }
}
|