Line data Source code
1 : //! Postgres client connection code common to other crates (safekeeper and
2 : //! pageserver) which depends on tenant/timeline ids and thus not fitting into
3 : //! postgres_connection crate.
4 :
5 : use anyhow::Context;
6 : use postgres_connection::{parse_host_port, PgConnectionConfig};
7 :
8 : use crate::id::TenantTimelineId;
9 :
10 0 : #[derive(Copy, Clone, PartialEq, Eq, Debug, serde::Serialize, serde::Deserialize)]
11 : #[serde(rename_all = "kebab-case")]
12 : pub enum InterpretedFormat {
13 : Bincode,
14 : Protobuf,
15 : }
16 :
17 0 : #[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
18 : #[serde(rename_all = "kebab-case")]
19 : pub enum Compression {
20 : Zstd { level: i8 },
21 : }
22 :
23 0 : #[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
24 : #[serde(tag = "type", content = "args")]
25 : #[serde(rename_all = "kebab-case")]
26 : pub enum PostgresClientProtocol {
27 : /// Usual Postgres replication protocol
28 : Vanilla,
29 : /// Custom shard-aware protocol that replicates interpreted records.
30 : /// Used to send wal from safekeeper to pageserver.
31 : Interpreted {
32 : format: InterpretedFormat,
33 : compression: Option<Compression>,
34 : },
35 : }
36 :
37 : pub struct ConnectionConfigArgs<'a> {
38 : pub protocol: PostgresClientProtocol,
39 :
40 : pub ttid: TenantTimelineId,
41 : pub shard_number: Option<u8>,
42 : pub shard_count: Option<u8>,
43 : pub shard_stripe_size: Option<u32>,
44 :
45 : pub listen_pg_addr_str: &'a str,
46 :
47 : pub auth_token: Option<&'a str>,
48 : pub availability_zone: Option<&'a str>,
49 : }
50 :
51 : impl<'a> ConnectionConfigArgs<'a> {
52 26 : fn options(&'a self) -> Vec<String> {
53 26 : let mut options = vec![
54 26 : "-c".to_owned(),
55 26 : format!("timeline_id={}", self.ttid.timeline_id),
56 26 : format!("tenant_id={}", self.ttid.tenant_id),
57 26 : format!(
58 26 : "protocol={}",
59 26 : serde_json::to_string(&self.protocol).unwrap()
60 26 : ),
61 26 : ];
62 26 :
63 26 : if self.shard_number.is_some() {
64 0 : assert!(self.shard_count.is_some());
65 0 : assert!(self.shard_stripe_size.is_some());
66 :
67 0 : options.push(format!("shard_count={}", self.shard_count.unwrap()));
68 0 : options.push(format!("shard_number={}", self.shard_number.unwrap()));
69 0 : options.push(format!(
70 0 : "shard_stripe_size={}",
71 0 : self.shard_stripe_size.unwrap()
72 0 : ));
73 26 : }
74 :
75 26 : options
76 26 : }
77 : }
78 :
79 : /// Create client config for fetching WAL from safekeeper on particular timeline.
80 : /// listen_pg_addr_str is in form host:\[port\].
81 26 : pub fn wal_stream_connection_config(
82 26 : args: ConnectionConfigArgs,
83 26 : ) -> anyhow::Result<PgConnectionConfig> {
84 26 : let (host, port) =
85 26 : parse_host_port(args.listen_pg_addr_str).context("Unable to parse listen_pg_addr_str")?;
86 26 : let port = port.unwrap_or(5432);
87 26 : let mut connstr = PgConnectionConfig::new_host_port(host, port)
88 26 : .extend_options(args.options())
89 26 : .set_password(args.auth_token.map(|s| s.to_owned()));
90 :
91 26 : if let Some(availability_zone) = args.availability_zone {
92 4 : connstr = connstr.extend_options([format!("availability_zone={}", availability_zone)]);
93 22 : }
94 :
95 26 : Ok(connstr)
96 26 : }
|