Line data Source code
1 : use std::cell::RefCell;
2 : use std::collections::HashMap;
3 : use std::sync::Arc;
4 : use std::{env, io};
5 :
6 : use chrono::{DateTime, Utc};
7 : use opentelemetry::trace::TraceContextExt;
8 : use tracing::subscriber::Interest;
9 : use tracing::{Event, Metadata, Span, Subscriber, callsite, span};
10 : use tracing_opentelemetry::OpenTelemetrySpanExt;
11 : use tracing_subscriber::filter::{EnvFilter, LevelFilter};
12 : use tracing_subscriber::fmt::format::{Format, Full};
13 : use tracing_subscriber::fmt::time::SystemTime;
14 : use tracing_subscriber::fmt::{FormatEvent, FormatFields};
15 : use tracing_subscriber::layer::{Context, Layer};
16 : use tracing_subscriber::prelude::*;
17 : use tracing_subscriber::registry::LookupSpan;
18 :
19 : use crate::metrics::Metrics;
20 :
21 : /// Initialize logging and OpenTelemetry tracing and exporter.
22 : ///
23 : /// Logging can be configured using `RUST_LOG` environment variable.
24 : ///
25 : /// OpenTelemetry is configured with OTLP/HTTP exporter. It picks up
26 : /// configuration from environment variables. For example, to change the
27 : /// destination, set `OTEL_EXPORTER_OTLP_ENDPOINT=http://jaeger:4318`.
28 : /// See <https://opentelemetry.io/docs/reference/specification/sdk-environment-variables>
29 0 : pub fn init() -> anyhow::Result<LoggingGuard> {
30 0 : let logfmt = LogFormat::from_env()?;
31 :
32 0 : let env_filter = EnvFilter::builder()
33 0 : .with_default_directive(LevelFilter::INFO.into())
34 0 : .from_env_lossy()
35 0 : .add_directive(
36 0 : "aws_config=info"
37 0 : .parse()
38 0 : .expect("this should be a valid filter directive"),
39 : )
40 0 : .add_directive(
41 0 : "azure_core::policies::transport=off"
42 0 : .parse()
43 0 : .expect("this should be a valid filter directive"),
44 : );
45 :
46 0 : let provider = tracing_utils::init_tracing("proxy", tracing_utils::ExportConfig::default());
47 0 : let otlp_layer = provider.as_ref().map(tracing_utils::layer);
48 :
49 0 : let json_log_layer = if logfmt == LogFormat::Json {
50 0 : Some(JsonLoggingLayer::new(
51 0 : RealClock,
52 0 : StderrWriter {
53 0 : stderr: std::io::stderr(),
54 0 : },
55 0 : &["conn_id", "ep", "query_id", "request_id", "session_id"],
56 0 : ))
57 : } else {
58 0 : None
59 : };
60 :
61 0 : let text_log_layer = if logfmt == LogFormat::Text {
62 0 : Some(
63 0 : tracing_subscriber::fmt::layer()
64 0 : .with_ansi(false)
65 0 : .with_writer(std::io::stderr)
66 0 : .with_target(false),
67 0 : )
68 : } else {
69 0 : None
70 : };
71 :
72 0 : tracing_subscriber::registry()
73 0 : .with(env_filter)
74 0 : .with(otlp_layer)
75 0 : .with(json_log_layer)
76 0 : .with(text_log_layer)
77 0 : .try_init()?;
78 :
79 0 : Ok(LoggingGuard(provider))
80 0 : }
81 :
82 : /// Initialize logging for local_proxy with log prefix and no opentelemetry.
83 : ///
84 : /// Logging can be configured using `RUST_LOG` environment variable.
85 0 : pub fn init_local_proxy() -> anyhow::Result<LoggingGuard> {
86 0 : let env_filter = EnvFilter::builder()
87 0 : .with_default_directive(LevelFilter::INFO.into())
88 0 : .from_env_lossy();
89 :
90 0 : let fmt_layer = tracing_subscriber::fmt::layer()
91 0 : .with_ansi(false)
92 0 : .with_writer(std::io::stderr)
93 0 : .event_format(LocalProxyFormatter(Format::default().with_target(false)));
94 :
95 0 : tracing_subscriber::registry()
96 0 : .with(env_filter)
97 0 : .with(fmt_layer)
98 0 : .try_init()?;
99 :
100 0 : Ok(LoggingGuard(None))
101 0 : }
102 :
103 : pub struct LocalProxyFormatter(Format<Full, SystemTime>);
104 :
105 : impl<S, N> FormatEvent<S, N> for LocalProxyFormatter
106 : where
107 : S: Subscriber + for<'a> LookupSpan<'a>,
108 : N: for<'a> FormatFields<'a> + 'static,
109 : {
110 0 : fn format_event(
111 0 : &self,
112 0 : ctx: &tracing_subscriber::fmt::FmtContext<'_, S, N>,
113 0 : mut writer: tracing_subscriber::fmt::format::Writer<'_>,
114 0 : event: &tracing::Event<'_>,
115 0 : ) -> std::fmt::Result {
116 0 : writer.write_str("[local_proxy] ")?;
117 0 : self.0.format_event(ctx, writer, event)
118 0 : }
119 : }
120 :
121 : pub struct LoggingGuard(Option<tracing_utils::Provider>);
122 :
123 : impl Drop for LoggingGuard {
124 0 : fn drop(&mut self) {
125 0 : if let Some(p) = &self.0 {
126 : // Shutdown trace pipeline gracefully, so that it has a chance to send any
127 : // pending traces before we exit.
128 0 : tracing::info!("shutting down the tracing machinery");
129 0 : drop(p.shutdown());
130 0 : }
131 0 : }
132 : }
133 :
134 : #[derive(Copy, Clone, PartialEq, Eq, Default, Debug)]
135 : enum LogFormat {
136 : Text,
137 : #[default]
138 : Json,
139 : }
140 :
141 : impl LogFormat {
142 0 : fn from_env() -> anyhow::Result<Self> {
143 0 : let logfmt = env::var("LOGFMT");
144 0 : Ok(match logfmt.as_deref() {
145 0 : Err(_) => LogFormat::default(),
146 0 : Ok("text") => LogFormat::Text,
147 0 : Ok("json") => LogFormat::Json,
148 0 : Ok(logfmt) => anyhow::bail!("unknown log format: {logfmt}"),
149 : })
150 0 : }
151 : }
152 :
/// Source of writers that a finished log line is written into.
trait MakeWriter {
    /// Hands out a writer for a single log line.
    fn make_writer(&self) -> impl io::Write;
}

/// [`MakeWriter`] backed by the process' standard error stream.
struct StderrWriter {
    stderr: io::Stderr,
}

impl MakeWriter for StderrWriter {
    /// Returns a locked handle to stderr; the lock is released when the handle is dropped.
    #[inline]
    fn make_writer(&self) -> impl io::Write {
        io::Stderr::lock(&self.stderr)
    }
}
167 :
168 : // TODO: move into separate module or even separate crate.
169 : trait Clock {
170 : fn now(&self) -> DateTime<Utc>;
171 : }
172 :
173 : struct RealClock;
174 :
175 : impl Clock for RealClock {
176 : #[inline]
177 0 : fn now(&self) -> DateTime<Utc> {
178 0 : Utc::now()
179 0 : }
180 : }
181 :
182 : /// Name of the field used by tracing crate to store the event message.
183 : const MESSAGE_FIELD: &str = "message";
184 :
185 : /// Tracing used to enforce that spans/events have no more than 32 fields.
186 : /// It seems this is no longer the case, but it's still documented in some places.
187 : /// Generally, we shouldn't expect more than 32 fields anyway, so we can try and
188 : /// rely on it for some (minor) performance gains.
189 : const MAX_TRACING_FIELDS: usize = 32;
190 :
191 : thread_local! {
192 : /// Thread-local instance with per-thread buffer for log writing.
193 : static EVENT_FORMATTER: RefCell<EventFormatter> = const { RefCell::new(EventFormatter::new()) };
194 : /// Cached OS thread ID.
195 : static THREAD_ID: u64 = gettid::gettid();
196 : }
197 :
198 : /// Map for values fixed at callsite registration.
199 : // We use papaya here because registration rarely happens post-startup.
200 : // papaya is good for read-heavy workloads.
201 : //
202 : // We use rustc_hash here because callsite::Identifier will always be an integer with low-bit entropy,
203 : // since it's always a pointer to static mutable data. rustc_hash was designed for low-bit entropy.
204 : type CallsiteMap<T> =
205 : papaya::HashMap<callsite::Identifier, T, std::hash::BuildHasherDefault<rustc_hash::FxHasher>>;
206 :
207 : /// Implements tracing layer to handle events specific to logging.
208 : struct JsonLoggingLayer<C: Clock, W: MakeWriter> {
209 : clock: C,
210 : writer: W,
211 :
212 : /// tracks which fields of each **event** are duplicates
213 : skipped_field_indices: CallsiteMap<SkippedFieldIndices>,
214 :
215 : /// tracks callsite names to an ID.
216 : callsite_name_ids: papaya::HashMap<&'static str, u32, ahash::RandomState>,
217 :
218 : span_info: CallsiteMap<CallsiteSpanInfo>,
219 :
220 : /// Fields we want to keep track of in a separate json object.
221 : extract_fields: &'static [&'static str],
222 : }
223 :
224 : impl<C: Clock, W: MakeWriter> JsonLoggingLayer<C, W> {
225 0 : fn new(clock: C, writer: W, extract_fields: &'static [&'static str]) -> Self {
226 0 : JsonLoggingLayer {
227 0 : clock,
228 0 : skipped_field_indices: CallsiteMap::default(),
229 0 : span_info: CallsiteMap::default(),
230 0 : callsite_name_ids: papaya::HashMap::default(),
231 0 : writer,
232 0 : extract_fields,
233 0 : }
234 0 : }
235 :
236 : #[inline]
237 6 : fn span_info(&self, metadata: &'static Metadata<'static>) -> CallsiteSpanInfo {
238 6 : self.span_info
239 6 : .pin()
240 6 : .get_or_insert_with(metadata.callsite(), || {
241 3 : CallsiteSpanInfo::new(&self.callsite_name_ids, metadata, self.extract_fields)
242 3 : })
243 6 : .clone()
244 6 : }
245 : }
246 :
247 : impl<S, C: Clock + 'static, W: MakeWriter + 'static> Layer<S> for JsonLoggingLayer<C, W>
248 : where
249 : S: Subscriber + for<'a> LookupSpan<'a>,
250 : {
251 1 : fn on_event(&self, event: &Event<'_>, ctx: Context<'_, S>) {
252 : use std::io::Write;
253 :
254 : // TODO: consider special tracing subscriber to grab timestamp very
255 : // early, before OTel machinery, and add as event extension.
256 1 : let now = self.clock.now();
257 :
258 1 : EVENT_FORMATTER.with(|f| {
259 1 : let mut borrow = f.try_borrow_mut();
260 1 : let formatter = match borrow.as_deref_mut() {
261 1 : Ok(formatter) => formatter,
262 : // If the thread local formatter is borrowed,
263 : // then we likely hit an edge case were we panicked during formatting.
264 : // We allow the logging to proceed with an uncached formatter.
265 0 : Err(_) => &mut EventFormatter::new(),
266 : };
267 :
268 1 : formatter.format(
269 1 : now,
270 1 : event,
271 1 : &ctx,
272 1 : &self.skipped_field_indices,
273 1 : self.extract_fields,
274 : );
275 :
276 1 : let mut writer = self.writer.make_writer();
277 1 : if writer.write_all(formatter.buffer()).is_err() {
278 0 : Metrics::get().proxy.logging_errors_count.inc();
279 1 : }
280 1 : });
281 1 : }
282 :
283 : /// Registers a SpanFields instance as span extension.
284 3 : fn on_new_span(&self, attrs: &span::Attributes<'_>, id: &span::Id, ctx: Context<'_, S>) {
285 3 : let span = ctx.span(id).expect("span must exist");
286 :
287 3 : let mut fields = SpanFields::new(self.span_info(span.metadata()));
288 3 : attrs.record(&mut fields);
289 :
290 : // This is a new span: the extensions should not be locked
291 : // unless some layer spawned a thread to process this span.
292 : // I don't think any layers do that.
293 3 : span.extensions_mut().insert(fields);
294 3 : }
295 :
296 0 : fn on_record(&self, id: &span::Id, values: &span::Record<'_>, ctx: Context<'_, S>) {
297 0 : let span = ctx.span(id).expect("span must exist");
298 :
299 : // assumption: `on_record` is rarely called.
300 : // assumption: a span being updated by one thread,
301 : // and formatted by another thread is even rarer.
302 0 : let mut ext = span.extensions_mut();
303 0 : if let Some(fields) = ext.get_mut::<SpanFields>() {
304 0 : values.record(fields);
305 0 : }
306 0 : }
307 :
308 : /// Called (lazily) roughly once per event/span instance. We quickly check
309 : /// for duplicate field names and record duplicates as skippable. Last field wins.
310 4 : fn register_callsite(&self, metadata: &'static Metadata<'static>) -> Interest {
311 4 : debug_assert!(
312 4 : metadata.fields().len() <= MAX_TRACING_FIELDS,
313 0 : "callsite {metadata:?} has too many fields."
314 : );
315 :
316 4 : if !metadata.is_event() {
317 : // register the span info.
318 3 : self.span_info(metadata);
319 : // Must not be never because we wouldn't get trace and span data.
320 3 : return Interest::always();
321 1 : }
322 :
323 1 : let mut field_indices = SkippedFieldIndices::default();
324 1 : let mut seen_fields = HashMap::new();
325 5 : for field in metadata.fields() {
326 5 : if let Some(old_index) = seen_fields.insert(field.name(), field.index()) {
327 3 : field_indices.set(old_index);
328 3 : }
329 : }
330 :
331 1 : if !field_indices.is_empty() {
332 1 : self.skipped_field_indices
333 1 : .pin()
334 1 : .insert(metadata.callsite(), field_indices);
335 1 : }
336 :
337 1 : Interest::always()
338 4 : }
339 : }
340 :
341 : /// Any span info that is fixed to a particular callsite. Not variable between span instances.
342 : #[derive(Clone)]
343 : struct CallsiteSpanInfo {
344 : /// index of each field to extract. usize::MAX if not found.
345 : extract: Arc<[usize]>,
346 :
347 : /// tracks the fixed "callsite ID" for each span.
348 : /// note: this is not stable between runs.
349 : normalized_name: Arc<str>,
350 : }
351 :
352 : impl CallsiteSpanInfo {
353 3 : fn new(
354 3 : callsite_name_ids: &papaya::HashMap<&'static str, u32, ahash::RandomState>,
355 3 : metadata: &'static Metadata<'static>,
356 3 : extract_fields: &[&'static str],
357 3 : ) -> Self {
358 5 : let names: Vec<&'static str> = metadata.fields().iter().map(|f| f.name()).collect();
359 :
360 : // get all the indices of span fields we want to focus
361 3 : let extract = extract_fields
362 3 : .iter()
363 : // use rposition, since we want last match wins.
364 3 : .map(|f1| names.iter().rposition(|f2| f1 == f2).unwrap_or(usize::MAX))
365 3 : .collect();
366 :
367 : // normalized_name is unique for each callsite, but it is not
368 : // unified across separate proxy instances.
369 : // todo: can we do better here?
370 3 : let cid = *callsite_name_ids
371 3 : .pin()
372 3 : .update_or_insert(metadata.name(), |&cid| cid + 1, 0);
373 :
374 : // we hope that most span names are unique, in which case this will always be 0
375 3 : let normalized_name = if cid == 0 {
376 2 : metadata.name().into()
377 : } else {
378 : // if the span name is not unique, add the numeric ID to span name to distinguish it.
379 : // sadly this is non-determinstic, across restarts but we should fix it by disambiguating re-used span names instead.
380 1 : format!("{}#{cid}", metadata.name()).into()
381 : };
382 :
383 3 : Self {
384 3 : extract,
385 3 : normalized_name,
386 3 : }
387 3 : }
388 : }
389 :
390 : #[derive(Clone)]
391 : struct RawValue(Box<[u8]>);
392 :
393 : impl RawValue {
394 5 : fn new(v: impl json::ValueEncoder) -> Self {
395 5 : Self(json::value_to_vec!(|val| v.encode(val)).into_boxed_slice())
396 5 : }
397 : }
398 :
399 : impl json::ValueEncoder for &RawValue {
400 6 : fn encode(self, v: json::ValueSer<'_>) {
401 6 : v.write_raw_json(&self.0);
402 6 : }
403 : }
404 :
405 : /// Stores span field values recorded during the spans lifetime.
406 : struct SpanFields {
407 : values: [Option<RawValue>; MAX_TRACING_FIELDS],
408 :
409 : /// cached span info so we can avoid extra hashmap lookups in the hot path.
410 : span_info: CallsiteSpanInfo,
411 : }
412 :
413 : impl SpanFields {
414 3 : fn new(span_info: CallsiteSpanInfo) -> Self {
415 : Self {
416 3 : span_info,
417 : values: [const { None }; MAX_TRACING_FIELDS],
418 : }
419 3 : }
420 : }
421 :
422 : impl tracing::field::Visit for SpanFields {
423 : #[inline]
424 0 : fn record_f64(&mut self, field: &tracing::field::Field, value: f64) {
425 0 : self.values[field.index()] = Some(RawValue::new(value));
426 0 : }
427 :
428 : #[inline]
429 5 : fn record_i64(&mut self, field: &tracing::field::Field, value: i64) {
430 5 : self.values[field.index()] = Some(RawValue::new(value));
431 5 : }
432 :
433 : #[inline]
434 0 : fn record_u64(&mut self, field: &tracing::field::Field, value: u64) {
435 0 : self.values[field.index()] = Some(RawValue::new(value));
436 0 : }
437 :
438 : #[inline]
439 0 : fn record_i128(&mut self, field: &tracing::field::Field, value: i128) {
440 0 : if let Ok(value) = i64::try_from(value) {
441 0 : self.values[field.index()] = Some(RawValue::new(value));
442 0 : } else {
443 0 : self.values[field.index()] = Some(RawValue::new(format_args!("{value}")));
444 0 : }
445 0 : }
446 :
447 : #[inline]
448 0 : fn record_u128(&mut self, field: &tracing::field::Field, value: u128) {
449 0 : if let Ok(value) = u64::try_from(value) {
450 0 : self.values[field.index()] = Some(RawValue::new(value));
451 0 : } else {
452 0 : self.values[field.index()] = Some(RawValue::new(format_args!("{value}")));
453 0 : }
454 0 : }
455 :
456 : #[inline]
457 0 : fn record_bool(&mut self, field: &tracing::field::Field, value: bool) {
458 0 : self.values[field.index()] = Some(RawValue::new(value));
459 0 : }
460 :
461 : #[inline]
462 0 : fn record_bytes(&mut self, field: &tracing::field::Field, value: &[u8]) {
463 0 : self.values[field.index()] = Some(RawValue::new(value));
464 0 : }
465 :
466 : #[inline]
467 0 : fn record_str(&mut self, field: &tracing::field::Field, value: &str) {
468 0 : self.values[field.index()] = Some(RawValue::new(value));
469 0 : }
470 :
471 : #[inline]
472 0 : fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) {
473 0 : self.values[field.index()] = Some(RawValue::new(format_args!("{value:?}")));
474 0 : }
475 :
476 : #[inline]
477 0 : fn record_error(
478 0 : &mut self,
479 0 : field: &tracing::field::Field,
480 0 : value: &(dyn std::error::Error + 'static),
481 0 : ) {
482 0 : self.values[field.index()] = Some(RawValue::new(format_args!("{value}")));
483 0 : }
484 : }
485 :
/// List of field indices skipped during logging. Can list duplicate fields or
/// metafields not meant to be logged.
///
/// Backed by a 32-bit set, so only indices `0..=31` can be represented.
#[derive(Copy, Clone, Default)]
struct SkippedFieldIndices {
    // 32-bits is large enough for `MAX_TRACING_FIELDS`
    bits: u32,
}

impl SkippedFieldIndices {
    /// Returns the single-bit mask for `index`, or `None` when `index` does
    /// not fit into the 32-bit set.
    ///
    /// A plain `1 << index` panics in debug builds and wraps around in release
    /// builds for `index >= 32` (so index 32 would alias index 0); the checked
    /// shift avoids both.
    #[inline]
    fn mask(index: usize) -> Option<u32> {
        u32::try_from(index)
            .ok()
            .and_then(|shift| 1u32.checked_shl(shift))
    }

    /// Returns `true` when no index is marked as skipped.
    #[inline]
    fn is_empty(self) -> bool {
        self.bits == 0
    }

    /// Marks `index` as skipped.
    ///
    /// An out-of-range index trips the debug assertion; in builds without
    /// debug assertions it is ignored rather than corrupting another bit.
    #[inline]
    fn set(&mut self, index: usize) {
        debug_assert!(
            Self::mask(index).is_some(),
            "index out of bounds of 32-bit set"
        );
        if let Some(mask) = Self::mask(index) {
            self.bits |= mask;
        }
    }

    /// Returns `true` when `index` is marked as skipped. Indices that do not
    /// fit into the set are never contained.
    #[inline]
    fn contains(self, index: usize) -> bool {
        Self::mask(index).is_some_and(|mask| self.bits & mask != 0)
    }
}
511 :
512 : /// Formats a tracing event and writes JSON to its internal buffer including a newline.
513 : // TODO: buffer capacity management, truncate if too large
514 : struct EventFormatter {
515 : logline_buffer: Vec<u8>,
516 : }
517 :
518 : impl EventFormatter {
519 : #[inline]
520 0 : const fn new() -> Self {
521 0 : EventFormatter {
522 0 : logline_buffer: Vec::new(),
523 0 : }
524 0 : }
525 :
526 : #[inline]
527 1 : fn buffer(&self) -> &[u8] {
528 1 : &self.logline_buffer
529 1 : }
530 :
531 1 : fn format<S>(
532 1 : &mut self,
533 1 : now: DateTime<Utc>,
534 1 : event: &Event<'_>,
535 1 : ctx: &Context<'_, S>,
536 1 : skipped_field_indices: &CallsiteMap<SkippedFieldIndices>,
537 1 : extract_fields: &'static [&'static str],
538 1 : ) where
539 1 : S: Subscriber + for<'a> LookupSpan<'a>,
540 : {
541 1 : let timestamp = now.to_rfc3339_opts(chrono::SecondsFormat::Micros, true);
542 :
543 : use tracing_log::NormalizeEvent;
544 1 : let normalized_meta = event.normalized_metadata();
545 1 : let meta = normalized_meta.as_ref().unwrap_or_else(|| event.metadata());
546 :
547 1 : let skipped_field_indices = skipped_field_indices
548 1 : .pin()
549 1 : .get(&meta.callsite())
550 1 : .copied()
551 1 : .unwrap_or_default();
552 :
553 1 : self.logline_buffer.clear();
554 1 : let serializer = json::ValueSer::new(&mut self.logline_buffer);
555 1 : json::value_as_object!(|serializer| {
556 : // Timestamp comes first, so raw lines can be sorted by timestamp.
557 1 : serializer.entry("timestamp", &*timestamp);
558 :
559 : // Level next.
560 1 : serializer.entry("level", meta.level().as_str());
561 :
562 : // Message next.
563 1 : let mut message_extractor =
564 1 : MessageFieldExtractor::new(serializer.key("message"), skipped_field_indices);
565 1 : event.record(&mut message_extractor);
566 1 : message_extractor.finish();
567 :
568 : // Direct message fields.
569 : {
570 1 : let mut message_skipper = MessageFieldSkipper::new(
571 1 : serializer.key("fields").object(),
572 1 : skipped_field_indices,
573 : );
574 1 : event.record(&mut message_skipper);
575 :
576 : // rollback if no fields are present.
577 1 : if message_skipper.present {
578 1 : message_skipper.serializer.finish();
579 1 : }
580 : }
581 :
582 1 : let mut extracted = ExtractedSpanFields::new(extract_fields);
583 :
584 1 : let spans = serializer.key("spans");
585 1 : json::value_as_object!(|spans| {
586 1 : let parent_spans = ctx
587 1 : .event_span(event)
588 1 : .map_or(vec![], |parent| parent.scope().collect());
589 :
590 3 : for span in parent_spans.iter().rev() {
591 3 : let ext = span.extensions();
592 :
593 : // all spans should have this extension.
594 3 : let Some(fields) = ext.get() else { continue };
595 :
596 3 : extracted.layer_span(fields);
597 :
598 3 : let SpanFields { values, span_info } = fields;
599 :
600 3 : let span_fields = spans.key(&*span_info.normalized_name);
601 3 : json::value_as_object!(|span_fields| {
602 5 : for (field, value) in std::iter::zip(span.metadata().fields(), values) {
603 5 : if let Some(value) = value {
604 5 : span_fields.entry(field.name(), value);
605 5 : }
606 : }
607 : });
608 : }
609 : });
610 :
611 : // TODO: thread-local cache?
612 1 : let pid = std::process::id();
613 : // Skip adding pid 1 to reduce noise for services running in containers.
614 1 : if pid != 1 {
615 1 : serializer.entry("process_id", pid);
616 1 : }
617 :
618 1 : THREAD_ID.with(|tid| serializer.entry("thread_id", tid));
619 :
620 : // TODO: tls cache? name could change
621 1 : if let Some(thread_name) = std::thread::current().name()
622 1 : && !thread_name.is_empty()
623 1 : && thread_name != "tokio-runtime-worker"
624 1 : {
625 1 : serializer.entry("thread_name", thread_name);
626 1 : }
627 :
628 1 : if let Some(task_id) = tokio::task::try_id() {
629 0 : serializer.entry("task_id", format_args!("{task_id}"));
630 1 : }
631 :
632 1 : serializer.entry("target", meta.target());
633 :
634 : // Skip adding module if it's the same as target.
635 1 : if let Some(module) = meta.module_path()
636 1 : && module != meta.target()
637 0 : {
638 0 : serializer.entry("module", module);
639 1 : }
640 :
641 1 : if let Some(file) = meta.file() {
642 1 : if let Some(line) = meta.line() {
643 1 : serializer.entry("src", format_args!("{file}:{line}"));
644 1 : } else {
645 0 : serializer.entry("src", file);
646 0 : }
647 0 : }
648 :
649 : {
650 1 : let otel_context = Span::current().context();
651 1 : let otel_spanref = otel_context.span();
652 1 : let span_context = otel_spanref.span_context();
653 1 : if span_context.is_valid() {
654 0 : serializer.entry("trace_id", format_args!("{}", span_context.trace_id()));
655 1 : }
656 : }
657 :
658 1 : if extracted.has_values() {
659 : // TODO: add fields from event, too?
660 1 : let extract = serializer.key("extract");
661 1 : json::value_as_object!(|extract| {
662 1 : for (key, value) in std::iter::zip(extracted.names, extracted.values) {
663 1 : if let Some(value) = value {
664 1 : extract.entry(*key, &value);
665 1 : }
666 : }
667 : });
668 0 : }
669 : });
670 :
671 1 : self.logline_buffer.push(b'\n');
672 1 : }
673 : }
674 :
675 : /// Extracts the message field that's mixed will other fields.
676 : struct MessageFieldExtractor<'buf> {
677 : serializer: Option<json::ValueSer<'buf>>,
678 : skipped_field_indices: SkippedFieldIndices,
679 : }
680 :
681 : impl<'buf> MessageFieldExtractor<'buf> {
682 : #[inline]
683 1 : fn new(serializer: json::ValueSer<'buf>, skipped_field_indices: SkippedFieldIndices) -> Self {
684 1 : Self {
685 1 : serializer: Some(serializer),
686 1 : skipped_field_indices,
687 1 : }
688 1 : }
689 :
690 : #[inline]
691 1 : fn finish(self) {
692 1 : if let Some(ser) = self.serializer {
693 0 : ser.value("");
694 1 : }
695 1 : }
696 :
697 : #[inline]
698 5 : fn record_field(&mut self, field: &tracing::field::Field, v: impl json::ValueEncoder) {
699 5 : if field.name() == MESSAGE_FIELD
700 2 : && !self.skipped_field_indices.contains(field.index())
701 1 : && let Some(ser) = self.serializer.take()
702 1 : {
703 1 : ser.value(v);
704 4 : }
705 5 : }
706 : }
707 :
708 : impl tracing::field::Visit for MessageFieldExtractor<'_> {
709 : #[inline]
710 0 : fn record_f64(&mut self, field: &tracing::field::Field, value: f64) {
711 0 : self.record_field(field, value);
712 0 : }
713 :
714 : #[inline]
715 3 : fn record_i64(&mut self, field: &tracing::field::Field, value: i64) {
716 3 : self.record_field(field, value);
717 3 : }
718 :
719 : #[inline]
720 0 : fn record_u64(&mut self, field: &tracing::field::Field, value: u64) {
721 0 : self.record_field(field, value);
722 0 : }
723 :
724 : #[inline]
725 0 : fn record_i128(&mut self, field: &tracing::field::Field, value: i128) {
726 0 : self.record_field(field, value);
727 0 : }
728 :
729 : #[inline]
730 0 : fn record_u128(&mut self, field: &tracing::field::Field, value: u128) {
731 0 : self.record_field(field, value);
732 0 : }
733 :
734 : #[inline]
735 0 : fn record_bool(&mut self, field: &tracing::field::Field, value: bool) {
736 0 : self.record_field(field, value);
737 0 : }
738 :
739 : #[inline]
740 0 : fn record_bytes(&mut self, field: &tracing::field::Field, value: &[u8]) {
741 0 : self.record_field(field, format_args!("{value:x?}"));
742 0 : }
743 :
744 : #[inline]
745 1 : fn record_str(&mut self, field: &tracing::field::Field, value: &str) {
746 1 : self.record_field(field, value);
747 1 : }
748 :
749 : #[inline]
750 1 : fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) {
751 1 : self.record_field(field, format_args!("{value:?}"));
752 1 : }
753 :
754 : #[inline]
755 0 : fn record_error(
756 0 : &mut self,
757 0 : field: &tracing::field::Field,
758 0 : value: &(dyn std::error::Error + 'static),
759 0 : ) {
760 0 : self.record_field(field, format_args!("{value}"));
761 0 : }
762 : }
763 :
764 : /// A tracing field visitor that skips the message field.
765 : struct MessageFieldSkipper<'buf> {
766 : serializer: json::ObjectSer<'buf>,
767 : skipped_field_indices: SkippedFieldIndices,
768 : present: bool,
769 : }
770 :
771 : impl<'buf> MessageFieldSkipper<'buf> {
772 : #[inline]
773 1 : fn new(serializer: json::ObjectSer<'buf>, skipped_field_indices: SkippedFieldIndices) -> Self {
774 1 : Self {
775 1 : serializer,
776 1 : skipped_field_indices,
777 1 : present: false,
778 1 : }
779 1 : }
780 :
781 : #[inline]
782 5 : fn record_field(&mut self, field: &tracing::field::Field, v: impl json::ValueEncoder) {
783 5 : if field.name() != MESSAGE_FIELD
784 3 : && !field.name().starts_with("log.")
785 3 : && !self.skipped_field_indices.contains(field.index())
786 1 : {
787 1 : self.serializer.entry(field.name(), v);
788 1 : self.present |= true;
789 4 : }
790 5 : }
791 : }
792 :
793 : impl tracing::field::Visit for MessageFieldSkipper<'_> {
794 : #[inline]
795 0 : fn record_f64(&mut self, field: &tracing::field::Field, value: f64) {
796 0 : self.record_field(field, value);
797 0 : }
798 :
799 : #[inline]
800 3 : fn record_i64(&mut self, field: &tracing::field::Field, value: i64) {
801 3 : self.record_field(field, value);
802 3 : }
803 :
804 : #[inline]
805 0 : fn record_u64(&mut self, field: &tracing::field::Field, value: u64) {
806 0 : self.record_field(field, value);
807 0 : }
808 :
809 : #[inline]
810 0 : fn record_i128(&mut self, field: &tracing::field::Field, value: i128) {
811 0 : self.record_field(field, value);
812 0 : }
813 :
814 : #[inline]
815 0 : fn record_u128(&mut self, field: &tracing::field::Field, value: u128) {
816 0 : self.record_field(field, value);
817 0 : }
818 :
819 : #[inline]
820 0 : fn record_bool(&mut self, field: &tracing::field::Field, value: bool) {
821 0 : self.record_field(field, value);
822 0 : }
823 :
824 : #[inline]
825 0 : fn record_bytes(&mut self, field: &tracing::field::Field, value: &[u8]) {
826 0 : self.record_field(field, format_args!("{value:x?}"));
827 0 : }
828 :
829 : #[inline]
830 1 : fn record_str(&mut self, field: &tracing::field::Field, value: &str) {
831 1 : self.record_field(field, value);
832 1 : }
833 :
834 : #[inline]
835 1 : fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) {
836 1 : self.record_field(field, format_args!("{value:?}"));
837 1 : }
838 :
839 : #[inline]
840 0 : fn record_error(
841 0 : &mut self,
842 0 : field: &tracing::field::Field,
843 0 : value: &(dyn std::error::Error + 'static),
844 0 : ) {
845 0 : self.record_field(field, format_args!("{value}"));
846 0 : }
847 : }
848 :
849 : struct ExtractedSpanFields {
850 : names: &'static [&'static str],
851 : values: Vec<Option<RawValue>>,
852 : }
853 :
854 : impl ExtractedSpanFields {
855 1 : fn new(names: &'static [&'static str]) -> Self {
856 1 : ExtractedSpanFields {
857 1 : names,
858 1 : values: vec![None; names.len()],
859 1 : }
860 1 : }
861 :
862 3 : fn layer_span(&mut self, fields: &SpanFields) {
863 3 : let SpanFields { values, span_info } = fields;
864 :
865 : // extract the fields
866 3 : for (i, &j) in span_info.extract.iter().enumerate() {
867 3 : let Some(Some(value)) = values.get(j) else {
868 1 : continue;
869 : };
870 :
871 : // TODO: replace clone with reference, if possible.
872 2 : self.values[i] = Some(value.clone());
873 : }
874 3 : }
875 :
876 : #[inline]
877 1 : fn has_values(&self) -> bool {
878 1 : self.values.iter().any(|v| v.is_some())
879 1 : }
880 : }
881 :
#[cfg(test)]
mod tests {
    use std::sync::{Arc, Mutex, MutexGuard};

    use assert_json_diff::assert_json_eq;
    use tracing::info_span;

    use super::*;

    /// Clock that always reports the stored timestamp.
    struct TestClock {
        current_time: Mutex<DateTime<Utc>>,
    }

    impl Clock for Arc<TestClock> {
        fn now(&self) -> DateTime<Utc> {
            let frozen = self.current_time.lock().expect("poisoned");
            *frozen
        }
    }

    /// Writer that appends to a shared in-memory buffer while holding its lock.
    struct VecWriter<'a> {
        buffer: MutexGuard<'a, Vec<u8>>,
    }

    impl MakeWriter for Arc<Mutex<Vec<u8>>> {
        fn make_writer(&self) -> impl io::Write {
            let buffer = self.lock().expect("poisoned");
            VecWriter { buffer }
        }
    }

    impl io::Write for VecWriter<'_> {
        fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
            self.buffer.extend_from_slice(buf);
            Ok(buf.len())
        }

        fn flush(&mut self) -> io::Result<()> {
            Ok(())
        }
    }

    /// Logs one event with duplicated fields inside three nested spans (two of
    /// which share a name) and compares the complete JSON line.
    #[test]
    fn test_field_collection() {
        let clock = Arc::new(TestClock {
            current_time: Mutex::new(Utc::now()),
        });
        let buffer = Arc::new(Mutex::new(Vec::new()));
        let log_layer = JsonLoggingLayer::new(Arc::clone(&clock), Arc::clone(&buffer), &["x"]);

        let registry = tracing_subscriber::Registry::default().with(log_layer);

        tracing::subscriber::with_default(registry, || {
            info_span!("some_span", x = 24).in_scope(|| {
                info_span!("some_other_span", y = 30).in_scope(|| {
                    info_span!("some_span", x = 40, x = 41, x = 42).in_scope(|| {
                        tracing::error!(
                            a = 1,
                            a = 2,
                            a = 3,
                            message = "explicit message field",
                            "implicit message field"
                        );
                    });
                });
            });
        });

        let buffer = Arc::try_unwrap(buffer)
            .expect("no other reference")
            .into_inner()
            .expect("poisoned");
        let actual: serde_json::Value = serde_json::from_slice(&buffer).expect("valid JSON");

        // These values differ per run/machine, so they are taken from the actual
        // output; the assertion then only checks that they exist with the right type.
        let emitted = actual.as_object().unwrap();
        let src = emitted.get("src").unwrap().as_str().unwrap();
        let process_id = emitted.get("process_id").unwrap().as_number().unwrap();
        let thread_id = emitted.get("thread_id").unwrap().as_number().unwrap();

        let expected: serde_json::Value = serde_json::json!(
            {
                "timestamp": clock.now().to_rfc3339_opts(chrono::SecondsFormat::Micros, true),
                "level": "ERROR",
                "message": "explicit message field",
                "fields": {
                    "a": 3,
                },
                "spans": {
                    "some_span":{
                        "x": 24,
                    },
                    "some_other_span": {
                        "y": 30,
                    },
                    "some_span#1": {
                        "x": 42,
                    },
                },
                "extract": {
                    "x": 42,
                },
                "src": src,
                "target": "proxy::logging::tests",
                "process_id": process_id,
                "thread_id": thread_id,
                "thread_name": "logging::tests::test_field_collection",
            }
        );

        assert_json_eq!(actual, expected);
    }
}
|