Line data Source code
1 : #![recursion_limit = "300"]
2 : #![deny(clippy::undocumented_unsafe_blocks)]
3 :
4 : mod auth;
5 : pub mod basebackup;
6 : pub mod config;
7 : pub mod consumption_metrics;
8 : pub mod context;
9 : pub mod control_plane_client;
10 : pub mod deletion_queue;
11 : pub mod disk_usage_eviction_task;
12 : pub mod http;
13 : pub mod import_datadir;
14 : pub mod l0_flush;
15 : pub use pageserver_api::keyspace;
16 : pub mod aux_file;
17 : pub mod metrics;
18 : pub mod page_cache;
19 : pub mod page_service;
20 : pub mod pgdatadir_mapping;
21 : pub mod repository;
22 : pub mod span;
23 : pub(crate) mod statvfs;
24 : pub mod task_mgr;
25 : pub mod tenant;
26 : pub mod trace;
27 : pub mod utilization;
28 : pub mod virtual_file;
29 : pub mod walingest;
30 : pub mod walrecord;
31 : pub mod walredo;
32 :
33 : use crate::task_mgr::TaskKind;
34 : use camino::Utf8Path;
35 : use deletion_queue::DeletionQueue;
36 : use tenant::mgr::TenantManager;
37 : use tracing::info;
38 :
/// Version of the on-disk storage format.
///
/// Every layer file embeds this value in its header. Bump it whenever the
/// storage format changes in a backwards-incompatible way.
///
/// Backwards-compatible changes to the metadata format are tracked separately:
/// TimelineMetadata carries its own version number.
pub const STORAGE_FORMAT_VERSION: u16 = 3;

/// Default PostgreSQL major version.
/// NOTE(review): presumably used when a caller does not specify one — confirm.
pub const DEFAULT_PG_VERSION: u32 = 15;

// Magic numbers that tell apart the different kinds of files (image vs. delta).
pub const IMAGE_FILE_MAGIC: u16 = 0x5A60;
pub const DELTA_FILE_MAGIC: u16 = 0x5A61;
53 :
54 : static ZERO_PAGE: bytes::Bytes = bytes::Bytes::from_static(&[0u8; 8192]);
55 :
56 : pub use crate::metrics::preinitialize_metrics;
57 :
58 0 : #[tracing::instrument(skip_all, fields(%exit_code))]
59 : pub async fn shutdown_pageserver(
60 : tenant_manager: &TenantManager,
61 : mut deletion_queue: DeletionQueue,
62 : exit_code: i32,
63 : ) {
64 : use std::time::Duration;
65 : // Shut down the libpq endpoint task. This prevents new connections from
66 : // being accepted.
67 : timed(
68 : task_mgr::shutdown_tasks(Some(TaskKind::LibpqEndpointListener), None, None),
69 : "shutdown LibpqEndpointListener",
70 : Duration::from_secs(1),
71 : )
72 : .await;
73 :
74 : // Shut down all the tenants. This flushes everything to disk and kills
75 : // the checkpoint and GC tasks.
76 : timed(
77 : tenant_manager.shutdown(),
78 : "shutdown all tenants",
79 : Duration::from_secs(5),
80 : )
81 : .await;
82 :
83 : // Shut down any page service tasks: any in-progress work for particular timelines or tenants
84 : // should already have been canclled via mgr::shutdown_all_tenants
85 : timed(
86 : task_mgr::shutdown_tasks(Some(TaskKind::PageRequestHandler), None, None),
87 : "shutdown PageRequestHandlers",
88 : Duration::from_secs(1),
89 : )
90 : .await;
91 :
92 : // Best effort to persist any outstanding deletions, to avoid leaking objects
93 : deletion_queue.shutdown(Duration::from_secs(5)).await;
94 :
95 : // Shut down the HTTP endpoint last, so that you can still check the server's
96 : // status while it's shutting down.
97 : // FIXME: We should probably stop accepting commands like attach/detach earlier.
98 : timed(
99 : task_mgr::shutdown_tasks(Some(TaskKind::HttpEndpointListener), None, None),
100 : "shutdown http",
101 : Duration::from_secs(1),
102 : )
103 : .await;
104 :
105 : // There should be nothing left, but let's be sure
106 : timed(
107 : task_mgr::shutdown_tasks(None, None, None),
108 : "shutdown leftovers",
109 : Duration::from_secs(1),
110 : )
111 : .await;
112 : info!("Shut down successfully completed");
113 : std::process::exit(exit_code);
114 : }
115 :
/// File name of the per-tenant configuration file.
/// Full path: `tenants/<tenant_id>/config-v1`.
pub(crate) const TENANT_LOCATION_CONFIG_NAME: &str = "config-v1";

/// File name under which a tenant's remote heatmap is copied into the local
/// tenant directory while the tenant is in secondary mode.
pub(crate) const TENANT_HEATMAP_BASENAME: &str = "heatmap-v1.json";

/// Suffix shared by assorted temporary files. Any temporary file found in the
/// data directory at pageserver startup may be removed automatically.
pub(crate) const TEMP_FILE_SUFFIX: &str = "___temp";

/// Suffix of the marker that flags a timeline directory as not fully initialized.
/// Full path: `tenants/<tenant_id>/timelines/<timeline_id>___uninit`.
/// When such a marker is found at pageserver startup, both the timeline directory
/// and the marker file are removed.
pub(crate) const TIMELINE_UNINIT_MARK_SUFFIX: &str = "___uninit";

/// Suffix of the timeline "delete" marker, recognized by `is_delete_mark`.
/// NOTE(review): presumably flags a timeline whose deletion is in progress — confirm.
pub(crate) const TIMELINE_DELETE_MARK_SUFFIX: &str = "___delete";
135 :
136 8 : pub fn is_temporary(path: &Utf8Path) -> bool {
137 8 : match path.file_name() {
138 8 : Some(name) => name.ends_with(TEMP_FILE_SUFFIX),
139 0 : None => false,
140 : }
141 8 : }
142 :
143 16 : fn ends_with_suffix(path: &Utf8Path, suffix: &str) -> bool {
144 16 : match path.file_name() {
145 16 : Some(name) => name.ends_with(suffix),
146 0 : None => false,
147 : }
148 16 : }
149 :
// FIXME: DO NOT ADD new query methods like this one: their callers' next step is to parse the
// timeline id out of the directory name. Instead, create a type such as "UninitMark(TimelineId)"
// so that the name is parsed only once.
153 :
154 8 : pub(crate) fn is_uninit_mark(path: &Utf8Path) -> bool {
155 8 : ends_with_suffix(path, TIMELINE_UNINIT_MARK_SUFFIX)
156 8 : }
157 :
158 8 : pub(crate) fn is_delete_mark(path: &Utf8Path) -> bool {
159 8 : ends_with_suffix(path, TIMELINE_DELETE_MARK_SUFFIX)
160 8 : }
161 :
162 : /// During pageserver startup, we need to order operations not to exhaust tokio worker threads by
163 : /// blocking.
164 : ///
165 : /// The instances of this value exist only during startup, otherwise `None` is provided, meaning no
166 : /// delaying is needed.
167 : #[derive(Clone)]
168 : pub struct InitializationOrder {
169 : /// Each initial tenant load task carries this until it is done loading timelines from remote storage
170 : pub initial_tenant_load_remote: Option<utils::completion::Completion>,
171 :
172 : /// Each initial tenant load task carries this until completion.
173 : pub initial_tenant_load: Option<utils::completion::Completion>,
174 :
175 : /// Barrier for when we can start any background jobs.
176 : ///
177 : /// This can be broken up later on, but right now there is just one class of a background job.
178 : pub background_jobs_can_start: utils::completion::Barrier,
179 : }
180 :
181 : /// Time the future with a warning when it exceeds a threshold.
182 4 : async fn timed<Fut: std::future::Future>(
183 4 : fut: Fut,
184 4 : name: &str,
185 4 : warn_at: std::time::Duration,
186 4 : ) -> <Fut as std::future::Future>::Output {
187 4 : let started = std::time::Instant::now();
188 4 :
189 4 : let mut fut = std::pin::pin!(fut);
190 4 :
191 4 : match tokio::time::timeout(warn_at, &mut fut).await {
192 2 : Ok(ret) => {
193 2 : tracing::info!(
194 : stage = name,
195 0 : elapsed_ms = started.elapsed().as_millis(),
196 0 : "completed"
197 : );
198 2 : ret
199 : }
200 : Err(_) => {
201 2 : tracing::info!(
202 : stage = name,
203 0 : elapsed_ms = started.elapsed().as_millis(),
204 0 : "still waiting, taking longer than expected..."
205 : );
206 :
207 2 : let ret = fut.await;
208 :
209 : // this has a global allowed_errors
210 2 : tracing::warn!(
211 : stage = name,
212 0 : elapsed_ms = started.elapsed().as_millis(),
213 0 : "completed, took longer than expected"
214 : );
215 :
216 2 : ret
217 : }
218 : }
219 4 : }
220 :
#[cfg(test)]
mod timed_tests {
    use super::timed;
    use std::time::Duration;

    /// `timed` must return the inner future's output whether or not the future
    /// crosses the warning threshold.
    #[tokio::test]
    async fn timed_completes_when_inner_future_completes() {
        // A future that completes within the threshold should have its result returned.
        let fast = timed(
            async move {
                tokio::time::sleep(Duration::from_millis(10)).await;
                123
            },
            "test 1",
            Duration::from_millis(50),
        )
        .await;
        assert_eq!(fast, 123);

        // A future that completes after the threshold should also have its result returned.
        let slow = timed(
            async move {
                tokio::time::sleep(Duration::from_millis(50)).await;
                456
            },
            "test 2",
            Duration::from_millis(10),
        )
        .await;
        assert_eq!(slow, 456);
    }
}
|