LCOV - code coverage report
Current view: top level - pageserver/src - lib.rs (source / functions) Coverage Total Hit
Test: 32f4a56327bc9da697706839ed4836b2a00a408f.info Lines: 74.7 % 79 59
Test Date: 2024-02-07 07:37:29 Functions: 63.2 % 38 24

            Line data    Source code
       1              : #![recursion_limit = "300"]
       2              : #![deny(clippy::undocumented_unsafe_blocks)]
       3              : 
       4              : mod auth;
       5              : pub mod basebackup;
       6              : pub mod config;
       7              : pub mod consumption_metrics;
       8              : pub mod context;
       9              : pub mod control_plane_client;
      10              : pub mod deletion_queue;
      11              : pub mod disk_usage_eviction_task;
      12              : pub mod http;
      13              : pub mod import_datadir;
      14              : pub use pageserver_api::keyspace;
      15              : pub mod metrics;
      16              : pub mod page_cache;
      17              : pub mod page_service;
      18              : pub mod pgdatadir_mapping;
      19              : pub mod repository;
      20              : pub mod span;
      21              : pub(crate) mod statvfs;
      22              : pub mod task_mgr;
      23              : pub mod tenant;
      24              : pub mod trace;
      25              : pub mod virtual_file;
      26              : pub mod walingest;
      27              : pub mod walrecord;
      28              : pub mod walredo;
      29              : 
      30              : use crate::task_mgr::TaskKind;
      31              : use camino::Utf8Path;
      32              : use deletion_queue::DeletionQueue;
      33              : use tracing::info;
      34              : 
      35              : /// Current storage format version
      36              : ///
      37              : /// This is embedded in the header of all the layer files.
      38              : /// If you make any backwards-incompatible changes to the storage
      39              : /// format, bump this!
      40              : /// Note that TimelineMetadata uses its own version number to track
      41              : /// backwards-compatible changes to the metadata format.
      42              : pub const STORAGE_FORMAT_VERSION: u16 = 3;
      43              : 
      44              : pub const DEFAULT_PG_VERSION: u32 = 15;
      45              : 
      46              : // Magic constants used to identify different kinds of files
      47              : pub const IMAGE_FILE_MAGIC: u16 = 0x5A60;
      48              : pub const DELTA_FILE_MAGIC: u16 = 0x5A61;
      49              : 
      50              : static ZERO_PAGE: bytes::Bytes = bytes::Bytes::from_static(&[0u8; 8192]);
      51              : 
      52              : pub use crate::metrics::preinitialize_metrics;
      53              : 
/// Shuts the pageserver down in a fixed order, then exits the process.
///
/// Stages, in order:
/// 1. stop the libpq endpoint listener, so no new connections are accepted;
/// 2. shut down all tenants;
/// 3. stop the page service (`PageRequestHandler`) tasks;
/// 4. if a `deletion_queue` was supplied, give it a best-effort chance to
///    persist outstanding deletions;
/// 5. stop the HTTP endpoint listener;
/// 6. shut down any tasks that are still left.
///
/// Every stage except the deletion-queue flush is wrapped in `timed`. `timed`
/// logs the stage's duration and warns if the stage outlives the given
/// threshold, but it never aborts the stage.
///
/// This function never returns normally: it ends by calling
/// [`std::process::exit`] with `exit_code`.
#[tracing::instrument(skip_all, fields(%exit_code))]
pub async fn shutdown_pageserver(deletion_queue: Option<DeletionQueue>, exit_code: i32) {
    use std::time::Duration;
    // Shut down the libpq endpoint task. This prevents new connections from
    // being accepted.
    timed(
        task_mgr::shutdown_tasks(Some(TaskKind::LibpqEndpointListener), None, None),
        "shutdown LibpqEndpointListener",
        Duration::from_secs(1),
    )
    .await;

    // Shut down all the tenants. This flushes everything to disk and kills
    // the checkpoint and GC tasks.
    timed(
        tenant::mgr::shutdown_all_tenants(),
        "shutdown all tenants",
        Duration::from_secs(5),
    )
    .await;

    // Shut down any page service tasks: any in-progress work for particular timelines or tenants
    // should already have been cancelled via mgr::shutdown_all_tenants
    timed(
        task_mgr::shutdown_tasks(Some(TaskKind::PageRequestHandler), None, None),
        "shutdown PageRequestHandlers",
        Duration::from_secs(1),
    )
    .await;

    // Best effort to persist any outstanding deletions, to avoid leaking objects.
    // NOTE(review): the 5s argument is presumably an upper bound on how long
    // the flush may take — confirm in `DeletionQueue::shutdown`.
    if let Some(mut deletion_queue) = deletion_queue {
        deletion_queue.shutdown(Duration::from_secs(5)).await;
    }

    // Shut down the HTTP endpoint last, so that you can still check the server's
    // status while it's shutting down.
    // FIXME: We should probably stop accepting commands like attach/detach earlier.
    timed(
        task_mgr::shutdown_tasks(Some(TaskKind::HttpEndpointListener), None, None),
        "shutdown http",
        Duration::from_secs(1),
    )
    .await;

    // There should be nothing left, but let's be sure
    timed(
        task_mgr::shutdown_tasks(None, None, None),
        "shutdown leftovers",
        Duration::from_secs(1),
    )
    .await;
    info!("Shut down successfully completed");
    // Terminates the process; nothing after this point runs.
    std::process::exit(exit_code);
}
     109              : 
     110              : /// The name of the metadata file pageserver creates per timeline.
     111              : /// Full path: `tenants/<tenant_id>/timelines/<timeline_id>/metadata`.
     112              : pub const METADATA_FILE_NAME: &str = "metadata";
     113              : 
     114              : /// Per-tenant configuration file.
     115              : /// Full path: `tenants/<tenant_id>/config`.
     116              : pub const TENANT_CONFIG_NAME: &str = "config";
     117              : 
     118              : /// Per-tenant configuration file.
     119              : /// Full path: `tenants/<tenant_id>/config`.
     120              : pub const TENANT_LOCATION_CONFIG_NAME: &str = "config-v1";
     121              : 
     122              : /// Per-tenant copy of their remote heatmap, downloaded into the local
     123              : /// tenant path while in secondary mode.
     124              : pub const TENANT_HEATMAP_BASENAME: &str = "heatmap-v1.json";
     125              : 
     126              : /// A suffix used for various temporary files. Any temporary files found in the
     127              : /// data directory at pageserver startup can be automatically removed.
     128              : pub const TEMP_FILE_SUFFIX: &str = "___temp";
     129              : 
     130              : /// A marker file to mark that a timeline directory was not fully initialized.
     131              : /// If a timeline directory with this marker is encountered at pageserver startup,
     132              : /// the timeline directory and the marker file are both removed.
     133              : /// Full path: `tenants/<tenant_id>/timelines/<timeline_id>___uninit`.
     134              : pub const TIMELINE_UNINIT_MARK_SUFFIX: &str = "___uninit";
     135              : 
     136              : pub const TIMELINE_DELETE_MARK_SUFFIX: &str = "___delete";
     137              : 
     138              : /// A marker file to prevent pageserver from loading a certain tenant on restart.
     139              : /// Different from [`TIMELINE_UNINIT_MARK_SUFFIX`] due to semantics of the corresponding
     140              : /// `ignore` management API command, that expects the ignored tenant to be properly loaded
     141              : /// into pageserver's memory before being ignored.
     142              : /// Full path: `tenants/<tenant_id>/___ignored_tenant`.
     143              : pub const IGNORED_TENANT_FILE_NAME: &str = "___ignored_tenant";
     144              : 
     145         1526 : pub fn is_temporary(path: &Utf8Path) -> bool {
     146         1526 :     match path.file_name() {
     147         1526 :         Some(name) => name.ends_with(TEMP_FILE_SUFFIX),
     148            0 :         None => false,
     149              :     }
     150         1526 : }
     151              : 
     152          866 : fn ends_with_suffix(path: &Utf8Path, suffix: &str) -> bool {
     153          866 :     match path.file_name() {
     154          866 :         Some(name) => name.ends_with(suffix),
     155            0 :         None => false,
     156              :     }
     157          866 : }
     158              : 
// FIXME: DO NOT ADD new query methods like this: their callers then have to parse the timeline
// id out of the directory name as a second step. Instead, create a type such as
// `UninitMark(TimelineId)` and parse the name only once.
     162              : 
     163          435 : pub fn is_uninit_mark(path: &Utf8Path) -> bool {
     164          435 :     ends_with_suffix(path, TIMELINE_UNINIT_MARK_SUFFIX)
     165          435 : }
     166              : 
     167          431 : pub fn is_delete_mark(path: &Utf8Path) -> bool {
     168          431 :     ends_with_suffix(path, TIMELINE_DELETE_MARK_SUFFIX)
     169          431 : }
     170              : 
     171            0 : fn is_walkdir_io_not_found(e: &walkdir::Error) -> bool {
     172            0 :     if let Some(e) = e.io_error() {
     173            0 :         if e.kind() == std::io::ErrorKind::NotFound {
     174            0 :             return true;
     175            0 :         }
     176            0 :     }
     177            0 :     false
     178            0 : }
     179              : 
     180              : /// During pageserver startup, we need to order operations not to exhaust tokio worker threads by
     181              : /// blocking.
     182              : ///
     183              : /// The instances of this value exist only during startup, otherwise `None` is provided, meaning no
     184              : /// delaying is needed.
     185          207 : #[derive(Clone)]
     186              : pub struct InitializationOrder {
     187              :     /// Each initial tenant load task carries this until it is done loading timelines from remote storage
     188              :     pub initial_tenant_load_remote: Option<utils::completion::Completion>,
     189              : 
     190              :     /// Each initial tenant load task carries this until completion.
     191              :     pub initial_tenant_load: Option<utils::completion::Completion>,
     192              : 
     193              :     /// Barrier for when we can start any background jobs.
     194              :     ///
     195              :     /// This can be broken up later on, but right now there is just one class of a background job.
     196              :     pub background_jobs_can_start: utils::completion::Barrier,
     197              : }
     198              : 
     199              : /// Time the future with a warning when it exceeds a threshold.
     200         1065 : async fn timed<Fut: std::future::Future>(
     201         1065 :     fut: Fut,
     202         1065 :     name: &str,
     203         1065 :     warn_at: std::time::Duration,
     204         1065 : ) -> <Fut as std::future::Future>::Output {
     205         1065 :     let started = std::time::Instant::now();
     206         1065 : 
     207         1065 :     let mut fut = std::pin::pin!(fut);
     208         1065 : 
     209         1065 :     match tokio::time::timeout(warn_at, &mut fut).await {
     210         1063 :         Ok(ret) => {
     211         1061 :             tracing::info!(
     212         1061 :                 stage = name,
     213         1061 :                 elapsed_ms = started.elapsed().as_millis(),
     214         1061 :                 "completed"
     215         1061 :             );
     216         1063 :             ret
     217              :         }
     218              :         Err(_) => {
     219            0 :             tracing::info!(
     220            0 :                 stage = name,
     221            0 :                 elapsed_ms = started.elapsed().as_millis(),
     222            0 :                 "still waiting, taking longer than expected..."
     223            0 :             );
     224              : 
     225            2 :             let ret = fut.await;
     226              : 
     227              :             // this has a global allowed_errors
     228            0 :             tracing::warn!(
     229            0 :                 stage = name,
     230            0 :                 elapsed_ms = started.elapsed().as_millis(),
     231            0 :                 "completed, took longer than expected"
     232            0 :             );
     233              : 
     234            2 :             ret
     235              :         }
     236              :     }
     237         1065 : }
     238              : 
     239              : #[cfg(test)]
     240              : mod timed_tests {
     241              :     use super::timed;
     242              :     use std::time::Duration;
     243              : 
     244            2 :     #[tokio::test]
     245            2 :     async fn timed_completes_when_inner_future_completes() {
     246              :         // A future that completes on time should have its result returned
     247            2 :         let r1 = timed(
     248            2 :             async move {
     249            2 :                 tokio::time::sleep(Duration::from_millis(10)).await;
     250            2 :                 123
     251            2 :             },
     252            2 :             "test 1",
     253            2 :             Duration::from_millis(50),
     254            2 :         )
     255            2 :         .await;
     256            2 :         assert_eq!(r1, 123);
     257              : 
     258              :         // A future that completes too slowly should also have its result returned
     259            2 :         let r1 = timed(
     260            2 :             async move {
     261            6 :                 tokio::time::sleep(Duration::from_millis(50)).await;
     262            2 :                 456
     263            2 :             },
     264            2 :             "test 1",
     265            2 :             Duration::from_millis(10),
     266            2 :         )
     267            4 :         .await;
     268            2 :         assert_eq!(r1, 456);
     269              :     }
     270              : }
        

Generated by: LCOV version 2.1-beta