TLA Line data Source code
1 : use camino::Utf8PathBuf;
2 : use once_cell::sync::Lazy;
3 : use remote_storage::RemoteStorageConfig;
4 : use tokio::runtime::Runtime;
5 :
6 : use std::time::Duration;
7 : use storage_broker::Uri;
8 :
9 : use utils::id::{NodeId, TenantId, TenantTimelineId};
10 :
11 : mod auth;
12 : pub mod broker;
13 : pub mod control_file;
14 : pub mod control_file_upgrade;
15 : pub mod debug_dump;
16 : pub mod handler;
17 : pub mod http;
18 : pub mod json_ctrl;
19 : pub mod metrics;
20 : pub mod pull_timeline;
21 : pub mod receive_wal;
22 : pub mod recovery;
23 : pub mod remove_wal;
24 : pub mod safekeeper;
25 : pub mod send_wal;
26 : pub mod timeline;
27 : pub mod wal_backup;
28 : pub mod wal_service;
29 : pub mod wal_storage;
30 :
31 : mod timelines_global_map;
32 : use std::sync::Arc;
33 : pub use timelines_global_map::GlobalTimelines;
34 : use utils::auth::JwtAuth;
35 :
36 : pub mod defaults {
37 : pub use safekeeper_api::{
38 : DEFAULT_HTTP_LISTEN_ADDR, DEFAULT_HTTP_LISTEN_PORT, DEFAULT_PG_LISTEN_ADDR,
39 : DEFAULT_PG_LISTEN_PORT,
40 : };
41 :
42 : pub const DEFAULT_HEARTBEAT_TIMEOUT: &str = "5000ms";
43 : pub const DEFAULT_MAX_OFFLOADER_LAG_BYTES: u64 = 128 * (1 << 20);
44 : }
45 :
46 CBC 10834 : #[derive(Debug, Clone)]
47 : pub struct SafeKeeperConf {
48 : // Repository directory, relative to current working directory.
49 : // Normally, the safekeeper changes the current working directory
50 : // to the repository, and 'workdir' is always '.'. But we don't do
51 : // that during unit testing, because the current directory is global
52 : // to the process but different unit tests work on different
53 : // data directories to avoid clashing with each other.
54 : pub workdir: Utf8PathBuf,
55 : pub my_id: NodeId,
56 : pub listen_pg_addr: String,
57 : pub listen_pg_addr_tenant_only: Option<String>,
58 : pub listen_http_addr: String,
59 : pub advertise_pg_addr: Option<String>,
60 : pub availability_zone: Option<String>,
61 : pub no_sync: bool,
62 : pub broker_endpoint: Uri,
63 : pub broker_keepalive_interval: Duration,
64 : pub heartbeat_timeout: Duration,
65 : pub remote_storage: Option<RemoteStorageConfig>,
66 : pub max_offloader_lag_bytes: u64,
67 : pub backup_parallel_jobs: usize,
68 : pub wal_backup_enabled: bool,
69 : pub pg_auth: Option<Arc<JwtAuth>>,
70 : pub pg_tenant_only_auth: Option<Arc<JwtAuth>>,
71 : pub http_auth: Option<Arc<JwtAuth>>,
72 : pub current_thread_runtime: bool,
73 : }
74 :
75 : impl SafeKeeperConf {
76 2693 : pub fn tenant_dir(&self, tenant_id: &TenantId) -> Utf8PathBuf {
77 2693 : self.workdir.join(tenant_id.to_string())
78 2693 : }
79 :
80 2602 : pub fn timeline_dir(&self, ttid: &TenantTimelineId) -> Utf8PathBuf {
81 2602 : self.tenant_dir(&ttid.tenant_id)
82 2602 : .join(ttid.timeline_id.to_string())
83 2602 : }
84 : }
85 :
86 : impl SafeKeeperConf {
87 : #[cfg(test)]
88 2 : fn dummy() -> Self {
89 2 : SafeKeeperConf {
90 2 : workdir: Utf8PathBuf::from("./"),
91 2 : no_sync: false,
92 2 : listen_pg_addr: defaults::DEFAULT_PG_LISTEN_ADDR.to_string(),
93 2 : listen_pg_addr_tenant_only: None,
94 2 : listen_http_addr: defaults::DEFAULT_HTTP_LISTEN_ADDR.to_string(),
95 2 : advertise_pg_addr: None,
96 2 : availability_zone: None,
97 2 : remote_storage: None,
98 2 : my_id: NodeId(0),
99 2 : broker_endpoint: storage_broker::DEFAULT_ENDPOINT
100 2 : .parse()
101 2 : .expect("failed to parse default broker endpoint"),
102 2 : broker_keepalive_interval: Duration::from_secs(5),
103 2 : wal_backup_enabled: true,
104 2 : backup_parallel_jobs: 1,
105 2 : pg_auth: None,
106 2 : pg_tenant_only_auth: None,
107 2 : http_auth: None,
108 2 : heartbeat_timeout: Duration::new(5, 0),
109 2 : max_offloader_lag_bytes: defaults::DEFAULT_MAX_OFFLOADER_LAG_BYTES,
110 2 : current_thread_runtime: false,
111 2 : }
112 2 : }
113 : }
114 :
115 : // Tokio runtimes.
116 500 : pub static WAL_SERVICE_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
117 500 : tokio::runtime::Builder::new_multi_thread()
118 500 : .thread_name("WAL service worker")
119 500 : .enable_all()
120 500 : .build()
121 500 : .expect("Failed to create WAL service runtime")
122 500 : });
123 :
124 500 : pub static HTTP_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
125 500 : tokio::runtime::Builder::new_multi_thread()
126 500 : .thread_name("HTTP worker")
127 500 : .enable_all()
128 500 : .build()
129 500 : .expect("Failed to create WAL service runtime")
130 500 : });
131 :
132 500 : pub static BROKER_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
133 500 : tokio::runtime::Builder::new_multi_thread()
134 500 : .thread_name("broker worker")
135 500 : .worker_threads(2) // there are only 2 tasks, having more threads doesn't make sense
136 500 : .enable_all()
137 500 : .build()
138 500 : .expect("Failed to create broker runtime")
139 500 : });
140 :
141 500 : pub static WAL_REMOVER_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
142 500 : tokio::runtime::Builder::new_multi_thread()
143 500 : .thread_name("WAL remover")
144 500 : .worker_threads(1)
145 500 : .enable_all()
146 500 : .build()
147 500 : .expect("Failed to create broker runtime")
148 500 : });
149 :
150 500 : pub static WAL_BACKUP_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
151 500 : tokio::runtime::Builder::new_multi_thread()
152 500 : .thread_name("WAL backup worker")
153 500 : .enable_all()
154 500 : .build()
155 500 : .expect("Failed to create WAL backup runtime")
156 500 : });
157 :
158 UBC 0 : pub static METRICS_SHIFTER_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
159 0 : tokio::runtime::Builder::new_multi_thread()
160 0 : .thread_name("metric shifter")
161 0 : .worker_threads(1)
162 0 : .enable_all()
163 0 : .build()
164 0 : .expect("Failed to create broker runtime")
165 0 : });
|