Line data Source code
1 : use std::fs;
2 : use std::io::ErrorKind;
3 : use std::path::Path;
4 : use std::process::Command;
5 : use std::time::Duration;
6 : use std::{fs::OpenOptions, io::Write};
7 :
8 : use anyhow::{Context, Result, anyhow};
9 : use tracing::{error, info, instrument, warn};
10 :
/// Location of the rsyslog drop-in file that controls Postgres logs export.
const POSTGRES_LOGS_CONF_PATH: &'static str = "/etc/rsyslog.d/postgres_logs.conf";
/// Location of the rsyslog drop-in file that controls audit logs forwarding.
const AUDIT_LOGS_CONF_PATH: &'static str = "/etc/rsyslog.d/compute_audit_rsyslog.conf";
13 :
/// Looks up the running `rsyslogd` by executing `pgrep rsyslogd`.
///
/// Returns the trimmed standard output of `pgrep`. The output is passed through
/// verbatim, so if `pgrep` prints several PIDs the string contains all of them.
///
/// Returns `None` when `pgrep` prints nothing (or only whitespace). It also
/// returns `None` when `pgrep` cannot be executed at all: this helper never
/// panics, so a missing binary cannot take the whole process down.
/// Bytes that are not valid UTF-8 are replaced rather than treated as fatal.
fn get_rsyslog_pid() -> Option<String> {
    // A spawn failure is reported as "no pid" because the signature cannot
    // carry an error; callers already treat `None` as a failure.
    let output = Command::new("pgrep").arg("rsyslogd").output().ok()?;

    let pid = String::from_utf8_lossy(&output.stdout).trim().to_string();
    (!pid.is_empty()).then_some(pid)
}
30 :
31 : // Restart rsyslogd to apply the new configuration.
32 : // This is necessary, because there is no other way to reload the rsyslog configuration.
33 : //
34 : // Rsyslogd shouldn't lose any messages, because of the restart,
35 : // because it tracks the last read position in the log files
36 : // and will continue reading from that position.
37 : // TODO: test it properly
38 : //
39 0 : fn restart_rsyslog() -> Result<()> {
40 0 : let old_pid = get_rsyslog_pid().context("rsyslogd is not running")?;
41 0 : info!("rsyslogd is running with pid: {}, restart it", old_pid);
42 :
43 : // kill it to restart
44 0 : let _ = Command::new("pkill")
45 0 : .arg("rsyslogd")
46 0 : .output()
47 0 : .context("Failed to stop rsyslogd")?;
48 :
49 0 : Ok(())
50 0 : }
51 :
52 0 : pub fn configure_audit_rsyslog(log_directory: &str, audit_log_level: &str) -> Result<()> {
53 0 : let remote_endpoint = std::env::var("AUDIT_LOGGING_ENDPOINT")?;
54 0 : if remote_endpoint.is_empty() {
55 0 : return Err(anyhow!("AUDIT_LOGGING_ENDPOINT is not set"));
56 0 : }
57 :
58 0 : let old_config_content = match std::fs::read_to_string(AUDIT_LOGS_CONF_PATH) {
59 0 : Ok(c) => c,
60 0 : Err(err) if err.kind() == ErrorKind::NotFound => String::new(),
61 0 : Err(err) => return Err(err.into()),
62 : };
63 :
64 0 : let config_content: String = format!(
65 0 : include_str!("config_template/compute_audit_rsyslog_template.conf"),
66 0 : log_directory = log_directory,
67 0 : tag = audit_log_level,
68 0 : remote_endpoint = remote_endpoint
69 0 : );
70 0 :
71 0 : if old_config_content == config_content {
72 0 : info!("rsyslog configuration is up-to-date");
73 0 : return Ok(());
74 0 : }
75 0 :
76 0 : info!("rsyslog config_content: {}", config_content);
77 :
78 0 : let mut file = OpenOptions::new()
79 0 : .create(true)
80 0 : .write(true)
81 0 : .truncate(true)
82 0 : .open(AUDIT_LOGS_CONF_PATH)?;
83 :
84 0 : file.write_all(config_content.as_bytes())?;
85 :
86 0 : info!(
87 0 : "rsyslog configuration file {} added successfully. Starting rsyslogd",
88 : AUDIT_LOGS_CONF_PATH
89 : );
90 :
91 : // start the service, using the configuration
92 0 : restart_rsyslog()?;
93 :
94 0 : Ok(())
95 0 : }
96 :
/// Configuration for enabling Postgres logs forwarding from rsyslogd
///
/// `host` is the export destination in `target:port` form; `None` means that
/// no export configuration should be generated.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct PostgresLogsRsyslogConfig<'a> {
    pub host: Option<&'a str>,
}
101 :
102 : impl<'a> PostgresLogsRsyslogConfig<'a> {
103 4 : pub fn new(host: Option<&'a str>) -> Self {
104 4 : Self { host }
105 4 : }
106 :
107 4 : pub fn build(&self) -> Result<String> {
108 4 : match self.host {
109 3 : Some(host) => {
110 3 : if let Some((target, port)) = host.split_once(":") {
111 2 : Ok(format!(
112 2 : include_str!(
113 2 : "config_template/compute_rsyslog_postgres_export_template.conf"
114 2 : ),
115 2 : logs_export_target = target,
116 2 : logs_export_port = port,
117 2 : ))
118 : } else {
119 1 : Err(anyhow!("Invalid host format for Postgres logs export"))
120 : }
121 : }
122 1 : None => Ok("".to_string()),
123 : }
124 4 : }
125 :
126 0 : fn current_config() -> Result<String> {
127 0 : let config_content = match std::fs::read_to_string(POSTGRES_LOGS_CONF_PATH) {
128 0 : Ok(c) => c,
129 0 : Err(err) if err.kind() == ErrorKind::NotFound => String::new(),
130 0 : Err(err) => return Err(err.into()),
131 : };
132 0 : Ok(config_content)
133 0 : }
134 :
135 : /// Returns the default host for otel collector that receives Postgres logs
136 1 : pub fn default_host(project_id: &str) -> String {
137 1 : format!(
138 1 : "config-{}-collector.neon-telemetry.svc.cluster.local:10514",
139 1 : project_id
140 1 : )
141 1 : }
142 : }
143 :
144 0 : pub fn configure_postgres_logs_export(conf: PostgresLogsRsyslogConfig) -> Result<()> {
145 0 : let new_config = conf.build()?;
146 0 : let current_config = PostgresLogsRsyslogConfig::current_config()?;
147 :
148 0 : if new_config == current_config {
149 0 : info!("postgres logs rsyslog configuration is up-to-date");
150 0 : return Ok(());
151 0 : }
152 0 :
153 0 : // When new config is empty we can simply remove the configuration file.
154 0 : if new_config.is_empty() {
155 0 : info!("removing rsyslog config file: {}", POSTGRES_LOGS_CONF_PATH);
156 0 : match std::fs::remove_file(POSTGRES_LOGS_CONF_PATH) {
157 0 : Ok(_) => {}
158 0 : Err(err) if err.kind() == ErrorKind::NotFound => {}
159 0 : Err(err) => return Err(err.into()),
160 : }
161 0 : restart_rsyslog()?;
162 0 : return Ok(());
163 0 : }
164 0 :
165 0 : info!(
166 0 : "configuring rsyslog for postgres logs export to: {:?}",
167 : conf.host
168 : );
169 :
170 0 : let mut file = OpenOptions::new()
171 0 : .create(true)
172 0 : .write(true)
173 0 : .truncate(true)
174 0 : .open(POSTGRES_LOGS_CONF_PATH)?;
175 0 : file.write_all(new_config.as_bytes())?;
176 :
177 0 : info!(
178 0 : "rsyslog configuration file {} added successfully. Starting rsyslogd",
179 : POSTGRES_LOGS_CONF_PATH
180 : );
181 :
182 0 : restart_rsyslog()?;
183 0 : Ok(())
184 0 : }
185 :
186 : #[instrument(skip_all)]
187 : async fn pgaudit_gc_main_loop(log_directory: String) -> Result<()> {
188 : info!("running pgaudit GC main loop");
189 : loop {
190 : // Check log_directory for old pgaudit logs and delete them.
191 : // New log files are checked every 5 minutes, as set in pgaudit.log_rotation_age
192 : // Find files that were not modified in the last 15 minutes and delete them.
193 : // This should be enough time for rsyslog to process the logs and for us to catch the alerts.
194 : //
195 : // In case of a very high load, we might need to adjust this value and pgaudit.log_rotation_age.
196 : //
197 : // TODO: add some smarter logic to delete the files that are fully streamed according to rsyslog
198 : // imfile-state files, but for now just do a simple GC to avoid filling up the disk.
199 : let _ = Command::new("find")
200 : .arg(&log_directory)
201 : .arg("-name")
202 : .arg("audit*.log")
203 : .arg("-mmin")
204 : .arg("+15")
205 : .arg("-delete")
206 : .output()?;
207 :
208 : // also collect the metric for the size of the log directory
209 0 : async fn get_log_files_size(path: &Path) -> Result<u64> {
210 0 : let mut total_size = 0;
211 :
212 0 : for entry in fs::read_dir(path)? {
213 0 : let entry = entry?;
214 0 : let entry_path = entry.path();
215 0 :
216 0 : if entry_path.is_file() && entry_path.to_string_lossy().ends_with("log") {
217 0 : total_size += entry.metadata()?.len();
218 0 : }
219 : }
220 :
221 0 : Ok(total_size)
222 0 : }
223 :
224 : let log_directory_size = get_log_files_size(Path::new(&log_directory))
225 : .await
226 0 : .unwrap_or_else(|e| {
227 0 : warn!("Failed to get log directory size: {}", e);
228 0 : 0
229 0 : });
230 : crate::metrics::AUDIT_LOG_DIR_SIZE.set(log_directory_size as f64);
231 : tokio::time::sleep(Duration::from_secs(60)).await;
232 : }
233 : }
234 :
235 : // launch pgaudit GC thread to clean up the old pgaudit logs stored in the log_directory
236 0 : pub fn launch_pgaudit_gc(log_directory: String) {
237 0 : tokio::spawn(async move {
238 0 : if let Err(e) = pgaudit_gc_main_loop(log_directory).await {
239 0 : error!("pgaudit GC main loop failed: {}", e);
240 0 : }
241 0 : });
242 0 : }
243 :
#[cfg(test)]
mod tests {
    use crate::rsyslog::PostgresLogsRsyslogConfig;

    /// Covers the four `build` scenarios: no host, explicit host, malformed
    /// host, and the default collector host.
    #[test]
    fn test_postgres_logs_config() {
        // Verify empty config
        let empty = PostgresLogsRsyslogConfig::new(None).build();
        assert!(empty.is_ok());
        assert_eq!(&empty.unwrap(), "");

        // Verify config
        let explicit = PostgresLogsRsyslogConfig::new(Some("collector.cvc.local:514")).build();
        assert!(explicit.is_ok());
        let rendered = explicit.unwrap();
        assert!(rendered.contains("omfwd"));
        assert!(rendered.contains(r#"target="collector.cvc.local""#));
        assert!(rendered.contains(r#"port="514""#));

        // Verify invalid config
        let malformed = PostgresLogsRsyslogConfig::new(Some("invalid")).build();
        assert!(malformed.is_err());

        // Verify config with default host
        let default_host = PostgresLogsRsyslogConfig::default_host("shy-breeze-123");
        let with_default = PostgresLogsRsyslogConfig::new(Some(&default_host)).build();
        assert!(with_default.is_ok());
        let rendered = with_default.unwrap();
        assert!(rendered.contains(r#"shy-breeze-123"#));
        assert!(rendered.contains(r#"port="10514""#));
    }
}
|