Line data Source code
1 : use std::collections::HashMap;
2 : use std::fmt::Write;
3 : use std::fs;
4 : use std::fs::File;
5 : use std::io::{BufRead, BufReader};
6 : use std::os::unix::fs::PermissionsExt;
7 : use std::path::Path;
8 : use std::process::Child;
9 : use std::str::FromStr;
10 : use std::time::{Duration, Instant};
11 :
12 : use anyhow::{Result, bail};
13 : use compute_api::responses::TlsConfig;
14 : use compute_api::spec::{
15 : Database, DatabricksSettings, GenericOption, GenericOptions, PgIdent, Role,
16 : };
17 : use futures::StreamExt;
18 : use indexmap::IndexMap;
19 : use ini::Ini;
20 : use notify::{RecursiveMode, Watcher};
21 : use postgres::config::Config;
22 : use tokio::io::AsyncBufReadExt;
23 : use tokio::task::JoinHandle;
24 : use tokio::time::timeout;
25 : use tokio_postgres;
26 : use tokio_postgres::NoTls;
27 : use tracing::{debug, error, info, instrument};
28 :
29 : const POSTGRES_WAIT_TIMEOUT: Duration = Duration::from_millis(60 * 1000); // milliseconds
30 :
31 : /// Escape a string for including it in a SQL literal.
32 : ///
33 : /// Wrapping the result with `E'{}'` or `'{}'` is not required,
34 : /// as it returns a ready-to-use SQL string literal, e.g. `'db'''` or `E'db\\'`.
35 : /// See <https://github.com/postgres/postgres/blob/da98d005cdbcd45af563d0c4ac86d0e9772cd15f/src/backend/utils/adt/quote.c#L47>
36 : /// for the original implementation.
37 6 : pub fn escape_literal(s: &str) -> String {
38 6 : let res = s.replace('\'', "''").replace('\\', "\\\\");
39 :
40 6 : if res.contains('\\') {
41 2 : format!("E'{res}'")
42 : } else {
43 4 : format!("'{res}'")
44 : }
45 6 : }
46 :
47 : /// Escape a string so that it can be used in postgresql.conf. Wrapping the result
48 : /// with `'{}'` is not required, as it returns a ready-to-use config string.
49 8 : pub fn escape_conf_value(s: &str) -> String {
50 8 : let res = s.replace('\'', "''").replace('\\', "\\\\");
51 8 : format!("'{res}'")
52 8 : }
53 :
54 : pub trait GenericOptionExt {
55 : fn to_pg_option(&self) -> String;
56 : fn to_pg_setting(&self) -> String;
57 : }
58 :
59 : impl GenericOptionExt for GenericOption {
60 : /// Represent `GenericOption` as SQL statement parameter.
61 3 : fn to_pg_option(&self) -> String {
62 3 : if let Some(val) = &self.value {
63 3 : match self.vartype.as_ref() {
64 3 : "string" => format!("{} {}", self.name, escape_literal(val)),
65 1 : _ => format!("{} {}", self.name, val),
66 : }
67 : } else {
68 0 : self.name.to_owned()
69 : }
70 3 : }
71 :
72 : /// Represent `GenericOption` as configuration option.
73 25 : fn to_pg_setting(&self) -> String {
74 25 : if let Some(val) = &self.value {
75 25 : match self.vartype.as_ref() {
76 25 : "string" => format!("{} = {}", self.name, escape_conf_value(val)),
77 17 : _ => format!("{} = {}", self.name, val),
78 : }
79 : } else {
80 0 : self.name.to_owned()
81 : }
82 25 : }
83 : }
84 :
85 : pub trait PgOptionsSerialize {
86 : fn as_pg_options(&self) -> String;
87 : fn as_pg_settings(&self) -> String;
88 : }
89 :
90 : impl PgOptionsSerialize for GenericOptions {
91 : /// Serialize an optional collection of `GenericOption`'s to
92 : /// Postgres SQL statement arguments.
93 2 : fn as_pg_options(&self) -> String {
94 2 : if let Some(ops) = &self {
95 1 : ops.iter()
96 3 : .map(|op| op.to_pg_option())
97 1 : .collect::<Vec<String>>()
98 1 : .join(" ")
99 : } else {
100 1 : "".to_string()
101 : }
102 2 : }
103 :
104 : /// Serialize an optional collection of `GenericOption`'s to
105 : /// `postgresql.conf` compatible format.
106 1 : fn as_pg_settings(&self) -> String {
107 1 : if let Some(ops) = &self {
108 1 : ops.iter()
109 25 : .map(|op| op.to_pg_setting())
110 1 : .collect::<Vec<String>>()
111 1 : .join("\n")
112 1 : + "\n" // newline after last setting
113 : } else {
114 0 : "".to_string()
115 : }
116 1 : }
117 : }
118 :
119 : pub trait GenericOptionsSearch {
120 : fn find(&self, name: &str) -> Option<String>;
121 : fn find_ref(&self, name: &str) -> Option<&GenericOption>;
122 : }
123 :
124 : impl GenericOptionsSearch for GenericOptions {
125 : /// Lookup option by name
126 15 : fn find(&self, name: &str) -> Option<String> {
127 15 : let ops = self.as_ref()?;
128 119 : let op = ops.iter().find(|s| s.name == name)?;
129 6 : op.value.clone()
130 15 : }
131 :
132 : /// Lookup option by name, returning ref
133 0 : fn find_ref(&self, name: &str) -> Option<&GenericOption> {
134 0 : let ops = self.as_ref()?;
135 0 : ops.iter().find(|s| s.name == name)
136 0 : }
137 : }
138 :
139 : pub trait RoleExt {
140 : fn to_pg_options(&self) -> String;
141 : }
142 :
143 : impl RoleExt for Role {
144 : /// Serialize a list of role parameters into a Postgres-acceptable
145 : /// string of arguments.
146 1 : fn to_pg_options(&self) -> String {
147 : // XXX: consider putting LOGIN as a default option somewhere higher, e.g. in control-plane.
148 1 : let mut params: String = self.options.as_pg_options();
149 1 : params.push_str(" LOGIN");
150 :
151 1 : if let Some(pass) = &self.encrypted_password {
152 : // Some time ago we supported only md5 and treated all encrypted_password as md5.
153 : // Now we also support SCRAM-SHA-256 and to preserve compatibility
154 : // we treat all encrypted_password as md5 unless they starts with SCRAM-SHA-256.
155 1 : if pass.starts_with("SCRAM-SHA-256") {
156 0 : write!(params, " PASSWORD '{pass}'")
157 0 : .expect("String is documented to not to error during write operations");
158 1 : } else {
159 1 : write!(params, " PASSWORD 'md5{pass}'")
160 1 : .expect("String is documented to not to error during write operations");
161 1 : }
162 0 : } else {
163 0 : params.push_str(" PASSWORD NULL");
164 0 : }
165 :
166 1 : params
167 1 : }
168 : }
169 :
170 : pub trait DatabaseExt {
171 : fn to_pg_options(&self) -> String;
172 : }
173 :
174 : impl DatabaseExt for Database {
175 : /// Serialize a list of database parameters into a Postgres-acceptable
176 : /// string of arguments.
177 : /// NB: `TEMPLATE` is actually also an identifier, but so far we only need
178 : /// to use `template0` and `template1`, so it is not a problem. Yet in the future
179 : /// it may require a proper quoting too.
180 1 : fn to_pg_options(&self) -> String {
181 1 : let mut params: String = self.options.as_pg_options();
182 1 : write!(params, " OWNER {}", &self.owner.pg_quote())
183 1 : .expect("String is documented to not to error during write operations");
184 :
185 1 : params
186 1 : }
187 : }
188 :
189 : pub trait DatabricksSettingsExt {
190 : fn as_pg_settings(&self) -> String;
191 : }
192 :
193 : impl DatabricksSettingsExt for DatabricksSettings {
194 0 : fn as_pg_settings(&self) -> String {
195 : // Postgres GUCs rendered from DatabricksSettings
196 0 : vec![
197 0 : // ssl_ca_file
198 0 : Some(format!(
199 0 : "ssl_ca_file = '{}'",
200 0 : self.pg_compute_tls_settings.ca_file
201 0 : )),
202 0 : // [Optional] databricks.workspace_url
203 0 : Some(format!(
204 0 : "databricks.workspace_url = '{}'",
205 0 : &self.databricks_workspace_host
206 0 : )),
207 0 : // todo(vikas.jain): these are not required anymore as they are moved to static
208 0 : // conf but keeping these to avoid image mismatch between hcc and pg.
209 0 : // Once hcc and pg are in sync, we can remove these.
210 0 : //
211 0 : // databricks.enable_databricks_identity_login
212 0 : Some("databricks.enable_databricks_identity_login = true".to_string()),
213 0 : // databricks.enable_sql_restrictions
214 0 : Some("databricks.enable_sql_restrictions = true".to_string()),
215 0 : ]
216 0 : .into_iter()
217 0 : // Removes `None`s
218 0 : .flatten()
219 0 : .collect::<Vec<String>>()
220 0 : .join("\n")
221 0 : + "\n"
222 0 : }
223 : }
224 :
225 : /// Generic trait used to provide quoting / encoding for strings used in the
226 : /// Postgres SQL queries and DATABASE_URL.
227 : pub trait Escaping {
228 : fn pg_quote(&self) -> String;
229 : fn pg_quote_dollar(&self) -> (String, String);
230 : }
231 :
232 : impl Escaping for PgIdent {
233 : /// This is intended to mimic Postgres quote_ident(), but for simplicity it
234 : /// always quotes provided string with `""` and escapes every `"`.
235 : /// **Not idempotent**, i.e. if string is already escaped it will be escaped again.
236 : /// N.B. it's not useful for escaping identifiers that are used inside WHERE
237 : /// clause, use `escape_literal()` instead.
238 2 : fn pg_quote(&self) -> String {
239 2 : format!("\"{}\"", self.replace('"', "\"\""))
240 2 : }
241 :
242 : /// This helper is intended to be used for dollar-escaping strings for usage
243 : /// inside PL/pgSQL procedures. In addition to dollar-escaping the string,
244 : /// it also returns a tag that is intended to be used inside the outer
245 : /// PL/pgSQL procedure. If you do not need an outer tag, just discard it.
246 : /// Here we somewhat mimic the logic of Postgres' `pg_get_functiondef()`,
247 : /// <https://github.com/postgres/postgres/blob/8b49392b270b4ac0b9f5c210e2a503546841e832/src/backend/utils/adt/ruleutils.c#L2924>
248 14 : fn pg_quote_dollar(&self) -> (String, String) {
249 14 : let mut tag: String = "x".to_string();
250 14 : let mut outer_tag = "xx".to_string();
251 :
252 : // Find the first suitable tag that is not present in the string.
253 : // Postgres' max role/DB name length is 63 bytes, so even in the
254 : // worst case it won't take long. Outer tag is always `tag + "x"`,
255 : // so if `tag` is not present in the string, `outer_tag` is not
256 : // present in the string either.
257 27 : while self.contains(&tag.to_string()) {
258 13 : tag += "x";
259 13 : outer_tag = tag.clone() + "x";
260 13 : }
261 :
262 14 : let escaped = format!("${tag}${self}${tag}$");
263 :
264 14 : (escaped, outer_tag)
265 14 : }
266 : }
267 :
268 : /// Build a list of existing Postgres roles
269 0 : pub async fn get_existing_roles_async(client: &tokio_postgres::Client) -> Result<Vec<Role>> {
270 0 : let postgres_roles = client
271 0 : .query_raw::<str, &String, &[String; 0]>(
272 0 : "SELECT rolname, rolpassword FROM pg_catalog.pg_authid",
273 0 : &[],
274 0 : )
275 0 : .await?
276 0 : .filter_map(|row| async { row.ok() })
277 0 : .map(|row| Role {
278 0 : name: row.get("rolname"),
279 0 : encrypted_password: row.get("rolpassword"),
280 0 : options: None,
281 0 : })
282 0 : .collect()
283 0 : .await;
284 :
285 0 : Ok(postgres_roles)
286 0 : }
287 :
288 : /// Build a list of existing Postgres databases
289 0 : pub async fn get_existing_dbs_async(
290 0 : client: &tokio_postgres::Client,
291 0 : ) -> Result<HashMap<String, Database>> {
292 : // `pg_database.datconnlimit = -2` means that the database is in the
293 : // invalid state. See:
294 : // https://github.com/postgres/postgres/commit/a4b4cc1d60f7e8ccfcc8ff8cb80c28ee411ad9a9
295 0 : let rowstream = client
296 0 : // We use a subquery instead of a fancy `datdba::regrole::text AS owner`,
297 0 : // because the latter automatically wraps the result in double quotes,
298 0 : // if the role name contains special characters.
299 0 : .query_raw::<str, &String, &[String; 0]>(
300 0 : "SELECT
301 0 : datname AS name,
302 0 : (SELECT rolname FROM pg_roles WHERE oid = datdba) AS owner,
303 0 : NOT datallowconn AS restrict_conn,
304 0 : datconnlimit = - 2 AS invalid
305 0 : FROM
306 0 : pg_catalog.pg_database;",
307 0 : &[],
308 0 : )
309 0 : .await?;
310 :
311 0 : let dbs_map = rowstream
312 0 : .filter_map(|r| async { r.ok() })
313 0 : .map(|row| Database {
314 0 : name: row.get("name"),
315 0 : owner: row.get("owner"),
316 0 : restrict_conn: row.get("restrict_conn"),
317 0 : invalid: row.get("invalid"),
318 0 : options: None,
319 0 : })
320 0 : .map(|db| (db.name.clone(), db.clone()))
321 0 : .collect::<HashMap<_, _>>()
322 0 : .await;
323 :
324 0 : Ok(dbs_map)
325 0 : }
326 :
327 : /// Wait for Postgres to become ready to accept connections. It's ready to
328 : /// accept connections when the state-field in `pgdata/postmaster.pid` says
329 : /// 'ready'.
330 : #[instrument(skip_all, fields(pgdata = %pgdata.display()))]
331 : pub fn wait_for_postgres(pg: &mut Child, pgdata: &Path) -> Result<()> {
332 : let pid_path = pgdata.join("postmaster.pid");
333 :
334 : // PostgreSQL writes line "ready" to the postmaster.pid file, when it has
335 : // completed initialization and is ready to accept connections. We want to
336 : // react quickly and perform the rest of our initialization as soon as
337 : // PostgreSQL starts accepting connections. Use 'notify' to be notified
338 : // whenever the PID file is changed, and whenever it changes, read it to
339 : // check if it's now "ready".
340 : //
341 : // You cannot actually watch a file before it exists, so we first watch the
342 : // data directory, and once the postmaster.pid file appears, we switch to
343 : // watch the file instead. We also wake up every 100 ms to poll, just in
344 : // case we miss some events for some reason. Not strictly necessary, but
345 : // better safe than sorry.
346 : let (tx, rx) = std::sync::mpsc::channel();
347 0 : let watcher_res = notify::recommended_watcher(move |res| {
348 0 : let _ = tx.send(res);
349 0 : });
350 : let (mut watcher, rx): (Box<dyn Watcher>, _) = match watcher_res {
351 : Ok(watcher) => (Box::new(watcher), rx),
352 : Err(e) => {
353 : match e.kind {
354 : notify::ErrorKind::Io(os) if os.raw_os_error() == Some(38) => {
355 : // docker on m1 macs does not support recommended_watcher
356 : // but return "Function not implemented (os error 38)"
357 : // see https://github.com/notify-rs/notify/issues/423
358 : let (tx, rx) = std::sync::mpsc::channel();
359 :
360 : // let's poll it faster than what we check the results for (100ms)
361 : let config =
362 : notify::Config::default().with_poll_interval(Duration::from_millis(50));
363 :
364 : let watcher = notify::PollWatcher::new(
365 0 : move |res| {
366 0 : let _ = tx.send(res);
367 0 : },
368 : config,
369 : )?;
370 :
371 : (Box::new(watcher), rx)
372 : }
373 : _ => return Err(e.into()),
374 : }
375 : }
376 : };
377 :
378 : watcher.watch(pgdata, RecursiveMode::NonRecursive)?;
379 :
380 : let started_at = Instant::now();
381 : let mut postmaster_pid_seen = false;
382 : loop {
383 : if let Ok(Some(status)) = pg.try_wait() {
384 : // Postgres exited, that is not what we expected, bail out earlier.
385 : let code = status.code().unwrap_or(-1);
386 : bail!("Postgres exited unexpectedly with code {}", code);
387 : }
388 :
389 : let res = rx.recv_timeout(Duration::from_millis(100));
390 : debug!("woken up by notify: {res:?}");
391 : // If there are multiple events in the channel already, we only need to be
392 : // check once. Swallow the extra events before we go ahead to check the
393 : // pid file.
394 : while let Ok(res) = rx.try_recv() {
395 : debug!("swallowing extra event: {res:?}");
396 : }
397 :
398 : // Check that we can open pid file first.
399 : if let Ok(file) = File::open(&pid_path) {
400 : if !postmaster_pid_seen {
401 : debug!("postmaster.pid appeared");
402 : watcher
403 : .unwatch(pgdata)
404 : .expect("Failed to remove pgdata dir watch");
405 : watcher
406 : .watch(&pid_path, RecursiveMode::NonRecursive)
407 : .expect("Failed to add postmaster.pid file watch");
408 : postmaster_pid_seen = true;
409 : }
410 :
411 : let file = BufReader::new(file);
412 : let last_line = file.lines().last();
413 :
414 : // Pid file could be there and we could read it, but it could be empty, for example.
415 : if let Some(Ok(line)) = last_line {
416 : let status = line.trim();
417 : debug!("last line of postmaster.pid: {status:?}");
418 :
419 : // Now Postgres is ready to accept connections
420 : if status == "ready" {
421 : break;
422 : }
423 : }
424 : }
425 :
426 : // Give up after POSTGRES_WAIT_TIMEOUT.
427 : let duration = started_at.elapsed();
428 : if duration >= POSTGRES_WAIT_TIMEOUT {
429 : bail!("timed out while waiting for Postgres to start");
430 : }
431 : }
432 :
433 : tracing::info!("PostgreSQL is now running, continuing to configure it");
434 :
435 : Ok(())
436 : }
437 :
438 : /// Remove `pgdata` directory and create it again with right permissions.
439 0 : pub fn create_pgdata(pgdata: &str) -> Result<()> {
440 : // Ignore removal error, likely it is a 'No such file or directory (os error 2)'.
441 : // If it is something different then create_dir() will error out anyway.
442 0 : let _ok = fs::remove_dir_all(pgdata);
443 0 : fs::create_dir(pgdata)?;
444 0 : fs::set_permissions(pgdata, fs::Permissions::from_mode(0o700))?;
445 :
446 0 : Ok(())
447 0 : }
448 :
449 : /// Update pgbouncer.ini with provided options
450 0 : fn update_pgbouncer_ini(
451 0 : pgbouncer_config: IndexMap<String, String>,
452 0 : pgbouncer_ini_path: &str,
453 0 : ) -> Result<()> {
454 0 : let mut conf = Ini::load_from_file(pgbouncer_ini_path)?;
455 0 : let section = conf.section_mut(Some("pgbouncer")).unwrap();
456 :
457 0 : for (option_name, value) in pgbouncer_config.iter() {
458 0 : section.insert(option_name, value);
459 0 : debug!(
460 0 : "Updating pgbouncer.ini with new values {}={}",
461 : option_name, value
462 : );
463 : }
464 :
465 0 : conf.write_to_file(pgbouncer_ini_path)?;
466 0 : Ok(())
467 0 : }
468 :
469 : /// Tune pgbouncer.
470 : /// 1. Apply new config using pgbouncer admin console
471 : /// 2. Add new values to pgbouncer.ini to preserve them after restart
472 0 : pub async fn tune_pgbouncer(
473 0 : mut pgbouncer_config: IndexMap<String, String>,
474 0 : tls_config: Option<TlsConfig>,
475 0 : ) -> Result<()> {
476 0 : let pgbouncer_connstr = if std::env::var_os("AUTOSCALING").is_some() {
477 : // for VMs use pgbouncer specific way to connect to
478 : // pgbouncer admin console without password
479 : // when pgbouncer is running under the same user.
480 0 : "host=/tmp port=6432 dbname=pgbouncer user=pgbouncer".to_string()
481 : } else {
482 : // for k8s use normal connection string with password
483 : // to connect to pgbouncer admin console
484 0 : let mut pgbouncer_connstr =
485 0 : "host=localhost port=6432 dbname=pgbouncer user=postgres sslmode=disable".to_string();
486 0 : if let Ok(pass) = std::env::var("PGBOUNCER_PASSWORD") {
487 0 : pgbouncer_connstr.push_str(format!(" password={pass}").as_str());
488 0 : }
489 0 : pgbouncer_connstr
490 : };
491 :
492 0 : info!(
493 0 : "Connecting to pgbouncer with connection string: {}",
494 : pgbouncer_connstr
495 : );
496 :
497 : // connect to pgbouncer, retrying several times
498 : // because pgbouncer may not be ready yet
499 0 : let mut retries = 3;
500 0 : let client = loop {
501 0 : match tokio_postgres::connect(&pgbouncer_connstr, NoTls).await {
502 0 : Ok((client, connection)) => {
503 0 : tokio::spawn(async move {
504 0 : if let Err(e) = connection.await {
505 0 : eprintln!("connection error: {e}");
506 0 : }
507 0 : });
508 0 : break client;
509 : }
510 0 : Err(e) => {
511 0 : if retries == 0 {
512 0 : return Err(e.into());
513 0 : }
514 0 : error!("Failed to connect to pgbouncer: pgbouncer_connstr {}", e);
515 0 : retries -= 1;
516 0 : tokio::time::sleep(Duration::from_secs(1)).await;
517 : }
518 : }
519 : };
520 :
521 0 : if let Some(tls_config) = tls_config {
522 : // pgbouncer starts in a half-ok state if it cannot find these files.
523 : // It will default to client_tls_sslmode=deny, which causes proxy to error.
524 : // There is a small window at startup where these files don't yet exist in the VM.
525 : // Best to wait until it exists.
526 : loop {
527 0 : if let Ok(true) = tokio::fs::try_exists(&tls_config.key_path).await {
528 0 : break;
529 0 : }
530 0 : tokio::time::sleep(Duration::from_millis(500)).await
531 : }
532 :
533 0 : pgbouncer_config.insert("client_tls_cert_file".to_string(), tls_config.cert_path);
534 0 : pgbouncer_config.insert("client_tls_key_file".to_string(), tls_config.key_path);
535 0 : pgbouncer_config.insert("client_tls_sslmode".to_string(), "allow".to_string());
536 0 : }
537 :
538 : // save values to pgbouncer.ini
539 : // so that they are preserved after pgbouncer restart
540 0 : let pgbouncer_ini_path = if std::env::var_os("AUTOSCALING").is_some() {
541 : // in VMs we use /etc/pgbouncer.ini
542 0 : "/etc/pgbouncer.ini".to_string()
543 : } else {
544 : // in pods we use /var/db/postgres/pgbouncer/pgbouncer.ini
545 : // this is a shared volume between pgbouncer and postgres containers
546 : // FIXME: fix permissions for this file
547 0 : "/var/db/postgres/pgbouncer/pgbouncer.ini".to_string()
548 : };
549 0 : update_pgbouncer_ini(pgbouncer_config, &pgbouncer_ini_path)?;
550 :
551 0 : info!("Applying pgbouncer setting change");
552 :
553 0 : if let Err(err) = client.simple_query("RELOAD").await {
554 : // Don't fail on error, just print it into log
555 0 : error!("Failed to apply pgbouncer setting change, {err}",);
556 0 : };
557 :
558 0 : Ok(())
559 0 : }
560 :
561 : /// Spawn a task that will read Postgres logs from `stderr`, join multiline logs
562 : /// and send them to the logger. In the future we may also want to add context to
563 : /// these logs.
564 0 : pub fn handle_postgres_logs(stderr: std::process::ChildStderr) -> JoinHandle<Result<()>> {
565 0 : tokio::spawn(async move {
566 0 : let stderr = tokio::process::ChildStderr::from_std(stderr)?;
567 0 : handle_postgres_logs_async(stderr).await
568 0 : })
569 0 : }
570 :
571 : /// Read Postgres logs from `stderr` until EOF. Buffer is flushed on one of the following conditions:
572 : /// - next line starts with timestamp
573 : /// - EOF
574 : /// - no new lines were written for the last 100 milliseconds
575 0 : async fn handle_postgres_logs_async(stderr: tokio::process::ChildStderr) -> Result<()> {
576 0 : let mut lines = tokio::io::BufReader::new(stderr).lines();
577 0 : let timeout_duration = Duration::from_millis(100);
578 0 : let ts_regex =
579 0 : regex::Regex::new(r"^\d+-\d{2}-\d{2} \d{2}:\d{2}:\d{2}").expect("regex is valid");
580 :
581 0 : let mut buf = vec![];
582 : loop {
583 0 : let next_line = timeout(timeout_duration, lines.next_line()).await;
584 :
585 : // we should flush lines from the buffer if we cannot continue reading multiline message
586 0 : let should_flush_buf = match next_line {
587 : // Flushing if new line starts with timestamp
588 0 : Ok(Ok(Some(ref line))) => ts_regex.is_match(line),
589 : // Flushing on EOF, timeout or error
590 0 : _ => true,
591 : };
592 :
593 0 : if !buf.is_empty() && should_flush_buf {
594 : // join multiline message into a single line, separated by unicode Zero Width Space.
595 : // "PG:" suffix is used to distinguish postgres logs from other logs.
596 0 : let combined = format!("PG:{}\n", buf.join("\u{200B}"));
597 0 : buf.clear();
598 :
599 : // sync write to stderr to avoid interleaving with other logs
600 : use std::io::Write;
601 0 : let res = std::io::stderr().lock().write_all(combined.as_bytes());
602 0 : if let Err(e) = res {
603 0 : tracing::error!("error while writing to stderr: {}", e);
604 0 : }
605 0 : }
606 :
607 : // if not timeout, append line to the buffer
608 0 : if next_line.is_ok() {
609 0 : match next_line?? {
610 0 : Some(line) => buf.push(line),
611 : // EOF
612 0 : None => break,
613 : };
614 0 : }
615 : }
616 :
617 0 : Ok(())
618 0 : }
619 :
620 : /// `Postgres::config::Config` handles database names with whitespaces
621 : /// and special characters properly.
622 0 : pub fn postgres_conf_for_db(connstr: &url::Url, dbname: &str) -> Result<Config> {
623 0 : let mut conf = Config::from_str(connstr.as_str())?;
624 0 : conf.dbname(dbname);
625 0 : Ok(conf)
626 0 : }
|