Line data Source code
1 : #![deny(clippy::undocumented_unsafe_blocks)]
2 : use camino::Utf8PathBuf;
3 : use once_cell::sync::Lazy;
4 : use remote_storage::RemoteStorageConfig;
5 : use tokio::runtime::Runtime;
6 :
7 : use std::time::Duration;
8 : use storage_broker::Uri;
9 :
10 : use utils::{auth::SwappableJwtAuth, id::NodeId, logging::SecretString};
11 :
12 : mod auth;
13 : pub mod broker;
14 : pub mod control_file;
15 : pub mod control_file_upgrade;
16 : pub mod copy_timeline;
17 : pub mod debug_dump;
18 : pub mod handler;
19 : pub mod http;
20 : pub mod json_ctrl;
21 : pub mod metrics;
22 : pub mod patch_control_file;
23 : pub mod pull_timeline;
24 : pub mod receive_wal;
25 : pub mod recovery;
26 : pub mod remove_wal;
27 : pub mod safekeeper;
28 : pub mod send_wal;
29 : pub mod state;
30 : pub mod timeline;
31 : pub mod timeline_eviction;
32 : pub mod timeline_guard;
33 : pub mod timeline_manager;
34 : pub mod timelines_set;
35 : pub mod wal_backup;
36 : pub mod wal_backup_partial;
37 : pub mod wal_service;
38 : pub mod wal_storage;
39 :
40 : mod timelines_global_map;
41 : use std::sync::Arc;
42 : pub use timelines_global_map::GlobalTimelines;
43 : use utils::auth::JwtAuth;
44 :
45 : pub mod defaults {
46 : pub use safekeeper_api::{
47 : DEFAULT_HTTP_LISTEN_ADDR, DEFAULT_HTTP_LISTEN_PORT, DEFAULT_PG_LISTEN_ADDR,
48 : DEFAULT_PG_LISTEN_PORT,
49 : };
50 :
51 : pub const DEFAULT_HEARTBEAT_TIMEOUT: &str = "5000ms";
52 : pub const DEFAULT_MAX_OFFLOADER_LAG_BYTES: u64 = 128 * (1 << 20);
53 : pub const DEFAULT_PARTIAL_BACKUP_TIMEOUT: &str = "15m";
54 : pub const DEFAULT_CONTROL_FILE_SAVE_INTERVAL: &str = "300s";
55 : pub const DEFAULT_PARTIAL_BACKUP_CONCURRENCY: &str = "5";
56 : }
57 :
/// Configuration values for a safekeeper process.
///
/// The struct is `Clone`, so it can be copied to whichever component needs
/// it. A test-only constructor, `SafeKeeperConf::dummy`, fills every field
/// with a fixed value.
#[derive(Debug, Clone)]
pub struct SafeKeeperConf {
    /// Repository directory, relative to current working directory.
    /// Normally, the safekeeper changes the current working directory
    /// to the repository, and 'workdir' is always '.'. But we don't do
    /// that during unit testing, because the current directory is global
    /// to the process but different unit tests work on different
    /// data directories to avoid clashing with each other.
    pub workdir: Utf8PathBuf,
    pub my_id: NodeId,
    pub listen_pg_addr: String,
    pub listen_pg_addr_tenant_only: Option<String>,
    pub listen_http_addr: String,
    pub advertise_pg_addr: Option<String>,
    pub availability_zone: Option<String>,
    pub no_sync: bool,
    pub broker_endpoint: Uri,
    pub broker_keepalive_interval: Duration,
    pub heartbeat_timeout: Duration,
    pub peer_recovery_enabled: bool,
    /// Remote storage settings, if any. `is_wal_backup_enabled` returns
    /// `false` whenever this is `None`.
    pub remote_storage: Option<RemoteStorageConfig>,
    pub max_offloader_lag_bytes: u64,
    pub backup_parallel_jobs: usize,
    /// WAL backup switch. Only takes effect together with `remote_storage`;
    /// see `is_wal_backup_enabled`.
    pub wal_backup_enabled: bool,
    pub pg_auth: Option<Arc<JwtAuth>>,
    pub pg_tenant_only_auth: Option<Arc<JwtAuth>>,
    pub http_auth: Option<Arc<SwappableJwtAuth>>,
    /// JWT token to connect to other safekeepers with.
    pub sk_auth_token: Option<SecretString>,
    pub current_thread_runtime: bool,
    pub walsenders_keep_horizon: bool,
    pub partial_backup_enabled: bool,
    pub partial_backup_timeout: Duration,
    pub disable_periodic_broker_push: bool,
    pub enable_offload: bool,
    pub delete_offloaded_wal: bool,
    pub control_file_save_interval: Duration,
    pub partial_backup_concurrency: usize,
}
97 :
98 : impl SafeKeeperConf {
99 0 : pub fn is_wal_backup_enabled(&self) -> bool {
100 0 : self.remote_storage.is_some() && self.wal_backup_enabled
101 0 : }
102 : }
103 :
104 : impl SafeKeeperConf {
105 : #[cfg(test)]
106 4 : fn dummy() -> Self {
107 4 : SafeKeeperConf {
108 4 : workdir: Utf8PathBuf::from("./"),
109 4 : no_sync: false,
110 4 : listen_pg_addr: defaults::DEFAULT_PG_LISTEN_ADDR.to_string(),
111 4 : listen_pg_addr_tenant_only: None,
112 4 : listen_http_addr: defaults::DEFAULT_HTTP_LISTEN_ADDR.to_string(),
113 4 : advertise_pg_addr: None,
114 4 : availability_zone: None,
115 4 : remote_storage: None,
116 4 : my_id: NodeId(0),
117 4 : broker_endpoint: storage_broker::DEFAULT_ENDPOINT
118 4 : .parse()
119 4 : .expect("failed to parse default broker endpoint"),
120 4 : broker_keepalive_interval: Duration::from_secs(5),
121 4 : peer_recovery_enabled: true,
122 4 : wal_backup_enabled: true,
123 4 : backup_parallel_jobs: 1,
124 4 : pg_auth: None,
125 4 : pg_tenant_only_auth: None,
126 4 : http_auth: None,
127 4 : sk_auth_token: None,
128 4 : heartbeat_timeout: Duration::new(5, 0),
129 4 : max_offloader_lag_bytes: defaults::DEFAULT_MAX_OFFLOADER_LAG_BYTES,
130 4 : current_thread_runtime: false,
131 4 : walsenders_keep_horizon: false,
132 4 : partial_backup_enabled: false,
133 4 : partial_backup_timeout: Duration::from_secs(0),
134 4 : disable_periodic_broker_push: false,
135 4 : enable_offload: false,
136 4 : delete_offloaded_wal: false,
137 4 : control_file_save_interval: Duration::from_secs(1),
138 4 : partial_backup_concurrency: 1,
139 4 : }
140 4 : }
141 : }
142 :
143 : // Tokio runtimes.
144 0 : pub static WAL_SERVICE_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
145 0 : tokio::runtime::Builder::new_multi_thread()
146 0 : .thread_name("WAL service worker")
147 0 : .enable_all()
148 0 : .build()
149 0 : .expect("Failed to create WAL service runtime")
150 0 : });
151 :
152 0 : pub static HTTP_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
153 0 : tokio::runtime::Builder::new_multi_thread()
154 0 : .thread_name("HTTP worker")
155 0 : .enable_all()
156 0 : .build()
157 0 : .expect("Failed to create WAL service runtime")
158 0 : });
159 :
160 0 : pub static BROKER_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
161 0 : tokio::runtime::Builder::new_multi_thread()
162 0 : .thread_name("broker worker")
163 0 : .worker_threads(2) // there are only 2 tasks, having more threads doesn't make sense
164 0 : .enable_all()
165 0 : .build()
166 0 : .expect("Failed to create broker runtime")
167 0 : });
168 :
169 0 : pub static WAL_REMOVER_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
170 0 : tokio::runtime::Builder::new_multi_thread()
171 0 : .thread_name("WAL remover")
172 0 : .worker_threads(1)
173 0 : .enable_all()
174 0 : .build()
175 0 : .expect("Failed to create broker runtime")
176 0 : });
177 :
178 0 : pub static WAL_BACKUP_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
179 0 : tokio::runtime::Builder::new_multi_thread()
180 0 : .thread_name("WAL backup worker")
181 0 : .enable_all()
182 0 : .build()
183 0 : .expect("Failed to create WAL backup runtime")
184 0 : });
185 :
186 0 : pub static METRICS_SHIFTER_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
187 0 : tokio::runtime::Builder::new_multi_thread()
188 0 : .thread_name("metric shifter")
189 0 : .worker_threads(1)
190 0 : .enable_all()
191 0 : .build()
192 0 : .expect("Failed to create broker runtime")
193 0 : });
|