Line data Source code
1 : #![recursion_limit = "300"]
2 : #![deny(clippy::undocumented_unsafe_blocks)]
3 :
4 : mod auth;
5 : pub mod basebackup;
6 : pub mod basebackup_cache;
7 : pub mod config;
8 : pub mod consumption_metrics;
9 : pub mod context;
10 : pub mod controller_upcall_client;
11 : pub mod deletion_queue;
12 : pub mod disk_usage_eviction_task;
13 : pub mod feature_resolver;
14 : pub mod http;
15 : pub mod import_datadir;
16 : pub mod l0_flush;
17 :
18 : extern crate hyper0 as hyper;
19 :
20 : use futures::StreamExt;
21 : use futures::stream::FuturesUnordered;
22 : pub use pageserver_api::keyspace;
23 : use tokio_util::sync::CancellationToken;
24 : mod assert_u64_eq_usize;
25 : pub mod aux_file;
26 : pub mod metrics;
27 : pub mod page_cache;
28 : pub mod page_service;
29 : pub mod pgdatadir_mapping;
30 : pub mod span;
31 : pub(crate) mod statvfs;
32 : pub mod task_mgr;
33 : pub mod tenant;
34 : pub mod utilization;
35 : pub mod virtual_file;
36 : pub mod walingest;
37 : pub mod walredo;
38 :
39 : use camino::Utf8Path;
40 : use deletion_queue::DeletionQueue;
41 : use postgres_ffi::PgMajorVersion;
42 : use tenant::mgr::{BackgroundPurges, TenantManager};
43 : use tenant::secondary;
44 : use tracing::{info, info_span};
45 :
46 : /// Current storage format version
47 : ///
48 : /// This is embedded in the header of all the layer files.
49 : /// If you make any backwards-incompatible changes to the storage
50 : /// format, bump this!
51 : /// Note that TimelineMetadata uses its own version number to track
52 : /// backwards-compatible changes to the metadata format.
53 : pub const STORAGE_FORMAT_VERSION: u16 = 3;
54 :
55 : pub const DEFAULT_PG_VERSION: PgMajorVersion = PgMajorVersion::PG17;
56 :
57 : // Magic constants used to identify different kinds of files
58 : pub const IMAGE_FILE_MAGIC: u16 = 0x5A60;
59 : pub const DELTA_FILE_MAGIC: u16 = 0x5A61;
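
// Illustrative sketch (not part of the original source): a reader opening a layer
// file would be expected to check the magic and the format version roughly like
// this. The exact header layout and error handling are hypothetical; the real
// parsing lives in the layer file code, not here.
#[allow(dead_code)]
fn example_check_layer_header(magic: u16, format_version: u16) -> Result<(), String> {
    if magic != IMAGE_FILE_MAGIC && magic != DELTA_FILE_MAGIC {
        return Err(format!("unrecognized layer file magic {magic:#06x}"));
    }
    if format_version != STORAGE_FORMAT_VERSION {
        return Err(format!(
            "unsupported storage format version {format_version}, expected {STORAGE_FORMAT_VERSION}"
        ));
    }
    Ok(())
}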
60 :
61 : // Target used for performance traces.
62 : pub const PERF_TRACE_TARGET: &str = "P";
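
// Illustrative sketch (not part of the original source): an event can be routed to
// this target so that a filter directive such as `P=trace` enables performance
// traces independently of normal logging. The field and message below are made up.
#[allow(dead_code)]
fn example_perf_trace(elapsed_micros: u64) {
    tracing::trace!(
        target: PERF_TRACE_TARGET,
        elapsed_micros,
        "hypothetical per-request timing sample"
    );
}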
63 :
64 : static ZERO_PAGE: bytes::Bytes = bytes::Bytes::from_static(&[0u8; 8192]);
65 :
66 : pub use crate::metrics::preinitialize_metrics;
67 :
68 : pub struct CancellableTask {
69 : pub task: tokio::task::JoinHandle<()>,
70 : pub cancel: CancellationToken,
71 : }
72 : pub struct HttpEndpointListener(pub CancellableTask);
73 : pub struct HttpsEndpointListener(pub CancellableTask);
74 : pub struct ConsumptionMetricsTasks(pub CancellableTask);
75 : pub struct DiskUsageEvictionTask(pub CancellableTask);
76 : impl CancellableTask {
77 0 : pub async fn shutdown(self) {
78 0 : self.cancel.cancel();
79 0 : self.task.await.unwrap();
80 0 : }
81 : }
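
// Illustrative sketch (not part of the original source): how one of the listener
// wrappers above is typically assembled. The loop body is hypothetical; the real
// tasks live in `http`, `page_service`, `consumption_metrics`, etc.
#[allow(dead_code)]
fn example_spawn_cancellable_task() -> CancellableTask {
    let cancel = CancellationToken::new();
    let task = tokio::spawn({
        let cancel = cancel.clone();
        async move {
            // Run until shutdown_pageserver() calls CancellableTask::shutdown(),
            // which cancels the token and then awaits this JoinHandle.
            cancel.cancelled().await;
        }
    });
    CancellableTask { task, cancel }
}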
82 :
83 : #[tracing::instrument(skip_all, fields(%exit_code))]
84 : #[allow(clippy::too_many_arguments)]
85 : pub async fn shutdown_pageserver(
86 : http_listener: HttpEndpointListener,
87 : https_listener: Option<HttpsEndpointListener>,
88 : page_service: page_service::Listener,
89 : grpc_task: Option<CancellableTask>,
90 : consumption_metrics_worker: ConsumptionMetricsTasks,
91 : disk_usage_eviction_task: Option<DiskUsageEvictionTask>,
92 : tenant_manager: &TenantManager,
93 : background_purges: BackgroundPurges,
94 : mut deletion_queue: DeletionQueue,
95 : secondary_controller_tasks: secondary::GlobalTasks,
96 : exit_code: i32,
97 : ) {
98 : use std::time::Duration;
99 :
100 : let started_at = std::time::Instant::now();
101 :
102 : // If the orderly shutdown below takes too long, we still want to make
103 : // sure that all walredo processes are killed and wait()ed on by us, not systemd.
104 : //
105 : // (Leftover walredo processes are the hypothesized trigger for the systemd freezes
106 : // (that we keep seeing in prod => https://github.com/neondatabase/cloud/issues/11387).
107 : //
108 : // We use a thread instead of a tokio task because the background runtime is likely busy
109 : // with the final flushing / uploads. This activity has priority, and due to the lack
110 : // of scheduling-priority features in the tokio scheduler, using a separate thread is
111 : // an effective priority booster.
112 : let walredo_extraordinary_shutdown_thread_span = {
113 : let span = info_span!(parent: None, "walredo_extraordinary_shutdown_thread");
114 : span.follows_from(tracing::Span::current());
115 : span
116 : };
117 : let walredo_extraordinary_shutdown_thread_cancel = CancellationToken::new();
118 : let walredo_extraordinary_shutdown_thread = std::thread::spawn({
119 : let walredo_extraordinary_shutdown_thread_cancel =
120 : walredo_extraordinary_shutdown_thread_cancel.clone();
121 0 : move || {
122 0 : let rt = tokio::runtime::Builder::new_current_thread()
123 0 : .enable_all()
124 0 : .build()
125 0 : .unwrap();
126 0 : let _entered = rt.enter();
127 0 : let _entered = walredo_extraordinary_shutdown_thread_span.enter();
128 0 : if let Ok(()) = rt.block_on(tokio::time::timeout(
129 0 : Duration::from_secs(8),
130 0 : walredo_extraordinary_shutdown_thread_cancel.cancelled(),
131 0 : )) {
132 0 : info!("cancellation requested");
133 0 : return;
134 0 : }
135 0 : let managers = tenant::WALREDO_MANAGERS
136 0 : .lock()
137 0 : .unwrap()
138 0 : // prevents new walredo managers from being inserted
139 0 : .take()
140 0 : .expect("only we take()");
141 : // Use FuturesUnordered to get in queue early for each manager's
142 : // heavier_once_cell semaphore wait list.
143 : // Also, for idle tenants that for some reason haven't
144 : // shut down yet, it's quite likely that we're not going
145 : // to get Poll::Pending even once.
146 0 : let mut futs: FuturesUnordered<_> = managers
147 0 : .into_iter()
148 0 : .filter_map(|(_, mgr)| mgr.upgrade())
149 0 : .map(|mgr| async move { tokio::task::unconstrained(mgr.shutdown()).await })
150 0 : .collect();
151 0 : info!(count=%futs.len(), "built FuturesUnordered");
152 0 : let mut last_log_at = std::time::Instant::now();
153 : #[derive(Debug, Default)]
154 : struct Results {
155 : initiated: u64,
156 : already: u64,
157 : }
158 0 : let mut results = Results::default();
159 0 : while let Some(we_initiated) = rt.block_on(futs.next()) {
160 0 : if we_initiated {
161 0 : results.initiated += 1;
162 0 : } else {
163 0 : results.already += 1;
164 0 : }
165 0 : if last_log_at.elapsed() > Duration::from_millis(100) {
166 0 : info!(remaining=%futs.len(), ?results, "progress");
167 0 : last_log_at = std::time::Instant::now();
168 0 : }
169 : }
170 0 : info!(?results, "done");
171 0 : }
172 : });
173 :
174 : // Shut down the libpq endpoint task. This prevents new connections from
175 : // being accepted.
176 : let remaining_connections = timed(
177 : page_service.stop_accepting(),
178 : "shutdown LibpqEndpointListener",
179 : Duration::from_secs(1),
180 : )
181 : .await;
182 :
183 : // Shut down the gRPC server task, including request handlers.
184 : if let Some(grpc_task) = grpc_task {
185 : timed(
186 : grpc_task.shutdown(),
187 : "shutdown gRPC PageRequestHandler",
188 : Duration::from_secs(3),
189 : )
190 : .await;
191 : }
192 :
193 : // Shut down all the tenants. This flushes everything to disk and kills
194 : // the checkpoint and GC tasks.
195 : timed(
196 : tenant_manager.shutdown(),
197 : "shutdown all tenants",
198 : Duration::from_secs(5),
199 : )
200 : .await;
201 :
202 : // Shut down any page service tasks: any in-progress work for particular timelines or tenants
203 : // should already have been cancelled via mgr::shutdown_all_tenants.
204 : timed(
205 : remaining_connections.shutdown(),
206 : "shutdown PageRequestHandlers",
207 : Duration::from_secs(1),
208 : )
209 : .await;
210 :
211 : // Best effort to persist any outstanding deletions, to avoid leaking objects
212 : deletion_queue.shutdown(Duration::from_secs(5)).await;
213 :
214 : timed(
215 : consumption_metrics_worker.0.shutdown(),
216 : "shutdown consumption metrics",
217 : Duration::from_secs(1),
218 : )
219 : .await;
220 :
221 : timed(
222 0 : futures::future::OptionFuture::from(disk_usage_eviction_task.map(|t| t.0.shutdown())),
223 : "shutdown disk usage eviction",
224 : Duration::from_secs(1),
225 : )
226 : .await;
227 :
228 : timed(
229 : background_purges.shutdown(),
230 : "shutdown background purges",
231 : Duration::from_secs(1),
232 : )
233 : .await;
234 :
235 : if let Some(https_listener) = https_listener {
236 : timed(
237 : https_listener.0.shutdown(),
238 : "shutdown https",
239 : Duration::from_secs(1),
240 : )
241 : .await;
242 : }
243 :
244 : // Shut down the HTTP endpoint last, so that you can still check the server's
245 : // status while it's shutting down.
246 : // FIXME: We should probably stop accepting commands like attach/detach earlier.
247 : timed(
248 : http_listener.0.shutdown(),
249 : "shutdown http",
250 : Duration::from_secs(1),
251 : )
252 : .await;
253 :
254 : timed(
255 : secondary_controller_tasks.wait(), // cancellation happened in caller
256 : "secondary controller wait",
257 : Duration::from_secs(1),
258 : )
259 : .await;
260 :
261 : // There should be nothing left, but let's be sure
262 : timed(
263 : task_mgr::shutdown_tasks(None, None, None),
264 : "shutdown leftovers",
265 : Duration::from_secs(1),
266 : )
267 : .await;
268 :
269 : info!("cancel & join walredo_extraordinary_shutdown_thread");
270 : walredo_extraordinary_shutdown_thread_cancel.cancel();
271 : walredo_extraordinary_shutdown_thread.join().unwrap();
272 : info!("walredo_extraordinary_shutdown_thread done");
273 :
274 : info!(
275 : elapsed_ms = started_at.elapsed().as_millis(),
276 : "Shutdown completed successfully"
277 : );
278 : std::process::exit(exit_code);
279 : }
280 :
281 : /// Per-tenant configuration file.
282 : /// Full path: `tenants/<tenant_id>/config-v1`.
283 : pub(crate) const TENANT_LOCATION_CONFIG_NAME: &str = "config-v1";
284 :
285 : /// Per-tenant copy of their remote heatmap, downloaded into the local
286 : /// tenant path while in secondary mode.
287 : pub(crate) const TENANT_HEATMAP_BASENAME: &str = "heatmap-v1.json";
288 :
289 : /// A suffix used for various temporary files. Any temporary files found in the
290 : /// data directory at pageserver startup can be automatically removed.
291 : pub(crate) const TEMP_FILE_SUFFIX: &str = "___temp";
292 :
293 4 : pub fn is_temporary(path: &Utf8Path) -> bool {
294 4 : match path.file_name() {
295 4 : Some(name) => name.ends_with(TEMP_FILE_SUFFIX),
296 0 : None => false,
297 : }
298 4 : }
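
// Illustrative example (added here, not in the original file): the naming convention
// that `is_temporary` checks for, exercised as a small test.
#[cfg(test)]
mod temp_file_suffix_example {
    use camino::Utf8Path;

    use super::{TEMP_FILE_SUFFIX, is_temporary};

    #[test]
    fn detects_temp_suffix() {
        let temp = format!("tenants/t1/timelines/tl1/layerfile{TEMP_FILE_SUFFIX}");
        assert!(is_temporary(Utf8Path::new(&temp)));
        assert!(!is_temporary(Utf8Path::new(
            "tenants/t1/timelines/tl1/layerfile"
        )));
    }
}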
299 :
300 : /// During pageserver startup, we need to order operations so that we do not exhaust tokio
301 : /// worker threads by blocking.
302 : ///
303 : /// Instances of this value exist only during startup; otherwise `None` is provided, meaning no
304 : /// delaying is needed.
305 : #[derive(Clone)]
306 : pub struct InitializationOrder {
307 : /// Each initial tenant load task carries this until it is done loading timelines from remote storage
308 : pub initial_tenant_load_remote: Option<utils::completion::Completion>,
309 :
310 : /// Each initial tenant load task carries this until completion.
311 : pub initial_tenant_load: Option<utils::completion::Completion>,
312 :
313 : /// Barrier for when we can start any background jobs.
314 : ///
315 : /// This can be broken up later on, but right now there is just one class of a background job.
316 : pub background_jobs_can_start: utils::completion::Barrier,
317 : }
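
// Illustrative sketch (not part of the original source): how a background job might
// consume `InitializationOrder`. The `wait()` call on the barrier is an assumption
// about the `utils::completion` API; check that module for the exact surface.
#[allow(dead_code)]
async fn example_background_job(init_order: Option<InitializationOrder>) {
    if let Some(init_order) = init_order {
        // Drop the load completions this job does not participate in, and wait
        // until startup signals that background jobs may begin.
        let InitializationOrder {
            initial_tenant_load_remote: _,
            initial_tenant_load: _,
            background_jobs_can_start,
        } = init_order;
        background_jobs_can_start.wait().await; // assumed async wait on the barrier
    }
    // ... background work would start here ...
}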
318 :
319 : /// Time the future with a warning when it exceeds a threshold.
320 32 : async fn timed<Fut: std::future::Future>(
321 32 : fut: Fut,
322 32 : name: &str,
323 32 : warn_at: std::time::Duration,
324 32 : ) -> <Fut as std::future::Future>::Output {
325 32 : let started = std::time::Instant::now();
326 :
327 32 : let mut fut = std::pin::pin!(fut);
328 :
329 32 : match tokio::time::timeout(warn_at, &mut fut).await {
330 31 : Ok(ret) => {
331 31 : tracing::info!(
332 : stage = name,
333 0 : elapsed_ms = started.elapsed().as_millis(),
334 0 : "completed"
335 : );
336 31 : ret
337 : }
338 : Err(_) => {
339 1 : tracing::info!(
340 : stage = name,
341 0 : elapsed_ms = started.elapsed().as_millis(),
342 0 : "still waiting, taking longer than expected..."
343 : );
344 :
345 1 : let ret = fut.await;
346 :
347 : // this has a global allowed_errors
348 1 : tracing::warn!(
349 : stage = name,
350 0 : elapsed_ms = started.elapsed().as_millis(),
351 0 : "completed, took longer than expected"
352 : );
353 :
354 1 : ret
355 : }
356 : }
357 32 : }
358 :
359 : /// Like [`timed`], but the warning timeout only starts after `cancel` has been cancelled.
360 0 : async fn timed_after_cancellation<Fut: std::future::Future>(
361 0 : fut: Fut,
362 0 : name: &str,
363 0 : warn_at: std::time::Duration,
364 0 : cancel: &CancellationToken,
365 0 : ) -> <Fut as std::future::Future>::Output {
366 0 : let mut fut = std::pin::pin!(fut);
367 :
368 0 : tokio::select! {
369 0 : _ = cancel.cancelled() => {
370 0 : timed(fut, name, warn_at).await
371 : }
372 0 : ret = &mut fut => {
373 0 : ret
374 : }
375 : }
376 0 : }
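
// Illustrative sketch (not part of the original source): `timed_after_cancellation`
// suits work that is expected to keep running until `cancel` fires; the "taking
// longer than expected" clock only starts after cancellation. The worker handle
// here is hypothetical.
#[allow(dead_code)]
async fn example_wait_for_worker(
    worker: tokio::task::JoinHandle<()>,
    cancel: &CancellationToken,
) {
    let _ = timed_after_cancellation(
        worker,
        "example worker",
        std::time::Duration::from_secs(1),
        cancel,
    )
    .await;
}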
377 :
378 : #[cfg(test)]
379 : mod timed_tests {
380 : use std::time::Duration;
381 :
382 : use super::timed;
383 :
384 : #[tokio::test]
385 1 : async fn timed_completes_when_inner_future_completes() {
386 : // A future that completes on time should have its result returned
387 1 : let r1 = timed(
388 1 : async move {
389 1 : tokio::time::sleep(Duration::from_millis(10)).await;
390 1 : 123
391 1 : },
392 1 : "test 1",
393 1 : Duration::from_millis(50),
394 : )
395 1 : .await;
396 1 : assert_eq!(r1, 123);
397 :
398 : // A future that completes too slowly should also have its result returned
399 1 : let r1 = timed(
400 1 : async move {
401 1 : tokio::time::sleep(Duration::from_millis(50)).await;
402 1 : 456
403 1 : },
404 1 : "test 2",
405 1 : Duration::from_millis(10),
406 : )
407 1 : .await;
408 1 : assert_eq!(r1, 456);
409 1 : }
410 : }