Line data Source code
1 : #![recursion_limit = "300"]
2 : #![deny(clippy::undocumented_unsafe_blocks)]
3 :
4 : mod auth;
5 : pub mod basebackup;
6 : pub mod basebackup_cache;
7 : pub mod config;
8 : pub mod consumption_metrics;
9 : pub mod context;
10 : pub mod controller_upcall_client;
11 : pub mod deletion_queue;
12 : pub mod disk_usage_eviction_task;
13 : pub mod feature_resolver;
14 : pub mod http;
15 : pub mod import_datadir;
16 : pub mod l0_flush;
17 :
18 : extern crate hyper0 as hyper;
19 :
20 : use futures::StreamExt;
21 : use futures::stream::FuturesUnordered;
22 : pub use pageserver_api::keyspace;
23 : use tokio_util::sync::CancellationToken;
24 : mod assert_u64_eq_usize;
25 : pub mod aux_file;
26 : pub mod metrics;
27 : pub mod page_cache;
28 : pub mod page_service;
29 : pub mod pgdatadir_mapping;
30 : pub mod span;
31 : pub(crate) mod statvfs;
32 : pub mod task_mgr;
33 : pub mod tenant;
34 : pub mod utilization;
35 : pub mod virtual_file;
36 : pub mod walingest;
37 : pub mod walredo;
38 :
39 : use camino::Utf8Path;
40 : use deletion_queue::DeletionQueue;
41 : use postgres_ffi::PgMajorVersion;
42 : use tenant::mgr::{BackgroundPurges, TenantManager};
43 : use tenant::secondary;
44 : use tracing::{info, info_span};
45 :
46 : /// Current storage format version
47 : ///
48 : /// This is embedded in the header of all the layer files.
49 : /// If you make any backwards-incompatible changes to the storage
50 : /// format, bump this!
51 : /// Note that TimelineMetadata uses its own version number to track
52 : /// backwards-compatible changes to the metadata format.
53 : pub const STORAGE_FORMAT_VERSION: u16 = 3;
54 :
55 : pub const DEFAULT_PG_VERSION: PgMajorVersion = PgMajorVersion::PG17;
56 :
57 : // Magic constants used to identify different kinds of files
58 : pub const IMAGE_FILE_MAGIC: u16 = 0x5A60;
59 : pub const DELTA_FILE_MAGIC: u16 = 0x5A61;
60 :
61 : // Target used for performance traces.
62 : pub const PERF_TRACE_TARGET: &str = "P";
63 :
64 : static ZERO_PAGE: bytes::Bytes = bytes::Bytes::from_static(&[0u8; 8192]);
65 :
66 : pub use crate::metrics::preinitialize_metrics;
67 :
68 : pub struct CancellableTask {
69 : pub task: tokio::task::JoinHandle<()>,
70 : pub cancel: CancellationToken,
71 : }
72 : pub struct HttpEndpointListener(pub CancellableTask);
73 : pub struct HttpsEndpointListener(pub CancellableTask);
74 : pub struct ConsumptionMetricsTasks(pub CancellableTask);
75 : pub struct DiskUsageEvictionTask(pub CancellableTask);
76 : // HADRON
77 : pub struct MetricsCollectionTask(pub CancellableTask);
78 :
79 : impl CancellableTask {
80 0 : pub async fn shutdown(self) {
81 0 : self.cancel.cancel();
82 0 : self.task.await.unwrap();
83 0 : }
84 : }
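// Editor's illustrative sketch (hypothetical helper, not part of the original source):
// a CancellableTask pairs a spawned task with the token that the task watches, so that
// shutdown() can cancel the token and then await the join handle.
#[allow(dead_code)]
fn example_cancellable_task() -> CancellableTask {
    let cancel = CancellationToken::new();
    let task = tokio::spawn({
        let cancel = cancel.clone();
        async move {
            // Run until shutdown_pageserver() cancels the token.
            cancel.cancelled().await;
        }
    });
    CancellableTask { task, cancel }
}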
85 :
86 : #[tracing::instrument(skip_all, fields(%exit_code))]
87 : #[allow(clippy::too_many_arguments)]
88 : pub async fn shutdown_pageserver(
89 : http_listener: HttpEndpointListener,
90 : https_listener: Option<HttpsEndpointListener>,
91 : page_service: page_service::Listener,
92 : grpc_task: Option<CancellableTask>,
93 : metrics_collection_task: MetricsCollectionTask,
94 : consumption_metrics_worker: ConsumptionMetricsTasks,
95 : disk_usage_eviction_task: Option<DiskUsageEvictionTask>,
96 : tenant_manager: &TenantManager,
97 : background_purges: BackgroundPurges,
98 : mut deletion_queue: DeletionQueue,
99 : secondary_controller_tasks: secondary::GlobalTasks,
100 : exit_code: i32,
101 : ) {
102 : use std::time::Duration;
103 :
104 : let started_at = std::time::Instant::now();
105 :
106 : // If the orderly shutdown below takes too long, we still want to make
107 : // sure that all walredo processes are killed and wait()ed on by us, not systemd.
108 : //
109 : // (Leftover walredo processes are the hypothesized trigger for the systemd freezes
110 : // that we keep seeing in prod => https://github.com/neondatabase/cloud/issues/11387.)
111 : //
112 : // We use a thread instead of a tokio task because the background runtime is likely busy
113 : // with the final flushing / uploads. This activity here has priority, and since the tokio
114 : // scheduler lacks a scheduling-priority feature, using a separate thread is
115 : // an effective priority booster.
116 : let walredo_extraordinary_shutdown_thread_span = {
117 : let span = info_span!(parent: None, "walredo_extraordinary_shutdown_thread");
118 : span.follows_from(tracing::Span::current());
119 : span
120 : };
121 : let walredo_extraordinary_shutdown_thread_cancel = CancellationToken::new();
122 : let walredo_extraordinary_shutdown_thread = std::thread::spawn({
123 : let walredo_extraordinary_shutdown_thread_cancel =
124 : walredo_extraordinary_shutdown_thread_cancel.clone();
125 0 : move || {
126 0 : let rt = tokio::runtime::Builder::new_current_thread()
127 0 : .enable_all()
128 0 : .build()
129 0 : .unwrap();
130 0 : let _entered = rt.enter();
131 0 : let _entered = walredo_extraordinary_shutdown_thread_span.enter();
132 0 : if let Ok(()) = rt.block_on(tokio::time::timeout(
133 0 : Duration::from_secs(8),
134 0 : walredo_extraordinary_shutdown_thread_cancel.cancelled(),
135 0 : )) {
136 0 : info!("cancellation requested");
137 0 : return;
138 0 : }
139 0 : let managers = tenant::WALREDO_MANAGERS
140 0 : .lock()
141 0 : .unwrap()
142 0 : // prevents new walredo managers from being inserted
143 0 : .take()
144 0 : .expect("only we take()");
145 : // Use FuturesUnordered to get in queue early for each manager's
146 : // heavier_once_cell semaphore wait list.
147 : // Also, for idle tenants that for some reason haven't
148 : // shut down yet, it's quite likely that we're not going
149 : // to get Poll::Pending even once.
150 0 : let mut futs: FuturesUnordered<_> = managers
151 0 : .into_iter()
152 0 : .filter_map(|(_, mgr)| mgr.upgrade())
153 0 : .map(|mgr| async move { tokio::task::unconstrained(mgr.shutdown()).await })
154 0 : .collect();
155 0 : info!(count=%futs.len(), "built FuturesUnordered");
156 0 : let mut last_log_at = std::time::Instant::now();
157 : #[derive(Debug, Default)]
158 : struct Results {
159 : initiated: u64,
160 : already: u64,
161 : }
162 0 : let mut results = Results::default();
163 0 : while let Some(we_initiated) = rt.block_on(futs.next()) {
164 0 : if we_initiated {
165 0 : results.initiated += 1;
166 0 : } else {
167 0 : results.already += 1;
168 0 : }
169 0 : if last_log_at.elapsed() > Duration::from_millis(100) {
170 0 : info!(remaining=%futs.len(), ?results, "progress");
171 0 : last_log_at = std::time::Instant::now();
172 0 : }
173 : }
174 0 : info!(?results, "done");
175 0 : }
176 : });
177 :
178 : // Shut down the libpq endpoint task. This prevents new connections from
179 : // being accepted.
180 : let remaining_connections = timed(
181 : page_service.stop_accepting(),
182 : "shutdown LibpqEndpointListener",
183 : Duration::from_secs(1),
184 : )
185 : .await;
186 :
187 : // Shut down the gRPC server task, including request handlers.
188 : if let Some(grpc_task) = grpc_task {
189 : timed(
190 : grpc_task.shutdown(),
191 : "shutdown gRPC PageRequestHandler",
192 : Duration::from_secs(3),
193 : )
194 : .await;
195 : }
196 :
197 : // Shut down all the tenants. This flushes everything to disk and kills
198 : // the checkpoint and GC tasks.
199 : timed(
200 : tenant_manager.shutdown(),
201 : "shutdown all tenants",
202 : Duration::from_secs(5),
203 : )
204 : .await;
205 :
206 : // Shut down any page service tasks: any in-progress work for particular timelines or tenants
207 : // should already have been cancelled via mgr::shutdown_all_tenants.
208 : timed(
209 : remaining_connections.shutdown(),
210 : "shutdown PageRequestHandlers",
211 : Duration::from_secs(1),
212 : )
213 : .await;
214 :
215 : // Best effort to persist any outstanding deletions, to avoid leaking objects
216 : deletion_queue.shutdown(Duration::from_secs(5)).await;
217 :
218 : // HADRON
219 : timed(
220 : metrics_collection_task.0.shutdown(),
221 : "shutdown metrics collections metrics",
222 : Duration::from_secs(1),
223 : )
224 : .await;
225 :
226 : timed(
227 : consumption_metrics_worker.0.shutdown(),
228 : "shutdown consumption metrics",
229 : Duration::from_secs(1),
230 : )
231 : .await;
232 :
233 : timed(
234 0 : futures::future::OptionFuture::from(disk_usage_eviction_task.map(|t| t.0.shutdown())),
235 : "shutdown disk usage eviction",
236 : Duration::from_secs(1),
237 : )
238 : .await;
239 :
240 : timed(
241 : background_purges.shutdown(),
242 : "shutdown background purges",
243 : Duration::from_secs(1),
244 : )
245 : .await;
246 :
247 : if let Some(https_listener) = https_listener {
248 : timed(
249 : https_listener.0.shutdown(),
250 : "shutdown https",
251 : Duration::from_secs(1),
252 : )
253 : .await;
254 : }
255 :
256 : // Shut down the HTTP endpoint last, so that you can still check the server's
257 : // status while it's shutting down.
258 : // FIXME: We should probably stop accepting commands like attach/detach earlier.
259 : timed(
260 : http_listener.0.shutdown(),
261 : "shutdown http",
262 : Duration::from_secs(1),
263 : )
264 : .await;
265 :
266 : timed(
267 : secondary_controller_tasks.wait(), // cancellation happened in caller
268 : "secondary controller wait",
269 : Duration::from_secs(1),
270 : )
271 : .await;
272 :
273 : // There should be nothing left, but let's be sure
274 : timed(
275 : task_mgr::shutdown_tasks(None, None, None),
276 : "shutdown leftovers",
277 : Duration::from_secs(1),
278 : )
279 : .await;
280 :
281 : info!("cancel & join walredo_extraordinary_shutdown_thread");
282 : walredo_extraordinary_shutdown_thread_cancel.cancel();
283 : walredo_extraordinary_shutdown_thread.join().unwrap();
284 : info!("walredo_extraordinary_shutdown_thread done");
285 :
286 : info!(
287 : elapsed_ms = started_at.elapsed().as_millis(),
288 : "Shut down successfully completed"
289 : );
290 : std::process::exit(exit_code);
291 : }
292 :
293 : /// Per-tenant configuration file.
294 : /// Full path: `tenants/<tenant_id>/config-v1`.
295 : pub(crate) const TENANT_LOCATION_CONFIG_NAME: &str = "config-v1";
296 :
297 : /// Per-tenant copy of their remote heatmap, downloaded into the local
298 : /// tenant path while in secondary mode.
299 : pub(crate) const TENANT_HEATMAP_BASENAME: &str = "heatmap-v1.json";
300 :
301 : /// A suffix used for various temporary files. Any temporary files found in the
302 : /// data directory at pageserver startup can be automatically removed.
303 : pub(crate) const TEMP_FILE_SUFFIX: &str = "___temp";
304 :
305 4 : pub fn is_temporary(path: &Utf8Path) -> bool {
306 4 : match path.file_name() {
307 4 : Some(name) => name.ends_with(TEMP_FILE_SUFFIX),
308 0 : None => false,
309 : }
310 4 : }
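// Illustrative usage sketch (editor addition, hypothetical test not present in the
// original source): is_temporary() only inspects the final path component's suffix.
#[test]
fn is_temporary_checks_the_file_name_suffix() {
    assert!(is_temporary(Utf8Path::new("tenants/t1/config-v1___temp")));
    assert!(!is_temporary(Utf8Path::new("tenants/t1/config-v1")));
}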
311 :
312 : /// During pageserver startup, we need to order operations so that we don't exhaust tokio worker
313 : /// threads by blocking.
314 : ///
315 : /// Instances of this value exist only during startup; otherwise `None` is provided, meaning no
316 : /// delaying is needed.
317 : #[derive(Clone)]
318 : pub struct InitializationOrder {
319 : /// Each initial tenant load task carries this until it is done loading timelines from remote storage
320 : pub initial_tenant_load_remote: Option<utils::completion::Completion>,
321 :
322 : /// Each initial tenant load task carries this until completion.
323 : pub initial_tenant_load: Option<utils::completion::Completion>,
324 :
325 : /// Barrier for when we can start any background jobs.
326 : ///
327 : /// This can be broken up later on, but right now there is just one class of background job.
328 : pub background_jobs_can_start: utils::completion::Barrier,
329 : }
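// Illustrative sketch (editor addition, not part of the original source). The intended
// flow is: each startup task holds a Completion while it works and drops it when done,
// and background jobs wait() on the corresponding Barrier before starting. The snippet
// below assumes utils::completion::channel() returns a (Completion, Barrier) pair, as
// used elsewhere in this crate; treat it as a sketch of the pattern, not a verbatim API.
//
//     let (load_done, background_jobs_can_start) = utils::completion::channel();
//     let order = InitializationOrder {
//         initial_tenant_load_remote: Some(load_done.clone()),
//         initial_tenant_load: Some(load_done),
//         background_jobs_can_start: background_jobs_can_start.clone(),
//     };
//     // A tenant load task drops its Completions when it finishes loading ...
//     drop(order.initial_tenant_load);
//     // ... and a background job only starts once every Completion has been dropped.
//     background_jobs_can_start.wait().await;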
330 :
331 : /// Time the future with a warning when it exceeds a threshold.
332 32 : async fn timed<Fut: std::future::Future>(
333 32 : fut: Fut,
334 32 : name: &str,
335 32 : warn_at: std::time::Duration,
336 32 : ) -> <Fut as std::future::Future>::Output {
337 32 : let started = std::time::Instant::now();
338 :
339 32 : let mut fut = std::pin::pin!(fut);
340 :
341 32 : match tokio::time::timeout(warn_at, &mut fut).await {
342 31 : Ok(ret) => {
343 31 : tracing::info!(
344 : stage = name,
345 0 : elapsed_ms = started.elapsed().as_millis(),
346 0 : "completed"
347 : );
348 31 : ret
349 : }
350 : Err(_) => {
351 1 : tracing::info!(
352 : stage = name,
353 0 : elapsed_ms = started.elapsed().as_millis(),
354 0 : "still waiting, taking longer than expected..."
355 : );
356 :
357 1 : let ret = fut.await;
358 :
359 : // this has a global allowed_errors
360 1 : tracing::warn!(
361 : stage = name,
362 0 : elapsed_ms = started.elapsed().as_millis(),
363 0 : "completed, took longer than expected"
364 : );
365 :
366 1 : ret
367 : }
368 : }
369 32 : }
370 :
371 : /// Like [`timed`], but the warning timeout only starts after `cancel` has been cancelled.
372 0 : async fn timed_after_cancellation<Fut: std::future::Future>(
373 0 : fut: Fut,
374 0 : name: &str,
375 0 : warn_at: std::time::Duration,
376 0 : cancel: &CancellationToken,
377 0 : ) -> <Fut as std::future::Future>::Output {
378 0 : let mut fut = std::pin::pin!(fut);
379 :
380 0 : tokio::select! {
381 0 : _ = cancel.cancelled() => {
382 0 : timed(fut, name, warn_at).await
383 : }
384 0 : ret = &mut fut => {
385 0 : ret
386 : }
387 : }
388 0 : }
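// Illustrative sketch (editor addition, hypothetical test not present in the original
// source): if the inner future finishes before `cancel` fires, its result is returned
// directly and the warning timer never starts.
#[tokio::test]
async fn timed_after_cancellation_returns_result_before_cancellation() {
    let cancel = CancellationToken::new();
    let out = timed_after_cancellation(
        async { 42 },
        "demo",
        std::time::Duration::from_millis(10),
        &cancel,
    )
    .await;
    assert_eq!(out, 42);
}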
389 :
390 : #[cfg(test)]
391 : mod timed_tests {
392 : use std::time::Duration;
393 :
394 : use super::timed;
395 :
396 : #[tokio::test]
397 1 : async fn timed_completes_when_inner_future_completes() {
398 : // A future that completes on time should have its result returned
399 1 : let r1 = timed(
400 1 : async move {
401 1 : tokio::time::sleep(Duration::from_millis(10)).await;
402 1 : 123
403 1 : },
404 1 : "test 1",
405 1 : Duration::from_millis(50),
406 : )
407 1 : .await;
408 1 : assert_eq!(r1, 123);
409 :
410 : // A future that completes too slowly should also have its result returned
411 1 : let r1 = timed(
412 1 : async move {
413 1 : tokio::time::sleep(Duration::from_millis(50)).await;
414 1 : 456
415 1 : },
416 1 : "test 1",
417 1 : Duration::from_millis(10),
418 : )
419 1 : .await;
420 1 : assert_eq!(r1, 456);
421 1 : }
422 : }