Line data Source code
1 : #![deny(clippy::undocumented_unsafe_blocks)]
2 : use camino::Utf8PathBuf;
3 : use once_cell::sync::Lazy;
4 : use remote_storage::RemoteStorageConfig;
5 : use tokio::runtime::Runtime;
6 :
7 : use std::time::Duration;
8 : use storage_broker::Uri;
9 :
10 : use utils::{
11 : auth::SwappableJwtAuth,
12 : id::{NodeId, TenantId, TenantTimelineId},
13 : };
14 :
15 : mod auth;
16 : pub mod broker;
17 : pub mod control_file;
18 : pub mod control_file_upgrade;
19 : pub mod copy_timeline;
20 : pub mod debug_dump;
21 : pub mod handler;
22 : pub mod http;
23 : pub mod json_ctrl;
24 : pub mod metrics;
25 : pub mod patch_control_file;
26 : pub mod pull_timeline;
27 : pub mod receive_wal;
28 : pub mod recovery;
29 : pub mod remove_wal;
30 : pub mod safekeeper;
31 : pub mod send_wal;
32 : pub mod state;
33 : pub mod timeline;
34 : pub mod wal_backup;
35 : pub mod wal_service;
36 : pub mod wal_storage;
37 :
38 : mod timelines_global_map;
39 : use std::sync::Arc;
40 : pub use timelines_global_map::GlobalTimelines;
41 : use utils::auth::JwtAuth;
42 :
43 : pub mod defaults {
44 : pub use safekeeper_api::{
45 : DEFAULT_HTTP_LISTEN_ADDR, DEFAULT_HTTP_LISTEN_PORT, DEFAULT_PG_LISTEN_ADDR,
46 : DEFAULT_PG_LISTEN_PORT,
47 : };
48 :
49 : pub const DEFAULT_HEARTBEAT_TIMEOUT: &str = "5000ms";
50 : pub const DEFAULT_MAX_OFFLOADER_LAG_BYTES: u64 = 128 * (1 << 20);
51 : }
52 :
53 10489 : #[derive(Debug, Clone)]
54 : pub struct SafeKeeperConf {
55 : // Repository directory, relative to current working directory.
56 : // Normally, the safekeeper changes the current working directory
57 : // to the repository, and 'workdir' is always '.'. But we don't do
58 : // that during unit testing, because the current directory is global
59 : // to the process but different unit tests work on different
60 : // data directories to avoid clashing with each other.
61 : pub workdir: Utf8PathBuf,
62 : pub my_id: NodeId,
63 : pub listen_pg_addr: String,
64 : pub listen_pg_addr_tenant_only: Option<String>,
65 : pub listen_http_addr: String,
66 : pub advertise_pg_addr: Option<String>,
67 : pub availability_zone: Option<String>,
68 : pub no_sync: bool,
69 : pub broker_endpoint: Uri,
70 : pub broker_keepalive_interval: Duration,
71 : pub heartbeat_timeout: Duration,
72 : pub peer_recovery_enabled: bool,
73 : pub remote_storage: Option<RemoteStorageConfig>,
74 : pub max_offloader_lag_bytes: u64,
75 : pub backup_parallel_jobs: usize,
76 : pub wal_backup_enabled: bool,
77 : pub pg_auth: Option<Arc<JwtAuth>>,
78 : pub pg_tenant_only_auth: Option<Arc<JwtAuth>>,
79 : pub http_auth: Option<Arc<SwappableJwtAuth>>,
80 : pub current_thread_runtime: bool,
81 : }
82 :
83 : impl SafeKeeperConf {
84 3023 : pub fn tenant_dir(&self, tenant_id: &TenantId) -> Utf8PathBuf {
85 3023 : self.workdir.join(tenant_id.to_string())
86 3023 : }
87 :
88 2885 : pub fn timeline_dir(&self, ttid: &TenantTimelineId) -> Utf8PathBuf {
89 2885 : self.tenant_dir(&ttid.tenant_id)
90 2885 : .join(ttid.timeline_id.to_string())
91 2885 : }
92 :
93 13593 : pub fn is_wal_backup_enabled(&self) -> bool {
94 13593 : self.remote_storage.is_some() && self.wal_backup_enabled
95 13593 : }
96 : }
97 :
98 : impl SafeKeeperConf {
99 : #[cfg(test)]
100 4 : fn dummy() -> Self {
101 4 : SafeKeeperConf {
102 4 : workdir: Utf8PathBuf::from("./"),
103 4 : no_sync: false,
104 4 : listen_pg_addr: defaults::DEFAULT_PG_LISTEN_ADDR.to_string(),
105 4 : listen_pg_addr_tenant_only: None,
106 4 : listen_http_addr: defaults::DEFAULT_HTTP_LISTEN_ADDR.to_string(),
107 4 : advertise_pg_addr: None,
108 4 : availability_zone: None,
109 4 : remote_storage: None,
110 4 : my_id: NodeId(0),
111 4 : broker_endpoint: storage_broker::DEFAULT_ENDPOINT
112 4 : .parse()
113 4 : .expect("failed to parse default broker endpoint"),
114 4 : broker_keepalive_interval: Duration::from_secs(5),
115 4 : peer_recovery_enabled: true,
116 4 : wal_backup_enabled: true,
117 4 : backup_parallel_jobs: 1,
118 4 : pg_auth: None,
119 4 : pg_tenant_only_auth: None,
120 4 : http_auth: None,
121 4 : heartbeat_timeout: Duration::new(5, 0),
122 4 : max_offloader_lag_bytes: defaults::DEFAULT_MAX_OFFLOADER_LAG_BYTES,
123 4 : current_thread_runtime: false,
124 4 : }
125 4 : }
126 : }
127 :
128 : // Tokio runtimes.
129 510 : pub static WAL_SERVICE_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
130 510 : tokio::runtime::Builder::new_multi_thread()
131 510 : .thread_name("WAL service worker")
132 510 : .enable_all()
133 510 : .build()
134 510 : .expect("Failed to create WAL service runtime")
135 510 : });
136 :
137 510 : pub static HTTP_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
138 510 : tokio::runtime::Builder::new_multi_thread()
139 510 : .thread_name("HTTP worker")
140 510 : .enable_all()
141 510 : .build()
142 510 : .expect("Failed to create WAL service runtime")
143 510 : });
144 :
145 510 : pub static BROKER_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
146 510 : tokio::runtime::Builder::new_multi_thread()
147 510 : .thread_name("broker worker")
148 510 : .worker_threads(2) // there are only 2 tasks, having more threads doesn't make sense
149 510 : .enable_all()
150 510 : .build()
151 510 : .expect("Failed to create broker runtime")
152 510 : });
153 :
154 510 : pub static WAL_REMOVER_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
155 510 : tokio::runtime::Builder::new_multi_thread()
156 510 : .thread_name("WAL remover")
157 510 : .worker_threads(1)
158 510 : .enable_all()
159 510 : .build()
160 510 : .expect("Failed to create broker runtime")
161 510 : });
162 :
163 510 : pub static WAL_BACKUP_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
164 510 : tokio::runtime::Builder::new_multi_thread()
165 510 : .thread_name("WAL backup worker")
166 510 : .enable_all()
167 510 : .build()
168 510 : .expect("Failed to create WAL backup runtime")
169 510 : });
170 :
171 0 : pub static METRICS_SHIFTER_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
172 0 : tokio::runtime::Builder::new_multi_thread()
173 0 : .thread_name("metric shifter")
174 0 : .worker_threads(1)
175 0 : .enable_all()
176 0 : .build()
177 0 : .expect("Failed to create broker runtime")
178 0 : });
|