TLA Line data Source code
1 : #![deny(clippy::undocumented_unsafe_blocks)]
2 : use camino::Utf8PathBuf;
3 : use once_cell::sync::Lazy;
4 : use remote_storage::RemoteStorageConfig;
5 : use tokio::runtime::Runtime;
6 :
7 : use std::time::Duration;
8 : use storage_broker::Uri;
9 :
10 : use utils::{
11 : auth::SwappableJwtAuth,
12 : id::{NodeId, TenantId, TenantTimelineId},
13 : };
14 :
15 : mod auth;
16 : pub mod broker;
17 : pub mod control_file;
18 : pub mod control_file_upgrade;
19 : pub mod copy_timeline;
20 : pub mod debug_dump;
21 : pub mod handler;
22 : pub mod http;
23 : pub mod json_ctrl;
24 : pub mod metrics;
25 : pub mod pull_timeline;
26 : pub mod receive_wal;
27 : pub mod recovery;
28 : pub mod remove_wal;
29 : pub mod safekeeper;
30 : pub mod send_wal;
31 : pub mod timeline;
32 : pub mod wal_backup;
33 : pub mod wal_service;
34 : pub mod wal_storage;
35 :
36 : mod timelines_global_map;
37 : use std::sync::Arc;
38 : pub use timelines_global_map::GlobalTimelines;
39 : use utils::auth::JwtAuth;
40 :
41 : pub mod defaults {
42 : pub use safekeeper_api::{
43 : DEFAULT_HTTP_LISTEN_ADDR, DEFAULT_HTTP_LISTEN_PORT, DEFAULT_PG_LISTEN_ADDR,
44 : DEFAULT_PG_LISTEN_PORT,
45 : };
46 :
47 : pub const DEFAULT_HEARTBEAT_TIMEOUT: &str = "5000ms";
48 : pub const DEFAULT_MAX_OFFLOADER_LAG_BYTES: u64 = 128 * (1 << 20);
49 : }
50 :
51 CBC 9936 : #[derive(Debug, Clone)]
52 : pub struct SafeKeeperConf {
53 : // Repository directory, relative to current working directory.
54 : // Normally, the safekeeper changes the current working directory
55 : // to the repository, and 'workdir' is always '.'. But we don't do
56 : // that during unit testing, because the current directory is global
57 : // to the process but different unit tests work on different
58 : // data directories to avoid clashing with each other.
59 : pub workdir: Utf8PathBuf,
60 : pub my_id: NodeId,
61 : pub listen_pg_addr: String,
62 : pub listen_pg_addr_tenant_only: Option<String>,
63 : pub listen_http_addr: String,
64 : pub advertise_pg_addr: Option<String>,
65 : pub availability_zone: Option<String>,
66 : pub no_sync: bool,
67 : pub broker_endpoint: Uri,
68 : pub broker_keepalive_interval: Duration,
69 : pub heartbeat_timeout: Duration,
70 : pub peer_recovery_enabled: bool,
71 : pub remote_storage: Option<RemoteStorageConfig>,
72 : pub max_offloader_lag_bytes: u64,
73 : pub backup_parallel_jobs: usize,
74 : pub wal_backup_enabled: bool,
75 : pub pg_auth: Option<Arc<JwtAuth>>,
76 : pub pg_tenant_only_auth: Option<Arc<JwtAuth>>,
77 : pub http_auth: Option<Arc<SwappableJwtAuth>>,
78 : pub current_thread_runtime: bool,
79 : }
80 :
81 : impl SafeKeeperConf {
82 2870 : pub fn tenant_dir(&self, tenant_id: &TenantId) -> Utf8PathBuf {
83 2870 : self.workdir.join(tenant_id.to_string())
84 2870 : }
85 :
86 2736 : pub fn timeline_dir(&self, ttid: &TenantTimelineId) -> Utf8PathBuf {
87 2736 : self.tenant_dir(&ttid.tenant_id)
88 2736 : .join(ttid.timeline_id.to_string())
89 2736 : }
90 : }
91 :
92 : impl SafeKeeperConf {
93 : #[cfg(test)]
94 2 : fn dummy() -> Self {
95 2 : SafeKeeperConf {
96 2 : workdir: Utf8PathBuf::from("./"),
97 2 : no_sync: false,
98 2 : listen_pg_addr: defaults::DEFAULT_PG_LISTEN_ADDR.to_string(),
99 2 : listen_pg_addr_tenant_only: None,
100 2 : listen_http_addr: defaults::DEFAULT_HTTP_LISTEN_ADDR.to_string(),
101 2 : advertise_pg_addr: None,
102 2 : availability_zone: None,
103 2 : remote_storage: None,
104 2 : my_id: NodeId(0),
105 2 : broker_endpoint: storage_broker::DEFAULT_ENDPOINT
106 2 : .parse()
107 2 : .expect("failed to parse default broker endpoint"),
108 2 : broker_keepalive_interval: Duration::from_secs(5),
109 2 : peer_recovery_enabled: true,
110 2 : wal_backup_enabled: true,
111 2 : backup_parallel_jobs: 1,
112 2 : pg_auth: None,
113 2 : pg_tenant_only_auth: None,
114 2 : http_auth: None,
115 2 : heartbeat_timeout: Duration::new(5, 0),
116 2 : max_offloader_lag_bytes: defaults::DEFAULT_MAX_OFFLOADER_LAG_BYTES,
117 2 : current_thread_runtime: false,
118 2 : }
119 2 : }
120 : }
121 :
122 : // Tokio runtimes.
123 485 : pub static WAL_SERVICE_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
124 485 : tokio::runtime::Builder::new_multi_thread()
125 485 : .thread_name("WAL service worker")
126 485 : .enable_all()
127 485 : .build()
128 485 : .expect("Failed to create WAL service runtime")
129 485 : });
130 :
131 485 : pub static HTTP_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
132 485 : tokio::runtime::Builder::new_multi_thread()
133 485 : .thread_name("HTTP worker")
134 485 : .enable_all()
135 485 : .build()
136 485 : .expect("Failed to create WAL service runtime")
137 485 : });
138 :
139 485 : pub static BROKER_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
140 485 : tokio::runtime::Builder::new_multi_thread()
141 485 : .thread_name("broker worker")
142 485 : .worker_threads(2) // there are only 2 tasks, having more threads doesn't make sense
143 485 : .enable_all()
144 485 : .build()
145 485 : .expect("Failed to create broker runtime")
146 485 : });
147 :
148 485 : pub static WAL_REMOVER_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
149 485 : tokio::runtime::Builder::new_multi_thread()
150 485 : .thread_name("WAL remover")
151 485 : .worker_threads(1)
152 485 : .enable_all()
153 485 : .build()
154 485 : .expect("Failed to create broker runtime")
155 485 : });
156 :
157 485 : pub static WAL_BACKUP_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
158 485 : tokio::runtime::Builder::new_multi_thread()
159 485 : .thread_name("WAL backup worker")
160 485 : .enable_all()
161 485 : .build()
162 485 : .expect("Failed to create WAL backup runtime")
163 485 : });
164 :
165 UBC 0 : pub static METRICS_SHIFTER_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
166 0 : tokio::runtime::Builder::new_multi_thread()
167 0 : .thread_name("metric shifter")
168 0 : .worker_threads(1)
169 0 : .enable_all()
170 0 : .build()
171 0 : .expect("Failed to create broker runtime")
172 0 : });
|