Line data Source code
1 : use std::future::Future;
2 : use std::str::FromStr;
3 : use std::time::Duration;
4 :
5 : use anyhow::Context;
6 : use metrics::{IntCounter, IntCounterVec};
7 : use once_cell::sync::Lazy;
8 : use strum_macros::{EnumString, VariantNames};
9 : use tokio::time::Instant;
10 : use tracing::info;
11 :
/// Logs a critical error, similarly to `tracing::error!`. This will:
///
/// * Emit an ERROR log message with prefix "CRITICAL:" and a backtrace.
/// * Trigger a pageable alert (via the metric below).
/// * Increment libmetrics_tracing_event_count{level="critical"}, and indirectly level="error".
/// * In debug builds, panic the process.
///
/// When including errors in the message, please use {err:?} to include the error cause and original
/// backtrace.
///
/// In debug builds the macro panics before logging or touching any metric.
///
/// The backtrace is taken with `std::backtrace::Backtrace::capture`. It is only populated when
/// backtraces are enabled through the environment (`RUST_BACKTRACE` / `RUST_LIB_BACKTRACE`).
#[macro_export]
macro_rules! critical {
    ($($arg:tt)*) => {{
        // Debug builds treat critical errors as bugs: fail fast instead of logging.
        if cfg!(debug_assertions) {
            panic!($($arg)*);
        }
        // Increment both metrics: the critical counter explicitly here. The error counter is
        // bumped indirectly by the tracing event-count layer when the `error!` below is emitted
        // (provided ERROR events pass the `RUST_LOG` filter).
        $crate::logging::TRACING_EVENT_COUNT_METRIC.inc_critical();
        let backtrace = std::backtrace::Backtrace::capture();
        tracing::error!("CRITICAL: {}\n{backtrace}", format!($($arg)*));
    }};
}
33 :
/// Like `critical!`, but also attributes the event to a tenant shard and a timeline. This will:
///
/// * In debug builds, panic the process. The panic message is prefixed with the tenant shard
///   and timeline ids.
/// * Increment libmetrics_tracing_event_count{level="critical"} (and indirectly level="error"),
///   plus hadron_critical_storage_event_count{tenant_shard_id, timeline_id}.
/// * Emit an ERROR log message prefixed with "CRITICAL:" and both ids, followed by a backtrace.
///
/// `$tenant_shard_id` and `$timeline_id` must implement `Display`. Each is evaluated exactly
/// once, so expressions with side effects or non-trivial cost are safe to pass.
#[macro_export]
macro_rules! critical_timeline {
    ($tenant_shard_id:expr, $timeline_id:expr, $($arg:tt)*) => {{
        // Evaluate the id expressions once. They used to be evaluated separately for the
        // metric labels and for the log line.
        let tenant_shard_id = &$tenant_shard_id;
        let timeline_id = &$timeline_id;
        // Debug builds treat critical errors as bugs: fail fast, but keep the context.
        if cfg!(debug_assertions) {
            panic!(
                "[tenant_shard_id: {}, timeline_id: {}] {}",
                tenant_shard_id,
                timeline_id,
                format!($($arg)*)
            );
        }
        // Increment both metrics: the global critical counter and the per-timeline counter.
        $crate::logging::TRACING_EVENT_COUNT_METRIC.inc_critical();
        $crate::logging::HADRON_CRITICAL_STORAGE_EVENT_COUNT_METRIC
            .inc(&tenant_shard_id.to_string(), &timeline_id.to_string());
        let backtrace = std::backtrace::Backtrace::capture();
        tracing::error!(
            "CRITICAL: [tenant_shard_id: {}, timeline_id: {}] {}\n{backtrace}",
            tenant_shard_id,
            timeline_id,
            format!($($arg)*)
        );
    }};
}
48 :
49 : #[derive(EnumString, strum_macros::Display, VariantNames, Eq, PartialEq, Debug, Clone, Copy)]
50 : #[strum(serialize_all = "snake_case")]
51 : pub enum LogFormat {
52 : Plain,
53 : Json,
54 : Test,
55 : }
56 :
57 : impl LogFormat {
58 0 : pub fn from_config(s: &str) -> anyhow::Result<LogFormat> {
59 : use strum::VariantNames;
60 0 : LogFormat::from_str(s).with_context(|| {
61 0 : format!(
62 0 : "Unrecognized log format. Please specify one of: {:?}",
63 : LogFormat::VARIANTS
64 : )
65 0 : })
66 0 : }
67 : }
68 :
69 : pub struct TracingEventCountMetric {
70 : /// CRITICAL is not a `tracing` log level. Instead, we increment it in the `critical!` macro,
71 : /// and also emit it as a regular error. These are thus double-counted, but that seems fine.
72 : critical: IntCounter,
73 : error: IntCounter,
74 : warn: IntCounter,
75 : info: IntCounter,
76 : debug: IntCounter,
77 : trace: IntCounter,
78 : }
79 :
80 : // Begin Hadron: Add a HadronCriticalStorageEventCountMetric metric that is sliced by tenant_id and timeline_id
81 : pub struct HadronCriticalStorageEventCountMetric {
82 : critical: IntCounterVec,
83 : }
84 :
85 : pub static HADRON_CRITICAL_STORAGE_EVENT_COUNT_METRIC: Lazy<HadronCriticalStorageEventCountMetric> =
86 0 : Lazy::new(|| {
87 0 : let vec = metrics::register_int_counter_vec!(
88 : "hadron_critical_storage_event_count",
89 : "Number of critical storage events, by tenant_id and timeline_id",
90 0 : &["tenant_shard_id", "timeline_id"]
91 : )
92 0 : .expect("failed to define metric");
93 0 : HadronCriticalStorageEventCountMetric::new(vec)
94 0 : });
95 :
96 : impl HadronCriticalStorageEventCountMetric {
97 0 : fn new(vec: IntCounterVec) -> Self {
98 0 : Self { critical: vec }
99 0 : }
100 :
101 : // Allow public access from `critical!` macro.
102 0 : pub fn inc(&self, tenant_shard_id: &str, timeline_id: &str) {
103 0 : self.critical
104 0 : .with_label_values(&[tenant_shard_id, timeline_id])
105 0 : .inc();
106 0 : }
107 : }
108 : // End Hadron
109 :
110 197 : pub static TRACING_EVENT_COUNT_METRIC: Lazy<TracingEventCountMetric> = Lazy::new(|| {
111 197 : let vec = metrics::register_int_counter_vec!(
112 : "libmetrics_tracing_event_count",
113 : "Number of tracing events, by level",
114 197 : &["level"]
115 : )
116 197 : .expect("failed to define metric");
117 197 : TracingEventCountMetric::new(vec)
118 197 : });
119 :
120 : impl TracingEventCountMetric {
121 198 : fn new(vec: IntCounterVec) -> Self {
122 198 : Self {
123 198 : critical: vec.with_label_values(&["critical"]),
124 198 : error: vec.with_label_values(&["error"]),
125 198 : warn: vec.with_label_values(&["warn"]),
126 198 : info: vec.with_label_values(&["info"]),
127 198 : debug: vec.with_label_values(&["debug"]),
128 198 : trace: vec.with_label_values(&["trace"]),
129 198 : }
130 198 : }
131 :
132 : // Allow public access from `critical!` macro.
133 0 : pub fn inc_critical(&self) {
134 0 : self.critical.inc();
135 0 : }
136 :
137 20472 : fn inc_for_level(&self, level: tracing::Level) {
138 20472 : let counter = match level {
139 24 : tracing::Level::ERROR => &self.error,
140 118 : tracing::Level::WARN => &self.warn,
141 20328 : tracing::Level::INFO => &self.info,
142 1 : tracing::Level::DEBUG => &self.debug,
143 1 : tracing::Level::TRACE => &self.trace,
144 : };
145 20472 : counter.inc();
146 20472 : }
147 : }
148 :
149 : struct TracingEventCountLayer(&'static TracingEventCountMetric);
150 :
151 : impl<S> tracing_subscriber::layer::Layer<S> for TracingEventCountLayer
152 : where
153 : S: tracing::Subscriber,
154 : {
155 20472 : fn on_event(
156 20472 : &self,
157 20472 : event: &tracing::Event<'_>,
158 20472 : _ctx: tracing_subscriber::layer::Context<'_, S>,
159 20472 : ) {
160 20472 : self.0.inc_for_level(*event.metadata().level());
161 20472 : }
162 : }
163 :
/// Whether `init` adds the `tracing_error` crate's `ErrorLayer` to the global tracing
/// subscriber.
pub enum TracingErrorLayerEnablement {
    /// Do not add the `ErrorLayer`.
    Disabled,
    /// Add the `ErrorLayer`, filtered by `RUST_LOG`. Falls back to `info` when `RUST_LOG` is
    /// unset or cannot be parsed.
    EnableWithRustLogFilter,
}
173 :
/// Destination stream for the formatted log lines written by `init`'s fmt layer.
#[derive(Clone, Copy)]
pub enum Output {
    /// Write log lines to the process's standard output.
    Stdout,
    /// Write log lines to the process's standard error.
    Stderr,
}
180 :
181 197 : pub fn init(
182 197 : log_format: LogFormat,
183 197 : tracing_error_layer_enablement: TracingErrorLayerEnablement,
184 197 : output: Output,
185 197 : ) -> anyhow::Result<()> {
186 : // We fall back to printing all spans at info-level or above if
187 : // the RUST_LOG environment variable is not set.
188 521 : let rust_log_env_filter = || {
189 521 : tracing_subscriber::EnvFilter::try_from_default_env()
190 521 : .unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info"))
191 521 : };
192 :
193 : // NB: the order of the with() calls does not matter.
194 : // See https://docs.rs/tracing-subscriber/0.3.16/tracing_subscriber/layer/index.html#per-layer-filtering
195 : use tracing_subscriber::prelude::*;
196 197 : let r = tracing_subscriber::registry();
197 197 : let r = r.with({
198 197 : let log_layer = tracing_subscriber::fmt::layer()
199 197 : .with_target(false)
200 197 : .with_ansi(false)
201 197 : .with_writer(move || -> Box<dyn std::io::Write> {
202 0 : match output {
203 0 : Output::Stdout => Box::new(std::io::stdout()),
204 0 : Output::Stderr => Box::new(std::io::stderr()),
205 : }
206 0 : });
207 197 : let log_layer = match log_format {
208 0 : LogFormat::Json => log_layer.json().boxed(),
209 0 : LogFormat::Plain => log_layer.boxed(),
210 197 : LogFormat::Test => log_layer.with_test_writer().boxed(),
211 : };
212 197 : log_layer.with_filter(rust_log_env_filter())
213 : });
214 :
215 197 : let r = r.with(
216 197 : TracingEventCountLayer(&TRACING_EVENT_COUNT_METRIC).with_filter(rust_log_env_filter()),
217 : );
218 197 : match tracing_error_layer_enablement {
219 127 : TracingErrorLayerEnablement::EnableWithRustLogFilter => r
220 127 : .with(tracing_error::ErrorLayer::default().with_filter(rust_log_env_filter()))
221 127 : .init(),
222 70 : TracingErrorLayerEnablement::Disabled => r.init(),
223 : }
224 :
225 197 : Ok(())
226 197 : }
227 :
228 : /// Disable the default rust panic hook by using `set_hook`.
229 : ///
230 : /// For neon binaries, the assumption is that tracing is configured before with [`init`], after
231 : /// that sentry is configured (if needed). sentry will install it's own on top of this, always
232 : /// processing the panic before we log it.
233 : ///
234 : /// When the return value is dropped, the hook is reverted to std default hook (prints to stderr).
235 : /// If the assumptions about the initialization order are not held, use
236 : /// [`TracingPanicHookGuard::forget`] but keep in mind, if tracing is stopped, then panics will be
237 : /// lost.
238 : #[must_use]
239 2 : pub fn replace_panic_hook_with_tracing_panic_hook() -> TracingPanicHookGuard {
240 2 : std::panic::set_hook(Box::new(tracing_panic_hook));
241 2 : TracingPanicHookGuard::new()
242 2 : }
243 :
/// Drop guard returned by [`replace_panic_hook_with_tracing_panic_hook`].
///
/// It restores the std panic hook when dropped, unless [`TracingPanicHookGuard::forget`] was
/// called first.
///
/// Tracing should not be used when it's not configured, but we cannot really latch on to any
/// imaginary lifetime of tracing.
pub struct TracingPanicHookGuard {
    /// Whether dropping this guard should restore the default panic hook.
    act: bool,
}

impl TracingPanicHookGuard {
    /// Creates an armed guard.
    fn new() -> Self {
        Self { act: true }
    }

    /// Disarms the guard: dropping it afterwards leaves the current panic hook installed.
    pub fn forget(&mut self) {
        self.act = false;
    }
}
262 :
263 : impl Drop for TracingPanicHookGuard {
264 2 : fn drop(&mut self) {
265 2 : if self.act {
266 0 : let _ = std::panic::take_hook();
267 2 : }
268 2 : }
269 : }
270 :
271 : /// Named symbol for our panic hook, which logs the panic.
272 3 : fn tracing_panic_hook(info: &std::panic::PanicHookInfo) {
273 : // following rust 1.66.1 std implementation:
274 : // https://github.com/rust-lang/rust/blob/90743e7298aca107ddaa0c202a4d3604e29bfeb6/library/std/src/panicking.rs#L235-L288
275 3 : let location = info.location();
276 :
277 3 : let msg = match info.payload().downcast_ref::<&'static str>() {
278 0 : Some(s) => *s,
279 3 : None => match info.payload().downcast_ref::<String>() {
280 3 : Some(s) => &s[..],
281 0 : None => "Box<dyn Any>",
282 : },
283 : };
284 :
285 3 : let thread = std::thread::current();
286 3 : let thread = thread.name().unwrap_or("<unnamed>");
287 3 : let backtrace = std::backtrace::Backtrace::capture();
288 :
289 3 : let _entered = if let Some(location) = location {
290 3 : tracing::error_span!("panic", %thread, location = %PrettyLocation(location))
291 : } else {
292 : // very unlikely to hit here, but the guarantees of std could change
293 0 : tracing::error_span!("panic", %thread)
294 : }
295 3 : .entered();
296 :
297 3 : if backtrace.status() == std::backtrace::BacktraceStatus::Captured {
298 : // this has an annoying extra '\n' in the end which anyhow doesn't do, but we cannot really
299 : // get rid of it as we cannot get in between of std::fmt::Formatter<'_>; we could format to
300 : // string, maybe even to a TLS one but tracing already does that.
301 3 : tracing::error!("{msg}\n\nStack backtrace:\n{backtrace}");
302 : } else {
303 0 : tracing::error!("{msg}");
304 : }
305 :
306 : // ensure that we log something on the panic if this hook is left after tracing has been
307 : // unconfigured. worst case when teardown is racing the panic is to log the panic twice.
308 3 : tracing::dispatcher::get_default(|d| {
309 3 : if let Some(_none) = d.downcast_ref::<tracing::subscriber::NoSubscriber>() {
310 0 : let location = location.map(PrettyLocation);
311 0 : log_panic_to_stderr(thread, msg, location, &backtrace);
312 3 : }
313 3 : });
314 3 : }
315 :
316 : #[cold]
317 0 : fn log_panic_to_stderr(
318 0 : thread: &str,
319 0 : msg: &str,
320 0 : location: Option<PrettyLocation<'_, '_>>,
321 0 : backtrace: &std::backtrace::Backtrace,
322 0 : ) {
323 0 : eprintln!(
324 0 : "panic while tracing is unconfigured: thread '{thread}' panicked at '{msg}', {location:?}\nStack backtrace:\n{backtrace}"
325 : );
326 0 : }
327 :
/// Wrapper that renders a panic `std::panic::Location` as `file:line:column`, for both
/// `Display` and `Debug`.
struct PrettyLocation<'a, 'b>(&'a std::panic::Location<'b>);
329 :
330 : impl std::fmt::Display for PrettyLocation<'_, '_> {
331 3 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
332 3 : write!(f, "{}:{}:{}", self.0.file(), self.0.line(), self.0.column())
333 3 : }
334 : }
335 :
336 : impl std::fmt::Debug for PrettyLocation<'_, '_> {
337 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
338 0 : <Self as std::fmt::Display>::fmt(self, f)
339 0 : }
340 : }
341 :
/// Wrapper for a secret that must never be logged by accident.
///
/// Its `Debug` output is a fixed placeholder instead of the contents. Read the actual value
/// explicitly with [`SecretString::get_contents`].
#[derive(Clone, PartialEq, Eq)]
pub struct SecretString(String);
347 :
348 : impl SecretString {
349 0 : pub fn get_contents(&self) -> &str {
350 0 : self.0.as_str()
351 0 : }
352 : }
353 :
354 : impl From<String> for SecretString {
355 0 : fn from(s: String) -> Self {
356 0 : Self(s)
357 0 : }
358 : }
359 :
360 : impl FromStr for SecretString {
361 : type Err = std::convert::Infallible;
362 :
363 0 : fn from_str(s: &str) -> Result<Self, Self::Err> {
364 0 : Ok(Self(s.to_string()))
365 0 : }
366 : }
367 :
368 : impl std::fmt::Debug for SecretString {
369 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
370 0 : write!(f, "[SECRET]")
371 0 : }
372 : }
373 :
374 : /// Logs a periodic message if a future is slow to complete.
375 : ///
376 : /// This is performance-sensitive as it's used on the GetPage read path.
377 : ///
378 : /// TODO: consider upgrading this to a warning, but currently it fires too often.
379 : #[inline]
380 0 : pub async fn log_slow<F, O>(name: &str, threshold: Duration, f: std::pin::Pin<&mut F>) -> O
381 0 : where
382 0 : F: Future<Output = O>,
383 0 : {
384 0 : monitor_slow_future(
385 0 : threshold,
386 0 : threshold, // period = threshold
387 0 : f,
388 : |MonitorSlowFutureCallback {
389 : ready,
390 : is_slow,
391 : elapsed_total,
392 : elapsed_since_last_callback: _,
393 0 : }| {
394 0 : if !is_slow {
395 0 : return;
396 0 : }
397 0 : if ready {
398 0 : info!(
399 0 : "slow {name} completed after {:.3}s",
400 0 : elapsed_total.as_secs_f64()
401 : );
402 : } else {
403 0 : info!(
404 0 : "slow {name} still running after {:.3}s",
405 0 : elapsed_total.as_secs_f64()
406 : );
407 : }
408 0 : },
409 : )
410 0 : .await
411 0 : }
412 :
413 : /// Poll future `fut` to completion, invoking callback `cb` at the given `threshold` and every
414 : /// `period` afterwards, and also unconditionally when the future completes.
415 : #[inline]
416 114636 : pub async fn monitor_slow_future<F, O>(
417 114636 : threshold: Duration,
418 114636 : period: Duration,
419 114636 : mut fut: std::pin::Pin<&mut F>,
420 114636 : mut cb: impl FnMut(MonitorSlowFutureCallback),
421 114636 : ) -> O
422 114636 : where
423 114636 : F: Future<Output = O>,
424 0 : {
425 114636 : let started = Instant::now();
426 114636 : let mut attempt = 1;
427 114636 : let mut last_cb = started;
428 : loop {
429 : // NB: use timeout_at() instead of timeout() to avoid an extra clock reading in the common
430 : // case where the timeout doesn't fire.
431 114636 : let deadline = started + threshold + (attempt - 1) * period;
432 : // TODO: still call the callback if the future panics? Copy how we do it for the page_service flush_in_progress counter.
433 114636 : let res = tokio::time::timeout_at(deadline, &mut fut).await;
434 114636 : let now = Instant::now();
435 114636 : let elapsed_total = now - started;
436 114636 : cb(MonitorSlowFutureCallback {
437 114636 : ready: res.is_ok(),
438 114636 : is_slow: elapsed_total >= threshold,
439 114636 : elapsed_total,
440 114636 : elapsed_since_last_callback: now - last_cb,
441 114636 : });
442 114636 : last_cb = now;
443 114636 : if let Ok(output) = res {
444 114636 : return output;
445 0 : }
446 0 : attempt += 1;
447 : }
448 0 : }
449 :
/// Progress report passed to the callback of [`monitor_slow_future`] on each invocation.
pub struct MonitorSlowFutureCallback {
    /// Whether the future completed. If true, there will be no more callbacks.
    pub ready: bool,
    /// Whether the future has so far taken `>=` the specified threshold duration.
    /// Monotonic: if true in one callback invocation, true in all subsequent ones.
    pub is_slow: bool,
    /// The time elapsed since [`monitor_slow_future`] was first polled.
    pub elapsed_total: Duration,
    /// The time elapsed since the previous callback invocation.
    /// For the initial invocation, the time elapsed since [`monitor_slow_future`] was first
    /// polled.
    pub elapsed_since_last_callback: Duration,
}
463 :
#[cfg(test)]
mod tests {
    use metrics::IntCounterVec;
    use metrics::core::Opts;

    use crate::logging::{TracingEventCountLayer, TracingEventCountMetric};

    /// Every `tracing` level must land in its own `level`-labelled child counter.
    #[test]
    fn tracing_event_count_metric() {
        use tracing_subscriber::prelude::*;

        let counter_vec =
            IntCounterVec::new(Opts::new("testmetric", "testhelp"), &["level"]).unwrap();
        // The layer needs a `'static` metric; leaking one small struct in a test is fine.
        let metric: &'static TracingEventCountMetric =
            Box::leak(Box::new(TracingEventCountMetric::new(counter_vec.clone())));
        let subscriber = tracing_subscriber::registry().with(TracingEventCountLayer(metric));

        // Emit exactly one event per level.
        tracing::subscriber::with_default(subscriber, || {
            tracing::trace!("foo");
            tracing::debug!("foo");
            tracing::info!("foo");
            tracing::warn!("foo");
            tracing::error!("foo");
        });

        for level in ["trace", "debug", "info", "warn", "error"] {
            assert_eq!(
                counter_vec.with_label_values(&[level]).get(),
                1,
                "unexpected count for level={level}"
            );
        }
    }
}
|