LCOV - code coverage report
Current view: top level - libs/utils/src - logging.rs (source / functions) Coverage Total Hit
Test: 5713ff31fc16472ab3f92425989ca6addc3dcf9c.info Lines: 61.7 % 240 148
Test Date: 2025-07-30 16:18:19 Functions: 40.0 % 60 24

            Line data    Source code
       1              : use std::future::Future;
       2              : use std::pin::Pin;
       3              : use std::str::FromStr;
       4              : use std::time::Duration;
       5              : 
       6              : use anyhow::Context;
       7              : use metrics::{IntCounter, IntCounterVec};
       8              : use once_cell::sync::Lazy;
       9              : use strum_macros::{EnumString, VariantNames};
      10              : use tokio::time::Instant;
      11              : use tracing::{info, warn};
      12              : 
      13              : /// Logs a critical error, similarly to `tracing::error!`. This will:
      14              : ///
      15              : /// * Emit an ERROR log message with prefix "CRITICAL:" and a backtrace.
      16              : /// * Trigger a pageable alert (via the metric below).
      17              : /// * Increment libmetrics_tracing_event_count{level="critical"}, and indirectly level="error".
      18              : /// * In debug builds, panic the process.
      19              : ///
      20              : /// When including errors in the message, please use {err:?} to include the error cause and original
      21              : /// backtrace.
      22              : #[macro_export]
      23              : macro_rules! critical {
      24              :     ($($arg:tt)*) => {{
      25              :         if cfg!(debug_assertions) {
      26              :             panic!($($arg)*);
      27              :         }
      28              :         // Increment both metrics
      29              :         $crate::logging::TRACING_EVENT_COUNT_METRIC.inc_critical();
      30              :         let backtrace = std::backtrace::Backtrace::capture();
      31              :         tracing::error!("CRITICAL: {}\n{backtrace}", format!($($arg)*));
      32              :     }};
      33              : }
      34              : 
      35              : #[macro_export]
      36              : macro_rules! critical_timeline {
      37              :     ($tenant_shard_id:expr, $timeline_id:expr, $corruption_detected:expr, $($arg:tt)*) => {{
      38              :         if cfg!(debug_assertions) {
      39              :             panic!($($arg)*);
      40              :         }
      41              :         // Increment both metrics
      42              :         $crate::logging::TRACING_EVENT_COUNT_METRIC.inc_critical();
      43              :         $crate::logging::HADRON_CRITICAL_STORAGE_EVENT_COUNT_METRIC.inc(&$tenant_shard_id.to_string(), &$timeline_id.to_string());
      44              :         if let Some(c) = $corruption_detected.as_ref() {
      45              :             c.store(true, std::sync::atomic::Ordering::Relaxed);
      46              :         }
      47              :         let backtrace = std::backtrace::Backtrace::capture();
      48              :         tracing::error!("CRITICAL: [tenant_shard_id: {}, timeline_id: {}] {}\n{backtrace}",
      49              :                        $tenant_shard_id, $timeline_id, format!($($arg)*));
      50              :     }};
      51              : }
      52              : 
      53              : #[derive(EnumString, strum_macros::Display, VariantNames, Eq, PartialEq, Debug, Clone, Copy)]
      54              : #[strum(serialize_all = "snake_case")]
      55              : pub enum LogFormat {
      56              :     Plain,
      57              :     Json,
      58              :     Test,
      59              : }
      60              : 
      61              : impl LogFormat {
      62            0 :     pub fn from_config(s: &str) -> anyhow::Result<LogFormat> {
      63              :         use strum::VariantNames;
      64            0 :         LogFormat::from_str(s).with_context(|| {
      65            0 :             format!(
      66            0 :                 "Unrecognized log format. Please specify one of: {:?}",
      67              :                 LogFormat::VARIANTS
      68              :             )
      69            0 :         })
      70            0 :     }
      71              : }
      72              : 
      73              : pub struct TracingEventCountMetric {
      74              :     /// CRITICAL is not a `tracing` log level. Instead, we increment it in the `critical!` macro,
      75              :     /// and also emit it as a regular error. These are thus double-counted, but that seems fine.
      76              :     critical: IntCounter,
      77              :     error: IntCounter,
      78              :     warn: IntCounter,
      79              :     info: IntCounter,
      80              :     debug: IntCounter,
      81              :     trace: IntCounter,
      82              : }
      83              : 
// Begin Hadron: Add a HadronCriticalStorageEventCountMetric metric that is sliced by tenant_shard_id and timeline_id
      85              : pub struct HadronCriticalStorageEventCountMetric {
      86              :     critical: IntCounterVec,
      87              : }
      88              : 
      89              : pub static HADRON_CRITICAL_STORAGE_EVENT_COUNT_METRIC: Lazy<HadronCriticalStorageEventCountMetric> =
      90            0 :     Lazy::new(|| {
      91            0 :         let vec = metrics::register_int_counter_vec!(
      92              :             "hadron_critical_storage_event_count",
      93              :             "Number of critical storage events, by tenant_id and timeline_id",
      94            0 :             &["tenant_shard_id", "timeline_id"]
      95              :         )
      96            0 :         .expect("failed to define metric");
      97            0 :         HadronCriticalStorageEventCountMetric::new(vec)
      98            0 :     });
      99              : 
     100              : impl HadronCriticalStorageEventCountMetric {
     101            0 :     fn new(vec: IntCounterVec) -> Self {
     102            0 :         Self { critical: vec }
     103            0 :     }
     104              : 
     105              :     // Allow public access from `critical!` macro.
     106            0 :     pub fn inc(&self, tenant_shard_id: &str, timeline_id: &str) {
     107            0 :         self.critical
     108            0 :             .with_label_values(&[tenant_shard_id, timeline_id])
     109            0 :             .inc();
     110            0 :     }
     111              : }
     112              : // End Hadron
     113              : 
     114          197 : pub static TRACING_EVENT_COUNT_METRIC: Lazy<TracingEventCountMetric> = Lazy::new(|| {
     115          197 :     let vec = metrics::register_int_counter_vec!(
     116              :         "libmetrics_tracing_event_count",
     117              :         "Number of tracing events, by level",
     118          197 :         &["level"]
     119              :     )
     120          197 :     .expect("failed to define metric");
     121          197 :     TracingEventCountMetric::new(vec)
     122          197 : });
     123              : 
     124              : impl TracingEventCountMetric {
     125          198 :     fn new(vec: IntCounterVec) -> Self {
     126          198 :         Self {
     127          198 :             critical: vec.with_label_values(&["critical"]),
     128          198 :             error: vec.with_label_values(&["error"]),
     129          198 :             warn: vec.with_label_values(&["warn"]),
     130          198 :             info: vec.with_label_values(&["info"]),
     131          198 :             debug: vec.with_label_values(&["debug"]),
     132          198 :             trace: vec.with_label_values(&["trace"]),
     133          198 :         }
     134          198 :     }
     135              : 
     136              :     // Allow public access from `critical!` macro.
     137            0 :     pub fn inc_critical(&self) {
     138            0 :         self.critical.inc();
     139            0 :     }
     140              : 
     141        20881 :     fn inc_for_level(&self, level: tracing::Level) {
     142        20881 :         let counter = match level {
     143           22 :             tracing::Level::ERROR => &self.error,
     144          115 :             tracing::Level::WARN => &self.warn,
     145        20742 :             tracing::Level::INFO => &self.info,
     146            1 :             tracing::Level::DEBUG => &self.debug,
     147            1 :             tracing::Level::TRACE => &self.trace,
     148              :         };
     149        20881 :         counter.inc();
     150        20881 :     }
     151              : }
     152              : 
     153              : struct TracingEventCountLayer(&'static TracingEventCountMetric);
     154              : 
     155              : impl<S> tracing_subscriber::layer::Layer<S> for TracingEventCountLayer
     156              : where
     157              :     S: tracing::Subscriber,
     158              : {
     159        20881 :     fn on_event(
     160        20881 :         &self,
     161        20881 :         event: &tracing::Event<'_>,
     162        20881 :         _ctx: tracing_subscriber::layer::Context<'_, S>,
     163        20881 :     ) {
     164        20881 :         self.0.inc_for_level(*event.metadata().level());
     165        20881 :     }
     166              : }
     167              : 
     168              : /// Whether to add the `tracing_error` crate's `ErrorLayer`
     169              : /// to the global tracing subscriber.
     170              : ///
     171              : pub enum TracingErrorLayerEnablement {
     172              :     /// Do not add the `ErrorLayer`.
     173              :     Disabled,
     174              :     /// Add the `ErrorLayer` with the filter specified by RUST_LOG, defaulting to `info` if `RUST_LOG` is unset.
     175              :     EnableWithRustLogFilter,
     176              : }
     177              : 
     178              : /// Where the logging should output to.
     179              : #[derive(Clone, Copy)]
     180              : pub enum Output {
     181              :     Stdout,
     182              :     Stderr,
     183              : }
     184              : 
     185          197 : pub fn init(
     186          197 :     log_format: LogFormat,
     187          197 :     tracing_error_layer_enablement: TracingErrorLayerEnablement,
     188          197 :     output: Output,
     189          197 : ) -> anyhow::Result<()> {
     190              :     // We fall back to printing all spans at info-level or above if
     191              :     // the RUST_LOG environment variable is not set.
     192          521 :     let rust_log_env_filter = || {
     193          521 :         tracing_subscriber::EnvFilter::try_from_default_env()
     194          521 :             .unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info"))
     195          521 :     };
     196              : 
     197              :     // NB: the order of the with() calls does not matter.
     198              :     // See https://docs.rs/tracing-subscriber/0.3.16/tracing_subscriber/layer/index.html#per-layer-filtering
     199              :     use tracing_subscriber::prelude::*;
     200          197 :     let r = tracing_subscriber::registry();
     201          197 :     let r = r.with({
     202          197 :         let log_layer = tracing_subscriber::fmt::layer()
     203          197 :             .with_target(false)
     204          197 :             .with_ansi(false)
     205          197 :             .with_writer(move || -> Box<dyn std::io::Write> {
     206            0 :                 match output {
     207            0 :                     Output::Stdout => Box::new(std::io::stdout()),
     208            0 :                     Output::Stderr => Box::new(std::io::stderr()),
     209              :                 }
     210            0 :             });
     211          197 :         let log_layer = match log_format {
     212            0 :             LogFormat::Json => log_layer.json().boxed(),
     213            0 :             LogFormat::Plain => log_layer.boxed(),
     214          197 :             LogFormat::Test => log_layer.with_test_writer().boxed(),
     215              :         };
     216          197 :         log_layer.with_filter(rust_log_env_filter())
     217              :     });
     218              : 
     219          197 :     let r = r.with(
     220          197 :         TracingEventCountLayer(&TRACING_EVENT_COUNT_METRIC).with_filter(rust_log_env_filter()),
     221              :     );
     222          197 :     match tracing_error_layer_enablement {
     223          127 :         TracingErrorLayerEnablement::EnableWithRustLogFilter => r
     224          127 :             .with(tracing_error::ErrorLayer::default().with_filter(rust_log_env_filter()))
     225          127 :             .init(),
     226           70 :         TracingErrorLayerEnablement::Disabled => r.init(),
     227              :     }
     228              : 
     229          197 :     Ok(())
     230          197 : }
     231              : 
     232              : /// Disable the default rust panic hook by using `set_hook`.
     233              : ///
     234              : /// For neon binaries, the assumption is that tracing is configured before with [`init`], after
     235              : /// that sentry is configured (if needed). sentry will install it's own on top of this, always
     236              : /// processing the panic before we log it.
     237              : ///
     238              : /// When the return value is dropped, the hook is reverted to std default hook (prints to stderr).
     239              : /// If the assumptions about the initialization order are not held, use
     240              : /// [`TracingPanicHookGuard::forget`] but keep in mind, if tracing is stopped, then panics will be
     241              : /// lost.
     242              : #[must_use]
     243            2 : pub fn replace_panic_hook_with_tracing_panic_hook() -> TracingPanicHookGuard {
     244            2 :     std::panic::set_hook(Box::new(tracing_panic_hook));
     245            2 :     TracingPanicHookGuard::new()
     246            2 : }
     247              : 
     248              : /// Drop guard which restores the std panic hook on drop.
     249              : ///
     250              : /// Tracing should not be used when it's not configured, but we cannot really latch on to any
     251              : /// imaginary lifetime of tracing.
     252              : pub struct TracingPanicHookGuard {
     253              :     act: bool,
     254              : }
     255              : 
     256              : impl TracingPanicHookGuard {
     257            2 :     fn new() -> Self {
     258            2 :         TracingPanicHookGuard { act: true }
     259            2 :     }
     260              : 
     261              :     /// Make this hook guard not do anything when dropped.
     262            2 :     pub fn forget(&mut self) {
     263            2 :         self.act = false;
     264            2 :     }
     265              : }
     266              : 
     267              : impl Drop for TracingPanicHookGuard {
     268            2 :     fn drop(&mut self) {
     269            2 :         if self.act {
     270            0 :             let _ = std::panic::take_hook();
     271            2 :         }
     272            2 :     }
     273              : }
     274              : 
     275              : /// Named symbol for our panic hook, which logs the panic.
     276            3 : fn tracing_panic_hook(info: &std::panic::PanicHookInfo) {
     277              :     // following rust 1.66.1 std implementation:
     278              :     // https://github.com/rust-lang/rust/blob/90743e7298aca107ddaa0c202a4d3604e29bfeb6/library/std/src/panicking.rs#L235-L288
     279            3 :     let location = info.location();
     280              : 
     281            3 :     let msg = match info.payload().downcast_ref::<&'static str>() {
     282            0 :         Some(s) => *s,
     283            3 :         None => match info.payload().downcast_ref::<String>() {
     284            3 :             Some(s) => &s[..],
     285            0 :             None => "Box<dyn Any>",
     286              :         },
     287              :     };
     288              : 
     289            3 :     let thread = std::thread::current();
     290            3 :     let thread = thread.name().unwrap_or("<unnamed>");
     291            3 :     let backtrace = std::backtrace::Backtrace::capture();
     292              : 
     293            3 :     let _entered = if let Some(location) = location {
     294            3 :         tracing::error_span!("panic", %thread, location = %PrettyLocation(location))
     295              :     } else {
     296              :         // very unlikely to hit here, but the guarantees of std could change
     297            0 :         tracing::error_span!("panic", %thread)
     298              :     }
     299            3 :     .entered();
     300              : 
     301            3 :     if backtrace.status() == std::backtrace::BacktraceStatus::Captured {
     302              :         // this has an annoying extra '\n' in the end which anyhow doesn't do, but we cannot really
     303              :         // get rid of it as we cannot get in between of std::fmt::Formatter<'_>; we could format to
     304              :         // string, maybe even to a TLS one but tracing already does that.
     305            3 :         tracing::error!("{msg}\n\nStack backtrace:\n{backtrace}");
     306              :     } else {
     307            0 :         tracing::error!("{msg}");
     308              :     }
     309              : 
     310              :     // ensure that we log something on the panic if this hook is left after tracing has been
     311              :     // unconfigured. worst case when teardown is racing the panic is to log the panic twice.
     312            3 :     tracing::dispatcher::get_default(|d| {
     313            3 :         if let Some(_none) = d.downcast_ref::<tracing::subscriber::NoSubscriber>() {
     314            0 :             let location = location.map(PrettyLocation);
     315            0 :             log_panic_to_stderr(thread, msg, location, &backtrace);
     316            3 :         }
     317            3 :     });
     318            3 : }
     319              : 
     320              : #[cold]
     321            0 : fn log_panic_to_stderr(
     322            0 :     thread: &str,
     323            0 :     msg: &str,
     324            0 :     location: Option<PrettyLocation<'_, '_>>,
     325            0 :     backtrace: &std::backtrace::Backtrace,
     326            0 : ) {
     327            0 :     eprintln!(
     328            0 :         "panic while tracing is unconfigured: thread '{thread}' panicked at '{msg}', {location:?}\nStack backtrace:\n{backtrace}"
     329              :     );
     330            0 : }
     331              : 
     332              : struct PrettyLocation<'a, 'b>(&'a std::panic::Location<'b>);
     333              : 
     334              : impl std::fmt::Display for PrettyLocation<'_, '_> {
     335            3 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     336            3 :         write!(f, "{}:{}:{}", self.0.file(), self.0.line(), self.0.column())
     337            3 :     }
     338              : }
     339              : 
     340              : impl std::fmt::Debug for PrettyLocation<'_, '_> {
     341            0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     342            0 :         <Self as std::fmt::Display>::fmt(self, f)
     343            0 :     }
     344              : }
     345              : 
     346              : /// When you will store a secret but want to make sure it won't
     347              : /// be accidentally logged, wrap it in a SecretString, whose Debug
     348              : /// implementation does not expose the contents.
     349              : #[derive(Clone, Eq, PartialEq)]
     350              : pub struct SecretString(String);
     351              : 
     352              : impl SecretString {
     353            0 :     pub fn get_contents(&self) -> &str {
     354            0 :         self.0.as_str()
     355            0 :     }
     356              : }
     357              : 
     358              : impl From<String> for SecretString {
     359            0 :     fn from(s: String) -> Self {
     360            0 :         Self(s)
     361            0 :     }
     362              : }
     363              : 
     364              : impl FromStr for SecretString {
     365              :     type Err = std::convert::Infallible;
     366              : 
     367            0 :     fn from_str(s: &str) -> Result<Self, Self::Err> {
     368            0 :         Ok(Self(s.to_string()))
     369            0 :     }
     370              : }
     371              : 
     372              : impl std::fmt::Debug for SecretString {
     373            0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     374            0 :         write!(f, "[SECRET]")
     375            0 :     }
     376              : }
     377              : 
     378              : /// Logs a periodic message if a future is slow to complete.
     379              : ///
     380              : /// This is performance-sensitive as it's used on the GetPage read path.
     381              : ///
     382              : /// TODO: consider upgrading this to a warning, but currently it fires too often.
     383              : #[inline]
     384          366 : pub async fn log_slow<O>(
     385          366 :     name: &str,
     386          366 :     threshold: Duration,
     387          366 :     f: Pin<&mut impl Future<Output = O>>,
     388          366 : ) -> O {
     389          366 :     monitor_slow_future(
     390          366 :         threshold,
     391          366 :         threshold, // period = threshold
     392          366 :         f,
     393              :         |MonitorSlowFutureCallback {
     394              :              ready,
     395              :              is_slow,
     396              :              elapsed_total,
     397              :              elapsed_since_last_callback: _,
     398            0 :          }| {
     399          366 :             if !is_slow {
     400          366 :                 return;
     401            0 :             }
     402            0 :             let elapsed = elapsed_total.as_secs_f64();
     403            0 :             if ready {
     404            0 :                 info!("slow {name} completed after {elapsed:.3}s");
     405              :             } else {
     406            0 :                 info!("slow {name} still running after {elapsed:.3}s");
     407              :             }
     408            0 :         },
     409              :     )
     410          366 :     .await
     411            0 : }
     412              : 
     413              : /// Logs a periodic warning if a future is slow to complete.
     414              : #[inline]
     415            0 : pub async fn warn_slow<O>(
     416            0 :     name: &str,
     417            0 :     threshold: Duration,
     418            0 :     f: Pin<&mut impl Future<Output = O>>,
     419            0 : ) -> O {
     420            0 :     monitor_slow_future(
     421            0 :         threshold,
     422            0 :         threshold, // period = threshold
     423            0 :         f,
     424              :         |MonitorSlowFutureCallback {
     425              :              ready,
     426              :              is_slow,
     427              :              elapsed_total,
     428              :              elapsed_since_last_callback: _,
     429            0 :          }| {
     430            0 :             if !is_slow {
     431            0 :                 return;
     432            0 :             }
     433            0 :             let elapsed = elapsed_total.as_secs_f64();
     434            0 :             if ready {
     435            0 :                 warn!("slow {name} completed after {elapsed:.3}s");
     436              :             } else {
     437            0 :                 warn!("slow {name} still running after {elapsed:.3}s");
     438              :             }
     439            0 :         },
     440              :     )
     441            0 :     .await
     442            0 : }
     443              : 
     444              : /// Poll future `fut` to completion, invoking callback `cb` at the given `threshold` and every
     445              : /// `period` afterwards, and also unconditionally when the future completes.
     446              : #[inline]
     447       114871 : pub async fn monitor_slow_future<F, O>(
     448       114871 :     threshold: Duration,
     449       114871 :     period: Duration,
     450       114871 :     mut fut: Pin<&mut F>,
     451       114871 :     mut cb: impl FnMut(MonitorSlowFutureCallback),
     452       114871 : ) -> O
     453       114871 : where
     454       114871 :     F: Future<Output = O>,
     455            0 : {
     456       114871 :     let started = Instant::now();
     457       114871 :     let mut attempt = 1;
     458       114871 :     let mut last_cb = started;
     459              :     loop {
     460              :         // NB: use timeout_at() instead of timeout() to avoid an extra clock reading in the common
     461              :         // case where the timeout doesn't fire.
     462       114871 :         let deadline = started + threshold + (attempt - 1) * period;
     463              :         // TODO: still call the callback if the future panics? Copy how we do it for the page_service flush_in_progress counter.
     464       114871 :         let res = tokio::time::timeout_at(deadline, &mut fut).await;
     465       114871 :         let now = Instant::now();
     466       114871 :         let elapsed_total = now - started;
     467       114871 :         cb(MonitorSlowFutureCallback {
     468       114871 :             ready: res.is_ok(),
     469       114871 :             is_slow: elapsed_total >= threshold,
     470       114871 :             elapsed_total,
     471       114871 :             elapsed_since_last_callback: now - last_cb,
     472       114871 :         });
     473       114871 :         last_cb = now;
     474       114871 :         if let Ok(output) = res {
     475       114871 :             return output;
     476            0 :         }
     477            0 :         attempt += 1;
     478              :     }
     479            0 : }
     480              : 
     481              : /// See [`monitor_slow_future`].
     482              : pub struct MonitorSlowFutureCallback {
     483              :     /// Whether the future completed. If true, there will be no more callbacks.
     484              :     pub ready: bool,
     485              :     /// Whether the future is taking `>=` the specififed threshold duration to complete.
     486              :     /// Monotonic: if true in one callback invocation, true in all subsequent onces.
     487              :     pub is_slow: bool,
     488              :     /// The time elapsed since the [`monitor_slow_future`] was first polled.
     489              :     pub elapsed_total: Duration,
     490              :     /// The time elapsed since the last callback invocation.
     491              :     /// For the initial callback invocation, the time elapsed since the [`monitor_slow_future`] was first polled.
     492              :     pub elapsed_since_last_callback: Duration,
     493              : }
     494              : 
     495              : #[cfg(test)]
     496              : mod tests {
     497              :     use metrics::IntCounterVec;
     498              :     use metrics::core::Opts;
     499              : 
     500              :     use crate::logging::{TracingEventCountLayer, TracingEventCountMetric};
     501              : 
     502              :     #[test]
     503            1 :     fn tracing_event_count_metric() {
     504            1 :         let counter_vec =
     505            1 :             IntCounterVec::new(Opts::new("testmetric", "testhelp"), &["level"]).unwrap();
     506            1 :         let metric = Box::leak(Box::new(TracingEventCountMetric::new(counter_vec.clone())));
     507            1 :         let layer = TracingEventCountLayer(metric);
     508              :         use tracing_subscriber::prelude::*;
     509              : 
     510            1 :         tracing::subscriber::with_default(tracing_subscriber::registry().with(layer), || {
     511            1 :             tracing::trace!("foo");
     512            1 :             tracing::debug!("foo");
     513            1 :             tracing::info!("foo");
     514            1 :             tracing::warn!("foo");
     515            1 :             tracing::error!("foo");
     516            1 :         });
     517              : 
     518            1 :         assert_eq!(counter_vec.with_label_values(&["trace"]).get(), 1);
     519            1 :         assert_eq!(counter_vec.with_label_values(&["debug"]).get(), 1);
     520            1 :         assert_eq!(counter_vec.with_label_values(&["info"]).get(), 1);
     521            1 :         assert_eq!(counter_vec.with_label_values(&["warn"]).get(), 1);
     522            1 :         assert_eq!(counter_vec.with_label_values(&["error"]).get(), 1);
     523            1 :     }
     524              : }
        

Generated by: LCOV version 2.1-beta