Line data Source code
1 : use std::fs;
2 : use std::io::ErrorKind;
3 : use std::path::Path;
4 : use std::process::Command;
5 : use std::time::Duration;
6 : use std::{fs::OpenOptions, io::Write};
7 :
8 : use anyhow::{Context, Result, anyhow};
9 : use tracing::{error, info, instrument, warn};
10 :
11 : const POSTGRES_LOGS_CONF_PATH: &str = "/etc/rsyslog.d/postgres_logs.conf";
12 :
13 0 : fn get_rsyslog_pid() -> Option<String> {
14 0 : let output = Command::new("pgrep")
15 0 : .arg("rsyslogd")
16 0 : .output()
17 0 : .expect("Failed to execute pgrep");
18 0 :
19 0 : if !output.stdout.is_empty() {
20 0 : let pid = std::str::from_utf8(&output.stdout)
21 0 : .expect("Invalid UTF-8 in process output")
22 0 : .trim()
23 0 : .to_string();
24 0 : Some(pid)
25 : } else {
26 0 : None
27 : }
28 0 : }
29 :
30 0 : fn wait_for_rsyslog_pid() -> Result<String, anyhow::Error> {
31 : const MAX_WAIT: Duration = Duration::from_secs(5);
32 : const INITIAL_SLEEP: Duration = Duration::from_millis(2);
33 :
34 0 : let mut sleep_duration = INITIAL_SLEEP;
35 0 : let start = std::time::Instant::now();
36 0 : let mut attempts = 1;
37 :
38 0 : for attempt in 1.. {
39 0 : attempts = attempt;
40 0 : match get_rsyslog_pid() {
41 0 : Some(pid) => return Ok(pid),
42 : None => {
43 0 : if start.elapsed() >= MAX_WAIT {
44 0 : break;
45 0 : }
46 0 : info!(
47 0 : "rsyslogd is not running, attempt {}. Sleeping for {} ms",
48 0 : attempt,
49 0 : sleep_duration.as_millis()
50 : );
51 0 : std::thread::sleep(sleep_duration);
52 0 : sleep_duration *= 2;
53 : }
54 : }
55 : }
56 :
57 0 : Err(anyhow::anyhow!(
58 0 : "rsyslogd is not running after waiting for {} seconds and {} attempts",
59 0 : attempts,
60 0 : start.elapsed().as_secs()
61 0 : ))
62 0 : }
63 :
64 : // Restart rsyslogd to apply the new configuration.
65 : // This is necessary, because there is no other way to reload the rsyslog configuration.
66 : //
67 : // Rsyslogd shouldn't lose any messages, because of the restart,
68 : // because it tracks the last read position in the log files
69 : // and will continue reading from that position.
70 : // TODO: test it properly
71 : //
72 0 : fn restart_rsyslog() -> Result<()> {
73 0 : // kill it to restart
74 0 : let _ = Command::new("pkill")
75 0 : .arg("rsyslogd")
76 0 : .output()
77 0 : .context("Failed to restart rsyslogd")?;
78 :
79 : // ensure rsyslogd is running
80 0 : wait_for_rsyslog_pid()?;
81 :
82 0 : Ok(())
83 0 : }
84 :
85 0 : pub fn configure_audit_rsyslog(
86 0 : log_directory: String,
87 0 : tag: Option<String>,
88 0 : remote_endpoint: &str,
89 0 : ) -> Result<()> {
90 0 : let config_content: String = format!(
91 0 : include_str!("config_template/compute_audit_rsyslog_template.conf"),
92 0 : log_directory = log_directory,
93 0 : tag = tag.unwrap_or("".to_string()),
94 0 : remote_endpoint = remote_endpoint
95 0 : );
96 0 :
97 0 : info!("rsyslog config_content: {}", config_content);
98 :
99 0 : let rsyslog_conf_path = "/etc/rsyslog.d/compute_audit_rsyslog.conf";
100 0 : let mut file = OpenOptions::new()
101 0 : .create(true)
102 0 : .write(true)
103 0 : .truncate(true)
104 0 : .open(rsyslog_conf_path)?;
105 :
106 0 : file.write_all(config_content.as_bytes())?;
107 :
108 0 : info!(
109 0 : "rsyslog configuration file {} added successfully. Starting rsyslogd",
110 : rsyslog_conf_path
111 : );
112 :
113 : // start the service, using the configuration
114 0 : restart_rsyslog()?;
115 :
116 0 : Ok(())
117 0 : }
118 :
119 : /// Configuration for enabling Postgres logs forwarding from rsyslogd
120 : pub struct PostgresLogsRsyslogConfig<'a> {
121 : pub host: Option<&'a str>,
122 : }
123 :
124 : impl<'a> PostgresLogsRsyslogConfig<'a> {
125 3 : pub fn new(host: Option<&'a str>) -> Self {
126 3 : Self { host }
127 3 : }
128 :
129 3 : pub fn build(&self) -> Result<String> {
130 3 : match self.host {
131 2 : Some(host) => {
132 2 : if let Some((target, port)) = host.split_once(":") {
133 1 : Ok(format!(
134 1 : include_str!(
135 1 : "config_template/compute_rsyslog_postgres_export_template.conf"
136 1 : ),
137 1 : logs_export_target = target,
138 1 : logs_export_port = port,
139 1 : ))
140 : } else {
141 1 : Err(anyhow!("Invalid host format for Postgres logs export"))
142 : }
143 : }
144 1 : None => Ok("".to_string()),
145 : }
146 3 : }
147 :
148 0 : fn current_config() -> Result<String> {
149 0 : let config_content = match std::fs::read_to_string(POSTGRES_LOGS_CONF_PATH) {
150 0 : Ok(c) => c,
151 0 : Err(err) if err.kind() == ErrorKind::NotFound => String::new(),
152 0 : Err(err) => return Err(err.into()),
153 : };
154 0 : Ok(config_content)
155 0 : }
156 : }
157 :
158 : /// Writes rsyslogd configuration for Postgres logs export and restarts rsyslog.
159 0 : pub fn configure_postgres_logs_export(conf: PostgresLogsRsyslogConfig) -> Result<()> {
160 0 : let new_config = conf.build()?;
161 0 : let current_config = PostgresLogsRsyslogConfig::current_config()?;
162 :
163 0 : if new_config == current_config {
164 0 : info!("postgres logs rsyslog configuration is up-to-date");
165 0 : return Ok(());
166 0 : }
167 0 :
168 0 : // Nothing to configure
169 0 : if new_config.is_empty() {
170 : // When the configuration is removed, PostgreSQL will stop sending data
171 : // to the files watched by rsyslog, so restarting rsyslog is more effort
172 : // than just ignoring this change.
173 0 : return Ok(());
174 0 : }
175 0 :
176 0 : info!(
177 0 : "configuring rsyslog for postgres logs export to: {:?}",
178 : conf.host
179 : );
180 :
181 0 : let mut file = OpenOptions::new()
182 0 : .create(true)
183 0 : .write(true)
184 0 : .truncate(true)
185 0 : .open(POSTGRES_LOGS_CONF_PATH)?;
186 0 : file.write_all(new_config.as_bytes())?;
187 :
188 0 : info!(
189 0 : "rsyslog configuration file {} added successfully. Starting rsyslogd",
190 : POSTGRES_LOGS_CONF_PATH
191 : );
192 :
193 0 : restart_rsyslog()?;
194 0 : Ok(())
195 0 : }
196 :
197 : #[instrument(skip_all)]
198 : async fn pgaudit_gc_main_loop(log_directory: String) -> Result<()> {
199 : info!("running pgaudit GC main loop");
200 : loop {
201 : // Check log_directory for old pgaudit logs and delete them.
202 : // New log files are checked every 5 minutes, as set in pgaudit.log_rotation_age
203 : // Find files that were not modified in the last 15 minutes and delete them.
204 : // This should be enough time for rsyslog to process the logs and for us to catch the alerts.
205 : //
206 : // In case of a very high load, we might need to adjust this value and pgaudit.log_rotation_age.
207 : //
208 : // TODO: add some smarter logic to delete the files that are fully streamed according to rsyslog
209 : // imfile-state files, but for now just do a simple GC to avoid filling up the disk.
210 : let _ = Command::new("find")
211 : .arg(&log_directory)
212 : .arg("-name")
213 : .arg("audit*.log")
214 : .arg("-mmin")
215 : .arg("+15")
216 : .arg("-delete")
217 : .output()?;
218 :
219 : // also collect the metric for the size of the log directory
220 0 : async fn get_log_files_size(path: &Path) -> Result<u64> {
221 0 : let mut total_size = 0;
222 :
223 0 : for entry in fs::read_dir(path)? {
224 0 : let entry = entry?;
225 0 : let entry_path = entry.path();
226 0 :
227 0 : if entry_path.is_file() && entry_path.to_string_lossy().ends_with("log") {
228 0 : total_size += entry.metadata()?.len();
229 0 : }
230 : }
231 :
232 0 : Ok(total_size)
233 0 : }
234 :
235 : let log_directory_size = get_log_files_size(Path::new(&log_directory))
236 : .await
237 0 : .unwrap_or_else(|e| {
238 0 : warn!("Failed to get log directory size: {}", e);
239 0 : 0
240 0 : });
241 : crate::metrics::AUDIT_LOG_DIR_SIZE.set(log_directory_size as f64);
242 : tokio::time::sleep(Duration::from_secs(60)).await;
243 : }
244 : }
245 :
246 : // launch pgaudit GC thread to clean up the old pgaudit logs stored in the log_directory
247 0 : pub fn launch_pgaudit_gc(log_directory: String) {
248 0 : tokio::spawn(async move {
249 0 : if let Err(e) = pgaudit_gc_main_loop(log_directory).await {
250 0 : error!("pgaudit GC main loop failed: {}", e);
251 0 : }
252 0 : });
253 0 : }
254 :
255 : #[cfg(test)]
256 : mod tests {
257 : use crate::rsyslog::PostgresLogsRsyslogConfig;
258 :
259 : #[test]
260 1 : fn test_postgres_logs_config() {
261 1 : {
262 1 : // Verify empty config
263 1 : let conf = PostgresLogsRsyslogConfig::new(None);
264 1 : let res = conf.build();
265 1 : assert!(res.is_ok());
266 1 : let conf_str = res.unwrap();
267 1 : assert_eq!(&conf_str, "");
268 : }
269 :
270 : {
271 : // Verify config
272 1 : let conf = PostgresLogsRsyslogConfig::new(Some("collector.cvc.local:514"));
273 1 : let res = conf.build();
274 1 : assert!(res.is_ok());
275 1 : let conf_str = res.unwrap();
276 1 : assert!(conf_str.contains("omfwd"));
277 1 : assert!(conf_str.contains(r#"target="collector.cvc.local""#));
278 1 : assert!(conf_str.contains(r#"port="514""#));
279 : }
280 :
281 : {
282 : // Verify invalid config
283 1 : let conf = PostgresLogsRsyslogConfig::new(Some("invalid"));
284 1 : let res = conf.build();
285 1 : assert!(res.is_err());
286 : }
287 1 : }
288 : }
|