#![recursion_limit = "300"]
#![deny(clippy::undocumented_unsafe_blocks)]

mod auth;
pub mod basebackup;
pub mod basebackup_cache;
pub mod config;
pub mod consumption_metrics;
pub mod context;
pub mod controller_upcall_client;
pub mod deletion_queue;
pub mod disk_usage_eviction_task;
pub mod http;
pub mod import_datadir;
pub mod l0_flush;

extern crate hyper0 as hyper;

use futures::StreamExt;
use futures::stream::FuturesUnordered;
pub use pageserver_api::keyspace;
use tokio_util::sync::CancellationToken;
mod assert_u64_eq_usize;
pub mod aux_file;
pub mod metrics;
pub mod page_cache;
pub mod page_service;
pub mod pgdatadir_mapping;
pub mod span;
pub(crate) mod statvfs;
pub mod task_mgr;
pub mod tenant;
pub mod utilization;
pub mod virtual_file;
pub mod walingest;
pub mod walredo;

use camino::Utf8Path;
use deletion_queue::DeletionQueue;
use tenant::mgr::{BackgroundPurges, TenantManager};
use tenant::secondary;
use tracing::{info, info_span};

/// Current storage format version
///
/// This is embedded in the header of all the layer files.
/// If you make any backwards-incompatible changes to the storage
/// format, bump this!
/// Note that TimelineMetadata uses its own version number to track
/// backwards-compatible changes to the metadata format.
pub const STORAGE_FORMAT_VERSION: u16 = 3;

pub const DEFAULT_PG_VERSION: u32 = 17;

// Magic constants used to identify different kinds of files
pub const IMAGE_FILE_MAGIC: u16 = 0x5A60;
pub const DELTA_FILE_MAGIC: u16 = 0x5A61;
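
// A reader-side sketch (hypothetical header layout, for illustration only):
// a layer file header carries one of the magic numbers above together with
// STORAGE_FORMAT_VERSION, letting readers reject files written in an
// incompatible format:
//
//   fn is_known_layer_header(magic: u16, version: u16) -> bool {
//       (magic == IMAGE_FILE_MAGIC || magic == DELTA_FILE_MAGIC)
//           && version == STORAGE_FORMAT_VERSION
//   }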

// Target used for performance traces.
pub const PERF_TRACE_TARGET: &str = "P";

static ZERO_PAGE: bytes::Bytes = bytes::Bytes::from_static(&[0u8; 8192]);

pub use crate::metrics::preinitialize_metrics;

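/// A spawned task paired with the token that requests its shutdown.
///
/// A minimal construction sketch (the task body here is illustrative, not the
/// actual listener logic):
///
/// ```ignore
/// let cancel = CancellationToken::new();
/// let task = tokio::spawn({
///     let cancel = cancel.clone();
///     async move { cancel.cancelled().await }
/// });
/// let http = HttpEndpointListener(CancellableTask { task, cancel });
/// // later, during shutdown: http.0.shutdown().await;
/// ```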
pub struct CancellableTask {
    pub task: tokio::task::JoinHandle<()>,
    pub cancel: CancellationToken,
}
pub struct HttpEndpointListener(pub CancellableTask);
pub struct HttpsEndpointListener(pub CancellableTask);
pub struct ConsumptionMetricsTasks(pub CancellableTask);
pub struct DiskUsageEvictionTask(pub CancellableTask);
impl CancellableTask {
    pub async fn shutdown(self) {
        self.cancel.cancel();
        self.task.await.unwrap();
    }
}

#[tracing::instrument(skip_all, fields(%exit_code))]
#[allow(clippy::too_many_arguments)]
pub async fn shutdown_pageserver(
    http_listener: HttpEndpointListener,
    https_listener: Option<HttpsEndpointListener>,
    page_service: page_service::Listener,
    grpc_task: Option<CancellableTask>,
    consumption_metrics_worker: ConsumptionMetricsTasks,
    disk_usage_eviction_task: Option<DiskUsageEvictionTask>,
    tenant_manager: &TenantManager,
    background_purges: BackgroundPurges,
    mut deletion_queue: DeletionQueue,
    secondary_controller_tasks: secondary::GlobalTasks,
    exit_code: i32,
) {
    use std::time::Duration;

    let started_at = std::time::Instant::now();

    // If the orderly shutdown below takes too long, we still want to make
    // sure that all walredo processes are killed and wait()ed on by us, not systemd.
    //
    // (Leftover walredo processes are the hypothesized trigger for the systemd freezes
    // that we keep seeing in prod => https://github.com/neondatabase/cloud/issues/11387.)
    //
    // We use a thread instead of a tokio task because the background runtime is likely busy
    // with the final flushing / uploads. This activity here has priority, and due to the lack
    // of a scheduling-priority feature in the tokio scheduler, using a separate thread is
    // an effective priority booster.
    let walredo_extraordinary_shutdown_thread_span = {
        let span = info_span!(parent: None, "walredo_extraordinary_shutdown_thread");
        span.follows_from(tracing::Span::current());
        span
    };
    let walredo_extraordinary_shutdown_thread_cancel = CancellationToken::new();
    let walredo_extraordinary_shutdown_thread = std::thread::spawn({
        let walredo_extraordinary_shutdown_thread_cancel =
            walredo_extraordinary_shutdown_thread_cancel.clone();
        move || {
            let rt = tokio::runtime::Builder::new_current_thread()
                .enable_all()
                .build()
                .unwrap();
            let _entered = rt.enter();
            let _entered = walredo_extraordinary_shutdown_thread_span.enter();
            if let Ok(()) = rt.block_on(tokio::time::timeout(
                Duration::from_secs(8),
                walredo_extraordinary_shutdown_thread_cancel.cancelled(),
            )) {
                info!("cancellation requested");
                return;
            }
            let managers = tenant::WALREDO_MANAGERS
                .lock()
                .unwrap()
                // prevents new walredo managers from being inserted
                .take()
                .expect("only we take()");
            // Use FuturesUnordered to get in queue early for each manager's
            // heavier_once_cell semaphore wait list.
            // Also, for idle tenants that for some reason haven't
            // shut down yet, it's quite likely that we're not going
            // to get Poll::Pending even once.
            let mut futs: FuturesUnordered<_> = managers
                .into_iter()
                .filter_map(|(_, mgr)| mgr.upgrade())
                .map(|mgr| async move { tokio::task::unconstrained(mgr.shutdown()).await })
                .collect();
            info!(count=%futs.len(), "built FuturesUnordered");
            let mut last_log_at = std::time::Instant::now();
            #[derive(Debug, Default)]
            struct Results {
                initiated: u64,
                already: u64,
            }
            let mut results = Results::default();
            while let Some(we_initiated) = rt.block_on(futs.next()) {
                if we_initiated {
                    results.initiated += 1;
                } else {
                    results.already += 1;
                }
                if last_log_at.elapsed() > Duration::from_millis(100) {
                    info!(remaining=%futs.len(), ?results, "progress");
                    last_log_at = std::time::Instant::now();
                }
            }
            info!(?results, "done");
        }
    });

    // Shut down the libpq endpoint task. This prevents new connections from
    // being accepted.
    let remaining_connections = timed(
        page_service.stop_accepting(),
        "shutdown LibpqEndpointListener",
        Duration::from_secs(1),
    )
    .await;

    // Shut down the gRPC server task, including request handlers.
    if let Some(grpc_task) = grpc_task {
        timed(
            grpc_task.shutdown(),
            "shutdown gRPC PageRequestHandler",
            Duration::from_secs(3),
        )
        .await;
    }

    // Shut down all the tenants. This flushes everything to disk and kills
    // the checkpoint and GC tasks.
    timed(
        tenant_manager.shutdown(),
        "shutdown all tenants",
        Duration::from_secs(5),
    )
    .await;

    // Shut down any page service tasks: any in-progress work for particular timelines or tenants
    // should already have been cancelled via mgr::shutdown_all_tenants
    timed(
        remaining_connections.shutdown(),
        "shutdown PageRequestHandlers",
        Duration::from_secs(1),
    )
    .await;

    // Best effort to persist any outstanding deletions, to avoid leaking objects
    deletion_queue.shutdown(Duration::from_secs(5)).await;

    timed(
        consumption_metrics_worker.0.shutdown(),
        "shutdown consumption metrics",
        Duration::from_secs(1),
    )
    .await;

    timed(
        futures::future::OptionFuture::from(disk_usage_eviction_task.map(|t| t.0.shutdown())),
        "shutdown disk usage eviction",
        Duration::from_secs(1),
    )
    .await;

    timed(
        background_purges.shutdown(),
        "shutdown background purges",
        Duration::from_secs(1),
    )
    .await;

    if let Some(https_listener) = https_listener {
        timed(
            https_listener.0.shutdown(),
            "shutdown https",
            Duration::from_secs(1),
        )
        .await;
    }

    // Shut down the HTTP endpoint last, so that you can still check the server's
    // status while it's shutting down.
    // FIXME: We should probably stop accepting commands like attach/detach earlier.
    timed(
        http_listener.0.shutdown(),
        "shutdown http",
        Duration::from_secs(1),
    )
    .await;

    timed(
        secondary_controller_tasks.wait(), // cancellation happened in caller
        "secondary controller wait",
        Duration::from_secs(1),
    )
    .await;

    // There should be nothing left, but let's be sure
    timed(
        task_mgr::shutdown_tasks(None, None, None),
        "shutdown leftovers",
        Duration::from_secs(1),
    )
    .await;

    info!("cancel & join walredo_extraordinary_shutdown_thread");
    walredo_extraordinary_shutdown_thread_cancel.cancel();
    walredo_extraordinary_shutdown_thread.join().unwrap();
    info!("walredo_extraordinary_shutdown_thread done");

    info!(
        elapsed_ms = started_at.elapsed().as_millis(),
        "shutdown completed successfully"
    );
    std::process::exit(exit_code);
}

/// Per-tenant configuration file.
/// Full path: `tenants/<tenant_id>/config-v1`.
pub(crate) const TENANT_LOCATION_CONFIG_NAME: &str = "config-v1";

/// Per-tenant copy of their remote heatmap, downloaded into the local
/// tenant path while in secondary mode.
pub(crate) const TENANT_HEATMAP_BASENAME: &str = "heatmap-v1.json";

/// A suffix used for various temporary files. Any temporary files found in the
/// data directory at pageserver startup can be automatically removed.
pub(crate) const TEMP_FILE_SUFFIX: &str = "___temp";

pub fn is_temporary(path: &Utf8Path) -> bool {
    match path.file_name() {
        Some(name) => name.ends_with(TEMP_FILE_SUFFIX),
        None => false,
    }
}
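
// A minimal sketch of the naming convention (test added for illustration):
// temporary files are named by appending TEMP_FILE_SUFFIX to the final path
// component, and `is_temporary` detects exactly that.
#[cfg(test)]
mod is_temporary_tests {
    use camino::Utf8Path;

    use super::is_temporary;

    #[test]
    fn detects_temp_suffix() {
        assert!(is_temporary(Utf8Path::new(
            "tenants/1234/timelines/5678/000000___temp"
        )));
        assert!(!is_temporary(Utf8Path::new(
            "tenants/1234/timelines/5678/000000"
        )));
    }
}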

/// During pageserver startup, we need to order operations so that we don't exhaust
/// tokio worker threads by blocking.
///
/// Instances of this value exist only during startup; afterwards, `None` is provided,
/// meaning no delaying is needed.
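///
/// A wiring sketch (assuming `utils::completion::channel()` pairs a `Completion`
/// with a `Barrier` that is released once every `Completion` clone is dropped):
///
/// ```ignore
/// let (initial_tenant_load, background_jobs_can_start) = utils::completion::channel();
/// // each initial tenant-load task holds a Completion clone and drops it when done;
/// // background jobs wait on the Barrier before starting.
/// ```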
#[derive(Clone)]
pub struct InitializationOrder {
    /// Each initial tenant load task carries this until it is done loading timelines from remote storage.
    pub initial_tenant_load_remote: Option<utils::completion::Completion>,

    /// Each initial tenant load task carries this until completion.
    pub initial_tenant_load: Option<utils::completion::Completion>,

    /// Barrier for when we can start any background jobs.
    ///
    /// This can be broken up later on, but right now there is just one class of background job.
    pub background_jobs_can_start: utils::completion::Barrier,
}

/// Time the future with a warning when it exceeds a threshold.
async fn timed<Fut: std::future::Future>(
    fut: Fut,
    name: &str,
    warn_at: std::time::Duration,
) -> <Fut as std::future::Future>::Output {
    let started = std::time::Instant::now();

    let mut fut = std::pin::pin!(fut);

    match tokio::time::timeout(warn_at, &mut fut).await {
        Ok(ret) => {
            tracing::info!(
                stage = name,
                elapsed_ms = started.elapsed().as_millis(),
                "completed"
            );
            ret
        }
        Err(_) => {
            tracing::info!(
                stage = name,
                elapsed_ms = started.elapsed().as_millis(),
                "still waiting, taking longer than expected..."
            );

            let ret = fut.await;

            // this has a global allowed_errors
            tracing::warn!(
                stage = name,
                elapsed_ms = started.elapsed().as_millis(),
                "completed, took longer than expected"
            );

            ret
        }
    }
}

/// Like [`timed`], but the warning timeout only starts after `cancel` has been cancelled.
async fn timed_after_cancellation<Fut: std::future::Future>(
    fut: Fut,
    name: &str,
    warn_at: std::time::Duration,
    cancel: &CancellationToken,
) -> <Fut as std::future::Future>::Output {
    let mut fut = std::pin::pin!(fut);

    tokio::select! {
        _ = cancel.cancelled() => {
            timed(fut, name, warn_at).await
        }
        ret = &mut fut => {
            ret
        }
    }
}

#[cfg(test)]
mod timed_tests {
    use std::time::Duration;

    use super::timed;

    #[tokio::test]
    async fn timed_completes_when_inner_future_completes() {
        // A future that completes on time should have its result returned
        let r1 = timed(
            async move {
                tokio::time::sleep(Duration::from_millis(10)).await;
                123
            },
            "test 1",
            Duration::from_millis(50),
        )
        .await;
        assert_eq!(r1, 123);

        // A future that completes too slowly should also have its result returned
        let r1 = timed(
            async move {
                tokio::time::sleep(Duration::from_millis(50)).await;
                456
            },
            "test 1",
            Duration::from_millis(10),
        )
        .await;
        assert_eq!(r1, 456);
    }
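
    // A sketch test added for illustration (not part of the original suite):
    // `timed_after_cancellation` should still return the inner future's result;
    // with `cancel` already fired, either select branch yields the same value.
    #[tokio::test]
    async fn timed_after_cancellation_returns_inner_result() {
        use tokio_util::sync::CancellationToken;

        use super::timed_after_cancellation;

        let cancel = CancellationToken::new();
        cancel.cancel();

        let r = timed_after_cancellation(
            async { 789 },
            "test 2",
            Duration::from_millis(50),
            &cancel,
        )
        .await;
        assert_eq!(r, 789);
    }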
}