Line data Source code
1 : //! Postgres client connection code common to other crates (safekeeper and
2 : //! pageserver) which depends on tenant/timeline ids and thus not fitting into
3 : //! postgres_connection crate.
4 :
5 : use anyhow::Context;
6 : use postgres_connection::{parse_host_port, PgConnectionConfig};
7 :
8 : use crate::id::TenantTimelineId;
9 :
10 : /// Postgres client protocol types
11 : #[derive(
12 : Copy,
13 : Clone,
14 : PartialEq,
15 : Eq,
16 0 : strum_macros::EnumString,
17 0 : strum_macros::Display,
18 0 : serde_with::DeserializeFromStr,
19 : serde_with::SerializeDisplay,
20 : Debug,
21 : )]
22 : #[strum(serialize_all = "kebab-case")]
23 : #[repr(u8)]
24 : pub enum PostgresClientProtocol {
25 : /// Usual Postgres replication protocol
26 : Vanilla,
27 : /// Custom shard-aware protocol that replicates interpreted records.
28 : /// Used to send wal from safekeeper to pageserver.
29 : Interpreted,
30 : }
31 :
32 : impl TryFrom<u8> for PostgresClientProtocol {
33 : type Error = u8;
34 :
35 0 : fn try_from(value: u8) -> Result<Self, Self::Error> {
36 0 : Ok(match value {
37 0 : v if v == (PostgresClientProtocol::Vanilla as u8) => PostgresClientProtocol::Vanilla,
38 0 : v if v == (PostgresClientProtocol::Interpreted as u8) => {
39 0 : PostgresClientProtocol::Interpreted
40 : }
41 0 : x => return Err(x),
42 : })
43 0 : }
44 : }
45 :
46 : pub struct ConnectionConfigArgs<'a> {
47 : pub protocol: PostgresClientProtocol,
48 :
49 : pub ttid: TenantTimelineId,
50 : pub shard_number: Option<u8>,
51 : pub shard_count: Option<u8>,
52 : pub shard_stripe_size: Option<u32>,
53 :
54 : pub listen_pg_addr_str: &'a str,
55 :
56 : pub auth_token: Option<&'a str>,
57 : pub availability_zone: Option<&'a str>,
58 : }
59 :
60 : impl<'a> ConnectionConfigArgs<'a> {
61 26 : fn options(&'a self) -> Vec<String> {
62 26 : let mut options = vec![
63 26 : "-c".to_owned(),
64 26 : format!("timeline_id={}", self.ttid.timeline_id),
65 26 : format!("tenant_id={}", self.ttid.tenant_id),
66 26 : format!("protocol={}", self.protocol as u8),
67 26 : ];
68 26 :
69 26 : if self.shard_number.is_some() {
70 0 : assert!(self.shard_count.is_some());
71 0 : assert!(self.shard_stripe_size.is_some());
72 :
73 0 : options.push(format!("shard_count={}", self.shard_count.unwrap()));
74 0 : options.push(format!("shard_number={}", self.shard_number.unwrap()));
75 0 : options.push(format!(
76 0 : "shard_stripe_size={}",
77 0 : self.shard_stripe_size.unwrap()
78 0 : ));
79 26 : }
80 :
81 26 : options
82 26 : }
83 : }
84 :
85 : /// Create client config for fetching WAL from safekeeper on particular timeline.
86 : /// listen_pg_addr_str is in form host:\[port\].
87 26 : pub fn wal_stream_connection_config(
88 26 : args: ConnectionConfigArgs,
89 26 : ) -> anyhow::Result<PgConnectionConfig> {
90 26 : let (host, port) =
91 26 : parse_host_port(args.listen_pg_addr_str).context("Unable to parse listen_pg_addr_str")?;
92 26 : let port = port.unwrap_or(5432);
93 26 : let mut connstr = PgConnectionConfig::new_host_port(host, port)
94 26 : .extend_options(args.options())
95 26 : .set_password(args.auth_token.map(|s| s.to_owned()));
96 :
97 26 : if let Some(availability_zone) = args.availability_zone {
98 4 : connstr = connstr.extend_options([format!("availability_zone={}", availability_zone)]);
99 22 : }
100 :
101 26 : Ok(connstr)
102 26 : }
|