Line data Source code
1 : use std::future::Future;
2 : use std::pin::Pin;
3 : use std::str::FromStr;
4 : use std::time::Duration;
5 :
6 : use anyhow::Context;
7 : use metrics::{IntCounter, IntCounterVec};
8 : use once_cell::sync::Lazy;
9 : use strum_macros::{EnumString, VariantNames};
10 : use tokio::time::Instant;
11 : use tracing::{info, warn};
12 :
/// Logs a critical error, similarly to `tracing::error!`.
///
/// In debug builds this simply panics with the formatted message. Otherwise it:
///
/// * increments `libmetrics_tracing_event_count{level="critical"}` (the metric intended to
///   trigger a pageable alert);
/// * emits an ERROR event prefixed with "CRITICAL:" followed by a backtrace (only actually
///   captured when enabled via `RUST_BACKTRACE`/`RUST_LIB_BACKTRACE`, per
///   `std::backtrace::Backtrace::capture`). Because it is a regular ERROR event, it is also
///   counted under `level="error"`.
///
/// When including errors in the message, please use {err:?} to include the error cause and original
/// backtrace.
#[macro_export]
macro_rules! critical {
    ($($arg:tt)*) => {{
        if cfg!(debug_assertions) {
            // Fail fast during development so the condition cannot go unnoticed.
            panic!($($arg)*);
        } else {
            // Only the "critical" counter is bumped here; the error! event below bumps "error".
            $crate::logging::TRACING_EVENT_COUNT_METRIC.inc_critical();
            let backtrace = std::backtrace::Backtrace::capture();
            tracing::error!("CRITICAL: {}\n{backtrace}", format!($($arg)*));
        }
    }};
}
34 :
/// Like `critical!`, but for a critical error attributable to a specific timeline.
///
/// In debug builds this simply panics with the formatted message. Otherwise it:
///
/// * increments `libmetrics_tracing_event_count{level="critical"}`;
/// * increments `hadron_critical_storage_event_count`, labelled with the `Display` forms of
///   `$tenant_shard_id` and `$timeline_id`;
/// * if `$corruption_detected.as_ref()` is `Some(flag)`, calls `flag.store(true, Relaxed)`
///   (presumably an `Option<Arc<AtomicBool>>` or similar — confirm at call sites);
/// * emits an ERROR event prefixed with "CRITICAL:" and the tenant shard / timeline ids,
///   followed by a backtrace.
///
/// `$tenant_shard_id` and `$timeline_id` are each evaluated exactly once, after the debug-build
/// panic check.
#[macro_export]
macro_rules! critical_timeline {
    ($tenant_shard_id:expr, $timeline_id:expr, $corruption_detected:expr, $($arg:tt)*) => {{
        if cfg!(debug_assertions) {
            panic!($($arg)*);
        }
        // Bind the ids once: previously each expression was expanded twice (metric labels and
        // log line), duplicating any side effects or expensive work in the argument expressions.
        let tenant_shard_id = &$tenant_shard_id;
        let timeline_id = &$timeline_id;
        // Increment both metrics
        $crate::logging::TRACING_EVENT_COUNT_METRIC.inc_critical();
        $crate::logging::HADRON_CRITICAL_STORAGE_EVENT_COUNT_METRIC
            .inc(&tenant_shard_id.to_string(), &timeline_id.to_string());
        if let Some(c) = $corruption_detected.as_ref() {
            c.store(true, std::sync::atomic::Ordering::Relaxed);
        }
        let backtrace = std::backtrace::Backtrace::capture();
        tracing::error!(
            "CRITICAL: [tenant_shard_id: {}, timeline_id: {}] {}\n{backtrace}",
            tenant_shard_id,
            timeline_id,
            format!($($arg)*)
        );
    }};
}
52 :
53 : #[derive(EnumString, strum_macros::Display, VariantNames, Eq, PartialEq, Debug, Clone, Copy)]
54 : #[strum(serialize_all = "snake_case")]
55 : pub enum LogFormat {
56 : Plain,
57 : Json,
58 : Test,
59 : }
60 :
61 : impl LogFormat {
62 0 : pub fn from_config(s: &str) -> anyhow::Result<LogFormat> {
63 : use strum::VariantNames;
64 0 : LogFormat::from_str(s).with_context(|| {
65 0 : format!(
66 0 : "Unrecognized log format. Please specify one of: {:?}",
67 : LogFormat::VARIANTS
68 : )
69 0 : })
70 0 : }
71 : }
72 :
73 : pub struct TracingEventCountMetric {
74 : /// CRITICAL is not a `tracing` log level. Instead, we increment it in the `critical!` macro,
75 : /// and also emit it as a regular error. These are thus double-counted, but that seems fine.
76 : critical: IntCounter,
77 : error: IntCounter,
78 : warn: IntCounter,
79 : info: IntCounter,
80 : debug: IntCounter,
81 : trace: IntCounter,
82 : }
83 :
84 : // Begin Hadron: Add a HadronCriticalStorageEventCountMetric metric that is sliced by tenant_id and timeline_id
85 : pub struct HadronCriticalStorageEventCountMetric {
86 : critical: IntCounterVec,
87 : }
88 :
89 : pub static HADRON_CRITICAL_STORAGE_EVENT_COUNT_METRIC: Lazy<HadronCriticalStorageEventCountMetric> =
90 0 : Lazy::new(|| {
91 0 : let vec = metrics::register_int_counter_vec!(
92 : "hadron_critical_storage_event_count",
93 : "Number of critical storage events, by tenant_id and timeline_id",
94 0 : &["tenant_shard_id", "timeline_id"]
95 : )
96 0 : .expect("failed to define metric");
97 0 : HadronCriticalStorageEventCountMetric::new(vec)
98 0 : });
99 :
100 : impl HadronCriticalStorageEventCountMetric {
101 0 : fn new(vec: IntCounterVec) -> Self {
102 0 : Self { critical: vec }
103 0 : }
104 :
105 : // Allow public access from `critical!` macro.
106 0 : pub fn inc(&self, tenant_shard_id: &str, timeline_id: &str) {
107 0 : self.critical
108 0 : .with_label_values(&[tenant_shard_id, timeline_id])
109 0 : .inc();
110 0 : }
111 : }
112 : // End Hadron
113 :
114 197 : pub static TRACING_EVENT_COUNT_METRIC: Lazy<TracingEventCountMetric> = Lazy::new(|| {
115 197 : let vec = metrics::register_int_counter_vec!(
116 : "libmetrics_tracing_event_count",
117 : "Number of tracing events, by level",
118 197 : &["level"]
119 : )
120 197 : .expect("failed to define metric");
121 197 : TracingEventCountMetric::new(vec)
122 197 : });
123 :
124 : impl TracingEventCountMetric {
125 198 : fn new(vec: IntCounterVec) -> Self {
126 198 : Self {
127 198 : critical: vec.with_label_values(&["critical"]),
128 198 : error: vec.with_label_values(&["error"]),
129 198 : warn: vec.with_label_values(&["warn"]),
130 198 : info: vec.with_label_values(&["info"]),
131 198 : debug: vec.with_label_values(&["debug"]),
132 198 : trace: vec.with_label_values(&["trace"]),
133 198 : }
134 198 : }
135 :
136 : // Allow public access from `critical!` macro.
137 0 : pub fn inc_critical(&self) {
138 0 : self.critical.inc();
139 0 : }
140 :
141 20881 : fn inc_for_level(&self, level: tracing::Level) {
142 20881 : let counter = match level {
143 22 : tracing::Level::ERROR => &self.error,
144 115 : tracing::Level::WARN => &self.warn,
145 20742 : tracing::Level::INFO => &self.info,
146 1 : tracing::Level::DEBUG => &self.debug,
147 1 : tracing::Level::TRACE => &self.trace,
148 : };
149 20881 : counter.inc();
150 20881 : }
151 : }
152 :
153 : struct TracingEventCountLayer(&'static TracingEventCountMetric);
154 :
155 : impl<S> tracing_subscriber::layer::Layer<S> for TracingEventCountLayer
156 : where
157 : S: tracing::Subscriber,
158 : {
159 20881 : fn on_event(
160 20881 : &self,
161 20881 : event: &tracing::Event<'_>,
162 20881 : _ctx: tracing_subscriber::layer::Context<'_, S>,
163 20881 : ) {
164 20881 : self.0.inc_for_level(*event.metadata().level());
165 20881 : }
166 : }
167 :
/// Selects whether [`init`] adds the `tracing_error` crate's `ErrorLayer` to the global
/// tracing subscriber.
pub enum TracingErrorLayerEnablement {
    /// Leave the `ErrorLayer` out.
    Disabled,
    /// Add the `ErrorLayer`, filtered by `RUST_LOG` (falling back to `info` when `RUST_LOG` is
    /// unset or cannot be parsed).
    EnableWithRustLogFilter,
}

/// Stream that formatted log lines are written to.
#[derive(Clone, Copy)]
pub enum Output {
    /// Standard output.
    Stdout,
    /// Standard error.
    Stderr,
}
184 :
185 197 : pub fn init(
186 197 : log_format: LogFormat,
187 197 : tracing_error_layer_enablement: TracingErrorLayerEnablement,
188 197 : output: Output,
189 197 : ) -> anyhow::Result<()> {
190 : // We fall back to printing all spans at info-level or above if
191 : // the RUST_LOG environment variable is not set.
192 521 : let rust_log_env_filter = || {
193 521 : tracing_subscriber::EnvFilter::try_from_default_env()
194 521 : .unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info"))
195 521 : };
196 :
197 : // NB: the order of the with() calls does not matter.
198 : // See https://docs.rs/tracing-subscriber/0.3.16/tracing_subscriber/layer/index.html#per-layer-filtering
199 : use tracing_subscriber::prelude::*;
200 197 : let r = tracing_subscriber::registry();
201 197 : let r = r.with({
202 197 : let log_layer = tracing_subscriber::fmt::layer()
203 197 : .with_target(false)
204 197 : .with_ansi(false)
205 197 : .with_writer(move || -> Box<dyn std::io::Write> {
206 0 : match output {
207 0 : Output::Stdout => Box::new(std::io::stdout()),
208 0 : Output::Stderr => Box::new(std::io::stderr()),
209 : }
210 0 : });
211 197 : let log_layer = match log_format {
212 0 : LogFormat::Json => log_layer.json().boxed(),
213 0 : LogFormat::Plain => log_layer.boxed(),
214 197 : LogFormat::Test => log_layer.with_test_writer().boxed(),
215 : };
216 197 : log_layer.with_filter(rust_log_env_filter())
217 : });
218 :
219 197 : let r = r.with(
220 197 : TracingEventCountLayer(&TRACING_EVENT_COUNT_METRIC).with_filter(rust_log_env_filter()),
221 : );
222 197 : match tracing_error_layer_enablement {
223 127 : TracingErrorLayerEnablement::EnableWithRustLogFilter => r
224 127 : .with(tracing_error::ErrorLayer::default().with_filter(rust_log_env_filter()))
225 127 : .init(),
226 70 : TracingErrorLayerEnablement::Disabled => r.init(),
227 : }
228 :
229 197 : Ok(())
230 197 : }
231 :
232 : /// Disable the default rust panic hook by using `set_hook`.
233 : ///
234 : /// For neon binaries, the assumption is that tracing is configured before with [`init`], after
235 : /// that sentry is configured (if needed). sentry will install it's own on top of this, always
236 : /// processing the panic before we log it.
237 : ///
238 : /// When the return value is dropped, the hook is reverted to std default hook (prints to stderr).
239 : /// If the assumptions about the initialization order are not held, use
240 : /// [`TracingPanicHookGuard::forget`] but keep in mind, if tracing is stopped, then panics will be
241 : /// lost.
242 : #[must_use]
243 2 : pub fn replace_panic_hook_with_tracing_panic_hook() -> TracingPanicHookGuard {
244 2 : std::panic::set_hook(Box::new(tracing_panic_hook));
245 2 : TracingPanicHookGuard::new()
246 2 : }
247 :
/// Drop guard that restores the std default panic hook when dropped.
///
/// Tracing should not be used when it's not configured, but we cannot really latch on to any
/// imaginary lifetime of tracing.
pub struct TracingPanicHookGuard {
    /// Whether dropping the guard resets the panic hook; cleared by `forget`.
    act: bool,
}

impl TracingPanicHookGuard {
    fn new() -> Self {
        Self { act: true }
    }

    /// Make this hook guard not do anything when dropped.
    pub fn forget(&mut self) {
        self.act = false;
    }
}

impl Drop for TracingPanicHookGuard {
    fn drop(&mut self) {
        if !self.act {
            return;
        }
        // Unregistering the current hook reinstates the std default; the taken hook is discarded.
        drop(std::panic::take_hook());
    }
}
274 :
275 : /// Named symbol for our panic hook, which logs the panic.
276 3 : fn tracing_panic_hook(info: &std::panic::PanicHookInfo) {
277 : // following rust 1.66.1 std implementation:
278 : // https://github.com/rust-lang/rust/blob/90743e7298aca107ddaa0c202a4d3604e29bfeb6/library/std/src/panicking.rs#L235-L288
279 3 : let location = info.location();
280 :
281 3 : let msg = match info.payload().downcast_ref::<&'static str>() {
282 0 : Some(s) => *s,
283 3 : None => match info.payload().downcast_ref::<String>() {
284 3 : Some(s) => &s[..],
285 0 : None => "Box<dyn Any>",
286 : },
287 : };
288 :
289 3 : let thread = std::thread::current();
290 3 : let thread = thread.name().unwrap_or("<unnamed>");
291 3 : let backtrace = std::backtrace::Backtrace::capture();
292 :
293 3 : let _entered = if let Some(location) = location {
294 3 : tracing::error_span!("panic", %thread, location = %PrettyLocation(location))
295 : } else {
296 : // very unlikely to hit here, but the guarantees of std could change
297 0 : tracing::error_span!("panic", %thread)
298 : }
299 3 : .entered();
300 :
301 3 : if backtrace.status() == std::backtrace::BacktraceStatus::Captured {
302 : // this has an annoying extra '\n' in the end which anyhow doesn't do, but we cannot really
303 : // get rid of it as we cannot get in between of std::fmt::Formatter<'_>; we could format to
304 : // string, maybe even to a TLS one but tracing already does that.
305 3 : tracing::error!("{msg}\n\nStack backtrace:\n{backtrace}");
306 : } else {
307 0 : tracing::error!("{msg}");
308 : }
309 :
310 : // ensure that we log something on the panic if this hook is left after tracing has been
311 : // unconfigured. worst case when teardown is racing the panic is to log the panic twice.
312 3 : tracing::dispatcher::get_default(|d| {
313 3 : if let Some(_none) = d.downcast_ref::<tracing::subscriber::NoSubscriber>() {
314 0 : let location = location.map(PrettyLocation);
315 0 : log_panic_to_stderr(thread, msg, location, &backtrace);
316 3 : }
317 3 : });
318 3 : }
319 :
/// Last-resort panic report used when no tracing subscriber is installed: prints the thread,
/// message, location and backtrace to stderr.
#[cold]
fn log_panic_to_stderr(
    thread: &str,
    msg: &str,
    location: Option<PrettyLocation<'_, '_>>,
    backtrace: &std::backtrace::Backtrace,
) {
    eprintln!(
        "panic while tracing is unconfigured: thread '{thread}' panicked at '{msg}', {location:?}\nStack backtrace:\n{backtrace}"
    );
}

/// Renders a `std::panic::Location` as `file:line:column` for both `Display` and `Debug`.
struct PrettyLocation<'a, 'b>(&'a std::panic::Location<'b>);

impl std::fmt::Display for PrettyLocation<'_, '_> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let PrettyLocation(loc) = self;
        write!(f, "{}:{}:{}", loc.file(), loc.line(), loc.column())
    }
}

impl std::fmt::Debug for PrettyLocation<'_, '_> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Same rendering as Display, so `{:?}` of `Option<PrettyLocation>` stays compact.
        std::fmt::Display::fmt(self, f)
    }
}
345 :
/// A string holding a secret (password, token, ...) that must not end up in logs.
///
/// Its `Debug` output is always `[SECRET]`, so `{:?}` formatting never reveals the contents.
/// Use [`SecretString::get_contents`] to read the actual value.
#[derive(Clone, Eq, PartialEq)]
pub struct SecretString(String);

impl SecretString {
    /// Returns the secret in plain text; take care not to log the result.
    pub fn get_contents(&self) -> &str {
        &self.0
    }
}

impl From<String> for SecretString {
    fn from(value: String) -> Self {
        SecretString(value)
    }
}

impl FromStr for SecretString {
    type Err = std::convert::Infallible;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        Ok(SecretString::from(s.to_owned()))
    }
}

impl std::fmt::Debug for SecretString {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("[SECRET]")
    }
}
377 :
378 : /// Logs a periodic message if a future is slow to complete.
379 : ///
380 : /// This is performance-sensitive as it's used on the GetPage read path.
381 : ///
382 : /// TODO: consider upgrading this to a warning, but currently it fires too often.
383 : #[inline]
384 366 : pub async fn log_slow<O>(
385 366 : name: &str,
386 366 : threshold: Duration,
387 366 : f: Pin<&mut impl Future<Output = O>>,
388 366 : ) -> O {
389 366 : monitor_slow_future(
390 366 : threshold,
391 366 : threshold, // period = threshold
392 366 : f,
393 : |MonitorSlowFutureCallback {
394 : ready,
395 : is_slow,
396 : elapsed_total,
397 : elapsed_since_last_callback: _,
398 0 : }| {
399 366 : if !is_slow {
400 366 : return;
401 0 : }
402 0 : let elapsed = elapsed_total.as_secs_f64();
403 0 : if ready {
404 0 : info!("slow {name} completed after {elapsed:.3}s");
405 : } else {
406 0 : info!("slow {name} still running after {elapsed:.3}s");
407 : }
408 0 : },
409 : )
410 366 : .await
411 0 : }
412 :
413 : /// Logs a periodic warning if a future is slow to complete.
414 : #[inline]
415 0 : pub async fn warn_slow<O>(
416 0 : name: &str,
417 0 : threshold: Duration,
418 0 : f: Pin<&mut impl Future<Output = O>>,
419 0 : ) -> O {
420 0 : monitor_slow_future(
421 0 : threshold,
422 0 : threshold, // period = threshold
423 0 : f,
424 : |MonitorSlowFutureCallback {
425 : ready,
426 : is_slow,
427 : elapsed_total,
428 : elapsed_since_last_callback: _,
429 0 : }| {
430 0 : if !is_slow {
431 0 : return;
432 0 : }
433 0 : let elapsed = elapsed_total.as_secs_f64();
434 0 : if ready {
435 0 : warn!("slow {name} completed after {elapsed:.3}s");
436 : } else {
437 0 : warn!("slow {name} still running after {elapsed:.3}s");
438 : }
439 0 : },
440 : )
441 0 : .await
442 0 : }
443 :
444 : /// Poll future `fut` to completion, invoking callback `cb` at the given `threshold` and every
445 : /// `period` afterwards, and also unconditionally when the future completes.
446 : #[inline]
447 114871 : pub async fn monitor_slow_future<F, O>(
448 114871 : threshold: Duration,
449 114871 : period: Duration,
450 114871 : mut fut: Pin<&mut F>,
451 114871 : mut cb: impl FnMut(MonitorSlowFutureCallback),
452 114871 : ) -> O
453 114871 : where
454 114871 : F: Future<Output = O>,
455 0 : {
456 114871 : let started = Instant::now();
457 114871 : let mut attempt = 1;
458 114871 : let mut last_cb = started;
459 : loop {
460 : // NB: use timeout_at() instead of timeout() to avoid an extra clock reading in the common
461 : // case where the timeout doesn't fire.
462 114871 : let deadline = started + threshold + (attempt - 1) * period;
463 : // TODO: still call the callback if the future panics? Copy how we do it for the page_service flush_in_progress counter.
464 114871 : let res = tokio::time::timeout_at(deadline, &mut fut).await;
465 114871 : let now = Instant::now();
466 114871 : let elapsed_total = now - started;
467 114871 : cb(MonitorSlowFutureCallback {
468 114871 : ready: res.is_ok(),
469 114871 : is_slow: elapsed_total >= threshold,
470 114871 : elapsed_total,
471 114871 : elapsed_since_last_callback: now - last_cb,
472 114871 : });
473 114871 : last_cb = now;
474 114871 : if let Ok(output) = res {
475 114871 : return output;
476 0 : }
477 0 : attempt += 1;
478 : }
479 0 : }
480 :
/// Progress report passed to the callback of [`monitor_slow_future`].
pub struct MonitorSlowFutureCallback {
    /// `true` once the future has completed; that invocation is the last one.
    pub ready: bool,
    /// `true` if the future has been running for at least the specified threshold duration.
    /// Monotonic: once `true`, it stays `true` for all subsequent invocations.
    pub is_slow: bool,
    /// Time elapsed since [`monitor_slow_future`] started polling the future.
    pub elapsed_total: Duration,
    /// Time elapsed since the previous callback invocation. For the first invocation this equals
    /// `elapsed_total`.
    pub elapsed_since_last_callback: Duration,
}
494 :
#[cfg(test)]
mod tests {
    use metrics::IntCounterVec;
    use metrics::core::Opts;

    use crate::logging::{LogFormat, TracingEventCountLayer, TracingEventCountMetric};

    /// Each `tracing` level is counted once per event by `TracingEventCountLayer`, while the
    /// `critical` child is touched only by `inc_critical`.
    #[test]
    fn tracing_event_count_metric() {
        let counter_vec =
            IntCounterVec::new(Opts::new("testmetric", "testhelp"), &["level"]).unwrap();
        let metric: &'static TracingEventCountMetric =
            Box::leak(Box::new(TracingEventCountMetric::new(counter_vec.clone())));
        let layer = TracingEventCountLayer(metric);
        use tracing_subscriber::prelude::*;

        tracing::subscriber::with_default(tracing_subscriber::registry().with(layer), || {
            tracing::trace!("foo");
            tracing::debug!("foo");
            tracing::info!("foo");
            tracing::warn!("foo");
            tracing::error!("foo");
        });

        assert_eq!(counter_vec.with_label_values(&["trace"]).get(), 1);
        assert_eq!(counter_vec.with_label_values(&["debug"]).get(), 1);
        assert_eq!(counter_vec.with_label_values(&["info"]).get(), 1);
        assert_eq!(counter_vec.with_label_values(&["warn"]).get(), 1);
        assert_eq!(counter_vec.with_label_values(&["error"]).get(), 1);
        // No tracing level maps to `critical`; it is only bumped explicitly.
        assert_eq!(counter_vec.with_label_values(&["critical"]).get(), 0);

        metric.inc_critical();
        assert_eq!(counter_vec.with_label_values(&["critical"]).get(), 1);
        // `inc_critical` on its own does not touch the regular `error` counter.
        assert_eq!(counter_vec.with_label_values(&["error"]).get(), 1);
    }

    /// Config names are the snake_case variant names; anything else is rejected with a
    /// message listing the accepted values.
    #[test]
    fn log_format_from_config() {
        assert_eq!(LogFormat::from_config("plain").unwrap(), LogFormat::Plain);
        assert_eq!(LogFormat::from_config("json").unwrap(), LogFormat::Json);
        assert_eq!(LogFormat::from_config("test").unwrap(), LogFormat::Test);

        let err = LogFormat::from_config("yaml").unwrap_err();
        assert!(
            err.to_string().starts_with("Unrecognized log format"),
            "unexpected error: {err:#}"
        );
    }
}
|