LCOV - code coverage report
Current view: top level - compute_tools/src - rsyslog.rs (source / functions) Coverage Total Hit
Test: c28d23d327d4ca6acc894004f1432d7b7eea829c.info Lines: 29.5 % 166 49
Test Date: 2025-03-21 14:50:36 Functions: 28.6 % 14 4

            Line data    Source code
       1              : use std::fs;
       2              : use std::io::ErrorKind;
       3              : use std::path::Path;
       4              : use std::process::Command;
       5              : use std::time::Duration;
       6              : use std::{fs::OpenOptions, io::Write};
       7              : 
       8              : use anyhow::{Context, Result, anyhow};
       9              : use tracing::{error, info, instrument, warn};
      10              : 
      11              : const POSTGRES_LOGS_CONF_PATH: &str = "/etc/rsyslog.d/postgres_logs.conf";
      12              : const AUDIT_LOGS_CONF_PATH: &str = "/etc/rsyslog.d/compute_audit_rsyslog.conf";
      13              : 
      14            0 : fn get_rsyslog_pid() -> Option<String> {
      15            0 :     let output = Command::new("pgrep")
      16            0 :         .arg("rsyslogd")
      17            0 :         .output()
      18            0 :         .expect("Failed to execute pgrep");
      19            0 : 
      20            0 :     if !output.stdout.is_empty() {
      21            0 :         let pid = std::str::from_utf8(&output.stdout)
      22            0 :             .expect("Invalid UTF-8 in process output")
      23            0 :             .trim()
      24            0 :             .to_string();
      25            0 :         Some(pid)
      26              :     } else {
      27            0 :         None
      28              :     }
      29            0 : }
      30              : 
      31              : // Restart rsyslogd to apply the new configuration.
      32              : // This is necessary, because there is no other way to reload the rsyslog configuration.
      33              : //
      34              : // Rsyslogd shouldn't lose any messages, because of the restart,
      35              : // because it tracks the last read position in the log files
      36              : // and will continue reading from that position.
      37              : // TODO: test it properly
      38              : //
      39            0 : fn restart_rsyslog() -> Result<()> {
      40            0 :     let old_pid = get_rsyslog_pid().context("rsyslogd is not running")?;
      41            0 :     info!("rsyslogd is running with pid: {}, restart it", old_pid);
      42              : 
      43              :     // kill it to restart
      44            0 :     let _ = Command::new("pkill")
      45            0 :         .arg("rsyslogd")
      46            0 :         .output()
      47            0 :         .context("Failed to stop rsyslogd")?;
      48              : 
      49            0 :     Ok(())
      50            0 : }
      51              : 
      52            0 : pub fn configure_audit_rsyslog(log_directory: &str, audit_log_level: &str) -> Result<()> {
      53            0 :     let remote_endpoint = std::env::var("AUDIT_LOGGING_ENDPOINT")?;
      54            0 :     if remote_endpoint.is_empty() {
      55            0 :         return Err(anyhow!("AUDIT_LOGGING_ENDPOINT is not set"));
      56            0 :     }
      57              : 
      58            0 :     let old_config_content = match std::fs::read_to_string(AUDIT_LOGS_CONF_PATH) {
      59            0 :         Ok(c) => c,
      60            0 :         Err(err) if err.kind() == ErrorKind::NotFound => String::new(),
      61            0 :         Err(err) => return Err(err.into()),
      62              :     };
      63              : 
      64            0 :     let config_content: String = format!(
      65            0 :         include_str!("config_template/compute_audit_rsyslog_template.conf"),
      66            0 :         log_directory = log_directory,
      67            0 :         tag = audit_log_level,
      68            0 :         remote_endpoint = remote_endpoint
      69            0 :     );
      70            0 : 
      71            0 :     if old_config_content == config_content {
      72            0 :         info!("rsyslog configuration is up-to-date");
      73            0 :         return Ok(());
      74            0 :     }
      75            0 : 
      76            0 :     info!("rsyslog config_content: {}", config_content);
      77              : 
      78            0 :     let mut file = OpenOptions::new()
      79            0 :         .create(true)
      80            0 :         .write(true)
      81            0 :         .truncate(true)
      82            0 :         .open(AUDIT_LOGS_CONF_PATH)?;
      83              : 
      84            0 :     file.write_all(config_content.as_bytes())?;
      85              : 
      86            0 :     info!(
      87            0 :         "rsyslog configuration file {} added successfully. Starting rsyslogd",
      88              :         AUDIT_LOGS_CONF_PATH
      89              :     );
      90              : 
      91              :     // start the service, using the configuration
      92            0 :     restart_rsyslog()?;
      93              : 
      94            0 :     Ok(())
      95            0 : }
      96              : 
      97              : /// Configuration for enabling Postgres logs forwarding from rsyslogd
      98              : pub struct PostgresLogsRsyslogConfig<'a> {
      99              :     pub host: Option<&'a str>,
     100              : }
     101              : 
     102              : impl<'a> PostgresLogsRsyslogConfig<'a> {
     103            4 :     pub fn new(host: Option<&'a str>) -> Self {
     104            4 :         Self { host }
     105            4 :     }
     106              : 
     107            4 :     pub fn build(&self) -> Result<String> {
     108            4 :         match self.host {
     109            3 :             Some(host) => {
     110            3 :                 if let Some((target, port)) = host.split_once(":") {
     111            2 :                     Ok(format!(
     112            2 :                         include_str!(
     113            2 :                             "config_template/compute_rsyslog_postgres_export_template.conf"
     114            2 :                         ),
     115            2 :                         logs_export_target = target,
     116            2 :                         logs_export_port = port,
     117            2 :                     ))
     118              :                 } else {
     119            1 :                     Err(anyhow!("Invalid host format for Postgres logs export"))
     120              :                 }
     121              :             }
     122            1 :             None => Ok("".to_string()),
     123              :         }
     124            4 :     }
     125              : 
     126            0 :     fn current_config() -> Result<String> {
     127            0 :         let config_content = match std::fs::read_to_string(POSTGRES_LOGS_CONF_PATH) {
     128            0 :             Ok(c) => c,
     129            0 :             Err(err) if err.kind() == ErrorKind::NotFound => String::new(),
     130            0 :             Err(err) => return Err(err.into()),
     131              :         };
     132            0 :         Ok(config_content)
     133            0 :     }
     134              : 
     135              :     /// Returns the default host for otel collector that receives Postgres logs
     136            1 :     pub fn default_host(project_id: &str) -> String {
     137            1 :         format!(
     138            1 :             "config-{}-collector.neon-telemetry.svc.cluster.local:10514",
     139            1 :             project_id
     140            1 :         )
     141            1 :     }
     142              : }
     143              : 
     144            0 : pub fn configure_postgres_logs_export(conf: PostgresLogsRsyslogConfig) -> Result<()> {
     145            0 :     let new_config = conf.build()?;
     146            0 :     let current_config = PostgresLogsRsyslogConfig::current_config()?;
     147              : 
     148            0 :     if new_config == current_config {
     149            0 :         info!("postgres logs rsyslog configuration is up-to-date");
     150            0 :         return Ok(());
     151            0 :     }
     152            0 : 
     153            0 :     // When new config is empty we can simply remove the configuration file.
     154            0 :     if new_config.is_empty() {
     155            0 :         info!("removing rsyslog config file: {}", POSTGRES_LOGS_CONF_PATH);
     156            0 :         match std::fs::remove_file(POSTGRES_LOGS_CONF_PATH) {
     157            0 :             Ok(_) => {}
     158            0 :             Err(err) if err.kind() == ErrorKind::NotFound => {}
     159            0 :             Err(err) => return Err(err.into()),
     160              :         }
     161            0 :         restart_rsyslog()?;
     162            0 :         return Ok(());
     163            0 :     }
     164            0 : 
     165            0 :     info!(
     166            0 :         "configuring rsyslog for postgres logs export to: {:?}",
     167              :         conf.host
     168              :     );
     169              : 
     170            0 :     let mut file = OpenOptions::new()
     171            0 :         .create(true)
     172            0 :         .write(true)
     173            0 :         .truncate(true)
     174            0 :         .open(POSTGRES_LOGS_CONF_PATH)?;
     175            0 :     file.write_all(new_config.as_bytes())?;
     176              : 
     177            0 :     info!(
     178            0 :         "rsyslog configuration file {} added successfully. Starting rsyslogd",
     179              :         POSTGRES_LOGS_CONF_PATH
     180              :     );
     181              : 
     182            0 :     restart_rsyslog()?;
     183            0 :     Ok(())
     184            0 : }
     185              : 
     186              : #[instrument(skip_all)]
     187              : async fn pgaudit_gc_main_loop(log_directory: String) -> Result<()> {
     188              :     info!("running pgaudit GC main loop");
     189              :     loop {
     190              :         // Check log_directory for old pgaudit logs and delete them.
     191              :         // New log files are checked every 5 minutes, as set in pgaudit.log_rotation_age
     192              :         // Find files that were not modified in the last 15 minutes and delete them.
     193              :         // This should be enough time for rsyslog to process the logs and for us to catch the alerts.
     194              :         //
     195              :         // In case of a very high load, we might need to adjust this value and pgaudit.log_rotation_age.
     196              :         //
     197              :         // TODO: add some smarter logic to delete the files that are fully streamed according to rsyslog
     198              :         // imfile-state files, but for now just do a simple GC to avoid filling up the disk.
     199              :         let _ = Command::new("find")
     200              :             .arg(&log_directory)
     201              :             .arg("-name")
     202              :             .arg("audit*.log")
     203              :             .arg("-mmin")
     204              :             .arg("+15")
     205              :             .arg("-delete")
     206              :             .output()?;
     207              : 
     208              :         // also collect the metric for the size of the log directory
     209            0 :         async fn get_log_files_size(path: &Path) -> Result<u64> {
     210            0 :             let mut total_size = 0;
     211              : 
     212            0 :             for entry in fs::read_dir(path)? {
     213            0 :                 let entry = entry?;
     214            0 :                 let entry_path = entry.path();
     215            0 : 
     216            0 :                 if entry_path.is_file() && entry_path.to_string_lossy().ends_with("log") {
     217            0 :                     total_size += entry.metadata()?.len();
     218            0 :                 }
     219              :             }
     220              : 
     221            0 :             Ok(total_size)
     222            0 :         }
     223              : 
     224              :         let log_directory_size = get_log_files_size(Path::new(&log_directory))
     225              :             .await
     226            0 :             .unwrap_or_else(|e| {
     227            0 :                 warn!("Failed to get log directory size: {}", e);
     228            0 :                 0
     229            0 :             });
     230              :         crate::metrics::AUDIT_LOG_DIR_SIZE.set(log_directory_size as f64);
     231              :         tokio::time::sleep(Duration::from_secs(60)).await;
     232              :     }
     233              : }
     234              : 
     235              : // launch pgaudit GC thread to clean up the old pgaudit logs stored in the log_directory
     236            0 : pub fn launch_pgaudit_gc(log_directory: String) {
     237            0 :     tokio::spawn(async move {
     238            0 :         if let Err(e) = pgaudit_gc_main_loop(log_directory).await {
     239            0 :             error!("pgaudit GC main loop failed: {}", e);
     240            0 :         }
     241            0 :     });
     242            0 : }
     243              : 
     244              : #[cfg(test)]
     245              : mod tests {
     246              :     use crate::rsyslog::PostgresLogsRsyslogConfig;
     247              : 
     248              :     #[test]
     249            1 :     fn test_postgres_logs_config() {
     250            1 :         {
     251            1 :             // Verify empty config
     252            1 :             let conf = PostgresLogsRsyslogConfig::new(None);
     253            1 :             let res = conf.build();
     254            1 :             assert!(res.is_ok());
     255            1 :             let conf_str = res.unwrap();
     256            1 :             assert_eq!(&conf_str, "");
     257              :         }
     258              : 
     259              :         {
     260              :             // Verify config
     261            1 :             let conf = PostgresLogsRsyslogConfig::new(Some("collector.cvc.local:514"));
     262            1 :             let res = conf.build();
     263            1 :             assert!(res.is_ok());
     264            1 :             let conf_str = res.unwrap();
     265            1 :             assert!(conf_str.contains("omfwd"));
     266            1 :             assert!(conf_str.contains(r#"target="collector.cvc.local""#));
     267            1 :             assert!(conf_str.contains(r#"port="514""#));
     268              :         }
     269              : 
     270              :         {
     271              :             // Verify invalid config
     272            1 :             let conf = PostgresLogsRsyslogConfig::new(Some("invalid"));
     273            1 :             let res = conf.build();
     274            1 :             assert!(res.is_err());
     275              :         }
     276              : 
     277              :         {
     278              :             // Verify config with default host
     279            1 :             let host = PostgresLogsRsyslogConfig::default_host("shy-breeze-123");
     280            1 :             let conf = PostgresLogsRsyslogConfig::new(Some(&host));
     281            1 :             let res = conf.build();
     282            1 :             assert!(res.is_ok());
     283            1 :             let conf_str = res.unwrap();
     284            1 :             assert!(conf_str.contains(r#"shy-breeze-123"#));
     285            1 :             assert!(conf_str.contains(r#"port="10514""#));
     286              :         }
     287            1 :     }
     288              : }
        

Generated by: LCOV version 2.1-beta