#![recursion_limit = "300"]
#![deny(clippy::undocumented_unsafe_blocks)]

mod auth;
pub mod basebackup;
pub mod config;
pub mod consumption_metrics;
pub mod context;
pub mod controller_upcall_client;
pub mod deletion_queue;
pub mod disk_usage_eviction_task;
pub mod http;
pub mod import_datadir;
pub mod l0_flush;

extern crate hyper0 as hyper;

use futures::{stream::FuturesUnordered, StreamExt};
pub use pageserver_api::keyspace;
use tokio_util::sync::CancellationToken;
mod assert_u64_eq_usize;
pub mod aux_file;
pub mod metrics;
pub mod page_cache;
pub mod page_service;
pub mod pgdatadir_mapping;
pub mod repository;
pub mod span;
pub(crate) mod statvfs;
pub mod task_mgr;
pub mod tenant;
pub mod utilization;
pub mod virtual_file;
pub mod walingest;
pub mod walrecord;
pub mod walredo;

use camino::Utf8Path;
use deletion_queue::DeletionQueue;
use tenant::{
    mgr::{BackgroundPurges, TenantManager},
    secondary,
};
use tracing::{info, info_span};

/// Current storage format version
///
/// This is embedded in the header of all the layer files.
/// If you make any backwards-incompatible changes to the storage
/// format, bump this!
/// Note that TimelineMetadata uses its own version number to track
/// backwards-compatible changes to the metadata format.
pub const STORAGE_FORMAT_VERSION: u16 = 3;

pub const DEFAULT_PG_VERSION: u32 = 16;

// Magic constants used to identify different kinds of files
pub const IMAGE_FILE_MAGIC: u16 = 0x5A60;
pub const DELTA_FILE_MAGIC: u16 = 0x5A61;

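// An 8 KiB page of zeros, matching Postgres' default block size (BLCKSZ);
// kept as a shared static so callers can take cheap `Bytes` clones instead of
// allocating fresh zeroed buffers.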
static ZERO_PAGE: bytes::Bytes = bytes::Bytes::from_static(&[0u8; 8192]);

pub use crate::metrics::preinitialize_metrics;

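/// A spawned task paired with the [`CancellationToken`] that stops it:
/// [`CancellableTask::shutdown`] cancels the token, then awaits the task's
/// join handle.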
pub struct CancellableTask {
    pub task: tokio::task::JoinHandle<()>,
    pub cancel: CancellationToken,
}
pub struct HttpEndpointListener(pub CancellableTask);
pub struct ConsumptionMetricsTasks(pub CancellableTask);
pub struct DiskUsageEvictionTask(pub CancellableTask);
impl CancellableTask {
    pub async fn shutdown(self) {
        self.cancel.cancel();
        self.task.await.unwrap();
    }
}

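/// Orderly pageserver shutdown: stop accepting new work, flush and shut down
/// all tenants, drain the deletion queue and remaining background tasks, then
/// exit the process with `exit_code`.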
#[tracing::instrument(skip_all, fields(%exit_code))]
#[allow(clippy::too_many_arguments)]
pub async fn shutdown_pageserver(
    http_listener: HttpEndpointListener,
    page_service: page_service::Listener,
    consumption_metrics_worker: ConsumptionMetricsTasks,
    disk_usage_eviction_task: Option<DiskUsageEvictionTask>,
    tenant_manager: &TenantManager,
    background_purges: BackgroundPurges,
    mut deletion_queue: DeletionQueue,
    secondary_controller_tasks: secondary::GlobalTasks,
    exit_code: i32,
) {
    use std::time::Duration;

    let started_at = std::time::Instant::now();

    // If the orderly shutdown below takes too long, we still want to make
    // sure that all walredo processes are killed and wait()ed on by us, not systemd.
    //
    // (Leftover walredo processes are the hypothesized trigger for the systemd freezes
    // that we keep seeing in prod => https://github.com/neondatabase/cloud/issues/11387.)
    //
    // We use a thread instead of a tokio task because the background runtime is likely busy
    // with the final flushing / uploads. This activity here has priority, and due to the lack
    // of a scheduling-priority feature in the tokio scheduler, using a separate thread is
    // an effective priority booster.
    let walredo_extraordinary_shutdown_thread_span = {
        let span = info_span!(parent: None, "walredo_extraordinary_shutdown_thread");
        span.follows_from(tracing::Span::current());
        span
    };
    let walredo_extraordinary_shutdown_thread_cancel = CancellationToken::new();
    let walredo_extraordinary_shutdown_thread = std::thread::spawn({
        let walredo_extraordinary_shutdown_thread_cancel =
            walredo_extraordinary_shutdown_thread_cancel.clone();
        move || {
            let rt = tokio::runtime::Builder::new_current_thread()
                .enable_all()
                .build()
                .unwrap();
            let _entered = rt.enter();
            let _entered = walredo_extraordinary_shutdown_thread_span.enter();
            if let Ok(()) = rt.block_on(tokio::time::timeout(
                Duration::from_secs(8),
                walredo_extraordinary_shutdown_thread_cancel.cancelled(),
            )) {
                info!("cancellation requested");
                return;
            }
            let managers = tenant::WALREDO_MANAGERS
                .lock()
                .unwrap()
                // prevents new walredo managers from being inserted
                .take()
                .expect("only we take()");
            // Use FuturesUnordered to get in queue early for each manager's
            // heavier_once_cell semaphore wait list.
            // Also, for idle tenants that for some reason haven't
            // shut down yet, it's quite likely that we're not going
            // to see Poll::Pending even once.
            let mut futs: FuturesUnordered<_> = managers
                .into_iter()
                .filter_map(|(_, mgr)| mgr.upgrade())
                .map(|mgr| async move { tokio::task::unconstrained(mgr.shutdown()).await })
                .collect();
            info!(count=%futs.len(), "built FuturesUnordered");
            let mut last_log_at = std::time::Instant::now();
            #[derive(Debug, Default)]
            struct Results {
                initiated: u64,
                already: u64,
            }
            let mut results = Results::default();
            while let Some(we_initiated) = rt.block_on(futs.next()) {
                if we_initiated {
                    results.initiated += 1;
                } else {
                    results.already += 1;
                }
                if last_log_at.elapsed() > Duration::from_millis(100) {
                    info!(remaining=%futs.len(), ?results, "progress");
                    last_log_at = std::time::Instant::now();
                }
            }
            info!(?results, "done");
        }
    });

    // Shut down the libpq endpoint task. This prevents new connections from
    // being accepted.
    let remaining_connections = timed(
        page_service.stop_accepting(),
        "shutdown LibpqEndpointListener",
        Duration::from_secs(1),
    )
    .await;

    // Shut down all the tenants. This flushes everything to disk and kills
    // the checkpoint and GC tasks.
    timed(
        tenant_manager.shutdown(),
        "shutdown all tenants",
        Duration::from_secs(5),
    )
    .await;

    // Shut down any page service tasks: any in-progress work for particular timelines or tenants
    // should already have been cancelled via mgr::shutdown_all_tenants
    timed(
        remaining_connections.shutdown(),
        "shutdown PageRequestHandlers",
        Duration::from_secs(1),
    )
    .await;

    // Best effort to persist any outstanding deletions, to avoid leaking objects
    deletion_queue.shutdown(Duration::from_secs(5)).await;

    timed(
        consumption_metrics_worker.0.shutdown(),
        "shutdown consumption metrics",
        Duration::from_secs(1),
    )
    .await;

    timed(
        futures::future::OptionFuture::from(disk_usage_eviction_task.map(|t| t.0.shutdown())),
        "shutdown disk usage eviction",
        Duration::from_secs(1),
    )
    .await;

    timed(
        background_purges.shutdown(),
        "shutdown background purges",
        Duration::from_secs(1),
    )
    .await;

    // Shut down the HTTP endpoint last, so that you can still check the server's
    // status while it's shutting down.
    // FIXME: We should probably stop accepting commands like attach/detach earlier.
    timed(
        http_listener.0.shutdown(),
        "shutdown http",
        Duration::from_secs(1),
    )
    .await;

    timed(
        secondary_controller_tasks.wait(), // cancellation happened in caller
        "secondary controller wait",
        Duration::from_secs(1),
    )
    .await;

    // There should be nothing left, but let's be sure
    timed(
        task_mgr::shutdown_tasks(None, None, None),
        "shutdown leftovers",
        Duration::from_secs(1),
    )
    .await;

    info!("cancel & join walredo_extraordinary_shutdown_thread");
    walredo_extraordinary_shutdown_thread_cancel.cancel();
    walredo_extraordinary_shutdown_thread.join().unwrap();
    info!("walredo_extraordinary_shutdown_thread done");

    info!(
        elapsed_ms = started_at.elapsed().as_millis(),
        "Shutdown completed successfully"
    );
    std::process::exit(exit_code);
}

/// Per-tenant configuration file.
/// Full path: `tenants/<tenant_id>/config-v1`.
pub(crate) const TENANT_LOCATION_CONFIG_NAME: &str = "config-v1";

/// Per-tenant copy of their remote heatmap, downloaded into the local
/// tenant path while in secondary mode.
pub(crate) const TENANT_HEATMAP_BASENAME: &str = "heatmap-v1.json";

/// A suffix used for various temporary files. Any temporary files found in the
/// data directory at pageserver startup can be automatically removed.
pub(crate) const TEMP_FILE_SUFFIX: &str = "___temp";

/// A marker file to mark that a timeline directory was not fully initialized.
/// If a timeline directory with this marker is encountered at pageserver startup,
/// the timeline directory and the marker file are both removed.
/// Full path: `tenants/<tenant_id>/timelines/<timeline_id>___uninit`.
pub(crate) const TIMELINE_UNINIT_MARK_SUFFIX: &str = "___uninit";

pub(crate) const TIMELINE_DELETE_MARK_SUFFIX: &str = "___delete";

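/// Returns true if the path's final component ends with [`TEMP_FILE_SUFFIX`],
/// e.g. an (illustrative) `tenants/<tenant_id>/config-v1___temp` left behind
/// by an interrupted write.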
pub fn is_temporary(path: &Utf8Path) -> bool {
    match path.file_name() {
        Some(name) => name.ends_with(TEMP_FILE_SUFFIX),
        None => false,
    }
}

fn ends_with_suffix(path: &Utf8Path, suffix: &str) -> bool {
    match path.file_name() {
        Some(name) => name.ends_with(suffix),
        None => false,
    }
}

// FIXME: DO NOT ADD new query methods like this, which will have a next step of parsing timelineid
// from the directory name. Instead create type "UninitMark(TimelineId)" and only parse it once
// from the name.

pub(crate) fn is_uninit_mark(path: &Utf8Path) -> bool {
    ends_with_suffix(path, TIMELINE_UNINIT_MARK_SUFFIX)
}

pub(crate) fn is_delete_mark(path: &Utf8Path) -> bool {
    ends_with_suffix(path, TIMELINE_DELETE_MARK_SUFFIX)
}

/// During pageserver startup, we need to order operations so that we don't exhaust
/// tokio worker threads by blocking.
///
/// Instances of this value exist only during startup; otherwise `None` is provided,
/// meaning no delaying is needed.
#[derive(Clone)]
pub struct InitializationOrder {
    /// Each initial tenant load task carries this until it is done loading timelines from remote storage
    pub initial_tenant_load_remote: Option<utils::completion::Completion>,

    /// Each initial tenant load task carries this until completion.
    pub initial_tenant_load: Option<utils::completion::Completion>,

    /// Barrier for when we can start any background jobs.
    ///
    /// This can be broken up later on, but right now there is just one class of background job.
    pub background_jobs_can_start: utils::completion::Barrier,
}

/// Time the future with a warning when it exceeds a threshold.
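///
/// Illustrative call shape (the stage name and threshold here are made up):
///
/// ```ignore
/// let ret = timed(some_future, "shutdown something", Duration::from_secs(1)).await;
/// ```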
async fn timed<Fut: std::future::Future>(
    fut: Fut,
    name: &str,
    warn_at: std::time::Duration,
) -> <Fut as std::future::Future>::Output {
    let started = std::time::Instant::now();

    let mut fut = std::pin::pin!(fut);

    match tokio::time::timeout(warn_at, &mut fut).await {
        Ok(ret) => {
            tracing::info!(
                stage = name,
                elapsed_ms = started.elapsed().as_millis(),
                "completed"
            );
            ret
        }
        Err(_) => {
            tracing::info!(
                stage = name,
                elapsed_ms = started.elapsed().as_millis(),
                "still waiting, taking longer than expected..."
            );

            let ret = fut.await;

            // this has a global allowed_errors
            tracing::warn!(
                stage = name,
                elapsed_ms = started.elapsed().as_millis(),
                "completed, took longer than expected"
            );

            ret
        }
    }
}

#[cfg(test)]
mod timed_tests {
    use super::timed;
    use std::time::Duration;

    #[tokio::test]
    async fn timed_completes_when_inner_future_completes() {
        // A future that completes on time should have its result returned
        let r1 = timed(
            async move {
                tokio::time::sleep(Duration::from_millis(10)).await;
                123
            },
            "test 1",
            Duration::from_millis(50),
        )
        .await;
        assert_eq!(r1, 123);

        // A future that completes too slowly should also have its result returned
        let r1 = timed(
            async move {
                tokio::time::sleep(Duration::from_millis(50)).await;
                456
            },
            "test 1",
            Duration::from_millis(10),
        )
        .await;
        assert_eq!(r1, 456);
    }
}