Line data Source code
1 : #![deny(clippy::undocumented_unsafe_blocks)]
2 :
3 : extern crate hyper0 as hyper;
4 :
5 : use std::time::Duration;
6 :
7 : use camino::Utf8PathBuf;
8 : use once_cell::sync::Lazy;
9 : use pem::Pem;
10 : use remote_storage::RemoteStorageConfig;
11 : use storage_broker::Uri;
12 : use tokio::runtime::Runtime;
13 : use utils::auth::SwappableJwtAuth;
14 : use utils::id::NodeId;
15 : use utils::logging::SecretString;
16 :
17 : mod auth;
18 : pub mod broker;
19 : pub mod control_file;
20 : pub mod control_file_upgrade;
21 : pub mod copy_timeline;
22 : pub mod debug_dump;
23 : pub mod handler;
24 : pub mod http;
25 : pub mod metrics;
26 : pub mod patch_control_file;
27 : pub mod pull_timeline;
28 : pub mod rate_limit;
29 : pub mod receive_wal;
30 : pub mod recovery;
31 : pub mod remove_wal;
32 : pub mod safekeeper;
33 : pub mod send_interpreted_wal;
34 : pub mod send_wal;
35 : pub mod state;
36 : pub mod timeline;
37 : pub mod timeline_eviction;
38 : pub mod timeline_guard;
39 : pub mod timeline_manager;
40 : pub mod timelines_set;
41 : pub mod wal_backup;
42 : pub mod wal_backup_partial;
43 : pub mod wal_reader_stream;
44 : pub mod wal_service;
45 : pub mod wal_storage;
46 :
47 : #[cfg(any(test, feature = "benchmarking"))]
48 : pub mod test_utils;
49 :
50 : mod timelines_global_map;
51 : use std::sync::Arc;
52 :
53 : pub use timelines_global_map::GlobalTimelines;
54 : use utils::auth::JwtAuth;
55 :
56 : pub mod defaults {
57 : pub use safekeeper_api::{
58 : DEFAULT_HTTP_LISTEN_ADDR, DEFAULT_HTTP_LISTEN_PORT, DEFAULT_PG_LISTEN_ADDR,
59 : DEFAULT_PG_LISTEN_PORT,
60 : };
61 :
62 : pub const DEFAULT_HEARTBEAT_TIMEOUT: &str = "5000ms";
63 : pub const DEFAULT_MAX_OFFLOADER_LAG_BYTES: u64 = 128 * (1 << 20);
64 : pub const DEFAULT_PARTIAL_BACKUP_TIMEOUT: &str = "15m";
65 : pub const DEFAULT_CONTROL_FILE_SAVE_INTERVAL: &str = "300s";
66 : pub const DEFAULT_PARTIAL_BACKUP_CONCURRENCY: &str = "5";
67 : pub const DEFAULT_EVICTION_CONCURRENCY: usize = 2;
68 :
69 : // By default, our required residency before eviction is the same as the period that passes
70 : // before uploading a partial segment, so that in normal operation the eviction can happen
71 : // as soon as we have done the partial segment upload.
72 : pub const DEFAULT_EVICTION_MIN_RESIDENT: &str = DEFAULT_PARTIAL_BACKUP_TIMEOUT;
73 :
74 : pub const DEFAULT_SSL_KEY_FILE: &str = "server.key";
75 : pub const DEFAULT_SSL_CERT_FILE: &str = "server.crt";
76 : pub const DEFAULT_SSL_CERT_RELOAD_PERIOD: &str = "60s";
77 : }
78 :
79 : #[derive(Debug, Clone)]
80 : pub struct SafeKeeperConf {
81 : // Repository directory, relative to current working directory.
82 : // Normally, the safekeeper changes the current working directory
83 : // to the repository, and 'workdir' is always '.'. But we don't do
84 : // that during unit testing, because the current directory is global
85 : // to the process but different unit tests work on different
86 : // data directories to avoid clashing with each other.
87 : pub workdir: Utf8PathBuf,
88 : pub my_id: NodeId,
89 : pub listen_pg_addr: String,
90 : pub listen_pg_addr_tenant_only: Option<String>,
91 : pub listen_http_addr: String,
92 : pub listen_https_addr: Option<String>,
93 : pub advertise_pg_addr: Option<String>,
94 : pub availability_zone: Option<String>,
95 : pub no_sync: bool,
96 : pub broker_endpoint: Uri,
97 : pub broker_keepalive_interval: Duration,
98 : pub heartbeat_timeout: Duration,
99 : pub peer_recovery_enabled: bool,
100 : pub remote_storage: Option<RemoteStorageConfig>,
101 : pub max_offloader_lag_bytes: u64,
102 : pub backup_parallel_jobs: usize,
103 : pub wal_backup_enabled: bool,
104 : pub pg_auth: Option<Arc<JwtAuth>>,
105 : pub pg_tenant_only_auth: Option<Arc<JwtAuth>>,
106 : pub http_auth: Option<Arc<SwappableJwtAuth>>,
107 : /// JWT token to connect to other safekeepers with.
108 : pub sk_auth_token: Option<SecretString>,
109 : pub current_thread_runtime: bool,
110 : pub walsenders_keep_horizon: bool,
111 : pub partial_backup_timeout: Duration,
112 : pub disable_periodic_broker_push: bool,
113 : pub enable_offload: bool,
114 : pub delete_offloaded_wal: bool,
115 : pub control_file_save_interval: Duration,
116 : pub partial_backup_concurrency: usize,
117 : pub eviction_min_resident: Duration,
118 : pub wal_reader_fanout: bool,
119 : pub max_delta_for_fanout: Option<u64>,
120 : pub ssl_key_file: Utf8PathBuf,
121 : pub ssl_cert_file: Utf8PathBuf,
122 : pub ssl_cert_reload_period: Duration,
123 : pub ssl_ca_certs: Vec<Pem>,
124 : pub use_https_safekeeper_api: bool,
125 : pub enable_tls_wal_service_api: bool,
126 : }
127 :
128 : impl SafeKeeperConf {
129 71 : pub fn is_wal_backup_enabled(&self) -> bool {
130 71 : self.remote_storage.is_some() && self.wal_backup_enabled
131 71 : }
132 : }
133 :
134 : impl SafeKeeperConf {
135 10 : pub fn dummy() -> Self {
136 10 : SafeKeeperConf {
137 10 : workdir: Utf8PathBuf::from("./"),
138 10 : no_sync: false,
139 10 : listen_pg_addr: defaults::DEFAULT_PG_LISTEN_ADDR.to_string(),
140 10 : listen_pg_addr_tenant_only: None,
141 10 : listen_http_addr: defaults::DEFAULT_HTTP_LISTEN_ADDR.to_string(),
142 10 : listen_https_addr: None,
143 10 : advertise_pg_addr: None,
144 10 : availability_zone: None,
145 10 : remote_storage: None,
146 10 : my_id: NodeId(0),
147 10 : broker_endpoint: storage_broker::DEFAULT_ENDPOINT
148 10 : .parse()
149 10 : .expect("failed to parse default broker endpoint"),
150 10 : broker_keepalive_interval: Duration::from_secs(5),
151 10 : peer_recovery_enabled: true,
152 10 : wal_backup_enabled: true,
153 10 : backup_parallel_jobs: 1,
154 10 : pg_auth: None,
155 10 : pg_tenant_only_auth: None,
156 10 : http_auth: None,
157 10 : sk_auth_token: None,
158 10 : heartbeat_timeout: Duration::new(5, 0),
159 10 : max_offloader_lag_bytes: defaults::DEFAULT_MAX_OFFLOADER_LAG_BYTES,
160 10 : current_thread_runtime: false,
161 10 : walsenders_keep_horizon: false,
162 10 : partial_backup_timeout: Duration::from_secs(0),
163 10 : disable_periodic_broker_push: false,
164 10 : enable_offload: false,
165 10 : delete_offloaded_wal: false,
166 10 : control_file_save_interval: Duration::from_secs(1),
167 10 : partial_backup_concurrency: 1,
168 10 : eviction_min_resident: Duration::ZERO,
169 10 : wal_reader_fanout: false,
170 10 : max_delta_for_fanout: None,
171 10 : ssl_key_file: Utf8PathBuf::from(defaults::DEFAULT_SSL_KEY_FILE),
172 10 : ssl_cert_file: Utf8PathBuf::from(defaults::DEFAULT_SSL_CERT_FILE),
173 10 : ssl_cert_reload_period: Duration::from_secs(60),
174 10 : ssl_ca_certs: Vec::new(),
175 10 : use_https_safekeeper_api: false,
176 10 : enable_tls_wal_service_api: false,
177 10 : }
178 10 : }
179 : }
180 :
181 : // Tokio runtimes.
182 0 : pub static WAL_SERVICE_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
183 0 : tokio::runtime::Builder::new_multi_thread()
184 0 : .thread_name("WAL service worker")
185 0 : .enable_all()
186 0 : .build()
187 0 : .expect("Failed to create WAL service runtime")
188 0 : });
189 :
190 0 : pub static HTTP_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
191 0 : tokio::runtime::Builder::new_multi_thread()
192 0 : .thread_name("HTTP worker")
193 0 : .enable_all()
194 0 : .build()
195 0 : .expect("Failed to create HTTP runtime")
196 0 : });
197 :
198 0 : pub static BROKER_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
199 0 : tokio::runtime::Builder::new_multi_thread()
200 0 : .thread_name("broker worker")
201 0 : .worker_threads(2) // there are only 2 tasks, having more threads doesn't make sense
202 0 : .enable_all()
203 0 : .build()
204 0 : .expect("Failed to create broker runtime")
205 0 : });
206 :
207 0 : pub static WAL_BACKUP_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
208 0 : tokio::runtime::Builder::new_multi_thread()
209 0 : .thread_name("WAL backup worker")
210 0 : .enable_all()
211 0 : .build()
212 0 : .expect("Failed to create WAL backup runtime")
213 0 : });
214 :
215 0 : pub static BACKGROUND_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
216 0 : tokio::runtime::Builder::new_multi_thread()
217 0 : .thread_name("background worker")
218 0 : .worker_threads(1) // there is only one task now (ssl certificate reloading), having more threads doesn't make sense
219 0 : .enable_all()
220 0 : .build()
221 0 : .expect("Failed to create background runtime")
222 0 : });
|