LCOV - code coverage report
Current view: top level - pageserver/src - lib.rs (source / functions) Coverage Total Hit
Test: 1e20c4f2b28aa592527961bb32170ebbd2c9172f.info Lines: 38.1 % 105 40
Test Date: 2025-07-16 12:29:03 Functions: 29.5 % 44 13

            Line data    Source code
       1              : #![recursion_limit = "300"]
       2              : #![deny(clippy::undocumented_unsafe_blocks)]
       3              : 
       4              : mod auth;
       5              : pub mod basebackup;
       6              : pub mod basebackup_cache;
       7              : pub mod config;
       8              : pub mod consumption_metrics;
       9              : pub mod context;
      10              : pub mod controller_upcall_client;
      11              : pub mod deletion_queue;
      12              : pub mod disk_usage_eviction_task;
      13              : pub mod feature_resolver;
      14              : pub mod http;
      15              : pub mod import_datadir;
      16              : pub mod l0_flush;
      17              : 
      18              : extern crate hyper0 as hyper;
      19              : 
      20              : use futures::StreamExt;
      21              : use futures::stream::FuturesUnordered;
      22              : pub use pageserver_api::keyspace;
      23              : use tokio_util::sync::CancellationToken;
      24              : mod assert_u64_eq_usize;
      25              : pub mod aux_file;
      26              : pub mod metrics;
      27              : pub mod page_cache;
      28              : pub mod page_service;
      29              : pub mod pgdatadir_mapping;
      30              : pub mod span;
      31              : pub(crate) mod statvfs;
      32              : pub mod task_mgr;
      33              : pub mod tenant;
      34              : pub mod utilization;
      35              : pub mod virtual_file;
      36              : pub mod walingest;
      37              : pub mod walredo;
      38              : 
      39              : use camino::Utf8Path;
      40              : use deletion_queue::DeletionQueue;
      41              : use postgres_ffi::PgMajorVersion;
      42              : use tenant::mgr::{BackgroundPurges, TenantManager};
      43              : use tenant::secondary;
      44              : use tracing::{info, info_span};
      45              : 
      46              : /// Current storage format version
      47              : ///
      48              : /// This is embedded in the header of all the layer files.
      49              : /// If you make any backwards-incompatible changes to the storage
      50              : /// format, bump this!
      51              : /// Note that TimelineMetadata uses its own version number to track
      52              : /// backwards-compatible changes to the metadata format.
      53              : pub const STORAGE_FORMAT_VERSION: u16 = 3;
      54              : 
      55              : pub const DEFAULT_PG_VERSION: PgMajorVersion = PgMajorVersion::PG17;
      56              : 
      57              : // Magic constants used to identify different kinds of files
      58              : pub const IMAGE_FILE_MAGIC: u16 = 0x5A60;
      59              : pub const DELTA_FILE_MAGIC: u16 = 0x5A61;
      60              : 
      61              : // Target used for performance traces.
      62              : pub const PERF_TRACE_TARGET: &str = "P";
      63              : 
      64              : static ZERO_PAGE: bytes::Bytes = bytes::Bytes::from_static(&[0u8; 8192]);
      65              : 
      66              : pub use crate::metrics::preinitialize_metrics;
      67              : 
      68              : pub struct CancellableTask {
      69              :     pub task: tokio::task::JoinHandle<()>,
      70              :     pub cancel: CancellationToken,
      71              : }
      72              : pub struct HttpEndpointListener(pub CancellableTask);
      73              : pub struct HttpsEndpointListener(pub CancellableTask);
      74              : pub struct ConsumptionMetricsTasks(pub CancellableTask);
      75              : pub struct DiskUsageEvictionTask(pub CancellableTask);
      76              : // HADRON
      77              : pub struct MetricsCollectionTask(pub CancellableTask);
      78              : 
      79              : impl CancellableTask {
      80            0 :     pub async fn shutdown(self) {
      81            0 :         self.cancel.cancel();
      82            0 :         self.task.await.unwrap();
      83            0 :     }
      84              : }
      85              : 
      86              : #[tracing::instrument(skip_all, fields(%exit_code))]
      87              : #[allow(clippy::too_many_arguments)]
      88              : pub async fn shutdown_pageserver(
      89              :     http_listener: HttpEndpointListener,
      90              :     https_listener: Option<HttpsEndpointListener>,
      91              :     page_service: page_service::Listener,
      92              :     grpc_task: Option<CancellableTask>,
      93              :     metrics_collection_task: MetricsCollectionTask,
      94              :     consumption_metrics_worker: ConsumptionMetricsTasks,
      95              :     disk_usage_eviction_task: Option<DiskUsageEvictionTask>,
      96              :     tenant_manager: &TenantManager,
      97              :     background_purges: BackgroundPurges,
      98              :     mut deletion_queue: DeletionQueue,
      99              :     secondary_controller_tasks: secondary::GlobalTasks,
     100              :     exit_code: i32,
     101              : ) {
     102              :     use std::time::Duration;
     103              : 
     104              :     let started_at = std::time::Instant::now();
     105              : 
     106              :     // If the orderly shutdown below takes too long, we still want to make
     107              :     // sure that all walredo processes are killed and wait()ed on by us, not systemd.
     108              :     //
     109              :     // (Leftover walredo processes are the hypothesized trigger for the systemd freezes
     110              :     //  that we keep seeing in prod => https://github.com/neondatabase/cloud/issues/11387.)
     111              :     //
     112              :     // We use a thread instead of a tokio task because the background runtime is likely busy
     113              :     // with the final flushing / uploads. This activity here has priority, and due to lack
     114              :     // of scheduling priority features in the tokio scheduler, using a separate thread is
     115              :     // an effective priority booster.
     116              :     let walredo_extraordinary_shutdown_thread_span = {
     117              :         let span = info_span!(parent: None, "walredo_extraordinary_shutdown_thread");
     118              :         span.follows_from(tracing::Span::current());
     119              :         span
     120              :     };
     121              :     let walredo_extraordinary_shutdown_thread_cancel = CancellationToken::new();
     122              :     let walredo_extraordinary_shutdown_thread = std::thread::spawn({
     123              :         let walredo_extraordinary_shutdown_thread_cancel =
     124              :             walredo_extraordinary_shutdown_thread_cancel.clone();
     125            0 :         move || {
     126            0 :             let rt = tokio::runtime::Builder::new_current_thread()
     127            0 :                 .enable_all()
     128            0 :                 .build()
     129            0 :                 .unwrap();
     130            0 :             let _entered = rt.enter();
     131            0 :             let _entered = walredo_extraordinary_shutdown_thread_span.enter();
     132            0 :             if let Ok(()) = rt.block_on(tokio::time::timeout(
     133            0 :                 Duration::from_secs(8),
     134            0 :                 walredo_extraordinary_shutdown_thread_cancel.cancelled(),
     135            0 :             )) {
     136            0 :                 info!("cancellation requested");
     137            0 :                 return;
     138            0 :             }
     139            0 :             let managers = tenant::WALREDO_MANAGERS
     140            0 :                 .lock()
     141            0 :                 .unwrap()
     142            0 :                 // prevents new walredo managers from being inserted
     143            0 :                 .take()
     144            0 :                 .expect("only we take()");
     145              :             // Use FuturesUnordered to get in queue early for each manager's
     146              :             // heavier_once_cell semaphore wait list.
     147              :             // Also, for idle tenants that for some reason haven't
     148              :             // shut down yet, it's quite likely that we're not going
     149              :             // to get Poll::Pending once.
     150            0 :             let mut futs: FuturesUnordered<_> = managers
     151            0 :                 .into_iter()
     152            0 :                 .filter_map(|(_, mgr)| mgr.upgrade())
     153            0 :                 .map(|mgr| async move { tokio::task::unconstrained(mgr.shutdown()).await })
     154            0 :                 .collect();
     155            0 :             info!(count=%futs.len(), "built FuturesUnordered");
     156            0 :             let mut last_log_at = std::time::Instant::now();
     157              :             #[derive(Debug, Default)]
     158              :             struct Results {
     159              :                 initiated: u64,
     160              :                 already: u64,
     161              :             }
     162            0 :             let mut results = Results::default();
     163            0 :             while let Some(we_initiated) = rt.block_on(futs.next()) {
     164            0 :                 if we_initiated {
     165            0 :                     results.initiated += 1;
     166            0 :                 } else {
     167            0 :                     results.already += 1;
     168            0 :                 }
     169            0 :                 if last_log_at.elapsed() > Duration::from_millis(100) {
     170            0 :                     info!(remaining=%futs.len(), ?results, "progress");
     171            0 :                     last_log_at = std::time::Instant::now();
     172            0 :                 }
     173              :             }
     174            0 :             info!(?results, "done");
     175            0 :         }
     176              :     });
     177              : 
     178              :     // Shut down the libpq endpoint task. This prevents new connections from
     179              :     // being accepted.
     180              :     let remaining_connections = timed(
     181              :         page_service.stop_accepting(),
     182              :         "shutdown LibpqEndpointListener",
     183              :         Duration::from_secs(1),
     184              :     )
     185              :     .await;
     186              : 
     187              :     // Shut down the gRPC server task, including request handlers.
     188              :     if let Some(grpc_task) = grpc_task {
     189              :         timed(
     190              :             grpc_task.shutdown(),
     191              :             "shutdown gRPC PageRequestHandler",
     192              :             Duration::from_secs(3),
     193              :         )
     194              :         .await;
     195              :     }
     196              : 
     197              :     // Shut down all the tenants. This flushes everything to disk and kills
     198              :     // the checkpoint and GC tasks.
     199              :     timed(
     200              :         tenant_manager.shutdown(),
     201              :         "shutdown all tenants",
     202              :         Duration::from_secs(5),
     203              :     )
     204              :     .await;
     205              : 
     206              :     // Shut down any page service tasks: any in-progress work for particular timelines or tenants
     207              :     // should already have been cancelled via mgr::shutdown_all_tenants
     208              :     timed(
     209              :         remaining_connections.shutdown(),
     210              :         "shutdown PageRequestHandlers",
     211              :         Duration::from_secs(1),
     212              :     )
     213              :     .await;
     214              : 
     215              :     // Best effort to persist any outstanding deletions, to avoid leaking objects
     216              :     deletion_queue.shutdown(Duration::from_secs(5)).await;
     217              : 
     218              :     // HADRON
     219              :     timed(
     220              :         metrics_collection_task.0.shutdown(),
     221              :         "shutdown metrics collections metrics",
     222              :         Duration::from_secs(1),
     223              :     )
     224              :     .await;
     225              : 
     226              :     timed(
     227              :         consumption_metrics_worker.0.shutdown(),
     228              :         "shutdown consumption metrics",
     229              :         Duration::from_secs(1),
     230              :     )
     231              :     .await;
     232              : 
     233              :     timed(
     234            0 :         futures::future::OptionFuture::from(disk_usage_eviction_task.map(|t| t.0.shutdown())),
     235              :         "shutdown disk usage eviction",
     236              :         Duration::from_secs(1),
     237              :     )
     238              :     .await;
     239              : 
     240              :     timed(
     241              :         background_purges.shutdown(),
     242              :         "shutdown background purges",
     243              :         Duration::from_secs(1),
     244              :     )
     245              :     .await;
     246              : 
     247              :     if let Some(https_listener) = https_listener {
     248              :         timed(
     249              :             https_listener.0.shutdown(),
     250              :             "shutdown https",
     251              :             Duration::from_secs(1),
     252              :         )
     253              :         .await;
     254              :     }
     255              : 
     256              :     // Shut down the HTTP endpoint last, so that you can still check the server's
     257              :     // status while it's shutting down.
     258              :     // FIXME: We should probably stop accepting commands like attach/detach earlier.
     259              :     timed(
     260              :         http_listener.0.shutdown(),
     261              :         "shutdown http",
     262              :         Duration::from_secs(1),
     263              :     )
     264              :     .await;
     265              : 
     266              :     timed(
     267              :         secondary_controller_tasks.wait(), // cancellation happened in caller
     268              :         "secondary controller wait",
     269              :         Duration::from_secs(1),
     270              :     )
     271              :     .await;
     272              : 
     273              :     // There should be nothing left, but let's be sure
     274              :     timed(
     275              :         task_mgr::shutdown_tasks(None, None, None),
     276              :         "shutdown leftovers",
     277              :         Duration::from_secs(1),
     278              :     )
     279              :     .await;
     280              : 
     281              :     info!("cancel & join walredo_extraordinary_shutdown_thread");
     282              :     walredo_extraordinary_shutdown_thread_cancel.cancel();
     283              :     walredo_extraordinary_shutdown_thread.join().unwrap();
     284              :     info!("walredo_extraordinary_shutdown_thread done");
     285              : 
     286              :     info!(
     287              :         elapsed_ms = started_at.elapsed().as_millis(),
     288              :         "Shut down successfully completed"
     289              :     );
     290              :     std::process::exit(exit_code);
     291              : }
     292              : 
     293              : /// Per-tenant configuration file.
     294              : /// Full path: `tenants/<tenant_id>/config-v1`.
     295              : pub(crate) const TENANT_LOCATION_CONFIG_NAME: &str = "config-v1";
     296              : 
     297              : /// Per-tenant copy of their remote heatmap, downloaded into the local
     298              : /// tenant path while in secondary mode.
     299              : pub(crate) const TENANT_HEATMAP_BASENAME: &str = "heatmap-v1.json";
     300              : 
     301              : /// A suffix used for various temporary files. Any temporary files found in the
     302              : /// data directory at pageserver startup can be automatically removed.
     303              : pub(crate) const TEMP_FILE_SUFFIX: &str = "___temp";
     304              : 
     305            4 : pub fn is_temporary(path: &Utf8Path) -> bool {
     306            4 :     match path.file_name() {
     307            4 :         Some(name) => name.ends_with(TEMP_FILE_SUFFIX),
     308            0 :         None => false,
     309              :     }
     310            4 : }
     311              : 
     312              : /// During pageserver startup, we need to order operations not to exhaust tokio worker threads by
     313              : /// blocking.
     314              : ///
     315              : /// The instances of this value exist only during startup, otherwise `None` is provided, meaning no
     316              : /// delaying is needed.
     317              : #[derive(Clone)]
     318              : pub struct InitializationOrder {
     319              :     /// Each initial tenant load task carries this until it is done loading timelines from remote storage
     320              :     pub initial_tenant_load_remote: Option<utils::completion::Completion>,
     321              : 
     322              :     /// Each initial tenant load task carries this until completion.
     323              :     pub initial_tenant_load: Option<utils::completion::Completion>,
     324              : 
     325              :     /// Barrier for when we can start any background jobs.
     326              :     ///
     327              :     /// This can be broken up later on, but right now there is just one class of background job.
     328              :     pub background_jobs_can_start: utils::completion::Barrier,
     329              : }
     330              : 
     331              : /// Time the future with a warning when it exceeds a threshold.
     332           32 : async fn timed<Fut: std::future::Future>(
     333           32 :     fut: Fut,
     334           32 :     name: &str,
     335           32 :     warn_at: std::time::Duration,
     336           32 : ) -> <Fut as std::future::Future>::Output {
     337           32 :     let started = std::time::Instant::now();
     338              : 
     339           32 :     let mut fut = std::pin::pin!(fut);
     340              : 
     341           32 :     match tokio::time::timeout(warn_at, &mut fut).await {
     342           31 :         Ok(ret) => {
     343           31 :             tracing::info!(
     344              :                 stage = name,
     345            0 :                 elapsed_ms = started.elapsed().as_millis(),
     346            0 :                 "completed"
     347              :             );
     348           31 :             ret
     349              :         }
     350              :         Err(_) => {
     351            1 :             tracing::info!(
     352              :                 stage = name,
     353            0 :                 elapsed_ms = started.elapsed().as_millis(),
     354            0 :                 "still waiting, taking longer than expected..."
     355              :             );
     356              : 
     357            1 :             let ret = fut.await;
     358              : 
     359              :             // this has a global allowed_errors
     360            1 :             tracing::warn!(
     361              :                 stage = name,
     362            0 :                 elapsed_ms = started.elapsed().as_millis(),
     363            0 :                 "completed, took longer than expected"
     364              :             );
     365              : 
     366            1 :             ret
     367              :         }
     368              :     }
     369           32 : }
     370              : 
     371              : /// Like [`timed`], but the warning timeout only starts after `cancel` has been cancelled.
     372            0 : async fn timed_after_cancellation<Fut: std::future::Future>(
     373            0 :     fut: Fut,
     374            0 :     name: &str,
     375            0 :     warn_at: std::time::Duration,
     376            0 :     cancel: &CancellationToken,
     377            0 : ) -> <Fut as std::future::Future>::Output {
     378            0 :     let mut fut = std::pin::pin!(fut);
     379              : 
     380            0 :     tokio::select! {
     381            0 :         _ = cancel.cancelled() => {
     382            0 :             timed(fut, name, warn_at).await
     383              :         }
     384            0 :         ret = &mut fut => {
     385            0 :             ret
     386              :         }
     387              :     }
     388            0 : }
     389              : 
     390              : #[cfg(test)]
     391              : mod timed_tests {
     392              :     use std::time::Duration;
     393              : 
     394              :     use super::timed;
     395              : 
     396              :     #[tokio::test]
     397            1 :     async fn timed_completes_when_inner_future_completes() {
     398              :         // A future that completes on time should have its result returned
     399            1 :         let r1 = timed(
     400            1 :             async move {
     401            1 :                 tokio::time::sleep(Duration::from_millis(10)).await;
     402            1 :                 123
     403            1 :             },
     404            1 :             "test 1",
     405            1 :             Duration::from_millis(50),
     406              :         )
     407            1 :         .await;
     408            1 :         assert_eq!(r1, 123);
     409              : 
     410              :         // A future that completes too slowly should also have its result returned
     411            1 :         let r1 = timed(
     412            1 :             async move {
     413            1 :                 tokio::time::sleep(Duration::from_millis(50)).await;
     414            1 :                 456
     415            1 :             },
     416            1 :             "test 1",
     417            1 :             Duration::from_millis(10),
     418              :         )
     419            1 :         .await;
     420            1 :         assert_eq!(r1, 456);
     421            1 :     }
     422              : }
        

Generated by: LCOV version 2.1-beta