Line data Source code
1 : use std::fs;
2 : use std::io::ErrorKind;
3 : use std::path::Path;
4 : use std::process::Command;
5 : use std::time::Duration;
6 : use std::{fs::OpenOptions, io::Write};
7 :
8 : use anyhow::{Context, Result, anyhow};
9 : use tracing::{error, info, instrument, warn};
10 :
11 : const POSTGRES_LOGS_CONF_PATH: &str = "/etc/rsyslog.d/postgres_logs.conf";
12 :
13 0 : fn get_rsyslog_pid() -> Option<String> {
14 0 : let output = Command::new("pgrep")
15 0 : .arg("rsyslogd")
16 0 : .output()
17 0 : .expect("Failed to execute pgrep");
18 0 :
19 0 : if !output.stdout.is_empty() {
20 0 : let pid = std::str::from_utf8(&output.stdout)
21 0 : .expect("Invalid UTF-8 in process output")
22 0 : .trim()
23 0 : .to_string();
24 0 : Some(pid)
25 : } else {
26 0 : None
27 : }
28 0 : }
29 :
30 : // Restart rsyslogd to apply the new configuration.
31 : // This is necessary, because there is no other way to reload the rsyslog configuration.
32 : //
33 : // Rsyslogd shouldn't lose any messages, because of the restart,
34 : // because it tracks the last read position in the log files
35 : // and will continue reading from that position.
36 : // TODO: test it properly
37 : //
38 0 : fn restart_rsyslog() -> Result<()> {
39 0 : let old_pid = get_rsyslog_pid().context("rsyslogd is not running")?;
40 0 : info!("rsyslogd is running with pid: {}, restart it", old_pid);
41 :
42 : // kill it to restart
43 0 : let _ = Command::new("pkill")
44 0 : .arg("rsyslogd")
45 0 : .output()
46 0 : .context("Failed to stop rsyslogd")?;
47 :
48 0 : Ok(())
49 0 : }
50 :
51 0 : pub fn configure_audit_rsyslog(
52 0 : log_directory: String,
53 0 : tag: &str,
54 0 : remote_endpoint: &str,
55 0 : ) -> Result<()> {
56 0 : let config_content: String = format!(
57 0 : include_str!("config_template/compute_audit_rsyslog_template.conf"),
58 0 : log_directory = log_directory,
59 0 : tag = tag,
60 0 : remote_endpoint = remote_endpoint
61 0 : );
62 0 :
63 0 : info!("rsyslog config_content: {}", config_content);
64 :
65 0 : let rsyslog_conf_path = "/etc/rsyslog.d/compute_audit_rsyslog.conf";
66 0 : let mut file = OpenOptions::new()
67 0 : .create(true)
68 0 : .write(true)
69 0 : .truncate(true)
70 0 : .open(rsyslog_conf_path)?;
71 :
72 0 : file.write_all(config_content.as_bytes())?;
73 :
74 0 : info!(
75 0 : "rsyslog configuration file {} added successfully. Starting rsyslogd",
76 : rsyslog_conf_path
77 : );
78 :
79 : // start the service, using the configuration
80 0 : restart_rsyslog()?;
81 :
82 0 : Ok(())
83 0 : }
84 :
85 : /// Configuration for enabling Postgres logs forwarding from rsyslogd
86 : pub struct PostgresLogsRsyslogConfig<'a> {
87 : pub host: Option<&'a str>,
88 : }
89 :
90 : impl<'a> PostgresLogsRsyslogConfig<'a> {
91 3 : pub fn new(host: Option<&'a str>) -> Self {
92 3 : Self { host }
93 3 : }
94 :
95 3 : pub fn build(&self) -> Result<String> {
96 3 : match self.host {
97 2 : Some(host) => {
98 2 : if let Some((target, port)) = host.split_once(":") {
99 1 : Ok(format!(
100 1 : include_str!(
101 1 : "config_template/compute_rsyslog_postgres_export_template.conf"
102 1 : ),
103 1 : logs_export_target = target,
104 1 : logs_export_port = port,
105 1 : ))
106 : } else {
107 1 : Err(anyhow!("Invalid host format for Postgres logs export"))
108 : }
109 : }
110 1 : None => Ok("".to_string()),
111 : }
112 3 : }
113 :
114 0 : fn current_config() -> Result<String> {
115 0 : let config_content = match std::fs::read_to_string(POSTGRES_LOGS_CONF_PATH) {
116 0 : Ok(c) => c,
117 0 : Err(err) if err.kind() == ErrorKind::NotFound => String::new(),
118 0 : Err(err) => return Err(err.into()),
119 : };
120 0 : Ok(config_content)
121 0 : }
122 : }
123 :
124 : /// Writes rsyslogd configuration for Postgres logs export and restarts rsyslog.
125 0 : pub fn configure_postgres_logs_export(conf: PostgresLogsRsyslogConfig) -> Result<()> {
126 0 : let new_config = conf.build()?;
127 0 : let current_config = PostgresLogsRsyslogConfig::current_config()?;
128 :
129 0 : if new_config == current_config {
130 0 : info!("postgres logs rsyslog configuration is up-to-date");
131 0 : return Ok(());
132 0 : }
133 0 :
134 0 : // When new config is empty we can simply remove the configuration file.
135 0 : if new_config.is_empty() {
136 0 : info!("removing rsyslog config file: {}", POSTGRES_LOGS_CONF_PATH);
137 0 : match std::fs::remove_file(POSTGRES_LOGS_CONF_PATH) {
138 0 : Ok(_) => {}
139 0 : Err(err) if err.kind() == ErrorKind::NotFound => {}
140 0 : Err(err) => return Err(err.into()),
141 : }
142 0 : restart_rsyslog()?;
143 0 : return Ok(());
144 0 : }
145 0 :
146 0 : info!(
147 0 : "configuring rsyslog for postgres logs export to: {:?}",
148 : conf.host
149 : );
150 :
151 0 : let mut file = OpenOptions::new()
152 0 : .create(true)
153 0 : .write(true)
154 0 : .truncate(true)
155 0 : .open(POSTGRES_LOGS_CONF_PATH)?;
156 0 : file.write_all(new_config.as_bytes())?;
157 :
158 0 : info!(
159 0 : "rsyslog configuration file {} added successfully. Starting rsyslogd",
160 : POSTGRES_LOGS_CONF_PATH
161 : );
162 :
163 0 : restart_rsyslog()?;
164 0 : Ok(())
165 0 : }
166 :
167 : #[instrument(skip_all)]
168 : async fn pgaudit_gc_main_loop(log_directory: String) -> Result<()> {
169 : info!("running pgaudit GC main loop");
170 : loop {
171 : // Check log_directory for old pgaudit logs and delete them.
172 : // New log files are checked every 5 minutes, as set in pgaudit.log_rotation_age
173 : // Find files that were not modified in the last 15 minutes and delete them.
174 : // This should be enough time for rsyslog to process the logs and for us to catch the alerts.
175 : //
176 : // In case of a very high load, we might need to adjust this value and pgaudit.log_rotation_age.
177 : //
178 : // TODO: add some smarter logic to delete the files that are fully streamed according to rsyslog
179 : // imfile-state files, but for now just do a simple GC to avoid filling up the disk.
180 : let _ = Command::new("find")
181 : .arg(&log_directory)
182 : .arg("-name")
183 : .arg("audit*.log")
184 : .arg("-mmin")
185 : .arg("+15")
186 : .arg("-delete")
187 : .output()?;
188 :
189 : // also collect the metric for the size of the log directory
190 0 : async fn get_log_files_size(path: &Path) -> Result<u64> {
191 0 : let mut total_size = 0;
192 :
193 0 : for entry in fs::read_dir(path)? {
194 0 : let entry = entry?;
195 0 : let entry_path = entry.path();
196 0 :
197 0 : if entry_path.is_file() && entry_path.to_string_lossy().ends_with("log") {
198 0 : total_size += entry.metadata()?.len();
199 0 : }
200 : }
201 :
202 0 : Ok(total_size)
203 0 : }
204 :
205 : let log_directory_size = get_log_files_size(Path::new(&log_directory))
206 : .await
207 0 : .unwrap_or_else(|e| {
208 0 : warn!("Failed to get log directory size: {}", e);
209 0 : 0
210 0 : });
211 : crate::metrics::AUDIT_LOG_DIR_SIZE.set(log_directory_size as f64);
212 : tokio::time::sleep(Duration::from_secs(60)).await;
213 : }
214 : }
215 :
216 : // launch pgaudit GC thread to clean up the old pgaudit logs stored in the log_directory
217 0 : pub fn launch_pgaudit_gc(log_directory: String) {
218 0 : tokio::spawn(async move {
219 0 : if let Err(e) = pgaudit_gc_main_loop(log_directory).await {
220 0 : error!("pgaudit GC main loop failed: {}", e);
221 0 : }
222 0 : });
223 0 : }
224 :
225 : #[cfg(test)]
226 : mod tests {
227 : use crate::rsyslog::PostgresLogsRsyslogConfig;
228 :
229 : #[test]
230 1 : fn test_postgres_logs_config() {
231 1 : {
232 1 : // Verify empty config
233 1 : let conf = PostgresLogsRsyslogConfig::new(None);
234 1 : let res = conf.build();
235 1 : assert!(res.is_ok());
236 1 : let conf_str = res.unwrap();
237 1 : assert_eq!(&conf_str, "");
238 : }
239 :
240 : {
241 : // Verify config
242 1 : let conf = PostgresLogsRsyslogConfig::new(Some("collector.cvc.local:514"));
243 1 : let res = conf.build();
244 1 : assert!(res.is_ok());
245 1 : let conf_str = res.unwrap();
246 1 : assert!(conf_str.contains("omfwd"));
247 1 : assert!(conf_str.contains(r#"target="collector.cvc.local""#));
248 1 : assert!(conf_str.contains(r#"port="514""#));
249 : }
250 :
251 : {
252 : // Verify invalid config
253 1 : let conf = PostgresLogsRsyslogConfig::new(Some("invalid"));
254 1 : let res = conf.build();
255 1 : assert!(res.is_err());
256 : }
257 1 : }
258 : }
|