LCOV - code coverage report
Current view: top level - libs/utils/src - postgres_client.rs (source / functions) Coverage Total Hit
Test: 685df7483efdc579d44aa7093bca9796bb9d088e.info Lines: 59.1 % 44 26
Test Date: 2024-11-25 17:08:35 Functions: 16.7 % 12 2

            Line data    Source code
       1              : //! Postgres client connection code common to other crates (safekeeper and
       2              : //! pageserver). It depends on tenant/timeline ids and thus does not fit into
       3              : //! the postgres_connection crate.
       4              : 
       5              : use anyhow::Context;
       6              : use postgres_connection::{parse_host_port, PgConnectionConfig};
       7              : 
       8              : use crate::id::TenantTimelineId;
       9              : 
      10              : /// Postgres client protocol types.
                        ///
                        /// The selected variant is sent to the server as the `protocol=` connection
                        /// option, encoded as its `u8` discriminant (see `ConnectionConfigArgs::options`).
                        /// The enum is `#[repr(u8)]` with implicit discriminants, so
                        /// `Vanilla as u8 == 0` and `Interpreted as u8 == 1`.
                        ///
                        /// The string form produced/parsed by the strum derives is kebab-case, per the
                        /// `#[strum(serialize_all = "kebab-case")]` attribute. The serde_with derives
                        /// presumably route (de)serialization through that same string form — confirm
                        /// against the serde_with documentation.
      11              : #[derive(
      12              :     Copy,
      13              :     Clone,
      14              :     PartialEq,
      15              :     Eq,
      16            0 :     strum_macros::EnumString,
      17            0 :     strum_macros::Display,
      18            0 :     serde_with::DeserializeFromStr,
      19              :     serde_with::SerializeDisplay,
      20              :     Debug,
      21              : )]
      22              : #[strum(serialize_all = "kebab-case")]
      23              : #[repr(u8)]
      24              : pub enum PostgresClientProtocol {
      25              :     /// Usual Postgres replication protocol.
      26              :     Vanilla,
      27              :     /// Custom shard-aware protocol that replicates interpreted records.
      28              :     /// Used to send WAL from safekeeper to pageserver.
      29              :     Interpreted,
      30              : }
      31              : 
      32              : impl TryFrom<u8> for PostgresClientProtocol {
                            /// The unrecognized byte, handed back to the caller unchanged.
      33              :     type Error = u8;
      34              : 
                            /// Maps a raw `u8` discriminant back to its variant.
                            ///
                            /// Each arm compares against `Variant as u8`, so this stays the inverse of
                            /// the `as u8` cast without hard-coding numbers. A newly added variant
                            /// needs its own arm here; otherwise its byte falls through to `Err`.
                            ///
                            /// # Errors
                            ///
                            /// Returns `Err(value)` when `value` matches no variant.
      35            0 :     fn try_from(value: u8) -> Result<Self, Self::Error> {
      36            0 :         Ok(match value {
      37            0 :             v if v == (PostgresClientProtocol::Vanilla as u8) => PostgresClientProtocol::Vanilla,
      38            0 :             v if v == (PostgresClientProtocol::Interpreted as u8) => {
      39            0 :                 PostgresClientProtocol::Interpreted
      40              :             }
                                    // Anything else is not a known protocol discriminant.
      41            0 :             x => return Err(x),
      42              :         })
      43            0 :     }
      44              : }
      45              : 
                        /// Inputs for `wal_stream_connection_config`.
                        ///
                        /// String fields are borrowed for `'a`; the config builder copies what it needs.
      46              : pub struct ConnectionConfigArgs<'a> {
                            /// Protocol requested via the `protocol=` option (sent as its `u8` discriminant).
      47              :     pub protocol: PostgresClientProtocol,
      48              : 
                            /// Tenant and timeline, sent as the `tenant_id=` and `timeline_id=` options.
      49              :     pub ttid: TenantTimelineId,
                            /// Shard parameters. They are sent only when `shard_number` is `Some`; in that
                            /// case `shard_count` and `shard_stripe_size` must be `Some` too, otherwise
                            /// `options` panics.
      50              :     pub shard_number: Option<u8>,
      51              :     pub shard_count: Option<u8>,
      52              :     pub shard_stripe_size: Option<u32>,
      53              : 
                            /// Address to connect to, parsed with `parse_host_port`; a missing port
                            /// defaults to 5432.
      54              :     pub listen_pg_addr_str: &'a str,
      55              : 
                            /// Used as the connection password when present.
      56              :     pub auth_token: Option<&'a str>,
                            /// Added as an `availability_zone=` option when present.
      57              :     pub availability_zone: Option<&'a str>,
      58              : }
      59              : 
      60              : impl<'a> ConnectionConfigArgs<'a> {
                            /// Builds the option strings that `wal_stream_connection_config` hands to
                            /// `extend_options`.
                            ///
                            /// The vector starts with `"-c"`, followed by `timeline_id=`, `tenant_id=`
                            /// and `protocol=` (the numeric `u8` discriminant). `shard_count=`,
                            /// `shard_number=` and `shard_stripe_size=` are appended, in that order,
                            /// only when `shard_number` is `Some`.
                            ///
                            /// # Panics
                            ///
                            /// Panics if `shard_number` is `Some` while `shard_count` or
                            /// `shard_stripe_size` is `None`.
      61           26 :     fn options(&'a self) -> Vec<String> {
      62           26 :         let mut options = vec![
      63           26 :             "-c".to_owned(),
      64           26 :             format!("timeline_id={}", self.ttid.timeline_id),
      65           26 :             format!("tenant_id={}", self.ttid.tenant_id),
      66           26 :             format!("protocol={}", self.protocol as u8),
      67           26 :         ];
      68           26 : 
                                // Shard info is all-or-nothing: `shard_number` acts as the switch and the
                                // other two fields are required alongside it.
                                // NOTE(review): the is_some + assert + unwrap sequence could be a single
                                // pattern match with `expect`, keeping the same panic-on-partial-info
                                // behavior.
      69           26 :         if self.shard_number.is_some() {
      70            0 :             assert!(self.shard_count.is_some());
      71            0 :             assert!(self.shard_stripe_size.is_some());
      72              : 
      73            0 :             options.push(format!("shard_count={}", self.shard_count.unwrap()));
      74            0 :             options.push(format!("shard_number={}", self.shard_number.unwrap()));
      75            0 :             options.push(format!(
      76            0 :                 "shard_stripe_size={}",
      77            0 :                 self.shard_stripe_size.unwrap()
      78            0 :             ));
      79           26 :         }
      80              : 
      81           26 :         options
      82           26 :     }
      83              : }
      84              : 
      85              : /// Create client config for fetching WAL from a safekeeper on a particular timeline.
      86              : /// `args.listen_pg_addr_str` is in the form `host:[port]`; a missing port defaults to 5432.
                        ///
                        /// The options from `ConnectionConfigArgs::options` are attached to the config.
                        /// `args.auth_token`, if any, is set as the connection password, and
                        /// `args.availability_zone`, if any, is appended as an `availability_zone=`
                        /// option.
                        ///
                        /// # Errors
                        ///
                        /// Fails with the context "Unable to parse listen_pg_addr_str" when
                        /// `parse_host_port` rejects the address.
                        ///
                        /// # Panics
                        ///
                        /// Panics if `args.shard_number` is `Some` while `args.shard_count` or
                        /// `args.shard_stripe_size` is `None` (asserted while building the options).
      87           26 : pub fn wal_stream_connection_config(
      88           26 :     args: ConnectionConfigArgs,
      89           26 : ) -> anyhow::Result<PgConnectionConfig> {
      90           26 :     let (host, port) =
      91           26 :         parse_host_port(args.listen_pg_addr_str).context("Unable to parse listen_pg_addr_str")?;
                            // 5432 is the fallback when the address carries no port.
      92           26 :     let port = port.unwrap_or(5432);
      93           26 :     let mut connstr = PgConnectionConfig::new_host_port(host, port)
      94           26 :         .extend_options(args.options())
      95           26 :         .set_password(args.auth_token.map(|s| s.to_owned()));
      96              : 
                            // The availability zone is optional and goes in as one more option.
      97           26 :     if let Some(availability_zone) = args.availability_zone {
      98            4 :         connstr = connstr.extend_options([format!("availability_zone={}", availability_zone)]);
      99           22 :     }
     100              : 
     101           26 :     Ok(connstr)
     102           26 : }
        

Generated by: LCOV version 2.1-beta