LCOV - code coverage report
Current view: top level - compute_tools/src - rsyslog.rs (source / functions) Coverage Total Hit
Test: 472031e0b71f3195f7f21b1f2b20de09fd07bb56.info Lines: 22.2 % 162 36
Test Date: 2025-05-26 10:37:33 Functions: 21.4 % 14 3

            Line data    Source code
       1              : use std::fs;
       2              : use std::io::ErrorKind;
       3              : use std::path::Path;
       4              : use std::process::Command;
       5              : use std::time::Duration;
       6              : use std::{fs::OpenOptions, io::Write};
       7              : 
       8              : use anyhow::{Context, Result, anyhow};
       9              : use tracing::{error, info, instrument, warn};
      10              : 
      11              : const POSTGRES_LOGS_CONF_PATH: &str = "/etc/rsyslog.d/postgres_logs.conf";
      12              : 
      13            0 : fn get_rsyslog_pid() -> Option<String> {
      14            0 :     let output = Command::new("pgrep")
      15            0 :         .arg("rsyslogd")
      16            0 :         .output()
      17            0 :         .expect("Failed to execute pgrep");
      18            0 : 
      19            0 :     if !output.stdout.is_empty() {
      20            0 :         let pid = std::str::from_utf8(&output.stdout)
      21            0 :             .expect("Invalid UTF-8 in process output")
      22            0 :             .trim()
      23            0 :             .to_string();
      24            0 :         Some(pid)
      25              :     } else {
      26            0 :         None
      27              :     }
      28            0 : }
      29              : 
      30            0 : fn wait_for_rsyslog_pid() -> Result<String, anyhow::Error> {
      31              :     const MAX_WAIT: Duration = Duration::from_secs(5);
      32              :     const INITIAL_SLEEP: Duration = Duration::from_millis(2);
      33              : 
      34            0 :     let mut sleep_duration = INITIAL_SLEEP;
      35            0 :     let start = std::time::Instant::now();
      36            0 :     let mut attempts = 1;
      37              : 
      38            0 :     for attempt in 1.. {
      39            0 :         attempts = attempt;
      40            0 :         match get_rsyslog_pid() {
      41            0 :             Some(pid) => return Ok(pid),
      42              :             None => {
      43            0 :                 if start.elapsed() >= MAX_WAIT {
      44            0 :                     break;
      45            0 :                 }
      46            0 :                 info!(
      47            0 :                     "rsyslogd is not running, attempt {}. Sleeping for {} ms",
      48            0 :                     attempt,
      49            0 :                     sleep_duration.as_millis()
      50              :                 );
      51            0 :                 std::thread::sleep(sleep_duration);
      52            0 :                 sleep_duration *= 2;
      53              :             }
      54              :         }
      55              :     }
      56              : 
      57            0 :     Err(anyhow::anyhow!(
      58            0 :         "rsyslogd is not running after waiting for {} seconds and {} attempts",
      59            0 :         attempts,
      60            0 :         start.elapsed().as_secs()
      61            0 :     ))
      62            0 : }
      63              : 
      64              : // Restart rsyslogd to apply the new configuration.
      65              : // This is necessary, because there is no other way to reload the rsyslog configuration.
      66              : //
      67              : // Rsyslogd shouldn't lose any messages, because of the restart,
      68              : // because it tracks the last read position in the log files
      69              : // and will continue reading from that position.
      70              : // TODO: test it properly
      71              : //
      72            0 : fn restart_rsyslog() -> Result<()> {
      73            0 :     // kill it to restart
      74            0 :     let _ = Command::new("pkill")
      75            0 :         .arg("rsyslogd")
      76            0 :         .output()
      77            0 :         .context("Failed to restart rsyslogd")?;
      78              : 
      79              :     // ensure rsyslogd is running
      80            0 :     wait_for_rsyslog_pid()?;
      81              : 
      82            0 :     Ok(())
      83            0 : }
      84              : 
      85            0 : pub fn configure_audit_rsyslog(
      86            0 :     log_directory: String,
      87            0 :     tag: Option<String>,
      88            0 :     remote_endpoint: &str,
      89            0 : ) -> Result<()> {
      90            0 :     let config_content: String = format!(
      91            0 :         include_str!("config_template/compute_audit_rsyslog_template.conf"),
      92            0 :         log_directory = log_directory,
      93            0 :         tag = tag.unwrap_or("".to_string()),
      94            0 :         remote_endpoint = remote_endpoint
      95            0 :     );
      96            0 : 
      97            0 :     info!("rsyslog config_content: {}", config_content);
      98              : 
      99            0 :     let rsyslog_conf_path = "/etc/rsyslog.d/compute_audit_rsyslog.conf";
     100            0 :     let mut file = OpenOptions::new()
     101            0 :         .create(true)
     102            0 :         .write(true)
     103            0 :         .truncate(true)
     104            0 :         .open(rsyslog_conf_path)?;
     105              : 
     106            0 :     file.write_all(config_content.as_bytes())?;
     107              : 
     108            0 :     info!(
     109            0 :         "rsyslog configuration file {} added successfully. Starting rsyslogd",
     110              :         rsyslog_conf_path
     111              :     );
     112              : 
     113              :     // start the service, using the configuration
     114            0 :     restart_rsyslog()?;
     115              : 
     116            0 :     Ok(())
     117            0 : }
     118              : 
     119              : /// Configuration for enabling Postgres logs forwarding from rsyslogd
     120              : pub struct PostgresLogsRsyslogConfig<'a> {
     121              :     pub host: Option<&'a str>,
     122              : }
     123              : 
     124              : impl<'a> PostgresLogsRsyslogConfig<'a> {
     125            3 :     pub fn new(host: Option<&'a str>) -> Self {
     126            3 :         Self { host }
     127            3 :     }
     128              : 
     129            3 :     pub fn build(&self) -> Result<String> {
     130            3 :         match self.host {
     131            2 :             Some(host) => {
     132            2 :                 if let Some((target, port)) = host.split_once(":") {
     133            1 :                     Ok(format!(
     134            1 :                         include_str!(
     135            1 :                             "config_template/compute_rsyslog_postgres_export_template.conf"
     136            1 :                         ),
     137            1 :                         logs_export_target = target,
     138            1 :                         logs_export_port = port,
     139            1 :                     ))
     140              :                 } else {
     141            1 :                     Err(anyhow!("Invalid host format for Postgres logs export"))
     142              :                 }
     143              :             }
     144            1 :             None => Ok("".to_string()),
     145              :         }
     146            3 :     }
     147              : 
     148            0 :     fn current_config() -> Result<String> {
     149            0 :         let config_content = match std::fs::read_to_string(POSTGRES_LOGS_CONF_PATH) {
     150            0 :             Ok(c) => c,
     151            0 :             Err(err) if err.kind() == ErrorKind::NotFound => String::new(),
     152            0 :             Err(err) => return Err(err.into()),
     153              :         };
     154            0 :         Ok(config_content)
     155            0 :     }
     156              : }
     157              : 
     158              : /// Writes rsyslogd configuration for Postgres logs export and restarts rsyslog.
     159            0 : pub fn configure_postgres_logs_export(conf: PostgresLogsRsyslogConfig) -> Result<()> {
     160            0 :     let new_config = conf.build()?;
     161            0 :     let current_config = PostgresLogsRsyslogConfig::current_config()?;
     162              : 
     163            0 :     if new_config == current_config {
     164            0 :         info!("postgres logs rsyslog configuration is up-to-date");
     165            0 :         return Ok(());
     166            0 :     }
     167            0 : 
     168            0 :     // Nothing to configure
     169            0 :     if new_config.is_empty() {
     170              :         // When the configuration is removed, PostgreSQL will stop sending data
     171              :         // to the files watched by rsyslog, so restarting rsyslog is more effort
     172              :         // than just ignoring this change.
     173            0 :         return Ok(());
     174            0 :     }
     175            0 : 
     176            0 :     info!(
     177            0 :         "configuring rsyslog for postgres logs export to: {:?}",
     178              :         conf.host
     179              :     );
     180              : 
     181            0 :     let mut file = OpenOptions::new()
     182            0 :         .create(true)
     183            0 :         .write(true)
     184            0 :         .truncate(true)
     185            0 :         .open(POSTGRES_LOGS_CONF_PATH)?;
     186            0 :     file.write_all(new_config.as_bytes())?;
     187              : 
     188            0 :     info!(
     189            0 :         "rsyslog configuration file {} added successfully. Starting rsyslogd",
     190              :         POSTGRES_LOGS_CONF_PATH
     191              :     );
     192              : 
     193            0 :     restart_rsyslog()?;
     194            0 :     Ok(())
     195            0 : }
     196              : 
     197              : #[instrument(skip_all)]
     198              : async fn pgaudit_gc_main_loop(log_directory: String) -> Result<()> {
     199              :     info!("running pgaudit GC main loop");
     200              :     loop {
     201              :         // Check log_directory for old pgaudit logs and delete them.
     202              :         // New log files are checked every 5 minutes, as set in pgaudit.log_rotation_age
     203              :         // Find files that were not modified in the last 15 minutes and delete them.
     204              :         // This should be enough time for rsyslog to process the logs and for us to catch the alerts.
     205              :         //
     206              :         // In case of a very high load, we might need to adjust this value and pgaudit.log_rotation_age.
     207              :         //
     208              :         // TODO: add some smarter logic to delete the files that are fully streamed according to rsyslog
     209              :         // imfile-state files, but for now just do a simple GC to avoid filling up the disk.
     210              :         let _ = Command::new("find")
     211              :             .arg(&log_directory)
     212              :             .arg("-name")
     213              :             .arg("audit*.log")
     214              :             .arg("-mmin")
     215              :             .arg("+15")
     216              :             .arg("-delete")
     217              :             .output()?;
     218              : 
     219              :         // also collect the metric for the size of the log directory
     220            0 :         async fn get_log_files_size(path: &Path) -> Result<u64> {
     221            0 :             let mut total_size = 0;
     222              : 
     223            0 :             for entry in fs::read_dir(path)? {
     224            0 :                 let entry = entry?;
     225            0 :                 let entry_path = entry.path();
     226            0 : 
     227            0 :                 if entry_path.is_file() && entry_path.to_string_lossy().ends_with("log") {
     228            0 :                     total_size += entry.metadata()?.len();
     229            0 :                 }
     230              :             }
     231              : 
     232            0 :             Ok(total_size)
     233            0 :         }
     234              : 
     235              :         let log_directory_size = get_log_files_size(Path::new(&log_directory))
     236              :             .await
     237            0 :             .unwrap_or_else(|e| {
     238            0 :                 warn!("Failed to get log directory size: {}", e);
     239            0 :                 0
     240            0 :             });
     241              :         crate::metrics::AUDIT_LOG_DIR_SIZE.set(log_directory_size as f64);
     242              :         tokio::time::sleep(Duration::from_secs(60)).await;
     243              :     }
     244              : }
     245              : 
     246              : // launch pgaudit GC thread to clean up the old pgaudit logs stored in the log_directory
     247            0 : pub fn launch_pgaudit_gc(log_directory: String) {
     248            0 :     tokio::spawn(async move {
     249            0 :         if let Err(e) = pgaudit_gc_main_loop(log_directory).await {
     250            0 :             error!("pgaudit GC main loop failed: {}", e);
     251            0 :         }
     252            0 :     });
     253            0 : }
     254              : 
     255              : #[cfg(test)]
     256              : mod tests {
     257              :     use crate::rsyslog::PostgresLogsRsyslogConfig;
     258              : 
     259              :     #[test]
     260            1 :     fn test_postgres_logs_config() {
     261            1 :         {
     262            1 :             // Verify empty config
     263            1 :             let conf = PostgresLogsRsyslogConfig::new(None);
     264            1 :             let res = conf.build();
     265            1 :             assert!(res.is_ok());
     266            1 :             let conf_str = res.unwrap();
     267            1 :             assert_eq!(&conf_str, "");
     268              :         }
     269              : 
     270              :         {
     271              :             // Verify config
     272            1 :             let conf = PostgresLogsRsyslogConfig::new(Some("collector.cvc.local:514"));
     273            1 :             let res = conf.build();
     274            1 :             assert!(res.is_ok());
     275            1 :             let conf_str = res.unwrap();
     276            1 :             assert!(conf_str.contains("omfwd"));
     277            1 :             assert!(conf_str.contains(r#"target="collector.cvc.local""#));
     278            1 :             assert!(conf_str.contains(r#"port="514""#));
     279              :         }
     280              : 
     281              :         {
     282              :             // Verify invalid config
     283            1 :             let conf = PostgresLogsRsyslogConfig::new(Some("invalid"));
     284            1 :             let res = conf.build();
     285            1 :             assert!(res.is_err());
     286              :         }
     287            1 :     }
     288              : }
        

Generated by: LCOV version 2.1-beta