LCOV - code coverage report
Current view: top level - compute_tools/src - logger.rs (source / functions) Coverage Total Hit
Test: c8f8d331b83562868d9054d9e0e68f866772aeaa.info Lines: 53.8 % 145 78
Test Date: 2025-07-26 17:20:05 Functions: 61.9 % 21 13

            Line data    Source code
       1              : use std::collections::HashMap;
       2              : use std::sync::{LazyLock, RwLock};
       3              : use tracing::Subscriber;
       4              : use tracing::info;
       5              : use tracing_appender;
       6              : use tracing_subscriber::prelude::*;
       7              : use tracing_subscriber::{fmt, layer::SubscriberExt, registry::LookupSpan};
       8              : 
       9              : /// Initialize logging to stderr, and OpenTelemetry tracing and exporter.
      10              : ///
      11              : /// Logging is configured using either `default_log_level` or
      12              : /// `RUST_LOG` environment variable as default log level.
      13              : ///
      14              : /// OpenTelemetry is configured with OTLP/HTTP exporter. It picks up
      15              : /// configuration from environment variables. For example, to change the destination,
      16              : /// set `OTEL_EXPORTER_OTLP_ENDPOINT=http://jaeger:4318`. See
      17              : /// `tracing-utils` package description.
      18              : ///
      19            0 : pub fn init_tracing_and_logging(
      20            0 :     default_log_level: &str,
      21            0 :     log_dir_opt: &Option<String>,
      22            0 : ) -> anyhow::Result<(
      23            0 :     Option<tracing_utils::Provider>,
      24            0 :     Option<tracing_appender::non_blocking::WorkerGuard>,
      25            0 : )> {
      26              :     // Initialize Logging
      27            0 :     let env_filter = tracing_subscriber::EnvFilter::try_from_default_env()
      28            0 :         .unwrap_or_else(|_| tracing_subscriber::EnvFilter::new(default_log_level));
      29              : 
      30              :     // Standard output streams
      31            0 :     let fmt_layer = tracing_subscriber::fmt::layer()
      32            0 :         .with_ansi(false)
      33            0 :         .with_target(false)
      34            0 :         .with_writer(std::io::stderr);
      35              : 
      36              :     // Logs with file rotation. Files in `$log_dir/pgcctl.yyyy-MM-dd`
      37            0 :     let (json_to_file_layer, _file_logs_guard) = if let Some(log_dir) = log_dir_opt {
      38            0 :         std::fs::create_dir_all(log_dir)?;
      39            0 :         let file_logs_appender = tracing_appender::rolling::RollingFileAppender::builder()
      40            0 :             .rotation(tracing_appender::rolling::Rotation::DAILY)
      41            0 :             .filename_prefix("pgcctl")
      42            0 :             // Lib appends to existing files, so we will keep files for up to 2 days even on restart loops.
      43            0 :             // At minimum, log-daemon will have 1 day to detect and upload a file (if created right before midnight).
      44            0 :             .max_log_files(2)
      45            0 :             .build(log_dir)
      46            0 :             .expect("Initializing rolling file appender should succeed");
      47            0 :         let (file_logs_writer, _file_logs_guard) =
      48            0 :             tracing_appender::non_blocking(file_logs_appender);
      49            0 :         let json_to_file_layer = tracing_subscriber::fmt::layer()
      50            0 :             .with_ansi(false)
      51            0 :             .with_target(false)
      52            0 :             .event_format(PgJsonLogShapeFormatter)
      53            0 :             .with_writer(file_logs_writer);
      54            0 :         (Some(json_to_file_layer), Some(_file_logs_guard))
      55              :     } else {
      56            0 :         (None, None)
      57              :     };
      58              : 
      59              :     // Initialize OpenTelemetry
      60            0 :     let provider =
      61            0 :         tracing_utils::init_tracing("compute_ctl", tracing_utils::ExportConfig::default());
      62            0 :     let otlp_layer = provider.as_ref().map(tracing_utils::layer);
      63              : 
      64              :     // Put it all together
      65            0 :     tracing_subscriber::registry()
      66            0 :         .with(env_filter)
      67            0 :         .with(otlp_layer)
      68            0 :         .with(fmt_layer)
      69            0 :         .with(json_to_file_layer)
      70            0 :         .init();
      71            0 :     tracing::info!("logging and tracing started");
      72              : 
      73            0 :     utils::logging::replace_panic_hook_with_tracing_panic_hook().forget();
      74              : 
      75            0 :     Ok((provider, _file_logs_guard))
      76            0 : }
      77              : 
      78              : /// Replace all newline characters with a special character to make it
      79              : /// easier to grep for log messages.
      80            0 : pub fn inlinify(s: &str) -> String {
      81            0 :     s.replace('\n', "\u{200B}")
      82            0 : }
      83              : 
      84            0 : pub fn startup_context_from_env() -> Option<opentelemetry::Context> {
      85              :     // Extract OpenTelemetry context for the startup actions from the
      86              :     // TRACEPARENT and TRACESTATE env variables, and attach it to the current
      87              :     // tracing context.
      88              :     //
      89              :     // This is used to propagate the context for the 'start_compute' operation
      90              :     // from the neon control plane. This allows linking together the wider
      91              :     // 'start_compute' operation that creates the compute container, with the
      92              :     // startup actions here within the container.
      93              :     //
      94              :     // There is no standard for passing context in env variables, but a lot of
      95              :     // tools use TRACEPARENT/TRACESTATE, so we use that convention too. See
      96              :     // https://github.com/open-telemetry/opentelemetry-specification/issues/740
      97              :     //
      98              :     // Switch to the startup context here, and exit it once the startup has
      99              :     // completed and Postgres is up and running.
     100              :     //
     101              :     // If this pod is pre-created without binding it to any particular endpoint
     102              :     // yet, this isn't the right place to enter the startup context. In that
     103              :     // case, the control plane should pass the tracing context as part of the
     104              :     // /configure API call.
     105              :     //
     106              :     // NOTE: This is supposed to only cover the *startup* actions. Once
     107              :     // postgres is configured and up-and-running, we exit this span. Any other
     108              :     // actions that are performed on incoming HTTP requests, for example, are
     109              :     // performed in separate spans.
     110              :     //
     111              :     // XXX: If the pod is restarted, we perform the startup actions in the same
     112              :     // context as the original startup actions, which probably doesn't make
     113              :     // sense.
     114            0 :     let mut startup_tracing_carrier: HashMap<String, String> = HashMap::new();
     115            0 :     if let Ok(val) = std::env::var("TRACEPARENT") {
     116            0 :         startup_tracing_carrier.insert("traceparent".to_string(), val);
     117            0 :     }
     118            0 :     if let Ok(val) = std::env::var("TRACESTATE") {
     119            0 :         startup_tracing_carrier.insert("tracestate".to_string(), val);
     120            0 :     }
     121            0 :     if !startup_tracing_carrier.is_empty() {
     122              :         use opentelemetry::propagation::TextMapPropagator;
     123              :         use opentelemetry_sdk::propagation::TraceContextPropagator;
     124            0 :         info!("got startup tracing context from env variables");
     125            0 :         Some(TraceContextPropagator::new().extract(&startup_tracing_carrier))
     126              :     } else {
     127            0 :         None
     128              :     }
     129            0 : }
     130              : 
     131              : /// Track relevant id's
     132              : const UNKNOWN_IDS: &str = r#""pg_instance_id": "", "pg_compute_id": """#;
     133            1 : static IDS: LazyLock<RwLock<String>> = LazyLock::new(|| RwLock::new(UNKNOWN_IDS.to_string()));
     134              : 
     135            1 : pub fn update_ids(instance_id: &Option<String>, compute_id: &Option<String>) -> anyhow::Result<()> {
     136            1 :     let ids = format!(
     137            1 :         r#""pg_instance_id": "{}", "pg_compute_id": "{}""#,
     138            1 :         instance_id.as_ref().map(|s| s.as_str()).unwrap_or_default(),
     139            1 :         compute_id.as_ref().map(|s| s.as_str()).unwrap_or_default()
     140              :     );
     141            1 :     let mut guard = IDS
     142            1 :         .write()
     143            1 :         .map_err(|e| anyhow::anyhow!("Log set id's rwlock poisoned: {}", e))?;
     144            1 :     *guard = ids;
     145            1 :     Ok(())
     146            1 : }
     147              : 
     148              : /// Massage compute_ctl logs into PG json log shape so we can use the same Lumberjack setup.
     149              : struct PgJsonLogShapeFormatter;
     150              : impl<S, N> fmt::format::FormatEvent<S, N> for PgJsonLogShapeFormatter
     151              : where
     152              :     S: Subscriber + for<'a> LookupSpan<'a>,
     153              :     N: for<'a> fmt::format::FormatFields<'a> + 'static,
     154              : {
     155            2 :     fn format_event(
     156            2 :         &self,
     157            2 :         ctx: &fmt::FmtContext<'_, S, N>,
     158            2 :         mut writer: fmt::format::Writer<'_>,
     159            2 :         event: &tracing::Event<'_>,
     160            2 :     ) -> std::fmt::Result {
     161              :         // Format values from the event's metadata, and open message string
     162            2 :         let metadata = event.metadata();
     163              :         {
     164            2 :             let ids_guard = IDS.read();
     165            2 :             let ids = ids_guard
     166            2 :                 .as_ref()
     167            2 :                 .map(|guard| guard.as_str())
     168              :                 // Surpress so that we don't lose all uploaded/ file logs if something goes super wrong. We would notice the missing id's.
     169            2 :                 .unwrap_or(UNKNOWN_IDS);
     170            2 :             write!(
     171            2 :                 &mut writer,
     172            2 :                 r#"{{"timestamp": "{}", "error_severity": "{}", "file_name": "{}", "backend_type": "compute_ctl_self", {}, "message": "#,
     173            2 :                 chrono::Utc::now().format("%Y-%m-%d %H:%M:%S%.3f GMT"),
     174            2 :                 metadata.level(),
     175            2 :                 metadata.target(),
     176              :                 ids
     177            0 :             )?;
     178              :         }
     179              : 
     180            2 :         let mut message = String::new();
     181            2 :         let message_writer = fmt::format::Writer::new(&mut message);
     182              : 
     183              :         // Gather the message
     184            2 :         ctx.field_format().format_fields(message_writer, event)?;
     185              : 
     186              :         // TODO: any better options than to copy-paste this OSS span formatter?
     187              :         // impl<S, N, T> FormatEvent<S, N> for Format<Full, T>
     188              :         // https://docs.rs/tracing-subscriber/latest/tracing_subscriber/fmt/trait.FormatEvent.html#impl-FormatEvent%3CS,+N%3E-for-Format%3CFull,+T%3E
     189              : 
     190              :         // write message, close bracket, and new line
     191            2 :         writeln!(writer, "{}}}", serde_json::to_string(&message).unwrap())
     192            2 :     }
     193              : }
     194              : 
     195              : #[cfg(feature = "testing")]
     196              : #[cfg(test)]
     197              : mod test {
     198              :     use super::*;
     199              :     use std::{cell::RefCell, io};
     200              : 
     201              :     // Use thread_local! instead of Mutex for test isolation
     202              :     thread_local! {
     203              :         static WRITER_OUTPUT: RefCell<String> = const { RefCell::new(String::new()) };
     204              :     }
     205              : 
     206              :     #[derive(Clone, Default)]
     207              :     struct StaticStringWriter;
     208              : 
     209              :     impl io::Write for StaticStringWriter {
     210            2 :         fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
     211            2 :             let output = String::from_utf8(buf.to_vec()).expect("Invalid UTF-8 in test output");
     212            2 :             WRITER_OUTPUT.with(|s| s.borrow_mut().push_str(&output));
     213            2 :             Ok(buf.len())
     214            2 :         }
     215              : 
     216            0 :         fn flush(&mut self) -> io::Result<()> {
     217            0 :             Ok(())
     218            0 :         }
     219              :     }
     220              : 
     221              :     impl fmt::MakeWriter<'_> for StaticStringWriter {
     222              :         type Writer = Self;
     223              : 
     224            2 :         fn make_writer(&self) -> Self::Writer {
     225            2 :             Self
     226            2 :         }
     227              :     }
     228              : 
     229              :     #[test]
     230            1 :     fn test_log_pg_json_shape_formatter() {
     231              :         // Use a scoped subscriber to prevent global state pollution
     232            1 :         let subscriber = tracing_subscriber::registry().with(
     233            1 :             tracing_subscriber::fmt::layer()
     234            1 :                 .with_ansi(false)
     235            1 :                 .with_target(false)
     236            1 :                 .event_format(PgJsonLogShapeFormatter)
     237            1 :                 .with_writer(StaticStringWriter),
     238              :         );
     239              : 
     240            1 :         let _ = update_ids(&Some("000".to_string()), &Some("111".to_string()));
     241              : 
     242              :         // Clear any previous test state
     243            1 :         WRITER_OUTPUT.with(|s| s.borrow_mut().clear());
     244              : 
     245            1 :         let messages = [
     246            1 :             "test message",
     247            1 :             r#"json escape check:  name="BatchSpanProcessor.Flush.ExportError" reason="Other(reqwest::Error { kind: Request, url: \"http://localhost:4318/v1/traces\", source: hyper_
     248            1 :             util::client::legacy::Error(Connect, ConnectError(\"tcp connect error\", Os { code: 111, kind: ConnectionRefused, message: \"Connection refused\" })) })" Failed during the export process"#,
     249            1 :         ];
     250              : 
     251            1 :         tracing::subscriber::with_default(subscriber, || {
     252            3 :             for message in messages {
     253            2 :                 tracing::info!(message);
     254              :             }
     255            1 :         });
     256            1 :         tracing::info!("not test message");
     257              : 
     258              :         // Get captured output
     259            1 :         let output = WRITER_OUTPUT.with(|s| s.borrow().clone());
     260              : 
     261            1 :         let json_strings: Vec<&str> = output.lines().collect();
     262            1 :         assert_eq!(
     263            1 :             json_strings.len(),
     264            1 :             messages.len(),
     265            0 :             "Log didn't have the expected number of json strings."
     266              :         );
     267              : 
     268            1 :         let json_string_shape_regex = regex::Regex::new(
     269            1 :             r#"\{"timestamp": "\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{3} GMT", "error_severity": "INFO", "file_name": ".+", "backend_type": "compute_ctl_self", "pg_instance_id": "000", "pg_compute_id": "111", "message": ".+"\}"#
     270            1 :         ).unwrap();
     271              : 
     272            2 :         for (i, expected_message) in messages.iter().enumerate() {
     273            2 :             let json_string = json_strings[i];
     274            2 :             assert!(
     275            2 :                 json_string_shape_regex.is_match(json_string),
     276            0 :                 "Json log didn't match expected pattern:\n{json_string}",
     277              :             );
     278            2 :             let parsed_json: serde_json::Value = serde_json::from_str(json_string).unwrap();
     279            2 :             let actual_message = parsed_json["message"].as_str().unwrap();
     280            2 :             assert_eq!(*expected_message, actual_message);
     281              :         }
     282            1 :     }
     283              : }
        

Generated by: LCOV version 2.1-beta