Line data Source code
1 : #![recursion_limit = "300"]
2 : #![deny(clippy::undocumented_unsafe_blocks)]
3 :
4 : mod auth;
5 : pub mod basebackup;
6 : pub mod config;
7 : pub mod consumption_metrics;
8 : pub mod context;
9 : pub mod control_plane_client;
10 : pub mod deletion_queue;
11 : pub mod disk_usage_eviction_task;
12 : pub mod http;
13 : pub mod import_datadir;
14 : pub use pageserver_api::keyspace;
15 : pub mod metrics;
16 : pub mod page_cache;
17 : pub mod page_service;
18 : pub mod pgdatadir_mapping;
19 : pub mod repository;
20 : pub mod span;
21 : pub(crate) mod statvfs;
22 : pub mod task_mgr;
23 : pub mod tenant;
24 : pub mod trace;
25 : pub mod virtual_file;
26 : pub mod walingest;
27 : pub mod walrecord;
28 : pub mod walredo;
29 :
30 : use crate::task_mgr::TaskKind;
31 : use camino::Utf8Path;
32 : use deletion_queue::DeletionQueue;
33 : use tracing::info;
34 :
/// Version number of the on-disk storage format.
///
/// The value is written into the header of every layer file. Any change to
/// the storage format that is not backwards-compatible must bump it.
///
/// `TimelineMetadata` keeps a separate version number of its own for
/// backwards-compatible changes to the metadata format.
pub const STORAGE_FORMAT_VERSION: u16 = 3;

/// Default Postgres version number.
// NOTE(review): presumably the PostgreSQL major version assumed when none is
// specified; confirm against the call sites.
pub const DEFAULT_PG_VERSION: u32 = 15;

// Magic numbers that tell the different kinds of files apart.
/// Magic number for image files.
pub const IMAGE_FILE_MAGIC: u16 = 0x5A60;
/// Magic number for delta files.
pub const DELTA_FILE_MAGIC: u16 = 0x5A61;
49 :
50 : static ZERO_PAGE: bytes::Bytes = bytes::Bytes::from_static(&[0u8; 8192]);
51 :
52 : pub use crate::metrics::preinitialize_metrics;
53 :
54 172 : #[tracing::instrument(skip_all, fields(%exit_code))]
55 : pub async fn shutdown_pageserver(deletion_queue: Option<DeletionQueue>, exit_code: i32) {
56 : use std::time::Duration;
57 : // Shut down the libpq endpoint task. This prevents new connections from
58 : // being accepted.
59 : timed(
60 : task_mgr::shutdown_tasks(Some(TaskKind::LibpqEndpointListener), None, None),
61 : "shutdown LibpqEndpointListener",
62 : Duration::from_secs(1),
63 : )
64 : .await;
65 :
66 : // Shut down all the tenants. This flushes everything to disk and kills
67 : // the checkpoint and GC tasks.
68 : timed(
69 : tenant::mgr::shutdown_all_tenants(),
70 : "shutdown all tenants",
71 : Duration::from_secs(5),
72 : )
73 : .await;
74 :
75 : // Shut down any page service tasks: any in-progress work for particular timelines or tenants
76 : // should already have been canclled via mgr::shutdown_all_tenants
77 : timed(
78 : task_mgr::shutdown_tasks(Some(TaskKind::PageRequestHandler), None, None),
79 : "shutdown PageRequestHandlers",
80 : Duration::from_secs(1),
81 : )
82 : .await;
83 :
84 : // Best effort to persist any outstanding deletions, to avoid leaking objects
85 : if let Some(mut deletion_queue) = deletion_queue {
86 : deletion_queue.shutdown(Duration::from_secs(5)).await;
87 : }
88 :
89 : // Shut down the HTTP endpoint last, so that you can still check the server's
90 : // status while it's shutting down.
91 : // FIXME: We should probably stop accepting commands like attach/detach earlier.
92 : timed(
93 : task_mgr::shutdown_tasks(Some(TaskKind::HttpEndpointListener), None, None),
94 : "shutdown http",
95 : Duration::from_secs(1),
96 : )
97 : .await;
98 :
99 : // There should be nothing left, but let's be sure
100 : timed(
101 : task_mgr::shutdown_tasks(None, None, None),
102 : "shutdown leftovers",
103 : Duration::from_secs(1),
104 : )
105 : .await;
106 172 : info!("Shut down successfully completed");
107 : std::process::exit(exit_code);
108 : }
109 :
/// The name of the metadata file pageserver creates per timeline.
/// Full path: `tenants/<tenant_id>/timelines/<timeline_id>/metadata`.
pub const METADATA_FILE_NAME: &str = "metadata";

/// Per-tenant configuration file.
/// Full path: `tenants/<tenant_id>/config`.
pub const TENANT_CONFIG_NAME: &str = "config";

/// Per-tenant location configuration file.
/// Full path: `tenants/<tenant_id>/config-v1`.
pub const TENANT_LOCATION_CONFIG_NAME: &str = "config-v1";

/// Per-tenant copy of their remote heatmap, downloaded into the local
/// tenant path while in secondary mode.
pub const TENANT_HEATMAP_BASENAME: &str = "heatmap-v1.json";

/// A suffix used for various temporary files. Any temporary files found in the
/// data directory at pageserver startup can be automatically removed.
pub const TEMP_FILE_SUFFIX: &str = "___temp";

/// A marker file to mark that a timeline directory was not fully initialized.
/// If a timeline directory with this marker is encountered at pageserver startup,
/// the timeline directory and the marker file are both removed.
/// Full path: `tenants/<tenant_id>/timelines/<timeline_id>___uninit`.
pub const TIMELINE_UNINIT_MARK_SUFFIX: &str = "___uninit";

/// Suffix that [`is_delete_mark`] looks for at the end of a file name.
// NOTE(review): by its name this marks a timeline that is being deleted;
// confirm the exact semantics where the mark is created.
pub const TIMELINE_DELETE_MARK_SUFFIX: &str = "___delete";

/// A marker file to prevent pageserver from loading a certain tenant on restart.
/// Different from [`TIMELINE_UNINIT_MARK_SUFFIX`] due to semantics of the corresponding
/// `ignore` management API command, that expects the ignored tenant to be properly loaded
/// into pageserver's memory before being ignored.
/// Full path: `tenants/<tenant_id>/___ignored_tenant`.
pub const IGNORED_TENANT_FILE_NAME: &str = "___ignored_tenant";
144 :
145 1526 : pub fn is_temporary(path: &Utf8Path) -> bool {
146 1526 : match path.file_name() {
147 1526 : Some(name) => name.ends_with(TEMP_FILE_SUFFIX),
148 0 : None => false,
149 : }
150 1526 : }
151 :
152 866 : fn ends_with_suffix(path: &Utf8Path, suffix: &str) -> bool {
153 866 : match path.file_name() {
154 866 : Some(name) => name.ends_with(suffix),
155 0 : None => false,
156 : }
157 866 : }
158 :
159 : // FIXME: DO NOT ADD new query methods like this, which will have a next step of parsing timelineid
160 : // from the directory name. Instead create type "UninitMark(TimelineId)" and only parse it once
161 : // from the name.
162 :
163 435 : pub fn is_uninit_mark(path: &Utf8Path) -> bool {
164 435 : ends_with_suffix(path, TIMELINE_UNINIT_MARK_SUFFIX)
165 435 : }
166 :
167 431 : pub fn is_delete_mark(path: &Utf8Path) -> bool {
168 431 : ends_with_suffix(path, TIMELINE_DELETE_MARK_SUFFIX)
169 431 : }
170 :
171 0 : fn is_walkdir_io_not_found(e: &walkdir::Error) -> bool {
172 0 : if let Some(e) = e.io_error() {
173 0 : if e.kind() == std::io::ErrorKind::NotFound {
174 0 : return true;
175 0 : }
176 0 : }
177 0 : false
178 0 : }
179 :
180 : /// During pageserver startup, we need to order operations not to exhaust tokio worker threads by
181 : /// blocking.
182 : ///
183 : /// The instances of this value exist only during startup, otherwise `None` is provided, meaning no
184 : /// delaying is needed.
185 207 : #[derive(Clone)]
186 : pub struct InitializationOrder {
187 : /// Each initial tenant load task carries this until it is done loading timelines from remote storage
188 : pub initial_tenant_load_remote: Option<utils::completion::Completion>,
189 :
190 : /// Each initial tenant load task carries this until completion.
191 : pub initial_tenant_load: Option<utils::completion::Completion>,
192 :
193 : /// Barrier for when we can start any background jobs.
194 : ///
195 : /// This can be broken up later on, but right now there is just one class of a background job.
196 : pub background_jobs_can_start: utils::completion::Barrier,
197 : }
198 :
199 : /// Time the future with a warning when it exceeds a threshold.
200 1065 : async fn timed<Fut: std::future::Future>(
201 1065 : fut: Fut,
202 1065 : name: &str,
203 1065 : warn_at: std::time::Duration,
204 1065 : ) -> <Fut as std::future::Future>::Output {
205 1065 : let started = std::time::Instant::now();
206 1065 :
207 1065 : let mut fut = std::pin::pin!(fut);
208 1065 :
209 1065 : match tokio::time::timeout(warn_at, &mut fut).await {
210 1063 : Ok(ret) => {
211 1061 : tracing::info!(
212 1061 : stage = name,
213 1061 : elapsed_ms = started.elapsed().as_millis(),
214 1061 : "completed"
215 1061 : );
216 1063 : ret
217 : }
218 : Err(_) => {
219 0 : tracing::info!(
220 0 : stage = name,
221 0 : elapsed_ms = started.elapsed().as_millis(),
222 0 : "still waiting, taking longer than expected..."
223 0 : );
224 :
225 2 : let ret = fut.await;
226 :
227 : // this has a global allowed_errors
228 0 : tracing::warn!(
229 0 : stage = name,
230 0 : elapsed_ms = started.elapsed().as_millis(),
231 0 : "completed, took longer than expected"
232 0 : );
233 :
234 2 : ret
235 : }
236 : }
237 1065 : }
238 :
#[cfg(test)]
mod timed_tests {
    use super::timed;
    use std::time::Duration;

    /// `timed` returns the inner future's output whether or not the future
    /// finishes within the warning threshold.
    #[tokio::test]
    async fn timed_completes_when_inner_future_completes() {
        // Finishes well within the threshold.
        let quick = async {
            tokio::time::sleep(Duration::from_millis(10)).await;
            123
        };
        let on_time = timed(quick, "test 1", Duration::from_millis(50)).await;
        assert_eq!(on_time, 123);

        // Overruns the threshold; the result must still come back.
        let slow = async {
            tokio::time::sleep(Duration::from_millis(50)).await;
            456
        };
        let overdue = timed(slow, "test 1", Duration::from_millis(10)).await;
        assert_eq!(overdue, 456);
    }
}
|