Line data Source code
1 : #![deny(clippy::undocumented_unsafe_blocks)]
2 :
3 : extern crate hyper0 as hyper;
4 :
5 : use camino::Utf8PathBuf;
6 : use once_cell::sync::Lazy;
7 : use remote_storage::RemoteStorageConfig;
8 : use tokio::runtime::Runtime;
9 :
10 : use std::time::Duration;
11 : use storage_broker::Uri;
12 :
13 : use utils::{auth::SwappableJwtAuth, id::NodeId, logging::SecretString};
14 :
15 : mod auth;
16 : pub mod broker;
17 : pub mod control_file;
18 : pub mod control_file_upgrade;
19 : pub mod copy_timeline;
20 : pub mod debug_dump;
21 : pub mod handler;
22 : pub mod http;
23 : pub mod json_ctrl;
24 : pub mod metrics;
25 : pub mod patch_control_file;
26 : pub mod pull_timeline;
27 : pub mod rate_limit;
28 : pub mod receive_wal;
29 : pub mod recovery;
30 : pub mod remove_wal;
31 : pub mod safekeeper;
32 : pub mod send_interpreted_wal;
33 : pub mod send_wal;
34 : pub mod state;
35 : pub mod timeline;
36 : pub mod timeline_eviction;
37 : pub mod timeline_guard;
38 : pub mod timeline_manager;
39 : pub mod timelines_set;
40 : pub mod wal_backup;
41 : pub mod wal_backup_partial;
42 : pub mod wal_reader_stream;
43 : pub mod wal_service;
44 : pub mod wal_storage;
45 :
46 : #[cfg(any(test, feature = "benchmarking"))]
47 : pub mod test_utils;
48 :
49 : mod timelines_global_map;
50 : use std::sync::Arc;
51 : pub use timelines_global_map::GlobalTimelines;
52 : use utils::auth::JwtAuth;
53 :
54 : pub mod defaults {
55 : pub use safekeeper_api::{
56 : DEFAULT_HTTP_LISTEN_ADDR, DEFAULT_HTTP_LISTEN_PORT, DEFAULT_PG_LISTEN_ADDR,
57 : DEFAULT_PG_LISTEN_PORT,
58 : };
59 :
60 : pub const DEFAULT_HEARTBEAT_TIMEOUT: &str = "5000ms";
61 : pub const DEFAULT_MAX_OFFLOADER_LAG_BYTES: u64 = 128 * (1 << 20);
62 : pub const DEFAULT_PARTIAL_BACKUP_TIMEOUT: &str = "15m";
63 : pub const DEFAULT_CONTROL_FILE_SAVE_INTERVAL: &str = "300s";
64 : pub const DEFAULT_PARTIAL_BACKUP_CONCURRENCY: &str = "5";
65 : pub const DEFAULT_EVICTION_CONCURRENCY: usize = 2;
66 :
67 : // By default, our required residency before eviction is the same as the period that passes
68 : // before uploading a partial segment, so that in normal operation the eviction can happen
69 : // as soon as we have done the partial segment upload.
70 : pub const DEFAULT_EVICTION_MIN_RESIDENT: &str = DEFAULT_PARTIAL_BACKUP_TIMEOUT;
71 : }
72 :
73 : #[derive(Debug, Clone)]
74 : pub struct SafeKeeperConf {
75 : // Repository directory, relative to current working directory.
76 : // Normally, the safekeeper changes the current working directory
77 : // to the repository, and 'workdir' is always '.'. But we don't do
78 : // that during unit testing, because the current directory is global
79 : // to the process but different unit tests work on different
80 : // data directories to avoid clashing with each other.
81 : pub workdir: Utf8PathBuf,
82 : pub my_id: NodeId,
83 : pub listen_pg_addr: String,
84 : pub listen_pg_addr_tenant_only: Option<String>,
85 : pub listen_http_addr: String,
86 : pub advertise_pg_addr: Option<String>,
87 : pub availability_zone: Option<String>,
88 : pub no_sync: bool,
89 : pub broker_endpoint: Uri,
90 : pub broker_keepalive_interval: Duration,
91 : pub heartbeat_timeout: Duration,
92 : pub peer_recovery_enabled: bool,
93 : pub remote_storage: Option<RemoteStorageConfig>,
94 : pub max_offloader_lag_bytes: u64,
95 : pub backup_parallel_jobs: usize,
96 : pub wal_backup_enabled: bool,
97 : pub pg_auth: Option<Arc<JwtAuth>>,
98 : pub pg_tenant_only_auth: Option<Arc<JwtAuth>>,
99 : pub http_auth: Option<Arc<SwappableJwtAuth>>,
100 : /// JWT token to connect to other safekeepers with.
101 : pub sk_auth_token: Option<SecretString>,
102 : pub current_thread_runtime: bool,
103 : pub walsenders_keep_horizon: bool,
104 : pub partial_backup_timeout: Duration,
105 : pub disable_periodic_broker_push: bool,
106 : pub enable_offload: bool,
107 : pub delete_offloaded_wal: bool,
108 : pub control_file_save_interval: Duration,
109 : pub partial_backup_concurrency: usize,
110 : pub eviction_min_resident: Duration,
111 : pub wal_reader_fanout: bool,
112 : pub max_delta_for_fanout: Option<u64>,
113 : }
114 :
115 : impl SafeKeeperConf {
116 45 : pub fn is_wal_backup_enabled(&self) -> bool {
117 45 : self.remote_storage.is_some() && self.wal_backup_enabled
118 45 : }
119 : }
120 :
121 : impl SafeKeeperConf {
122 6 : pub fn dummy() -> Self {
123 6 : SafeKeeperConf {
124 6 : workdir: Utf8PathBuf::from("./"),
125 6 : no_sync: false,
126 6 : listen_pg_addr: defaults::DEFAULT_PG_LISTEN_ADDR.to_string(),
127 6 : listen_pg_addr_tenant_only: None,
128 6 : listen_http_addr: defaults::DEFAULT_HTTP_LISTEN_ADDR.to_string(),
129 6 : advertise_pg_addr: None,
130 6 : availability_zone: None,
131 6 : remote_storage: None,
132 6 : my_id: NodeId(0),
133 6 : broker_endpoint: storage_broker::DEFAULT_ENDPOINT
134 6 : .parse()
135 6 : .expect("failed to parse default broker endpoint"),
136 6 : broker_keepalive_interval: Duration::from_secs(5),
137 6 : peer_recovery_enabled: true,
138 6 : wal_backup_enabled: true,
139 6 : backup_parallel_jobs: 1,
140 6 : pg_auth: None,
141 6 : pg_tenant_only_auth: None,
142 6 : http_auth: None,
143 6 : sk_auth_token: None,
144 6 : heartbeat_timeout: Duration::new(5, 0),
145 6 : max_offloader_lag_bytes: defaults::DEFAULT_MAX_OFFLOADER_LAG_BYTES,
146 6 : current_thread_runtime: false,
147 6 : walsenders_keep_horizon: false,
148 6 : partial_backup_timeout: Duration::from_secs(0),
149 6 : disable_periodic_broker_push: false,
150 6 : enable_offload: false,
151 6 : delete_offloaded_wal: false,
152 6 : control_file_save_interval: Duration::from_secs(1),
153 6 : partial_backup_concurrency: 1,
154 6 : eviction_min_resident: Duration::ZERO,
155 6 : wal_reader_fanout: false,
156 6 : max_delta_for_fanout: None,
157 6 : }
158 6 : }
159 : }
160 :
161 : // Tokio runtimes.
162 0 : pub static WAL_SERVICE_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
163 0 : tokio::runtime::Builder::new_multi_thread()
164 0 : .thread_name("WAL service worker")
165 0 : .enable_all()
166 0 : .build()
167 0 : .expect("Failed to create WAL service runtime")
168 0 : });
169 :
170 0 : pub static HTTP_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
171 0 : tokio::runtime::Builder::new_multi_thread()
172 0 : .thread_name("HTTP worker")
173 0 : .enable_all()
174 0 : .build()
175 0 : .expect("Failed to create HTTP runtime")
176 0 : });
177 :
178 0 : pub static BROKER_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
179 0 : tokio::runtime::Builder::new_multi_thread()
180 0 : .thread_name("broker worker")
181 0 : .worker_threads(2) // there are only 2 tasks, having more threads doesn't make sense
182 0 : .enable_all()
183 0 : .build()
184 0 : .expect("Failed to create broker runtime")
185 0 : });
186 :
187 0 : pub static WAL_BACKUP_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
188 0 : tokio::runtime::Builder::new_multi_thread()
189 0 : .thread_name("WAL backup worker")
190 0 : .enable_all()
191 0 : .build()
192 0 : .expect("Failed to create WAL backup runtime")
193 0 : });
|