#![recursion_limit = "300"]
#![deny(clippy::undocumented_unsafe_blocks)]

mod auth;
pub mod basebackup;
pub mod basebackup_cache;
pub mod config;
pub mod consumption_metrics;
pub mod context;
pub mod controller_upcall_client;
pub mod deletion_queue;
pub mod disk_usage_eviction_task;
pub mod http;
pub mod import_datadir;
pub mod l0_flush;

extern crate hyper0 as hyper;

use futures::StreamExt;
use futures::stream::FuturesUnordered;
pub use pageserver_api::keyspace;
use tokio_util::sync::CancellationToken;
mod assert_u64_eq_usize;
pub mod aux_file;
pub mod metrics;
pub mod page_cache;
pub mod page_service;
pub mod pgdatadir_mapping;
pub mod span;
pub(crate) mod statvfs;
pub mod task_mgr;
pub mod tenant;
pub mod utilization;
pub mod virtual_file;
pub mod walingest;
pub mod walredo;

use camino::Utf8Path;
use deletion_queue::DeletionQueue;
use tenant::mgr::{BackgroundPurges, TenantManager};
use tenant::secondary;
use tracing::{info, info_span};

/// Current storage format version
///
/// This is embedded in the header of all the layer files.
/// If you make any backwards-incompatible changes to the storage
/// format, bump this!
/// Note that TimelineMetadata uses its own version number to track
/// backwards-compatible changes to the metadata format.
pub const STORAGE_FORMAT_VERSION: u16 = 3;

pub const DEFAULT_PG_VERSION: u32 = 17;

// Magic constants used to identify different kinds of files
pub const IMAGE_FILE_MAGIC: u16 = 0x5A60;
pub const DELTA_FILE_MAGIC: u16 = 0x5A61;
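
// Hedged illustration (test-only sketch, not part of this crate's API): the
// magic value in a layer file header is enough to tell image layers from
// delta layers. The classifier below is hypothetical; real header parsing
// lives in the layer file code under `tenant`.
#[cfg(test)]
mod file_magic_tests {
    use super::{DELTA_FILE_MAGIC, IMAGE_FILE_MAGIC};

    // Hypothetical helper used only by this sketch.
    fn layer_kind(magic: u16) -> Option<&'static str> {
        match magic {
            IMAGE_FILE_MAGIC => Some("image"),
            DELTA_FILE_MAGIC => Some("delta"),
            _ => None,
        }
    }

    #[test]
    fn magic_identifies_layer_kind() {
        assert_eq!(layer_kind(IMAGE_FILE_MAGIC), Some("image"));
        assert_eq!(layer_kind(DELTA_FILE_MAGIC), Some("delta"));
        assert_eq!(layer_kind(0), None);
    }
}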

// Target used for performance traces.
pub const PERF_TRACE_TARGET: &str = "P";

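// A zeroed 8 KiB page (8192 bytes, the Postgres default block size), shared
// as a cheap static instead of being re-allocated at each use.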
static ZERO_PAGE: bytes::Bytes = bytes::Bytes::from_static(&[0u8; 8192]);

pub use crate::metrics::preinitialize_metrics;

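/// A spawned task paired with the `CancellationToken` used to stop it.
///
/// A minimal usage sketch (hypothetical caller code, not taken from this
/// crate):
///
/// ```ignore
/// let cancel = CancellationToken::new();
/// let task = tokio::spawn({
///     let cancel = cancel.clone();
///     async move { cancel.cancelled().await }
/// });
/// let t = CancellableTask { task, cancel };
/// t.shutdown().await; // requests cancellation, then joins the task
/// ```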
pub struct CancellableTask {
    pub task: tokio::task::JoinHandle<()>,
    pub cancel: CancellationToken,
}
pub struct HttpEndpointListener(pub CancellableTask);
pub struct HttpsEndpointListener(pub CancellableTask);
pub struct ConsumptionMetricsTasks(pub CancellableTask);
pub struct DiskUsageEvictionTask(pub CancellableTask);
impl CancellableTask {
    pub async fn shutdown(self) {
        self.cancel.cancel();
        self.task.await.unwrap();
    }
}

#[tracing::instrument(skip_all, fields(%exit_code))]
#[allow(clippy::too_many_arguments)]
pub async fn shutdown_pageserver(
    http_listener: HttpEndpointListener,
    https_listener: Option<HttpsEndpointListener>,
    page_service: page_service::Listener,
    consumption_metrics_worker: ConsumptionMetricsTasks,
    disk_usage_eviction_task: Option<DiskUsageEvictionTask>,
    tenant_manager: &TenantManager,
    background_purges: BackgroundPurges,
    mut deletion_queue: DeletionQueue,
    secondary_controller_tasks: secondary::GlobalTasks,
    exit_code: i32,
) {
    use std::time::Duration;

    let started_at = std::time::Instant::now();

    // If the orderly shutdown below takes too long, we still want to make
    // sure that all walredo processes are killed and wait()ed on by us, not systemd.
    //
    // (Leftover walredo processes are the hypothesized trigger for the systemd freezes
    // that we keep seeing in prod => https://github.com/neondatabase/cloud/issues/11387.)
    //
    // We use a thread instead of a tokio task because the background runtime is likely busy
    // with the final flushing / uploads. This activity here has priority, and due to the lack
    // of a scheduling priority feature in the tokio scheduler, using a separate thread is
    // an effective priority booster.
    let walredo_extraordinary_shutdown_thread_span = {
        let span = info_span!(parent: None, "walredo_extraordinary_shutdown_thread");
        span.follows_from(tracing::Span::current());
        span
    };
    let walredo_extraordinary_shutdown_thread_cancel = CancellationToken::new();
    let walredo_extraordinary_shutdown_thread = std::thread::spawn({
        let walredo_extraordinary_shutdown_thread_cancel =
            walredo_extraordinary_shutdown_thread_cancel.clone();
        move || {
            let rt = tokio::runtime::Builder::new_current_thread()
                .enable_all()
                .build()
                .unwrap();
            let _entered = rt.enter();
            let _entered = walredo_extraordinary_shutdown_thread_span.enter();
            if let Ok(()) = rt.block_on(tokio::time::timeout(
                Duration::from_secs(8),
                walredo_extraordinary_shutdown_thread_cancel.cancelled(),
            )) {
                info!("cancellation requested");
                return;
            }
            let managers = tenant::WALREDO_MANAGERS
                .lock()
                .unwrap()
                // prevents new walredo managers from being inserted
                .take()
                .expect("only we take()");
            // Use FuturesUnordered to get in queue early for each manager's
            // heavier_once_cell semaphore wait list.
            // Also, for idle tenants that for some reason haven't
            // shut down yet, it's quite likely that we're not going
            // to get Poll::Pending even once.
            let mut futs: FuturesUnordered<_> = managers
                .into_iter()
                .filter_map(|(_, mgr)| mgr.upgrade())
                .map(|mgr| async move { tokio::task::unconstrained(mgr.shutdown()).await })
                .collect();
            info!(count=%futs.len(), "built FuturesUnordered");
            let mut last_log_at = std::time::Instant::now();
            #[derive(Debug, Default)]
            struct Results {
                initiated: u64,
                already: u64,
            }
            let mut results = Results::default();
            while let Some(we_initiated) = rt.block_on(futs.next()) {
                if we_initiated {
                    results.initiated += 1;
                } else {
                    results.already += 1;
                }
                if last_log_at.elapsed() > Duration::from_millis(100) {
                    info!(remaining=%futs.len(), ?results, "progress");
                    last_log_at = std::time::Instant::now();
                }
            }
            info!(?results, "done");
        }
    });

    // Shut down the libpq endpoint task. This prevents new connections from
    // being accepted.
    let remaining_connections = timed(
        page_service.stop_accepting(),
        "shutdown LibpqEndpointListener",
        Duration::from_secs(1),
    )
    .await;

    // Shut down all the tenants. This flushes everything to disk and kills
    // the checkpoint and GC tasks.
    timed(
        tenant_manager.shutdown(),
        "shutdown all tenants",
        Duration::from_secs(5),
    )
    .await;

    // Shut down any page service tasks: any in-progress work for particular timelines or tenants
    // should already have been cancelled via mgr::shutdown_all_tenants
    timed(
        remaining_connections.shutdown(),
        "shutdown PageRequestHandlers",
        Duration::from_secs(1),
    )
    .await;

    // Best effort to persist any outstanding deletions, to avoid leaking objects
    deletion_queue.shutdown(Duration::from_secs(5)).await;

    timed(
        consumption_metrics_worker.0.shutdown(),
        "shutdown consumption metrics",
        Duration::from_secs(1),
    )
    .await;

    timed(
        futures::future::OptionFuture::from(disk_usage_eviction_task.map(|t| t.0.shutdown())),
        "shutdown disk usage eviction",
        Duration::from_secs(1),
    )
    .await;

    timed(
        background_purges.shutdown(),
        "shutdown background purges",
        Duration::from_secs(1),
    )
    .await;

    if let Some(https_listener) = https_listener {
        timed(
            https_listener.0.shutdown(),
            "shutdown https",
            Duration::from_secs(1),
        )
        .await;
    }

    // Shut down the HTTP endpoint last, so that you can still check the server's
    // status while it's shutting down.
    // FIXME: We should probably stop accepting commands like attach/detach earlier.
    timed(
        http_listener.0.shutdown(),
        "shutdown http",
        Duration::from_secs(1),
    )
    .await;

    timed(
        secondary_controller_tasks.wait(), // cancellation happened in caller
        "secondary controller wait",
        Duration::from_secs(1),
    )
    .await;

    // There should be nothing left, but let's be sure
    timed(
        task_mgr::shutdown_tasks(None, None, None),
        "shutdown leftovers",
        Duration::from_secs(1),
    )
    .await;

    info!("cancel & join walredo_extraordinary_shutdown_thread");
    walredo_extraordinary_shutdown_thread_cancel.cancel();
    walredo_extraordinary_shutdown_thread.join().unwrap();
    info!("walredo_extraordinary_shutdown_thread done");

    info!(
        elapsed_ms = started_at.elapsed().as_millis(),
        "shutdown completed successfully"
    );
    std::process::exit(exit_code);
}

/// Per-tenant configuration file.
/// Full path: `tenants/<tenant_id>/config-v1`.
pub(crate) const TENANT_LOCATION_CONFIG_NAME: &str = "config-v1";

/// Per-tenant copy of their remote heatmap, downloaded into the local
/// tenant path while in secondary mode.
pub(crate) const TENANT_HEATMAP_BASENAME: &str = "heatmap-v1.json";

/// A suffix used for various temporary files. Any temporary files found in the
/// data directory at pageserver startup can be automatically removed.
pub(crate) const TEMP_FILE_SUFFIX: &str = "___temp";

pub fn is_temporary(path: &Utf8Path) -> bool {
    match path.file_name() {
        Some(name) => name.ends_with(TEMP_FILE_SUFFIX),
        None => false,
    }
}

/// During pageserver startup, we need to order operations so that we do not
/// exhaust tokio worker threads by blocking.
///
/// Instances of this value exist only during startup; afterwards `None` is
/// provided, meaning no delaying is needed.
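///
/// A hedged sketch of the intended flow (assuming `utils::completion::channel()`
/// yields a paired `(Completion, Barrier)`; the names below are illustrative):
///
/// ```ignore
/// let (load_done, background_jobs_barrier) = utils::completion::channel();
/// let order = InitializationOrder {
///     initial_tenant_load_remote: Some(load_done.clone()),
///     initial_tenant_load: Some(load_done),
///     background_jobs_can_start: background_jobs_barrier.clone(),
/// };
/// // Tenant load tasks drop their `Completion` clones when done; the barrier
/// // then opens and background jobs may start.
/// background_jobs_barrier.wait().await;
/// ```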
#[derive(Clone)]
pub struct InitializationOrder {
    /// Each initial tenant load task carries this until it is done loading timelines from remote storage.
    pub initial_tenant_load_remote: Option<utils::completion::Completion>,

    /// Each initial tenant load task carries this until completion.
    pub initial_tenant_load: Option<utils::completion::Completion>,

    /// Barrier for when we can start any background jobs.
    ///
    /// This can be broken up later on, but right now there is just one class of background job.
    pub background_jobs_can_start: utils::completion::Barrier,
}

/// Time the future, logging a warning when it exceeds a threshold.
async fn timed<Fut: std::future::Future>(
    fut: Fut,
    name: &str,
    warn_at: std::time::Duration,
) -> <Fut as std::future::Future>::Output {
    let started = std::time::Instant::now();

    let mut fut = std::pin::pin!(fut);

    match tokio::time::timeout(warn_at, &mut fut).await {
        Ok(ret) => {
            tracing::info!(
                stage = name,
                elapsed_ms = started.elapsed().as_millis(),
                "completed"
            );
            ret
        }
        Err(_) => {
            tracing::info!(
                stage = name,
                elapsed_ms = started.elapsed().as_millis(),
                "still waiting, taking longer than expected..."
            );

            let ret = fut.await;

            // this has a global allowed_errors
            tracing::warn!(
                stage = name,
                elapsed_ms = started.elapsed().as_millis(),
                "completed, took longer than expected"
            );

            ret
        }
    }
}

/// Like [`timed`], but the warning timeout only starts after `cancel` has been cancelled.
async fn timed_after_cancellation<Fut: std::future::Future>(
    fut: Fut,
    name: &str,
    warn_at: std::time::Duration,
    cancel: &CancellationToken,
) -> <Fut as std::future::Future>::Output {
    let mut fut = std::pin::pin!(fut);

    tokio::select! {
        _ = cancel.cancelled() => {
            timed(fut, name, warn_at).await
        }
        ret = &mut fut => {
            ret
        }
    }
}

#[cfg(test)]
mod timed_tests {
    use std::time::Duration;

    use super::timed;

    #[tokio::test]
    async fn timed_completes_when_inner_future_completes() {
        // A future that completes on time should have its result returned
        let r1 = timed(
            async move {
                tokio::time::sleep(Duration::from_millis(10)).await;
                123
            },
            "test 1",
            Duration::from_millis(50),
        )
        .await;
        assert_eq!(r1, 123);

        // A future that completes too slowly should also have its result returned
        let r2 = timed(
            async move {
                tokio::time::sleep(Duration::from_millis(50)).await;
                456
            },
            "test 2",
            Duration::from_millis(10),
        )
        .await;
        assert_eq!(r2, 456);
    }
}