|             Line data    Source code 
       1              : use std::collections::HashMap;
       2              : use std::fmt::Write;
       3              : use std::fs;
       4              : use std::fs::File;
       5              : use std::io::{BufRead, BufReader};
       6              : use std::os::unix::fs::PermissionsExt;
       7              : use std::path::Path;
       8              : use std::process::Child;
       9              : use std::str::FromStr;
      10              : use std::time::{Duration, Instant};
      11              : 
      12              : use anyhow::{Result, bail};
      13              : use compute_api::responses::TlsConfig;
      14              : use compute_api::spec::{
      15              :     Database, DatabricksSettings, GenericOption, GenericOptions, PgIdent, Role,
      16              : };
      17              : use futures::StreamExt;
      18              : use indexmap::IndexMap;
      19              : use ini::Ini;
      20              : use notify::{RecursiveMode, Watcher};
      21              : use postgres::config::Config;
      22              : use tokio::io::AsyncBufReadExt;
      23              : use tokio::task::JoinHandle;
      24              : use tokio::time::timeout;
      25              : use tokio_postgres;
      26              : use tokio_postgres::NoTls;
      27              : use tracing::{debug, error, info, instrument};
      28              : 
      29              : const POSTGRES_WAIT_TIMEOUT: Duration = Duration::from_millis(60 * 1000); // milliseconds
      30              : 
      31              : /// Escape a string for including it in a SQL literal.
      32              : ///
      33              : /// Wrapping the result with `E'{}'` or `'{}'` is not required,
      34              : /// as it returns a ready-to-use SQL string literal, e.g. `'db'''` or `E'db\\'`.
      35              : /// See <https://github.com/postgres/postgres/blob/da98d005cdbcd45af563d0c4ac86d0e9772cd15f/src/backend/utils/adt/quote.c#L47>
      36              : /// for the original implementation.
      37            6 : pub fn escape_literal(s: &str) -> String {
      38            6 :     let res = s.replace('\'', "''").replace('\\', "\\\\");
      39              : 
      40            6 :     if res.contains('\\') {
      41            2 :         format!("E'{res}'")
      42              :     } else {
      43            4 :         format!("'{res}'")
      44              :     }
      45            6 : }
      46              : 
      47              : /// Escape a string so that it can be used in postgresql.conf. Wrapping the result
      48              : /// with `'{}'` is not required, as it returns a ready-to-use config string.
      49            8 : pub fn escape_conf_value(s: &str) -> String {
      50            8 :     let res = s.replace('\'', "''").replace('\\', "\\\\");
      51            8 :     format!("'{res}'")
      52            8 : }
      53              : 
      54              : pub trait GenericOptionExt {
      55              :     fn to_pg_option(&self) -> String;
      56              :     fn to_pg_setting(&self) -> String;
      57              : }
      58              : 
      59              : impl GenericOptionExt for GenericOption {
      60              :     /// Represent `GenericOption` as SQL statement parameter.
      61            3 :     fn to_pg_option(&self) -> String {
      62            3 :         if let Some(val) = &self.value {
      63            3 :             match self.vartype.as_ref() {
      64            3 :                 "string" => format!("{} {}", self.name, escape_literal(val)),
      65            1 :                 _ => format!("{} {}", self.name, val),
      66              :             }
      67              :         } else {
      68            0 :             self.name.to_owned()
      69              :         }
      70            3 :     }
      71              : 
      72              :     /// Represent `GenericOption` as configuration option.
      73           25 :     fn to_pg_setting(&self) -> String {
      74           25 :         if let Some(val) = &self.value {
      75           25 :             match self.vartype.as_ref() {
      76           25 :                 "string" => format!("{} = {}", self.name, escape_conf_value(val)),
      77           17 :                 _ => format!("{} = {}", self.name, val),
      78              :             }
      79              :         } else {
      80            0 :             self.name.to_owned()
      81              :         }
      82           25 :     }
      83              : }
      84              : 
      85              : pub trait PgOptionsSerialize {
      86              :     fn as_pg_options(&self) -> String;
      87              :     fn as_pg_settings(&self) -> String;
      88              : }
      89              : 
      90              : impl PgOptionsSerialize for GenericOptions {
      91              :     /// Serialize an optional collection of `GenericOption`'s to
      92              :     /// Postgres SQL statement arguments.
      93            2 :     fn as_pg_options(&self) -> String {
      94            2 :         if let Some(ops) = &self {
      95            1 :             ops.iter()
      96            3 :                 .map(|op| op.to_pg_option())
      97            1 :                 .collect::<Vec<String>>()
      98            1 :                 .join(" ")
      99              :         } else {
     100            1 :             "".to_string()
     101              :         }
     102            2 :     }
     103              : 
     104              :     /// Serialize an optional collection of `GenericOption`'s to
     105              :     /// `postgresql.conf` compatible format.
     106            1 :     fn as_pg_settings(&self) -> String {
     107            1 :         if let Some(ops) = &self {
     108            1 :             ops.iter()
     109           25 :                 .map(|op| op.to_pg_setting())
     110            1 :                 .collect::<Vec<String>>()
     111            1 :                 .join("\n")
     112            1 :                 + "\n" // newline after last setting
     113              :         } else {
     114            0 :             "".to_string()
     115              :         }
     116            1 :     }
     117              : }
     118              : 
     119              : pub trait GenericOptionsSearch {
     120              :     fn find(&self, name: &str) -> Option<String>;
     121              :     fn find_ref(&self, name: &str) -> Option<&GenericOption>;
     122              : }
     123              : 
     124              : impl GenericOptionsSearch for GenericOptions {
     125              :     /// Lookup option by name
     126           16 :     fn find(&self, name: &str) -> Option<String> {
     127           16 :         let ops = self.as_ref()?;
     128          143 :         let op = ops.iter().find(|s| s.name == name)?;
     129            6 :         op.value.clone()
     130           16 :     }
     131              : 
     132              :     /// Lookup option by name, returning ref
     133            0 :     fn find_ref(&self, name: &str) -> Option<&GenericOption> {
     134            0 :         let ops = self.as_ref()?;
     135            0 :         ops.iter().find(|s| s.name == name)
     136            0 :     }
     137              : }
     138              : 
     139              : pub trait RoleExt {
     140              :     fn to_pg_options(&self) -> String;
     141              : }
     142              : 
     143              : impl RoleExt for Role {
     144              :     /// Serialize a list of role parameters into a Postgres-acceptable
     145              :     /// string of arguments.
     146            1 :     fn to_pg_options(&self) -> String {
     147              :         // XXX: consider putting LOGIN as a default option somewhere higher, e.g. in control-plane.
     148            1 :         let mut params: String = self.options.as_pg_options();
     149            1 :         params.push_str(" LOGIN");
     150              : 
     151            1 :         if let Some(pass) = &self.encrypted_password {
     152              :             // Some time ago we supported only md5 and treated all encrypted_password as md5.
     153              :             // Now we also support SCRAM-SHA-256 and to preserve compatibility
     154              :             // we treat all encrypted_password as md5 unless they starts with SCRAM-SHA-256.
     155            1 :             if pass.starts_with("SCRAM-SHA-256") {
     156            0 :                 write!(params, " PASSWORD '{pass}'")
     157            0 :                     .expect("String is documented to not to error during write operations");
     158            1 :             } else {
     159            1 :                 write!(params, " PASSWORD 'md5{pass}'")
     160            1 :                     .expect("String is documented to not to error during write operations");
     161            1 :             }
     162            0 :         } else {
     163            0 :             params.push_str(" PASSWORD NULL");
     164            0 :         }
     165              : 
     166            1 :         params
     167            1 :     }
     168              : }
     169              : 
     170              : pub trait DatabaseExt {
     171              :     fn to_pg_options(&self) -> String;
     172              : }
     173              : 
     174              : impl DatabaseExt for Database {
     175              :     /// Serialize a list of database parameters into a Postgres-acceptable
     176              :     /// string of arguments.
     177              :     /// NB: `TEMPLATE` is actually also an identifier, but so far we only need
     178              :     /// to use `template0` and `template1`, so it is not a problem. Yet in the future
     179              :     /// it may require a proper quoting too.
     180            1 :     fn to_pg_options(&self) -> String {
     181            1 :         let mut params: String = self.options.as_pg_options();
     182            1 :         write!(params, " OWNER {}", &self.owner.pg_quote())
     183            1 :             .expect("String is documented to not to error during write operations");
     184              : 
     185            1 :         params
     186            1 :     }
     187              : }
     188              : 
     189              : pub trait DatabricksSettingsExt {
     190              :     fn as_pg_settings(&self) -> String;
     191              : }
     192              : 
     193              : impl DatabricksSettingsExt for DatabricksSettings {
     194            0 :     fn as_pg_settings(&self) -> String {
     195              :         // Postgres GUCs rendered from DatabricksSettings
     196            0 :         vec![
     197            0 :             // ssl_ca_file
     198            0 :             Some(format!(
     199            0 :                 "ssl_ca_file = '{}'",
     200            0 :                 self.pg_compute_tls_settings.ca_file
     201            0 :             )),
     202            0 :             // [Optional] databricks.workspace_url
     203            0 :             Some(format!(
     204            0 :                 "databricks.workspace_url = '{}'",
     205            0 :                 &self.databricks_workspace_host
     206            0 :             )),
     207            0 :             // todo(vikas.jain): these are not required anymore as they are moved to static
     208            0 :             // conf but keeping these to avoid image mismatch between hcc and pg.
     209            0 :             // Once hcc and pg are in sync, we can remove these.
     210            0 :             //
     211            0 :             // databricks.enable_databricks_identity_login
     212            0 :             Some("databricks.enable_databricks_identity_login = true".to_string()),
     213            0 :             // databricks.enable_sql_restrictions
     214            0 :             Some("databricks.enable_sql_restrictions = true".to_string()),
     215            0 :         ]
     216            0 :         .into_iter()
     217            0 :         // Removes `None`s
     218            0 :         .flatten()
     219            0 :         .collect::<Vec<String>>()
     220            0 :         .join("\n")
     221            0 :             + "\n"
     222            0 :     }
     223              : }
     224              : 
     225              : /// Generic trait used to provide quoting / encoding for strings used in the
     226              : /// Postgres SQL queries and DATABASE_URL.
     227              : pub trait Escaping {
     228              :     fn pg_quote(&self) -> String;
     229              :     fn pg_quote_dollar(&self) -> (String, String);
     230              : }
     231              : 
     232              : impl Escaping for PgIdent {
     233              :     /// This is intended to mimic Postgres quote_ident(), but for simplicity it
     234              :     /// always quotes provided string with `""` and escapes every `"`.
     235              :     /// **Not idempotent**, i.e. if string is already escaped it will be escaped again.
     236              :     /// N.B. it's not useful for escaping identifiers that are used inside WHERE
     237              :     /// clause, use `escape_literal()` instead.
     238            2 :     fn pg_quote(&self) -> String {
     239            2 :         format!("\"{}\"", self.replace('"', "\"\""))
     240            2 :     }
     241              : 
     242              :     /// This helper is intended to be used for dollar-escaping strings for usage
     243              :     /// inside PL/pgSQL procedures. In addition to dollar-escaping the string,
     244              :     /// it also returns a tag that is intended to be used inside the outer
     245              :     /// PL/pgSQL procedure. If you do not need an outer tag, just discard it.
     246              :     /// Here we somewhat mimic the logic of Postgres' `pg_get_functiondef()`,
     247              :     /// <https://github.com/postgres/postgres/blob/8b49392b270b4ac0b9f5c210e2a503546841e832/src/backend/utils/adt/ruleutils.c#L2924>
     248           14 :     fn pg_quote_dollar(&self) -> (String, String) {
     249           14 :         let mut tag: String = "x".to_string();
     250           14 :         let mut outer_tag = "xx".to_string();
     251              : 
     252              :         // Find the first suitable tag that is not present in the string.
     253              :         // Postgres' max role/DB name length is 63 bytes, so even in the
     254              :         // worst case it won't take long. Outer tag is always `tag + "x"`,
     255              :         // so if `tag` is not present in the string, `outer_tag` is not
     256              :         // present in the string either.
     257           27 :         while self.contains(&tag.to_string()) {
     258           13 :             tag += "x";
     259           13 :             outer_tag = tag.clone() + "x";
     260           13 :         }
     261              : 
     262           14 :         let escaped = format!("${tag}${self}${tag}$");
     263              : 
     264           14 :         (escaped, outer_tag)
     265           14 :     }
     266              : }
     267              : 
     268              : /// Build a list of existing Postgres roles
     269            0 : pub async fn get_existing_roles_async(client: &tokio_postgres::Client) -> Result<Vec<Role>> {
     270            0 :     let postgres_roles = client
     271            0 :         .query_raw::<str, &String, &[String; 0]>(
     272            0 :             "SELECT rolname, rolpassword FROM pg_catalog.pg_authid",
     273            0 :             &[],
     274            0 :         )
     275            0 :         .await?
     276            0 :         .filter_map(|row| async { row.ok() })
     277            0 :         .map(|row| Role {
     278            0 :             name: row.get("rolname"),
     279            0 :             encrypted_password: row.get("rolpassword"),
     280            0 :             options: None,
     281            0 :         })
     282            0 :         .collect()
     283            0 :         .await;
     284              : 
     285            0 :     Ok(postgres_roles)
     286            0 : }
     287              : 
     288              : /// Build a list of existing Postgres databases
     289            0 : pub async fn get_existing_dbs_async(
     290            0 :     client: &tokio_postgres::Client,
     291            0 : ) -> Result<HashMap<String, Database>> {
     292              :     // `pg_database.datconnlimit = -2` means that the database is in the
     293              :     // invalid state. See:
     294              :     //   https://github.com/postgres/postgres/commit/a4b4cc1d60f7e8ccfcc8ff8cb80c28ee411ad9a9
     295            0 :     let rowstream = client
     296            0 :         // We use a subquery instead of a fancy `datdba::regrole::text AS owner`,
     297            0 :         // because the latter automatically wraps the result in double quotes,
     298            0 :         // if the role name contains special characters.
     299            0 :         .query_raw::<str, &String, &[String; 0]>(
     300            0 :             "SELECT
     301            0 :                 datname AS name,
     302            0 :                 (SELECT rolname FROM pg_catalog.pg_roles WHERE oid OPERATOR(pg_catalog.=) datdba) AS owner,
     303            0 :                 NOT datallowconn AS restrict_conn,
     304            0 :                 datconnlimit OPERATOR(pg_catalog.=) (OPERATOR(pg_catalog.-) 2) AS invalid
     305            0 :             FROM
     306            0 :                 pg_catalog.pg_database;",
     307            0 :             &[],
     308            0 :         )
     309            0 :         .await?;
     310              : 
     311            0 :     let dbs_map = rowstream
     312            0 :         .filter_map(|r| async { r.ok() })
     313            0 :         .map(|row| Database {
     314            0 :             name: row.get("name"),
     315            0 :             owner: row.get("owner"),
     316            0 :             restrict_conn: row.get("restrict_conn"),
     317            0 :             invalid: row.get("invalid"),
     318            0 :             options: None,
     319            0 :         })
     320            0 :         .map(|db| (db.name.clone(), db.clone()))
     321            0 :         .collect::<HashMap<_, _>>()
     322            0 :         .await;
     323              : 
     324            0 :     Ok(dbs_map)
     325            0 : }
     326              : 
     327              : /// Wait for Postgres to become ready to accept connections. It's ready to
     328              : /// accept connections when the state-field in `pgdata/postmaster.pid` says
     329              : /// 'ready'.
     330              : #[instrument(skip_all, fields(pgdata = %pgdata.display()))]
     331              : pub fn wait_for_postgres(pg: &mut Child, pgdata: &Path) -> Result<()> {
     332              :     let pid_path = pgdata.join("postmaster.pid");
     333              : 
     334              :     // PostgreSQL writes line "ready" to the postmaster.pid file, when it has
     335              :     // completed initialization and is ready to accept connections. We want to
     336              :     // react quickly and perform the rest of our initialization as soon as
     337              :     // PostgreSQL starts accepting connections. Use 'notify' to be notified
     338              :     // whenever the PID file is changed, and whenever it changes, read it to
     339              :     // check if it's now "ready".
     340              :     //
     341              :     // You cannot actually watch a file before it exists, so we first watch the
     342              :     // data directory, and once the postmaster.pid file appears, we switch to
     343              :     // watch the file instead. We also wake up every 100 ms to poll, just in
     344              :     // case we miss some events for some reason. Not strictly necessary, but
     345              :     // better safe than sorry.
     346              :     let (tx, rx) = std::sync::mpsc::channel();
     347            0 :     let watcher_res = notify::recommended_watcher(move |res| {
     348            0 :         let _ = tx.send(res);
     349            0 :     });
     350              :     let (mut watcher, rx): (Box<dyn Watcher>, _) = match watcher_res {
     351              :         Ok(watcher) => (Box::new(watcher), rx),
     352              :         Err(e) => {
     353              :             match e.kind {
     354              :                 notify::ErrorKind::Io(os) if os.raw_os_error() == Some(38) => {
     355              :                     // docker on m1 macs does not support recommended_watcher
     356              :                     // but return "Function not implemented (os error 38)"
     357              :                     // see https://github.com/notify-rs/notify/issues/423
     358              :                     let (tx, rx) = std::sync::mpsc::channel();
     359              : 
     360              :                     // let's poll it faster than what we check the results for (100ms)
     361              :                     let config =
     362              :                         notify::Config::default().with_poll_interval(Duration::from_millis(50));
     363              : 
     364              :                     let watcher = notify::PollWatcher::new(
     365            0 :                         move |res| {
     366            0 :                             let _ = tx.send(res);
     367            0 :                         },
     368              :                         config,
     369              :                     )?;
     370              : 
     371              :                     (Box::new(watcher), rx)
     372              :                 }
     373              :                 _ => return Err(e.into()),
     374              :             }
     375              :         }
     376              :     };
     377              : 
     378              :     watcher.watch(pgdata, RecursiveMode::NonRecursive)?;
     379              : 
     380              :     let started_at = Instant::now();
     381              :     let mut postmaster_pid_seen = false;
     382              :     loop {
     383              :         if let Ok(Some(status)) = pg.try_wait() {
     384              :             // Postgres exited, that is not what we expected, bail out earlier.
     385              :             let code = status.code().unwrap_or(-1);
     386              :             bail!("Postgres exited unexpectedly with code {}", code);
     387              :         }
     388              : 
     389              :         let res = rx.recv_timeout(Duration::from_millis(100));
     390              :         debug!("woken up by notify: {res:?}");
     391              :         // If there are multiple events in the channel already, we only need to be
     392              :         // check once. Swallow the extra events before we go ahead to check the
     393              :         // pid file.
     394              :         while let Ok(res) = rx.try_recv() {
     395              :             debug!("swallowing extra event: {res:?}");
     396              :         }
     397              : 
     398              :         // Check that we can open pid file first.
     399              :         if let Ok(file) = File::open(&pid_path) {
     400              :             if !postmaster_pid_seen {
     401              :                 debug!("postmaster.pid appeared");
     402              :                 watcher
     403              :                     .unwatch(pgdata)
     404              :                     .expect("Failed to remove pgdata dir watch");
     405              :                 watcher
     406              :                     .watch(&pid_path, RecursiveMode::NonRecursive)
     407              :                     .expect("Failed to add postmaster.pid file watch");
     408              :                 postmaster_pid_seen = true;
     409              :             }
     410              : 
     411              :             let file = BufReader::new(file);
     412              :             let last_line = file.lines().last();
     413              : 
     414              :             // Pid file could be there and we could read it, but it could be empty, for example.
     415              :             if let Some(Ok(line)) = last_line {
     416              :                 let status = line.trim();
     417              :                 debug!("last line of postmaster.pid: {status:?}");
     418              : 
     419              :                 // Now Postgres is ready to accept connections
     420              :                 if status == "ready" {
     421              :                     break;
     422              :                 }
     423              :             }
     424              :         }
     425              : 
     426              :         // Give up after POSTGRES_WAIT_TIMEOUT.
     427              :         let duration = started_at.elapsed();
     428              :         if duration >= POSTGRES_WAIT_TIMEOUT {
     429              :             bail!("timed out while waiting for Postgres to start");
     430              :         }
     431              :     }
     432              : 
     433              :     tracing::info!("PostgreSQL is now running, continuing to configure it");
     434              : 
     435              :     Ok(())
     436              : }
     437              : 
     438              : /// Remove `pgdata` directory and create it again with right permissions.
     439            0 : pub fn create_pgdata(pgdata: &str) -> Result<()> {
     440              :     // Ignore removal error, likely it is a 'No such file or directory (os error 2)'.
     441              :     // If it is something different then create_dir() will error out anyway.
     442            0 :     let _ok = fs::remove_dir_all(pgdata);
     443            0 :     fs::create_dir(pgdata)?;
     444            0 :     fs::set_permissions(pgdata, fs::Permissions::from_mode(0o700))?;
     445              : 
     446            0 :     Ok(())
     447            0 : }
     448              : 
     449              : /// Update pgbouncer.ini with provided options
     450            0 : fn update_pgbouncer_ini(
     451            0 :     pgbouncer_config: IndexMap<String, String>,
     452            0 :     pgbouncer_ini_path: &str,
     453            0 : ) -> Result<()> {
     454            0 :     let mut conf = Ini::load_from_file(pgbouncer_ini_path)?;
     455            0 :     let section = conf.section_mut(Some("pgbouncer")).unwrap();
     456              : 
     457            0 :     for (option_name, value) in pgbouncer_config.iter() {
     458            0 :         section.insert(option_name, value);
     459            0 :         debug!(
     460            0 :             "Updating pgbouncer.ini with new values {}={}",
     461              :             option_name, value
     462              :         );
     463              :     }
     464              : 
     465            0 :     conf.write_to_file(pgbouncer_ini_path)?;
     466            0 :     Ok(())
     467            0 : }
     468              : 
     469              : /// Tune pgbouncer.
     470              : /// 1. Apply new config using pgbouncer admin console
     471              : /// 2. Add new values to pgbouncer.ini to preserve them after restart
     472            0 : pub async fn tune_pgbouncer(
     473            0 :     mut pgbouncer_config: IndexMap<String, String>,
     474            0 :     tls_config: Option<TlsConfig>,
     475            0 : ) -> Result<()> {
     476            0 :     let pgbouncer_connstr = if std::env::var_os("AUTOSCALING").is_some() {
     477              :         // for VMs use pgbouncer specific way to connect to
     478              :         // pgbouncer admin console without password
     479              :         // when pgbouncer is running under the same user.
     480            0 :         "host=/tmp port=6432 dbname=pgbouncer user=pgbouncer".to_string()
     481              :     } else {
     482              :         // for k8s use normal connection string with password
     483              :         // to connect to pgbouncer admin console
     484            0 :         let mut pgbouncer_connstr =
     485            0 :             "host=localhost port=6432 dbname=pgbouncer user=postgres sslmode=disable".to_string();
     486            0 :         if let Ok(pass) = std::env::var("PGBOUNCER_PASSWORD") {
     487            0 :             pgbouncer_connstr.push_str(format!(" password={pass}").as_str());
     488            0 :         }
     489            0 :         pgbouncer_connstr
     490              :     };
     491              : 
     492            0 :     info!(
     493            0 :         "Connecting to pgbouncer with connection string: {}",
     494              :         pgbouncer_connstr
     495              :     );
     496              : 
     497              :     // connect to pgbouncer, retrying several times
     498              :     // because pgbouncer may not be ready yet
     499            0 :     let mut retries = 3;
     500            0 :     let client = loop {
     501            0 :         match tokio_postgres::connect(&pgbouncer_connstr, NoTls).await {
     502            0 :             Ok((client, connection)) => {
     503            0 :                 tokio::spawn(async move {
     504            0 :                     if let Err(e) = connection.await {
     505            0 :                         eprintln!("connection error: {e}");
     506            0 :                     }
     507            0 :                 });
     508            0 :                 break client;
     509              :             }
     510            0 :             Err(e) => {
     511            0 :                 if retries == 0 {
     512            0 :                     return Err(e.into());
     513            0 :                 }
     514            0 :                 error!("Failed to connect to pgbouncer: pgbouncer_connstr {}", e);
     515            0 :                 retries -= 1;
     516            0 :                 tokio::time::sleep(Duration::from_secs(1)).await;
     517              :             }
     518              :         }
     519              :     };
     520              : 
     521            0 :     if let Some(tls_config) = tls_config {
     522              :         // pgbouncer starts in a half-ok state if it cannot find these files.
     523              :         // It will default to client_tls_sslmode=deny, which causes proxy to error.
     524              :         // There is a small window at startup where these files don't yet exist in the VM.
     525              :         // Best to wait until it exists.
     526              :         loop {
     527            0 :             if let Ok(true) = tokio::fs::try_exists(&tls_config.key_path).await {
     528            0 :                 break;
     529            0 :             }
     530            0 :             tokio::time::sleep(Duration::from_millis(500)).await
     531              :         }
     532              : 
     533            0 :         pgbouncer_config.insert("client_tls_cert_file".to_string(), tls_config.cert_path);
     534            0 :         pgbouncer_config.insert("client_tls_key_file".to_string(), tls_config.key_path);
     535            0 :         pgbouncer_config.insert("client_tls_sslmode".to_string(), "allow".to_string());
     536            0 :     }
     537              : 
     538              :     // save values to pgbouncer.ini
     539              :     // so that they are preserved after pgbouncer restart
     540            0 :     let pgbouncer_ini_path = if std::env::var_os("AUTOSCALING").is_some() {
     541              :         // in VMs we use /etc/pgbouncer.ini
     542            0 :         "/etc/pgbouncer.ini".to_string()
     543              :     } else {
     544              :         // in pods we use /var/db/postgres/pgbouncer/pgbouncer.ini
     545              :         // this is a shared volume between pgbouncer and postgres containers
     546              :         // FIXME: fix permissions for this file
     547            0 :         "/var/db/postgres/pgbouncer/pgbouncer.ini".to_string()
     548              :     };
     549            0 :     update_pgbouncer_ini(pgbouncer_config, &pgbouncer_ini_path)?;
     550              : 
     551            0 :     info!("Applying pgbouncer setting change");
     552              : 
     553            0 :     if let Err(err) = client.simple_query("RELOAD").await {
     554              :         // Don't fail on error, just print it into log
     555            0 :         error!("Failed to apply pgbouncer setting change,  {err}",);
     556            0 :     };
     557              : 
     558            0 :     Ok(())
     559            0 : }
     560              : 
     561              : /// Spawn a task that will read Postgres logs from `stderr`, join multiline logs
     562              : /// and send them to the logger. In the future we may also want to add context to
     563              : /// these logs.
     564            0 : pub fn handle_postgres_logs(stderr: std::process::ChildStderr) -> JoinHandle<Result<()>> {
     565            0 :     tokio::spawn(async move {
     566            0 :         let stderr = tokio::process::ChildStderr::from_std(stderr)?;
     567            0 :         handle_postgres_logs_async(stderr).await
     568            0 :     })
     569            0 : }
     570              : 
     571              : /// Read Postgres logs from `stderr` until EOF. Buffer is flushed on one of the following conditions:
     572              : /// - next line starts with timestamp
     573              : /// - EOF
     574              : /// - no new lines were written for the last 100 milliseconds
     575            0 : async fn handle_postgres_logs_async(stderr: tokio::process::ChildStderr) -> Result<()> {
     576            0 :     let mut lines = tokio::io::BufReader::new(stderr).lines();
     577            0 :     let timeout_duration = Duration::from_millis(100);
     578            0 :     let ts_regex =
     579            0 :         regex::Regex::new(r"^\d+-\d{2}-\d{2} \d{2}:\d{2}:\d{2}").expect("regex is valid");
     580              : 
     581            0 :     let mut buf = vec![];
     582              :     loop {
     583            0 :         let next_line = timeout(timeout_duration, lines.next_line()).await;
     584              : 
     585              :         // we should flush lines from the buffer if we cannot continue reading multiline message
     586            0 :         let should_flush_buf = match next_line {
     587              :             // Flushing if new line starts with timestamp
     588            0 :             Ok(Ok(Some(ref line))) => ts_regex.is_match(line),
     589              :             // Flushing on EOF, timeout or error
     590            0 :             _ => true,
     591              :         };
     592              : 
     593            0 :         if !buf.is_empty() && should_flush_buf {
     594              :             // join multiline message into a single line, separated by unicode Zero Width Space.
     595              :             // "PG:" suffix is used to distinguish postgres logs from other logs.
     596            0 :             let combined = format!("PG:{}\n", buf.join("\u{200B}"));
     597            0 :             buf.clear();
     598              : 
     599              :             // sync write to stderr to avoid interleaving with other logs
     600              :             use std::io::Write;
     601            0 :             let res = std::io::stderr().lock().write_all(combined.as_bytes());
     602            0 :             if let Err(e) = res {
     603            0 :                 tracing::error!("error while writing to stderr: {}", e);
     604            0 :             }
     605            0 :         }
     606              : 
     607              :         // if not timeout, append line to the buffer
     608            0 :         if next_line.is_ok() {
     609            0 :             match next_line?? {
     610            0 :                 Some(line) => buf.push(line),
     611              :                 // EOF
     612            0 :                 None => break,
     613              :             };
     614            0 :         }
     615              :     }
     616              : 
     617            0 :     Ok(())
     618            0 : }
     619              : 
     620              : /// `Postgres::config::Config` handles database names with whitespaces
     621              : /// and special characters properly.
     622            0 : pub fn postgres_conf_for_db(connstr: &url::Url, dbname: &str) -> Result<Config> {
     623            0 :     let mut conf = Config::from_str(connstr.as_str())?;
     624            0 :     conf.dbname(dbname);
     625            0 :     Ok(conf)
     626            0 : }
         |