#![recursion_limit = "300"]
#![deny(clippy::undocumented_unsafe_blocks)]

mod auth;
pub mod basebackup;
pub mod basebackup_cache;
pub mod config;
pub mod consumption_metrics;
pub mod context;
pub mod controller_upcall_client;
pub mod deletion_queue;
pub mod disk_usage_eviction_task;
pub mod feature_resolver;
pub mod http;
pub mod import_datadir;
pub mod l0_flush;

extern crate hyper0 as hyper;

use futures::StreamExt;
use futures::stream::FuturesUnordered;
pub use pageserver_api::keyspace;
use tokio_util::sync::CancellationToken;
mod assert_u64_eq_usize;
pub mod aux_file;
pub mod metrics;
pub mod page_cache;
pub mod page_service;
pub mod pgdatadir_mapping;
pub mod span;
pub(crate) mod statvfs;
pub mod task_mgr;
pub mod tenant;
pub mod utilization;
pub mod virtual_file;
pub mod walingest;
pub mod walredo;

use camino::Utf8Path;
use deletion_queue::DeletionQueue;
use tenant::mgr::{BackgroundPurges, TenantManager};
use tenant::secondary;
use tracing::{info, info_span};

/// Current storage format version
///
/// This is embedded in the header of all the layer files.
/// If you make any backwards-incompatible changes to the storage
/// format, bump this!
/// Note that TimelineMetadata uses its own version number to track
/// backwards-compatible changes to the metadata format.
pub const STORAGE_FORMAT_VERSION: u16 = 3;

pub const DEFAULT_PG_VERSION: u32 = 17;

// Magic constants used to identify different kinds of files
pub const IMAGE_FILE_MAGIC: u16 = 0x5A60;
pub const DELTA_FILE_MAGIC: u16 = 0x5A61;
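
// As a reader's aid: a minimal sketch of how these constants could be used to
// validate a layer file header on read. The struct and function below are
// hypothetical, written for illustration only; the real header handling lives
// in the layer file modules under `tenant`.
//
//     struct HeaderSummary {
//         magic: u16,
//         format_version: u16,
//     }
//
//     fn check_header(h: &HeaderSummary) -> anyhow::Result<()> {
//         if h.magic != IMAGE_FILE_MAGIC && h.magic != DELTA_FILE_MAGIC {
//             anyhow::bail!("unrecognized file magic {:#x}", h.magic);
//         }
//         if h.format_version != STORAGE_FORMAT_VERSION {
//             anyhow::bail!(
//                 "unsupported storage format version {} (expected {})",
//                 h.format_version,
//                 STORAGE_FORMAT_VERSION
//             );
//         }
//         Ok(())
//     }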

// Target used for performance traces.
pub const PERF_TRACE_TARGET: &str = "P";
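// Illustrative only (the call sites live elsewhere): perf-trace events would
// be emitted with this as the tracing target, e.g.
// `tracing::info!(target: PERF_TRACE_TARGET, ...)`, so they can be filtered
// by this short target name.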

static ZERO_PAGE: bytes::Bytes = bytes::Bytes::from_static(&[0u8; 8192]);
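// (The 8192-byte length matches the Postgres block size, BLCKSZ.)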

pub use crate::metrics::preinitialize_metrics;

pub struct CancellableTask {
    pub task: tokio::task::JoinHandle<()>,
    pub cancel: CancellationToken,
}
pub struct HttpEndpointListener(pub CancellableTask);
pub struct HttpsEndpointListener(pub CancellableTask);
pub struct ConsumptionMetricsTasks(pub CancellableTask);
pub struct DiskUsageEvictionTask(pub CancellableTask);
impl CancellableTask {
    pub async fn shutdown(self) {
        self.cancel.cancel();
        self.task.await.unwrap();
    }
}
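
// Illustrative construction of a `CancellableTask` (a hand-written sketch, not
// code from this crate): spawn a task that exits when its token fires, then
// `shutdown()` cancels the token and joins the task.
//
//     let cancel = CancellationToken::new();
//     let task = tokio::spawn({
//         let cancel = cancel.clone();
//         async move { cancel.cancelled().await }
//     });
//     CancellableTask { task, cancel }.shutdown().await;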

#[tracing::instrument(skip_all, fields(%exit_code))]
#[allow(clippy::too_many_arguments)]
pub async fn shutdown_pageserver(
    http_listener: HttpEndpointListener,
    https_listener: Option<HttpsEndpointListener>,
    page_service: page_service::Listener,
    grpc_task: Option<CancellableTask>,
    consumption_metrics_worker: ConsumptionMetricsTasks,
    disk_usage_eviction_task: Option<DiskUsageEvictionTask>,
    tenant_manager: &TenantManager,
    background_purges: BackgroundPurges,
    mut deletion_queue: DeletionQueue,
    secondary_controller_tasks: secondary::GlobalTasks,
    exit_code: i32,
) {
    use std::time::Duration;

    let started_at = std::time::Instant::now();

    // If the orderly shutdown below takes too long, we still want to make
    // sure that all walredo processes are killed and wait()ed on by us, not systemd.
    //
    // (Leftover walredo processes are the hypothesized trigger for the systemd freezes
    // that we keep seeing in prod => https://github.com/neondatabase/cloud/issues/11387.)
    //
    // We use a thread instead of a tokio task because the background runtime is likely busy
    // with the final flushing / uploads. This activity here has priority, and due to the lack
    // of a scheduling-priority feature in the tokio scheduler, using a separate thread is
    // an effective priority booster.
    let walredo_extraordinary_shutdown_thread_span = {
        let span = info_span!(parent: None, "walredo_extraordinary_shutdown_thread");
        span.follows_from(tracing::Span::current());
        span
    };
    let walredo_extraordinary_shutdown_thread_cancel = CancellationToken::new();
    let walredo_extraordinary_shutdown_thread = std::thread::spawn({
        let walredo_extraordinary_shutdown_thread_cancel =
            walredo_extraordinary_shutdown_thread_cancel.clone();
        move || {
            let rt = tokio::runtime::Builder::new_current_thread()
                .enable_all()
                .build()
                .unwrap();
            let _entered = rt.enter();
            let _entered = walredo_extraordinary_shutdown_thread_span.enter();
            if let Ok(()) = rt.block_on(tokio::time::timeout(
                Duration::from_secs(8),
                walredo_extraordinary_shutdown_thread_cancel.cancelled(),
            )) {
                info!("cancellation requested");
                return;
            }
            let managers = tenant::WALREDO_MANAGERS
                .lock()
                .unwrap()
                // prevents new walredo managers from being inserted
                .take()
                .expect("only we take()");
            // Use FuturesUnordered to get in queue early for each manager's
            // heavier_once_cell semaphore wait list.
            // Also, for idle tenants that for some reason haven't
            // shut down yet, it's quite likely that we're not going
            // to get Poll::Pending even once.
            let mut futs: FuturesUnordered<_> = managers
                .into_iter()
                .filter_map(|(_, mgr)| mgr.upgrade())
                .map(|mgr| async move { tokio::task::unconstrained(mgr.shutdown()).await })
                .collect();
            info!(count=%futs.len(), "built FuturesUnordered");
            let mut last_log_at = std::time::Instant::now();
            #[derive(Debug, Default)]
            struct Results {
                initiated: u64,
                already: u64,
            }
            let mut results = Results::default();
            while let Some(we_initiated) = rt.block_on(futs.next()) {
                if we_initiated {
                    results.initiated += 1;
                } else {
                    results.already += 1;
                }
                if last_log_at.elapsed() > Duration::from_millis(100) {
                    info!(remaining=%futs.len(), ?results, "progress");
                    last_log_at = std::time::Instant::now();
                }
            }
            info!(?results, "done");
        }
    });

    // Shut down the libpq endpoint task. This prevents new connections from
    // being accepted.
    let remaining_connections = timed(
        page_service.stop_accepting(),
        "shutdown LibpqEndpointListener",
        Duration::from_secs(1),
    )
    .await;

    // Shut down the gRPC server task, including request handlers.
    if let Some(grpc_task) = grpc_task {
        timed(
            grpc_task.shutdown(),
            "shutdown gRPC PageRequestHandler",
            Duration::from_secs(3),
        )
        .await;
    }

    // Shut down all the tenants. This flushes everything to disk and kills
    // the checkpoint and GC tasks.
    timed(
        tenant_manager.shutdown(),
        "shutdown all tenants",
        Duration::from_secs(5),
    )
    .await;

    // Shut down any page service tasks: any in-progress work for particular timelines or tenants
    // should already have been cancelled via mgr::shutdown_all_tenants
    timed(
        remaining_connections.shutdown(),
        "shutdown PageRequestHandlers",
        Duration::from_secs(1),
    )
    .await;

    // Best effort to persist any outstanding deletions, to avoid leaking objects
    deletion_queue.shutdown(Duration::from_secs(5)).await;

    timed(
        consumption_metrics_worker.0.shutdown(),
        "shutdown consumption metrics",
        Duration::from_secs(1),
    )
    .await;

    timed(
        futures::future::OptionFuture::from(disk_usage_eviction_task.map(|t| t.0.shutdown())),
        "shutdown disk usage eviction",
        Duration::from_secs(1),
    )
    .await;

    timed(
        background_purges.shutdown(),
        "shutdown background purges",
        Duration::from_secs(1),
    )
    .await;

    if let Some(https_listener) = https_listener {
        timed(
            https_listener.0.shutdown(),
            "shutdown https",
            Duration::from_secs(1),
        )
        .await;
    }

    // Shut down the HTTP endpoint last, so that you can still check the server's
    // status while it's shutting down.
    // FIXME: We should probably stop accepting commands like attach/detach earlier.
    timed(
        http_listener.0.shutdown(),
        "shutdown http",
        Duration::from_secs(1),
    )
    .await;

    timed(
        secondary_controller_tasks.wait(), // cancellation happened in caller
        "secondary controller wait",
        Duration::from_secs(1),
    )
    .await;

    // There should be nothing left, but let's be sure
    timed(
        task_mgr::shutdown_tasks(None, None, None),
        "shutdown leftovers",
        Duration::from_secs(1),
    )
    .await;

    info!("cancel & join walredo_extraordinary_shutdown_thread");
    walredo_extraordinary_shutdown_thread_cancel.cancel();
    walredo_extraordinary_shutdown_thread.join().unwrap();
    info!("walredo_extraordinary_shutdown_thread done");

    info!(
        elapsed_ms = started_at.elapsed().as_millis(),
        "Shutdown successfully completed"
    );
    std::process::exit(exit_code);
}

/// Per-tenant configuration file.
/// Full path: `tenants/<tenant_id>/config-v1`.
pub(crate) const TENANT_LOCATION_CONFIG_NAME: &str = "config-v1";

/// Per-tenant copy of their remote heatmap, downloaded into the local
/// tenant path while in secondary mode.
pub(crate) const TENANT_HEATMAP_BASENAME: &str = "heatmap-v1.json";
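
// Following the path convention documented above, a tenant's heatmap would
// live at `tenants/<tenant_id>/heatmap-v1.json`. A sketch of that construction
// (the helper below is hypothetical, for illustration only):
//
//     fn heatmap_path(tenants_dir: &Utf8Path, tenant_id: &str) -> camino::Utf8PathBuf {
//         tenants_dir.join(tenant_id).join(TENANT_HEATMAP_BASENAME)
//     }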

/// A suffix used for various temporary files. Any temporary files found in the
/// data directory at pageserver startup can be automatically removed.
pub(crate) const TEMP_FILE_SUFFIX: &str = "___temp";

pub fn is_temporary(path: &Utf8Path) -> bool {
    match path.file_name() {
        Some(name) => name.ends_with(TEMP_FILE_SUFFIX),
        None => false,
    }
}
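
// For example (illustrative):
//
//     assert!(is_temporary(Utf8Path::new("tenants/t/layer___temp")));
//     assert!(!is_temporary(Utf8Path::new("tenants/t/layer")));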

/// During pageserver startup, we need to order operations not to exhaust tokio worker threads by
/// blocking.
///
/// Instances of this value exist only during startup; afterwards `None` is provided, meaning no
/// delaying is needed.
#[derive(Clone)]
pub struct InitializationOrder {
    /// Each initial tenant load task carries this until it is done loading timelines from remote storage
    pub initial_tenant_load_remote: Option<utils::completion::Completion>,

    /// Each initial tenant load task carries this until completion.
    pub initial_tenant_load: Option<utils::completion::Completion>,

    /// Barrier for when we can start any background jobs.
    ///
    /// This can be broken up later on, but right now there is just one class of a background job.
    pub background_jobs_can_start: utils::completion::Barrier,
}
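
// The completion/barrier idea above can be pictured with plain tokio channels
// (an analogy only; `utils::completion` has its own types):
//
//     let (tx, mut rx) = tokio::sync::mpsc::channel::<()>(1);
//     // Each startup task holds a clone of `tx` (a "completion") and drops it
//     // when done; `rx.recv().await` returning `None` acts as the "barrier",
//     // releasing only once every clone is gone.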

/// Time the future with a warning when it exceeds a threshold.
async fn timed<Fut: std::future::Future>(
    fut: Fut,
    name: &str,
    warn_at: std::time::Duration,
) -> <Fut as std::future::Future>::Output {
    let started = std::time::Instant::now();

    let mut fut = std::pin::pin!(fut);

    match tokio::time::timeout(warn_at, &mut fut).await {
        Ok(ret) => {
            tracing::info!(
                stage = name,
                elapsed_ms = started.elapsed().as_millis(),
                "completed"
            );
            ret
        }
        Err(_) => {
            tracing::info!(
                stage = name,
                elapsed_ms = started.elapsed().as_millis(),
                "still waiting, taking longer than expected..."
            );

            let ret = fut.await;

            // this has a global allowed_errors
            tracing::warn!(
                stage = name,
                elapsed_ms = started.elapsed().as_millis(),
                "completed, took longer than expected"
            );

            ret
        }
    }
}

/// Like [`timed`], but the warning timeout only starts after `cancel` has been cancelled.
async fn timed_after_cancellation<Fut: std::future::Future>(
    fut: Fut,
    name: &str,
    warn_at: std::time::Duration,
    cancel: &CancellationToken,
) -> <Fut as std::future::Future>::Output {
    let mut fut = std::pin::pin!(fut);

    tokio::select! {
        _ = cancel.cancelled() => {
            timed(fut, name, warn_at).await
        }
        ret = &mut fut => {
            ret
        }
    }
}

#[cfg(test)]
mod timed_tests {
    use std::time::Duration;

    use super::timed;

    #[tokio::test]
    async fn timed_completes_when_inner_future_completes() {
        // A future that completes on time should have its result returned
        let r1 = timed(
            async move {
                tokio::time::sleep(Duration::from_millis(10)).await;
                123
            },
            "test 1",
            Duration::from_millis(50),
        )
        .await;
        assert_eq!(r1, 123);

        // A future that completes too slowly should also have its result returned
        let r2 = timed(
            async move {
                tokio::time::sleep(Duration::from_millis(50)).await;
                456
            },
            "test 2",
            Duration::from_millis(10),
        )
        .await;
        assert_eq!(r2, 456);
    }
}