Line data Source code
1 : #![deny(clippy::undocumented_unsafe_blocks)]
2 :
3 : extern crate hyper0 as hyper;
4 : use postgres_backend::AuthType;
5 :
6 : use std::time::Duration;
7 :
8 : use camino::Utf8PathBuf;
9 : use once_cell::sync::Lazy;
10 : use pem::Pem;
11 : use remote_storage::RemoteStorageConfig;
12 : use storage_broker::Uri;
13 : use tokio::runtime::Runtime;
14 : use url::Url;
15 : use utils::auth::SwappableJwtAuth;
16 : use utils::id::NodeId;
17 : use utils::logging::SecretString;
18 :
19 : mod auth;
20 : pub mod broker;
21 : pub mod control_file;
22 : pub mod control_file_upgrade;
23 : pub mod copy_timeline;
24 : pub mod debug_dump;
25 : pub mod hadron;
26 : pub mod handler;
27 : pub mod http;
28 : pub mod metrics;
29 : pub mod patch_control_file;
30 : pub mod pull_timeline;
31 : pub mod rate_limit;
32 : pub mod receive_wal;
33 : pub mod recovery;
34 : pub mod remove_wal;
35 : pub mod safekeeper;
36 : pub mod send_interpreted_wal;
37 : pub mod send_wal;
38 : pub mod state;
39 : pub mod timeline;
40 : pub mod timeline_eviction;
41 : pub mod timeline_guard;
42 : pub mod timeline_manager;
43 : pub mod timelines_set;
44 : pub mod wal_backup;
45 : pub mod wal_backup_partial;
46 : pub mod wal_reader_stream;
47 : pub mod wal_service;
48 : pub mod wal_storage;
49 :
50 : #[cfg(any(test, feature = "benchmarking"))]
51 : pub mod test_utils;
52 :
53 : mod timelines_global_map;
54 :
55 : use std::sync::Arc;
56 :
57 : pub use timelines_global_map::GlobalTimelines;
58 : use utils::auth::JwtAuth;
59 :
60 : pub mod defaults {
61 : pub use safekeeper_api::{
62 : DEFAULT_HTTP_LISTEN_ADDR, DEFAULT_HTTP_LISTEN_PORT, DEFAULT_PG_LISTEN_ADDR,
63 : DEFAULT_PG_LISTEN_PORT,
64 : };
65 :
66 : pub const DEFAULT_HEARTBEAT_TIMEOUT: &str = "5000ms";
67 : pub const DEFAULT_MAX_OFFLOADER_LAG_BYTES: u64 = 128 * (1 << 20);
68 : /* BEGIN_HADRON */
69 : // Default leader re-elect is 0(disabled). SK will re-elect leader if the current leader is lagging this many bytes.
70 : pub const DEFAULT_MAX_REELECT_OFFLOADER_LAG_BYTES: u64 = 0;
71 : // Default disk usage limit is 0 (disabled). It means each timeline by default can use up to this many WAL
72 : // disk space on this SK until SK begins to reject WALs.
73 : pub const DEFAULT_MAX_TIMELINE_DISK_USAGE_BYTES: u64 = 0;
74 : /* END_HADRON */
75 : pub const DEFAULT_PARTIAL_BACKUP_TIMEOUT: &str = "15m";
76 : pub const DEFAULT_CONTROL_FILE_SAVE_INTERVAL: &str = "300s";
77 : pub const DEFAULT_PARTIAL_BACKUP_CONCURRENCY: &str = "5";
78 : pub const DEFAULT_EVICTION_CONCURRENCY: usize = 2;
79 :
80 : // By default, our required residency before eviction is the same as the period that passes
81 : // before uploading a partial segment, so that in normal operation the eviction can happen
82 : // as soon as we have done the partial segment upload.
83 : pub const DEFAULT_EVICTION_MIN_RESIDENT: &str = DEFAULT_PARTIAL_BACKUP_TIMEOUT;
84 :
85 : pub const DEFAULT_SSL_KEY_FILE: &str = "server.key";
86 : pub const DEFAULT_SSL_CERT_FILE: &str = "server.crt";
87 : pub const DEFAULT_SSL_CERT_RELOAD_PERIOD: &str = "60s";
88 :
89 : // Global disk watcher defaults
90 : pub const DEFAULT_GLOBAL_DISK_CHECK_INTERVAL: &str = "60s";
91 : pub const DEFAULT_MAX_GLOBAL_DISK_USAGE_RATIO: f64 = 0.0;
92 : }
93 :
/// Configuration of a safekeeper node.
///
/// See `SafeKeeperConf::dummy` for an instance filled with placeholder values.
#[derive(Debug, Clone)]
pub struct SafeKeeperConf {
    /// Repository directory, relative to the current working directory.
    ///
    /// Normally the safekeeper changes the current working directory to the repository,
    /// so `workdir` is always `.`. Unit tests do not do that: the current directory is
    /// global to the process, and different unit tests use different data directories
    /// to avoid clashing with each other.
    pub workdir: Utf8PathBuf,
    pub my_id: NodeId,
    pub listen_pg_addr: String,
    pub listen_pg_addr_tenant_only: Option<String>,
    pub listen_http_addr: String,
    pub listen_https_addr: Option<String>,
    pub advertise_pg_addr: Option<String>,
    pub availability_zone: Option<String>,
    pub no_sync: bool,
    /* BEGIN_HADRON */
    pub advertise_pg_addr_tenant_only: Option<String>,
    pub enable_pull_timeline_on_startup: bool,
    pub hcc_base_url: Option<Url>,
    /* END_HADRON */
    pub broker_endpoint: Uri,
    pub broker_keepalive_interval: Duration,
    pub heartbeat_timeout: Duration,
    pub peer_recovery_enabled: bool,
    pub remote_storage: Option<RemoteStorageConfig>,
    pub max_offloader_lag_bytes: u64,
    /* BEGIN_HADRON */
    /// Leader lag, in bytes, at which the safekeeper re-elects a leader; `0` disables it
    /// (see `defaults::DEFAULT_MAX_REELECT_OFFLOADER_LAG_BYTES`).
    pub max_reelect_offloader_lag_bytes: u64,
    /// Per-timeline WAL disk usage limit, in bytes, beyond which WAL is rejected; `0`
    /// disables it (see `defaults::DEFAULT_MAX_TIMELINE_DISK_USAGE_BYTES`).
    pub max_timeline_disk_usage_bytes: u64,
    /// How often to check the working directory's filesystem for total disk usage.
    pub global_disk_check_interval: Duration,
    /// The portion of the filesystem capacity that can be used by all timelines.
    pub max_global_disk_usage_ratio: f64,
    /* END_HADRON */
    pub backup_parallel_jobs: usize,
    pub wal_backup_enabled: bool,
    pub auth_type: AuthType,
    pub pg_auth: Option<Arc<JwtAuth>>,
    pub pg_tenant_only_auth: Option<Arc<JwtAuth>>,
    pub http_auth: Option<Arc<SwappableJwtAuth>>,
    /// JWT token to connect to other safekeepers with.
    pub sk_auth_token: Option<SecretString>,
    pub current_thread_runtime: bool,
    pub walsenders_keep_horizon: bool,
    /// Period that passes before a partial segment is uploaded.
    pub partial_backup_timeout: Duration,
    pub disable_periodic_broker_push: bool,
    pub enable_offload: bool,
    pub delete_offloaded_wal: bool,
    pub control_file_save_interval: Duration,
    pub partial_backup_concurrency: usize,
    /// Required residency before a timeline may be evicted.
    pub eviction_min_resident: Duration,
    pub wal_reader_fanout: bool,
    pub max_delta_for_fanout: Option<u64>,
    /// TLS private key file (default name: `defaults::DEFAULT_SSL_KEY_FILE`).
    pub ssl_key_file: Utf8PathBuf,
    /// TLS certificate file (default name: `defaults::DEFAULT_SSL_CERT_FILE`).
    pub ssl_cert_file: Utf8PathBuf,
    pub ssl_cert_reload_period: Duration,
    pub ssl_ca_certs: Vec<Pem>,
    pub use_https_safekeeper_api: bool,
    pub enable_tls_wal_service_api: bool,
    pub force_metric_collection_on_scrape: bool,
}
157 :
158 : impl SafeKeeperConf {
159 11 : pub fn dummy() -> Self {
160 11 : SafeKeeperConf {
161 11 : workdir: Utf8PathBuf::from("./"),
162 11 : no_sync: false,
163 11 : listen_pg_addr: defaults::DEFAULT_PG_LISTEN_ADDR.to_string(),
164 11 : listen_pg_addr_tenant_only: None,
165 11 : listen_http_addr: defaults::DEFAULT_HTTP_LISTEN_ADDR.to_string(),
166 11 : listen_https_addr: None,
167 11 : advertise_pg_addr: None,
168 11 : availability_zone: None,
169 11 : remote_storage: None,
170 11 : my_id: NodeId(0),
171 11 : broker_endpoint: storage_broker::DEFAULT_ENDPOINT
172 11 : .parse()
173 11 : .expect("failed to parse default broker endpoint"),
174 11 : broker_keepalive_interval: Duration::from_secs(5),
175 11 : peer_recovery_enabled: true,
176 11 : wal_backup_enabled: true,
177 11 : backup_parallel_jobs: 1,
178 11 : auth_type: AuthType::HadronJWT,
179 11 : pg_auth: None,
180 11 : pg_tenant_only_auth: None,
181 11 : http_auth: None,
182 11 : sk_auth_token: None,
183 11 : heartbeat_timeout: Duration::new(5, 0),
184 11 : max_offloader_lag_bytes: defaults::DEFAULT_MAX_OFFLOADER_LAG_BYTES,
185 11 : /* BEGIN_HADRON */
186 11 : max_reelect_offloader_lag_bytes: defaults::DEFAULT_MAX_REELECT_OFFLOADER_LAG_BYTES,
187 11 : max_timeline_disk_usage_bytes: defaults::DEFAULT_MAX_TIMELINE_DISK_USAGE_BYTES,
188 11 : global_disk_check_interval: Duration::from_secs(60),
189 11 : max_global_disk_usage_ratio: defaults::DEFAULT_MAX_GLOBAL_DISK_USAGE_RATIO,
190 11 : /* END_HADRON */
191 11 : current_thread_runtime: false,
192 11 : walsenders_keep_horizon: false,
193 11 : partial_backup_timeout: Duration::from_secs(0),
194 11 : disable_periodic_broker_push: false,
195 11 : enable_offload: false,
196 11 : delete_offloaded_wal: false,
197 11 : control_file_save_interval: Duration::from_secs(1),
198 11 : partial_backup_concurrency: 1,
199 11 : eviction_min_resident: Duration::ZERO,
200 11 : wal_reader_fanout: false,
201 11 : max_delta_for_fanout: None,
202 11 : ssl_key_file: Utf8PathBuf::from(defaults::DEFAULT_SSL_KEY_FILE),
203 11 : ssl_cert_file: Utf8PathBuf::from(defaults::DEFAULT_SSL_CERT_FILE),
204 11 : ssl_cert_reload_period: Duration::from_secs(60),
205 11 : ssl_ca_certs: Vec::new(),
206 11 : use_https_safekeeper_api: false,
207 11 : enable_tls_wal_service_api: false,
208 11 : force_metric_collection_on_scrape: true,
209 11 : /* BEGIN_HADRON */
210 11 : advertise_pg_addr_tenant_only: None,
211 11 : enable_pull_timeline_on_startup: false,
212 11 : hcc_base_url: None,
213 11 : /* END_HADRON */
214 11 : }
215 11 : }
216 : }
217 :
218 : // Tokio runtimes.
219 0 : pub static WAL_SERVICE_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
220 0 : tokio::runtime::Builder::new_multi_thread()
221 0 : .thread_name("WAL service worker")
222 0 : .enable_all()
223 0 : .build()
224 0 : .expect("Failed to create WAL service runtime")
225 0 : });
226 :
227 0 : pub static HTTP_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
228 0 : tokio::runtime::Builder::new_multi_thread()
229 0 : .thread_name("HTTP worker")
230 0 : .enable_all()
231 0 : .build()
232 0 : .expect("Failed to create HTTP runtime")
233 0 : });
234 :
235 0 : pub static BROKER_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
236 0 : tokio::runtime::Builder::new_multi_thread()
237 0 : .thread_name("broker worker")
238 0 : .worker_threads(2) // there are only 2 tasks, having more threads doesn't make sense
239 0 : .enable_all()
240 0 : .build()
241 0 : .expect("Failed to create broker runtime")
242 0 : });
243 :
244 0 : pub static WAL_BACKUP_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
245 0 : tokio::runtime::Builder::new_multi_thread()
246 0 : .thread_name("WAL backup worker")
247 0 : .enable_all()
248 0 : .build()
249 0 : .expect("Failed to create WAL backup runtime")
250 0 : });
251 :
252 : /// Hadron: Dedicated runtime for infrequent background tasks.
253 0 : pub static BACKGROUND_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
254 0 : tokio::runtime::Builder::new_multi_thread()
255 0 : .thread_name("Hadron background worker")
256 0 : // One worker thread is enough, as most of the actual tasks run on blocking threads
257 0 : // which has it own thread pool.
258 0 : .worker_threads(1)
259 0 : .enable_all()
260 0 : .build()
261 0 : .expect("Failed to create background runtime")
262 0 : });
|