LCOV - code coverage report
Current view: top level - libs/utils/src - postgres_client.rs (source / functions) Coverage Total Hit
Test: 8b13a09a5c233d98abd4a0d3e59157e7db16d6fd.info Lines: 72.5 % 40 29
Test Date: 2024-11-21 10:53:51 Functions: 1.2 % 167 2

            Line data    Source code
       1              : //! Postgres client connection code common to other crates (safekeeper and
       2              : //! pageserver) that depends on tenant/timeline ids and thus does not fit into
       3              : //! the postgres_connection crate.
       4              : 
       5              : use anyhow::Context;
       6              : use postgres_connection::{parse_host_port, PgConnectionConfig};
       7              : 
       8              : use crate::id::TenantTimelineId;
       9              : 
      10            0 : #[derive(Copy, Clone, PartialEq, Eq, Debug, serde::Serialize, serde::Deserialize)]
      11              : #[serde(rename_all = "kebab-case")]
      12              : pub enum InterpretedFormat {
      13              :     Bincode,
      14              :     Protobuf,
      15              : }
      16              : 
      17            0 : #[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
      18              : #[serde(rename_all = "kebab-case")]
      19              : pub enum Compression {
      20              :     Zstd { level: i8 },
      21              : }
      22              : 
      23            0 : #[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
      24              : #[serde(tag = "type", content = "args")]
      25              : #[serde(rename_all = "kebab-case")]
      26              : pub enum PostgresClientProtocol {
      27              :     /// Usual Postgres replication protocol
      28              :     Vanilla,
      29              :     /// Custom shard-aware protocol that replicates interpreted records.
      30              :     /// Used to send wal from safekeeper to pageserver.
      31              :     Interpreted {
      32              :         format: InterpretedFormat,
      33              :         compression: Option<Compression>,
      34              :     },
      35              : }
      36              : 
      37              : pub struct ConnectionConfigArgs<'a> {
      38              :     pub protocol: PostgresClientProtocol,
      39              : 
      40              :     pub ttid: TenantTimelineId,
      41              :     pub shard_number: Option<u8>,
      42              :     pub shard_count: Option<u8>,
      43              :     pub shard_stripe_size: Option<u32>,
      44              : 
      45              :     pub listen_pg_addr_str: &'a str,
      46              : 
      47              :     pub auth_token: Option<&'a str>,
      48              :     pub availability_zone: Option<&'a str>,
      49              : }
      50              : 
      51              : impl<'a> ConnectionConfigArgs<'a> {
      52           26 :     fn options(&'a self) -> Vec<String> {
      53           26 :         let mut options = vec![
      54           26 :             "-c".to_owned(),
      55           26 :             format!("timeline_id={}", self.ttid.timeline_id),
      56           26 :             format!("tenant_id={}", self.ttid.tenant_id),
      57           26 :             format!(
      58           26 :                 "protocol={}",
      59           26 :                 serde_json::to_string(&self.protocol).unwrap()
      60           26 :             ),
      61           26 :         ];
      62           26 : 
      63           26 :         if self.shard_number.is_some() {
      64            0 :             assert!(self.shard_count.is_some());
      65            0 :             assert!(self.shard_stripe_size.is_some());
      66              : 
      67            0 :             options.push(format!("shard_count={}", self.shard_count.unwrap()));
      68            0 :             options.push(format!("shard_number={}", self.shard_number.unwrap()));
      69            0 :             options.push(format!(
      70            0 :                 "shard_stripe_size={}",
      71            0 :                 self.shard_stripe_size.unwrap()
      72            0 :             ));
      73           26 :         }
      74              : 
      75           26 :         options
      76           26 :     }
      77              : }
      78              : 
      79              : /// Create client config for fetching WAL from safekeeper on particular timeline.
      80              : /// listen_pg_addr_str is in form host:\[port\].
      81           26 : pub fn wal_stream_connection_config(
      82           26 :     args: ConnectionConfigArgs,
      83           26 : ) -> anyhow::Result<PgConnectionConfig> {
      84           26 :     let (host, port) =
      85           26 :         parse_host_port(args.listen_pg_addr_str).context("Unable to parse listen_pg_addr_str")?;
      86           26 :     let port = port.unwrap_or(5432);
      87           26 :     let mut connstr = PgConnectionConfig::new_host_port(host, port)
      88           26 :         .extend_options(args.options())
      89           26 :         .set_password(args.auth_token.map(|s| s.to_owned()));
      90              : 
      91           26 :     if let Some(availability_zone) = args.availability_zone {
      92            4 :         connstr = connstr.extend_options([format!("availability_zone={}", availability_zone)]);
      93           22 :     }
      94              : 
      95           26 :     Ok(connstr)
      96           26 : }
        

Generated by: LCOV version 2.1-beta