Line data Source code
1 : use std::fs;
2 : use std::io::ErrorKind;
3 : use std::path::Path;
4 : use std::process::Command;
5 : use std::time::Duration;
6 : use std::{fs::OpenOptions, io::Write};
7 : use url::{Host, Url};
8 :
9 : use anyhow::{Context, Result, anyhow};
10 : use hostname_validator;
11 : use tracing::{error, info, instrument, warn};
12 :
/// Path of the rsyslogd configuration file for Postgres logs export.
/// It is written by `configure_postgres_logs_export` and read back by
/// `PostgresLogsRsyslogConfig::current_config` to detect changes.
const POSTGRES_LOGS_CONF_PATH: &str = "/etc/rsyslog.d/postgres_logs.conf";
14 :
/// Returns the PID of a running `rsyslogd` process, as reported by `pgrep`.
///
/// Returns `None` when `pgrep` prints no PID (no matching process). If several
/// processes match, only the first reported PID is returned.
///
/// # Panics
/// Panics if `pgrep` cannot be executed at all; the tool is assumed to be
/// available wherever this runs.
fn get_rsyslog_pid() -> Option<String> {
    let output = Command::new("pgrep")
        .arg("rsyslogd")
        .output()
        .expect("Failed to execute pgrep");

    // pgrep prints one PID per line. Take the first one so that several
    // matching processes cannot produce a multi-line "PID" such as "123\n456".
    String::from_utf8_lossy(&output.stdout)
        .lines()
        .map(str::trim)
        .find(|line| !line.is_empty())
        .map(str::to_string)
}
31 :
32 0 : fn wait_for_rsyslog_pid() -> Result<String, anyhow::Error> {
33 : const MAX_WAIT: Duration = Duration::from_secs(5);
34 : const INITIAL_SLEEP: Duration = Duration::from_millis(2);
35 :
36 0 : let mut sleep_duration = INITIAL_SLEEP;
37 0 : let start = std::time::Instant::now();
38 0 : let mut attempts = 1;
39 :
40 0 : for attempt in 1.. {
41 0 : attempts = attempt;
42 0 : match get_rsyslog_pid() {
43 0 : Some(pid) => return Ok(pid),
44 : None => {
45 0 : if start.elapsed() >= MAX_WAIT {
46 0 : break;
47 0 : }
48 0 : info!(
49 0 : "rsyslogd is not running, attempt {}. Sleeping for {} ms",
50 : attempt,
51 0 : sleep_duration.as_millis()
52 : );
53 0 : std::thread::sleep(sleep_duration);
54 0 : sleep_duration *= 2;
55 : }
56 : }
57 : }
58 :
59 0 : Err(anyhow::anyhow!(
60 0 : "rsyslogd is not running after waiting for {} seconds and {} attempts",
61 0 : attempts,
62 0 : start.elapsed().as_secs()
63 0 : ))
64 0 : }
65 :
66 : // Restart rsyslogd to apply the new configuration.
67 : // This is necessary, because there is no other way to reload the rsyslog configuration.
68 : //
69 : // Rsyslogd shouldn't lose any messages, because of the restart,
70 : // because it tracks the last read position in the log files
71 : // and will continue reading from that position.
72 : // TODO: test it properly
73 : //
74 0 : fn restart_rsyslog() -> Result<()> {
75 : // kill it to restart
76 0 : let _ = Command::new("pkill")
77 0 : .arg("rsyslogd")
78 0 : .output()
79 0 : .context("Failed to restart rsyslogd")?;
80 :
81 : // ensure rsyslogd is running
82 0 : wait_for_rsyslog_pid()?;
83 :
84 0 : Ok(())
85 0 : }
86 :
87 10 : fn parse_audit_syslog_address(
88 10 : remote_plain_endpoint: &str,
89 10 : remote_tls_endpoint: &str,
90 10 : ) -> Result<(String, u16, String)> {
91 : let tls;
92 10 : let remote_endpoint = if !remote_tls_endpoint.is_empty() {
93 2 : tls = "true".to_string();
94 2 : remote_tls_endpoint
95 : } else {
96 8 : tls = "false".to_string();
97 8 : remote_plain_endpoint
98 : };
99 : // Urlify the remote_endpoint, so parsing can be done with url::Url.
100 10 : let url_str = format!("http://{remote_endpoint}");
101 10 : let url = Url::parse(&url_str).map_err(|err| {
102 3 : anyhow!("Error parsing {remote_endpoint}, expected host:port, got {err:?}")
103 3 : })?;
104 :
105 7 : let is_valid = url.scheme() == "http"
106 7 : && url.path() == "/"
107 7 : && url.query().is_none()
108 7 : && url.fragment().is_none()
109 7 : && url.username() == ""
110 7 : && url.password().is_none();
111 :
112 7 : if !is_valid {
113 0 : return Err(anyhow!(
114 0 : "Invalid address format {remote_endpoint}, expected host:port"
115 0 : ));
116 7 : }
117 7 : let host = match url.host() {
118 5 : Some(Host::Domain(h)) if hostname_validator::is_valid(h) => h.to_string(),
119 1 : Some(Host::Ipv4(ip4)) => ip4.to_string(),
120 1 : Some(Host::Ipv6(ip6)) => ip6.to_string(),
121 1 : _ => return Err(anyhow!("Invalid host")),
122 : };
123 6 : let port = url
124 6 : .port()
125 6 : .ok_or_else(|| anyhow!("Invalid port in {remote_endpoint}"))?;
126 :
127 5 : Ok((host, port, tls))
128 10 : }
129 :
130 2 : fn generate_audit_rsyslog_config(
131 2 : log_directory: String,
132 2 : endpoint_id: &str,
133 2 : project_id: &str,
134 2 : remote_syslog_host: &str,
135 2 : remote_syslog_port: u16,
136 2 : remote_syslog_tls: &str,
137 2 : ) -> String {
138 2 : format!(
139 2 : include_str!("config_template/compute_audit_rsyslog_template.conf"),
140 : log_directory = log_directory,
141 : endpoint_id = endpoint_id,
142 : project_id = project_id,
143 : remote_syslog_host = remote_syslog_host,
144 : remote_syslog_port = remote_syslog_port,
145 : remote_syslog_tls = remote_syslog_tls
146 : )
147 2 : }
148 :
149 0 : pub fn configure_audit_rsyslog(
150 0 : log_directory: String,
151 0 : endpoint_id: &str,
152 0 : project_id: &str,
153 0 : remote_endpoint: &str,
154 0 : remote_tls_endpoint: &str,
155 0 : ) -> Result<()> {
156 0 : let (remote_syslog_host, remote_syslog_port, remote_syslog_tls) =
157 0 : parse_audit_syslog_address(remote_endpoint, remote_tls_endpoint).unwrap();
158 0 : let config_content = generate_audit_rsyslog_config(
159 0 : log_directory,
160 0 : endpoint_id,
161 0 : project_id,
162 0 : &remote_syslog_host,
163 0 : remote_syslog_port,
164 0 : &remote_syslog_tls,
165 : );
166 :
167 0 : info!("rsyslog config_content: {}", config_content);
168 :
169 0 : let rsyslog_conf_path = "/etc/rsyslog.d/compute_audit_rsyslog.conf";
170 0 : let mut file = OpenOptions::new()
171 0 : .create(true)
172 0 : .write(true)
173 0 : .truncate(true)
174 0 : .open(rsyslog_conf_path)?;
175 :
176 0 : file.write_all(config_content.as_bytes())?;
177 :
178 0 : info!(
179 0 : "rsyslog configuration file {} added successfully. Starting rsyslogd",
180 : rsyslog_conf_path
181 : );
182 :
183 : // start the service, using the configuration
184 0 : restart_rsyslog()?;
185 :
186 0 : Ok(())
187 0 : }
188 :
189 : /// Configuration for enabling Postgres logs forwarding from rsyslogd
190 : pub struct PostgresLogsRsyslogConfig<'a> {
191 : pub host: Option<&'a str>,
192 : }
193 :
194 : impl<'a> PostgresLogsRsyslogConfig<'a> {
195 3 : pub fn new(host: Option<&'a str>) -> Self {
196 3 : Self { host }
197 3 : }
198 :
199 3 : pub fn build(&self) -> Result<String> {
200 3 : match self.host {
201 2 : Some(host) => {
202 2 : if let Some((target, port)) = host.split_once(":") {
203 1 : Ok(format!(
204 1 : include_str!(
205 1 : "config_template/compute_rsyslog_postgres_export_template.conf"
206 1 : ),
207 1 : logs_export_target = target,
208 1 : logs_export_port = port,
209 1 : ))
210 : } else {
211 1 : Err(anyhow!("Invalid host format for Postgres logs export"))
212 : }
213 : }
214 1 : None => Ok("".to_string()),
215 : }
216 3 : }
217 :
218 0 : fn current_config() -> Result<String> {
219 0 : let config_content = match std::fs::read_to_string(POSTGRES_LOGS_CONF_PATH) {
220 0 : Ok(c) => c,
221 0 : Err(err) if err.kind() == ErrorKind::NotFound => String::new(),
222 0 : Err(err) => return Err(err.into()),
223 : };
224 0 : Ok(config_content)
225 0 : }
226 : }
227 :
228 : /// Writes rsyslogd configuration for Postgres logs export and restarts rsyslog.
229 0 : pub fn configure_postgres_logs_export(conf: PostgresLogsRsyslogConfig) -> Result<()> {
230 0 : let new_config = conf.build()?;
231 0 : let current_config = PostgresLogsRsyslogConfig::current_config()?;
232 :
233 0 : if new_config == current_config {
234 0 : info!("postgres logs rsyslog configuration is up-to-date");
235 0 : return Ok(());
236 0 : }
237 :
238 : // Nothing to configure
239 0 : if new_config.is_empty() {
240 : // When the configuration is removed, PostgreSQL will stop sending data
241 : // to the files watched by rsyslog, so restarting rsyslog is more effort
242 : // than just ignoring this change.
243 0 : return Ok(());
244 0 : }
245 :
246 0 : info!(
247 0 : "configuring rsyslog for postgres logs export to: {:?}",
248 : conf.host
249 : );
250 :
251 0 : let mut file = OpenOptions::new()
252 0 : .create(true)
253 0 : .write(true)
254 0 : .truncate(true)
255 0 : .open(POSTGRES_LOGS_CONF_PATH)?;
256 0 : file.write_all(new_config.as_bytes())?;
257 :
258 0 : info!(
259 0 : "rsyslog configuration file {} added successfully. Starting rsyslogd",
260 : POSTGRES_LOGS_CONF_PATH
261 : );
262 :
263 0 : restart_rsyslog()?;
264 0 : Ok(())
265 0 : }
266 :
267 : #[instrument(skip_all)]
268 : async fn pgaudit_gc_main_loop(log_directory: String) -> Result<()> {
269 : info!("running pgaudit GC main loop");
270 : loop {
271 : // Check log_directory for old pgaudit logs and delete them.
272 : // New log files are checked every 5 minutes, as set in pgaudit.log_rotation_age
273 : // Find files that were not modified in the last 15 minutes and delete them.
274 : // This should be enough time for rsyslog to process the logs and for us to catch the alerts.
275 : //
276 : // In case of a very high load, we might need to adjust this value and pgaudit.log_rotation_age.
277 : //
278 : // TODO: add some smarter logic to delete the files that are fully streamed according to rsyslog
279 : // imfile-state files, but for now just do a simple GC to avoid filling up the disk.
280 : let _ = Command::new("find")
281 : .arg(&log_directory)
282 : .arg("-name")
283 : .arg("audit*.log")
284 : .arg("-mmin")
285 : .arg("+15")
286 : .arg("-delete")
287 : .output()?;
288 :
289 : // also collect the metric for the size of the log directory
290 0 : async fn get_log_files_size(path: &Path) -> Result<u64> {
291 0 : let mut total_size = 0;
292 :
293 0 : for entry in fs::read_dir(path)? {
294 0 : let entry = entry?;
295 0 : let entry_path = entry.path();
296 :
297 0 : if entry_path.is_file() && entry_path.to_string_lossy().ends_with("log") {
298 0 : total_size += entry.metadata()?.len();
299 0 : }
300 : }
301 :
302 0 : Ok(total_size)
303 0 : }
304 :
305 : let log_directory_size = get_log_files_size(Path::new(&log_directory))
306 : .await
307 0 : .unwrap_or_else(|e| {
308 0 : warn!("Failed to get log directory size: {}", e);
309 0 : 0
310 0 : });
311 : crate::metrics::AUDIT_LOG_DIR_SIZE.set(log_directory_size as f64);
312 : tokio::time::sleep(Duration::from_secs(60)).await;
313 : }
314 : }
315 :
316 : // launch pgaudit GC thread to clean up the old pgaudit logs stored in the log_directory
317 0 : pub fn launch_pgaudit_gc(log_directory: String) {
318 0 : tokio::spawn(async move {
319 0 : if let Err(e) = pgaudit_gc_main_loop(log_directory).await {
320 0 : error!("pgaudit GC main loop failed: {}", e);
321 0 : }
322 0 : });
323 0 : }
324 :
// Unit tests for the helpers in this module that do not touch the system:
// Postgres export config rendering, audit endpoint parsing, and audit config
// rendering. Functions that spawn processes or write files are not covered.
#[cfg(test)]
mod tests {
    use crate::rsyslog::PostgresLogsRsyslogConfig;

    use super::{generate_audit_rsyslog_config, parse_audit_syslog_address};

    // PostgresLogsRsyslogConfig::build: empty output when disabled, an omfwd
    // rule with target/port for "host:port", and an error without a port.
    #[test]
    fn test_postgres_logs_config() {
        {
            // Verify empty config
            let conf = PostgresLogsRsyslogConfig::new(None);
            let res = conf.build();
            assert!(res.is_ok());
            let conf_str = res.unwrap();
            assert_eq!(&conf_str, "");
        }

        {
            // Verify config
            let conf = PostgresLogsRsyslogConfig::new(Some("collector.cvc.local:514"));
            let res = conf.build();
            assert!(res.is_ok());
            let conf_str = res.unwrap();
            assert!(conf_str.contains("omfwd"));
            assert!(conf_str.contains(r#"target="collector.cvc.local""#));
            assert!(conf_str.contains(r#"port="514""#));
        }

        {
            // Verify invalid config (no ':' separating target and port)
            let conf = PostgresLogsRsyslogConfig::new(Some("invalid"));
            let res = conf.build();
            assert!(res.is_err());
        }
    }

    // parse_audit_syslog_address: accepted host forms, TLS precedence and
    // the tls flag, and the rejected malformed inputs.
    #[test]
    fn test_parse_audit_syslog_address() {
        {
            // host:port format (plaintext)
            let parsed = parse_audit_syslog_address("collector.host.tld:5555", "");
            assert!(parsed.is_ok());
            assert_eq!(
                parsed.unwrap(),
                (
                    String::from("collector.host.tld"),
                    5555,
                    String::from("false")
                )
            );
        }

        {
            // host:port format with ipv4 ip address (plaintext)
            let parsed = parse_audit_syslog_address("10.0.0.1:5555", "");
            assert!(parsed.is_ok());
            assert_eq!(
                parsed.unwrap(),
                (String::from("10.0.0.1"), 5555, String::from("false"))
            );
        }

        {
            // host:port format with ipv6 ip address (plaintext);
            // the returned host has no brackets
            let parsed =
                parse_audit_syslog_address("[7e60:82ed:cb2e:d617:f904:f395:aaca:e252]:5555", "");
            assert_eq!(
                parsed.unwrap(),
                (
                    String::from("7e60:82ed:cb2e:d617:f904:f395:aaca:e252"),
                    5555,
                    String::from("false")
                )
            );
        }

        {
            // Only TLS host:port defined
            let parsed = parse_audit_syslog_address("", "tls.host.tld:5556");
            assert_eq!(
                parsed.unwrap(),
                (String::from("tls.host.tld"), 5556, String::from("true"))
            );
        }

        {
            // tls host should take precedence, when both defined
            let parsed = parse_audit_syslog_address("plaintext.host.tld:5555", "tls.host.tld:5556");
            assert_eq!(
                parsed.unwrap(),
                (String::from("tls.host.tld"), 5556, String::from("true"))
            );
        }

        {
            // host without port (plaintext)
            let parsed = parse_audit_syslog_address("collector.host.tld", "");
            assert!(parsed.is_err());
        }

        {
            // port without host
            let parsed = parse_audit_syslog_address(":5555", "");
            assert!(parsed.is_err());
        }

        {
            // valid host with invalid port (90001 does not fit in u16)
            let parsed = parse_audit_syslog_address("collector.host.tld:90001", "");
            assert!(parsed.is_err());
        }

        {
            // invalid hostname with valid port (leading '-' in a label)
            let parsed = parse_audit_syslog_address("-collector.host.tld:5555", "");
            assert!(parsed.is_err());
        }

        {
            // parse error
            let parsed = parse_audit_syslog_address("collector.host.tld:::5555", "");
            assert!(parsed.is_err());
        }
    }

    // generate_audit_rsyslog_config: the tls flag, omfwd target/port and
    // permitted peer are rendered into the template for both modes.
    #[test]
    fn test_generate_audit_rsyslog_config() {
        {
            // plaintext version
            let log_directory = "/tmp/log".to_string();
            let endpoint_id = "ep-test-endpoint-id";
            let project_id = "test-project-id";
            let remote_syslog_host = "collector.host.tld";
            let remote_syslog_port = 5555;
            let remote_syslog_tls = "false";

            let conf_str = generate_audit_rsyslog_config(
                log_directory,
                endpoint_id,
                project_id,
                remote_syslog_host,
                remote_syslog_port,
                remote_syslog_tls,
            );

            assert!(conf_str.contains(r#"set $.remote_syslog_tls = "false";"#));
            assert!(conf_str.contains(r#"type="omfwd""#));
            assert!(conf_str.contains(r#"target="collector.host.tld""#));
            assert!(conf_str.contains(r#"port="5555""#));
            assert!(conf_str.contains(r#"StreamDriverPermittedPeers="collector.host.tld""#));
        }

        {
            // TLS version
            let log_directory = "/tmp/log".to_string();
            let endpoint_id = "ep-test-endpoint-id";
            let project_id = "test-project-id";
            let remote_syslog_host = "collector.host.tld";
            let remote_syslog_port = 5556;
            let remote_syslog_tls = "true";

            let conf_str = generate_audit_rsyslog_config(
                log_directory,
                endpoint_id,
                project_id,
                remote_syslog_host,
                remote_syslog_port,
                remote_syslog_tls,
            );

            assert!(conf_str.contains(r#"set $.remote_syslog_tls = "true";"#));
            assert!(conf_str.contains(r#"type="omfwd""#));
            assert!(conf_str.contains(r#"target="collector.host.tld""#));
            assert!(conf_str.contains(r#"port="5556""#));
            assert!(conf_str.contains(r#"StreamDriverPermittedPeers="collector.host.tld""#));
        }
    }
}
|