LCOV - code coverage report
Current view: top level - pageserver/src - lib.rs (source / functions) Coverage Total Hit
Test: 42f947419473a288706e86ecdf7c2863d760d5d7.info Lines: 47.1 % 121 57
Test Date: 2024-08-02 21:34:27 Functions: 35.0 % 40 14

            Line data    Source code
       1              : #![recursion_limit = "300"]
       2              : #![deny(clippy::undocumented_unsafe_blocks)]
       3              : 
       4              : mod auth;
       5              : pub mod basebackup;
       6              : pub mod config;
       7              : pub mod consumption_metrics;
       8              : pub mod context;
       9              : pub mod control_plane_client;
      10              : pub mod deletion_queue;
      11              : pub mod disk_usage_eviction_task;
      12              : pub mod http;
      13              : pub mod import_datadir;
      14              : pub mod l0_flush;
      15              : 
      16              : use futures::{stream::FuturesUnordered, StreamExt};
      17              : pub use pageserver_api::keyspace;
      18              : use tokio_util::sync::CancellationToken;
      19              : pub mod aux_file;
      20              : pub mod metrics;
      21              : pub mod page_cache;
      22              : pub mod page_service;
      23              : pub mod pgdatadir_mapping;
      24              : pub mod repository;
      25              : pub mod span;
      26              : pub(crate) mod statvfs;
      27              : pub mod task_mgr;
      28              : pub mod tenant;
      29              : pub mod utilization;
      30              : pub mod virtual_file;
      31              : pub mod walingest;
      32              : pub mod walrecord;
      33              : pub mod walredo;
      34              : 
      35              : use camino::Utf8Path;
      36              : use deletion_queue::DeletionQueue;
      37              : use tenant::{
      38              :     mgr::{BackgroundPurges, TenantManager},
      39              :     secondary,
      40              : };
      41              : use tracing::{info, info_span};
      42              : 
      43              : /// Current storage format version
      44              : ///
      45              : /// This is embedded in the header of all the layer files.
      46              : /// If you make any backwards-incompatible changes to the storage
      47              : /// format, bump this!
      48              : /// Note that TimelineMetadata uses its own version number to track
      49              : /// backwards-compatible changes to the metadata format.
      50              : pub const STORAGE_FORMAT_VERSION: u16 = 3;
      51              : 
      52              : pub const DEFAULT_PG_VERSION: u32 = 15;
      53              : 
      54              : // Magic constants used to identify different kinds of files
      55              : pub const IMAGE_FILE_MAGIC: u16 = 0x5A60;
      56              : pub const DELTA_FILE_MAGIC: u16 = 0x5A61;
      57              : 
      58              : static ZERO_PAGE: bytes::Bytes = bytes::Bytes::from_static(&[0u8; 8192]);
      59              : 
      60              : pub use crate::metrics::preinitialize_metrics;
      61              : 
      62              : pub struct CancellableTask {
      63              :     pub task: tokio::task::JoinHandle<()>,
      64              :     pub cancel: CancellationToken,
      65              : }
      66              : pub struct HttpEndpointListener(pub CancellableTask);
      67              : pub struct ConsumptionMetricsTasks(pub CancellableTask);
      68              : pub struct DiskUsageEvictionTask(pub CancellableTask);
      69              : impl CancellableTask {
      70            0 :     pub async fn shutdown(self) {
      71            0 :         self.cancel.cancel();
      72            0 :         self.task.await.unwrap();
      73            0 :     }
      74              : }
      75              : 
      76            0 : #[tracing::instrument(skip_all, fields(%exit_code))]
      77              : #[allow(clippy::too_many_arguments)]
      78              : pub async fn shutdown_pageserver(
      79              :     http_listener: HttpEndpointListener,
      80              :     page_service: page_service::Listener,
      81              :     consumption_metrics_worker: ConsumptionMetricsTasks,
      82              :     disk_usage_eviction_task: Option<DiskUsageEvictionTask>,
      83              :     tenant_manager: &TenantManager,
      84              :     background_purges: BackgroundPurges,
      85              :     mut deletion_queue: DeletionQueue,
      86              :     secondary_controller_tasks: secondary::GlobalTasks,
      87              :     exit_code: i32,
      88              : ) {
      89              :     use std::time::Duration;
      90              : 
      91              :     // If the orderly shutdown below takes too long, we still want to make
      92              :     // sure that all walredo processes are killed and wait()ed on by us, not systemd.
      93              :     //
      94              :     // (Leftover walredo processes are the hypothesized trigger for the systemd freezes
      95              :     //  that we keep seeing in prod => https://github.com/neondatabase/cloud/issues/11387.)
      96              :     //
      97              :     // We use a thread instead of a tokio task because the background runtime is likely busy
      98              :     // with the final flushing / uploads. This activity here has priority, and due to lack
      99              :     // of scheduling priority features in the tokio scheduler, using a separate thread is
     100              :     // an effective priority booster.
     101              :     let walredo_extraordinary_shutdown_thread_span = {
     102              :         let span = info_span!(parent: None, "walredo_extraordinary_shutdown_thread");
     103              :         span.follows_from(tracing::Span::current());
     104              :         span
     105              :     };
     106              :     let walredo_extraordinary_shutdown_thread_cancel = CancellationToken::new();
     107              :     let walredo_extraordinary_shutdown_thread = std::thread::spawn({
     108              :         let walredo_extraordinary_shutdown_thread_cancel =
     109              :             walredo_extraordinary_shutdown_thread_cancel.clone();
     110            0 :         move || {
     111            0 :             let rt = tokio::runtime::Builder::new_current_thread()
     112            0 :                 .enable_all()
     113            0 :                 .build()
     114            0 :                 .unwrap();
     115            0 :             let _entered = rt.enter();
     116            0 :             let _entered = walredo_extraordinary_shutdown_thread_span.enter();
     117            0 :             if let Ok(()) = rt.block_on(tokio::time::timeout(
     118            0 :                 Duration::from_secs(8),
     119            0 :                 walredo_extraordinary_shutdown_thread_cancel.cancelled(),
     120            0 :             )) {
     121            0 :                 info!("cancellation requested");
     122            0 :                 return;
     123            0 :             }
     124            0 :             let managers = tenant::WALREDO_MANAGERS
     125            0 :                 .lock()
     126            0 :                 .unwrap()
     127            0 :                 // prevents new walredo managers from being inserted
     128            0 :                 .take()
     129            0 :                 .expect("only we take()");
     130            0 :             // Use FuturesUnordered to get in queue early for each manager's
     131            0 :             // heavier_once_cell semaphore wait list.
     132            0 :             // Also, for idle tenants that for some reason haven't
     133            0 :             // shut down yet, it's quite likely that we're not going
     134            0 :             // to get Poll::Pending once.
     135            0 :             let mut futs: FuturesUnordered<_> = managers
     136            0 :                 .into_iter()
     137            0 :                 .filter_map(|(_, mgr)| mgr.upgrade())
     138            0 :                 .map(|mgr| async move { tokio::task::unconstrained(mgr.shutdown()).await })
     139            0 :                 .collect();
     140            0 :             info!(count=%futs.len(), "built FuturesUnordered");
     141            0 :             let mut last_log_at = std::time::Instant::now();
     142            0 :             #[derive(Debug, Default)]
     143            0 :             struct Results {
     144            0 :                 initiated: u64,
     145            0 :                 already: u64,
     146            0 :             }
     147            0 :             let mut results = Results::default();
     148            0 :             while let Some(we_initiated) = rt.block_on(futs.next()) {
     149            0 :                 if we_initiated {
     150            0 :                     results.initiated += 1;
     151            0 :                 } else {
     152            0 :                     results.already += 1;
     153            0 :                 }
     154            0 :                 if last_log_at.elapsed() > Duration::from_millis(100) {
     155            0 :                     info!(remaining=%futs.len(), ?results, "progress");
     156            0 :                     last_log_at = std::time::Instant::now();
     157            0 :                 }
     158              :             }
     159            0 :             info!(?results, "done");
     160            0 :         }
     161              :     });
     162              : 
     163              :     // Shut down the libpq endpoint task. This prevents new connections from
     164              :     // being accepted.
     165              :     let remaining_connections = timed(
     166              :         page_service.stop_accepting(),
     167              :         "shutdown LibpqEndpointListener",
     168              :         Duration::from_secs(1),
     169              :     )
     170              :     .await;
     171              : 
     172              :     // Shut down all the tenants. This flushes everything to disk and kills
     173              :     // the checkpoint and GC tasks.
     174              :     timed(
     175              :         tenant_manager.shutdown(),
     176              :         "shutdown all tenants",
     177              :         Duration::from_secs(5),
     178              :     )
     179              :     .await;
     180              : 
     181              :     // Shut down any page service tasks: any in-progress work for particular timelines or tenants
     182              :     // should already have been cancelled via mgr::shutdown_all_tenants
     183              :     timed(
     184              :         remaining_connections.shutdown(),
     185              :         "shutdown PageRequestHandlers",
     186              :         Duration::from_secs(1),
     187              :     )
     188              :     .await;
     189              : 
     190              :     // Best effort to persist any outstanding deletions, to avoid leaking objects
     191              :     deletion_queue.shutdown(Duration::from_secs(5)).await;
     192              : 
     193              :     timed(
     194              :         consumption_metrics_worker.0.shutdown(),
     195              :         "shutdown consumption metrics",
     196              :         Duration::from_secs(1),
     197              :     )
     198              :     .await;
     199              : 
     200              :     timed(
     201            0 :         futures::future::OptionFuture::from(disk_usage_eviction_task.map(|t| t.0.shutdown())),
     202              :         "shutdown disk usage eviction",
     203              :         Duration::from_secs(1),
     204              :     )
     205              :     .await;
     206              : 
     207              :     timed(
     208              :         background_purges.shutdown(),
     209              :         "shutdown background purges",
     210              :         Duration::from_secs(1),
     211              :     )
     212              :     .await;
     213              : 
     214              :     // Shut down the HTTP endpoint last, so that you can still check the server's
     215              :     // status while it's shutting down.
     216              :     // FIXME: We should probably stop accepting commands like attach/detach earlier.
     217              :     timed(
     218              :         http_listener.0.shutdown(),
     219              :         "shutdown http",
     220              :         Duration::from_secs(1),
     221              :     )
     222              :     .await;
     223              : 
     224              :     timed(
     225              :         secondary_controller_tasks.wait(), // cancellation happened in caller
     226              :         "secondary controller wait",
     227              :         Duration::from_secs(1),
     228              :     )
     229              :     .await;
     230              : 
     231              :     // There should be nothing left, but let's be sure
     232              :     timed(
     233              :         task_mgr::shutdown_tasks(None, None, None),
     234              :         "shutdown leftovers",
     235              :         Duration::from_secs(1),
     236              :     )
     237              :     .await;
     238              : 
     239              :     info!("cancel & join walredo_extraordinary_shutdown_thread");
     240              :     walredo_extraordinary_shutdown_thread_cancel.cancel();
     241              :     walredo_extraordinary_shutdown_thread.join().unwrap();
     242              :     info!("walredo_extraordinary_shutdown_thread done");
     243              : 
     244              :     info!("Shut down successfully completed");
     245              :     std::process::exit(exit_code);
     246              : }
     247              : 
     248              : /// Per-tenant configuration file.
     249              : /// Full path: `tenants/<tenant_id>/config-v1`.
     250              : pub(crate) const TENANT_LOCATION_CONFIG_NAME: &str = "config-v1";
     251              : 
     252              : /// Per-tenant copy of their remote heatmap, downloaded into the local
     253              : /// tenant path while in secondary mode.
     254              : pub(crate) const TENANT_HEATMAP_BASENAME: &str = "heatmap-v1.json";
     255              : 
     256              : /// A suffix used for various temporary files. Any temporary files found in the
     257              : /// data directory at pageserver startup can be automatically removed.
     258              : pub(crate) const TEMP_FILE_SUFFIX: &str = "___temp";
     259              : 
     260              : /// A marker file to mark that a timeline directory was not fully initialized.
     261              : /// If a timeline directory with this marker is encountered at pageserver startup,
     262              : /// the timeline directory and the marker file are both removed.
     263              : /// Full path: `tenants/<tenant_id>/timelines/<timeline_id>___uninit`.
     264              : pub(crate) const TIMELINE_UNINIT_MARK_SUFFIX: &str = "___uninit";
     265              : 
     266              : pub(crate) const TIMELINE_DELETE_MARK_SUFFIX: &str = "___delete";
     267              : 
     268            8 : pub fn is_temporary(path: &Utf8Path) -> bool {
     269            8 :     match path.file_name() {
     270            8 :         Some(name) => name.ends_with(TEMP_FILE_SUFFIX),
     271            0 :         None => false,
     272              :     }
     273            8 : }
     274              : 
     275           16 : fn ends_with_suffix(path: &Utf8Path, suffix: &str) -> bool {
     276           16 :     match path.file_name() {
     277           16 :         Some(name) => name.ends_with(suffix),
     278            0 :         None => false,
     279              :     }
     280           16 : }
     281              : 
     282              : // FIXME: DO NOT ADD new query methods like this, which will have a next step of parsing timelineid
     283              : // from the directory name. Instead create type "UninitMark(TimelineId)" and only parse it once
     284              : // from the name.
     285              : 
     286            8 : pub(crate) fn is_uninit_mark(path: &Utf8Path) -> bool {
     287            8 :     ends_with_suffix(path, TIMELINE_UNINIT_MARK_SUFFIX)
     288            8 : }
     289              : 
     290            8 : pub(crate) fn is_delete_mark(path: &Utf8Path) -> bool {
     291            8 :     ends_with_suffix(path, TIMELINE_DELETE_MARK_SUFFIX)
     292            8 : }
     293              : 
     294              : /// During pageserver startup, we need to order operations not to exhaust tokio worker threads by
     295              : /// blocking.
     296              : ///
     297              : /// The instances of this value exist only during startup, otherwise `None` is provided, meaning no
     298              : /// delaying is needed.
     299              : #[derive(Clone)]
     300              : pub struct InitializationOrder {
     301              :     /// Each initial tenant load task carries this until it is done loading timelines from remote storage
     302              :     pub initial_tenant_load_remote: Option<utils::completion::Completion>,
     303              : 
     304              :     /// Each initial tenant load task carries this until completion.
     305              :     pub initial_tenant_load: Option<utils::completion::Completion>,
     306              : 
     307              :     /// Barrier for when we can start any background jobs.
     308              :     ///
     309              :     /// This can be broken up later on, but right now there is just one class of a background job.
     310              :     pub background_jobs_can_start: utils::completion::Barrier,
     311              : }
     312              : 
     313              : /// Time the future with a warning when it exceeds a threshold.
     314           22 : async fn timed<Fut: std::future::Future>(
     315           22 :     fut: Fut,
     316           22 :     name: &str,
     317           22 :     warn_at: std::time::Duration,
     318           22 : ) -> <Fut as std::future::Future>::Output {
     319           22 :     let started = std::time::Instant::now();
     320           22 : 
     321           22 :     let mut fut = std::pin::pin!(fut);
     322           22 : 
     323           22 :     match tokio::time::timeout(warn_at, &mut fut).await {
     324           20 :         Ok(ret) => {
     325           20 :             tracing::info!(
     326              :                 stage = name,
     327            0 :                 elapsed_ms = started.elapsed().as_millis(),
     328            0 :                 "completed"
     329              :             );
     330           20 :             ret
     331              :         }
     332              :         Err(_) => {
     333            2 :             tracing::info!(
     334              :                 stage = name,
     335            0 :                 elapsed_ms = started.elapsed().as_millis(),
     336            0 :                 "still waiting, taking longer than expected..."
     337              :             );
     338              : 
     339            2 :             let ret = fut.await;
     340              : 
     341              :             // this has a global allowed_errors
     342            2 :             tracing::warn!(
     343              :                 stage = name,
     344            0 :                 elapsed_ms = started.elapsed().as_millis(),
     345            0 :                 "completed, took longer than expected"
     346              :             );
     347              : 
     348            2 :             ret
     349              :         }
     350              :     }
     351           22 : }
     352              : 
     353              : #[cfg(test)]
     354              : mod timed_tests {
     355              :     use super::timed;
     356              :     use std::time::Duration;
     357              : 
     358              :     #[tokio::test]
     359            2 :     async fn timed_completes_when_inner_future_completes() {
     360            2 :         // A future that completes on time should have its result returned
     361            2 :         let r1 = timed(
     362            2 :             async move {
     363            2 :                 tokio::time::sleep(Duration::from_millis(10)).await;
     364            2 :                 123
     365            2 :             },
     366            2 :             "test 1",
     367            2 :             Duration::from_millis(50),
     368            2 :         )
     369            2 :         .await;
     370            2 :         assert_eq!(r1, 123);
     371            2 : 
     372            2 :         // A future that completes too slowly should also have its result returned
     373            2 :         let r1 = timed(
     374            2 :             async move {
     375            6 :                 tokio::time::sleep(Duration::from_millis(50)).await;
     376            2 :                 456
     377            2 :             },
     378            2 :             "test 1",
     379            2 :             Duration::from_millis(10),
     380            2 :         )
     381            4 :         .await;
     382            2 :         assert_eq!(r1, 456);
     383            2 :     }
     384              : }
        

Generated by: LCOV version 2.1-beta