TLA Line data Source code
1 : #![deny(clippy::undocumented_unsafe_blocks)]
2 :
3 : mod auth;
4 : pub mod basebackup;
5 : pub mod config;
6 : pub mod consumption_metrics;
7 : pub mod context;
8 : pub mod control_plane_client;
9 : pub mod deletion_queue;
10 : pub mod disk_usage_eviction_task;
11 : pub mod http;
12 : pub mod import_datadir;
13 : pub use pageserver_api::keyspace;
14 : pub mod metrics;
15 : pub mod page_cache;
16 : pub mod page_service;
17 : pub mod pgdatadir_mapping;
18 : pub mod repository;
19 : pub(crate) mod statvfs;
20 : pub mod task_mgr;
21 : pub mod tenant;
22 : pub mod trace;
23 : pub mod virtual_file;
24 : pub mod walingest;
25 : pub mod walrecord;
26 : pub mod walredo;
27 :
28 : use crate::task_mgr::TaskKind;
29 : use camino::Utf8Path;
30 : use deletion_queue::DeletionQueue;
31 : use tracing::info;
32 :
/// Version of the on-disk storage format.
///
/// The value is embedded in the header of every layer file. Bump it whenever
/// the storage format changes in a backwards-incompatible way.
/// Note that `TimelineMetadata` carries a version number of its own, which
/// tracks backwards-compatible changes to the metadata format.
pub const STORAGE_FORMAT_VERSION: u16 = 3;

/// Default PostgreSQL version number.
// NOTE(review): presumably the major version assumed when a caller does not
// specify one -- confirm against the call sites.
pub const DEFAULT_PG_VERSION: u32 = 15;

// Magic constants used to identify different kinds of files.

/// Magic number identifying an image file.
pub const IMAGE_FILE_MAGIC: u16 = 0x5A60;
/// Magic number identifying a delta file.
pub const DELTA_FILE_MAGIC: u16 = 0x5A61;
47 :
48 : static ZERO_PAGE: bytes::Bytes = bytes::Bytes::from_static(&[0u8; 8192]);
49 :
50 : pub use crate::metrics::preinitialize_metrics;
51 :
52 CBC 159 : #[tracing::instrument(skip_all, fields(%exit_code))]
53 : pub async fn shutdown_pageserver(deletion_queue: Option<DeletionQueue>, exit_code: i32) {
54 : use std::time::Duration;
55 : // Shut down the libpq endpoint task. This prevents new connections from
56 : // being accepted.
57 : timed(
58 : task_mgr::shutdown_tasks(Some(TaskKind::LibpqEndpointListener), None, None),
59 : "shutdown LibpqEndpointListener",
60 : Duration::from_secs(1),
61 : )
62 : .await;
63 :
64 : // Shut down all the tenants. This flushes everything to disk and kills
65 : // the checkpoint and GC tasks.
66 : timed(
67 : tenant::mgr::shutdown_all_tenants(),
68 : "shutdown all tenants",
69 : Duration::from_secs(5),
70 : )
71 : .await;
72 :
73 : // Shut down any page service tasks: any in-progress work for particular timelines or tenants
74 : // should already have been canclled via mgr::shutdown_all_tenants
75 : timed(
76 : task_mgr::shutdown_tasks(Some(TaskKind::PageRequestHandler), None, None),
77 : "shutdown PageRequestHandlers",
78 : Duration::from_secs(1),
79 : )
80 : .await;
81 :
82 : // Best effort to persist any outstanding deletions, to avoid leaking objects
83 : if let Some(mut deletion_queue) = deletion_queue {
84 : deletion_queue.shutdown(Duration::from_secs(5)).await;
85 : }
86 :
87 : // Shut down the HTTP endpoint last, so that you can still check the server's
88 : // status while it's shutting down.
89 : // FIXME: We should probably stop accepting commands like attach/detach earlier.
90 : timed(
91 : task_mgr::shutdown_tasks(Some(TaskKind::HttpEndpointListener), None, None),
92 : "shutdown http",
93 : Duration::from_secs(1),
94 : )
95 : .await;
96 :
97 : // There should be nothing left, but let's be sure
98 : timed(
99 : task_mgr::shutdown_tasks(None, None, None),
100 : "shutdown leftovers",
101 : Duration::from_secs(1),
102 : )
103 : .await;
104 159 : info!("Shut down successfully completed");
105 : std::process::exit(exit_code);
106 : }
107 :
/// The name of the metadata file pageserver creates per timeline.
/// Full path: `tenants/<tenant_id>/timelines/<timeline_id>/metadata`.
pub const METADATA_FILE_NAME: &str = "metadata";

/// Per-tenant configuration file.
/// Full path: `tenants/<tenant_id>/config`.
pub const TENANT_CONFIG_NAME: &str = "config";

/// Per-tenant location configuration file.
/// Full path: `tenants/<tenant_id>/config-v1`.
pub const TENANT_LOCATION_CONFIG_NAME: &str = "config-v1";

/// Per-tenant copy of their remote heatmap, downloaded into the local
/// tenant path while in secondary mode.
pub const TENANT_HEATMAP_BASENAME: &str = "heatmap-v1.json";

/// A suffix used for various temporary files. Any temporary files found in the
/// data directory at pageserver startup can be automatically removed.
pub const TEMP_FILE_SUFFIX: &str = "___temp";

/// A marker file to mark that a timeline directory was not fully initialized.
/// If a timeline directory with this marker is encountered at pageserver startup,
/// the timeline directory and the marker file are both removed.
/// Full path: `tenants/<tenant_id>/timelines/<timeline_id>___uninit`.
pub const TIMELINE_UNINIT_MARK_SUFFIX: &str = "___uninit";

/// Suffix of the marker file names recognized by [`is_delete_mark`].
// NOTE(review): presumably marks a timeline whose deletion is in progress, by
// analogy with `TIMELINE_UNINIT_MARK_SUFFIX` -- confirm and document the full path.
pub const TIMELINE_DELETE_MARK_SUFFIX: &str = "___delete";

/// A marker file to prevent pageserver from loading a certain tenant on restart.
/// Different from [`TIMELINE_UNINIT_MARK_SUFFIX`] due to semantics of the corresponding
/// `ignore` management API command, that expects the ignored tenant to be properly loaded
/// into pageserver's memory before being ignored.
/// Full path: `tenants/<tenant_id>/___ignored_tenant`.
pub const IGNORED_TENANT_FILE_NAME: &str = "___ignored_tenant";
142 :
143 1342 : pub fn is_temporary(path: &Utf8Path) -> bool {
144 1342 : match path.file_name() {
145 1342 : Some(name) => name.ends_with(TEMP_FILE_SUFFIX),
146 UBC 0 : None => false,
147 : }
148 CBC 1342 : }
149 :
150 751 : fn ends_with_suffix(path: &Utf8Path, suffix: &str) -> bool {
151 751 : match path.file_name() {
152 751 : Some(name) => name.ends_with(suffix),
153 UBC 0 : None => false,
154 : }
155 CBC 751 : }
156 :
157 : // FIXME: DO NOT ADD new query methods like this, which will have a next step of parsing timelineid
158 : // from the directory name. Instead create type "UninitMark(TimelineId)" and only parse it once
159 : // from the name.
160 :
161 377 : pub fn is_uninit_mark(path: &Utf8Path) -> bool {
162 377 : ends_with_suffix(path, TIMELINE_UNINIT_MARK_SUFFIX)
163 377 : }
164 :
165 374 : pub fn is_delete_mark(path: &Utf8Path) -> bool {
166 374 : ends_with_suffix(path, TIMELINE_DELETE_MARK_SUFFIX)
167 374 : }
168 :
169 UBC 0 : fn is_walkdir_io_not_found(e: &walkdir::Error) -> bool {
170 0 : if let Some(e) = e.io_error() {
171 0 : if e.kind() == std::io::ErrorKind::NotFound {
172 0 : return true;
173 0 : }
174 0 : }
175 0 : false
176 0 : }
177 :
178 : /// During pageserver startup, we need to order operations not to exhaust tokio worker threads by
179 : /// blocking.
180 : ///
181 : /// The instances of this value exist only during startup, otherwise `None` is provided, meaning no
182 : /// delaying is needed.
183 CBC 202 : #[derive(Clone)]
184 : pub struct InitializationOrder {
185 : /// Each initial tenant load task carries this until it is done loading timelines from remote storage
186 : pub initial_tenant_load_remote: Option<utils::completion::Completion>,
187 :
188 : /// Each initial tenant load task carries this until completion.
189 : pub initial_tenant_load: Option<utils::completion::Completion>,
190 :
191 : /// Barrier for when we can start any background jobs.
192 : ///
193 : /// This can be broken up later on, but right now there is just one class of a background job.
194 : pub background_jobs_can_start: utils::completion::Barrier,
195 : }
196 :
197 : /// Time the future with a warning when it exceeds a threshold.
198 980 : async fn timed<Fut: std::future::Future>(
199 980 : fut: Fut,
200 980 : name: &str,
201 980 : warn_at: std::time::Duration,
202 980 : ) -> <Fut as std::future::Future>::Output {
203 980 : let started = std::time::Instant::now();
204 980 :
205 980 : let mut fut = std::pin::pin!(fut);
206 980 :
207 980 : match tokio::time::timeout(warn_at, &mut fut).await {
208 979 : Ok(ret) => {
209 978 : tracing::info!(
210 978 : stage = name,
211 978 : elapsed_ms = started.elapsed().as_millis(),
212 978 : "completed"
213 978 : );
214 979 : ret
215 : }
216 : Err(_) => {
217 UBC 0 : tracing::info!(
218 0 : stage = name,
219 0 : elapsed_ms = started.elapsed().as_millis(),
220 0 : "still waiting, taking longer than expected..."
221 0 : );
222 :
223 CBC 1 : let ret = fut.await;
224 :
225 : // this has a global allowed_errors
226 UBC 0 : tracing::warn!(
227 0 : stage = name,
228 0 : elapsed_ms = started.elapsed().as_millis(),
229 0 : "completed, took longer than expected"
230 0 : );
231 :
232 CBC 1 : ret
233 : }
234 : }
235 980 : }
236 :
/// Tests for [`timed`]: its result must be returned whether or not the inner
/// future beats the warning threshold.
#[cfg(test)]
mod timed_tests {
    use super::timed;
    use std::time::Duration;

    #[tokio::test]
    async fn timed_completes_when_inner_future_completes() {
        // A future that completes on time should have its result returned
        let on_time = timed(
            async move {
                tokio::time::sleep(Duration::from_millis(10)).await;
                123
            },
            "test 1",
            Duration::from_millis(50),
        )
        .await;
        assert_eq!(on_time, 123);

        // A future that completes too slowly should also have its result returned.
        // Uses its own stage label so the log output of the two cases can be told apart.
        let too_slow = timed(
            async move {
                tokio::time::sleep(Duration::from_millis(50)).await;
                456
            },
            "test 2",
            Duration::from_millis(10),
        )
        .await;
        assert_eq!(too_slow, 456);
    }
}
|