Line data Source code
1 : use std::collections::HashMap;
2 : use std::sync::{LazyLock, RwLock};
3 : use tracing::Subscriber;
4 : use tracing::info;
5 : use tracing_appender;
6 : use tracing_subscriber::prelude::*;
7 : use tracing_subscriber::{fmt, layer::SubscriberExt, registry::LookupSpan};
8 :
9 : /// Initialize logging to stderr, and OpenTelemetry tracing and exporter.
10 : ///
11 : /// Logging is configured using either `default_log_level` or
12 : /// `RUST_LOG` environment variable as default log level.
13 : ///
14 : /// OpenTelemetry is configured with OTLP/HTTP exporter. It picks up
15 : /// configuration from environment variables. For example, to change the destination,
16 : /// set `OTEL_EXPORTER_OTLP_ENDPOINT=http://jaeger:4318`. See
17 : /// `tracing-utils` package description.
18 : ///
19 0 : pub fn init_tracing_and_logging(
20 0 : default_log_level: &str,
21 0 : log_dir_opt: &Option<String>,
22 0 : ) -> anyhow::Result<(
23 0 : Option<tracing_utils::Provider>,
24 0 : Option<tracing_appender::non_blocking::WorkerGuard>,
25 0 : )> {
26 : // Initialize Logging
27 0 : let env_filter = tracing_subscriber::EnvFilter::try_from_default_env()
28 0 : .unwrap_or_else(|_| tracing_subscriber::EnvFilter::new(default_log_level));
29 :
30 : // Standard output streams
31 0 : let fmt_layer = tracing_subscriber::fmt::layer()
32 0 : .with_ansi(false)
33 0 : .with_target(false)
34 0 : .with_writer(std::io::stderr);
35 :
36 : // Logs with file rotation. Files in `$log_dir/pgcctl.yyyy-MM-dd`
37 0 : let (json_to_file_layer, _file_logs_guard) = if let Some(log_dir) = log_dir_opt {
38 0 : std::fs::create_dir_all(log_dir)?;
39 0 : let file_logs_appender = tracing_appender::rolling::RollingFileAppender::builder()
40 0 : .rotation(tracing_appender::rolling::Rotation::DAILY)
41 0 : .filename_prefix("pgcctl")
42 0 : // Lib appends to existing files, so we will keep files for up to 2 days even on restart loops.
43 0 : // At minimum, log-daemon will have 1 day to detect and upload a file (if created right before midnight).
44 0 : .max_log_files(2)
45 0 : .build(log_dir)
46 0 : .expect("Initializing rolling file appender should succeed");
47 0 : let (file_logs_writer, _file_logs_guard) =
48 0 : tracing_appender::non_blocking(file_logs_appender);
49 0 : let json_to_file_layer = tracing_subscriber::fmt::layer()
50 0 : .with_ansi(false)
51 0 : .with_target(false)
52 0 : .event_format(PgJsonLogShapeFormatter)
53 0 : .with_writer(file_logs_writer);
54 0 : (Some(json_to_file_layer), Some(_file_logs_guard))
55 : } else {
56 0 : (None, None)
57 : };
58 :
59 : // Initialize OpenTelemetry
60 0 : let provider =
61 0 : tracing_utils::init_tracing("compute_ctl", tracing_utils::ExportConfig::default());
62 0 : let otlp_layer = provider.as_ref().map(tracing_utils::layer);
63 :
64 : // Put it all together
65 0 : tracing_subscriber::registry()
66 0 : .with(env_filter)
67 0 : .with(otlp_layer)
68 0 : .with(fmt_layer)
69 0 : .with(json_to_file_layer)
70 0 : .init();
71 0 : tracing::info!("logging and tracing started");
72 :
73 0 : utils::logging::replace_panic_hook_with_tracing_panic_hook().forget();
74 :
75 0 : Ok((provider, _file_logs_guard))
76 0 : }
77 :
/// Turn a multi-line string into a single line by substituting every
/// line feed (`\n`) with a zero-width space (U+200B), which makes log
/// messages easier to grep.
///
/// All other characters, including carriage returns, are kept as they are.
pub fn inlinify(s: &str) -> String {
    s.chars()
        .map(|ch| if ch == '\n' { '\u{200B}' } else { ch })
        .collect()
}
83 :
84 0 : pub fn startup_context_from_env() -> Option<opentelemetry::Context> {
85 : // Extract OpenTelemetry context for the startup actions from the
86 : // TRACEPARENT and TRACESTATE env variables, and attach it to the current
87 : // tracing context.
88 : //
89 : // This is used to propagate the context for the 'start_compute' operation
90 : // from the neon control plane. This allows linking together the wider
91 : // 'start_compute' operation that creates the compute container, with the
92 : // startup actions here within the container.
93 : //
94 : // There is no standard for passing context in env variables, but a lot of
95 : // tools use TRACEPARENT/TRACESTATE, so we use that convention too. See
96 : // https://github.com/open-telemetry/opentelemetry-specification/issues/740
97 : //
98 : // Switch to the startup context here, and exit it once the startup has
99 : // completed and Postgres is up and running.
100 : //
101 : // If this pod is pre-created without binding it to any particular endpoint
102 : // yet, this isn't the right place to enter the startup context. In that
103 : // case, the control plane should pass the tracing context as part of the
104 : // /configure API call.
105 : //
106 : // NOTE: This is supposed to only cover the *startup* actions. Once
107 : // postgres is configured and up-and-running, we exit this span. Any other
108 : // actions that are performed on incoming HTTP requests, for example, are
109 : // performed in separate spans.
110 : //
111 : // XXX: If the pod is restarted, we perform the startup actions in the same
112 : // context as the original startup actions, which probably doesn't make
113 : // sense.
114 0 : let mut startup_tracing_carrier: HashMap<String, String> = HashMap::new();
115 0 : if let Ok(val) = std::env::var("TRACEPARENT") {
116 0 : startup_tracing_carrier.insert("traceparent".to_string(), val);
117 0 : }
118 0 : if let Ok(val) = std::env::var("TRACESTATE") {
119 0 : startup_tracing_carrier.insert("tracestate".to_string(), val);
120 0 : }
121 0 : if !startup_tracing_carrier.is_empty() {
122 : use opentelemetry::propagation::TextMapPropagator;
123 : use opentelemetry_sdk::propagation::TraceContextPropagator;
124 0 : info!("got startup tracing context from env variables");
125 0 : Some(TraceContextPropagator::new().extract(&startup_tracing_carrier))
126 : } else {
127 0 : None
128 : }
129 0 : }
130 :
/// JSON fragment carrying empty instance and compute ids. It is the initial
/// value of `IDS`, i.e. what is logged before any ids have been set.
const UNKNOWN_IDS: &str = r#""pg_instance_id": "", "pg_compute_id": """#;

/// Process-wide, pre-rendered JSON fragment with the currently known ids.
/// Created on first access and initialised to `UNKNOWN_IDS`.
static IDS: LazyLock<RwLock<String>> = LazyLock::new(|| {
    let initial = String::from(UNKNOWN_IDS);
    RwLock::new(initial)
});
134 :
135 1 : pub fn update_ids(instance_id: &Option<String>, compute_id: &Option<String>) -> anyhow::Result<()> {
136 1 : let ids = format!(
137 1 : r#""pg_instance_id": "{}", "pg_compute_id": "{}""#,
138 1 : instance_id.as_ref().map(|s| s.as_str()).unwrap_or_default(),
139 1 : compute_id.as_ref().map(|s| s.as_str()).unwrap_or_default()
140 : );
141 1 : let mut guard = IDS
142 1 : .write()
143 1 : .map_err(|e| anyhow::anyhow!("Log set id's rwlock poisoned: {}", e))?;
144 1 : *guard = ids;
145 1 : Ok(())
146 1 : }
147 :
148 : /// Massage compute_ctl logs into PG json log shape so we can use the same Lumberjack setup.
149 : struct PgJsonLogShapeFormatter;
150 : impl<S, N> fmt::format::FormatEvent<S, N> for PgJsonLogShapeFormatter
151 : where
152 : S: Subscriber + for<'a> LookupSpan<'a>,
153 : N: for<'a> fmt::format::FormatFields<'a> + 'static,
154 : {
155 2 : fn format_event(
156 2 : &self,
157 2 : ctx: &fmt::FmtContext<'_, S, N>,
158 2 : mut writer: fmt::format::Writer<'_>,
159 2 : event: &tracing::Event<'_>,
160 2 : ) -> std::fmt::Result {
161 : // Format values from the event's metadata, and open message string
162 2 : let metadata = event.metadata();
163 : {
164 2 : let ids_guard = IDS.read();
165 2 : let ids = ids_guard
166 2 : .as_ref()
167 2 : .map(|guard| guard.as_str())
168 : // Surpress so that we don't lose all uploaded/ file logs if something goes super wrong. We would notice the missing id's.
169 2 : .unwrap_or(UNKNOWN_IDS);
170 2 : write!(
171 2 : &mut writer,
172 2 : r#"{{"timestamp": "{}", "error_severity": "{}", "file_name": "{}", "backend_type": "compute_ctl_self", {}, "message": "#,
173 2 : chrono::Utc::now().format("%Y-%m-%d %H:%M:%S%.3f GMT"),
174 2 : metadata.level(),
175 2 : metadata.target(),
176 : ids
177 0 : )?;
178 : }
179 :
180 2 : let mut message = String::new();
181 2 : let message_writer = fmt::format::Writer::new(&mut message);
182 :
183 : // Gather the message
184 2 : ctx.field_format().format_fields(message_writer, event)?;
185 :
186 : // TODO: any better options than to copy-paste this OSS span formatter?
187 : // impl<S, N, T> FormatEvent<S, N> for Format<Full, T>
188 : // https://docs.rs/tracing-subscriber/latest/tracing_subscriber/fmt/trait.FormatEvent.html#impl-FormatEvent%3CS,+N%3E-for-Format%3CFull,+T%3E
189 :
190 : // write message, close bracket, and new line
191 2 : writeln!(writer, "{}}}", serde_json::to_string(&message).unwrap())
192 2 : }
193 : }
194 :
#[cfg(feature = "testing")]
#[cfg(test)]
mod test {
    use super::*;
    use std::{cell::RefCell, io};

    // Use thread_local! instead of Mutex for test isolation
    thread_local! {
        /// Everything written through `StaticStringWriter` on this thread.
        static WRITER_OUTPUT: RefCell<String> = const { RefCell::new(String::new()) };
    }

    /// Writer that appends everything it receives to `WRITER_OUTPUT`.
    #[derive(Clone, Default)]
    struct StaticStringWriter;

    impl io::Write for StaticStringWriter {
        fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
            // Validate in place; there is no need to copy the buffer first.
            let output = std::str::from_utf8(buf).expect("Invalid UTF-8 in test output");
            WRITER_OUTPUT.with(|s| s.borrow_mut().push_str(output));
            Ok(buf.len())
        }

        fn flush(&mut self) -> io::Result<()> {
            Ok(())
        }
    }

    impl fmt::MakeWriter<'_> for StaticStringWriter {
        type Writer = Self;

        fn make_writer(&self) -> Self::Writer {
            Self
        }
    }

    /// Events formatted by `PgJsonLogShapeFormatter` must come out as one
    /// parseable JSON object per line, with the message round-tripping intact.
    #[test]
    fn test_log_pg_json_shape_formatter() {
        // Use a scoped subscriber to prevent global state pollution
        let subscriber = tracing_subscriber::registry().with(
            tracing_subscriber::fmt::layer()
                .with_ansi(false)
                .with_target(false)
                .event_format(PgJsonLogShapeFormatter)
                .with_writer(StaticStringWriter),
        );

        // Fail right here if the ids cannot be set, instead of later with a
        // less obvious pattern mismatch.
        update_ids(&Some("000".to_string()), &Some("111".to_string()))
            .expect("setting the ids should succeed");

        // Clear any previous test state
        WRITER_OUTPUT.with(|s| s.borrow_mut().clear());

        let messages = [
            "test message",
            r#"json escape check: name="BatchSpanProcessor.Flush.ExportError" reason="Other(reqwest::Error { kind: Request, url: \"http://localhost:4318/v1/traces\", source: hyper_
util::client::legacy::Error(Connect, ConnectError(\"tcp connect error\", Os { code: 111, kind: ConnectionRefused, message: \"Connection refused\" })) })" Failed during the export process"#,
        ];

        tracing::subscriber::with_default(subscriber, || {
            for message in messages {
                tracing::info!(message);
            }
        });
        // Emitted outside the scoped subscriber, so it must not be captured.
        tracing::info!("not test message");

        // Get captured output
        let output = WRITER_OUTPUT.with(|s| s.borrow().clone());

        let json_strings: Vec<&str> = output.lines().collect();
        assert_eq!(
            json_strings.len(),
            messages.len(),
            "Log didn't have the expected number of json strings."
        );

        let json_string_shape_regex = regex::Regex::new(
            r#"\{"timestamp": "\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{3} GMT", "error_severity": "INFO", "file_name": ".+", "backend_type": "compute_ctl_self", "pg_instance_id": "000", "pg_compute_id": "111", "message": ".+"\}"#
        ).unwrap();

        for (json_string, expected_message) in json_strings.iter().zip(messages.iter()) {
            assert!(
                json_string_shape_regex.is_match(json_string),
                "Json log didn't match expected pattern:\n{json_string}",
            );
            let parsed_json: serde_json::Value = serde_json::from_str(json_string).unwrap();
            let actual_message = parsed_json["message"].as_str().unwrap();
            assert_eq!(*expected_message, actual_message);
        }
    }
}
|