LCOV - code coverage report
Current view: top level - compute_tools/src - rsyslog.rs (source / functions) Coverage Total Hit
Test: 1e20c4f2b28aa592527961bb32170ebbd2c9172f.info Lines: 55.5 % 283 157
Test Date: 2025-07-16 12:29:03 Functions: 45.0 % 20 9

            Line data    Source code
       1              : use std::fs;
       2              : use std::io::ErrorKind;
       3              : use std::path::Path;
       4              : use std::process::Command;
       5              : use std::time::Duration;
       6              : use std::{fs::OpenOptions, io::Write};
       7              : use url::{Host, Url};
       8              : 
       9              : use anyhow::{Context, Result, anyhow};
      10              : use hostname_validator;
      11              : use tracing::{error, info, instrument, warn};
      12              : 
      13              : const POSTGRES_LOGS_CONF_PATH: &str = "/etc/rsyslog.d/postgres_logs.conf";
      14              : 
      15            0 : fn get_rsyslog_pid() -> Option<String> {
      16            0 :     let output = Command::new("pgrep")
      17            0 :         .arg("rsyslogd")
      18            0 :         .output()
      19            0 :         .expect("Failed to execute pgrep");
      20              : 
      21            0 :     if !output.stdout.is_empty() {
      22            0 :         let pid = std::str::from_utf8(&output.stdout)
      23            0 :             .expect("Invalid UTF-8 in process output")
      24            0 :             .trim()
      25            0 :             .to_string();
      26            0 :         Some(pid)
      27              :     } else {
      28            0 :         None
      29              :     }
      30            0 : }
      31              : 
      32            0 : fn wait_for_rsyslog_pid() -> Result<String, anyhow::Error> {
      33              :     const MAX_WAIT: Duration = Duration::from_secs(5);
      34              :     const INITIAL_SLEEP: Duration = Duration::from_millis(2);
      35              : 
      36            0 :     let mut sleep_duration = INITIAL_SLEEP;
      37            0 :     let start = std::time::Instant::now();
      38            0 :     let mut attempts = 1;
      39              : 
      40            0 :     for attempt in 1.. {
      41            0 :         attempts = attempt;
      42            0 :         match get_rsyslog_pid() {
      43            0 :             Some(pid) => return Ok(pid),
      44              :             None => {
      45            0 :                 if start.elapsed() >= MAX_WAIT {
      46            0 :                     break;
      47            0 :                 }
      48            0 :                 info!(
      49            0 :                     "rsyslogd is not running, attempt {}. Sleeping for {} ms",
      50              :                     attempt,
      51            0 :                     sleep_duration.as_millis()
      52              :                 );
      53            0 :                 std::thread::sleep(sleep_duration);
      54            0 :                 sleep_duration *= 2;
      55              :             }
      56              :         }
      57              :     }
      58              : 
      59            0 :     Err(anyhow::anyhow!(
      60            0 :         "rsyslogd is not running after waiting for {} seconds and {} attempts",
      61            0 :         attempts,
      62            0 :         start.elapsed().as_secs()
      63            0 :     ))
      64            0 : }
      65              : 
      66              : // Restart rsyslogd to apply the new configuration.
      67              : // This is necessary, because there is no other way to reload the rsyslog configuration.
      68              : //
      69              : // Rsyslogd shouldn't lose any messages, because of the restart,
      70              : // because it tracks the last read position in the log files
      71              : // and will continue reading from that position.
      72              : // TODO: test it properly
      73              : //
      74            0 : fn restart_rsyslog() -> Result<()> {
      75              :     // kill it to restart
      76            0 :     let _ = Command::new("pkill")
      77            0 :         .arg("rsyslogd")
      78            0 :         .output()
      79            0 :         .context("Failed to restart rsyslogd")?;
      80              : 
      81              :     // ensure rsyslogd is running
      82            0 :     wait_for_rsyslog_pid()?;
      83              : 
      84            0 :     Ok(())
      85            0 : }
      86              : 
      87           10 : fn parse_audit_syslog_address(
      88           10 :     remote_plain_endpoint: &str,
      89           10 :     remote_tls_endpoint: &str,
      90           10 : ) -> Result<(String, u16, String)> {
      91              :     let tls;
      92           10 :     let remote_endpoint = if !remote_tls_endpoint.is_empty() {
      93            2 :         tls = "true".to_string();
      94            2 :         remote_tls_endpoint
      95              :     } else {
      96            8 :         tls = "false".to_string();
      97            8 :         remote_plain_endpoint
      98              :     };
      99              :     // Urlify the remote_endpoint, so parsing can be done with url::Url.
     100           10 :     let url_str = format!("http://{remote_endpoint}");
     101           10 :     let url = Url::parse(&url_str).map_err(|err| {
     102            3 :         anyhow!("Error parsing {remote_endpoint}, expected host:port, got {err:?}")
     103            3 :     })?;
     104              : 
     105            7 :     let is_valid = url.scheme() == "http"
     106            7 :         && url.path() == "/"
     107            7 :         && url.query().is_none()
     108            7 :         && url.fragment().is_none()
     109            7 :         && url.username() == ""
     110            7 :         && url.password().is_none();
     111              : 
     112            7 :     if !is_valid {
     113            0 :         return Err(anyhow!(
     114            0 :             "Invalid address format {remote_endpoint}, expected host:port"
     115            0 :         ));
     116            7 :     }
     117            7 :     let host = match url.host() {
     118            5 :         Some(Host::Domain(h)) if hostname_validator::is_valid(h) => h.to_string(),
     119            1 :         Some(Host::Ipv4(ip4)) => ip4.to_string(),
     120            1 :         Some(Host::Ipv6(ip6)) => ip6.to_string(),
     121            1 :         _ => return Err(anyhow!("Invalid host")),
     122              :     };
     123            6 :     let port = url
     124            6 :         .port()
     125            6 :         .ok_or_else(|| anyhow!("Invalid port in {remote_endpoint}"))?;
     126              : 
     127            5 :     Ok((host, port, tls))
     128           10 : }
     129              : 
     130            2 : fn generate_audit_rsyslog_config(
     131            2 :     log_directory: String,
     132            2 :     endpoint_id: &str,
     133            2 :     project_id: &str,
     134            2 :     remote_syslog_host: &str,
     135            2 :     remote_syslog_port: u16,
     136            2 :     remote_syslog_tls: &str,
     137            2 : ) -> String {
     138            2 :     format!(
     139            2 :         include_str!("config_template/compute_audit_rsyslog_template.conf"),
     140              :         log_directory = log_directory,
     141              :         endpoint_id = endpoint_id,
     142              :         project_id = project_id,
     143              :         remote_syslog_host = remote_syslog_host,
     144              :         remote_syslog_port = remote_syslog_port,
     145              :         remote_syslog_tls = remote_syslog_tls
     146              :     )
     147            2 : }
     148              : 
     149            0 : pub fn configure_audit_rsyslog(
     150            0 :     log_directory: String,
     151            0 :     endpoint_id: &str,
     152            0 :     project_id: &str,
     153            0 :     remote_endpoint: &str,
     154            0 :     remote_tls_endpoint: &str,
     155            0 : ) -> Result<()> {
     156            0 :     let (remote_syslog_host, remote_syslog_port, remote_syslog_tls) =
     157            0 :         parse_audit_syslog_address(remote_endpoint, remote_tls_endpoint).unwrap();
     158            0 :     let config_content = generate_audit_rsyslog_config(
     159            0 :         log_directory,
     160            0 :         endpoint_id,
     161            0 :         project_id,
     162            0 :         &remote_syslog_host,
     163            0 :         remote_syslog_port,
     164            0 :         &remote_syslog_tls,
     165              :     );
     166              : 
     167            0 :     info!("rsyslog config_content: {}", config_content);
     168              : 
     169            0 :     let rsyslog_conf_path = "/etc/rsyslog.d/compute_audit_rsyslog.conf";
     170            0 :     let mut file = OpenOptions::new()
     171            0 :         .create(true)
     172            0 :         .write(true)
     173            0 :         .truncate(true)
     174            0 :         .open(rsyslog_conf_path)?;
     175              : 
     176            0 :     file.write_all(config_content.as_bytes())?;
     177              : 
     178            0 :     info!(
     179            0 :         "rsyslog configuration file {} added successfully. Starting rsyslogd",
     180              :         rsyslog_conf_path
     181              :     );
     182              : 
     183              :     // start the service, using the configuration
     184            0 :     restart_rsyslog()?;
     185              : 
     186            0 :     Ok(())
     187            0 : }
     188              : 
     189              : /// Configuration for enabling Postgres logs forwarding from rsyslogd
     190              : pub struct PostgresLogsRsyslogConfig<'a> {
     191              :     pub host: Option<&'a str>,
     192              : }
     193              : 
     194              : impl<'a> PostgresLogsRsyslogConfig<'a> {
     195            3 :     pub fn new(host: Option<&'a str>) -> Self {
     196            3 :         Self { host }
     197            3 :     }
     198              : 
     199            3 :     pub fn build(&self) -> Result<String> {
     200            3 :         match self.host {
     201            2 :             Some(host) => {
     202            2 :                 if let Some((target, port)) = host.split_once(":") {
     203            1 :                     Ok(format!(
     204            1 :                         include_str!(
     205            1 :                             "config_template/compute_rsyslog_postgres_export_template.conf"
     206            1 :                         ),
     207            1 :                         logs_export_target = target,
     208            1 :                         logs_export_port = port,
     209            1 :                     ))
     210              :                 } else {
     211            1 :                     Err(anyhow!("Invalid host format for Postgres logs export"))
     212              :                 }
     213              :             }
     214            1 :             None => Ok("".to_string()),
     215              :         }
     216            3 :     }
     217              : 
     218            0 :     fn current_config() -> Result<String> {
     219            0 :         let config_content = match std::fs::read_to_string(POSTGRES_LOGS_CONF_PATH) {
     220            0 :             Ok(c) => c,
     221            0 :             Err(err) if err.kind() == ErrorKind::NotFound => String::new(),
     222            0 :             Err(err) => return Err(err.into()),
     223              :         };
     224            0 :         Ok(config_content)
     225            0 :     }
     226              : }
     227              : 
     228              : /// Writes rsyslogd configuration for Postgres logs export and restarts rsyslog.
     229            0 : pub fn configure_postgres_logs_export(conf: PostgresLogsRsyslogConfig) -> Result<()> {
     230            0 :     let new_config = conf.build()?;
     231            0 :     let current_config = PostgresLogsRsyslogConfig::current_config()?;
     232              : 
     233            0 :     if new_config == current_config {
     234            0 :         info!("postgres logs rsyslog configuration is up-to-date");
     235            0 :         return Ok(());
     236            0 :     }
     237              : 
     238              :     // Nothing to configure
     239            0 :     if new_config.is_empty() {
     240              :         // When the configuration is removed, PostgreSQL will stop sending data
     241              :         // to the files watched by rsyslog, so restarting rsyslog is more effort
     242              :         // than just ignoring this change.
     243            0 :         return Ok(());
     244            0 :     }
     245              : 
     246            0 :     info!(
     247            0 :         "configuring rsyslog for postgres logs export to: {:?}",
     248              :         conf.host
     249              :     );
     250              : 
     251            0 :     let mut file = OpenOptions::new()
     252            0 :         .create(true)
     253            0 :         .write(true)
     254            0 :         .truncate(true)
     255            0 :         .open(POSTGRES_LOGS_CONF_PATH)?;
     256            0 :     file.write_all(new_config.as_bytes())?;
     257              : 
     258            0 :     info!(
     259            0 :         "rsyslog configuration file {} added successfully. Starting rsyslogd",
     260              :         POSTGRES_LOGS_CONF_PATH
     261              :     );
     262              : 
     263            0 :     restart_rsyslog()?;
     264            0 :     Ok(())
     265            0 : }
     266              : 
     267              : #[instrument(skip_all)]
     268              : async fn pgaudit_gc_main_loop(log_directory: String) -> Result<()> {
     269              :     info!("running pgaudit GC main loop");
     270              :     loop {
     271              :         // Check log_directory for old pgaudit logs and delete them.
     272              :         // New log files are checked every 5 minutes, as set in pgaudit.log_rotation_age
     273              :         // Find files that were not modified in the last 15 minutes and delete them.
     274              :         // This should be enough time for rsyslog to process the logs and for us to catch the alerts.
     275              :         //
     276              :         // In case of a very high load, we might need to adjust this value and pgaudit.log_rotation_age.
     277              :         //
     278              :         // TODO: add some smarter logic to delete the files that are fully streamed according to rsyslog
     279              :         // imfile-state files, but for now just do a simple GC to avoid filling up the disk.
     280              :         let _ = Command::new("find")
     281              :             .arg(&log_directory)
     282              :             .arg("-name")
     283              :             .arg("audit*.log")
     284              :             .arg("-mmin")
     285              :             .arg("+15")
     286              :             .arg("-delete")
     287              :             .output()?;
     288              : 
     289              :         // also collect the metric for the size of the log directory
     290            0 :         async fn get_log_files_size(path: &Path) -> Result<u64> {
     291            0 :             let mut total_size = 0;
     292              : 
     293            0 :             for entry in fs::read_dir(path)? {
     294            0 :                 let entry = entry?;
     295            0 :                 let entry_path = entry.path();
     296              : 
     297            0 :                 if entry_path.is_file() && entry_path.to_string_lossy().ends_with("log") {
     298            0 :                     total_size += entry.metadata()?.len();
     299            0 :                 }
     300              :             }
     301              : 
     302            0 :             Ok(total_size)
     303            0 :         }
     304              : 
     305              :         let log_directory_size = get_log_files_size(Path::new(&log_directory))
     306              :             .await
     307            0 :             .unwrap_or_else(|e| {
     308            0 :                 warn!("Failed to get log directory size: {}", e);
     309            0 :                 0
     310            0 :             });
     311              :         crate::metrics::AUDIT_LOG_DIR_SIZE.set(log_directory_size as f64);
     312              :         tokio::time::sleep(Duration::from_secs(60)).await;
     313              :     }
     314              : }
     315              : 
     316              : // launch pgaudit GC thread to clean up the old pgaudit logs stored in the log_directory
     317            0 : pub fn launch_pgaudit_gc(log_directory: String) {
     318            0 :     tokio::spawn(async move {
     319            0 :         if let Err(e) = pgaudit_gc_main_loop(log_directory).await {
     320            0 :             error!("pgaudit GC main loop failed: {}", e);
     321            0 :         }
     322            0 :     });
     323            0 : }
     324              : 
     325              : #[cfg(test)]
     326              : mod tests {
     327              :     use crate::rsyslog::PostgresLogsRsyslogConfig;
     328              : 
     329              :     use super::{generate_audit_rsyslog_config, parse_audit_syslog_address};
     330              : 
     331              :     #[test]
     332            1 :     fn test_postgres_logs_config() {
     333              :         {
     334              :             // Verify empty config
     335            1 :             let conf = PostgresLogsRsyslogConfig::new(None);
     336            1 :             let res = conf.build();
     337            1 :             assert!(res.is_ok());
     338            1 :             let conf_str = res.unwrap();
     339            1 :             assert_eq!(&conf_str, "");
     340              :         }
     341              : 
     342              :         {
     343              :             // Verify config
     344            1 :             let conf = PostgresLogsRsyslogConfig::new(Some("collector.cvc.local:514"));
     345            1 :             let res = conf.build();
     346            1 :             assert!(res.is_ok());
     347            1 :             let conf_str = res.unwrap();
     348            1 :             assert!(conf_str.contains("omfwd"));
     349            1 :             assert!(conf_str.contains(r#"target="collector.cvc.local""#));
     350            1 :             assert!(conf_str.contains(r#"port="514""#));
     351              :         }
     352              : 
     353              :         {
     354              :             // Verify invalid config
     355            1 :             let conf = PostgresLogsRsyslogConfig::new(Some("invalid"));
     356            1 :             let res = conf.build();
     357            1 :             assert!(res.is_err());
     358              :         }
     359            1 :     }
     360              : 
     361              :     #[test]
     362            1 :     fn test_parse_audit_syslog_address() {
     363              :         {
     364              :             // host:port format (plaintext)
     365            1 :             let parsed = parse_audit_syslog_address("collector.host.tld:5555", "");
     366            1 :             assert!(parsed.is_ok());
     367            1 :             assert_eq!(
     368            1 :                 parsed.unwrap(),
     369            1 :                 (
     370            1 :                     String::from("collector.host.tld"),
     371            1 :                     5555,
     372            1 :                     String::from("false")
     373            1 :                 )
     374              :             );
     375              :         }
     376              : 
     377              :         {
     378              :             // host:port format with ipv4 ip address (plaintext)
     379            1 :             let parsed = parse_audit_syslog_address("10.0.0.1:5555", "");
     380            1 :             assert!(parsed.is_ok());
     381            1 :             assert_eq!(
     382            1 :                 parsed.unwrap(),
     383            1 :                 (String::from("10.0.0.1"), 5555, String::from("false"))
     384              :             );
     385              :         }
     386              : 
     387              :         {
     388              :             // host:port format with ipv6 ip address (plaintext)
     389            1 :             let parsed =
     390            1 :                 parse_audit_syslog_address("[7e60:82ed:cb2e:d617:f904:f395:aaca:e252]:5555", "");
     391            1 :             assert_eq!(
     392            1 :                 parsed.unwrap(),
     393            1 :                 (
     394            1 :                     String::from("7e60:82ed:cb2e:d617:f904:f395:aaca:e252"),
     395            1 :                     5555,
     396            1 :                     String::from("false")
     397            1 :                 )
     398              :             );
     399              :         }
     400              : 
     401              :         {
     402              :             // Only TLS host:port defined
     403            1 :             let parsed = parse_audit_syslog_address("", "tls.host.tld:5556");
     404            1 :             assert_eq!(
     405            1 :                 parsed.unwrap(),
     406            1 :                 (String::from("tls.host.tld"), 5556, String::from("true"))
     407              :             );
     408              :         }
     409              : 
     410              :         {
     411              :             // tls host should take precedence, when both defined
     412            1 :             let parsed = parse_audit_syslog_address("plaintext.host.tld:5555", "tls.host.tld:5556");
     413            1 :             assert_eq!(
     414            1 :                 parsed.unwrap(),
     415            1 :                 (String::from("tls.host.tld"), 5556, String::from("true"))
     416              :             );
     417              :         }
     418              : 
     419              :         {
     420              :             // host without port (plaintext)
     421            1 :             let parsed = parse_audit_syslog_address("collector.host.tld", "");
     422            1 :             assert!(parsed.is_err());
     423              :         }
     424              : 
     425              :         {
     426              :             // port without host
     427            1 :             let parsed = parse_audit_syslog_address(":5555", "");
     428            1 :             assert!(parsed.is_err());
     429              :         }
     430              : 
     431              :         {
     432              :             // valid host with invalid port
     433            1 :             let parsed = parse_audit_syslog_address("collector.host.tld:90001", "");
     434            1 :             assert!(parsed.is_err());
     435              :         }
     436              : 
     437              :         {
     438              :             // invalid hostname with valid port
     439            1 :             let parsed = parse_audit_syslog_address("-collector.host.tld:5555", "");
     440            1 :             assert!(parsed.is_err());
     441              :         }
     442              : 
     443              :         {
     444              :             // parse error
     445            1 :             let parsed = parse_audit_syslog_address("collector.host.tld:::5555", "");
     446            1 :             assert!(parsed.is_err());
     447              :         }
     448            1 :     }
     449              : 
     450              :     #[test]
     451            1 :     fn test_generate_audit_rsyslog_config() {
     452              :         {
     453              :             // plaintext version
     454            1 :             let log_directory = "/tmp/log".to_string();
     455            1 :             let endpoint_id = "ep-test-endpoint-id";
     456            1 :             let project_id = "test-project-id";
     457            1 :             let remote_syslog_host = "collector.host.tld";
     458            1 :             let remote_syslog_port = 5555;
     459            1 :             let remote_syslog_tls = "false";
     460              : 
     461            1 :             let conf_str = generate_audit_rsyslog_config(
     462            1 :                 log_directory,
     463            1 :                 endpoint_id,
     464            1 :                 project_id,
     465            1 :                 remote_syslog_host,
     466            1 :                 remote_syslog_port,
     467            1 :                 remote_syslog_tls,
     468              :             );
     469              : 
     470            1 :             assert!(conf_str.contains(r#"set $.remote_syslog_tls = "false";"#));
     471            1 :             assert!(conf_str.contains(r#"type="omfwd""#));
     472            1 :             assert!(conf_str.contains(r#"target="collector.host.tld""#));
     473            1 :             assert!(conf_str.contains(r#"port="5555""#));
     474            1 :             assert!(conf_str.contains(r#"StreamDriverPermittedPeers="collector.host.tld""#));
     475              :         }
     476              : 
     477              :         {
     478              :             // TLS version
     479            1 :             let log_directory = "/tmp/log".to_string();
     480            1 :             let endpoint_id = "ep-test-endpoint-id";
     481            1 :             let project_id = "test-project-id";
     482            1 :             let remote_syslog_host = "collector.host.tld";
     483            1 :             let remote_syslog_port = 5556;
     484            1 :             let remote_syslog_tls = "true";
     485              : 
     486            1 :             let conf_str = generate_audit_rsyslog_config(
     487            1 :                 log_directory,
     488            1 :                 endpoint_id,
     489            1 :                 project_id,
     490            1 :                 remote_syslog_host,
     491            1 :                 remote_syslog_port,
     492            1 :                 remote_syslog_tls,
     493              :             );
     494              : 
     495            1 :             assert!(conf_str.contains(r#"set $.remote_syslog_tls = "true";"#));
     496            1 :             assert!(conf_str.contains(r#"type="omfwd""#));
     497            1 :             assert!(conf_str.contains(r#"target="collector.host.tld""#));
     498            1 :             assert!(conf_str.contains(r#"port="5556""#));
     499            1 :             assert!(conf_str.contains(r#"StreamDriverPermittedPeers="collector.host.tld""#));
     500              :         }
     501            1 :     }
     502              : }
        

Generated by: LCOV version 2.1-beta