TLA Line data Source code
1 : mod auth;
2 : pub mod basebackup;
3 : pub mod config;
4 : pub mod consumption_metrics;
5 : pub mod context;
6 : pub mod control_plane_client;
7 : pub mod deletion_queue;
8 : pub mod disk_usage_eviction_task;
9 : pub mod http;
10 : pub mod import_datadir;
11 : pub mod keyspace;
12 : pub mod metrics;
13 : pub mod page_cache;
14 : pub mod page_service;
15 : pub mod pgdatadir_mapping;
16 : pub mod repository;
17 : pub(crate) mod statvfs;
18 : pub mod task_mgr;
19 : pub mod tenant;
20 : pub mod trace;
21 : pub mod virtual_file;
22 : pub mod walingest;
23 : pub mod walrecord;
24 : pub mod walredo;
25 :
26 : pub mod failpoint_support;
27 :
28 : use crate::task_mgr::TaskKind;
29 : use camino::Utf8Path;
30 : use deletion_queue::DeletionQueue;
31 : use tracing::info;
32 :
/// Current storage format version.
///
/// Written into the header of every layer file. Bump this whenever you make a
/// backwards-incompatible change to the storage format.
///
/// `TimelineMetadata` does not use this value: it keeps a version number of its own
/// to track backwards-compatible changes to the metadata format.
pub const STORAGE_FORMAT_VERSION: u16 = 3;

/// Default PostgreSQL version number.
///
/// NOTE(review): presumably used when no version is specified explicitly — confirm at call sites.
pub const DEFAULT_PG_VERSION: u32 = 15;

// Magic constants that identify the different kinds of files.

/// Magic number identifying an image file.
pub const IMAGE_FILE_MAGIC: u16 = 0x5A60;
/// Magic number identifying a delta file.
pub const DELTA_FILE_MAGIC: u16 = 0x5A61;
47 :
48 : static ZERO_PAGE: bytes::Bytes = bytes::Bytes::from_static(&[0u8; 8192]);
49 :
50 : pub use crate::metrics::preinitialize_metrics;
51 :
52 CBC 432 : #[tracing::instrument(skip_all, fields(%exit_code))]
53 : pub async fn shutdown_pageserver(deletion_queue: Option<DeletionQueue>, exit_code: i32) {
54 : use std::time::Duration;
55 : // Shut down the libpq endpoint task. This prevents new connections from
56 : // being accepted.
57 : timed(
58 : task_mgr::shutdown_tasks(Some(TaskKind::LibpqEndpointListener), None, None),
59 : "shutdown LibpqEndpointListener",
60 : Duration::from_secs(1),
61 : )
62 : .await;
63 :
64 : // Shut down any page service tasks.
65 : timed(
66 : task_mgr::shutdown_tasks(Some(TaskKind::PageRequestHandler), None, None),
67 : "shutdown PageRequestHandlers",
68 : Duration::from_secs(1),
69 : )
70 : .await;
71 :
72 : // Shut down all the tenants. This flushes everything to disk and kills
73 : // the checkpoint and GC tasks.
74 : timed(
75 : tenant::mgr::shutdown_all_tenants(),
76 : "shutdown all tenants",
77 : Duration::from_secs(5),
78 : )
79 : .await;
80 :
81 : // Best effort to persist any outstanding deletions, to avoid leaking objects
82 : if let Some(mut deletion_queue) = deletion_queue {
83 : deletion_queue.shutdown(Duration::from_secs(5)).await;
84 : }
85 :
86 : // Shut down the HTTP endpoint last, so that you can still check the server's
87 : // status while it's shutting down.
88 : // FIXME: We should probably stop accepting commands like attach/detach earlier.
89 : timed(
90 : task_mgr::shutdown_tasks(Some(TaskKind::HttpEndpointListener), None, None),
91 : "shutdown http",
92 : Duration::from_secs(1),
93 : )
94 : .await;
95 :
96 : // There should be nothing left, but let's be sure
97 : timed(
98 : task_mgr::shutdown_tasks(None, None, None),
99 : "shutdown leftovers",
100 : Duration::from_secs(1),
101 : )
102 : .await;
103 144 : info!("Shut down successfully completed");
104 : std::process::exit(exit_code);
105 : }
106 :
/// The name of the metadata file pageserver creates per timeline.
/// Full path: `tenants/<tenant_id>/timelines/<timeline_id>/metadata`.
pub const METADATA_FILE_NAME: &str = "metadata";

/// Per-tenant configuration file.
/// Full path: `tenants/<tenant_id>/config`.
pub const TENANT_CONFIG_NAME: &str = "config";

/// Per-tenant location configuration file, kept alongside [`TENANT_CONFIG_NAME`].
/// Full path: `tenants/<tenant_id>/config-v1`.
pub const TENANT_LOCATION_CONFIG_NAME: &str = "config-v1";

/// A suffix used for various temporary files. Any temporary files found in the
/// data directory at pageserver startup can be automatically removed.
pub const TEMP_FILE_SUFFIX: &str = "___temp";

/// A marker file to mark that a timeline directory was not fully initialized.
/// If a timeline directory with this marker is encountered at pageserver startup,
/// the timeline directory and the marker file are both removed.
/// Full path: `tenants/<tenant_id>/timelines/<timeline_id>___uninit`.
pub const TIMELINE_UNINIT_MARK_SUFFIX: &str = "___uninit";

/// File-name suffix of a timeline "delete" marker file, recognized by `is_delete_mark`.
///
/// NOTE(review): presumably marks a timeline whose deletion has been started, with the
/// full path `tenants/<tenant_id>/timelines/<timeline_id>___delete` by analogy with
/// [`TIMELINE_UNINIT_MARK_SUFFIX`] — confirm against the code that creates the marker.
pub const TIMELINE_DELETE_MARK_SUFFIX: &str = "___delete";

/// A marker file to prevent pageserver from loading a certain tenant on restart.
/// Different from [`TIMELINE_UNINIT_MARK_SUFFIX`] due to semantics of the corresponding
/// `ignore` management API command, that expects the ignored tenant to be properly loaded
/// into pageserver's memory before being ignored.
/// Full path: `tenants/<tenant_id>/___ignored_tenant`.
pub const IGNORED_TENANT_FILE_NAME: &str = "___ignored_tenant";
137 :
138 1241 : pub fn is_temporary(path: &Utf8Path) -> bool {
139 1241 : match path.file_name() {
140 1241 : Some(name) => name.ends_with(TEMP_FILE_SUFFIX),
141 UBC 0 : None => false,
142 : }
143 CBC 1241 : }
144 :
145 623 : fn ends_with_suffix(path: &Utf8Path, suffix: &str) -> bool {
146 623 : match path.file_name() {
147 623 : Some(name) => name.ends_with(suffix),
148 UBC 0 : None => false,
149 : }
150 CBC 623 : }
151 :
152 312 : pub fn is_uninit_mark(path: &Utf8Path) -> bool {
153 312 : ends_with_suffix(path, TIMELINE_UNINIT_MARK_SUFFIX)
154 312 : }
155 :
156 311 : pub fn is_delete_mark(path: &Utf8Path) -> bool {
157 311 : ends_with_suffix(path, TIMELINE_DELETE_MARK_SUFFIX)
158 311 : }
159 :
160 : fn is_walkdir_io_not_found(e: &walkdir::Error) -> bool {
161 UBC 0 : if let Some(e) = e.io_error() {
162 0 : if e.kind() == std::io::ErrorKind::NotFound {
163 0 : return true;
164 0 : }
165 0 : }
166 0 : false
167 0 : }
168 :
/// During pageserver startup, we need to order operations not to exhaust tokio worker threads by
/// blocking.
///
/// The instances of this value exist only during startup, otherwise `None` is provided, meaning no
/// delaying is needed.
///
/// NOTE(review): fields are dropped in declaration order. The `Completion` handles below are
/// presumably observed by waiters elsewhere, so treat the field order as significant.
#[derive(Clone)]
pub struct InitializationOrder {
    /// Each initial tenant load task carries this until it is done loading timelines from remote storage
    pub initial_tenant_load_remote: Option<utils::completion::Completion>,

    /// Each initial tenant load task carries this until completion.
    pub initial_tenant_load: Option<utils::completion::Completion>,

    /// Barrier for when we can start initial logical size calculations.
    pub initial_logical_size_can_start: utils::completion::Barrier,

    /// Each timeline owns a clone of this to be consumed on the initial logical size calculation
    /// attempt. It is important to drop this once the attempt has completed.
    pub initial_logical_size_attempt: Option<utils::completion::Completion>,

    /// Barrier for when we can start any background jobs.
    ///
    /// This can be broken up later on, but right now there is just one class of a background job.
    pub background_jobs_can_start: utils::completion::Barrier,
}
194 :
195 : /// Time the future with a warning when it exceeds a threshold.
196 722 : async fn timed<Fut: std::future::Future>(
197 722 : fut: Fut,
198 722 : name: &str,
199 722 : warn_at: std::time::Duration,
200 722 : ) -> <Fut as std::future::Future>::Output {
201 722 : let started = std::time::Instant::now();
202 722 :
203 722 : let mut fut = std::pin::pin!(fut);
204 722 :
205 722 : match tokio::time::timeout(warn_at, &mut fut).await {
206 721 : Ok(ret) => {
207 721 : tracing::info!(
208 721 : task = name,
209 721 : elapsed_ms = started.elapsed().as_millis(),
210 721 : "completed"
211 721 : );
212 721 : ret
213 : }
214 : Err(_) => {
215 1 : tracing::info!(
216 1 : task = name,
217 1 : elapsed_ms = started.elapsed().as_millis(),
218 1 : "still waiting, taking longer than expected..."
219 1 : );
220 :
221 1 : let ret = fut.await;
222 :
223 : // this has a global allowed_errors
224 1 : tracing::warn!(
225 1 : task = name,
226 1 : elapsed_ms = started.elapsed().as_millis(),
227 1 : "completed, took longer than expected"
228 1 : );
229 :
230 1 : ret
231 : }
232 : }
233 722 : }
234 :
#[cfg(test)]
mod timed_tests {
    use super::timed;
    use std::time::Duration;

    /// `timed` must return the inner future's output on both paths: when the future
    /// finishes before `warn_at`, and when it outlives `warn_at`. In the second case,
    /// `timed` logs and keeps waiting instead of cancelling.
    #[tokio::test]
    async fn timed_completes_when_inner_future_completes() {
        // A future that completes on time should have its result returned.
        let on_time = timed(
            async move {
                tokio::time::sleep(Duration::from_millis(10)).await;
                123
            },
            "test 1",
            Duration::from_millis(50),
        )
        .await;
        assert_eq!(on_time, 123);

        // A future that completes too slowly should also have its result returned.
        // Distinct label so its "still waiting" / "took longer" log lines are attributable.
        let too_slow = timed(
            async move {
                tokio::time::sleep(Duration::from_millis(50)).await;
                456
            },
            "test 2",
            Duration::from_millis(10),
        )
        .await;
        assert_eq!(too_slow, 456);
    }
}
|