Line data Source code
1 : use std::future::Future;
2 : use std::pin::Pin;
3 : use std::str::FromStr;
4 : use std::time::Duration;
5 :
6 : use anyhow::Context;
7 : use metrics::{IntCounter, IntCounterVec};
8 : use once_cell::sync::Lazy;
9 : use strum_macros::{EnumString, VariantNames};
10 : use tokio::time::Instant;
11 : use tracing::{info, warn};
12 :
/// Logs a critical error, similarly to `tracing::error!`. This will:
///
/// * Emit an ERROR log message with prefix "CRITICAL:" and a backtrace.
/// * Trigger a pageable alert (via the metric below).
/// * Increment libmetrics_tracing_event_count{level="critical"}, and indirectly level="error".
/// * In debug builds, panic with the formatted message instead of doing any of the above.
///
/// When including errors in the message, please use {err:?} to include the error cause and original
/// backtrace.
#[macro_export]
macro_rules! critical {
    ($($arg:tt)*) => {{
        if cfg!(debug_assertions) {
            // Debug builds fail loudly so that critical conditions surface in tests.
            panic!($($arg)*);
        } else {
            // Bumps level="critical"; the error! below additionally bumps level="error" when the
            // counting layer installed by `init` is active.
            $crate::logging::TRACING_EVENT_COUNT_METRIC.inc_critical();
            let backtrace = std::backtrace::Backtrace::capture();
            tracing::error!("CRITICAL: {}\n{backtrace}", format!($($arg)*));
        }
    }};
}
34 :
/// Like `critical!`, but additionally attributes the event to a tenant shard and a timeline.
///
/// On top of what `critical!` does, this increments
/// `hadron_critical_storage_event_count{tenant_shard_id, timeline_id}` and prefixes the logged
/// message with both IDs. `$tenant_shard_id` and `$timeline_id` may be any expressions whose
/// types implement `Display`. In non-debug builds each is evaluated exactly once; debug builds
/// panic with the formatted message before evaluating them.
#[macro_export]
macro_rules! critical_timeline {
    ($tenant_shard_id:expr, $timeline_id:expr, $($arg:tt)*) => {{
        if cfg!(debug_assertions) {
            panic!($($arg)*);
        }
        // Evaluate the ID expressions once: they feed both the metric labels and the log line,
        // and re-evaluating them could repeat side effects or expensive work.
        let tenant_shard_id = &$tenant_shard_id;
        let timeline_id = &$timeline_id;
        // Increment both metrics
        $crate::logging::TRACING_EVENT_COUNT_METRIC.inc_critical();
        $crate::logging::HADRON_CRITICAL_STORAGE_EVENT_COUNT_METRIC
            .inc(&tenant_shard_id.to_string(), &timeline_id.to_string());
        let backtrace = std::backtrace::Backtrace::capture();
        tracing::error!(
            "CRITICAL: [tenant_shard_id: {}, timeline_id: {}] {}\n{backtrace}",
            tenant_shard_id,
            timeline_id,
            format!($($arg)*)
        );
    }};
}
49 :
50 : #[derive(EnumString, strum_macros::Display, VariantNames, Eq, PartialEq, Debug, Clone, Copy)]
51 : #[strum(serialize_all = "snake_case")]
52 : pub enum LogFormat {
53 : Plain,
54 : Json,
55 : Test,
56 : }
57 :
58 : impl LogFormat {
59 0 : pub fn from_config(s: &str) -> anyhow::Result<LogFormat> {
60 : use strum::VariantNames;
61 0 : LogFormat::from_str(s).with_context(|| {
62 0 : format!(
63 0 : "Unrecognized log format. Please specify one of: {:?}",
64 : LogFormat::VARIANTS
65 : )
66 0 : })
67 0 : }
68 : }
69 :
70 : pub struct TracingEventCountMetric {
71 : /// CRITICAL is not a `tracing` log level. Instead, we increment it in the `critical!` macro,
72 : /// and also emit it as a regular error. These are thus double-counted, but that seems fine.
73 : critical: IntCounter,
74 : error: IntCounter,
75 : warn: IntCounter,
76 : info: IntCounter,
77 : debug: IntCounter,
78 : trace: IntCounter,
79 : }
80 :
// Begin Hadron: Add a HadronCriticalStorageEventCountMetric metric that is sliced by tenant_shard_id and timeline_id
82 : pub struct HadronCriticalStorageEventCountMetric {
83 : critical: IntCounterVec,
84 : }
85 :
86 : pub static HADRON_CRITICAL_STORAGE_EVENT_COUNT_METRIC: Lazy<HadronCriticalStorageEventCountMetric> =
87 0 : Lazy::new(|| {
88 0 : let vec = metrics::register_int_counter_vec!(
89 : "hadron_critical_storage_event_count",
90 : "Number of critical storage events, by tenant_id and timeline_id",
91 0 : &["tenant_shard_id", "timeline_id"]
92 : )
93 0 : .expect("failed to define metric");
94 0 : HadronCriticalStorageEventCountMetric::new(vec)
95 0 : });
96 :
97 : impl HadronCriticalStorageEventCountMetric {
98 0 : fn new(vec: IntCounterVec) -> Self {
99 0 : Self { critical: vec }
100 0 : }
101 :
102 : // Allow public access from `critical!` macro.
103 0 : pub fn inc(&self, tenant_shard_id: &str, timeline_id: &str) {
104 0 : self.critical
105 0 : .with_label_values(&[tenant_shard_id, timeline_id])
106 0 : .inc();
107 0 : }
108 : }
109 : // End Hadron
110 :
111 197 : pub static TRACING_EVENT_COUNT_METRIC: Lazy<TracingEventCountMetric> = Lazy::new(|| {
112 197 : let vec = metrics::register_int_counter_vec!(
113 : "libmetrics_tracing_event_count",
114 : "Number of tracing events, by level",
115 197 : &["level"]
116 : )
117 197 : .expect("failed to define metric");
118 197 : TracingEventCountMetric::new(vec)
119 197 : });
120 :
121 : impl TracingEventCountMetric {
122 198 : fn new(vec: IntCounterVec) -> Self {
123 198 : Self {
124 198 : critical: vec.with_label_values(&["critical"]),
125 198 : error: vec.with_label_values(&["error"]),
126 198 : warn: vec.with_label_values(&["warn"]),
127 198 : info: vec.with_label_values(&["info"]),
128 198 : debug: vec.with_label_values(&["debug"]),
129 198 : trace: vec.with_label_values(&["trace"]),
130 198 : }
131 198 : }
132 :
133 : // Allow public access from `critical!` macro.
134 0 : pub fn inc_critical(&self) {
135 0 : self.critical.inc();
136 0 : }
137 :
138 20473 : fn inc_for_level(&self, level: tracing::Level) {
139 20473 : let counter = match level {
140 22 : tracing::Level::ERROR => &self.error,
141 118 : tracing::Level::WARN => &self.warn,
142 20331 : tracing::Level::INFO => &self.info,
143 1 : tracing::Level::DEBUG => &self.debug,
144 1 : tracing::Level::TRACE => &self.trace,
145 : };
146 20473 : counter.inc();
147 20473 : }
148 : }
149 :
150 : struct TracingEventCountLayer(&'static TracingEventCountMetric);
151 :
152 : impl<S> tracing_subscriber::layer::Layer<S> for TracingEventCountLayer
153 : where
154 : S: tracing::Subscriber,
155 : {
156 20473 : fn on_event(
157 20473 : &self,
158 20473 : event: &tracing::Event<'_>,
159 20473 : _ctx: tracing_subscriber::layer::Context<'_, S>,
160 20473 : ) {
161 20473 : self.0.inc_for_level(*event.metadata().level());
162 20473 : }
163 : }
164 :
/// Whether `init` adds the `tracing_error` crate's `ErrorLayer` to the global tracing subscriber.
pub enum TracingErrorLayerEnablement {
    /// Do not add the `ErrorLayer`.
    Disabled,
    /// Add the `ErrorLayer`, filtered by `RUST_LOG` (falling back to `info` when `RUST_LOG` is
    /// unset).
    EnableWithRustLogFilter,
}
174 :
/// Stream that the log layer installed by `init` writes to.
///
/// Not used with `LogFormat::Test`, which writes through the test writer instead.
#[derive(Copy, Clone)]
pub enum Output {
    /// Process standard output.
    Stdout,
    /// Process standard error.
    Stderr,
}
181 :
182 197 : pub fn init(
183 197 : log_format: LogFormat,
184 197 : tracing_error_layer_enablement: TracingErrorLayerEnablement,
185 197 : output: Output,
186 197 : ) -> anyhow::Result<()> {
187 : // We fall back to printing all spans at info-level or above if
188 : // the RUST_LOG environment variable is not set.
189 521 : let rust_log_env_filter = || {
190 521 : tracing_subscriber::EnvFilter::try_from_default_env()
191 521 : .unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info"))
192 521 : };
193 :
194 : // NB: the order of the with() calls does not matter.
195 : // See https://docs.rs/tracing-subscriber/0.3.16/tracing_subscriber/layer/index.html#per-layer-filtering
196 : use tracing_subscriber::prelude::*;
197 197 : let r = tracing_subscriber::registry();
198 197 : let r = r.with({
199 197 : let log_layer = tracing_subscriber::fmt::layer()
200 197 : .with_target(false)
201 197 : .with_ansi(false)
202 197 : .with_writer(move || -> Box<dyn std::io::Write> {
203 0 : match output {
204 0 : Output::Stdout => Box::new(std::io::stdout()),
205 0 : Output::Stderr => Box::new(std::io::stderr()),
206 : }
207 0 : });
208 197 : let log_layer = match log_format {
209 0 : LogFormat::Json => log_layer.json().boxed(),
210 0 : LogFormat::Plain => log_layer.boxed(),
211 197 : LogFormat::Test => log_layer.with_test_writer().boxed(),
212 : };
213 197 : log_layer.with_filter(rust_log_env_filter())
214 : });
215 :
216 197 : let r = r.with(
217 197 : TracingEventCountLayer(&TRACING_EVENT_COUNT_METRIC).with_filter(rust_log_env_filter()),
218 : );
219 197 : match tracing_error_layer_enablement {
220 127 : TracingErrorLayerEnablement::EnableWithRustLogFilter => r
221 127 : .with(tracing_error::ErrorLayer::default().with_filter(rust_log_env_filter()))
222 127 : .init(),
223 70 : TracingErrorLayerEnablement::Disabled => r.init(),
224 : }
225 :
226 197 : Ok(())
227 197 : }
228 :
229 : /// Disable the default rust panic hook by using `set_hook`.
230 : ///
231 : /// For neon binaries, the assumption is that tracing is configured before with [`init`], after
232 : /// that sentry is configured (if needed). sentry will install it's own on top of this, always
233 : /// processing the panic before we log it.
234 : ///
235 : /// When the return value is dropped, the hook is reverted to std default hook (prints to stderr).
236 : /// If the assumptions about the initialization order are not held, use
237 : /// [`TracingPanicHookGuard::forget`] but keep in mind, if tracing is stopped, then panics will be
238 : /// lost.
239 : #[must_use]
240 2 : pub fn replace_panic_hook_with_tracing_panic_hook() -> TracingPanicHookGuard {
241 2 : std::panic::set_hook(Box::new(tracing_panic_hook));
242 2 : TracingPanicHookGuard::new()
243 2 : }
244 :
/// Drop guard returned by `replace_panic_hook_with_tracing_panic_hook`; restores the std panic
/// hook on drop.
///
/// Tracing should not be used when it's not configured, but we cannot really latch on to any
/// imaginary lifetime of tracing.
pub struct TracingPanicHookGuard {
    /// Whether dropping the guard restores the default hook; cleared by `forget`.
    act: bool,
}
252 :
253 : impl TracingPanicHookGuard {
254 2 : fn new() -> Self {
255 2 : TracingPanicHookGuard { act: true }
256 2 : }
257 :
258 : /// Make this hook guard not do anything when dropped.
259 2 : pub fn forget(&mut self) {
260 2 : self.act = false;
261 2 : }
262 : }
263 :
264 : impl Drop for TracingPanicHookGuard {
265 2 : fn drop(&mut self) {
266 2 : if self.act {
267 0 : let _ = std::panic::take_hook();
268 2 : }
269 2 : }
270 : }
271 :
272 : /// Named symbol for our panic hook, which logs the panic.
273 3 : fn tracing_panic_hook(info: &std::panic::PanicHookInfo) {
274 : // following rust 1.66.1 std implementation:
275 : // https://github.com/rust-lang/rust/blob/90743e7298aca107ddaa0c202a4d3604e29bfeb6/library/std/src/panicking.rs#L235-L288
276 3 : let location = info.location();
277 :
278 3 : let msg = match info.payload().downcast_ref::<&'static str>() {
279 0 : Some(s) => *s,
280 3 : None => match info.payload().downcast_ref::<String>() {
281 3 : Some(s) => &s[..],
282 0 : None => "Box<dyn Any>",
283 : },
284 : };
285 :
286 3 : let thread = std::thread::current();
287 3 : let thread = thread.name().unwrap_or("<unnamed>");
288 3 : let backtrace = std::backtrace::Backtrace::capture();
289 :
290 3 : let _entered = if let Some(location) = location {
291 3 : tracing::error_span!("panic", %thread, location = %PrettyLocation(location))
292 : } else {
293 : // very unlikely to hit here, but the guarantees of std could change
294 0 : tracing::error_span!("panic", %thread)
295 : }
296 3 : .entered();
297 :
298 3 : if backtrace.status() == std::backtrace::BacktraceStatus::Captured {
299 : // this has an annoying extra '\n' in the end which anyhow doesn't do, but we cannot really
300 : // get rid of it as we cannot get in between of std::fmt::Formatter<'_>; we could format to
301 : // string, maybe even to a TLS one but tracing already does that.
302 3 : tracing::error!("{msg}\n\nStack backtrace:\n{backtrace}");
303 : } else {
304 0 : tracing::error!("{msg}");
305 : }
306 :
307 : // ensure that we log something on the panic if this hook is left after tracing has been
308 : // unconfigured. worst case when teardown is racing the panic is to log the panic twice.
309 3 : tracing::dispatcher::get_default(|d| {
310 3 : if let Some(_none) = d.downcast_ref::<tracing::subscriber::NoSubscriber>() {
311 0 : let location = location.map(PrettyLocation);
312 0 : log_panic_to_stderr(thread, msg, location, &backtrace);
313 3 : }
314 3 : });
315 3 : }
316 :
317 : #[cold]
318 0 : fn log_panic_to_stderr(
319 0 : thread: &str,
320 0 : msg: &str,
321 0 : location: Option<PrettyLocation<'_, '_>>,
322 0 : backtrace: &std::backtrace::Backtrace,
323 0 : ) {
324 0 : eprintln!(
325 0 : "panic while tracing is unconfigured: thread '{thread}' panicked at '{msg}', {location:?}\nStack backtrace:\n{backtrace}"
326 : );
327 0 : }
328 :
/// Wrapper that formats a panic `Location` as `file:line:column` (for both `Display` and `Debug`).
struct PrettyLocation<'a, 'b>(&'a std::panic::Location<'b>);
330 :
331 : impl std::fmt::Display for PrettyLocation<'_, '_> {
332 3 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
333 3 : write!(f, "{}:{}:{}", self.0.file(), self.0.line(), self.0.column())
334 3 : }
335 : }
336 :
337 : impl std::fmt::Debug for PrettyLocation<'_, '_> {
338 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
339 0 : <Self as std::fmt::Display>::fmt(self, f)
340 0 : }
341 : }
342 :
/// When you will store a secret but want to make sure it won't be accidentally logged, wrap it
/// in a `SecretString`, whose `Debug` implementation does not expose the contents.
///
/// Read the value back with `get_contents`.
#[derive(PartialEq, Eq, Clone)]
pub struct SecretString(String);
348 :
349 : impl SecretString {
350 0 : pub fn get_contents(&self) -> &str {
351 0 : self.0.as_str()
352 0 : }
353 : }
354 :
355 : impl From<String> for SecretString {
356 0 : fn from(s: String) -> Self {
357 0 : Self(s)
358 0 : }
359 : }
360 :
361 : impl FromStr for SecretString {
362 : type Err = std::convert::Infallible;
363 :
364 0 : fn from_str(s: &str) -> Result<Self, Self::Err> {
365 0 : Ok(Self(s.to_string()))
366 0 : }
367 : }
368 :
369 : impl std::fmt::Debug for SecretString {
370 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
371 0 : write!(f, "[SECRET]")
372 0 : }
373 : }
374 :
375 : /// Logs a periodic message if a future is slow to complete.
376 : ///
377 : /// This is performance-sensitive as it's used on the GetPage read path.
378 : ///
379 : /// TODO: consider upgrading this to a warning, but currently it fires too often.
380 : #[inline]
381 0 : pub async fn log_slow<O>(
382 0 : name: &str,
383 0 : threshold: Duration,
384 0 : f: Pin<&mut impl Future<Output = O>>,
385 0 : ) -> O {
386 0 : monitor_slow_future(
387 0 : threshold,
388 0 : threshold, // period = threshold
389 0 : f,
390 : |MonitorSlowFutureCallback {
391 : ready,
392 : is_slow,
393 : elapsed_total,
394 : elapsed_since_last_callback: _,
395 0 : }| {
396 0 : if !is_slow {
397 0 : return;
398 0 : }
399 0 : let elapsed = elapsed_total.as_secs_f64();
400 0 : if ready {
401 0 : info!("slow {name} completed after {elapsed:.3}s");
402 : } else {
403 0 : info!("slow {name} still running after {elapsed:.3}s");
404 : }
405 0 : },
406 : )
407 0 : .await
408 0 : }
409 :
410 : /// Logs a periodic warning if a future is slow to complete.
411 : #[inline]
412 0 : pub async fn warn_slow<O>(
413 0 : name: &str,
414 0 : threshold: Duration,
415 0 : f: Pin<&mut impl Future<Output = O>>,
416 0 : ) -> O {
417 0 : monitor_slow_future(
418 0 : threshold,
419 0 : threshold, // period = threshold
420 0 : f,
421 : |MonitorSlowFutureCallback {
422 : ready,
423 : is_slow,
424 : elapsed_total,
425 : elapsed_since_last_callback: _,
426 0 : }| {
427 0 : if !is_slow {
428 0 : return;
429 0 : }
430 0 : let elapsed = elapsed_total.as_secs_f64();
431 0 : if ready {
432 0 : warn!("slow {name} completed after {elapsed:.3}s");
433 : } else {
434 0 : warn!("slow {name} still running after {elapsed:.3}s");
435 : }
436 0 : },
437 : )
438 0 : .await
439 0 : }
440 :
441 : /// Poll future `fut` to completion, invoking callback `cb` at the given `threshold` and every
442 : /// `period` afterwards, and also unconditionally when the future completes.
443 : #[inline]
444 112980 : pub async fn monitor_slow_future<F, O>(
445 112980 : threshold: Duration,
446 112980 : period: Duration,
447 112980 : mut fut: Pin<&mut F>,
448 112980 : mut cb: impl FnMut(MonitorSlowFutureCallback),
449 112980 : ) -> O
450 112980 : where
451 112980 : F: Future<Output = O>,
452 0 : {
453 112980 : let started = Instant::now();
454 112980 : let mut attempt = 1;
455 112980 : let mut last_cb = started;
456 : loop {
457 : // NB: use timeout_at() instead of timeout() to avoid an extra clock reading in the common
458 : // case where the timeout doesn't fire.
459 112980 : let deadline = started + threshold + (attempt - 1) * period;
460 : // TODO: still call the callback if the future panics? Copy how we do it for the page_service flush_in_progress counter.
461 112980 : let res = tokio::time::timeout_at(deadline, &mut fut).await;
462 112980 : let now = Instant::now();
463 112980 : let elapsed_total = now - started;
464 112980 : cb(MonitorSlowFutureCallback {
465 112980 : ready: res.is_ok(),
466 112980 : is_slow: elapsed_total >= threshold,
467 112980 : elapsed_total,
468 112980 : elapsed_since_last_callback: now - last_cb,
469 112980 : });
470 112980 : last_cb = now;
471 112980 : if let Ok(output) = res {
472 112980 : return output;
473 0 : }
474 0 : attempt += 1;
475 : }
476 0 : }
477 :
/// Progress report passed to the callback of [`monitor_slow_future`].
pub struct MonitorSlowFutureCallback {
    /// Whether the future completed. If true, there will be no more callbacks.
    pub ready: bool,
    /// Whether the future is taking `>=` the specified threshold duration to complete.
    /// Monotonic: if true in one callback invocation, true in all subsequent ones.
    pub is_slow: bool,
    /// The time elapsed since the [`monitor_slow_future`] was first polled.
    pub elapsed_total: Duration,
    /// The time elapsed since the last callback invocation.
    /// For the initial callback invocation, the time elapsed since the [`monitor_slow_future`]
    /// was first polled.
    pub elapsed_since_last_callback: Duration,
}
491 :
#[cfg(test)]
mod tests {
    use metrics::IntCounterVec;
    use metrics::core::Opts;

    use crate::logging::{TracingEventCountLayer, TracingEventCountMetric};

    /// Every `tracing` level must be counted exactly once by `TracingEventCountLayer`.
    #[test]
    fn tracing_event_count_metric() {
        use tracing_subscriber::prelude::*;

        let counter_vec =
            IntCounterVec::new(Opts::new("testmetric", "testhelp"), &["level"]).unwrap();
        // The layer needs a `'static` metric; leaking it is fine in a test.
        let metric: &'static TracingEventCountMetric =
            Box::leak(Box::new(TracingEventCountMetric::new(counter_vec.clone())));
        let subscriber = tracing_subscriber::registry().with(TracingEventCountLayer(metric));

        tracing::subscriber::with_default(subscriber, || {
            tracing::trace!("foo");
            tracing::debug!("foo");
            tracing::info!("foo");
            tracing::warn!("foo");
            tracing::error!("foo");
        });

        for level in ["trace", "debug", "info", "warn", "error"] {
            assert_eq!(
                counter_vec.with_label_values(&[level]).get(),
                1,
                "unexpected count for level {level}"
            );
        }
    }
}
|