#![recursion_limit = "300"]
#![deny(clippy::undocumented_unsafe_blocks)]

mod auth;
pub mod basebackup;
pub mod config;
pub mod consumption_metrics;
pub mod context;
pub mod controller_upcall_client;
pub mod deletion_queue;
pub mod disk_usage_eviction_task;
pub mod http;
pub mod import_datadir;
pub mod l0_flush;

extern crate hyper0 as hyper;

use futures::StreamExt;
use futures::stream::FuturesUnordered;
pub use pageserver_api::keyspace;
use tokio_util::sync::CancellationToken;
mod assert_u64_eq_usize;
pub mod aux_file;
pub mod metrics;
pub mod page_cache;
pub mod page_service;
pub mod pgdatadir_mapping;
pub mod span;
pub(crate) mod statvfs;
pub mod task_mgr;
pub mod tenant;
pub mod utilization;
pub mod virtual_file;
pub mod walingest;
pub mod walredo;

use camino::Utf8Path;
use deletion_queue::DeletionQueue;
use tenant::mgr::{BackgroundPurges, TenantManager};
use tenant::secondary;
use tracing::{info, info_span};

/// Current storage format version
///
/// This is embedded in the header of all the layer files.
/// If you make any backwards-incompatible changes to the storage
/// format, bump this!
/// Note that TimelineMetadata uses its own version number to track
/// backwards-compatible changes to the metadata format.
pub const STORAGE_FORMAT_VERSION: u16 = 3;

pub const DEFAULT_PG_VERSION: u32 = 16;

// Magic constants used to identify different kinds of files
pub const IMAGE_FILE_MAGIC: u16 = 0x5A60;
pub const DELTA_FILE_MAGIC: u16 = 0x5A61;

// Target used for performance traces.
pub const PERF_TRACE_TARGET: &str = "P";

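// An 8 KiB page of zeroes; 8192 bytes matches the default Postgres block size (BLCKSZ).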
static ZERO_PAGE: bytes::Bytes = bytes::Bytes::from_static(&[0u8; 8192]);

pub use crate::metrics::preinitialize_metrics;

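/// A spawned task paired with the [`CancellationToken`] that tells it to stop.
///
/// A minimal usage sketch; the task body here is hypothetical, and real callers
/// wrap the pair in one of the newtypes below:
///
/// ```ignore
/// let cancel = CancellationToken::new();
/// let task = tokio::spawn({
///     let cancel = cancel.clone();
///     async move { cancel.cancelled().await }
/// });
/// HttpEndpointListener(CancellableTask { task, cancel }).0.shutdown().await;
/// ```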
pub struct CancellableTask {
    pub task: tokio::task::JoinHandle<()>,
    pub cancel: CancellationToken,
}
pub struct HttpEndpointListener(pub CancellableTask);
pub struct HttpsEndpointListener(pub CancellableTask);
pub struct ConsumptionMetricsTasks(pub CancellableTask);
pub struct DiskUsageEvictionTask(pub CancellableTask);
impl CancellableTask {
    pub async fn shutdown(self) {
        self.cancel.cancel();
        self.task.await.unwrap();
    }
}

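/// Orderly pageserver shutdown: stop accepting new connections, flush and shut
/// down all tenants, drain the deletion queue and background tasks, shut down
/// the HTTP endpoint last, and finally exit the process with `exit_code`.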
#[tracing::instrument(skip_all, fields(%exit_code))]
#[allow(clippy::too_many_arguments)]
pub async fn shutdown_pageserver(
    http_listener: HttpEndpointListener,
    https_listener: Option<HttpsEndpointListener>,
    page_service: page_service::Listener,
    consumption_metrics_worker: ConsumptionMetricsTasks,
    disk_usage_eviction_task: Option<DiskUsageEvictionTask>,
    tenant_manager: &TenantManager,
    background_purges: BackgroundPurges,
    mut deletion_queue: DeletionQueue,
    secondary_controller_tasks: secondary::GlobalTasks,
    exit_code: i32,
) {
    use std::time::Duration;

    let started_at = std::time::Instant::now();

    // If the orderly shutdown below takes too long, we still want to make
    // sure that all walredo processes are killed and wait()ed on by us, not systemd.
    //
    // (Leftover walredo processes are the hypothesized trigger for the systemd freezes
    // that we keep seeing in prod => https://github.com/neondatabase/cloud/issues/11387.)
    //
    // We use a thread instead of a tokio task because the background runtime is likely busy
    // with the final flushing / uploads. This activity here has priority, and due to the lack
    // of a scheduling-priority feature in the tokio scheduler, using a separate thread is
    // an effective priority booster.
    let walredo_extraordinary_shutdown_thread_span = {
        let span = info_span!(parent: None, "walredo_extraordinary_shutdown_thread");
        span.follows_from(tracing::Span::current());
        span
    };
    let walredo_extraordinary_shutdown_thread_cancel = CancellationToken::new();
    let walredo_extraordinary_shutdown_thread = std::thread::spawn({
        let walredo_extraordinary_shutdown_thread_cancel =
            walredo_extraordinary_shutdown_thread_cancel.clone();
        move || {
            let rt = tokio::runtime::Builder::new_current_thread()
                .enable_all()
                .build()
                .unwrap();
            let _entered = rt.enter();
            let _entered = walredo_extraordinary_shutdown_thread_span.enter();
            if let Ok(()) = rt.block_on(tokio::time::timeout(
                Duration::from_secs(8),
                walredo_extraordinary_shutdown_thread_cancel.cancelled(),
            )) {
                info!("cancellation requested");
                return;
            }
            let managers = tenant::WALREDO_MANAGERS
                .lock()
                .unwrap()
                // prevents new walredo managers from being inserted
                .take()
                .expect("only we take()");
            // Use FuturesUnordered to get in queue early for each manager's
            // heavier_once_cell semaphore wait list.
            // Also, for idle tenants that for some reason haven't
            // shut down yet, it's quite likely that we're not going
            // to get Poll::Pending even once.
            let mut futs: FuturesUnordered<_> = managers
                .into_iter()
                .filter_map(|(_, mgr)| mgr.upgrade())
                .map(|mgr| async move { tokio::task::unconstrained(mgr.shutdown()).await })
                .collect();
            info!(count=%futs.len(), "built FuturesUnordered");
            let mut last_log_at = std::time::Instant::now();
            #[derive(Debug, Default)]
            struct Results {
                initiated: u64,
                already: u64,
            }
            let mut results = Results::default();
            while let Some(we_initiated) = rt.block_on(futs.next()) {
                if we_initiated {
                    results.initiated += 1;
                } else {
                    results.already += 1;
                }
                if last_log_at.elapsed() > Duration::from_millis(100) {
                    info!(remaining=%futs.len(), ?results, "progress");
                    last_log_at = std::time::Instant::now();
                }
            }
            info!(?results, "done");
        }
    });

    // Shut down the libpq endpoint task. This prevents new connections from
    // being accepted.
    let remaining_connections = timed(
        page_service.stop_accepting(),
        "shutdown LibpqEndpointListener",
        Duration::from_secs(1),
    )
    .await;

    // Shut down all the tenants. This flushes everything to disk and kills
    // the checkpoint and GC tasks.
    timed(
        tenant_manager.shutdown(),
        "shutdown all tenants",
        Duration::from_secs(5),
    )
    .await;

    // Shut down any page service tasks: any in-progress work for particular timelines or tenants
    // should already have been cancelled via mgr::shutdown_all_tenants
    timed(
        remaining_connections.shutdown(),
        "shutdown PageRequestHandlers",
        Duration::from_secs(1),
    )
    .await;

    // Best effort to persist any outstanding deletions, to avoid leaking objects
    deletion_queue.shutdown(Duration::from_secs(5)).await;

    timed(
        consumption_metrics_worker.0.shutdown(),
        "shutdown consumption metrics",
        Duration::from_secs(1),
    )
    .await;

    timed(
        futures::future::OptionFuture::from(disk_usage_eviction_task.map(|t| t.0.shutdown())),
        "shutdown disk usage eviction",
        Duration::from_secs(1),
    )
    .await;

    timed(
        background_purges.shutdown(),
        "shutdown background purges",
        Duration::from_secs(1),
    )
    .await;

    if let Some(https_listener) = https_listener {
        timed(
            https_listener.0.shutdown(),
            "shutdown https",
            Duration::from_secs(1),
        )
        .await;
    }

    // Shut down the HTTP endpoint last, so that you can still check the server's
    // status while it's shutting down.
    // FIXME: We should probably stop accepting commands like attach/detach earlier.
    timed(
        http_listener.0.shutdown(),
        "shutdown http",
        Duration::from_secs(1),
    )
    .await;

    timed(
        secondary_controller_tasks.wait(), // cancellation happened in caller
        "secondary controller wait",
        Duration::from_secs(1),
    )
    .await;

    // There should be nothing left, but let's be sure
    timed(
        task_mgr::shutdown_tasks(None, None, None),
        "shutdown leftovers",
        Duration::from_secs(1),
    )
    .await;

    info!("cancel & join walredo_extraordinary_shutdown_thread");
    walredo_extraordinary_shutdown_thread_cancel.cancel();
    walredo_extraordinary_shutdown_thread.join().unwrap();
    info!("walredo_extraordinary_shutdown_thread done");

    info!(
        elapsed_ms = started_at.elapsed().as_millis(),
        "Shutdown successfully completed"
    );
    std::process::exit(exit_code);
}

/// Per-tenant configuration file.
/// Full path: `tenants/<tenant_id>/config-v1`.
pub(crate) const TENANT_LOCATION_CONFIG_NAME: &str = "config-v1";

/// Per-tenant copy of their remote heatmap, downloaded into the local
/// tenant path while in secondary mode.
pub(crate) const TENANT_HEATMAP_BASENAME: &str = "heatmap-v1.json";

/// A suffix used for various temporary files. Any temporary files found in the
/// data directory at pageserver startup can be automatically removed.
pub(crate) const TEMP_FILE_SUFFIX: &str = "___temp";

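/// Returns whether `path` names a temporary file, i.e. ends with [`TEMP_FILE_SUFFIX`].
///
/// A quick illustration (the paths are hypothetical):
///
/// ```ignore
/// assert!(is_temporary(Utf8Path::new("tenants/t1/layer___temp")));
/// assert!(!is_temporary(Utf8Path::new("tenants/t1/layer")));
/// ```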
pub fn is_temporary(path: &Utf8Path) -> bool {
    match path.file_name() {
        Some(name) => name.ends_with(TEMP_FILE_SUFFIX),
        None => false,
    }
}

/// During pageserver startup, we need to order operations so that we don't exhaust
/// tokio worker threads by blocking.
///
/// Instances of this value exist only during startup; otherwise `None` is provided,
/// meaning no delaying is needed.
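///
/// A minimal sketch of the intended wiring, assuming `utils::completion::channel()`
/// pairs a `Completion` with the `Barrier` that waits for it to be dropped:
///
/// ```ignore
/// let (load_done, background_jobs_can_start) = utils::completion::channel();
/// let order = InitializationOrder {
///     initial_tenant_load_remote: Some(load_done.clone()),
///     initial_tenant_load: Some(load_done),
///     background_jobs_can_start,
/// };
/// // Background jobs wait on `order.background_jobs_can_start` and proceed only
/// // once every clone of `load_done` has been dropped by the load tasks.
/// ```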
#[derive(Clone)]
pub struct InitializationOrder {
    /// Each initial tenant load task carries this until it is done loading timelines from remote storage
    pub initial_tenant_load_remote: Option<utils::completion::Completion>,

    /// Each initial tenant load task carries this until completion.
    pub initial_tenant_load: Option<utils::completion::Completion>,

    /// Barrier for when we can start any background jobs.
    ///
    /// This can be broken up later on, but right now there is just one class of a background job.
    pub background_jobs_can_start: utils::completion::Barrier,
}

/// Time the future with a warning when it exceeds a threshold.
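///
/// The call shape, as exercised by the tests at the bottom of this file
/// (`some_future` and the stage name are illustrative):
///
/// ```ignore
/// let ret = timed(some_future, "shutdown something", Duration::from_secs(1)).await;
/// ```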
async fn timed<Fut: std::future::Future>(
    fut: Fut,
    name: &str,
    warn_at: std::time::Duration,
) -> <Fut as std::future::Future>::Output {
    let started = std::time::Instant::now();

    let mut fut = std::pin::pin!(fut);

    match tokio::time::timeout(warn_at, &mut fut).await {
        Ok(ret) => {
            tracing::info!(
                stage = name,
                elapsed_ms = started.elapsed().as_millis(),
                "completed"
            );
            ret
        }
        Err(_) => {
            tracing::info!(
                stage = name,
                elapsed_ms = started.elapsed().as_millis(),
                "still waiting, taking longer than expected..."
            );

            let ret = fut.await;

            // this has a global allowed_errors
            tracing::warn!(
                stage = name,
                elapsed_ms = started.elapsed().as_millis(),
                "completed, took longer than expected"
            );

            ret
        }
    }
}

/// Like [`timed`], but the warning timeout only starts after `cancel` has been cancelled.
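///
/// This suits futures that are only expected to finish promptly once shutdown has
/// been requested: they run unmonitored until cancellation, after which they are
/// reported against `warn_at` just like [`timed`].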
async fn timed_after_cancellation<Fut: std::future::Future>(
    fut: Fut,
    name: &str,
    warn_at: std::time::Duration,
    cancel: &CancellationToken,
) -> <Fut as std::future::Future>::Output {
    let mut fut = std::pin::pin!(fut);

    tokio::select! {
        _ = cancel.cancelled() => {
            timed(fut, name, warn_at).await
        }
        ret = &mut fut => {
            ret
        }
    }
}

#[cfg(test)]
mod timed_tests {
    use std::time::Duration;

    use super::timed;

    #[tokio::test]
    async fn timed_completes_when_inner_future_completes() {
        // A future that completes on time should have its result returned
        let r1 = timed(
            async move {
                tokio::time::sleep(Duration::from_millis(10)).await;
                123
            },
            "test 1",
            Duration::from_millis(50),
        )
        .await;
        assert_eq!(r1, 123);

        // A future that completes too slowly should also have its result returned
        let r1 = timed(
            async move {
                tokio::time::sleep(Duration::from_millis(50)).await;
                456
            },
390 4 : "test 1",
            Duration::from_millis(10),
        )
        .await;
        assert_eq!(r1, 456);
    }
}