Line data Source code
1 : #![recursion_limit = "300"]
2 : #![deny(clippy::undocumented_unsafe_blocks)]
3 :
4 : mod auth;
5 : pub mod basebackup;
6 : pub mod config;
7 : pub mod consumption_metrics;
8 : pub mod context;
9 : pub mod control_plane_client;
10 : pub mod deletion_queue;
11 : pub mod disk_usage_eviction_task;
12 : pub mod http;
13 : pub mod import_datadir;
14 : pub use pageserver_api::keyspace;
15 : pub mod metrics;
16 : pub mod page_cache;
17 : pub mod page_service;
18 : pub mod pgdatadir_mapping;
19 : pub mod repository;
20 : pub mod span;
21 : pub(crate) mod statvfs;
22 : pub mod task_mgr;
23 : pub mod tenant;
24 : pub mod trace;
25 : pub mod utilization;
26 : pub mod virtual_file;
27 : pub mod walingest;
28 : pub mod walrecord;
29 : pub mod walredo;
30 :
31 : use crate::task_mgr::TaskKind;
32 : use camino::Utf8Path;
33 : use deletion_queue::DeletionQueue;
34 : use tracing::info;
35 :
/// Current storage format version
///
/// This is embedded in the header of all the layer files.
/// If you make any backwards-incompatible changes to the storage
/// format, bump this!
/// Note that TimelineMetadata uses its own version number to track
/// backwards-compatible changes to the metadata format.
pub const STORAGE_FORMAT_VERSION: u16 = 3;

/// Default PostgreSQL major version.
///
/// NOTE(review): presumably used when a caller does not specify a version explicitly —
/// confirm at the use sites.
pub const DEFAULT_PG_VERSION: u32 = 15;

// Magic constants used to identify different kinds of files
/// Magic number identifying image files (presumably image layer files — confirm with the
/// layer writer/reader).
pub const IMAGE_FILE_MAGIC: u16 = 0x5A60;
/// Magic number identifying delta files (presumably delta layer files — confirm with the
/// layer writer/reader).
pub const DELTA_FILE_MAGIC: u16 = 0x5A61;

/// A shared buffer of 8192 zero bytes (presumably one all-zero page).
///
/// Built with `Bytes::from_static`, so it refers to static memory rather than a fresh
/// allocation.
static ZERO_PAGE: bytes::Bytes = bytes::Bytes::from_static(&[0u8; 8192]);
52 :
53 : pub use crate::metrics::preinitialize_metrics;
54 :
/// Shuts the pageserver down stage by stage, then terminates the process.
///
/// Stages run sequentially in this order:
/// 1. the libpq endpoint listener,
/// 2. all tenants,
/// 3. page service request handlers,
/// 4. the deletion queue (only if `deletion_queue` is `Some`),
/// 5. the HTTP endpoint,
/// 6. any remaining tasks.
///
/// Every stage except the deletion queue is wrapped in [`timed`]. The `Duration` passed there
/// is only a threshold for logging that a stage is slow; the stage is still awaited to
/// completion.
///
/// After all stages finish this calls [`std::process::exit`] with `exit_code`, so it never
/// returns to the caller. The tracing span records `exit_code` and skips the other arguments.
#[tracing::instrument(skip_all, fields(%exit_code))]
pub async fn shutdown_pageserver(deletion_queue: Option<DeletionQueue>, exit_code: i32) {
    use std::time::Duration;
    // Shut down the libpq endpoint task. This prevents new connections from
    // being accepted.
    // NOTE(review): the two trailing `None`s presumably mean "any tenant / any timeline" —
    // confirm against `task_mgr::shutdown_tasks`.
    timed(
        task_mgr::shutdown_tasks(Some(TaskKind::LibpqEndpointListener), None, None),
        "shutdown LibpqEndpointListener",
        Duration::from_secs(1),
    )
    .await;

    // Shut down all the tenants. This flushes everything to disk and kills
    // the checkpoint and GC tasks.
    timed(
        tenant::mgr::shutdown_all_tenants(),
        "shutdown all tenants",
        Duration::from_secs(5),
    )
    .await;

    // Shut down any page service tasks: any in-progress work for particular timelines or tenants
    // should already have been cancelled via mgr::shutdown_all_tenants
    timed(
        task_mgr::shutdown_tasks(Some(TaskKind::PageRequestHandler), None, None),
        "shutdown PageRequestHandlers",
        Duration::from_secs(1),
    )
    .await;

    // Best effort to persist any outstanding deletions, to avoid leaking objects.
    // Unlike the other stages this is not wrapped in `timed`; a duration is handed directly to
    // `DeletionQueue::shutdown` instead.
    // NOTE(review): presumably that duration bounds how long the flush may take — confirm.
    if let Some(mut deletion_queue) = deletion_queue {
        deletion_queue.shutdown(Duration::from_secs(5)).await;
    }

    // Shut down the HTTP endpoint last, so that you can still check the server's
    // status while it's shutting down.
    // FIXME: We should probably stop accepting commands like attach/detach earlier.
    timed(
        task_mgr::shutdown_tasks(Some(TaskKind::HttpEndpointListener), None, None),
        "shutdown http",
        Duration::from_secs(1),
    )
    .await;

    // There should be nothing left, but let's be sure
    timed(
        task_mgr::shutdown_tasks(None, None, None),
        "shutdown leftovers",
        Duration::from_secs(1),
    )
    .await;
    info!("Shut down successfully completed");
    // Terminates the process; nothing after this point runs.
    std::process::exit(exit_code);
}
110 :
/// The name of the metadata file pageserver creates per timeline.
/// Full path: `tenants/<tenant_id>/timelines/<timeline_id>/metadata`.
pub const METADATA_FILE_NAME: &str = "metadata";

/// Per-tenant configuration file.
/// Full path: `tenants/<tenant_id>/config`.
pub const TENANT_CONFIG_NAME: &str = "config";

/// Per-tenant location configuration file, a separate file from [`TENANT_CONFIG_NAME`].
/// Full path: `tenants/<tenant_id>/config-v1`.
pub const TENANT_LOCATION_CONFIG_NAME: &str = "config-v1";

/// Per-tenant copy of their remote heatmap, downloaded into the local
/// tenant path while in secondary mode.
pub const TENANT_HEATMAP_BASENAME: &str = "heatmap-v1.json";

/// A suffix used for various temporary files. Any temporary files found in the
/// data directory at pageserver startup can be automatically removed.
/// Paths with this suffix are recognized by [`is_temporary`].
pub const TEMP_FILE_SUFFIX: &str = "___temp";

/// A marker file to mark that a timeline directory was not fully initialized.
/// If a timeline directory with this marker is encountered at pageserver startup,
/// the timeline directory and the marker file are both removed.
/// Full path: `tenants/<tenant_id>/timelines/<timeline_id>___uninit`.
/// Paths with this suffix are recognized by [`is_uninit_mark`].
pub const TIMELINE_UNINIT_MARK_SUFFIX: &str = "___uninit";

/// Suffix of the timeline marker file recognized by [`is_delete_mark`].
///
/// NOTE(review): presumably marks a timeline whose deletion is in progress, with full path
/// `tenants/<tenant_id>/timelines/<timeline_id>___delete` by analogy with
/// [`TIMELINE_UNINIT_MARK_SUFFIX`] — confirm against the timeline deletion code.
pub const TIMELINE_DELETE_MARK_SUFFIX: &str = "___delete";

/// A marker file to prevent pageserver from loading a certain tenant on restart.
/// Different from [`TIMELINE_UNINIT_MARK_SUFFIX`] due to semantics of the corresponding
/// `ignore` management API command, that expects the ignored tenant to be properly loaded
/// into pageserver's memory before being ignored.
/// Full path: `tenants/<tenant_id>/___ignored_tenant`.
pub const IGNORED_TENANT_FILE_NAME: &str = "___ignored_tenant";
145 :
146 10 : pub fn is_temporary(path: &Utf8Path) -> bool {
147 10 : match path.file_name() {
148 10 : Some(name) => name.ends_with(TEMP_FILE_SUFFIX),
149 0 : None => false,
150 : }
151 10 : }
152 :
153 18 : fn ends_with_suffix(path: &Utf8Path, suffix: &str) -> bool {
154 18 : match path.file_name() {
155 18 : Some(name) => name.ends_with(suffix),
156 0 : None => false,
157 : }
158 18 : }
159 :
// FIXME: DO NOT ADD new query methods like this, which would then need a second step of parsing
// the timeline ID out of the directory name. Instead, create a type such as
// `UninitMark(TimelineId)` and parse the ID only once, from the name.
163 :
/// Returns `true` if the final component of `path` ends with [`TIMELINE_UNINIT_MARK_SUFFIX`],
/// i.e. `path` names a timeline uninit-mark file.
///
/// A path with no final component yields `false`.
pub fn is_uninit_mark(path: &Utf8Path) -> bool {
    ends_with_suffix(path, TIMELINE_UNINIT_MARK_SUFFIX)
}
167 :
/// Returns `true` if the final component of `path` ends with [`TIMELINE_DELETE_MARK_SUFFIX`].
///
/// A path with no final component yields `false`.
pub fn is_delete_mark(path: &Utf8Path) -> bool {
    ends_with_suffix(path, TIMELINE_DELETE_MARK_SUFFIX)
}
171 :
/// During pageserver startup, we need to order operations so as not to exhaust tokio worker
/// threads by blocking.
///
/// The instances of this value exist only during startup, otherwise `None` is provided, meaning no
/// delaying is needed.
///
/// NOTE(review): the fields are `utils::completion` primitives. Presumably a waiter is released
/// once every clone of the corresponding `Completion` has been dropped, which is why load tasks
/// "carry" a clone until they finish — confirm against `utils::completion`. Field order also
/// fixes drop order, so do not reorder fields casually.
#[derive(Clone)]
pub struct InitializationOrder {
    /// Each initial tenant load task carries this until it is done loading timelines from remote storage
    pub initial_tenant_load_remote: Option<utils::completion::Completion>,

    /// Each initial tenant load task carries this until completion.
    pub initial_tenant_load: Option<utils::completion::Completion>,

    /// Barrier for when we can start any background jobs.
    ///
    /// This can be broken up later on, but right now there is just one class of a background job.
    pub background_jobs_can_start: utils::completion::Barrier,
}
190 :
191 : /// Time the future with a warning when it exceeds a threshold.
192 4 : async fn timed<Fut: std::future::Future>(
193 4 : fut: Fut,
194 4 : name: &str,
195 4 : warn_at: std::time::Duration,
196 4 : ) -> <Fut as std::future::Future>::Output {
197 4 : let started = std::time::Instant::now();
198 4 :
199 4 : let mut fut = std::pin::pin!(fut);
200 4 :
201 4 : match tokio::time::timeout(warn_at, &mut fut).await {
202 2 : Ok(ret) => {
203 0 : tracing::info!(
204 0 : stage = name,
205 0 : elapsed_ms = started.elapsed().as_millis(),
206 0 : "completed"
207 0 : );
208 2 : ret
209 : }
210 : Err(_) => {
211 0 : tracing::info!(
212 0 : stage = name,
213 0 : elapsed_ms = started.elapsed().as_millis(),
214 0 : "still waiting, taking longer than expected..."
215 0 : );
216 :
217 2 : let ret = fut.await;
218 :
219 : // this has a global allowed_errors
220 0 : tracing::warn!(
221 0 : stage = name,
222 0 : elapsed_ms = started.elapsed().as_millis(),
223 0 : "completed, took longer than expected"
224 0 : );
225 :
226 2 : ret
227 : }
228 : }
229 4 : }
230 :
#[cfg(test)]
mod timed_tests {
    use super::timed;
    use std::time::Duration;

    #[tokio::test]
    async fn timed_completes_when_inner_future_completes() {
        // A future that completes within the warning threshold should have its result returned.
        let on_time = timed(
            async move {
                tokio::time::sleep(Duration::from_millis(10)).await;
                123
            },
            "test 1",
            Duration::from_millis(50),
        )
        .await;
        assert_eq!(on_time, 123);

        // A future that completes only after the warning threshold should also have its result
        // returned: `timed` only logs when the threshold passes, it never cancels the future.
        // Uses its own stage name so the two cases are distinguishable in the logs.
        let late = timed(
            async move {
                tokio::time::sleep(Duration::from_millis(50)).await;
                456
            },
            "test 2",
            Duration::from_millis(10),
        )
        .await;
        assert_eq!(late, 456);
    }
}
|