LCOV - differential code coverage report
Current view: top level - pageserver/src - tenant.rs (source / functions) Coverage Total Hit LBC UBC GIC CBC ECB
Current: f6946e90941b557c917ac98cd5a7e9506d180f3e.info Lines: 87.8 % 3047 2674 2 371 2674
Current Date: 2023-10-19 02:04:12 Functions: 73.1 % 353 258 95 3 255 3
Baseline: c8637f37369098875162f194f92736355783b050.info
Baseline Date: 2023-10-18 20:25:20

           TLA  Line data    Source code
       1                 : //!
       2                 : //! Timeline repository implementation that keeps old data in files on disk, and
       3                 : //! the recent changes in memory. See tenant/*_layer.rs files.
       4                 : //! The functions here are responsible for locating the correct layer for the
       5                 : //! get/put call, walking back the timeline branching history as needed.
       6                 : //!
       7                 : //! The files are stored in the .neon/tenants/<tenant_id>/timelines/<timeline_id>
       8                 : //! directory. See docs/pageserver-storage.md for how the files are managed.
       9                 : //! In addition to the layer files, there is a metadata file in the same
      10                 : //! directory that contains information about the timeline, in particular its
      11                 : //! parent timeline, and the last LSN that has been written to disk.
      12                 : //!
      13                 : 
      14                 : use anyhow::{bail, Context};
      15                 : use camino::{Utf8Path, Utf8PathBuf};
      16                 : use futures::FutureExt;
      17                 : use pageserver_api::models::TimelineState;
      18                 : use remote_storage::DownloadError;
      19                 : use remote_storage::GenericRemoteStorage;
      20                 : use storage_broker::BrokerClientChannel;
      21                 : use tokio::sync::watch;
      22                 : use tokio::task::JoinSet;
      23                 : use tokio_util::sync::CancellationToken;
      24                 : use tracing::*;
      25                 : use utils::completion;
      26                 : use utils::completion::Completion;
      27                 : use utils::crashsafe::path_with_suffix_extension;
      28                 : 
      29                 : use std::cmp::min;
      30                 : use std::collections::hash_map::Entry;
      31                 : use std::collections::BTreeSet;
      32                 : use std::collections::HashMap;
      33                 : use std::collections::HashSet;
      34                 : use std::fmt::Debug;
      35                 : use std::fmt::Display;
      36                 : use std::fs;
      37                 : use std::fs::File;
      38                 : use std::io;
      39                 : use std::ops::Bound::Included;
      40                 : use std::process::Command;
      41                 : use std::process::Stdio;
      42                 : use std::sync::atomic::AtomicU64;
      43                 : use std::sync::atomic::Ordering;
      44                 : use std::sync::Arc;
      45                 : use std::sync::MutexGuard;
      46                 : use std::sync::{Mutex, RwLock};
      47                 : use std::time::{Duration, Instant};
      48                 : 
      49                 : use self::config::AttachedLocationConfig;
      50                 : use self::config::AttachmentMode;
      51                 : use self::config::LocationConf;
      52                 : use self::config::TenantConf;
      53                 : use self::delete::DeleteTenantFlow;
      54                 : use self::metadata::LoadMetadataError;
      55                 : use self::metadata::TimelineMetadata;
      56                 : use self::mgr::TenantsMap;
      57                 : use self::remote_timeline_client::RemoteTimelineClient;
      58                 : use self::timeline::uninit::TimelineUninitMark;
      59                 : use self::timeline::uninit::UninitializedTimeline;
      60                 : use self::timeline::EvictionTaskTenantState;
      61                 : use self::timeline::TimelineResources;
      62                 : use crate::config::PageServerConf;
      63                 : use crate::context::{DownloadBehavior, RequestContext};
      64                 : use crate::deletion_queue::DeletionQueueClient;
      65                 : use crate::import_datadir;
      66                 : use crate::is_uninit_mark;
      67                 : use crate::metrics::TENANT_ACTIVATION;
      68                 : use crate::metrics::{remove_tenant_metrics, TENANT_STATE_METRIC, TENANT_SYNTHETIC_SIZE_METRIC};
      69                 : use crate::repository::GcResult;
      70                 : use crate::task_mgr;
      71                 : use crate::task_mgr::TaskKind;
      72                 : use crate::tenant::config::LocationMode;
      73                 : use crate::tenant::config::TenantConfOpt;
      74                 : use crate::tenant::metadata::load_metadata;
      75                 : pub use crate::tenant::remote_timeline_client::index::IndexPart;
      76                 : use crate::tenant::remote_timeline_client::MaybeDeletedIndexPart;
      77                 : use crate::tenant::storage_layer::DeltaLayer;
      78                 : use crate::tenant::storage_layer::ImageLayer;
      79                 : use crate::InitializationOrder;
      80                 : use crate::METADATA_FILE_NAME;
      81                 : 
      82                 : use crate::tenant::timeline::delete::DeleteTimelineFlow;
      83                 : use crate::tenant::timeline::uninit::cleanup_timeline_directory;
      84                 : use crate::virtual_file::VirtualFile;
      85                 : use crate::walredo::PostgresRedoManager;
      86                 : use crate::TEMP_FILE_SUFFIX;
      87                 : pub use pageserver_api::models::TenantState;
      88                 : 
      89                 : use toml_edit;
      90                 : use utils::{
      91                 :     crashsafe,
      92                 :     generation::Generation,
      93                 :     id::{TenantId, TimelineId},
      94                 :     lsn::{Lsn, RecordLsn},
      95                 : };
      96                 : 
      97                 : /// Declare a failpoint that can use the `pause` failpoint action.
      98                 : /// We don't want to block the executor thread, hence, spawn_blocking + await.
      99                 : macro_rules! pausable_failpoint {
     100                 :     ($name:literal) => {
     101                 :         if cfg!(feature = "testing") {
     102                 :             tokio::task::spawn_blocking({
     103                 :                 let current = tracing::Span::current();
     104                 :                 move || {
     105                 :                     let _entered = current.entered();
     106                 :                     tracing::info!("at failpoint {}", $name);
     107                 :                     fail::fail_point!($name);
     108                 :                 }
     109                 :             })
     110                 :             .await
     111                 :             .expect("spawn_blocking");
     112                 :         }
     113                 :     };
     114                 : }
     115                 : 
     116                 : pub mod blob_io;
     117                 : pub mod block_io;
     118                 : 
     119                 : pub mod disk_btree;
     120                 : pub(crate) mod ephemeral_file;
     121                 : pub mod layer_map;
     122                 : mod span;
     123                 : 
     124                 : pub mod metadata;
     125                 : mod par_fsync;
     126                 : pub mod remote_timeline_client;
     127                 : pub mod storage_layer;
     128                 : 
     129                 : pub mod config;
     130                 : pub mod delete;
     131                 : pub mod mgr;
     132                 : pub mod tasks;
     133                 : pub mod upload_queue;
     134                 : 
     135                 : pub(crate) mod timeline;
     136                 : 
     137                 : pub mod size;
     138                 : 
     139                 : pub(crate) use timeline::span::debug_assert_current_span_has_tenant_and_timeline_id;
     140                 : pub use timeline::{
     141                 :     LocalLayerInfoForDiskUsageEviction, LogicalSizeCalculationCause, PageReconstructError, Timeline,
     142                 : };
     143                 : 
     144                 : // re-export for use in remote_timeline_client.rs
     145                 : pub use crate::tenant::metadata::save_metadata;
     146                 : 
     147                 : // re-export for use in walreceiver
     148                 : pub use crate::tenant::timeline::WalReceiverInfo;
     149                 : 
     150                 : /// The "tenants" part of `tenants/<tenant>/timelines...`
     151                 : pub const TENANTS_SEGMENT_NAME: &str = "tenants";
     152                 : 
     153                 : /// Parts of the `.neon/tenants/<tenant_id>/timelines/<timeline_id>` directory prefix.
     154                 : pub const TIMELINES_SEGMENT_NAME: &str = "timelines";
     155                 : 
     156                 : pub const TENANT_ATTACHING_MARKER_FILENAME: &str = "attaching";
     157                 : 
     158                 : pub const TENANT_DELETED_MARKER_FILE_NAME: &str = "deleted";
     159                 : 
     160                 : /// References to shared objects that are passed into each tenant, such
     161                 : /// as the shared remote storage client and process initialization state.
     162 CBC         210 : #[derive(Clone)]
     163                 : pub struct TenantSharedResources {
     164                 :     pub broker_client: storage_broker::BrokerClientChannel,
     165                 :     pub remote_storage: Option<GenericRemoteStorage>,
     166                 :     pub deletion_queue_client: DeletionQueueClient,
     167                 : }
     168                 : 
     169                 : /// A [`Tenant`] is really an _attached_ tenant.  The configuration
     170                 : /// for an attached tenant is a subset of the [`LocationConf`], represented
     171                 : /// in this struct.
     172                 : pub(super) struct AttachedTenantConf {
     173                 :     tenant_conf: TenantConfOpt,
     174                 :     location: AttachedLocationConfig,
     175                 : }
     176                 : 
     177                 : impl AttachedTenantConf {
     178             748 :     fn try_from(location_conf: LocationConf) -> anyhow::Result<Self> {
     179             748 :         match &location_conf.mode {
     180             748 :             LocationMode::Attached(attach_conf) => Ok(Self {
     181             748 :                 tenant_conf: location_conf.tenant_conf,
     182             748 :                 location: attach_conf.clone(),
     183             748 :             }),
     184                 :             LocationMode::Secondary(_) => {
     185 UBC           0 :                 anyhow::bail!("Attempted to construct AttachedTenantConf from a LocationConf in secondary mode")
     186                 :             }
     187                 :         }
     188 CBC         748 :     }
     189                 : }
     190                 : struct TimelinePreload {
     191                 :     timeline_id: TimelineId,
     192                 :     client: RemoteTimelineClient,
     193                 :     index_part: Result<MaybeDeletedIndexPart, DownloadError>,
     194                 : }
     195                 : 
     196                 : ///
     197                 : /// Tenant consists of multiple timelines. Keep them in a hash table.
     198                 : ///
     199                 : pub struct Tenant {
     200                 :     // Global pageserver config parameters
     201                 :     pub conf: &'static PageServerConf,
     202                 : 
     203                 :     /// The value creation timestamp, used to measure activation delay, see:
     204                 :     /// <https://github.com/neondatabase/neon/issues/4025>
     205                 :     loading_started_at: Instant,
     206                 : 
     207                 :     state: watch::Sender<TenantState>,
     208                 : 
     209                 :     // Overridden tenant-specific config parameters.
     210                 :     // We keep TenantConfOpt sturct here to preserve the information
     211                 :     // about parameters that are not set.
     212                 :     // This is necessary to allow global config updates.
     213                 :     tenant_conf: Arc<RwLock<AttachedTenantConf>>,
     214                 : 
     215                 :     tenant_id: TenantId,
     216                 : 
     217                 :     /// The remote storage generation, used to protect S3 objects from split-brain.
     218                 :     /// Does not change over the lifetime of the [`Tenant`] object.
     219                 :     ///
     220                 :     /// This duplicates the generation stored in LocationConf, but that structure is mutable:
     221                 :     /// this copy enforces the invariant that generatio doesn't change during a Tenant's lifetime.
     222                 :     generation: Generation,
     223                 : 
     224                 :     timelines: Mutex<HashMap<TimelineId, Arc<Timeline>>>,
     225                 :     // This mutex prevents creation of new timelines during GC.
     226                 :     // Adding yet another mutex (in addition to `timelines`) is needed because holding
     227                 :     // `timelines` mutex during all GC iteration
     228                 :     // may block for a long time `get_timeline`, `get_timelines_state`,... and other operations
     229                 :     // with timelines, which in turn may cause dropping replication connection, expiration of wait_for_lsn
     230                 :     // timeout...
     231                 :     gc_cs: tokio::sync::Mutex<()>,
     232                 :     walredo_mgr: Arc<WalRedoManager>,
     233                 : 
     234                 :     // provides access to timeline data sitting in the remote storage
     235                 :     pub(crate) remote_storage: Option<GenericRemoteStorage>,
     236                 : 
     237                 :     // Access to global deletion queue for when this tenant wants to schedule a deletion
     238                 :     deletion_queue_client: DeletionQueueClient,
     239                 : 
     240                 :     /// Cached logical sizes updated updated on each [`Tenant::gather_size_inputs`].
     241                 :     cached_logical_sizes: tokio::sync::Mutex<HashMap<(TimelineId, Lsn), u64>>,
     242                 :     cached_synthetic_tenant_size: Arc<AtomicU64>,
     243                 : 
     244                 :     eviction_task_tenant_state: tokio::sync::Mutex<EvictionTaskTenantState>,
     245                 : 
     246                 :     pub(crate) delete_progress: Arc<tokio::sync::Mutex<DeleteTenantFlow>>,
     247                 : }
     248                 : 
     249                 : pub(crate) enum WalRedoManager {
     250                 :     Prod(PostgresRedoManager),
     251                 :     #[cfg(test)]
     252                 :     Test(harness::TestRedoManager),
     253                 : }
     254                 : 
     255                 : impl From<PostgresRedoManager> for WalRedoManager {
     256             706 :     fn from(mgr: PostgresRedoManager) -> Self {
     257             706 :         Self::Prod(mgr)
     258             706 :     }
     259                 : }
     260                 : 
     261                 : #[cfg(test)]
     262                 : impl From<harness::TestRedoManager> for WalRedoManager {
     263              42 :     fn from(mgr: harness::TestRedoManager) -> Self {
     264              42 :         Self::Test(mgr)
     265              42 :     }
     266                 : }
     267                 : 
     268                 : impl WalRedoManager {
     269         2255848 :     pub async fn request_redo(
     270         2255848 :         &self,
     271         2255848 :         key: crate::repository::Key,
     272         2255848 :         lsn: Lsn,
     273         2255848 :         base_img: Option<(Lsn, bytes::Bytes)>,
     274         2255848 :         records: Vec<(Lsn, crate::walrecord::NeonWalRecord)>,
     275         2255848 :         pg_version: u32,
     276         2255848 :     ) -> anyhow::Result<bytes::Bytes> {
     277 UBC           0 :         match self {
     278 CBC     2255849 :             Self::Prod(mgr) => {
     279 UBC           0 :                 mgr.request_redo(key, lsn, base_img, records, pg_version)
     280               0 :                     .await
     281                 :             }
     282                 :             #[cfg(test)]
     283               0 :             Self::Test(mgr) => {
     284               0 :                 mgr.request_redo(key, lsn, base_img, records, pg_version)
     285               0 :                     .await
     286                 :             }
     287                 :         }
     288 CBC     2255846 :     }
     289                 : }
     290                 : 
     291             156 : #[derive(Debug, thiserror::Error, PartialEq, Eq)]
     292                 : pub enum GetTimelineError {
     293                 :     #[error("Timeline {tenant_id}/{timeline_id} is not active, state: {state:?}")]
     294                 :     NotActive {
     295                 :         tenant_id: TenantId,
     296                 :         timeline_id: TimelineId,
     297                 :         state: TimelineState,
     298                 :     },
     299                 :     #[error("Timeline {tenant_id}/{timeline_id} was not found")]
     300                 :     NotFound {
     301                 :         tenant_id: TenantId,
     302                 :         timeline_id: TimelineId,
     303                 :     },
     304                 : }
     305                 : 
     306 UBC           0 : #[derive(Debug, thiserror::Error)]
     307                 : pub enum LoadLocalTimelineError {
     308                 :     #[error("FailedToLoad")]
     309                 :     Load(#[source] anyhow::Error),
     310                 :     #[error("FailedToResumeDeletion")]
     311                 :     ResumeDeletion(#[source] anyhow::Error),
     312                 : }
     313                 : 
     314 CBC         183 : #[derive(thiserror::Error)]
     315                 : pub enum DeleteTimelineError {
     316                 :     #[error("NotFound")]
     317                 :     NotFound,
     318                 : 
     319                 :     #[error("HasChildren")]
     320                 :     HasChildren(Vec<TimelineId>),
     321                 : 
     322                 :     #[error("Timeline deletion is already in progress")]
     323                 :     AlreadyInProgress(Arc<tokio::sync::Mutex<DeleteTimelineFlow>>),
     324                 : 
     325                 :     #[error(transparent)]
     326                 :     Other(#[from] anyhow::Error),
     327                 : }
     328                 : 
     329                 : impl Debug for DeleteTimelineError {
     330 UBC           0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     331               0 :         match self {
     332               0 :             Self::NotFound => write!(f, "NotFound"),
     333               0 :             Self::HasChildren(c) => f.debug_tuple("HasChildren").field(c).finish(),
     334               0 :             Self::AlreadyInProgress(_) => f.debug_tuple("AlreadyInProgress").finish(),
     335               0 :             Self::Other(e) => f.debug_tuple("Other").field(e).finish(),
     336                 :         }
     337               0 :     }
     338                 : }
     339                 : 
     340                 : pub enum SetStoppingError {
     341                 :     AlreadyStopping(completion::Barrier),
     342                 :     Broken,
     343                 : }
     344                 : 
     345                 : impl Debug for SetStoppingError {
     346               0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     347               0 :         match self {
     348               0 :             Self::AlreadyStopping(_) => f.debug_tuple("AlreadyStopping").finish(),
     349               0 :             Self::Broken => write!(f, "Broken"),
     350                 :         }
     351               0 :     }
     352                 : }
     353                 : 
     354               0 : #[derive(Debug, thiserror::Error)]
     355                 : pub(crate) enum WaitToBecomeActiveError {
     356                 :     WillNotBecomeActive {
     357                 :         tenant_id: TenantId,
     358                 :         state: TenantState,
     359                 :     },
     360                 :     TenantDropped {
     361                 :         tenant_id: TenantId,
     362                 :     },
     363                 : }
     364                 : 
     365                 : impl std::fmt::Display for WaitToBecomeActiveError {
     366 CBC          62 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     367              62 :         match self {
     368              62 :             WaitToBecomeActiveError::WillNotBecomeActive { tenant_id, state } => {
     369              62 :                 write!(
     370              62 :                     f,
     371              62 :                     "Tenant {} will not become active. Current state: {:?}",
     372              62 :                     tenant_id, state
     373              62 :                 )
     374                 :             }
     375 UBC           0 :             WaitToBecomeActiveError::TenantDropped { tenant_id } => {
     376               0 :                 write!(f, "Tenant {tenant_id} will not become active (dropped)")
     377                 :             }
     378                 :         }
     379 CBC          62 :     }
     380                 : }
     381                 : 
     382              18 : #[derive(thiserror::Error, Debug)]
     383                 : pub enum CreateTimelineError {
     384                 :     #[error("a timeline with the given ID already exists")]
     385                 :     AlreadyExists,
     386                 :     #[error(transparent)]
     387                 :     AncestorLsn(anyhow::Error),
     388                 :     #[error("ancestor timeline is not active")]
     389                 :     AncestorNotActive,
     390                 :     #[error(transparent)]
     391                 :     Other(#[from] anyhow::Error),
     392                 : }
     393                 : 
     394                 : /// spawn_attach argument for whether the caller is using attachment markers
     395                 : pub(super) enum AttachMarkerMode {
     396                 :     Expect,
     397                 :     Ignore,
     398                 : }
     399                 : 
     400                 : struct TenantDirectoryScan {
     401                 :     sorted_timelines_to_load: Vec<(TimelineId, TimelineMetadata)>,
     402                 :     timelines_to_resume_deletion: Vec<(TimelineId, Option<TimelineMetadata>)>,
     403                 : }
     404                 : 
     405                 : enum CreateTimelineCause {
     406                 :     Load,
     407                 :     Delete,
     408                 : }
     409                 : 
     410                 : impl Tenant {
     411                 :     /// Yet another helper for timeline initialization.
     412                 :     /// Contains the common part of `load_local_timeline` and `load_remote_timeline`.
     413                 :     ///
     414                 :     /// - Initializes the Timeline struct and inserts it into the tenant's hash map
     415                 :     /// - Scans the local timeline directory for layer files and builds the layer map
     416                 :     /// - Downloads remote index file and adds remote files to the layer map
     417                 :     /// - Schedules remote upload tasks for any files that are present locally but missing from remote storage.
     418                 :     ///
     419                 :     /// If the operation fails, the timeline is left in the tenant's hash map in Broken state. On success,
     420                 :     /// it is marked as Active.
     421                 :     #[allow(clippy::too_many_arguments)]
     422             314 :     async fn timeline_init_and_sync(
     423             314 :         &self,
     424             314 :         timeline_id: TimelineId,
     425             314 :         resources: TimelineResources,
     426             314 :         index_part: Option<IndexPart>,
     427             314 :         metadata: TimelineMetadata,
     428             314 :         ancestor: Option<Arc<Timeline>>,
     429             314 :         init_order: Option<&InitializationOrder>,
     430             314 :         _ctx: &RequestContext,
     431             314 :     ) -> anyhow::Result<()> {
     432             314 :         let tenant_id = self.tenant_id;
     433                 : 
     434             314 :         let timeline = self.create_timeline_struct(
     435             314 :             timeline_id,
     436             314 :             &metadata,
     437             314 :             ancestor.clone(),
     438             314 :             resources,
     439             314 :             init_order,
     440             314 :             CreateTimelineCause::Load,
     441             314 :         )?;
     442             314 :         let disk_consistent_lsn = timeline.get_disk_consistent_lsn();
     443             314 :         anyhow::ensure!(
     444             314 :             disk_consistent_lsn.is_valid(),
     445 UBC           0 :             "Timeline {tenant_id}/{timeline_id} has invalid disk_consistent_lsn"
     446                 :         );
     447 CBC         314 :         assert_eq!(
     448             314 :             disk_consistent_lsn,
     449             314 :             metadata.disk_consistent_lsn(),
     450 UBC           0 :             "these are used interchangeably"
     451                 :         );
     452                 : 
     453 CBC         314 :         if let Some(index_part) = index_part.as_ref() {
     454             314 :             timeline
     455             314 :                 .remote_client
     456             314 :                 .as_ref()
     457             314 :                 .unwrap()
     458             314 :                 .init_upload_queue(index_part)?;
     459 UBC           0 :         } else if self.remote_storage.is_some() {
     460                 :             // No data on the remote storage, but we have local metadata file. We can end up
     461                 :             // here with timeline_create being interrupted before finishing index part upload.
     462                 :             // By doing what we do here, the index part upload is retried.
     463                 :             // If control plane retries timeline creation in the meantime, the mgmt API handler
     464                 :             // for timeline creation will coalesce on the upload we queue here.
     465               0 :             let rtc = timeline.remote_client.as_ref().unwrap();
     466               0 :             rtc.init_upload_queue_for_empty_remote(&metadata)?;
     467               0 :             rtc.schedule_index_upload_for_metadata_update(&metadata)?;
     468               0 :         }
     469                 : 
     470 CBC         314 :         timeline
     471             314 :             .load_layer_map(disk_consistent_lsn, index_part)
     472             314 :             .await
     473             314 :             .with_context(|| {
     474 UBC           0 :                 format!("Failed to load layermap for timeline {tenant_id}/{timeline_id}")
     475 CBC         314 :             })?;
     476                 : 
     477                 :         {
     478                 :             // avoiding holding it across awaits
     479             314 :             let mut timelines_accessor = self.timelines.lock().unwrap();
     480             314 :             match timelines_accessor.entry(timeline_id) {
     481                 :                 Entry::Occupied(_) => {
     482                 :                     // The uninit mark file acts as a lock that prevents another task from
     483                 :                     // initializing the timeline at the same time.
     484 UBC           0 :                     unreachable!(
     485               0 :                         "Timeline {tenant_id}/{timeline_id} already exists in the tenant map"
     486               0 :                     );
     487                 :                 }
     488 CBC         314 :                 Entry::Vacant(v) => {
     489             314 :                     v.insert(Arc::clone(&timeline));
     490             314 :                     timeline.maybe_spawn_flush_loop();
     491             314 :                 }
     492             314 :             }
     493             314 :         };
     494             314 : 
     495             314 :         // Sanity check: a timeline should have some content.
     496             314 :         anyhow::ensure!(
     497             314 :             ancestor.is_some()
     498             267 :                 || timeline
     499             267 :                     .layers
     500             267 :                     .read()
     501 UBC           0 :                     .await
     502 CBC         267 :                     .layer_map()
     503             267 :                     .iter_historic_layers()
     504             267 :                     .next()
     505             267 :                     .is_some(),
     506 UBC           0 :             "Timeline has no ancestor and no layer files"
     507                 :         );
     508                 : 
     509 CBC         314 :         Ok(())
     510             314 :     }
     511                 : 
     512                 :     ///
     513                 :     /// Attach a tenant that's available in cloud storage.
     514                 :     ///
     515                 :     /// This returns quickly, after just creating the in-memory object
     516                 :     /// Tenant struct and launching a background task to download
     517                 :     /// the remote index files.  On return, the tenant is most likely still in
     518                 :     /// Attaching state, and it will become Active once the background task
     519                 :     /// finishes. You can use wait_until_active() to wait for the task to
     520                 :     /// complete.
     521                 :     ///
     522              48 :     pub(crate) fn spawn_attach(
     523              48 :         conf: &'static PageServerConf,
     524              48 :         tenant_id: TenantId,
     525              48 :         resources: TenantSharedResources,
     526              48 :         attached_conf: AttachedTenantConf,
     527              48 :         tenants: &'static tokio::sync::RwLock<TenantsMap>,
     528              48 :         expect_marker: AttachMarkerMode,
     529              48 :         ctx: &RequestContext,
     530              48 :     ) -> anyhow::Result<Arc<Tenant>> {
     531              48 :         // TODO dedup with spawn_load
     532              48 :         let wal_redo_manager = Arc::new(WalRedoManager::from(PostgresRedoManager::new(
     533              48 :             conf, tenant_id,
     534              48 :         )));
     535              48 : 
     536              48 :         let TenantSharedResources {
     537              48 :             broker_client,
     538              48 :             remote_storage,
     539              48 :             deletion_queue_client,
     540              48 :         } = resources;
     541              48 : 
     542              48 :         let tenant = Arc::new(Tenant::new(
     543              48 :             TenantState::Attaching,
     544              48 :             conf,
     545              48 :             attached_conf,
     546              48 :             wal_redo_manager,
     547              48 :             tenant_id,
     548              48 :             remote_storage.clone(),
     549              48 :             deletion_queue_client,
     550              48 :         ));
     551              48 : 
     552              48 :         // Do all the hard work in the background
     553              48 :         let tenant_clone = Arc::clone(&tenant);
     554              48 : 
     555              48 :         let ctx = ctx.detached_child(TaskKind::Attach, DownloadBehavior::Warn);
     556              48 :         task_mgr::spawn(
     557              48 :             &tokio::runtime::Handle::current(),
     558              48 :             TaskKind::Attach,
     559              48 :             Some(tenant_id),
     560              48 :             None,
     561              48 :             "attach tenant",
     562                 :             false,
     563              48 :             async move {
     564              48 :                 // Ideally we should use Tenant::set_broken_no_wait, but it is not supposed to be used when tenant is in loading state.
     565              48 :                 let make_broken = |t: &Tenant, err: anyhow::Error| {
     566               6 :                     error!("attach failed, setting tenant state to Broken: {err:?}");
     567               6 :                     t.state.send_modify(|state| {
     568               6 :                         assert_eq!(
     569                 :                             *state,
     570                 :                             TenantState::Attaching,
     571 UBC           0 :                             "the attach task owns the tenant state until activation is complete"
     572                 :                         );
     573 CBC           6 :                         *state = TenantState::broken_from_reason(err.to_string());
     574               6 :                     });
     575               6 :                 };
     576                 : 
     577              48 :                 let pending_deletion = {
     578              48 :                     match DeleteTenantFlow::should_resume_deletion(
     579              48 :                         conf,
     580              48 :                         remote_storage.as_ref(),
     581              48 :                         &tenant_clone,
     582              48 :                     )
     583             144 :                     .await
     584                 :                     {
     585              48 :                         Ok(should_resume_deletion) => should_resume_deletion,
     586 UBC           0 :                         Err(err) => {
     587               0 :                             make_broken(&tenant_clone, anyhow::anyhow!(err));
     588               0 :                             return Ok(());
     589                 :                         }
     590                 :                     }
     591                 :                 };
     592                 : 
     593 CBC          48 :                 info!("pending_deletion {}", pending_deletion.is_some());
     594                 : 
     595              48 :                 if let Some(deletion) = pending_deletion {
     596               3 :                     match DeleteTenantFlow::resume_from_attach(
     597               3 :                         deletion,
     598               3 :                         &tenant_clone,
     599               3 :                         tenants,
     600               3 :                         &ctx,
     601               3 :                     )
     602             189 :                     .await
     603                 :                     {
     604 UBC           0 :                         Err(err) => {
     605               0 :                             make_broken(&tenant_clone, anyhow::anyhow!(err));
     606               0 :                             return Ok(());
     607                 :                         }
     608 CBC           3 :                         Ok(()) => return Ok(()),
     609                 :                     }
     610              45 :                 }
     611              45 : 
     612             316 :                 match tenant_clone.attach(&ctx, expect_marker).await {
     613                 :                     Ok(()) => {
     614              39 :                         info!("attach finished, activating");
     615              39 :                         tenant_clone.activate(broker_client, None, &ctx);
     616                 :                     }
     617               6 :                     Err(e) => {
     618               6 :                         make_broken(&tenant_clone, anyhow::anyhow!(e));
     619               6 :                     }
     620                 :                 }
     621              45 :                 Ok(())
     622              48 :             }
     623              48 :             .instrument({
     624              48 :                 let span = tracing::info_span!(parent: None, "attach", tenant_id=%tenant_id);
     625              48 :                 span.follows_from(Span::current());
     626              48 :                 span
     627              48 :             }),
     628              48 :         );
     629              48 :         Ok(tenant)
     630              48 :     }
     631                 : 
     632                 :     ///
     633                 :     /// Background task that downloads all data for a tenant and brings it to Active state.
     634                 :     ///
     635                 :     /// No background tasks are started as part of this routine.
     636                 :     ///
     637              48 :     async fn attach(
     638              48 :         self: &Arc<Tenant>,
     639              48 :         ctx: &RequestContext,
     640              48 :         expect_marker: AttachMarkerMode,
     641              48 :     ) -> anyhow::Result<()> {
     642              48 :         span::debug_assert_current_span_has_tenant_id();
     643              48 : 
     644              48 :         let marker_file = self.conf.tenant_attaching_mark_file_path(&self.tenant_id);
     645              48 :         if let AttachMarkerMode::Expect = expect_marker {
     646              48 :             if !tokio::fs::try_exists(&marker_file)
     647              48 :                 .await
     648              48 :                 .context("check for existence of marker file")?
     649                 :             {
     650 UBC           0 :                 anyhow::bail!(
     651               0 :                     "implementation error: marker file should exist at beginning of this function"
     652               0 :                 );
     653 CBC          48 :             }
     654 UBC           0 :         }
     655                 : 
     656                 :         // Get list of remote timelines
     657                 :         // download index files for every tenant timeline
     658 CBC          48 :         info!("listing remote timelines");
     659                 : 
     660              48 :         let remote_storage = self
     661              48 :             .remote_storage
     662              48 :             .as_ref()
     663              48 :             .ok_or_else(|| anyhow::anyhow!("cannot attach without remote storage"))?;
     664                 : 
     665              42 :         let remote_timeline_ids =
     666             137 :             remote_timeline_client::list_remote_timelines(remote_storage, self.tenant_id).await?;
     667                 : 
     668              42 :         info!("found {} timelines", remote_timeline_ids.len());
     669                 : 
     670                 :         // Download & parse index parts
     671              42 :         let mut part_downloads = JoinSet::new();
     672             100 :         for timeline_id in remote_timeline_ids {
     673              58 :             let client = RemoteTimelineClient::new(
     674              58 :                 remote_storage.clone(),
     675              58 :                 self.deletion_queue_client.clone(),
     676              58 :                 self.conf,
     677              58 :                 self.tenant_id,
     678              58 :                 timeline_id,
     679              58 :                 self.generation,
     680              58 :             );
     681              58 :             part_downloads.spawn(
     682              58 :                 async move {
     683              58 :                     debug!("starting index part download");
     684                 : 
     685              58 :                     let index_part = client
     686              58 :                         .download_index_file()
     687             234 :                         .await
     688              58 :                         .context("download index file")?;
     689                 : 
     690              58 :                     debug!("finished index part download");
     691                 : 
     692              58 :                     Result::<_, anyhow::Error>::Ok((timeline_id, client, index_part))
     693              58 :                 }
     694              58 :                 .map(move |res| {
     695              58 :                     res.with_context(|| format!("download index part for timeline {timeline_id}"))
     696              58 :                 })
     697              58 :                 .instrument(info_span!("download_index_part", %timeline_id)),
     698                 :             );
     699                 :         }
     700                 : 
     701              42 :         let mut timelines_to_resume_deletions = vec![];
     702              42 : 
     703              42 :         // Wait for all the download tasks to complete & collect results.
     704              42 :         let mut remote_index_and_client = HashMap::new();
     705              42 :         let mut timeline_ancestors = HashMap::new();
     706             100 :         while let Some(result) = part_downloads.join_next().await {
     707                 :             // NB: we already added timeline_id as context to the error
     708              58 :             let result: Result<_, anyhow::Error> = result.context("joinset task join")?;
     709              58 :             let (timeline_id, client, index_part) = result?;
     710 UBC           0 :             debug!("successfully downloaded index part for timeline {timeline_id}");
     711 CBC          58 :             match index_part {
     712              52 :                 MaybeDeletedIndexPart::IndexPart(index_part) => {
     713              52 :                     timeline_ancestors.insert(timeline_id, index_part.metadata.clone());
     714              52 :                     remote_index_and_client.insert(timeline_id, (index_part, client));
     715              52 :                 }
     716               6 :                 MaybeDeletedIndexPart::Deleted(index_part) => {
     717               6 :                     info!(
     718               6 :                         "timeline {} is deleted, picking to resume deletion",
     719               6 :                         timeline_id
     720               6 :                     );
     721               6 :                     timelines_to_resume_deletions.push((timeline_id, index_part, client));
     722                 :                 }
     723                 :             }
     724                 :         }
     725                 : 
     726                 :         // For every timeline, download the metadata file, scan the local directory,
     727                 :         // and build a layer map that contains an entry for each remote and local
     728                 :         // layer file.
     729              52 :         let sorted_timelines = tree_sort_timelines(timeline_ancestors, |m| m.ancestor_timeline())?;
     730              94 :         for (timeline_id, remote_metadata) in sorted_timelines {
     731              52 :             let (index_part, remote_client) = remote_index_and_client
     732              52 :                 .remove(&timeline_id)
     733              52 :                 .expect("just put it in above");
     734              52 : 
     735              52 :             // TODO again handle early failure
     736              52 :             self.load_remote_timeline(
     737              52 :                 timeline_id,
     738              52 :                 index_part,
     739              52 :                 remote_metadata,
     740              52 :                 TimelineResources {
     741              52 :                     remote_client: Some(remote_client),
     742              52 :                     deletion_queue_client: self.deletion_queue_client.clone(),
     743              52 :                 },
     744              52 :                 ctx,
     745              52 :             )
     746             104 :             .await
     747              52 :             .with_context(|| {
     748 UBC           0 :                 format!(
     749               0 :                     "failed to load remote timeline {} for tenant {}",
     750               0 :                     timeline_id, self.tenant_id
     751               0 :                 )
     752 CBC          52 :             })?;
     753                 :         }
     754                 : 
     755                 :         // Walk through deleted timelines, resume deletion
     756              48 :         for (timeline_id, index_part, remote_timeline_client) in timelines_to_resume_deletions {
     757               6 :             remote_timeline_client
     758               6 :                 .init_upload_queue_stopped_to_continue_deletion(&index_part)
     759               6 :                 .context("init queue stopped")
     760               6 :                 .map_err(LoadLocalTimelineError::ResumeDeletion)?;
     761                 : 
     762               6 :             DeleteTimelineFlow::resume_deletion(
     763               6 :                 Arc::clone(self),
     764               6 :                 timeline_id,
     765               6 :                 &index_part.metadata,
     766               6 :                 Some(remote_timeline_client),
     767               6 :                 self.deletion_queue_client.clone(),
     768               6 :                 None,
     769               6 :             )
     770 UBC           0 :             .await
     771 CBC           6 :             .context("resume_deletion")
     772               6 :             .map_err(LoadLocalTimelineError::ResumeDeletion)?;
     773                 :         }
     774                 : 
     775              42 :         if let AttachMarkerMode::Expect = expect_marker {
     776              42 :             std::fs::remove_file(&marker_file)
     777              42 :                 .with_context(|| format!("unlink attach marker file {marker_file}"))?;
     778              42 :             crashsafe::fsync(marker_file.parent().expect("marker file has parent dir"))
     779              42 :                 .context("fsync tenant directory after unlinking attach marker file")?;
     780 UBC           0 :         }
     781                 : 
     782 CBC          21 :         crate::failpoint_support::sleep_millis_async!("attach-before-activate");
     783                 : 
     784              42 :         info!("Done");
     785                 : 
     786              42 :         Ok(())
     787              48 :     }
     788                 : 
     789                 :     /// Get sum of all remote timelines sizes
     790                 :     ///
     791                 :     /// This function relies on the index_part instead of listing the remote storage
     792              15 :     pub fn remote_size(&self) -> u64 {
     793              15 :         let mut size = 0;
     794                 : 
     795              15 :         for timeline in self.list_timelines() {
     796              12 :             if let Some(remote_client) = &timeline.remote_client {
     797              12 :                 size += remote_client.get_remote_physical_size();
     798              12 :             }
     799                 :         }
     800                 : 
     801              15 :         size
     802              15 :     }
     803                 : 
     804             156 :     #[instrument(skip_all, fields(timeline_id=%timeline_id))]
     805                 :     async fn load_remote_timeline(
     806                 :         &self,
     807                 :         timeline_id: TimelineId,
     808                 :         index_part: IndexPart,
     809                 :         remote_metadata: TimelineMetadata,
     810                 :         resources: TimelineResources,
     811                 :         ctx: &RequestContext,
     812                 :     ) -> anyhow::Result<()> {
     813                 :         span::debug_assert_current_span_has_tenant_id();
     814                 : 
     815              52 :         info!("downloading index file for timeline {}", timeline_id);
     816                 :         tokio::fs::create_dir_all(self.conf.timeline_path(&self.tenant_id, &timeline_id))
     817                 :             .await
     818                 :             .context("Failed to create new timeline directory")?;
     819                 : 
     820                 :         let ancestor = if let Some(ancestor_id) = remote_metadata.ancestor_timeline() {
     821                 :             let timelines = self.timelines.lock().unwrap();
     822                 :             Some(Arc::clone(timelines.get(&ancestor_id).ok_or_else(
     823 UBC           0 :                 || {
     824               0 :                     anyhow::anyhow!(
     825               0 :                         "cannot find ancestor timeline {ancestor_id} for timeline {timeline_id}"
     826               0 :                     )
     827               0 :                 },
     828                 :             )?))
     829                 :         } else {
     830                 :             None
     831                 :         };
     832                 : 
     833                 :         // we can load remote timelines during init, but they are assumed to be so rare that
     834                 :         // initialization order is not passed to here.
     835                 :         let init_order = None;
     836                 : 
     837                 :         // timeline loading after attach expects to find metadata file for each metadata
     838                 :         save_metadata(self.conf, &self.tenant_id, &timeline_id, &remote_metadata)
     839                 :             .await
     840                 :             .context("save_metadata")
     841                 :             .map_err(LoadLocalTimelineError::Load)?;
     842                 : 
     843                 :         self.timeline_init_and_sync(
     844                 :             timeline_id,
     845                 :             resources,
     846                 :             Some(index_part),
     847                 :             remote_metadata,
     848                 :             ancestor,
     849                 :             init_order,
     850                 :             ctx,
     851                 :         )
     852                 :         .await
     853                 :     }
     854                 : 
     855                 :     /// Create a placeholder Tenant object for a broken tenant
     856               0 :     pub fn create_broken_tenant(
     857               0 :         conf: &'static PageServerConf,
     858               0 :         tenant_id: TenantId,
     859               0 :         reason: String,
     860               0 :     ) -> Arc<Tenant> {
     861               0 :         let wal_redo_manager = Arc::new(WalRedoManager::from(PostgresRedoManager::new(
     862               0 :             conf, tenant_id,
     863               0 :         )));
     864               0 :         Arc::new(Tenant::new(
     865               0 :             TenantState::Broken {
     866               0 :                 reason,
     867               0 :                 backtrace: String::new(),
     868               0 :             },
     869               0 :             conf,
     870               0 :             AttachedTenantConf::try_from(LocationConf::default()).unwrap(),
     871               0 :             wal_redo_manager,
     872               0 :             tenant_id,
     873               0 :             None,
     874               0 :             DeletionQueueClient::broken(),
     875               0 :         ))
     876               0 :     }
     877                 : 
     878                 :     /// Load a tenant that's available on local disk
     879                 :     ///
     880                 :     /// This is used at pageserver startup, to rebuild the in-memory
     881                 :     /// structures from on-disk state. This is similar to attaching a tenant,
     882                 :     /// but the index files already exist on local disk, as well as some layer
     883                 :     /// files.
     884                 :     ///
     885                 :     /// If the loading fails for some reason, the Tenant will go into Broken
     886                 :     /// state.
     887 CBC         658 :     #[instrument(skip_all, fields(tenant_id=%tenant_id))]
     888                 :     pub(crate) fn spawn_load(
     889                 :         conf: &'static PageServerConf,
     890                 :         tenant_id: TenantId,
     891                 :         attached_conf: AttachedTenantConf,
     892                 :         resources: TenantSharedResources,
     893                 :         init_order: Option<InitializationOrder>,
     894                 :         tenants: &'static tokio::sync::RwLock<TenantsMap>,
     895                 :         ctx: &RequestContext,
     896                 :     ) -> Arc<Tenant> {
     897                 :         span::debug_assert_current_span_has_tenant_id();
     898                 : 
     899                 :         let broker_client = resources.broker_client;
     900                 :         let remote_storage = resources.remote_storage;
     901                 : 
     902                 :         let wal_redo_manager = Arc::new(WalRedoManager::from(PostgresRedoManager::new(
     903                 :             conf, tenant_id,
     904                 :         )));
     905                 :         let tenant = Tenant::new(
     906                 :             TenantState::Loading,
     907                 :             conf,
     908                 :             attached_conf,
     909                 :             wal_redo_manager,
     910                 :             tenant_id,
     911                 :             remote_storage.clone(),
     912                 :             resources.deletion_queue_client.clone(),
     913                 :         );
     914                 :         let tenant = Arc::new(tenant);
     915                 : 
     916                 :         // Do all the hard work in a background task
     917                 :         let tenant_clone = Arc::clone(&tenant);
     918                 : 
     919                 :         let ctx = ctx.detached_child(TaskKind::InitialLoad, DownloadBehavior::Warn);
     920                 :         let _ = task_mgr::spawn(
     921                 :             &tokio::runtime::Handle::current(),
     922                 :             TaskKind::InitialLoad,
     923                 :             Some(tenant_id),
     924                 :             None,
     925                 :             "initial tenant load",
     926                 :             false,
     927             658 :             async move {
     928             658 :                 // Ideally we should use Tenant::set_broken_no_wait, but it is not supposed to be used when tenant is in loading state.
     929             658 :                 let make_broken = |t: &Tenant, err: anyhow::Error| {
     930               5 :                     error!("load failed, setting tenant state to Broken: {err:?}");
     931               5 :                     t.state.send_modify(|state| {
     932               5 :                         assert!(
     933               5 :                             matches!(*state, TenantState::Loading | TenantState::Stopping { .. }),
     934 UBC           0 :                             "the loading task owns the tenant state until activation is complete"
     935                 :                         );
     936 CBC           5 :                         *state = TenantState::broken_from_reason(err.to_string());
     937               5 :                     });
     938               5 :                 };
     939                 : 
     940             658 :                 let mut init_order = init_order;
     941             658 : 
     942             658 :                 // take the completion because initial tenant loading will complete when all of
     943             658 :                 // these tasks complete.
     944             658 :                 let _completion = init_order
     945             658 :                     .as_mut()
     946             658 :                     .and_then(|x| x.initial_tenant_load.take());
     947             658 :                 let remote_load_completion = init_order
     948             658 :                     .as_mut()
     949             658 :                     .and_then(|x| x.initial_tenant_load_remote.take());
     950                 : 
     951                 :                 // Dont block pageserver startup on figuring out deletion status
     952             658 :                 let pending_deletion = {
     953             658 :                     match DeleteTenantFlow::should_resume_deletion(
     954             658 :                         conf,
     955             658 :                         remote_storage.as_ref(),
     956             658 :                         &tenant_clone,
     957             658 :                     )
     958            1572 :                     .await
     959                 :                     {
     960             658 :                         Ok(should_resume_deletion) => should_resume_deletion,
     961 UBC           0 :                         Err(err) => {
     962               0 :                             make_broken(&tenant_clone, anyhow::anyhow!(err));
     963               0 :                             return Ok(());
     964                 :                         }
     965                 :                     }
     966                 :                 };
     967                 : 
     968 CBC         658 :                 info!("pending deletion {}", pending_deletion.is_some());
     969                 : 
     970             658 :                 if let Some(deletion) = pending_deletion {
     971                 :                     // as we are no longer loading, signal completion by dropping
     972                 :                     // the completion while we resume deletion
     973              20 :                     drop(_completion);
     974              20 :                     drop(remote_load_completion);
     975              20 :                     // do not hold to initial_logical_size_attempt as it will prevent loading from proceeding without timeout
     976              20 :                     let _ = init_order
     977              20 :                         .as_mut()
     978              20 :                         .and_then(|x| x.initial_logical_size_attempt.take());
     979              20 : 
     980              20 :                     match DeleteTenantFlow::resume_from_load(
     981              20 :                         deletion,
     982              20 :                         &tenant_clone,
     983              20 :                         init_order.as_ref(),
     984              20 :                         tenants,
     985              20 :                         &ctx,
     986              20 :                     )
     987            1136 :                     .await
     988                 :                     {
     989 UBC           0 :                         Err(err) => {
     990               0 :                             make_broken(&tenant_clone, anyhow::anyhow!(err));
     991               0 :                             return Ok(());
     992                 :                         }
     993 CBC          20 :                         Ok(()) => return Ok(()),
     994                 :                     }
     995             638 :                 }
     996             638 : 
     997             638 :                 let background_jobs_can_start =
     998             638 :                     init_order.as_ref().map(|x| &x.background_jobs_can_start);
     999             638 : 
    1000             638 :                 match tenant_clone
    1001             638 :                     .load(init_order.as_ref(), remote_load_completion, &ctx)
    1002            1143 :                     .await
    1003                 :                 {
    1004                 :                     Ok(()) => {
    1005             633 :                         debug!("load finished");
    1006                 : 
    1007             633 :                         tenant_clone.activate(broker_client, background_jobs_can_start, &ctx);
    1008                 :                     }
    1009               5 :                     Err(err) => make_broken(&tenant_clone, err),
    1010                 :                 }
    1011                 : 
    1012             638 :                 Ok(())
    1013             658 :             }
    1014                 :             .instrument({
    1015                 :                 let span = tracing::info_span!(parent: None, "load", tenant_id=%tenant_id);
    1016                 :                 span.follows_from(Span::current());
    1017                 :                 span
    1018                 :             }),
    1019                 :         );
    1020                 : 
    1021                 :         tenant
    1022                 :     }
    1023                 : 
    1024             698 :     fn scan_and_sort_timelines_dir(self: Arc<Tenant>) -> anyhow::Result<TenantDirectoryScan> {
    1025             698 :         let mut timelines_to_load: HashMap<TimelineId, TimelineMetadata> = HashMap::new();
    1026             698 :         // Note timelines_to_resume_deletion needs to be separate because it can be not sortable
    1027             698 :         // from the point of `tree_sort_timelines`. I e some parents can be missing because deletion
    1028             698 :         // completed in non topological order (for example because parent has smaller number of layer files in it)
    1029             698 :         let mut timelines_to_resume_deletion: Vec<(TimelineId, Option<TimelineMetadata>)> = vec![];
    1030             698 : 
    1031             698 :         let timelines_dir = self.conf.timelines_path(&self.tenant_id);
    1032                 : 
    1033             698 :         for entry in timelines_dir
    1034             698 :             .read_dir_utf8()
    1035             698 :             .context("list timelines directory for tenant")?
    1036                 :         {
    1037             312 :             let entry = entry.context("read timeline dir entry")?;
    1038             312 :             let timeline_dir = entry.path();
    1039             312 : 
    1040             312 :             if crate::is_temporary(timeline_dir) {
    1041 UBC           0 :                 info!("Found temporary timeline directory, removing: {timeline_dir}");
    1042               0 :                 if let Err(e) = std::fs::remove_dir_all(timeline_dir) {
    1043               0 :                     error!("Failed to remove temporary directory '{timeline_dir}': {e:?}");
    1044               0 :                 }
    1045 CBC         312 :             } else if is_uninit_mark(timeline_dir) {
    1046               1 :                 if !timeline_dir.exists() {
    1047 UBC           0 :                     warn!("Timeline dir entry become invalid: {timeline_dir}");
    1048               0 :                     continue;
    1049 CBC           1 :                 }
    1050               1 : 
    1051               1 :                 let timeline_uninit_mark_file = &timeline_dir;
    1052               1 :                 info!(
    1053               1 :                     "Found an uninit mark file {timeline_uninit_mark_file}, removing the timeline and its uninit mark",
    1054               1 :                 );
    1055               1 :                 let timeline_id =
    1056               1 :                     TimelineId::try_from(timeline_uninit_mark_file.file_stem())
    1057               1 :                         .with_context(|| {
    1058 UBC           0 :                             format!(
    1059               0 :                             "Could not parse timeline id out of the timeline uninit mark name {timeline_uninit_mark_file}",
    1060               0 :                         )
    1061 CBC           1 :                         })?;
    1062               1 :                 let timeline_dir = self.conf.timeline_path(&self.tenant_id, &timeline_id);
    1063 UBC           0 :                 if let Err(e) =
    1064 CBC           1 :                     remove_timeline_and_uninit_mark(&timeline_dir, timeline_uninit_mark_file)
    1065                 :                 {
    1066 UBC           0 :                     error!("Failed to clean up uninit marked timeline: {e:?}");
    1067 CBC           1 :                 }
    1068             311 :             } else if crate::is_delete_mark(timeline_dir) {
    1069                 :                 // If metadata exists, load as usual, continue deletion
    1070              23 :                 let timeline_id = TimelineId::try_from(timeline_dir.file_stem())
    1071              23 :                     .with_context(|| {
    1072 UBC           0 :                         format!(
    1073               0 :                             "Could not parse timeline id out of the timeline uninit mark name {timeline_dir}",
    1074               0 :                         )
    1075 CBC          23 :                     })?;
    1076                 : 
    1077              23 :                 info!("Found deletion mark for timeline {}", timeline_id);
    1078                 : 
    1079              23 :                 match load_metadata(self.conf, &self.tenant_id, &timeline_id) {
    1080              17 :                     Ok(metadata) => {
    1081              17 :                         timelines_to_resume_deletion.push((timeline_id, Some(metadata)))
    1082                 :                     }
    1083               6 :                     Err(e) => match &e {
    1084               6 :                         LoadMetadataError::Read(r) => {
    1085               6 :                             if r.kind() != io::ErrorKind::NotFound {
    1086 UBC           0 :                                 return Err(anyhow::anyhow!(e)).with_context(|| {
    1087               0 :                                     format!("Failed to load metadata for timeline_id {timeline_id}")
    1088               0 :                                 });
    1089 CBC           6 :                             }
    1090               6 : 
    1091               6 :                             // If metadata doesnt exist it means that we've crashed without
    1092               6 :                             // completing cleanup_remaining_timeline_fs_traces in DeleteTimelineFlow.
    1093               6 :                             // So save timeline_id for later call to `DeleteTimelineFlow::cleanup_remaining_timeline_fs_traces`.
    1094               6 :                             // We cant do it here because the method is async so we'd need block_on
    1095               6 :                             // and here we're in spawn_blocking. cleanup_remaining_timeline_fs_traces uses fs operations
    1096               6 :                             // so that basically results in a cycle:
    1097               6 :                             // spawn_blocking
    1098               6 :                             // - block_on
    1099               6 :                             //   - spawn_blocking
    1100               6 :                             // which can lead to running out of threads in blocing pool.
    1101               6 :                             timelines_to_resume_deletion.push((timeline_id, None));
    1102                 :                         }
    1103                 :                         _ => {
    1104 UBC           0 :                             return Err(anyhow::anyhow!(e)).with_context(|| {
    1105               0 :                                 format!("Failed to load metadata for timeline_id {timeline_id}")
    1106               0 :                             })
    1107                 :                         }
    1108                 :                     },
    1109                 :                 }
    1110                 :             } else {
    1111 CBC         288 :                 if !timeline_dir.exists() {
    1112               1 :                     warn!("Timeline dir entry become invalid: {timeline_dir}");
    1113               1 :                     continue;
    1114             287 :                 }
    1115             287 :                 let timeline_id = TimelineId::try_from(timeline_dir.file_name())
    1116             287 :                     .with_context(|| {
    1117 UBC           0 :                         format!(
    1118               0 :                             "Could not parse timeline id out of the timeline dir name {timeline_dir}",
    1119               0 :                         )
    1120 CBC         287 :                     })?;
    1121             287 :                 let timeline_uninit_mark_file = self
    1122             287 :                     .conf
    1123             287 :                     .timeline_uninit_mark_file_path(self.tenant_id, timeline_id);
    1124             287 :                 if timeline_uninit_mark_file.exists() {
    1125 UBC           0 :                     info!(
    1126               0 :                         %timeline_id,
    1127               0 :                         "Found an uninit mark file, removing the timeline and its uninit mark",
    1128               0 :                     );
    1129               0 :                     if let Err(e) =
    1130               0 :                         remove_timeline_and_uninit_mark(timeline_dir, &timeline_uninit_mark_file)
    1131                 :                     {
    1132               0 :                         error!("Failed to clean up uninit marked timeline: {e:?}");
    1133               0 :                     }
    1134               0 :                     continue;
    1135 CBC         287 :                 }
    1136             287 : 
    1137             287 :                 let timeline_delete_mark_file = self
    1138             287 :                     .conf
    1139             287 :                     .timeline_delete_mark_file_path(self.tenant_id, timeline_id);
    1140             287 :                 if timeline_delete_mark_file.exists() {
    1141                 :                     // Cleanup should be done in `is_delete_mark` branch above
    1142              19 :                     continue;
    1143             268 :                 }
    1144             268 : 
    1145             268 :                 let file_name = entry.file_name();
    1146             268 :                 if let Ok(timeline_id) = file_name.parse::<TimelineId>() {
    1147             268 :                     let metadata = load_metadata(self.conf, &self.tenant_id, &timeline_id)
    1148             268 :                         .context("failed to load metadata")?;
    1149             265 :                     timelines_to_load.insert(timeline_id, metadata);
    1150                 :                 } else {
    1151                 :                     // A file or directory that doesn't look like a timeline ID
    1152 UBC           0 :                     warn!("unexpected file or directory in timelines directory: {file_name}");
    1153                 :                 }
    1154                 :             }
    1155                 :         }
    1156                 : 
    1157                 :         // Sort the array of timeline IDs into tree-order, so that parent comes before
    1158                 :         // all its children.
    1159 CBC         692 :         tree_sort_timelines(timelines_to_load, |m| m.ancestor_timeline()).map(|sorted_timelines| {
    1160             692 :             TenantDirectoryScan {
    1161             692 :                 sorted_timelines_to_load: sorted_timelines,
    1162             692 :                 timelines_to_resume_deletion,
    1163             692 :             }
    1164             692 :         })
    1165             698 :     }
    1166                 : 
    1167             692 :     async fn load_timeline_metadata(
    1168             692 :         self: &Arc<Tenant>,
    1169             692 :         timeline_ids: HashSet<TimelineId>,
    1170             692 :         remote_storage: &GenericRemoteStorage,
    1171             692 :     ) -> anyhow::Result<HashMap<TimelineId, TimelinePreload>> {
    1172             692 :         let mut part_downloads = JoinSet::new();
    1173             980 :         for timeline_id in timeline_ids {
    1174             288 :             let client = RemoteTimelineClient::new(
    1175             288 :                 remote_storage.clone(),
    1176             288 :                 self.deletion_queue_client.clone(),
    1177             288 :                 self.conf,
    1178             288 :                 self.tenant_id,
    1179             288 :                 timeline_id,
    1180             288 :                 self.generation,
    1181             288 :             );
    1182             288 :             part_downloads.spawn(
    1183             288 :                 async move {
    1184             288 :                     debug!("starting index part download");
    1185                 : 
    1186            1114 :                     let index_part = client.download_index_file().await;
    1187                 : 
    1188             288 :                     debug!("finished index part download");
    1189                 : 
    1190             288 :                     Result::<_, anyhow::Error>::Ok(TimelinePreload {
    1191             288 :                         client,
    1192             288 :                         timeline_id,
    1193             288 :                         index_part,
    1194             288 :                     })
    1195             288 :                 }
    1196             288 :                 .map(move |res| {
    1197             288 :                     res.with_context(|| format!("download index part for timeline {timeline_id}"))
    1198             288 :                 })
    1199             288 :                 .instrument(info_span!("download_index_part", %timeline_id)),
    1200                 :             );
    1201                 :         }
    1202                 : 
    1203             692 :         let mut timeline_preloads: HashMap<TimelineId, TimelinePreload> = HashMap::new();
    1204             980 :         while let Some(result) = part_downloads.join_next().await {
    1205             288 :             let preload_result = result.context("join preload task")?;
    1206             288 :             let preload = preload_result?;
    1207             288 :             timeline_preloads.insert(preload.timeline_id, preload);
    1208                 :         }
    1209                 : 
    1210             692 :         Ok(timeline_preloads)
    1211             692 :     }
    1212                 : 
    1213                 :     ///
    1214                 :     /// Background task to load in-memory data structures for this tenant, from
    1215                 :     /// files on disk. Used at pageserver startup.
    1216                 :     ///
    1217                 :     /// No background tasks are started as part of this routine.
    1218             698 :     async fn load(
    1219             698 :         self: &Arc<Tenant>,
    1220             698 :         init_order: Option<&InitializationOrder>,
    1221             698 :         remote_completion: Option<Completion>,
    1222             698 :         ctx: &RequestContext,
    1223             698 :     ) -> anyhow::Result<()> {
    1224             698 :         span::debug_assert_current_span_has_tenant_id();
    1225                 : 
    1226 UBC           0 :         debug!("loading tenant task");
    1227                 : 
    1228                 :         // Load in-memory state to reflect the local files on disk
    1229                 :         //
    1230                 :         // Scan the directory, peek into the metadata file of each timeline, and
    1231                 :         // collect a list of timelines and their ancestors.
    1232 CBC         698 :         let span = info_span!("blocking");
    1233             698 :         let cloned = Arc::clone(self);
    1234                 : 
    1235             698 :         let scan = tokio::task::spawn_blocking(move || {
    1236             698 :             let _g = span.entered();
    1237             698 :             cloned.scan_and_sort_timelines_dir()
    1238             698 :         })
    1239             698 :         .await
    1240             698 :         .context("load spawn_blocking")
    1241             698 :         .and_then(|res| res)?;
    1242                 : 
    1243                 :         // FIXME original collect_timeline_files contained one more check:
    1244                 :         //    1. "Timeline has no ancestor and no layer files"
    1245                 : 
    1246                 :         // Load remote content for timelines in this tenant
    1247             692 :         let all_timeline_ids = scan
    1248             692 :             .sorted_timelines_to_load
    1249             692 :             .iter()
    1250             692 :             .map(|i| i.0)
    1251             692 :             .chain(scan.timelines_to_resume_deletion.iter().map(|i| i.0))
    1252             692 :             .collect();
    1253             692 :         let mut preload = if let Some(remote_storage) = &self.remote_storage {
    1254                 :             Some(
    1255             692 :                 self.load_timeline_metadata(all_timeline_ids, remote_storage)
    1256             256 :                     .await?,
    1257                 :             )
    1258                 :         } else {
    1259 UBC           0 :             None
    1260                 :         };
    1261                 : 
    1262 CBC         692 :         drop(remote_completion);
    1263                 : 
    1264               6 :         crate::failpoint_support::sleep_millis_async!("before-loading-tenant");
    1265                 : 
    1266                 :         // Process loadable timelines first
    1267             957 :         for (timeline_id, local_metadata) in scan.sorted_timelines_to_load {
    1268             265 :             let timeline_preload = preload.as_mut().map(|p| p.remove(&timeline_id).unwrap());
    1269             265 :             if let Err(e) = self
    1270             265 :                 .load_local_timeline(
    1271             265 :                     timeline_id,
    1272             265 :                     local_metadata,
    1273             265 :                     timeline_preload,
    1274             265 :                     init_order,
    1275             265 :                     ctx,
    1276             265 :                     false,
    1277             265 :                 )
    1278             265 :                 .await
    1279                 :             {
    1280 UBC           0 :                 match e {
    1281               0 :                     LoadLocalTimelineError::Load(source) => {
    1282               0 :                         return Err(anyhow::anyhow!(source)).with_context(|| {
    1283               0 :                             format!("Failed to load local timeline: {timeline_id}")
    1284               0 :                         })
    1285                 :                     }
    1286               0 :                     LoadLocalTimelineError::ResumeDeletion(source) => {
    1287                 :                         // Make sure resumed deletion wont fail loading for entire tenant.
    1288               0 :                         error!("Failed to resume timeline deletion: {source:#}")
    1289                 :                     }
    1290                 :                 }
    1291 CBC         265 :             }
    1292                 :         }
    1293                 : 
    1294                 :         // Resume deletion ones with deleted_mark
    1295             715 :         for (timeline_id, maybe_local_metadata) in scan.timelines_to_resume_deletion {
    1296              23 :             match maybe_local_metadata {
    1297                 :                 None => {
    1298                 :                     // See comment in `scan_and_sort_timelines_dir`.
    1299 UBC           0 :                     if let Err(e) =
    1300 CBC           6 :                         DeleteTimelineFlow::cleanup_remaining_timeline_fs_traces(self, timeline_id)
    1301              30 :                             .await
    1302                 :                     {
    1303 UBC           0 :                         warn!(
    1304               0 :                             "cannot clean up deleted timeline dir timeline_id: {} error: {:#}",
    1305               0 :                             timeline_id, e
    1306               0 :                         );
    1307 CBC           6 :                     }
    1308                 :                 }
    1309              17 :                 Some(local_metadata) => {
    1310              17 :                     let timeline_preload =
    1311              17 :                         preload.as_mut().map(|p| p.remove(&timeline_id).unwrap());
    1312              17 :                     if let Err(e) = self
    1313              17 :                         .load_local_timeline(
    1314              17 :                             timeline_id,
    1315              17 :                             local_metadata,
    1316              17 :                             timeline_preload,
    1317              17 :                             init_order,
    1318              17 :                             ctx,
    1319              17 :                             true,
    1320              17 :                         )
    1321              10 :                         .await
    1322                 :                     {
    1323 UBC           0 :                         match e {
    1324               0 :                             LoadLocalTimelineError::Load(source) => {
    1325               0 :                                 // We tried to load deleted timeline, this is a bug.
    1326               0 :                                 return Err(anyhow::anyhow!(source).context(
    1327               0 :                                     format!("This is a bug. We tried to load deleted timeline which is wrong and loading failed. Timeline: {timeline_id}")
    1328               0 :                                 ));
    1329                 :                             }
    1330               0 :                             LoadLocalTimelineError::ResumeDeletion(source) => {
    1331                 :                                 // Make sure resumed deletion wont fail loading for entire tenant.
    1332               0 :                                 error!("Failed to resume timeline deletion: {source:#}")
    1333                 :                             }
    1334                 :                         }
    1335 CBC          17 :                     }
    1336                 :                 }
    1337                 :             }
    1338                 :         }
    1339                 : 
    1340 UBC           0 :         trace!("Done");
    1341                 : 
    1342 CBC         692 :         Ok(())
    1343             698 :     }
    1344                 : 
    1345                 :     /// Subroutine of `load_tenant`, to load an individual timeline
    1346                 :     ///
    1347                 :     /// NB: The parent is assumed to be already loaded!
    1348             579 :     #[instrument(skip(self, local_metadata, init_order, preload, ctx))]
    1349                 :     async fn load_local_timeline(
    1350                 :         self: &Arc<Self>,
    1351                 :         timeline_id: TimelineId,
    1352                 :         local_metadata: TimelineMetadata,
    1353                 :         preload: Option<TimelinePreload>,
    1354                 :         init_order: Option<&InitializationOrder>,
    1355                 :         ctx: &RequestContext,
    1356                 :         found_delete_mark: bool,
    1357                 :     ) -> Result<(), LoadLocalTimelineError> {
    1358                 :         span::debug_assert_current_span_has_tenant_id();
    1359                 : 
    1360                 :         let mut resources = self.build_timeline_resources(timeline_id);
    1361                 : 
    1362                 :         struct RemoteStartupData {
    1363                 :             index_part: IndexPart,
    1364                 :             remote_metadata: TimelineMetadata,
    1365                 :         }
    1366                 : 
    1367                 :         let (remote_startup_data, remote_client) = match preload {
    1368                 :             Some(preload) => {
    1369                 :                 let TimelinePreload {
    1370                 :                     index_part,
    1371                 :                     client: remote_client,
    1372                 :                     timeline_id: _timeline_id,
    1373                 :                 } = preload;
    1374                 :                 match index_part {
    1375                 :                     Ok(index_part) => {
    1376                 :                         let index_part = match index_part {
    1377                 :                             MaybeDeletedIndexPart::IndexPart(index_part) => index_part,
    1378                 :                             MaybeDeletedIndexPart::Deleted(index_part) => {
    1379                 :                                 // TODO: we won't reach here if remote storage gets de-configured after start of the deletion operation.
    1380                 :                                 // Example:
    1381                 :                                 //  start deletion operation
    1382                 :                                 //  finishes upload of index part
    1383                 :                                 //  pageserver crashes
    1384                 :                                 //  remote storage gets de-configured
    1385                 :                                 //  pageserver starts
    1386                 :                                 //
    1387                 :                                 // We don't really anticipate remote storage to be de-configured, so, for now, this is fine.
    1388                 :                                 // Also, maybe we'll remove that option entirely in the future, see https://github.com/neondatabase/neon/issues/4099.
    1389              15 :                                 info!("is_deleted is set on remote, resuming removal of timeline data originally done by timeline deletion handler");
    1390                 : 
    1391                 :                                 remote_client
    1392                 :                                     .init_upload_queue_stopped_to_continue_deletion(&index_part)
    1393                 :                                     .context("init queue stopped")
    1394                 :                                     .map_err(LoadLocalTimelineError::ResumeDeletion)?;
    1395                 : 
    1396                 :                                 DeleteTimelineFlow::resume_deletion(
    1397                 :                                     Arc::clone(self),
    1398                 :                                     timeline_id,
    1399                 :                                     &local_metadata,
    1400                 :                                     Some(remote_client),
    1401                 :                                     self.deletion_queue_client.clone(),
    1402                 :                                     init_order,
    1403                 :                                 )
    1404                 :                                 .await
    1405                 :                                 .context("resume deletion")
    1406                 :                                 .map_err(LoadLocalTimelineError::ResumeDeletion)?;
    1407                 : 
    1408                 :                                 return Ok(());
    1409                 :                             }
    1410                 :                         };
    1411                 : 
    1412                 :                         let remote_metadata = index_part.metadata.clone();
    1413                 :                         (
    1414                 :                             Some(RemoteStartupData {
    1415                 :                                 index_part,
    1416                 :                                 remote_metadata,
    1417                 :                             }),
    1418                 :                             Some(remote_client),
    1419                 :                         )
    1420                 :                     }
    1421                 :                     Err(DownloadError::NotFound) => {
    1422               5 :                         info!(found_delete_mark, "no index file was found on the remote, resuming deletion or cleaning unuploaded up");
    1423                 : 
    1424                 :                         if found_delete_mark {
    1425                 :                             // We could've resumed at a point where remote index was deleted, but metadata file wasnt.
    1426                 :                             // Cleanup:
    1427                 :                             return DeleteTimelineFlow::cleanup_remaining_timeline_fs_traces(
    1428                 :                                 self,
    1429                 :                                 timeline_id,
    1430                 :                             )
    1431                 :                             .await
    1432                 :                             .context("cleanup_remaining_timeline_fs_traces")
    1433                 :                             .map_err(LoadLocalTimelineError::ResumeDeletion);
    1434                 :                         }
    1435                 : 
    1436                 :                         // as the remote index_part.json did not exist, this timeline is a
    1437                 :                         // not-yet-uploaded one. it should be deleted now, because the branching might
    1438                 :                         // not have been valid as it's ancestor may have been restored to earlier state
    1439                 :                         // as well. in practice, control plane will keep retrying.
    1440                 :                         //
    1441                 :                         // first ensure that the un-uploaded timeline looks like it should, as in we
    1442                 :                         // are not accidentially deleting a timeline which was ever active:
    1443                 :                         // - root timelines have metadata and one possibly partial layer
    1444                 :                         // - branched timelines have metadata
    1445                 :                         //
    1446                 :                         // if the timeline does not look like expected, fail loading of the tenant.
    1447                 :                         // cleaning the timeline up manually and reloading the tenant is possible via
    1448                 :                         // the above log message.
    1449                 :                         let path = self.conf.timeline_path(&self.tenant_id, &timeline_id);
    1450                 : 
    1451                 :                         let span = tracing::Span::current();
    1452                 : 
    1453                 :                         return tokio::task::spawn_blocking({
    1454               3 :                         move || {
    1455               3 :                             use std::str::FromStr;
    1456               3 :                             use crate::tenant::storage_layer::LayerFileName;
    1457               3 : 
    1458               3 :                             let _e = span.entered();
    1459               3 :                             let mut metadata = false;
    1460               3 :                             let mut layers = 0;
    1461               3 :                             let mut others = 0;
    1462               4 :                             for dentry in path.read_dir_utf8()? {
    1463               4 :                                 let dentry = dentry?;
    1464               4 :                                 let file_name = dentry.file_name();
    1465               4 : 
    1466               4 :                                 if file_name == METADATA_FILE_NAME {
    1467               3 :                                     metadata = true;
    1468               3 :                                     continue;
    1469               1 :                                 }
    1470               1 : 
    1471               1 :                                 if LayerFileName::from_str(file_name).is_ok()
    1472                 :                                 {
    1473               1 :                                     layers += 1;
    1474               1 :                                     continue;
    1475 UBC           0 :                                 }
    1476               0 : 
    1477               0 :                                 others += 1;
    1478                 :                             }
    1479                 : 
    1480                 :                             // bootstrapped have the one image layer file, or one partial temp
    1481                 :                             // file, branched have just the metadata
    1482 CBC           3 :                             if !(metadata && layers + others <= 1) {
    1483 UBC           0 :                                 anyhow::bail!("unexpected assumed unuploaded, never been active timeline: found metadata={}, layers={}, others={}", metadata, layers, others);
    1484 CBC           3 :                             }
    1485               3 : 
    1486               3 :                             let tmp_path =
    1487               3 :                                 path.with_file_name(format!("{timeline_id}{}", TEMP_FILE_SUFFIX));
    1488               3 :                             std::fs::rename(path, &tmp_path)?;
    1489               3 :                             std::fs::remove_dir_all(&tmp_path)?;
    1490               3 :                             Ok(())
    1491               3 :                         }
    1492                 :                     })
    1493                 :                     .await
    1494                 :                     .map_err(anyhow::Error::new)
    1495               3 :                     .and_then(|x| x)
    1496                 :                     .context("delete assumed unuploaded fresh timeline")
    1497                 :                     .map_err(LoadLocalTimelineError::Load);
    1498                 :                     }
    1499                 :                     Err(e) => return Err(LoadLocalTimelineError::Load(anyhow::Error::new(e))),
    1500                 :                 }
    1501                 :             }
    1502                 :             None => {
    1503                 :                 if found_delete_mark {
    1504                 :                     // There is no remote client, we found local metadata.
    1505                 :                     // Continue cleaning up local disk.
    1506                 :                     DeleteTimelineFlow::resume_deletion(
    1507                 :                         Arc::clone(self),
    1508                 :                         timeline_id,
    1509                 :                         &local_metadata,
    1510                 :                         None,
    1511                 :                         self.deletion_queue_client.clone(),
    1512                 :                         init_order,
    1513                 :                     )
    1514                 :                     .await
    1515                 :                     .context("resume deletion")
    1516                 :                     .map_err(LoadLocalTimelineError::ResumeDeletion)?;
    1517                 :                     return Ok(());
    1518                 :                 }
    1519                 : 
    1520                 :                 (None, resources.remote_client)
    1521                 :             }
    1522                 :         };
    1523                 :         resources.remote_client = remote_client;
    1524                 : 
    1525                 :         let ancestor = if let Some(ancestor_timeline_id) = local_metadata.ancestor_timeline() {
    1526                 :             let ancestor_timeline = self.get_timeline(ancestor_timeline_id, false)
    1527 UBC           0 :                 .with_context(|| anyhow::anyhow!("cannot find ancestor timeline {ancestor_timeline_id} for timeline {timeline_id}"))
    1528                 :                 .map_err(LoadLocalTimelineError::Load)?;
    1529                 :             Some(ancestor_timeline)
    1530                 :         } else {
    1531                 :             None
    1532                 :         };
    1533                 : 
    1534                 :         let (index_part, metadata) = match remote_startup_data {
    1535                 :             Some(RemoteStartupData {
    1536                 :                 index_part,
    1537                 :                 remote_metadata,
    1538                 :             }) => {
    1539                 :                 // always choose the remote metadata to be crash consistent (see RFC 27)
    1540                 :                 save_metadata(self.conf, &self.tenant_id, &timeline_id, &remote_metadata)
    1541                 :                     .await
    1542                 :                     .context("save_metadata")
    1543                 :                     .map_err(LoadLocalTimelineError::Load)?;
    1544                 : 
    1545                 :                 (Some(index_part), remote_metadata)
    1546                 :             }
    1547                 :             None => (None, local_metadata),
    1548                 :         };
    1549                 : 
    1550                 :         self.timeline_init_and_sync(
    1551                 :             timeline_id,
    1552                 :             resources,
    1553                 :             index_part,
    1554                 :             metadata,
    1555                 :             ancestor,
    1556                 :             init_order,
    1557                 :             ctx,
    1558                 :         )
    1559                 :         .await
    1560                 :         .map_err(LoadLocalTimelineError::Load)
    1561                 :     }
    1562                 : 
    1563 CBC        1151 :     pub fn tenant_id(&self) -> TenantId {
    1564            1151 :         self.tenant_id
    1565            1151 :     }
    1566                 : 
    1567                 :     /// Get Timeline handle for given Neon timeline ID.
    1568                 :     /// This function is idempotent. It doesn't change internal state in any way.
    1569           10445 :     pub fn get_timeline(
    1570           10445 :         &self,
    1571           10445 :         timeline_id: TimelineId,
    1572           10445 :         active_only: bool,
    1573           10445 :     ) -> Result<Arc<Timeline>, GetTimelineError> {
    1574           10445 :         let timelines_accessor = self.timelines.lock().unwrap();
    1575           10445 :         let timeline = timelines_accessor
    1576           10445 :             .get(&timeline_id)
    1577           10445 :             .ok_or(GetTimelineError::NotFound {
    1578           10445 :                 tenant_id: self.tenant_id,
    1579           10445 :                 timeline_id,
    1580           10445 :             })?;
    1581                 : 
    1582            9521 :         if active_only && !timeline.is_active() {
    1583               1 :             Err(GetTimelineError::NotActive {
    1584               1 :                 tenant_id: self.tenant_id,
    1585               1 :                 timeline_id,
    1586               1 :                 state: timeline.current_state(),
    1587               1 :             })
    1588                 :         } else {
    1589            9520 :             Ok(Arc::clone(timeline))
    1590                 :         }
    1591           10445 :     }
    1592                 : 
    1593                 :     /// Lists timelines the tenant contains.
    1594                 :     /// Up to tenant's implementation to omit certain timelines that ar not considered ready for use.
    1595             631 :     pub fn list_timelines(&self) -> Vec<Arc<Timeline>> {
    1596             631 :         self.timelines
    1597             631 :             .lock()
    1598             631 :             .unwrap()
    1599             631 :             .values()
    1600             631 :             .map(Arc::clone)
    1601             631 :             .collect()
    1602             631 :     }
    1603                 : 
    1604                 :     /// This is used to create the initial 'main' timeline during bootstrapping,
    1605                 :     /// or when importing a new base backup. The caller is expected to load an
    1606                 :     /// initial image of the datadir to the new timeline after this.
    1607                 :     ///
    1608                 :     /// Until that happens, the on-disk state is invalid (disk_consistent_lsn=Lsn(0))
    1609                 :     /// and the timeline will fail to load at a restart.
    1610                 :     ///
    1611                 :     /// That's why we add an uninit mark file, and wrap it together witht the Timeline
    1612                 :     /// in-memory object into UninitializedTimeline.
    1613                 :     /// Once the caller is done setting up the timeline, they should call
    1614                 :     /// `UninitializedTimeline::initialize_with_lock` to remove the uninit mark.
    1615                 :     ///
    1616                 :     /// For tests, use `DatadirModification::init_empty_test_timeline` + `commit` to setup the
    1617                 :     /// minimum amount of keys required to get a writable timeline.
    1618                 :     /// (Without it, `put` might fail due to `repartition` failing.)
    1619              43 :     pub async fn create_empty_timeline(
    1620              43 :         &self,
    1621              43 :         new_timeline_id: TimelineId,
    1622              43 :         initdb_lsn: Lsn,
    1623              43 :         pg_version: u32,
    1624              43 :         _ctx: &RequestContext,
    1625              43 :     ) -> anyhow::Result<UninitializedTimeline> {
    1626              43 :         anyhow::ensure!(
    1627              43 :             self.is_active(),
    1628 UBC           0 :             "Cannot create empty timelines on inactive tenant"
    1629                 :         );
    1630                 : 
    1631 CBC          42 :         let timeline_uninit_mark = {
    1632              43 :             let timelines = self.timelines.lock().unwrap();
    1633              43 :             self.create_timeline_uninit_mark(new_timeline_id, &timelines)?
    1634                 :         };
    1635              42 :         let new_metadata = TimelineMetadata::new(
    1636              42 :             // Initialize disk_consistent LSN to 0, The caller must import some data to
    1637              42 :             // make it valid, before calling finish_creation()
    1638              42 :             Lsn(0),
    1639              42 :             None,
    1640              42 :             None,
    1641              42 :             Lsn(0),
    1642              42 :             initdb_lsn,
    1643              42 :             initdb_lsn,
    1644              42 :             pg_version,
    1645              42 :         );
    1646              42 :         self.prepare_new_timeline(
    1647              42 :             new_timeline_id,
    1648              42 :             &new_metadata,
    1649              42 :             timeline_uninit_mark,
    1650              42 :             initdb_lsn,
    1651              42 :             None,
    1652              42 :         )
    1653 UBC           0 :         .await
    1654 CBC          43 :     }
    1655                 : 
    1656                 :     /// Helper for unit tests to create an empty timeline.
    1657                 :     ///
    1658                 :     /// The timeline is has state value `Active` but its background loops are not running.
    1659                 :     // This makes the various functions which anyhow::ensure! for Active state work in tests.
    1660                 :     // Our current tests don't need the background loops.
    1661                 :     #[cfg(test)]
    1662              35 :     pub async fn create_test_timeline(
    1663              35 :         &self,
    1664              35 :         new_timeline_id: TimelineId,
    1665              35 :         initdb_lsn: Lsn,
    1666              35 :         pg_version: u32,
    1667              35 :         ctx: &RequestContext,
    1668              35 :     ) -> anyhow::Result<Arc<Timeline>> {
    1669              35 :         let uninit_tl = self
    1670              35 :             .create_empty_timeline(new_timeline_id, initdb_lsn, pg_version, ctx)
    1671 UBC           0 :             .await?;
    1672 CBC          35 :         let tline = uninit_tl.raw_timeline().expect("we just created it");
    1673              35 :         assert_eq!(tline.get_last_record_lsn(), Lsn(0));
    1674                 : 
    1675                 :         // Setup minimum keys required for the timeline to be usable.
    1676              35 :         let mut modification = tline.begin_modification(initdb_lsn);
    1677              35 :         modification
    1678              35 :             .init_empty_test_timeline()
    1679              35 :             .context("init_empty_test_timeline")?;
    1680              35 :         modification
    1681              35 :             .commit(ctx)
    1682 UBC           0 :             .await
    1683 CBC          35 :             .context("commit init_empty_test_timeline modification")?;
    1684                 : 
    1685                 :         // Flush to disk so that uninit_tl's check for valid disk_consistent_lsn passes.
    1686              35 :         tline.maybe_spawn_flush_loop();
    1687              35 :         tline.freeze_and_flush().await.context("freeze_and_flush")?;
    1688                 : 
    1689                 :         // Make sure the freeze_and_flush reaches remote storage.
    1690              35 :         tline
    1691              35 :             .remote_client
    1692              35 :             .as_ref()
    1693              35 :             .unwrap()
    1694              35 :             .wait_completion()
    1695              35 :             .await
    1696              35 :             .unwrap();
    1697                 : 
    1698              35 :         let tl = uninit_tl.finish_creation()?;
    1699                 :         // The non-test code would call tl.activate() here.
    1700              35 :         tl.set_state(TimelineState::Active);
    1701              35 :         Ok(tl)
    1702              35 :     }
    1703                 : 
    1704                 :     /// Create a new timeline.
    1705                 :     ///
    1706                 :     /// Returns the new timeline ID and reference to its Timeline object.
    1707                 :     ///
    1708                 :     /// If the caller specified the timeline ID to use (`new_timeline_id`), and timeline with
    1709                 :     /// the same timeline ID already exists, returns CreateTimelineError::AlreadyExists.
    1710             847 :     pub async fn create_timeline(
    1711             847 :         &self,
    1712             847 :         new_timeline_id: TimelineId,
    1713             847 :         ancestor_timeline_id: Option<TimelineId>,
    1714             847 :         mut ancestor_start_lsn: Option<Lsn>,
    1715             847 :         pg_version: u32,
    1716             847 :         broker_client: storage_broker::BrokerClientChannel,
    1717             847 :         ctx: &RequestContext,
    1718             847 :     ) -> Result<Arc<Timeline>, CreateTimelineError> {
    1719             847 :         if !self.is_active() {
    1720 UBC           0 :             return Err(CreateTimelineError::Other(anyhow::anyhow!(
    1721               0 :                 "Cannot create timelines on inactive tenant"
    1722               0 :             )));
    1723 CBC         847 :         }
    1724                 : 
    1725             847 :         if let Ok(existing) = self.get_timeline(new_timeline_id, false) {
    1726 UBC           0 :             debug!("timeline {new_timeline_id} already exists");
    1727                 : 
    1728 CBC           1 :             if let Some(remote_client) = existing.remote_client.as_ref() {
    1729                 :                 // Wait for uploads to complete, so that when we return Ok, the timeline
    1730                 :                 // is known to be durable on remote storage. Just like we do at the end of
    1731                 :                 // this function, after we have created the timeline ourselves.
    1732                 :                 //
    1733                 :                 // We only really care that the initial version of `index_part.json` has
    1734                 :                 // been uploaded. That's enough to remember that the timeline
    1735                 :                 // exists. However, there is no function to wait specifically for that so
    1736                 :                 // we just wait for all in-progress uploads to finish.
    1737               1 :                 remote_client
    1738               1 :                     .wait_completion()
    1739               1 :                     .await
    1740               1 :                     .context("wait for timeline uploads to complete")?;
    1741 UBC           0 :             }
    1742                 : 
    1743 CBC           1 :             return Err(CreateTimelineError::AlreadyExists);
    1744             846 :         }
    1745                 : 
    1746             846 :         let loaded_timeline = match ancestor_timeline_id {
    1747             271 :             Some(ancestor_timeline_id) => {
    1748             271 :                 let ancestor_timeline = self
    1749             271 :                     .get_timeline(ancestor_timeline_id, false)
    1750             271 :                     .context("Cannot branch off the timeline that's not present in pageserver")?;
    1751                 : 
    1752                 :                 // instead of waiting around, just deny the request because ancestor is not yet
    1753                 :                 // ready for other purposes either.
    1754             271 :                 if !ancestor_timeline.is_active() {
    1755              18 :                     return Err(CreateTimelineError::AncestorNotActive);
    1756             253 :                 }
    1757                 : 
    1758             253 :                 if let Some(lsn) = ancestor_start_lsn.as_mut() {
    1759              35 :                     *lsn = lsn.align();
    1760              35 : 
    1761              35 :                     let ancestor_ancestor_lsn = ancestor_timeline.get_ancestor_lsn();
    1762              35 :                     if ancestor_ancestor_lsn > *lsn {
    1763                 :                         // can we safely just branch from the ancestor instead?
    1764               2 :                         return Err(CreateTimelineError::AncestorLsn(anyhow::anyhow!(
    1765               2 :                             "invalid start lsn {} for ancestor timeline {}: less than timeline ancestor lsn {}",
    1766               2 :                             lsn,
    1767               2 :                             ancestor_timeline_id,
    1768               2 :                             ancestor_ancestor_lsn,
    1769               2 :                         )));
    1770              33 :                     }
    1771              33 : 
    1772              33 :                     // Wait for the WAL to arrive and be processed on the parent branch up
    1773              33 :                     // to the requested branch point. The repository code itself doesn't
    1774              33 :                     // require it, but if we start to receive WAL on the new timeline,
    1775              33 :                     // decoding the new WAL might need to look up previous pages, relation
    1776              33 :                     // sizes etc. and that would get confused if the previous page versions
    1777              33 :                     // are not in the repository yet.
    1778              33 :                     ancestor_timeline.wait_lsn(*lsn, ctx).await?;
    1779             218 :                 }
    1780                 : 
    1781             251 :                 self.branch_timeline(&ancestor_timeline, new_timeline_id, ancestor_start_lsn, ctx)
    1782               6 :                     .await?
    1783                 :             }
    1784                 :             None => {
    1785             575 :                 self.bootstrap_timeline(new_timeline_id, pg_version, ctx)
    1786         2890234 :                     .await?
    1787                 :             }
    1788                 :         };
    1789                 : 
    1790             818 :         if let Some(remote_client) = loaded_timeline.remote_client.as_ref() {
    1791                 :             // Wait for the upload of the 'index_part.json` file to finish, so that when we return
    1792                 :             // Ok, the timeline is durable in remote storage.
    1793             818 :             let kind = ancestor_timeline_id
    1794             818 :                 .map(|_| "branched")
    1795             818 :                 .unwrap_or("bootstrapped");
    1796             818 :             remote_client.wait_completion().await.with_context(|| {
    1797 UBC           0 :                 format!("wait for {} timeline initial uploads to complete", kind)
    1798 CBC         813 :             })?;
    1799 UBC           0 :         }
    1800                 : 
    1801 CBC         813 :         loaded_timeline.activate(broker_client, None, ctx);
    1802             813 : 
    1803             813 :         Ok(loaded_timeline)
    1804             842 :     }
    1805                 : 
    1806                 :     /// perform one garbage collection iteration, removing old data files from disk.
    1807                 :     /// this function is periodically called by gc task.
    1808                 :     /// also it can be explicitly requested through page server api 'do_gc' command.
    1809                 :     ///
    1810                 :     /// `target_timeline_id` specifies the timeline to GC, or None for all.
    1811                 :     ///
    1812                 :     /// The `horizon` an `pitr` parameters determine how much WAL history needs to be retained.
    1813                 :     /// Also known as the retention period, or the GC cutoff point. `horizon` specifies
    1814                 :     /// the amount of history, as LSN difference from current latest LSN on each timeline.
    1815                 :     /// `pitr` specifies the same as a time difference from the current time. The effective
    1816                 :     /// GC cutoff point is determined conservatively by either `horizon` and `pitr`, whichever
    1817                 :     /// requires more history to be retained.
    1818                 :     //
    1819             406 :     pub async fn gc_iteration(
    1820             406 :         &self,
    1821             406 :         target_timeline_id: Option<TimelineId>,
    1822             406 :         horizon: u64,
    1823             406 :         pitr: Duration,
    1824             406 :         ctx: &RequestContext,
    1825             406 :     ) -> anyhow::Result<GcResult> {
    1826             406 :         // there is a global allowed_error for this
    1827             406 :         anyhow::ensure!(
    1828             406 :             self.is_active(),
    1829 UBC           0 :             "Cannot run GC iteration on inactive tenant"
    1830                 :         );
    1831                 : 
    1832                 :         {
    1833 CBC         406 :             let conf = self.tenant_conf.read().unwrap();
    1834             406 : 
    1835             406 :             if !conf.location.may_delete_layers_hint() {
    1836 UBC           0 :                 info!("Skipping GC in location state {:?}", conf.location);
    1837               0 :                 return Ok(GcResult::default());
    1838 CBC         406 :             }
    1839             406 :         }
    1840             406 : 
    1841             406 :         self.gc_iteration_internal(target_timeline_id, horizon, pitr, ctx)
    1842          336932 :             .await
    1843             401 :     }
    1844                 : 
    1845                 :     /// Perform one compaction iteration.
    1846                 :     /// This function is periodically called by compactor task.
    1847                 :     /// Also it can be explicitly requested per timeline through page server
    1848                 :     /// api's 'compact' command.
    1849             241 :     pub async fn compaction_iteration(
    1850             241 :         &self,
    1851             241 :         cancel: &CancellationToken,
    1852             241 :         ctx: &RequestContext,
    1853             241 :     ) -> anyhow::Result<()> {
    1854             241 :         anyhow::ensure!(
    1855             241 :             self.is_active(),
    1856 LBC         (1) :             "Cannot run compaction iteration on inactive tenant"
    1857                 :         );
    1858                 : 
    1859                 :         {
    1860 CBC         241 :             let conf = self.tenant_conf.read().unwrap();
    1861             241 :             if !conf.location.may_delete_layers_hint() || !conf.location.may_upload_layers_hint() {
    1862 UBC           0 :                 info!("Skipping compaction in location state {:?}", conf.location);
    1863               0 :                 return Ok(());
    1864 CBC         241 :             }
    1865             241 :         }
    1866             241 : 
    1867             241 :         // Scan through the hashmap and collect a list of all the timelines,
    1868             241 :         // while holding the lock. Then drop the lock and actually perform the
    1869             241 :         // compactions.  We don't want to block everything else while the
    1870             241 :         // compaction runs.
    1871             241 :         let timelines_to_compact = {
    1872             241 :             let timelines = self.timelines.lock().unwrap();
    1873             241 :             let timelines_to_compact = timelines
    1874             241 :                 .iter()
    1875             419 :                 .filter_map(|(timeline_id, timeline)| {
    1876             419 :                     if timeline.is_active() {
    1877             414 :                         Some((*timeline_id, timeline.clone()))
    1878                 :                     } else {
    1879               5 :                         None
    1880                 :                     }
    1881             419 :                 })
    1882             241 :                 .collect::<Vec<_>>();
    1883             241 :             drop(timelines);
    1884             241 :             timelines_to_compact
    1885                 :         };
    1886                 : 
    1887             641 :         for (timeline_id, timeline) in &timelines_to_compact {
    1888             408 :             timeline
    1889             408 :                 .compact(cancel, ctx)
    1890             408 :                 .instrument(info_span!("compact_timeline", %timeline_id))
    1891         1040273 :                 .await?;
    1892                 :         }
    1893                 : 
    1894             233 :         Ok(())
    1895             233 :     }
    1896                 : 
    1897           13609 :     pub fn current_state(&self) -> TenantState {
    1898           13609 :         self.state.borrow().clone()
    1899           13609 :     }
    1900                 : 
    1901            1537 :     pub fn is_active(&self) -> bool {
    1902            1537 :         self.current_state() == TenantState::Active
    1903            1537 :     }
    1904                 : 
    1905                 :     /// Changes tenant status to active, unless shutdown was already requested.
    1906                 :     ///
    1907                 :     /// `background_jobs_can_start` is an optional barrier set to a value during pageserver startup
    1908                 :     /// to delay background jobs. Background jobs can be started right away when None is given.
    1909             672 :     fn activate(
    1910             672 :         self: &Arc<Self>,
    1911             672 :         broker_client: BrokerClientChannel,
    1912             672 :         background_jobs_can_start: Option<&completion::Barrier>,
    1913             672 :         ctx: &RequestContext,
    1914             672 :     ) {
    1915             672 :         span::debug_assert_current_span_has_tenant_id();
    1916             672 : 
    1917             672 :         let mut activating = false;
    1918             672 :         self.state.send_modify(|current_state| {
    1919             672 :             use pageserver_api::models::ActivatingFrom;
    1920             672 :             match &*current_state {
    1921                 :                 TenantState::Activating(_) | TenantState::Active | TenantState::Broken { .. } | TenantState::Stopping { .. } => {
    1922 UBC           0 :                     panic!("caller is responsible for calling activate() only on Loading / Attaching tenants, got {state:?}", state = current_state);
    1923                 :                 }
    1924 CBC         633 :                 TenantState::Loading => {
    1925             633 :                     *current_state = TenantState::Activating(ActivatingFrom::Loading);
    1926             633 :                 }
    1927              39 :                 TenantState::Attaching => {
    1928              39 :                     *current_state = TenantState::Activating(ActivatingFrom::Attaching);
    1929              39 :                 }
    1930                 :             }
    1931             672 :             debug!(tenant_id = %self.tenant_id, "Activating tenant");
    1932             672 :             activating = true;
    1933             672 :             // Continue outside the closure. We need to grab timelines.lock()
    1934             672 :             // and we plan to turn it into a tokio::sync::Mutex in a future patch.
    1935             672 :         });
    1936             672 : 
    1937             672 :         if activating {
    1938             672 :             let timelines_accessor = self.timelines.lock().unwrap();
    1939             672 :             let timelines_to_activate = timelines_accessor
    1940             672 :                 .values()
    1941             672 :                 .filter(|timeline| !(timeline.is_broken() || timeline.is_stopping()));
    1942             672 : 
    1943             672 :             // Spawn gc and compaction loops. The loops will shut themselves
    1944             672 :             // down when they notice that the tenant is inactive.
    1945             672 :             tasks::start_background_loops(self, background_jobs_can_start);
    1946             672 : 
    1947             672 :             let mut activated_timelines = 0;
    1948                 : 
    1949             959 :             for timeline in timelines_to_activate {
    1950             287 :                 timeline.activate(broker_client.clone(), background_jobs_can_start, ctx);
    1951             287 :                 activated_timelines += 1;
    1952             287 :             }
    1953                 : 
    1954             672 :             self.state.send_modify(move |current_state| {
    1955             672 :                 assert!(
    1956             672 :                     matches!(current_state, TenantState::Activating(_)),
    1957 UBC           0 :                     "set_stopping and set_broken wait for us to leave Activating state",
    1958                 :                 );
    1959 CBC         672 :                 *current_state = TenantState::Active;
    1960             672 : 
    1961             672 :                 let elapsed = self.loading_started_at.elapsed();
    1962             672 :                 let total_timelines = timelines_accessor.len();
    1963             672 : 
    1964             672 :                 // log a lot of stuff, because some tenants sometimes suffer from user-visible
    1965             672 :                 // times to activate. see https://github.com/neondatabase/neon/issues/4025
    1966             672 :                 info!(
    1967             672 :                     since_creation_millis = elapsed.as_millis(),
    1968             672 :                     tenant_id = %self.tenant_id,
    1969             672 :                     activated_timelines,
    1970             672 :                     total_timelines,
    1971             672 :                     post_state = <&'static str>::from(&*current_state),
    1972             672 :                     "activation attempt finished"
    1973             672 :                 );
    1974                 : 
    1975             672 :                 TENANT_ACTIVATION.observe(elapsed.as_secs_f64());
    1976             672 :             });
    1977 UBC           0 :         }
    1978 CBC         672 :     }
    1979                 : 
    1980                 :     /// Shutdown the tenant and join all of the spawned tasks.
    1981                 :     ///
    1982                 :     /// The method caters for all use-cases:
    1983                 :     /// - pageserver shutdown (freeze_and_flush == true)
    1984                 :     /// - detach + ignore (freeze_and_flush == false)
    1985                 :     ///
    1986                 :     /// This will attempt to shutdown even if tenant is broken.
    1987                 :     ///
    1988                 :     /// `shutdown_progress` is a [`completion::Barrier`] for the shutdown initiated by this call.
    1989                 :     /// If the tenant is already shutting down, we return a clone of the first shutdown call's
    1990                 :     /// `Barrier` as an `Err`. This not-first caller can use the returned barrier to join with
    1991                 :     /// the ongoing shutdown.
    1992             287 :     async fn shutdown(
    1993             287 :         &self,
    1994             287 :         shutdown_progress: completion::Barrier,
    1995             287 :         freeze_and_flush: bool,
    1996             287 :     ) -> Result<(), completion::Barrier> {
    1997             287 :         span::debug_assert_current_span_has_tenant_id();
    1998             287 :         // Set tenant (and its timlines) to Stoppping state.
    1999             287 :         //
    2000             287 :         // Since we can only transition into Stopping state after activation is complete,
    2001             287 :         // run it in a JoinSet so all tenants have a chance to stop before we get SIGKILLed.
    2002             287 :         //
    2003             287 :         // Transitioning tenants to Stopping state has a couple of non-obvious side effects:
    2004             287 :         // 1. Lock out any new requests to the tenants.
    2005             287 :         // 2. Signal cancellation to WAL receivers (we wait on it below).
    2006             287 :         // 3. Signal cancellation for other tenant background loops.
    2007             287 :         // 4. ???
    2008             287 :         //
    2009             287 :         // The waiting for the cancellation is not done uniformly.
    2010             287 :         // We certainly wait for WAL receivers to shut down.
    2011             287 :         // That is necessary so that no new data comes in before the freeze_and_flush.
    2012             287 :         // But the tenant background loops are joined-on in our caller.
    2013             287 :         // It's mesed up.
    2014             287 :         // we just ignore the failure to stop
    2015             287 : 
    2016             287 :         match self.set_stopping(shutdown_progress, false, false).await {
    2017             226 :             Ok(()) => {}
    2018              58 :             Err(SetStoppingError::Broken) => {
    2019              58 :                 // assume that this is acceptable
    2020              58 :             }
    2021               3 :             Err(SetStoppingError::AlreadyStopping(other)) => {
    2022               3 :                 // give caller the option to wait for this this shutdown
    2023               3 :                 return Err(other);
    2024                 :             }
    2025                 :         };
    2026                 : 
    2027             284 :         let mut js = tokio::task::JoinSet::new();
    2028             284 :         {
    2029             284 :             let timelines = self.timelines.lock().unwrap();
    2030             431 :             timelines.values().for_each(|timeline| {
    2031             431 :                 let timeline = Arc::clone(timeline);
    2032             431 :                 let span = Span::current();
    2033             512 :                 js.spawn(async move { timeline.shutdown(freeze_and_flush).instrument(span).await });
    2034             431 :             })
    2035                 :         };
    2036             715 :         while let Some(res) = js.join_next().await {
    2037 UBC           0 :             match res {
    2038 CBC         431 :                 Ok(()) => {}
    2039 UBC           0 :                 Err(je) if je.is_cancelled() => unreachable!("no cancelling used"),
    2040               0 :                 Err(je) if je.is_panic() => { /* logged already */ }
    2041               0 :                 Err(je) => warn!("unexpected JoinError: {je:?}"),
    2042                 :             }
    2043                 :         }
    2044                 : 
    2045                 :         // shutdown all tenant and timeline tasks: gc, compaction, page service
    2046                 :         // No new tasks will be started for this tenant because it's in `Stopping` state.
    2047                 :         //
    2048                 :         // this will additionally shutdown and await all timeline tasks.
    2049 CBC         578 :         task_mgr::shutdown_tasks(None, Some(self.tenant_id), None).await;
    2050                 : 
    2051             284 :         Ok(())
    2052             287 :     }
    2053                 : 
    2054                 :     /// Change tenant status to Stopping, to mark that it is being shut down.
    2055                 :     ///
    2056                 :     /// This function waits for the tenant to become active if it isn't already, before transitioning it into Stopping state.
    2057                 :     ///
    2058                 :     /// This function is not cancel-safe!
    2059                 :     ///
    2060                 :     /// `allow_transition_from_loading` is needed for the special case of loading task deleting the tenant.
    2061                 :     /// `allow_transition_from_attaching` is needed for the special case of attaching deleted tenant.
    2062             310 :     async fn set_stopping(
    2063             310 :         &self,
    2064             310 :         progress: completion::Barrier,
    2065             310 :         allow_transition_from_loading: bool,
    2066             310 :         allow_transition_from_attaching: bool,
    2067             310 :     ) -> Result<(), SetStoppingError> {
    2068             310 :         let mut rx = self.state.subscribe();
    2069             310 : 
    2070             310 :         // cannot stop before we're done activating, so wait out until we're done activating
    2071             310 :         rx.wait_for(|state| match state {
    2072               3 :             TenantState::Attaching if allow_transition_from_attaching => true,
    2073                 :             TenantState::Activating(_) | TenantState::Attaching => {
    2074               6 :                 info!(
    2075               6 :                     "waiting for {} to turn Active|Broken|Stopping",
    2076               6 :                     <&'static str>::from(state)
    2077               6 :                 );
    2078               6 :                 false
    2079                 :             }
    2080              20 :             TenantState::Loading => allow_transition_from_loading,
    2081             287 :             TenantState::Active | TenantState::Broken { .. } | TenantState::Stopping { .. } => true,
    2082             316 :         })
    2083               6 :         .await
    2084             310 :         .expect("cannot drop self.state while on a &self method");
    2085             310 : 
    2086             310 :         // we now know we're done activating, let's see whether this task is the winner to transition into Stopping
    2087             310 :         let mut err = None;
    2088             310 :         let stopping = self.state.send_if_modified(|current_state| match current_state {
    2089                 :             TenantState::Activating(_) => {
    2090 UBC           0 :                 unreachable!("1we ensured above that we're done with activation, and, there is no re-activation")
    2091                 :             }
    2092                 :             TenantState::Attaching => {
    2093 CBC           3 :                 if !allow_transition_from_attaching {
    2094 UBC           0 :                     unreachable!("2we ensured above that we're done with activation, and, there is no re-activation")
    2095 CBC           3 :                 };
    2096               3 :                 *current_state = TenantState::Stopping { progress };
    2097               3 :                 true
    2098                 :             }
    2099                 :             TenantState::Loading => {
    2100              20 :                 if !allow_transition_from_loading {
    2101 UBC           0 :                     unreachable!("3we ensured above that we're done with activation, and, there is no re-activation")
    2102 CBC          20 :                 };
    2103              20 :                 *current_state = TenantState::Stopping { progress };
    2104              20 :                 true
    2105                 :             }
    2106                 :             TenantState::Active => {
    2107                 :                 // FIXME: due to time-of-check vs time-of-use issues, it can happen that new timelines
    2108                 :                 // are created after the transition to Stopping. That's harmless, as the Timelines
    2109                 :                 // won't be accessible to anyone afterwards, because the Tenant is in Stopping state.
    2110             226 :                 *current_state = TenantState::Stopping { progress };
    2111             226 :                 // Continue stopping outside the closure. We need to grab timelines.lock()
    2112             226 :                 // and we plan to turn it into a tokio::sync::Mutex in a future patch.
    2113             226 :                 true
    2114                 :             }
    2115              58 :             TenantState::Broken { reason, .. } => {
    2116              58 :                 info!(
    2117              58 :                     "Cannot set tenant to Stopping state, it is in Broken state due to: {reason}"
    2118              58 :                 );
    2119              58 :                 err = Some(SetStoppingError::Broken);
    2120              58 :                 false
    2121                 :             }
    2122               3 :             TenantState::Stopping { progress } => {
    2123               3 :                 info!("Tenant is already in Stopping state");
    2124               3 :                 err = Some(SetStoppingError::AlreadyStopping(progress.clone()));
    2125               3 :                 false
    2126                 :             }
    2127             310 :         });
    2128             310 :         match (stopping, err) {
    2129             249 :             (true, None) => {} // continue
    2130              61 :             (false, Some(err)) => return Err(err),
    2131 UBC           0 :             (true, Some(_)) => unreachable!(
    2132               0 :                 "send_if_modified closure must error out if not transitioning to Stopping"
    2133               0 :             ),
    2134               0 :             (false, None) => unreachable!(
    2135               0 :                 "send_if_modified closure must return true if transitioning to Stopping"
    2136               0 :             ),
    2137                 :         }
    2138                 : 
    2139 CBC         249 :         let timelines_accessor = self.timelines.lock().unwrap();
    2140             249 :         let not_broken_timelines = timelines_accessor
    2141             249 :             .values()
    2142             365 :             .filter(|timeline| !timeline.is_broken());
    2143             596 :         for timeline in not_broken_timelines {
    2144             347 :             timeline.set_state(TimelineState::Stopping);
    2145             347 :         }
    2146             249 :         Ok(())
    2147             310 :     }
    2148                 : 
    2149                 :     /// Method for tenant::mgr to transition us into Broken state in case of a late failure in
    2150                 :     /// `remove_tenant_from_memory`
    2151                 :     ///
    2152                 :     /// This function waits for the tenant to become active if it isn't already, before transitioning it into Stopping state.
    2153                 :     ///
    2154                 :     /// In tests, we also use this to set tenants to Broken state on purpose.
    2155              53 :     pub(crate) async fn set_broken(&self, reason: String) {
    2156              53 :         let mut rx = self.state.subscribe();
    2157              53 : 
    2158              53 :         // The load & attach routines own the tenant state until it has reached `Active`.
    2159              53 :         // So, wait until it's done.
    2160              53 :         rx.wait_for(|state| match state {
    2161                 :             TenantState::Activating(_) | TenantState::Loading | TenantState::Attaching => {
    2162 UBC           0 :                 info!(
    2163               0 :                     "waiting for {} to turn Active|Broken|Stopping",
    2164               0 :                     <&'static str>::from(state)
    2165               0 :                 );
    2166               0 :                 false
    2167                 :             }
    2168 CBC          53 :             TenantState::Active | TenantState::Broken { .. } | TenantState::Stopping { .. } => true,
    2169              53 :         })
    2170 UBC           0 :         .await
    2171 CBC          53 :         .expect("cannot drop self.state while on a &self method");
    2172              53 : 
    2173              53 :         // we now know we're done activating, let's see whether this task is the winner to transition into Broken
    2174              53 :         self.set_broken_no_wait(reason)
    2175              53 :     }
    2176                 : 
    2177              53 :     pub(crate) fn set_broken_no_wait(&self, reason: impl Display) {
    2178              53 :         let reason = reason.to_string();
    2179              53 :         self.state.send_modify(|current_state| {
    2180              53 :             match *current_state {
    2181                 :                 TenantState::Activating(_) | TenantState::Loading | TenantState::Attaching => {
    2182 UBC           0 :                     unreachable!("we ensured above that we're done with activation, and, there is no re-activation")
    2183                 :                 }
    2184                 :                 TenantState::Active => {
    2185 CBC           2 :                     if cfg!(feature = "testing") {
    2186               2 :                         warn!("Changing Active tenant to Broken state, reason: {}", reason);
    2187               2 :                         *current_state = TenantState::broken_from_reason(reason);
    2188                 :                     } else {
    2189 UBC           0 :                         unreachable!("not allowed to call set_broken on Active tenants in non-testing builds")
    2190                 :                     }
    2191                 :                 }
    2192                 :                 TenantState::Broken { .. } => {
    2193               0 :                     warn!("Tenant is already in Broken state");
    2194                 :                 }
    2195                 :                 // This is the only "expected" path, any other path is a bug.
    2196                 :                 TenantState::Stopping { .. } => {
    2197 CBC          51 :                     warn!(
    2198              51 :                         "Marking Stopping tenant as Broken state, reason: {}",
    2199              51 :                         reason
    2200              51 :                     );
    2201              51 :                     *current_state = TenantState::broken_from_reason(reason);
    2202                 :                 }
    2203                 :            }
    2204              53 :         });
    2205              53 :     }
    2206                 : 
    2207              41 :     pub fn subscribe_for_state_updates(&self) -> watch::Receiver<TenantState> {
    2208              41 :         self.state.subscribe()
    2209              41 :     }
    2210                 : 
    2211            5548 :     pub(crate) async fn wait_to_become_active(&self) -> Result<(), WaitToBecomeActiveError> {
    2212            5548 :         let mut receiver = self.state.subscribe();
    2213            6228 :         loop {
    2214            6228 :             let current_state = receiver.borrow_and_update().clone();
    2215            6228 :             match current_state {
    2216                 :                 TenantState::Loading | TenantState::Attaching | TenantState::Activating(_) => {
    2217                 :                     // in these states, there's a chance that we can reach ::Active
    2218             700 :                     receiver.changed().await.map_err(
    2219             680 :                         |_e: tokio::sync::watch::error::RecvError| {
    2220 UBC           0 :                             WaitToBecomeActiveError::TenantDropped {
    2221               0 :                                 tenant_id: self.tenant_id,
    2222               0 :                             }
    2223 CBC         680 :                         },
    2224             680 :                     )?;
    2225                 :                 }
    2226                 :                 TenantState::Active { .. } => {
    2227            5517 :                     return Ok(());
    2228                 :                 }
    2229                 :                 TenantState::Broken { .. } | TenantState::Stopping { .. } => {
    2230                 :                     // There's no chance the tenant can transition back into ::Active
    2231              31 :                     return Err(WaitToBecomeActiveError::WillNotBecomeActive {
    2232              31 :                         tenant_id: self.tenant_id,
    2233              31 :                         state: current_state,
    2234              31 :                     });
    2235                 :                 }
    2236                 :             }
    2237                 :         }
    2238            5548 :     }
    2239                 : 
    2240 UBC           0 :     pub(crate) fn get_attach_mode(&self) -> AttachmentMode {
    2241               0 :         self.tenant_conf
    2242               0 :             .read()
    2243               0 :             .unwrap()
    2244               0 :             .location
    2245               0 :             .attach_mode
    2246               0 :             .clone()
    2247               0 :     }
    2248                 : }
    2249                 : 
    2250                 : /// Given a Vec of timelines and their ancestors (timeline_id, ancestor_id),
    2251                 : /// perform a topological sort, so that the parent of each timeline comes
    2252                 : /// before the children.
    2253                 : /// E extracts the ancestor from T
    2254                 : /// This allows for T to be different. It can be TimelineMetadata, can be Timeline itself, etc.
    2255 CBC         829 : fn tree_sort_timelines<T, E>(
    2256             829 :     timelines: HashMap<TimelineId, T>,
    2257             829 :     extractor: E,
    2258             829 : ) -> anyhow::Result<Vec<(TimelineId, T)>>
    2259             829 : where
    2260             829 :     E: Fn(&T) -> Option<TimelineId>,
    2261             829 : {
    2262             829 :     let mut result = Vec::with_capacity(timelines.len());
    2263             829 : 
    2264             829 :     let mut now = Vec::with_capacity(timelines.len());
    2265             829 :     // (ancestor, children)
    2266             829 :     let mut later: HashMap<TimelineId, Vec<(TimelineId, T)>> =
    2267             829 :         HashMap::with_capacity(timelines.len());
    2268                 : 
    2269            1307 :     for (timeline_id, value) in timelines {
    2270             478 :         if let Some(ancestor_id) = extractor(&value) {
    2271              55 :             let children = later.entry(ancestor_id).or_default();
    2272              55 :             children.push((timeline_id, value));
    2273             423 :         } else {
    2274             423 :             now.push((timeline_id, value));
    2275             423 :         }
    2276                 :     }
    2277                 : 
    2278            1307 :     while let Some((timeline_id, metadata)) = now.pop() {
    2279             478 :         result.push((timeline_id, metadata));
    2280                 :         // All children of this can be loaded now
    2281             478 :         if let Some(mut children) = later.remove(&timeline_id) {
    2282              44 :             now.append(&mut children);
    2283             434 :         }
    2284                 :     }
    2285                 : 
    2286                 :     // All timelines should be visited now. Unless there were timelines with missing ancestors.
    2287             829 :     if !later.is_empty() {
    2288 UBC           0 :         for (missing_id, orphan_ids) in later {
    2289               0 :             for (orphan_id, _) in orphan_ids {
    2290               0 :                 error!("could not load timeline {orphan_id} because its ancestor timeline {missing_id} could not be loaded");
    2291                 :             }
    2292                 :         }
    2293               0 :         bail!("could not load tenant because some timelines are missing ancestors");
    2294 CBC         829 :     }
    2295             829 : 
    2296             829 :     Ok(result)
    2297             829 : }
    2298                 : 
    2299                 : impl Tenant {
    2300              80 :     pub fn tenant_specific_overrides(&self) -> TenantConfOpt {
    2301              80 :         self.tenant_conf.read().unwrap().tenant_conf
    2302              80 :     }
    2303                 : 
    2304              40 :     pub fn effective_config(&self) -> TenantConf {
    2305              40 :         self.tenant_specific_overrides()
    2306              40 :             .merge(self.conf.default_tenant_conf)
    2307              40 :     }
    2308                 : 
    2309               5 :     pub fn get_checkpoint_distance(&self) -> u64 {
    2310               5 :         let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
    2311               5 :         tenant_conf
    2312               5 :             .checkpoint_distance
    2313               5 :             .unwrap_or(self.conf.default_tenant_conf.checkpoint_distance)
    2314               5 :     }
    2315                 : 
    2316               5 :     pub fn get_checkpoint_timeout(&self) -> Duration {
    2317               5 :         let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
    2318               5 :         tenant_conf
    2319               5 :             .checkpoint_timeout
    2320               5 :             .unwrap_or(self.conf.default_tenant_conf.checkpoint_timeout)
    2321               5 :     }
    2322                 : 
    2323               5 :     pub fn get_compaction_target_size(&self) -> u64 {
    2324               5 :         let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
    2325               5 :         tenant_conf
    2326               5 :             .compaction_target_size
    2327               5 :             .unwrap_or(self.conf.default_tenant_conf.compaction_target_size)
    2328               5 :     }
    2329                 : 
    2330             802 :     pub fn get_compaction_period(&self) -> Duration {
    2331             802 :         let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
    2332             802 :         tenant_conf
    2333             802 :             .compaction_period
    2334             802 :             .unwrap_or(self.conf.default_tenant_conf.compaction_period)
    2335             802 :     }
    2336                 : 
    2337               5 :     pub fn get_compaction_threshold(&self) -> usize {
    2338               5 :         let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
    2339               5 :         tenant_conf
    2340               5 :             .compaction_threshold
    2341               5 :             .unwrap_or(self.conf.default_tenant_conf.compaction_threshold)
    2342               5 :     }
    2343                 : 
    2344             372 :     pub fn get_gc_horizon(&self) -> u64 {
    2345             372 :         let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
    2346             372 :         tenant_conf
    2347             372 :             .gc_horizon
    2348             372 :             .unwrap_or(self.conf.default_tenant_conf.gc_horizon)
    2349             372 :     }
    2350                 : 
    2351             738 :     pub fn get_gc_period(&self) -> Duration {
    2352             738 :         let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
    2353             738 :         tenant_conf
    2354             738 :             .gc_period
    2355             738 :             .unwrap_or(self.conf.default_tenant_conf.gc_period)
    2356             738 :     }
    2357                 : 
    2358               5 :     pub fn get_image_creation_threshold(&self) -> usize {
    2359               5 :         let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
    2360               5 :         tenant_conf
    2361               5 :             .image_creation_threshold
    2362               5 :             .unwrap_or(self.conf.default_tenant_conf.image_creation_threshold)
    2363               5 :     }
    2364                 : 
    2365             485 :     pub fn get_pitr_interval(&self) -> Duration {
    2366             485 :         let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
    2367             485 :         tenant_conf
    2368             485 :             .pitr_interval
    2369             485 :             .unwrap_or(self.conf.default_tenant_conf.pitr_interval)
    2370             485 :     }
    2371                 : 
    2372            4401 :     pub fn get_trace_read_requests(&self) -> bool {
    2373            4401 :         let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
    2374            4401 :         tenant_conf
    2375            4401 :             .trace_read_requests
    2376            4401 :             .unwrap_or(self.conf.default_tenant_conf.trace_read_requests)
    2377            4401 :     }
    2378                 : 
    2379              13 :     pub fn get_min_resident_size_override(&self) -> Option<u64> {
    2380              13 :         let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
    2381              13 :         tenant_conf
    2382              13 :             .min_resident_size_override
    2383              13 :             .or(self.conf.default_tenant_conf.min_resident_size_override)
    2384              13 :     }
    2385                 : 
    2386              27 :     pub fn set_new_tenant_config(&self, new_tenant_conf: TenantConfOpt) {
    2387              27 :         self.tenant_conf.write().unwrap().tenant_conf = new_tenant_conf;
    2388              27 :         // Don't hold self.timelines.lock() during the notifies.
    2389              27 :         // There's no risk of deadlock right now, but there could be if we consolidate
    2390              27 :         // mutexes in struct Timeline in the future.
    2391              27 :         let timelines = self.list_timelines();
    2392              56 :         for timeline in timelines {
    2393              29 :             timeline.tenant_conf_updated();
    2394              29 :         }
    2395              27 :     }
    2396                 : 
    2397 UBC           0 :     pub(crate) fn set_new_location_config(&self, new_conf: AttachedTenantConf) {
    2398               0 :         *self.tenant_conf.write().unwrap() = new_conf;
    2399               0 :         // Don't hold self.timelines.lock() during the notifies.
    2400               0 :         // There's no risk of deadlock right now, but there could be if we consolidate
    2401               0 :         // mutexes in struct Timeline in the future.
    2402               0 :         let timelines = self.list_timelines();
    2403               0 :         for timeline in timelines {
    2404               0 :             timeline.tenant_conf_updated();
    2405               0 :         }
    2406               0 :     }
    2407                 : 
    2408                 :     /// Helper function to create a new Timeline struct.
    2409                 :     ///
    2410                 :     /// The returned Timeline is in Loading state. The caller is responsible for
    2411                 :     /// initializing any on-disk state, and for inserting the Timeline to the 'timelines'
    2412                 :     /// map.
    2413                 :     ///
    2414                 :     /// `validate_ancestor == false` is used when a timeline is created for deletion
    2415                 :     /// and we might not have the ancestor present anymore which is fine for to be
    2416                 :     /// deleted timelines.
    2417 CBC        1302 :     fn create_timeline_struct(
    2418            1302 :         &self,
    2419            1302 :         new_timeline_id: TimelineId,
    2420            1302 :         new_metadata: &TimelineMetadata,
    2421            1302 :         ancestor: Option<Arc<Timeline>>,
    2422            1302 :         resources: TimelineResources,
    2423            1302 :         init_order: Option<&InitializationOrder>,
    2424            1302 :         cause: CreateTimelineCause,
    2425            1302 :     ) -> anyhow::Result<Arc<Timeline>> {
    2426            1302 :         let state = match cause {
    2427                 :             CreateTimelineCause::Load => {
    2428            1281 :                 let ancestor_id = new_metadata.ancestor_timeline();
    2429            1281 :                 anyhow::ensure!(
    2430            1281 :                     ancestor_id == ancestor.as_ref().map(|t| t.timeline_id),
    2431 UBC           0 :                     "Timeline's {new_timeline_id} ancestor {ancestor_id:?} was not found"
    2432                 :                 );
    2433 CBC        1281 :                 TimelineState::Loading
    2434                 :             }
    2435              21 :             CreateTimelineCause::Delete => TimelineState::Stopping,
    2436                 :         };
    2437                 : 
    2438            1302 :         let initial_logical_size_can_start = init_order.map(|x| &x.initial_logical_size_can_start);
    2439            1302 :         let initial_logical_size_attempt = init_order.map(|x| &x.initial_logical_size_attempt);
    2440            1302 : 
    2441            1302 :         let pg_version = new_metadata.pg_version();
    2442            1302 : 
    2443            1302 :         let timeline = Timeline::new(
    2444            1302 :             self.conf,
    2445            1302 :             Arc::clone(&self.tenant_conf),
    2446            1302 :             new_metadata,
    2447            1302 :             ancestor,
    2448            1302 :             new_timeline_id,
    2449            1302 :             self.tenant_id,
    2450            1302 :             self.generation,
    2451            1302 :             Arc::clone(&self.walredo_mgr),
    2452            1302 :             resources,
    2453            1302 :             pg_version,
    2454            1302 :             initial_logical_size_can_start.cloned(),
    2455            1302 :             initial_logical_size_attempt.cloned().flatten(),
    2456            1302 :             state,
    2457            1302 :         );
    2458            1302 : 
    2459            1302 :         Ok(timeline)
    2460            1302 :     }
    2461                 : 
    2462                 :     // Allow too_many_arguments because a constructor's argument list naturally grows with the
    2463                 :     // number of attributes in the struct: breaking these out into a builder wouldn't be helpful.
    2464                 :     #[allow(clippy::too_many_arguments)]
    2465             748 :     fn new(
    2466             748 :         state: TenantState,
    2467             748 :         conf: &'static PageServerConf,
    2468             748 :         attached_conf: AttachedTenantConf,
    2469             748 :         walredo_mgr: Arc<WalRedoManager>,
    2470             748 :         tenant_id: TenantId,
    2471             748 :         remote_storage: Option<GenericRemoteStorage>,
    2472             748 :         deletion_queue_client: DeletionQueueClient,
    2473             748 :     ) -> Tenant {
    2474             748 :         let (state, mut rx) = watch::channel(state);
    2475             748 : 
    2476             748 :         tokio::spawn(async move {
    2477             748 :             let tid = tenant_id.to_string();
    2478             748 : 
    2479            2149 :             fn inspect_state(state: &TenantState) -> ([&'static str; 1], bool) {
    2480            2149 :                 ([state.into()], matches!(state, TenantState::Broken { .. }))
    2481            2149 :             }
    2482             748 : 
    2483             748 :             let mut tuple = inspect_state(&rx.borrow_and_update());
    2484             748 : 
    2485             748 :             let is_broken = tuple.1;
    2486             748 :             let mut counted_broken = if !is_broken {
    2487                 :                 // the tenant might be ignored and reloaded, so first remove any previous set
    2488                 :                 // element. it most likely has already been scraped, as these are manual operations
    2489                 :                 // right now. most likely we will add it back very soon.
    2490             748 :                 drop(crate::metrics::BROKEN_TENANTS_SET.remove_label_values(&[&tid]));
    2491             748 :                 false
    2492                 :             } else {
    2493                 :                 // add the id to the set right away, there should not be any updates on the channel
    2494                 :                 // after
    2495 UBC           0 :                 crate::metrics::BROKEN_TENANTS_SET
    2496               0 :                     .with_label_values(&[&tid])
    2497               0 :                     .set(1);
    2498               0 :                 true
    2499                 :             };
    2500                 : 
    2501 CBC        2149 :             loop {
    2502            2149 :                 let labels = &tuple.0;
    2503            2149 :                 let current = TENANT_STATE_METRIC.with_label_values(labels);
    2504            2149 :                 current.inc();
    2505            2149 : 
    2506            2149 :                 if rx.changed().await.is_err() {
    2507                 :                     // tenant has been dropped; decrement the counter because a tenant with that
    2508                 :                     // state is no longer in tenant map, but allow any broken set item to exist
    2509                 :                     // still.
    2510             101 :                     current.dec();
    2511             101 :                     break;
    2512            1401 :                 }
    2513            1401 : 
    2514            1401 :                 current.dec();
    2515            1401 :                 tuple = inspect_state(&rx.borrow_and_update());
    2516            1401 : 
    2517            1401 :                 let is_broken = tuple.1;
    2518            1401 :                 if is_broken && !counted_broken {
    2519              64 :                     counted_broken = true;
    2520              64 :                     // insert the tenant_id (back) into the set
    2521              64 :                     crate::metrics::BROKEN_TENANTS_SET
    2522              64 :                         .with_label_values(&[&tid])
    2523              64 :                         .inc();
    2524            1337 :                 }
    2525                 :             }
    2526             748 :         });
    2527             748 : 
    2528             748 :         Tenant {
    2529             748 :             tenant_id,
    2530             748 :             generation: attached_conf.location.generation,
    2531             748 :             conf,
    2532             748 :             // using now here is good enough approximation to catch tenants with really long
    2533             748 :             // activation times.
    2534             748 :             loading_started_at: Instant::now(),
    2535             748 :             tenant_conf: Arc::new(RwLock::new(attached_conf)),
    2536             748 :             timelines: Mutex::new(HashMap::new()),
    2537             748 :             gc_cs: tokio::sync::Mutex::new(()),
    2538             748 :             walredo_mgr,
    2539             748 :             remote_storage,
    2540             748 :             deletion_queue_client,
    2541             748 :             state,
    2542             748 :             cached_logical_sizes: tokio::sync::Mutex::new(HashMap::new()),
    2543             748 :             cached_synthetic_tenant_size: Arc::new(AtomicU64::new(0)),
    2544             748 :             eviction_task_tenant_state: tokio::sync::Mutex::new(EvictionTaskTenantState::default()),
    2545             748 :             delete_progress: Arc::new(tokio::sync::Mutex::new(DeleteTenantFlow::default())),
    2546             748 :         }
    2547             748 :     }
    2548                 : 
    2549                 :     /// Locate and load config
    2550             217 :     pub(super) fn load_tenant_config(
    2551             217 :         conf: &'static PageServerConf,
    2552             217 :         tenant_id: &TenantId,
    2553             217 :     ) -> anyhow::Result<LocationConf> {
    2554             217 :         let legacy_config_path = conf.tenant_config_path(tenant_id);
    2555             217 :         let config_path = conf.tenant_location_config_path(tenant_id);
    2556             217 : 
    2557             217 :         if config_path.exists() {
    2558                 :             // New-style config takes precedence
    2559              14 :             let deserialized = Self::read_config(&config_path)?;
    2560              14 :             Ok(toml_edit::de::from_document::<LocationConf>(deserialized)?)
    2561             203 :         } else if legacy_config_path.exists() {
    2562                 :             // Upgrade path: found an old-style configuration only
    2563             199 :             let deserialized = Self::read_config(&legacy_config_path)?;
    2564                 : 
    2565             199 :             let mut tenant_conf = TenantConfOpt::default();
    2566             199 :             for (key, item) in deserialized.iter() {
    2567             199 :                 match key {
    2568             199 :                     "tenant_config" => {
    2569             199 :                         tenant_conf = PageServerConf::parse_toml_tenant_conf(item).with_context(|| {
    2570 UBC           0 :                             format!("Failed to parse config from file '{legacy_config_path}' as pageserver config")
    2571 CBC         199 :                         })?;
    2572                 :                     }
    2573 UBC           0 :                     _ => bail!(
    2574               0 :                         "config file {legacy_config_path} has unrecognized pageserver option '{key}'"
    2575               0 :                     ),
    2576                 :                 }
    2577                 :             }
    2578                 : 
    2579                 :             // Legacy configs are implicitly in attached state
    2580 CBC         199 :             Ok(LocationConf::attached_single(
    2581             199 :                 tenant_conf,
    2582             199 :                 Generation::none(),
    2583             199 :             ))
    2584                 :         } else {
    2585                 :             // FIXME If the config file is not found, assume that we're attaching
    2586                 :             // a detached tenant and config is passed via attach command.
    2587                 :             // https://github.com/neondatabase/neon/issues/1555
    2588                 :             // OR: we're loading after incomplete deletion that managed to remove config.
    2589               4 :             info!(
    2590               4 :                 "tenant config not found in {} or {}",
    2591               4 :                 config_path, legacy_config_path
    2592               4 :             );
    2593               4 :             Ok(LocationConf::default())
    2594                 :         }
    2595             217 :     }
    2596                 : 
    2597             213 :     fn read_config(path: &Utf8Path) -> anyhow::Result<toml_edit::Document> {
    2598             213 :         info!("loading tenant configuration from {path}");
    2599                 : 
    2600                 :         // load and parse file
    2601             213 :         let config = fs::read_to_string(path)
    2602             213 :             .with_context(|| format!("Failed to load config from path '{path}'"))?;
    2603                 : 
    2604             213 :         config
    2605             213 :             .parse::<toml_edit::Document>()
    2606             213 :             .with_context(|| format!("Failed to parse config from file '{path}' as toml file"))
    2607             213 :     }
    2608                 : 
    2609             968 :     #[tracing::instrument(skip_all, fields(%tenant_id))]
    2610                 :     pub(super) async fn persist_tenant_config(
    2611                 :         conf: &'static PageServerConf,
    2612                 :         tenant_id: &TenantId,
    2613                 :         location_conf: &LocationConf,
    2614                 :     ) -> anyhow::Result<()> {
    2615                 :         let legacy_config_path = conf.tenant_config_path(tenant_id);
    2616                 :         let config_path = conf.tenant_location_config_path(tenant_id);
    2617                 :         Self::persist_tenant_config_at(tenant_id, &config_path, &legacy_config_path, location_conf)
    2618                 :             .await
    2619                 :     }
    2620                 : 
    2621            2229 :     #[tracing::instrument(skip_all, fields(%tenant_id))]
    2622                 :     pub(super) async fn persist_tenant_config_at(
    2623                 :         tenant_id: &TenantId,
    2624                 :         config_path: &Utf8Path,
    2625                 :         legacy_config_path: &Utf8Path,
    2626                 :         location_conf: &LocationConf,
    2627                 :     ) -> anyhow::Result<()> {
    2628                 :         // Forward compat: write out an old-style configuration that old versions can read, in case we roll back
    2629                 :         Self::persist_tenant_config_legacy(
    2630                 :             tenant_id,
    2631                 :             legacy_config_path,
    2632                 :             &location_conf.tenant_conf,
    2633                 :         )
    2634                 :         .await?;
    2635                 : 
    2636                 :         if let LocationMode::Attached(attach_conf) = &location_conf.mode {
    2637                 :             // Once we use LocationMode, generations are mandatory.  If we aren't using generations,
    2638                 :             // then drop out after writing legacy-style config.
    2639                 :             if attach_conf.generation.is_none() {
    2640 UBC           0 :                 tracing::debug!("Running without generations, not writing new-style LocationConf");
    2641                 :                 return Ok(());
    2642                 :             }
    2643                 :         }
    2644                 : 
    2645 CBC          27 :         info!("persisting tenantconf to {config_path}");
    2646                 : 
    2647                 :         let mut conf_content = r#"# This file contains a specific per-tenant's config.
    2648                 : #  It is read in case of pageserver restart.
    2649                 : "#
    2650                 :         .to_string();
    2651                 : 
    2652                 :         // Convert the config to a toml file.
    2653                 :         conf_content += &toml_edit::ser::to_string_pretty(&location_conf)?;
    2654                 : 
    2655                 :         let conf_content = conf_content.as_bytes();
    2656                 : 
    2657                 :         let temp_path = path_with_suffix_extension(config_path, TEMP_FILE_SUFFIX);
    2658                 :         VirtualFile::crashsafe_overwrite(config_path, &temp_path, conf_content)
    2659                 :             .await
    2660 UBC           0 :             .with_context(|| format!("write tenant {tenant_id} config to {config_path}"))?;
    2661                 :         Ok(())
    2662                 :     }
    2663                 : 
    2664 CBC        2202 :     #[tracing::instrument(skip_all, fields(%tenant_id))]
    2665                 :     async fn persist_tenant_config_legacy(
    2666                 :         tenant_id: &TenantId,
    2667                 :         target_config_path: &Utf8Path,
    2668                 :         tenant_conf: &TenantConfOpt,
    2669                 :     ) -> anyhow::Result<()> {
    2670             734 :         info!("persisting tenantconf to {target_config_path}");
    2671                 : 
    2672                 :         let mut conf_content = r#"# This file contains a specific per-tenant's config.
    2673                 : #  It is read in case of pageserver restart.
    2674                 : 
    2675                 : [tenant_config]
    2676                 : "#
    2677                 :         .to_string();
    2678                 : 
    2679                 :         // Convert the config to a toml file.
    2680                 :         conf_content += &toml_edit::ser::to_string(&tenant_conf)?;
    2681                 : 
    2682                 :         let conf_content = conf_content.as_bytes();
    2683                 : 
    2684                 :         let temp_path = path_with_suffix_extension(target_config_path, TEMP_FILE_SUFFIX);
    2685                 :         VirtualFile::crashsafe_overwrite(target_config_path, &temp_path, conf_content)
    2686                 :             .await
    2687 UBC           0 :             .with_context(|| format!("write tenant {tenant_id} config to {target_config_path}"))?;
    2688                 :         Ok(())
    2689                 :     }
    2690                 : 
    2691                 :     //
    2692                 :     // How garbage collection works:
    2693                 :     //
    2694                 :     //                    +--bar------------->
    2695                 :     //                   /
    2696                 :     //             +----+-----foo---------------->
    2697                 :     //            /
    2698                 :     // ----main--+-------------------------->
    2699                 :     //                \
    2700                 :     //                 +-----baz-------->
    2701                 :     //
    2702                 :     //
    2703                 :     // 1. Grab 'gc_cs' mutex to prevent new timelines from being created while Timeline's
    2704                 :     //    `gc_infos` are being refreshed
    2705                 :     // 2. Scan collected timelines, and on each timeline, make note of the
    2706                 :     //    all the points where other timelines have been branched off.
    2707                 :     //    We will refrain from removing page versions at those LSNs.
    2708                 :     // 3. For each timeline, scan all layer files on the timeline.
    2709                 :     //    Remove all files for which a newer file exists and which
    2710                 :     //    don't cover any branch point LSNs.
    2711                 :     //
    2712                 :     // TODO:
    2713                 :     // - if a relation has a non-incremental persistent layer on a child branch, then we
    2714                 :     //   don't need to keep that in the parent anymore. But currently
    2715                 :     //   we do.
    2716 CBC         406 :     async fn gc_iteration_internal(
    2717             406 :         &self,
    2718             406 :         target_timeline_id: Option<TimelineId>,
    2719             406 :         horizon: u64,
    2720             406 :         pitr: Duration,
    2721             406 :         ctx: &RequestContext,
    2722             406 :     ) -> anyhow::Result<GcResult> {
    2723             406 :         let mut totals: GcResult = Default::default();
    2724             406 :         let now = Instant::now();
    2725                 : 
    2726             406 :         let gc_timelines = self
    2727             406 :             .refresh_gc_info_internal(target_timeline_id, horizon, pitr, ctx)
    2728          336850 :             .await?;
    2729                 : 
    2730               3 :         crate::failpoint_support::sleep_millis_async!(
    2731               3 :             "gc_iteration_internal_after_getting_gc_timelines"
    2732               3 :         );
    2733                 : 
    2734                 :         // If there is nothing to GC, we don't want any messages in the INFO log.
    2735             405 :         if !gc_timelines.is_empty() {
    2736             402 :             info!("{} timelines need GC", gc_timelines.len());
    2737                 :         } else {
    2738 UBC           0 :             debug!("{} timelines need GC", gc_timelines.len());
    2739                 :         }
    2740                 : 
    2741                 :         // Perform GC for each timeline.
    2742                 :         //
    2743                 :         // Note that we don't hold the GC lock here because we don't want
    2744                 :         // to delay the branch creation task, which requires the GC lock.
    2745                 :         // A timeline GC iteration can be slow because it may need to wait for
    2746                 :         // compaction (both require `layer_removal_cs` lock),
    2747                 :         // but the GC iteration can run concurrently with branch creation.
    2748                 :         //
    2749                 :         // See comments in [`Tenant::branch_timeline`] for more information
    2750                 :         // about why branch creation task can run concurrently with timeline's GC iteration.
    2751 CBC         945 :         for timeline in gc_timelines {
    2752             546 :             if task_mgr::is_shutdown_requested() {
    2753                 :                 // We were requested to shut down. Stop and return with the progress we
    2754                 :                 // made.
    2755               1 :                 break;
    2756             545 :             }
    2757             545 :             let result = timeline.gc().await?;
    2758             540 :             totals += result;
    2759                 :         }
    2760                 : 
    2761             400 :         totals.elapsed = now.elapsed();
    2762             400 :         Ok(totals)
    2763             401 :     }
    2764                 : 
    2765                 :     /// Refreshes the Timeline::gc_info for all timelines, returning the
    2766                 :     /// vector of timelines which have [`Timeline::get_last_record_lsn`] past
    2767                 :     /// [`Tenant::get_gc_horizon`].
    2768                 :     ///
    2769                 :     /// This is usually executed as part of periodic gc, but can now be triggered more often.
    2770              78 :     pub async fn refresh_gc_info(
    2771              78 :         &self,
    2772              78 :         ctx: &RequestContext,
    2773              78 :     ) -> anyhow::Result<Vec<Arc<Timeline>>> {
    2774              78 :         // since this method can now be called at different rates than the configured gc loop, it
    2775              78 :         // might be that these configuration values get applied faster than what it was previously,
    2776              78 :         // since these were only read from the gc task.
    2777              78 :         let horizon = self.get_gc_horizon();
    2778              78 :         let pitr = self.get_pitr_interval();
    2779              78 : 
    2780              78 :         // refresh all timelines
    2781              78 :         let target_timeline_id = None;
    2782              78 : 
    2783              78 :         self.refresh_gc_info_internal(target_timeline_id, horizon, pitr, ctx)
    2784              20 :             .await
    2785              78 :     }
    2786                 : 
    2787             484 :     async fn refresh_gc_info_internal(
    2788             484 :         &self,
    2789             484 :         target_timeline_id: Option<TimelineId>,
    2790             484 :         horizon: u64,
    2791             484 :         pitr: Duration,
    2792             484 :         ctx: &RequestContext,
    2793             484 :     ) -> anyhow::Result<Vec<Arc<Timeline>>> {
    2794                 :         // grab mutex to prevent new timelines from being created here.
    2795             484 :         let gc_cs = self.gc_cs.lock().await;
    2796                 : 
    2797                 :         // Scan all timelines. For each timeline, remember the timeline ID and
    2798                 :         // the branch point where it was created.
    2799             483 :         let (all_branchpoints, timeline_ids): (BTreeSet<(TimelineId, Lsn)>, _) = {
    2800             484 :             let timelines = self.timelines.lock().unwrap();
    2801             484 :             let mut all_branchpoints = BTreeSet::new();
    2802             483 :             let timeline_ids = {
    2803             484 :                 if let Some(target_timeline_id) = target_timeline_id.as_ref() {
    2804             374 :                     if timelines.get(target_timeline_id).is_none() {
    2805               1 :                         bail!("gc target timeline does not exist")
    2806             373 :                     }
    2807             110 :                 };
    2808                 : 
    2809             483 :                 timelines
    2810             483 :                     .iter()
    2811             483 :                     .map(|(timeline_id, timeline_entry)| {
    2812             537 :                         if let Some(ancestor_timeline_id) =
    2813            1019 :                             &timeline_entry.get_ancestor_timeline_id()
    2814                 :                         {
    2815                 :                             // If target_timeline is specified, we only need to know branchpoints of its children
    2816             537 :                             if let Some(timeline_id) = target_timeline_id {
    2817             352 :                                 if ancestor_timeline_id == &timeline_id {
    2818               6 :                                     all_branchpoints.insert((
    2819               6 :                                         *ancestor_timeline_id,
    2820               6 :                                         timeline_entry.get_ancestor_lsn(),
    2821               6 :                                     ));
    2822             346 :                                 }
    2823                 :                             }
    2824                 :                             // Collect branchpoints for all timelines
    2825             185 :                             else {
    2826             185 :                                 all_branchpoints.insert((
    2827             185 :                                     *ancestor_timeline_id,
    2828             185 :                                     timeline_entry.get_ancestor_lsn(),
    2829             185 :                                 ));
    2830             185 :                             }
    2831             482 :                         }
    2832                 : 
    2833            1019 :                         *timeline_id
    2834            1019 :                     })
    2835             483 :                     .collect::<Vec<_>>()
    2836             483 :             };
    2837             483 :             (all_branchpoints, timeline_ids)
    2838             483 :         };
    2839             483 : 
    2840             483 :         // Ok, we now know all the branch points.
    2841             483 :         // Update the GC information for each timeline.
    2842             483 :         let mut gc_timelines = Vec::with_capacity(timeline_ids.len());
    2843            1502 :         for timeline_id in timeline_ids {
    2844                 :             // Timeline is known to be local and loaded.
    2845            1019 :             let timeline = self
    2846            1019 :                 .get_timeline(timeline_id, false)
    2847            1019 :                 .with_context(|| format!("Timeline {timeline_id} was not found"))?;
    2848                 : 
    2849                 :             // If target_timeline is specified, ignore all other timelines
    2850            1019 :             if let Some(target_timeline_id) = target_timeline_id {
    2851             726 :                 if timeline_id != target_timeline_id {
    2852             353 :                     continue;
    2853             373 :                 }
    2854             293 :             }
    2855                 : 
    2856             666 :             if let Some(cutoff) = timeline.get_last_record_lsn().checked_sub(horizon) {
    2857             602 :                 let branchpoints: Vec<Lsn> = all_branchpoints
    2858             602 :                     .range((
    2859             602 :                         Included((timeline_id, Lsn(0))),
    2860             602 :                         Included((timeline_id, Lsn(u64::MAX))),
    2861             602 :                     ))
    2862             602 :                     .map(|&x| x.1)
    2863             602 :                     .collect();
    2864             602 :                 timeline
    2865             602 :                     .update_gc_info(branchpoints, cutoff, pitr, ctx)
    2866          336870 :                     .await?;
    2867                 : 
    2868             602 :                 gc_timelines.push(timeline);
    2869              64 :             }
    2870                 :         }
    2871             483 :         drop(gc_cs);
    2872             483 :         Ok(gc_timelines)
    2873             484 :     }
    2874                 : 
    2875                 :     /// A substitute for `branch_timeline` for use in unit tests.
    2876                 :     /// The returned timeline will have state value `Active` to make various `anyhow::ensure!()`
    2877                 :     /// calls pass, but, we do not actually call `.activate()` under the hood. So, none of the
    2878                 :     /// timeline background tasks are launched, except the flush loop.
    2879                 :     #[cfg(test)]
    2880             107 :     async fn branch_timeline_test(
    2881             107 :         &self,
    2882             107 :         src_timeline: &Arc<Timeline>,
    2883             107 :         dst_id: TimelineId,
    2884             107 :         start_lsn: Option<Lsn>,
    2885             107 :         ctx: &RequestContext,
    2886             107 :     ) -> Result<Arc<Timeline>, CreateTimelineError> {
    2887             107 :         let tl = self
    2888             107 :             .branch_timeline_impl(src_timeline, dst_id, start_lsn, ctx)
    2889               2 :             .await?;
    2890             105 :         tl.set_state(TimelineState::Active);
    2891             105 :         Ok(tl)
    2892             107 :     }
    2893                 : 
    2894                 :     /// Branch an existing timeline.
    2895                 :     ///
    2896                 :     /// The caller is responsible for activating the returned timeline.
    2897             251 :     async fn branch_timeline(
    2898             251 :         &self,
    2899             251 :         src_timeline: &Arc<Timeline>,
    2900             251 :         dst_id: TimelineId,
    2901             251 :         start_lsn: Option<Lsn>,
    2902             251 :         ctx: &RequestContext,
    2903             251 :     ) -> Result<Arc<Timeline>, CreateTimelineError> {
    2904             251 :         self.branch_timeline_impl(src_timeline, dst_id, start_lsn, ctx)
    2905               4 :             .await
    2906             251 :     }
    2907                 : 
    2908             358 :     async fn branch_timeline_impl(
    2909             358 :         &self,
    2910             358 :         src_timeline: &Arc<Timeline>,
    2911             358 :         dst_id: TimelineId,
    2912             358 :         start_lsn: Option<Lsn>,
    2913             358 :         _ctx: &RequestContext,
    2914             358 :     ) -> Result<Arc<Timeline>, CreateTimelineError> {
    2915             358 :         let src_id = src_timeline.timeline_id;
    2916                 : 
    2917                 :         // First acquire the GC lock so that another task cannot advance the GC
    2918                 :         // cutoff in 'gc_info', and make 'start_lsn' invalid, while we are
    2919                 :         // creating the branch.
    2920             358 :         let _gc_cs = self.gc_cs.lock().await;
    2921                 : 
    2922                 :         // If no start LSN is specified, we branch the new timeline from the source timeline's last record LSN
    2923             358 :         let start_lsn = start_lsn.unwrap_or_else(|| {
    2924             218 :             let lsn = src_timeline.get_last_record_lsn();
    2925             218 :             info!("branching timeline {dst_id} from timeline {src_id} at last record LSN: {lsn}");
    2926             218 :             lsn
    2927             358 :         });
    2928                 : 
    2929                 :         // Create a placeholder for the new branch. This will error
    2930                 :         // out if the new timeline ID is already in use.
    2931             358 :         let timeline_uninit_mark = {
    2932             358 :             let timelines = self.timelines.lock().unwrap();
    2933             358 :             self.create_timeline_uninit_mark(dst_id, &timelines)?
    2934                 :         };
    2935                 : 
    2936                 :         // Ensure that `start_lsn` is valid, i.e. the LSN is within the PITR
    2937                 :         // horizon on the source timeline
    2938                 :         //
    2939                 :         // We check it against both the planned GC cutoff stored in 'gc_info',
    2940                 :         // and the 'latest_gc_cutoff' of the last GC that was performed.  The
    2941                 :         // planned GC cutoff in 'gc_info' is normally larger than
    2942                 :         // 'latest_gc_cutoff_lsn', but beware of corner cases like if you just
    2943                 :         // changed the GC settings for the tenant to make the PITR window
    2944                 :         // larger, but some of the data was already removed by an earlier GC
    2945                 :         // iteration.
    2946                 : 
    2947                 :         // check against last actual 'latest_gc_cutoff' first
    2948             358 :         let latest_gc_cutoff_lsn = src_timeline.get_latest_gc_cutoff_lsn();
    2949             358 :         src_timeline
    2950             358 :             .check_lsn_is_in_scope(start_lsn, &latest_gc_cutoff_lsn)
    2951             358 :             .context(format!(
    2952             358 :                 "invalid branch start lsn: less than latest GC cutoff {}",
    2953             358 :                 *latest_gc_cutoff_lsn,
    2954             358 :             ))
    2955             358 :             .map_err(CreateTimelineError::AncestorLsn)?;
    2956                 : 
    2957                 :         // and then the planned GC cutoff
    2958                 :         {
    2959             350 :             let gc_info = src_timeline.gc_info.read().unwrap();
    2960             350 :             let cutoff = min(gc_info.pitr_cutoff, gc_info.horizon_cutoff);
    2961             350 :             if start_lsn < cutoff {
    2962 UBC           0 :                 return Err(CreateTimelineError::AncestorLsn(anyhow::anyhow!(
    2963               0 :                     "invalid branch start lsn: less than planned GC cutoff {cutoff}"
    2964               0 :                 )));
    2965 CBC         350 :             }
    2966             350 :         }
    2967             350 : 
    2968             350 :         //
    2969             350 :         // The branch point is valid, and we are still holding the 'gc_cs' lock
    2970             350 :         // so that GC cannot advance the GC cutoff until we are finished.
    2971             350 :         // Proceed with the branch creation.
    2972             350 :         //
    2973             350 : 
    2974             350 :         // Determine prev-LSN for the new timeline. We can only determine it if
    2975             350 :         // the timeline was branched at the current end of the source timeline.
    2976             350 :         let RecordLsn {
    2977             350 :             last: src_last,
    2978             350 :             prev: src_prev,
    2979             350 :         } = src_timeline.get_last_record_rlsn();
    2980             350 :         let dst_prev = if src_last == start_lsn {
    2981             327 :             Some(src_prev)
    2982                 :         } else {
    2983              23 :             None
    2984                 :         };
    2985                 : 
    2986                 :         // Create the metadata file, noting the ancestor of the new timeline.
    2987                 :         // There is initially no data in it, but all the read-calls know to look
    2988                 :         // into the ancestor.
    2989             350 :         let metadata = TimelineMetadata::new(
    2990             350 :             start_lsn,
    2991             350 :             dst_prev,
    2992             350 :             Some(src_id),
    2993             350 :             start_lsn,
    2994             350 :             *src_timeline.latest_gc_cutoff_lsn.read(), // FIXME: should we hold onto this guard longer?
    2995             350 :             src_timeline.initdb_lsn,
    2996             350 :             src_timeline.pg_version,
    2997             350 :         );
    2998                 : 
    2999             350 :         let uninitialized_timeline = self
    3000             350 :             .prepare_new_timeline(
    3001             350 :                 dst_id,
    3002             350 :                 &metadata,
    3003             350 :                 timeline_uninit_mark,
    3004             350 :                 start_lsn + 1,
    3005             350 :                 Some(Arc::clone(src_timeline)),
    3006             350 :             )
    3007 UBC           0 :             .await?;
    3008                 : 
    3009 CBC         350 :         let new_timeline = uninitialized_timeline.finish_creation()?;
    3010                 : 
    3011                 :         // Root timeline gets its layers during creation and uploads them along with the metadata.
    3012                 :         // A branch timeline though, when created, can get no writes for some time, hence won't get any layers created.
    3013                 :         // We still need to upload its metadata eagerly: if other nodes `attach` the tenant and miss this timeline, their GC
    3014                 :         // could get incorrect information and remove more layers, than needed.
    3015                 :         // See also https://github.com/neondatabase/neon/issues/3865
    3016             350 :         if let Some(remote_client) = new_timeline.remote_client.as_ref() {
    3017             350 :             remote_client
    3018             350 :                 .schedule_index_upload_for_metadata_update(&metadata)
    3019             350 :                 .context("branch initial metadata upload")?;
    3020 UBC           0 :         }
    3021                 : 
    3022 CBC         350 :         info!("branched timeline {dst_id} from {src_id} at {start_lsn}");
    3023                 : 
    3024             350 :         Ok(new_timeline)
    3025             358 :     }
    3026                 : 
    3027                 :     /// - run initdb to init temporary instance and get bootstrap data
    3028                 :     /// - after initialization complete, remove the temp dir.
    3029                 :     ///
    3030                 :     /// The caller is responsible for activating the returned timeline.
    3031             575 :     async fn bootstrap_timeline(
    3032             575 :         &self,
    3033             575 :         timeline_id: TimelineId,
    3034             575 :         pg_version: u32,
    3035             575 :         ctx: &RequestContext,
    3036             575 :     ) -> anyhow::Result<Arc<Timeline>> {
    3037             575 :         let timeline_uninit_mark = {
    3038             575 :             let timelines = self.timelines.lock().unwrap();
    3039             575 :             self.create_timeline_uninit_mark(timeline_id, &timelines)?
    3040                 :         };
    3041                 :         // create a `tenant/{tenant_id}/timelines/basebackup-{timeline_id}.{TEMP_FILE_SUFFIX}/`
    3042                 :         // temporary directory for basebackup files for the given timeline.
    3043             575 :         let initdb_path = path_with_suffix_extension(
    3044             575 :             self.conf
    3045             575 :                 .timelines_path(&self.tenant_id)
    3046             575 :                 .join(format!("basebackup-{timeline_id}")),
    3047             575 :             TEMP_FILE_SUFFIX,
    3048             575 :         );
    3049             575 : 
    3050             575 :         // an uninit mark was placed before, nothing else can access this timeline files
    3051             575 :         // current initdb was not run yet, so remove whatever was left from the previous runs
    3052             575 :         if initdb_path.exists() {
    3053 UBC           0 :             fs::remove_dir_all(&initdb_path).with_context(|| {
    3054               0 :                 format!("Failed to remove already existing initdb directory: {initdb_path}")
    3055               0 :             })?;
    3056 CBC         575 :         }
    3057                 :         // Init temporarily repo to get bootstrap data, this creates a directory in the `initdb_path` path
    3058             575 :         run_initdb(self.conf, &initdb_path, pg_version)?;
    3059                 :         // this new directory is very temporary, set to remove it immediately after bootstrap, we don't need it
    3060             575 :         scopeguard::defer! {
    3061             575 :             if let Err(e) = fs::remove_dir_all(&initdb_path) {
    3062                 :                 // this is unlikely, but we will remove the directory on pageserver restart or another bootstrap call
    3063 UBC           0 :                 error!("Failed to remove temporary initdb directory '{initdb_path}': {e}");
    3064 CBC         575 :             }
    3065                 :         }
    3066             575 :         let pgdata_path = &initdb_path;
    3067             575 :         let pgdata_lsn = import_datadir::get_lsn_from_controlfile(pgdata_path)?.align();
    3068             575 : 
    3069             575 :         // Import the contents of the data directory at the initial checkpoint
    3070             575 :         // LSN, and any WAL after that.
    3071             575 :         // Initdb lsn will be equal to last_record_lsn which will be set after import.
    3072             575 :         // Because we know it upfront avoid having an option or dummy zero value by passing it to the metadata.
    3073             575 :         let new_metadata = TimelineMetadata::new(
    3074             575 :             Lsn(0),
    3075             575 :             None,
    3076             575 :             None,
    3077             575 :             Lsn(0),
    3078             575 :             pgdata_lsn,
    3079             575 :             pgdata_lsn,
    3080             575 :             pg_version,
    3081             575 :         );
    3082             575 :         let raw_timeline = self
    3083             575 :             .prepare_new_timeline(
    3084             575 :                 timeline_id,
    3085             575 :                 &new_metadata,
    3086             575 :                 timeline_uninit_mark,
    3087             575 :                 pgdata_lsn,
    3088             575 :                 None,
    3089             575 :             )
    3090               1 :             .await?;
    3091                 : 
    3092             574 :         let tenant_id = raw_timeline.owning_tenant.tenant_id;
    3093             574 :         let unfinished_timeline = raw_timeline.raw_timeline()?;
    3094                 : 
    3095             574 :         import_datadir::import_timeline_from_postgres_datadir(
    3096             574 :             unfinished_timeline,
    3097             574 :             pgdata_path,
    3098             574 :             pgdata_lsn,
    3099             574 :             ctx,
    3100             574 :         )
    3101         2889661 :         .await
    3102             574 :         .with_context(|| {
    3103 UBC           0 :             format!("Failed to import pgdatadir for timeline {tenant_id}/{timeline_id}")
    3104 CBC         574 :         })?;
    3105                 : 
    3106                 :         // Flush the new layer files to disk, before we make the timeline as available to
    3107                 :         // the outside world.
    3108                 :         //
    3109                 :         // Flush loop needs to be spawned in order to be able to flush.
    3110             574 :         unfinished_timeline.maybe_spawn_flush_loop();
    3111             574 : 
    3112             574 :         fail::fail_point!("before-checkpoint-new-timeline", |_| {
    3113               1 :             anyhow::bail!("failpoint before-checkpoint-new-timeline");
    3114             574 :         });
    3115                 : 
    3116             573 :         unfinished_timeline
    3117             573 :             .freeze_and_flush()
    3118             573 :             .await
    3119             573 :             .with_context(|| {
    3120 UBC           0 :                 format!(
    3121               0 :                     "Failed to flush after pgdatadir import for timeline {tenant_id}/{timeline_id}"
    3122               0 :                 )
    3123 CBC         573 :             })?;
    3124                 : 
    3125                 :         // All done!
    3126             573 :         let timeline = raw_timeline.finish_creation()?;
    3127                 : 
    3128             573 :         info!(
    3129             573 :             "created root timeline {} timeline.lsn {}",
    3130             573 :             timeline_id,
    3131             573 :             timeline.get_last_record_lsn()
    3132             573 :         );
    3133                 : 
    3134             573 :         Ok(timeline)
    3135             575 :     }
    3136                 : 
    3137                 :     /// Call this before constructing a timeline, to build its required structures
    3138            1249 :     fn build_timeline_resources(&self, timeline_id: TimelineId) -> TimelineResources {
    3139            1249 :         let remote_client = if let Some(remote_storage) = self.remote_storage.as_ref() {
    3140            1249 :             let remote_client = RemoteTimelineClient::new(
    3141            1249 :                 remote_storage.clone(),
    3142            1249 :                 self.deletion_queue_client.clone(),
    3143            1249 :                 self.conf,
    3144            1249 :                 self.tenant_id,
    3145            1249 :                 timeline_id,
    3146            1249 :                 self.generation,
    3147            1249 :             );
    3148            1249 :             Some(remote_client)
    3149                 :         } else {
    3150 UBC           0 :             None
    3151                 :         };
    3152                 : 
    3153 CBC        1249 :         TimelineResources {
    3154            1249 :             remote_client,
    3155            1249 :             deletion_queue_client: self.deletion_queue_client.clone(),
    3156            1249 :         }
    3157            1249 :     }
    3158                 : 
    3159                 :     /// Creates intermediate timeline structure and its files.
    3160                 :     ///
    3161                 :     /// An empty layer map is initialized, and new data and WAL can be imported starting
    3162                 :     /// at 'disk_consistent_lsn'. After any initial data has been imported, call
    3163                 :     /// `finish_creation` to insert the Timeline into the timelines map and to remove the
    3164                 :     /// uninit mark file.
    3165             967 :     async fn prepare_new_timeline(
    3166             967 :         &self,
    3167             967 :         new_timeline_id: TimelineId,
    3168             967 :         new_metadata: &TimelineMetadata,
    3169             967 :         uninit_mark: TimelineUninitMark,
    3170             967 :         start_lsn: Lsn,
    3171             967 :         ancestor: Option<Arc<Timeline>>,
    3172             967 :     ) -> anyhow::Result<UninitializedTimeline> {
    3173             967 :         let tenant_id = self.tenant_id;
    3174             967 : 
    3175             967 :         let resources = self.build_timeline_resources(new_timeline_id);
    3176             967 :         if let Some(remote_client) = &resources.remote_client {
    3177             967 :             remote_client.init_upload_queue_for_empty_remote(new_metadata)?;
    3178 UBC           0 :         }
    3179                 : 
    3180 CBC         967 :         let timeline_struct = self
    3181             967 :             .create_timeline_struct(
    3182             967 :                 new_timeline_id,
    3183             967 :                 new_metadata,
    3184             967 :                 ancestor,
    3185             967 :                 resources,
    3186             967 :                 None,
    3187             967 :                 CreateTimelineCause::Load,
    3188             967 :             )
    3189             967 :             .context("Failed to create timeline data structure")?;
    3190                 : 
    3191             967 :         timeline_struct.init_empty_layer_map(start_lsn);
    3192                 : 
    3193             967 :         if let Err(e) = self
    3194             967 :             .create_timeline_files(&uninit_mark.timeline_path, &new_timeline_id, new_metadata)
    3195 UBC           0 :             .await
    3196                 :         {
    3197 CBC           1 :             error!("Failed to create initial files for timeline {tenant_id}/{new_timeline_id}, cleaning up: {e:?}");
    3198               1 :             cleanup_timeline_directory(uninit_mark);
    3199               1 :             return Err(e);
    3200             966 :         }
    3201                 : 
    3202 UBC           0 :         debug!("Successfully created initial files for timeline {tenant_id}/{new_timeline_id}");
    3203                 : 
    3204 CBC         966 :         Ok(UninitializedTimeline::new(
    3205             966 :             self,
    3206             966 :             new_timeline_id,
    3207             966 :             Some((timeline_struct, uninit_mark)),
    3208             966 :         ))
    3209             967 :     }
    3210                 : 
    3211             967 :     async fn create_timeline_files(
    3212             967 :         &self,
    3213             967 :         timeline_path: &Utf8Path,
    3214             967 :         new_timeline_id: &TimelineId,
    3215             967 :         new_metadata: &TimelineMetadata,
    3216             967 :     ) -> anyhow::Result<()> {
    3217             967 :         crashsafe::create_dir(timeline_path).context("Failed to create timeline directory")?;
    3218                 : 
    3219             967 :         fail::fail_point!("after-timeline-uninit-mark-creation", |_| {
    3220               1 :             anyhow::bail!("failpoint after-timeline-uninit-mark-creation");
    3221             967 :         });
    3222                 : 
    3223             966 :         save_metadata(self.conf, &self.tenant_id, new_timeline_id, new_metadata)
    3224 UBC           0 :             .await
    3225 CBC         966 :             .context("Failed to create timeline metadata")?;
    3226             966 :         Ok(())
    3227             967 :     }
    3228                 : 
    3229                 :     /// Attempts to create an uninit mark file for the timeline initialization.
    3230                 :     /// Bails, if the timeline is already loaded into the memory (i.e. initialized before), or the uninit mark file already exists.
    3231                 :     ///
    3232                 :     /// This way, we need to hold the timelines lock only for small amount of time during the mark check/creation per timeline init.
    3233             976 :     fn create_timeline_uninit_mark(
    3234             976 :         &self,
    3235             976 :         timeline_id: TimelineId,
    3236             976 :         timelines: &MutexGuard<HashMap<TimelineId, Arc<Timeline>>>,
    3237             976 :     ) -> anyhow::Result<TimelineUninitMark> {
    3238             976 :         let tenant_id = self.tenant_id;
    3239             976 : 
    3240             976 :         anyhow::ensure!(
    3241             976 :             timelines.get(&timeline_id).is_none(),
    3242               1 :             "Timeline {tenant_id}/{timeline_id} already exists in pageserver's memory"
    3243                 :         );
    3244             975 :         let timeline_path = self.conf.timeline_path(&tenant_id, &timeline_id);
    3245             975 :         anyhow::ensure!(
    3246             975 :             !timeline_path.exists(),
    3247 UBC           0 :             "Timeline {timeline_path} already exists, cannot create its uninit mark file",
    3248                 :         );
    3249                 : 
    3250 CBC         975 :         let uninit_mark_path = self
    3251             975 :             .conf
    3252             975 :             .timeline_uninit_mark_file_path(tenant_id, timeline_id);
    3253             975 :         fs::File::create(&uninit_mark_path)
    3254             975 :             .context("Failed to create uninit mark file")
    3255             975 :             .and_then(|_| {
    3256             975 :                 crashsafe::fsync_file_and_parent(&uninit_mark_path)
    3257             975 :                     .context("Failed to fsync uninit mark file")
    3258             975 :             })
    3259             975 :             .with_context(|| {
    3260 UBC           0 :                 format!("Failed to crate uninit mark for timeline {tenant_id}/{timeline_id}")
    3261 CBC         975 :             })?;
    3262                 : 
    3263             975 :         let uninit_mark = TimelineUninitMark::new(uninit_mark_path, timeline_path);
    3264             975 : 
    3265             975 :         Ok(uninit_mark)
    3266             976 :     }
    3267                 : 
    3268                 :     /// Gathers inputs from all of the timelines to produce a sizing model input.
    3269                 :     ///
    3270                 :     /// Future is cancellation safe. Only one calculation can be running at once per tenant.
    3271             288 :     #[instrument(skip_all, fields(tenant_id=%self.tenant_id))]
    3272                 :     pub async fn gather_size_inputs(
    3273                 :         &self,
    3274                 :         // `max_retention_period` overrides the cutoff that is used to calculate the size
    3275                 :         // (only if it is shorter than the real cutoff).
    3276                 :         max_retention_period: Option<u64>,
    3277                 :         cause: LogicalSizeCalculationCause,
    3278                 :         ctx: &RequestContext,
    3279                 :     ) -> anyhow::Result<size::ModelInputs> {
    3280                 :         let logical_sizes_at_once = self
    3281                 :             .conf
    3282                 :             .concurrent_tenant_size_logical_size_queries
    3283                 :             .inner();
    3284                 : 
    3285                 :         // TODO: Having a single mutex block concurrent reads is not great for performance.
    3286                 :         //
    3287                 :         // But the only case where we need to run multiple of these at once is when we
    3288                 :         // request a size for a tenant manually via API, while another background calculation
    3289                 :         // is in progress (which is not a common case).
    3290                 :         //
    3291                 :         // See more for on the issue #2748 condenced out of the initial PR review.
    3292                 :         let mut shared_cache = self.cached_logical_sizes.lock().await;
    3293                 : 
    3294                 :         size::gather_inputs(
    3295                 :             self,
    3296                 :             logical_sizes_at_once,
    3297                 :             max_retention_period,
    3298                 :             &mut shared_cache,
    3299                 :             cause,
    3300                 :             ctx,
    3301                 :         )
    3302                 :         .await
    3303                 :     }
    3304                 : 
    3305                 :     /// Calculate synthetic tenant size and cache the result.
    3306                 :     /// This is periodically called by background worker.
    3307                 :     /// result is cached in tenant struct
    3308              76 :     #[instrument(skip_all, fields(tenant_id=%self.tenant_id))]
    3309                 :     pub async fn calculate_synthetic_size(
    3310                 :         &self,
    3311                 :         cause: LogicalSizeCalculationCause,
    3312                 :         ctx: &RequestContext,
    3313                 :     ) -> anyhow::Result<u64> {
    3314                 :         let inputs = self.gather_size_inputs(None, cause, ctx).await?;
    3315                 : 
    3316                 :         let size = inputs.calculate()?;
    3317                 : 
    3318                 :         self.set_cached_synthetic_size(size);
    3319                 : 
    3320                 :         Ok(size)
    3321                 :     }
    3322                 : 
    3323                 :     /// Cache given synthetic size and update the metric value
    3324              19 :     pub fn set_cached_synthetic_size(&self, size: u64) {
    3325              19 :         self.cached_synthetic_tenant_size
    3326              19 :             .store(size, Ordering::Relaxed);
    3327              19 : 
    3328              19 :         TENANT_SYNTHETIC_SIZE_METRIC
    3329              19 :             .get_metric_with_label_values(&[&self.tenant_id.to_string()])
    3330              19 :             .unwrap()
    3331              19 :             .set(size);
    3332              19 :     }
    3333                 : 
    3334              15 :     pub fn cached_synthetic_size(&self) -> u64 {
    3335              15 :         self.cached_synthetic_tenant_size.load(Ordering::Relaxed)
    3336              15 :     }
    3337                 : }
    3338                 : 
    3339                 : fn remove_timeline_and_uninit_mark(
    3340                 :     timeline_dir: &Utf8Path,
    3341                 :     uninit_mark: &Utf8Path,
    3342                 : ) -> anyhow::Result<()> {
    3343               1 :     fs::remove_dir_all(timeline_dir)
    3344               1 :         .or_else(|e| {
    3345 UBC           0 :             if e.kind() == std::io::ErrorKind::NotFound {
    3346                 :                 // we can leave the uninit mark without a timeline dir,
    3347                 :                 // just remove the mark then
    3348               0 :                 Ok(())
    3349                 :             } else {
    3350               0 :                 Err(e)
    3351                 :             }
    3352 CBC           1 :         })
    3353               1 :         .with_context(|| {
    3354 UBC           0 :             format!("Failed to remove unit marked timeline directory {timeline_dir}")
    3355 CBC           1 :         })?;
    3356               1 :     fs::remove_file(uninit_mark)
    3357               1 :         .with_context(|| format!("Failed to remove timeline uninit mark file {uninit_mark}"))?;
    3358                 : 
    3359               1 :     Ok(())
    3360               1 : }
    3361                 : 
    3362                 : pub(crate) enum CreateTenantFilesMode {
    3363                 :     Create,
    3364                 :     Attach,
    3365                 : }
    3366                 : 
    3367             494 : pub(crate) async fn create_tenant_files(
    3368             494 :     conf: &'static PageServerConf,
    3369             494 :     location_conf: &LocationConf,
    3370             494 :     tenant_id: &TenantId,
    3371             494 :     mode: CreateTenantFilesMode,
    3372             494 : ) -> anyhow::Result<Utf8PathBuf> {
    3373             494 :     let target_tenant_directory = conf.tenant_path(tenant_id);
    3374             494 :     anyhow::ensure!(
    3375             494 :         !target_tenant_directory
    3376             494 :             .try_exists()
    3377             494 :             .context("check existence of tenant directory")?,
    3378               2 :         "tenant directory already exists",
    3379                 :     );
    3380                 : 
    3381             492 :     let temporary_tenant_dir =
    3382             492 :         path_with_suffix_extension(&target_tenant_directory, TEMP_FILE_SUFFIX);
    3383 UBC           0 :     debug!("Creating temporary directory structure in {temporary_tenant_dir}");
    3384                 : 
    3385                 :     // top-level dir may exist if we are creating it through CLI
    3386 CBC         492 :     crashsafe::create_dir_all(&temporary_tenant_dir).with_context(|| {
    3387 UBC           0 :         format!("could not create temporary tenant directory {temporary_tenant_dir}")
    3388 CBC         492 :     })?;
    3389                 : 
    3390             492 :     let creation_result = try_create_target_tenant_dir(
    3391             492 :         conf,
    3392             492 :         location_conf,
    3393             492 :         tenant_id,
    3394             492 :         mode,
    3395             492 :         &temporary_tenant_dir,
    3396             492 :         &target_tenant_directory,
    3397             492 :     )
    3398 UBC           0 :     .await;
    3399                 : 
    3400 CBC         492 :     if creation_result.is_err() {
    3401               1 :         error!("Failed to create directory structure for tenant {tenant_id}, cleaning tmp data");
    3402               1 :         if let Err(e) = fs::remove_dir_all(&temporary_tenant_dir) {
    3403 UBC           0 :             error!("Failed to remove temporary tenant directory {temporary_tenant_dir:?}: {e}")
    3404 CBC           1 :         } else if let Err(e) = crashsafe::fsync(&temporary_tenant_dir) {
    3405               1 :             error!(
    3406               1 :                 "Failed to fsync removed temporary tenant directory {temporary_tenant_dir:?}: {e}"
    3407               1 :             )
    3408 UBC           0 :         }
    3409 CBC         491 :     }
    3410                 : 
    3411             492 :     creation_result?;
    3412                 : 
    3413             491 :     Ok(target_tenant_directory)
    3414             494 : }
    3415                 : 
    3416             492 : async fn try_create_target_tenant_dir(
    3417             492 :     conf: &'static PageServerConf,
    3418             492 :     location_conf: &LocationConf,
    3419             492 :     tenant_id: &TenantId,
    3420             492 :     mode: CreateTenantFilesMode,
    3421             492 :     temporary_tenant_dir: &Utf8Path,
    3422             492 :     target_tenant_directory: &Utf8Path,
    3423             492 : ) -> Result<(), anyhow::Error> {
    3424             492 :     match mode {
    3425             450 :         CreateTenantFilesMode::Create => {} // needs no attach marker, writing tenant conf + atomic rename of dir is good enough
    3426                 :         CreateTenantFilesMode::Attach => {
    3427              42 :             let attach_marker_path = temporary_tenant_dir.join(TENANT_ATTACHING_MARKER_FILENAME);
    3428              42 :             let file = std::fs::OpenOptions::new()
    3429              42 :                 .create_new(true)
    3430              42 :                 .write(true)
    3431              42 :                 .open(&attach_marker_path)
    3432              42 :                 .with_context(|| {
    3433 UBC           0 :                     format!("could not create attach marker file {attach_marker_path:?}")
    3434 CBC          42 :                 })?;
    3435              42 :             file.sync_all().with_context(|| {
    3436 UBC           0 :                 format!("could not sync attach marker file: {attach_marker_path:?}")
    3437 CBC          42 :             })?;
    3438                 :             // fsync of the directory in which the file resides comes later in this function
    3439                 :         }
    3440                 :     }
    3441                 : 
    3442             492 :     let temporary_tenant_timelines_dir = rebase_directory(
    3443             492 :         &conf.timelines_path(tenant_id),
    3444             492 :         target_tenant_directory,
    3445             492 :         temporary_tenant_dir,
    3446             492 :     )
    3447             492 :     .with_context(|| format!("resolve tenant {tenant_id} temporary timelines dir"))?;
    3448             492 :     let temporary_legacy_tenant_config_path = rebase_directory(
    3449             492 :         &conf.tenant_config_path(tenant_id),
    3450             492 :         target_tenant_directory,
    3451             492 :         temporary_tenant_dir,
    3452             492 :     )
    3453             492 :     .with_context(|| format!("resolve tenant {tenant_id} temporary config path"))?;
    3454             492 :     let temporary_tenant_config_path = rebase_directory(
    3455             492 :         &conf.tenant_location_config_path(tenant_id),
    3456             492 :         target_tenant_directory,
    3457             492 :         temporary_tenant_dir,
    3458             492 :     )
    3459             492 :     .with_context(|| format!("resolve tenant {tenant_id} temporary config path"))?;
    3460                 : 
    3461             492 :     Tenant::persist_tenant_config_at(
    3462             492 :         tenant_id,
    3463             492 :         &temporary_tenant_config_path,
    3464             492 :         &temporary_legacy_tenant_config_path,
    3465             492 :         location_conf,
    3466             492 :     )
    3467 UBC           0 :     .await?;
    3468                 : 
    3469 CBC         492 :     crashsafe::create_dir(&temporary_tenant_timelines_dir).with_context(|| {
    3470 UBC           0 :         format!(
    3471               0 :             "create tenant {} temporary timelines directory {}",
    3472               0 :             tenant_id, temporary_tenant_timelines_dir,
    3473               0 :         )
    3474 CBC         492 :     })?;
    3475             492 :     fail::fail_point!("tenant-creation-before-tmp-rename", |_| {
    3476               1 :         anyhow::bail!("failpoint tenant-creation-before-tmp-rename");
    3477             492 :     });
    3478                 : 
    3479                 :     // Make sure the current tenant directory entries are durable before renaming.
    3480                 :     // Without this, a crash may reorder any of the directory entry creations above.
    3481             491 :     crashsafe::fsync(temporary_tenant_dir)
    3482             491 :         .with_context(|| format!("sync temporary tenant directory {temporary_tenant_dir:?}"))?;
    3483                 : 
    3484             491 :     fs::rename(temporary_tenant_dir, target_tenant_directory).with_context(|| {
    3485 UBC           0 :         format!(
    3486               0 :             "move tenant {} temporary directory {} into the permanent one {}",
    3487               0 :             tenant_id, temporary_tenant_dir, target_tenant_directory
    3488               0 :         )
    3489 CBC         491 :     })?;
    3490             491 :     let target_dir_parent = target_tenant_directory.parent().with_context(|| {
    3491 UBC           0 :         format!(
    3492               0 :             "get tenant {} dir parent for {}",
    3493               0 :             tenant_id, target_tenant_directory,
    3494               0 :         )
    3495 CBC         491 :     })?;
    3496             491 :     crashsafe::fsync(target_dir_parent).with_context(|| {
    3497 UBC           0 :         format!(
    3498               0 :             "fsync renamed directory's parent {} for tenant {}",
    3499               0 :             target_dir_parent, tenant_id,
    3500               0 :         )
    3501 CBC         491 :     })?;
    3502                 : 
    3503             491 :     Ok(())
    3504             492 : }
    3505                 : 
    3506            1476 : fn rebase_directory(
    3507            1476 :     original_path: &Utf8Path,
    3508            1476 :     base: &Utf8Path,
    3509            1476 :     new_base: &Utf8Path,
    3510            1476 : ) -> anyhow::Result<Utf8PathBuf> {
    3511            1476 :     let relative_path = original_path.strip_prefix(base).with_context(|| {
    3512 UBC           0 :         format!(
    3513               0 :             "Failed to strip base prefix '{}' off path '{}'",
    3514               0 :             base, original_path
    3515               0 :         )
    3516 CBC        1476 :     })?;
    3517            1476 :     Ok(new_base.join(relative_path))
    3518            1476 : }
    3519                 : 
    3520                 : /// Create the cluster temporarily in 'initdbpath' directory inside the repository
    3521                 : /// to get bootstrap data for timeline initialization.
    3522             575 : fn run_initdb(
    3523             575 :     conf: &'static PageServerConf,
    3524             575 :     initdb_target_dir: &Utf8Path,
    3525             575 :     pg_version: u32,
    3526             575 : ) -> anyhow::Result<()> {
    3527             575 :     let initdb_bin_path = conf.pg_bin_dir(pg_version)?.join("initdb");
    3528             575 :     let initdb_lib_dir = conf.pg_lib_dir(pg_version)?;
    3529             575 :     info!(
    3530             575 :         "running {} in {}, libdir: {}",
    3531             575 :         initdb_bin_path, initdb_target_dir, initdb_lib_dir,
    3532             575 :     );
    3533                 : 
    3534             575 :     let initdb_output = Command::new(&initdb_bin_path)
    3535             575 :         .args(["-D", initdb_target_dir.as_ref()])
    3536             575 :         .args(["-U", &conf.superuser])
    3537             575 :         .args(["-E", "utf8"])
    3538             575 :         .arg("--no-instructions")
    3539             575 :         // This is only used for a temporary installation that is deleted shortly after,
    3540             575 :         // so no need to fsync it
    3541             575 :         .arg("--no-sync")
    3542             575 :         .env_clear()
    3543             575 :         .env("LD_LIBRARY_PATH", &initdb_lib_dir)
    3544             575 :         .env("DYLD_LIBRARY_PATH", &initdb_lib_dir)
    3545             575 :         .stdout(Stdio::null())
    3546             575 :         .output()
    3547             575 :         .with_context(|| {
    3548 UBC           0 :             format!(
    3549               0 :                 "failed to execute {} at target dir {}",
    3550               0 :                 initdb_bin_path, initdb_target_dir,
    3551               0 :             )
    3552 CBC         575 :         })?;
    3553             575 :     if !initdb_output.status.success() {
    3554 UBC           0 :         bail!(
    3555               0 :             "initdb failed: '{}'",
    3556               0 :             String::from_utf8_lossy(&initdb_output.stderr)
    3557               0 :         );
    3558 CBC         575 :     }
    3559             575 : 
    3560             575 :     Ok(())
    3561             575 : }
    3562                 : 
    3563                 : impl Drop for Tenant {
    3564             139 :     fn drop(&mut self) {
    3565             139 :         remove_tenant_metrics(&self.tenant_id);
    3566             139 :     }
    3567                 : }
    3568                 : /// Dump contents of a layer file to stdout.
    3569 UBC           0 : pub async fn dump_layerfile_from_path(
    3570               0 :     path: &Utf8Path,
    3571               0 :     verbose: bool,
    3572               0 :     ctx: &RequestContext,
    3573               0 : ) -> anyhow::Result<()> {
    3574                 :     use std::os::unix::fs::FileExt;
    3575                 : 
    3576                 :     // All layer files start with a two-byte "magic" value, to identify the kind of
    3577                 :     // file.
    3578               0 :     let file = File::open(path)?;
    3579               0 :     let mut header_buf = [0u8; 2];
    3580               0 :     file.read_exact_at(&mut header_buf, 0)?;
    3581                 : 
    3582               0 :     match u16::from_be_bytes(header_buf) {
    3583                 :         crate::IMAGE_FILE_MAGIC => {
    3584               0 :             ImageLayer::new_for_path(path, file)?
    3585               0 :                 .dump(verbose, ctx)
    3586               0 :                 .await?
    3587                 :         }
    3588                 :         crate::DELTA_FILE_MAGIC => {
    3589               0 :             DeltaLayer::new_for_path(path, file)?
    3590               0 :                 .dump(verbose, ctx)
    3591               0 :                 .await?
    3592                 :         }
    3593               0 :         magic => bail!("unrecognized magic identifier: {:?}", magic),
    3594                 :     }
    3595                 : 
    3596               0 :     Ok(())
    3597               0 : }
    3598                 : 
    3599                 : #[cfg(test)]
    3600                 : pub(crate) mod harness {
    3601                 :     use bytes::{Bytes, BytesMut};
    3602                 :     use once_cell::sync::OnceCell;
    3603                 :     use std::fs;
    3604                 :     use std::sync::Arc;
    3605                 :     use utils::logging;
    3606                 :     use utils::lsn::Lsn;
    3607                 : 
    3608                 :     use crate::deletion_queue::mock::MockDeletionQueue;
    3609                 :     use crate::{
    3610                 :         config::PageServerConf, repository::Key, tenant::Tenant, walrecord::NeonWalRecord,
    3611                 :     };
    3612                 : 
    3613                 :     use super::*;
    3614                 :     use crate::tenant::config::{TenantConf, TenantConfOpt};
    3615                 :     use hex_literal::hex;
    3616                 :     use utils::id::{TenantId, TimelineId};
    3617                 : 
    3618                 :     pub const TIMELINE_ID: TimelineId =
    3619                 :         TimelineId::from_array(hex!("11223344556677881122334455667788"));
    3620                 :     pub const NEW_TIMELINE_ID: TimelineId =
    3621                 :         TimelineId::from_array(hex!("AA223344556677881122334455667788"));
    3622                 : 
    3623                 :     /// Convenience function to create a page image with given string as the only content
    3624                 :     #[allow(non_snake_case)]
    3625 CBC      854140 :     pub fn TEST_IMG(s: &str) -> Bytes {
    3626          854140 :         let mut buf = BytesMut::new();
    3627          854140 :         buf.extend_from_slice(s.as_bytes());
    3628          854140 :         buf.resize(64, 0);
    3629          854140 : 
    3630          854140 :         buf.freeze()
    3631          854140 :     }
    3632                 : 
    3633                 :     impl From<TenantConf> for TenantConfOpt {
    3634              42 :         fn from(tenant_conf: TenantConf) -> Self {
    3635              42 :             Self {
    3636              42 :                 checkpoint_distance: Some(tenant_conf.checkpoint_distance),
    3637              42 :                 checkpoint_timeout: Some(tenant_conf.checkpoint_timeout),
    3638              42 :                 compaction_target_size: Some(tenant_conf.compaction_target_size),
    3639              42 :                 compaction_period: Some(tenant_conf.compaction_period),
    3640              42 :                 compaction_threshold: Some(tenant_conf.compaction_threshold),
    3641              42 :                 gc_horizon: Some(tenant_conf.gc_horizon),
    3642              42 :                 gc_period: Some(tenant_conf.gc_period),
    3643              42 :                 image_creation_threshold: Some(tenant_conf.image_creation_threshold),
    3644              42 :                 pitr_interval: Some(tenant_conf.pitr_interval),
    3645              42 :                 walreceiver_connect_timeout: Some(tenant_conf.walreceiver_connect_timeout),
    3646              42 :                 lagging_wal_timeout: Some(tenant_conf.lagging_wal_timeout),
    3647              42 :                 max_lsn_wal_lag: Some(tenant_conf.max_lsn_wal_lag),
    3648              42 :                 trace_read_requests: Some(tenant_conf.trace_read_requests),
    3649              42 :                 eviction_policy: Some(tenant_conf.eviction_policy),
    3650              42 :                 min_resident_size_override: tenant_conf.min_resident_size_override,
    3651              42 :                 evictions_low_residence_duration_metric_threshold: Some(
    3652              42 :                     tenant_conf.evictions_low_residence_duration_metric_threshold,
    3653              42 :                 ),
    3654              42 :                 gc_feedback: Some(tenant_conf.gc_feedback),
    3655              42 :             }
    3656              42 :         }
    3657                 :     }
    3658                 : 
    3659                 :     pub struct TenantHarness {
    3660                 :         pub conf: &'static PageServerConf,
    3661                 :         pub tenant_conf: TenantConf,
    3662                 :         pub tenant_id: TenantId,
    3663                 :         pub generation: Generation,
    3664                 :         pub remote_storage: GenericRemoteStorage,
    3665                 :         pub remote_fs_dir: Utf8PathBuf,
    3666                 :         pub deletion_queue: MockDeletionQueue,
    3667                 :     }
    3668                 : 
    3669                 :     static LOG_HANDLE: OnceCell<()> = OnceCell::new();
    3670                 : 
    3671                 :     impl TenantHarness {
    3672              41 :         pub fn create(test_name: &'static str) -> anyhow::Result<Self> {
    3673              41 :             LOG_HANDLE.get_or_init(|| {
    3674               1 :                 logging::init(
    3675               1 :                     logging::LogFormat::Test,
    3676               1 :                     // enable it in case in case the tests exercise code paths that use
    3677               1 :                     // debug_assert_current_span_has_tenant_and_timeline_id
    3678               1 :                     logging::TracingErrorLayerEnablement::EnableWithRustLogFilter,
    3679               1 :                 )
    3680               1 :                 .expect("Failed to init test logging")
    3681              41 :             });
    3682              41 : 
    3683              41 :             let repo_dir = PageServerConf::test_repo_dir(test_name);
    3684              41 :             let _ = fs::remove_dir_all(&repo_dir);
    3685              41 :             fs::create_dir_all(&repo_dir)?;
    3686                 : 
    3687              41 :             let conf = PageServerConf::dummy_conf(repo_dir);
    3688              41 :             // Make a static copy of the config. This can never be free'd, but that's
    3689              41 :             // OK in a test.
    3690              41 :             let conf: &'static PageServerConf = Box::leak(Box::new(conf));
    3691              41 : 
    3692              41 :             // Disable automatic GC and compaction to make the unit tests more deterministic.
    3693              41 :             // The tests perform them manually if needed.
    3694              41 :             let tenant_conf = TenantConf {
    3695              41 :                 gc_period: Duration::ZERO,
    3696              41 :                 compaction_period: Duration::ZERO,
    3697              41 :                 ..TenantConf::default()
    3698              41 :             };
    3699              41 : 
    3700              41 :             let tenant_id = TenantId::generate();
    3701              41 :             fs::create_dir_all(conf.tenant_path(&tenant_id))?;
    3702              41 :             fs::create_dir_all(conf.timelines_path(&tenant_id))?;
    3703                 : 
    3704                 :             use remote_storage::{RemoteStorageConfig, RemoteStorageKind};
    3705              41 :             let remote_fs_dir = conf.workdir.join("localfs");
    3706              41 :             std::fs::create_dir_all(&remote_fs_dir).unwrap();
    3707              41 :             let config = RemoteStorageConfig {
    3708              41 :                 // TODO: why not remote_storage::DEFAULT_REMOTE_STORAGE_MAX_CONCURRENT_SYNCS,
    3709              41 :                 max_concurrent_syncs: std::num::NonZeroUsize::new(2_000_000).unwrap(),
    3710              41 :                 // TODO: why not remote_storage::DEFAULT_REMOTE_STORAGE_MAX_SYNC_ERRORS,
    3711              41 :                 max_sync_errors: std::num::NonZeroU32::new(3_000_000).unwrap(),
    3712              41 :                 storage: RemoteStorageKind::LocalFs(remote_fs_dir.clone()),
    3713              41 :             };
    3714              41 :             let remote_storage = GenericRemoteStorage::from_config(&config).unwrap();
    3715              41 :             let deletion_queue = MockDeletionQueue::new(Some(remote_storage.clone()));
    3716              41 : 
    3717              41 :             Ok(Self {
    3718              41 :                 conf,
    3719              41 :                 tenant_conf,
    3720              41 :                 tenant_id,
    3721              41 :                 generation: Generation::new(0xdeadbeef),
    3722              41 :                 remote_storage,
    3723              41 :                 remote_fs_dir,
    3724              41 :                 deletion_queue,
    3725              41 :             })
    3726              41 :         }
    3727                 : 
    3728              39 :         pub async fn load(&self) -> (Arc<Tenant>, RequestContext) {
    3729              39 :             let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
    3730              39 :             (
    3731              39 :                 self.try_load(&ctx)
    3732              45 :                     .await
    3733              39 :                     .expect("failed to load test tenant"),
    3734              39 :                 ctx,
    3735              39 :             )
    3736              39 :         }
    3737                 : 
    3738              42 :         pub async fn try_load(&self, ctx: &RequestContext) -> anyhow::Result<Arc<Tenant>> {
    3739              42 :             let walredo_mgr = Arc::new(WalRedoManager::from(TestRedoManager));
    3740              42 : 
    3741              42 :             let tenant = Arc::new(Tenant::new(
    3742              42 :                 TenantState::Loading,
    3743              42 :                 self.conf,
    3744              42 :                 AttachedTenantConf::try_from(LocationConf::attached_single(
    3745              42 :                     TenantConfOpt::from(self.tenant_conf),
    3746              42 :                     self.generation,
    3747              42 :                 ))
    3748              42 :                 .unwrap(),
    3749              42 :                 walredo_mgr,
    3750              42 :                 self.tenant_id,
    3751              42 :                 Some(self.remote_storage.clone()),
    3752              42 :                 self.deletion_queue.new_client(),
    3753              42 :             ));
    3754              42 :             tenant
    3755              42 :                 .load(None, None, ctx)
    3756              42 :                 .instrument(info_span!("try_load", tenant_id=%self.tenant_id))
    3757              48 :                 .await?;
    3758                 : 
    3759                 :             // TODO reuse Tenant::activate (needs broker)
    3760              41 :             tenant.state.send_replace(TenantState::Active);
    3761              41 :             for timeline in tenant.timelines.lock().unwrap().values() {
    3762               3 :                 timeline.set_state(TimelineState::Active);
    3763               3 :             }
    3764              41 :             Ok(tenant)
    3765              42 :         }
    3766                 : 
    3767               9 :         pub fn timeline_path(&self, timeline_id: &TimelineId) -> Utf8PathBuf {
    3768               9 :             self.conf.timeline_path(&self.tenant_id, timeline_id)
    3769               9 :         }
    3770                 :     }
    3771                 : 
    3772                 :     // Mock WAL redo manager that doesn't do much
    3773                 :     pub(crate) struct TestRedoManager;
    3774                 : 
    3775                 :     impl TestRedoManager {
    3776 UBC           0 :         pub async fn request_redo(
    3777               0 :             &self,
    3778               0 :             key: Key,
    3779               0 :             lsn: Lsn,
    3780               0 :             base_img: Option<(Lsn, Bytes)>,
    3781               0 :             records: Vec<(Lsn, NeonWalRecord)>,
    3782               0 :             _pg_version: u32,
    3783               0 :         ) -> anyhow::Result<Bytes> {
    3784               0 :             let s = format!(
    3785               0 :                 "redo for {} to get to {}, with {} and {} records",
    3786               0 :                 key,
    3787               0 :                 lsn,
    3788               0 :                 if base_img.is_some() {
    3789               0 :                     "base image"
    3790                 :                 } else {
    3791               0 :                     "no base image"
    3792                 :                 },
    3793               0 :                 records.len()
    3794               0 :             );
    3795               0 :             println!("{s}");
    3796               0 : 
    3797               0 :             Ok(TEST_IMG(&s))
    3798               0 :         }
    3799                 :     }
    3800                 : }
    3801                 : 
    3802                 : #[cfg(test)]
    3803                 : mod tests {
    3804                 :     use super::*;
    3805                 :     use crate::keyspace::KeySpaceAccum;
    3806                 :     use crate::repository::{Key, Value};
    3807                 :     use crate::tenant::harness::*;
    3808                 :     use crate::DEFAULT_PG_VERSION;
    3809                 :     use crate::METADATA_FILE_NAME;
    3810                 :     use bytes::BytesMut;
    3811                 :     use hex_literal::hex;
    3812                 :     use once_cell::sync::Lazy;
    3813                 :     use rand::{thread_rng, Rng};
    3814                 :     use tokio_util::sync::CancellationToken;
    3815                 : 
    3816                 :     static TEST_KEY: Lazy<Key> =
    3817 CBC           1 :         Lazy::new(|| Key::from_slice(&hex!("112222222233333333444444445500000001")));
    3818                 : 
    3819               1 :     #[tokio::test]
    3820               1 :     async fn test_basic() -> anyhow::Result<()> {
    3821               1 :         let (tenant, ctx) = TenantHarness::create("test_basic")?.load().await;
    3822               1 :         let tline = tenant
    3823               1 :             .create_test_timeline(TIMELINE_ID, Lsn(0x08), DEFAULT_PG_VERSION, &ctx)
    3824               2 :             .await?;
    3825                 : 
    3826               1 :         let writer = tline.writer().await;
    3827               1 :         writer
    3828               1 :             .put(
    3829               1 :                 *TEST_KEY,
    3830               1 :                 Lsn(0x10),
    3831               1 :                 &Value::Image(TEST_IMG("foo at 0x10")),
    3832               1 :                 &ctx,
    3833               1 :             )
    3834 UBC           0 :             .await?;
    3835 CBC           1 :         writer.finish_write(Lsn(0x10));
    3836               1 :         drop(writer);
    3837                 : 
    3838               1 :         let writer = tline.writer().await;
    3839               1 :         writer
    3840               1 :             .put(
    3841               1 :                 *TEST_KEY,
    3842               1 :                 Lsn(0x20),
    3843               1 :                 &Value::Image(TEST_IMG("foo at 0x20")),
    3844               1 :                 &ctx,
    3845               1 :             )
    3846 UBC           0 :             .await?;
    3847 CBC           1 :         writer.finish_write(Lsn(0x20));
    3848               1 :         drop(writer);
    3849                 : 
    3850               1 :         assert_eq!(
    3851               1 :             tline.get(*TEST_KEY, Lsn(0x10), &ctx).await?,
    3852               1 :             TEST_IMG("foo at 0x10")
    3853                 :         );
    3854               1 :         assert_eq!(
    3855               1 :             tline.get(*TEST_KEY, Lsn(0x1f), &ctx).await?,
    3856               1 :             TEST_IMG("foo at 0x10")
    3857                 :         );
    3858               1 :         assert_eq!(
    3859               1 :             tline.get(*TEST_KEY, Lsn(0x20), &ctx).await?,
    3860               1 :             TEST_IMG("foo at 0x20")
    3861                 :         );
    3862                 : 
    3863               1 :         Ok(())
    3864                 :     }
    3865                 : 
    3866               1 :     #[tokio::test]
    3867               1 :     async fn no_duplicate_timelines() -> anyhow::Result<()> {
    3868               1 :         let (tenant, ctx) = TenantHarness::create("no_duplicate_timelines")?
    3869               1 :             .load()
    3870               1 :             .await;
    3871               1 :         let _ = tenant
    3872               1 :             .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
    3873               2 :             .await?;
    3874                 : 
    3875               1 :         match tenant
    3876               1 :             .create_empty_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
    3877 UBC           0 :             .await
    3878                 :         {
    3879               0 :             Ok(_) => panic!("duplicate timeline creation should fail"),
    3880 CBC           1 :             Err(e) => assert_eq!(
    3881               1 :                 e.to_string(),
    3882               1 :                 format!(
    3883               1 :                     "Timeline {}/{} already exists in pageserver's memory",
    3884               1 :                     tenant.tenant_id, TIMELINE_ID
    3885               1 :                 )
    3886               1 :             ),
    3887                 :         }
    3888                 : 
    3889               1 :         Ok(())
    3890                 :     }
    3891                 : 
    3892                 :     /// Convenience function to create a page image with given string as the only content
    3893               5 :     pub fn test_value(s: &str) -> Value {
    3894               5 :         let mut buf = BytesMut::new();
    3895               5 :         buf.extend_from_slice(s.as_bytes());
    3896               5 :         Value::Image(buf.freeze())
    3897               5 :     }
    3898                 : 
    3899                 :     ///
    3900                 :     /// Test branch creation
    3901                 :     ///
    3902               1 :     #[tokio::test]
    3903               1 :     async fn test_branch() -> anyhow::Result<()> {
    3904                 :         use std::str::from_utf8;
    3905                 : 
    3906               1 :         let (tenant, ctx) = TenantHarness::create("test_branch")?.load().await;
    3907               1 :         let tline = tenant
    3908               1 :             .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
    3909               2 :             .await?;
    3910               1 :         let writer = tline.writer().await;
    3911                 : 
    3912                 :         #[allow(non_snake_case)]
    3913               1 :         let TEST_KEY_A: Key = Key::from_hex("112222222233333333444444445500000001").unwrap();
    3914               1 :         #[allow(non_snake_case)]
    3915               1 :         let TEST_KEY_B: Key = Key::from_hex("112222222233333333444444445500000002").unwrap();
    3916               1 : 
    3917               1 :         // Insert a value on the timeline
    3918               1 :         writer
    3919               1 :             .put(TEST_KEY_A, Lsn(0x20), &test_value("foo at 0x20"), &ctx)
    3920 UBC           0 :             .await?;
    3921 CBC           1 :         writer
    3922               1 :             .put(TEST_KEY_B, Lsn(0x20), &test_value("foobar at 0x20"), &ctx)
    3923 UBC           0 :             .await?;
    3924 CBC           1 :         writer.finish_write(Lsn(0x20));
    3925               1 : 
    3926               1 :         writer
    3927               1 :             .put(TEST_KEY_A, Lsn(0x30), &test_value("foo at 0x30"), &ctx)
    3928 UBC           0 :             .await?;
    3929 CBC           1 :         writer.finish_write(Lsn(0x30));
    3930               1 :         writer
    3931               1 :             .put(TEST_KEY_A, Lsn(0x40), &test_value("foo at 0x40"), &ctx)
    3932 UBC           0 :             .await?;
    3933 CBC           1 :         writer.finish_write(Lsn(0x40));
    3934               1 : 
    3935               1 :         //assert_current_logical_size(&tline, Lsn(0x40));
    3936               1 : 
    3937               1 :         // Branch the history, modify relation differently on the new timeline
    3938               1 :         tenant
    3939               1 :             .branch_timeline_test(&tline, NEW_TIMELINE_ID, Some(Lsn(0x30)), &ctx)
    3940 UBC           0 :             .await?;
    3941 CBC           1 :         let newtline = tenant
    3942               1 :             .get_timeline(NEW_TIMELINE_ID, true)
    3943               1 :             .expect("Should have a local timeline");
    3944               1 :         let new_writer = newtline.writer().await;
    3945               1 :         new_writer
    3946               1 :             .put(TEST_KEY_A, Lsn(0x40), &test_value("bar at 0x40"), &ctx)
    3947 UBC           0 :             .await?;
    3948 CBC           1 :         new_writer.finish_write(Lsn(0x40));
    3949                 : 
    3950                 :         // Check page contents on both branches
    3951               1 :         assert_eq!(
    3952               1 :             from_utf8(&tline.get(TEST_KEY_A, Lsn(0x40), &ctx).await?)?,
    3953                 :             "foo at 0x40"
    3954                 :         );
    3955               1 :         assert_eq!(
    3956               1 :             from_utf8(&newtline.get(TEST_KEY_A, Lsn(0x40), &ctx).await?)?,
    3957                 :             "bar at 0x40"
    3958                 :         );
    3959               1 :         assert_eq!(
    3960               1 :             from_utf8(&newtline.get(TEST_KEY_B, Lsn(0x40), &ctx).await?)?,
    3961                 :             "foobar at 0x20"
    3962                 :         );
    3963                 : 
    3964                 :         //assert_current_logical_size(&tline, Lsn(0x40));
    3965                 : 
    3966               1 :         Ok(())
    3967                 :     }
    3968                 : 
    3969              10 :     async fn make_some_layers(
    3970              10 :         tline: &Timeline,
    3971              10 :         start_lsn: Lsn,
    3972              10 :         ctx: &RequestContext,
    3973              10 :     ) -> anyhow::Result<()> {
    3974              10 :         let mut lsn = start_lsn;
    3975                 :         #[allow(non_snake_case)]
    3976                 :         {
    3977              10 :             let writer = tline.writer().await;
    3978                 :             // Create a relation on the timeline
    3979              10 :             writer
    3980              10 :                 .put(
    3981              10 :                     *TEST_KEY,
    3982              10 :                     lsn,
    3983              10 :                     &Value::Image(TEST_IMG(&format!("foo at {}", lsn))),
    3984              10 :                     ctx,
    3985              10 :                 )
    3986 UBC           0 :                 .await?;
    3987 CBC          10 :             writer.finish_write(lsn);
    3988              10 :             lsn += 0x10;
    3989              10 :             writer
    3990              10 :                 .put(
    3991              10 :                     *TEST_KEY,
    3992              10 :                     lsn,
    3993              10 :                     &Value::Image(TEST_IMG(&format!("foo at {}", lsn))),
    3994              10 :                     ctx,
    3995              10 :                 )
    3996 UBC           0 :                 .await?;
    3997 CBC          10 :             writer.finish_write(lsn);
    3998              10 :             lsn += 0x10;
    3999              10 :         }
    4000              10 :         tline.freeze_and_flush().await?;
    4001                 :         {
    4002              10 :             let writer = tline.writer().await;
    4003              10 :             writer
    4004              10 :                 .put(
    4005              10 :                     *TEST_KEY,
    4006              10 :                     lsn,
    4007              10 :                     &Value::Image(TEST_IMG(&format!("foo at {}", lsn))),
    4008              10 :                     ctx,
    4009              10 :                 )
    4010 UBC           0 :                 .await?;
    4011 CBC          10 :             writer.finish_write(lsn);
    4012              10 :             lsn += 0x10;
    4013              10 :             writer
    4014              10 :                 .put(
    4015              10 :                     *TEST_KEY,
    4016              10 :                     lsn,
    4017              10 :                     &Value::Image(TEST_IMG(&format!("foo at {}", lsn))),
    4018              10 :                     ctx,
    4019              10 :                 )
    4020 UBC           0 :                 .await?;
    4021 CBC          10 :             writer.finish_write(lsn);
    4022              10 :         }
    4023              10 :         tline.freeze_and_flush().await
    4024              10 :     }
    4025                 : 
    4026               1 :     #[tokio::test]
    4027               1 :     async fn test_prohibit_branch_creation_on_garbage_collected_data() -> anyhow::Result<()> {
    4028               1 :         let (tenant, ctx) =
    4029               1 :             TenantHarness::create("test_prohibit_branch_creation_on_garbage_collected_data")?
    4030               1 :                 .load()
    4031               1 :                 .await;
    4032               1 :         let tline = tenant
    4033               1 :             .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
    4034               2 :             .await?;
    4035               2 :         make_some_layers(tline.as_ref(), Lsn(0x20), &ctx).await?;
    4036                 : 
    4037                 :         // this removes layers before lsn 40 (50 minus 10), so there are two remaining layers, image and delta for 31-50
    4038                 :         // FIXME: this doesn't actually remove any layer currently, given how the flushing
    4039                 :         // and compaction works. But it does set the 'cutoff' point so that the cross check
    4040                 :         // below should fail.
    4041               1 :         tenant
    4042               1 :             .gc_iteration(Some(TIMELINE_ID), 0x10, Duration::ZERO, &ctx)
    4043 LBC         (1) :             .await?;
    4044                 : 
    4045                 :         // try to branch at lsn 25, should fail because we already garbage collected the data
    4046 CBC           1 :         match tenant
    4047               1 :             .branch_timeline_test(&tline, NEW_TIMELINE_ID, Some(Lsn(0x25)), &ctx)
    4048 UBC           0 :             .await
    4049                 :         {
    4050               0 :             Ok(_) => panic!("branching should have failed"),
    4051 CBC           1 :             Err(err) => {
    4052               1 :                 let CreateTimelineError::AncestorLsn(err) = err else {
    4053 UBC           0 :                     panic!("wrong error type")
    4054                 :                 };
    4055 CBC           1 :                 assert!(err.to_string().contains("invalid branch start lsn"));
    4056               1 :                 assert!(err
    4057               1 :                     .source()
    4058               1 :                     .unwrap()
    4059               1 :                     .to_string()
    4060               1 :                     .contains("we might've already garbage collected needed data"))
    4061                 :             }
    4062                 :         }
    4063                 : 
    4064               1 :         Ok(())
    4065                 :     }
    4066                 : 
    4067               1 :     #[tokio::test]
    4068               1 :     async fn test_prohibit_branch_creation_on_pre_initdb_lsn() -> anyhow::Result<()> {
    4069               1 :         let (tenant, ctx) =
    4070               1 :             TenantHarness::create("test_prohibit_branch_creation_on_pre_initdb_lsn")?
    4071               1 :                 .load()
    4072               1 :                 .await;
    4073                 : 
    4074               1 :         let tline = tenant
    4075               1 :             .create_test_timeline(TIMELINE_ID, Lsn(0x50), DEFAULT_PG_VERSION, &ctx)
    4076               2 :             .await?;
    4077                 :         // try to branch at lsn 0x25, should fail because initdb lsn is 0x50
    4078               1 :         match tenant
    4079               1 :             .branch_timeline_test(&tline, NEW_TIMELINE_ID, Some(Lsn(0x25)), &ctx)
    4080 UBC           0 :             .await
    4081                 :         {
    4082               0 :             Ok(_) => panic!("branching should have failed"),
    4083 CBC           1 :             Err(err) => {
    4084               1 :                 let CreateTimelineError::AncestorLsn(err) = err else {
    4085 UBC           0 :                     panic!("wrong error type");
    4086                 :                 };
    4087 CBC           1 :                 assert!(&err.to_string().contains("invalid branch start lsn"));
    4088               1 :                 assert!(&err
    4089               1 :                     .source()
    4090               1 :                     .unwrap()
    4091               1 :                     .to_string()
    4092               1 :                     .contains("is earlier than latest GC horizon"));
    4093                 :             }
    4094                 :         }
    4095                 : 
    4096               1 :         Ok(())
    4097                 :     }
    4098                 : 
    4099                 :     /*
    4100                 :     // FIXME: This currently fails to error out. Calling GC doesn't currently
    4101                 :     // remove the old value, we'd need to work a little harder
    4102                 :     #[tokio::test]
    4103                 :     async fn test_prohibit_get_for_garbage_collected_data() -> anyhow::Result<()> {
    4104                 :         let repo =
    4105                 :             RepoHarness::create("test_prohibit_get_for_garbage_collected_data")?
    4106                 :             .load();
    4107                 : 
    4108                 :         let tline = repo.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION)?;
    4109                 :         make_some_layers(tline.as_ref(), Lsn(0x20), &ctx).await?;
    4110                 : 
    4111                 :         repo.gc_iteration(Some(TIMELINE_ID), 0x10, Duration::ZERO)?;
    4112                 :         let latest_gc_cutoff_lsn = tline.get_latest_gc_cutoff_lsn();
    4113                 :         assert!(*latest_gc_cutoff_lsn > Lsn(0x25));
    4114                 :         match tline.get(*TEST_KEY, Lsn(0x25)) {
    4115                 :             Ok(_) => panic!("request for page should have failed"),
    4116                 :             Err(err) => assert!(err.to_string().contains("not found at")),
    4117                 :         }
    4118                 :         Ok(())
    4119                 :     }
    4120                 :      */
    4121                 : 
    4122               1 :     #[tokio::test]
    4123               1 :     async fn test_get_branchpoints_from_an_inactive_timeline() -> anyhow::Result<()> {
    4124               1 :         let (tenant, ctx) =
    4125               1 :             TenantHarness::create("test_get_branchpoints_from_an_inactive_timeline")?
    4126               1 :                 .load()
    4127               1 :                 .await;
    4128               1 :         let tline = tenant
    4129               1 :             .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
    4130               2 :             .await?;
    4131               2 :         make_some_layers(tline.as_ref(), Lsn(0x20), &ctx).await?;
    4132                 : 
    4133               1 :         tenant
    4134               1 :             .branch_timeline_test(&tline, NEW_TIMELINE_ID, Some(Lsn(0x40)), &ctx)
    4135 UBC           0 :             .await?;
    4136 CBC           1 :         let newtline = tenant
    4137               1 :             .get_timeline(NEW_TIMELINE_ID, true)
    4138               1 :             .expect("Should have a local timeline");
    4139               1 : 
    4140               2 :         make_some_layers(newtline.as_ref(), Lsn(0x60), &ctx).await?;
    4141                 : 
    4142               1 :         tline.set_broken("test".to_owned());
    4143               1 : 
    4144               1 :         tenant
    4145               1 :             .gc_iteration(Some(TIMELINE_ID), 0x10, Duration::ZERO, &ctx)
    4146 UBC           0 :             .await?;
    4147                 : 
    4148                 :         // The branchpoints should contain all timelines, even ones marked
    4149                 :         // as Broken.
    4150                 :         {
    4151 CBC           1 :             let branchpoints = &tline.gc_info.read().unwrap().retain_lsns;
    4152               1 :             assert_eq!(branchpoints.len(), 1);
    4153               1 :             assert_eq!(branchpoints[0], Lsn(0x40));
    4154                 :         }
    4155                 : 
    4156                 :         // You can read the key from the child branch even though the parent is
    4157                 :         // Broken, as long as you don't need to access data from the parent.
    4158               1 :         assert_eq!(
    4159               1 :             newtline.get(*TEST_KEY, Lsn(0x70), &ctx).await?,
    4160               1 :             TEST_IMG(&format!("foo at {}", Lsn(0x70)))
    4161                 :         );
    4162                 : 
    4163                 :         // This needs to traverse to the parent, and fails.
    4164               1 :         let err = newtline.get(*TEST_KEY, Lsn(0x50), &ctx).await.unwrap_err();
    4165               1 :         assert!(err
    4166               1 :             .to_string()
    4167               1 :             .contains("will not become active. Current state: Broken"));
    4168                 : 
    4169               1 :         Ok(())
    4170                 :     }
    4171                 : 
    4172               1 :     #[tokio::test]
    4173               1 :     async fn test_retain_data_in_parent_which_is_needed_for_child() -> anyhow::Result<()> {
    4174               1 :         let (tenant, ctx) =
    4175               1 :             TenantHarness::create("test_retain_data_in_parent_which_is_needed_for_child")?
    4176               1 :                 .load()
    4177               1 :                 .await;
    4178               1 :         let tline = tenant
    4179               1 :             .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
    4180               2 :             .await?;
    4181               2 :         make_some_layers(tline.as_ref(), Lsn(0x20), &ctx).await?;
    4182                 : 
    4183               1 :         tenant
    4184               1 :             .branch_timeline_test(&tline, NEW_TIMELINE_ID, Some(Lsn(0x40)), &ctx)
    4185 UBC           0 :             .await?;
    4186 CBC           1 :         let newtline = tenant
    4187               1 :             .get_timeline(NEW_TIMELINE_ID, true)
    4188               1 :             .expect("Should have a local timeline");
    4189               1 :         // this removes layers before lsn 40 (50 minus 10), so there are two remaining layers, image and delta for 31-50
    4190               1 :         tenant
    4191               1 :             .gc_iteration(Some(TIMELINE_ID), 0x10, Duration::ZERO, &ctx)
    4192               1 :             .await?;
    4193               1 :         assert!(newtline.get(*TEST_KEY, Lsn(0x25), &ctx).await.is_ok());
    4194                 : 
    4195               1 :         Ok(())
    4196                 :     }
    4197               1 :     #[tokio::test]
    4198               1 :     async fn test_parent_keeps_data_forever_after_branching() -> anyhow::Result<()> {
    4199               1 :         let (tenant, ctx) =
    4200               1 :             TenantHarness::create("test_parent_keeps_data_forever_after_branching")?
    4201               1 :                 .load()
    4202               1 :                 .await;
    4203               1 :         let tline = tenant
    4204               1 :             .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
    4205               2 :             .await?;
    4206               2 :         make_some_layers(tline.as_ref(), Lsn(0x20), &ctx).await?;
    4207                 : 
    4208               1 :         tenant
    4209               1 :             .branch_timeline_test(&tline, NEW_TIMELINE_ID, Some(Lsn(0x40)), &ctx)
    4210 UBC           0 :             .await?;
    4211 CBC           1 :         let newtline = tenant
    4212               1 :             .get_timeline(NEW_TIMELINE_ID, true)
    4213               1 :             .expect("Should have a local timeline");
    4214               1 : 
    4215               2 :         make_some_layers(newtline.as_ref(), Lsn(0x60), &ctx).await?;
    4216                 : 
    4217                 :         // run gc on parent
    4218               1 :         tenant
    4219               1 :             .gc_iteration(Some(TIMELINE_ID), 0x10, Duration::ZERO, &ctx)
    4220 UBC           0 :             .await?;
    4221                 : 
    4222                 :         // Check that the data is still accessible on the branch.
    4223 CBC           1 :         assert_eq!(
    4224               1 :             newtline.get(*TEST_KEY, Lsn(0x50), &ctx).await?,
    4225               1 :             TEST_IMG(&format!("foo at {}", Lsn(0x40)))
    4226                 :         );
    4227                 : 
    4228               1 :         Ok(())
    4229                 :     }
    4230                 : 
    4231               1 :     #[tokio::test]
    4232               1 :     async fn timeline_load() -> anyhow::Result<()> {
    4233                 :         const TEST_NAME: &str = "timeline_load";
    4234               1 :         let harness = TenantHarness::create(TEST_NAME)?;
    4235                 :         {
    4236               1 :             let (tenant, ctx) = harness.load().await;
    4237               1 :             let tline = tenant
    4238               1 :                 .create_test_timeline(TIMELINE_ID, Lsn(0x7000), DEFAULT_PG_VERSION, &ctx)
    4239               2 :                 .await?;
    4240               2 :             make_some_layers(tline.as_ref(), Lsn(0x8000), &ctx).await?;
    4241                 :             // so that all uploads finish & we can call harness.load() below again
    4242               1 :             tenant
    4243               1 :                 .shutdown(Default::default(), true)
    4244               1 :                 .instrument(info_span!("test_shutdown", tenant_id=%tenant.tenant_id))
    4245               2 :                 .await
    4246               1 :                 .ok()
    4247               1 :                 .unwrap();
    4248                 :         }
    4249                 : 
    4250               3 :         let (tenant, _ctx) = harness.load().await;
    4251               1 :         tenant
    4252               1 :             .get_timeline(TIMELINE_ID, true)
    4253               1 :             .expect("cannot load timeline");
    4254               1 : 
    4255               1 :         Ok(())
    4256                 :     }
    4257                 : 
    4258               1 :     #[tokio::test]
    4259               1 :     async fn timeline_load_with_ancestor() -> anyhow::Result<()> {
    4260                 :         const TEST_NAME: &str = "timeline_load_with_ancestor";
    4261               1 :         let harness = TenantHarness::create(TEST_NAME)?;
    4262                 :         // create two timelines
    4263                 :         {
    4264               1 :             let (tenant, ctx) = harness.load().await;
    4265               1 :             let tline = tenant
    4266               1 :                 .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
    4267               2 :                 .await?;
    4268                 : 
    4269               2 :             make_some_layers(tline.as_ref(), Lsn(0x20), &ctx).await?;
    4270                 : 
    4271               1 :             let child_tline = tenant
    4272               1 :                 .branch_timeline_test(&tline, NEW_TIMELINE_ID, Some(Lsn(0x40)), &ctx)
    4273 UBC           0 :                 .await?;
    4274 CBC           1 :             child_tline.set_state(TimelineState::Active);
    4275               1 : 
    4276               1 :             let newtline = tenant
    4277               1 :                 .get_timeline(NEW_TIMELINE_ID, true)
    4278               1 :                 .expect("Should have a local timeline");
    4279               1 : 
    4280               2 :             make_some_layers(newtline.as_ref(), Lsn(0x60), &ctx).await?;
    4281                 : 
    4282                 :             // so that all uploads finish & we can call harness.load() below again
    4283               1 :             tenant
    4284               1 :                 .shutdown(Default::default(), true)
    4285               1 :                 .instrument(info_span!("test_shutdown", tenant_id=%tenant.tenant_id))
    4286               3 :                 .await
    4287               1 :                 .ok()
    4288               1 :                 .unwrap();
    4289                 :         }
    4290                 : 
    4291                 :         // check that both of them are initially unloaded
    4292               5 :         let (tenant, _ctx) = harness.load().await;
    4293                 : 
    4294                 :         // check that both, child and ancestor are loaded
    4295               1 :         let _child_tline = tenant
    4296               1 :             .get_timeline(NEW_TIMELINE_ID, true)
    4297               1 :             .expect("cannot get child timeline loaded");
    4298               1 : 
    4299               1 :         let _ancestor_tline = tenant
    4300               1 :             .get_timeline(TIMELINE_ID, true)
    4301               1 :             .expect("cannot get ancestor timeline loaded");
    4302               1 : 
    4303               1 :         Ok(())
    4304                 :     }
    4305                 : 
    4306               1 :     #[tokio::test]
    4307               1 :     async fn delta_layer_dumping() -> anyhow::Result<()> {
    4308               1 :         let (tenant, ctx) = TenantHarness::create("test_layer_dumping")?.load().await;
    4309               1 :         let tline = tenant
    4310               1 :             .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
    4311               2 :             .await?;
    4312               2 :         make_some_layers(tline.as_ref(), Lsn(0x20), &ctx).await?;
    4313                 : 
    4314               1 :         let layer_map = tline.layers.read().await;
    4315               1 :         let level0_deltas = layer_map.layer_map().get_level0_deltas()?;
    4316                 : 
    4317               1 :         assert!(!level0_deltas.is_empty());
    4318                 : 
    4319               3 :         for delta in level0_deltas {
    4320               2 :             let delta = layer_map.get_from_desc(&delta);
    4321               2 :             // Ensure we are dumping a delta layer here
    4322               2 :             let delta = delta.downcast_delta_layer().unwrap();
    4323               2 : 
    4324               2 :             delta.dump(false, &ctx).await.unwrap();
    4325               2 :             delta.dump(true, &ctx).await.unwrap();
    4326                 :         }
    4327                 : 
    4328               1 :         Ok(())
    4329                 :     }
    4330                 : 
    4331               1 :     #[tokio::test]
    4332               1 :     async fn corrupt_metadata() -> anyhow::Result<()> {
    4333                 :         const TEST_NAME: &str = "corrupt_metadata";
    4334               1 :         let harness = TenantHarness::create(TEST_NAME)?;
    4335               1 :         let (tenant, ctx) = harness.load().await;
    4336                 : 
    4337               1 :         let tline = tenant
    4338               1 :             .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
    4339               2 :             .await?;
    4340               1 :         drop(tline);
    4341               1 :         // so that all uploads finish & we can call harness.try_load() below again
    4342               1 :         tenant
    4343               1 :             .shutdown(Default::default(), true)
    4344               1 :             .instrument(info_span!("test_shutdown", tenant_id=%tenant.tenant_id))
    4345               2 :             .await
    4346               1 :             .ok()
    4347               1 :             .unwrap();
    4348               1 :         drop(tenant);
    4349               1 : 
    4350               1 :         let metadata_path = harness.timeline_path(&TIMELINE_ID).join(METADATA_FILE_NAME);
    4351               1 : 
    4352               1 :         assert!(metadata_path.is_file());
    4353                 : 
    4354               1 :         let mut metadata_bytes = std::fs::read(&metadata_path)?;
    4355               1 :         assert_eq!(metadata_bytes.len(), 512);
    4356               1 :         metadata_bytes[8] ^= 1;
    4357               1 :         std::fs::write(metadata_path, metadata_bytes)?;
    4358                 : 
    4359               1 :         let err = harness.try_load(&ctx).await.err().expect("should fail");
    4360               1 :         // get all the stack with all .context, not only the last one
    4361               1 :         let message = format!("{err:#}");
    4362               1 :         let expected = "failed to load metadata";
    4363               1 :         assert!(
    4364               1 :             message.contains(expected),
    4365 UBC           0 :             "message '{message}' expected to contain {expected}"
    4366                 :         );
    4367                 : 
    4368 CBC           1 :         let mut found_error_message = false;
    4369               1 :         let mut err_source = err.source();
    4370               1 :         while let Some(source) = err_source {
    4371               1 :             if source.to_string().contains("metadata checksum mismatch") {
    4372               1 :                 found_error_message = true;
    4373               1 :                 break;
    4374 UBC           0 :             }
    4375               0 :             err_source = source.source();
    4376                 :         }
    4377 CBC           1 :         assert!(
    4378               1 :             found_error_message,
    4379 UBC           0 :             "didn't find the corrupted metadata error in {}",
    4380                 :             message
    4381                 :         );
    4382                 : 
    4383 CBC           1 :         Ok(())
    4384                 :     }
    4385                 : 
    4386               1 :     #[tokio::test]
    4387               1 :     async fn test_images() -> anyhow::Result<()> {
    4388               1 :         let (tenant, ctx) = TenantHarness::create("test_images")?.load().await;
    4389               1 :         let tline = tenant
    4390               1 :             .create_test_timeline(TIMELINE_ID, Lsn(0x08), DEFAULT_PG_VERSION, &ctx)
    4391               2 :             .await?;
    4392                 : 
    4393               1 :         let writer = tline.writer().await;
    4394               1 :         writer
    4395               1 :             .put(
    4396               1 :                 *TEST_KEY,
    4397               1 :                 Lsn(0x10),
    4398               1 :                 &Value::Image(TEST_IMG("foo at 0x10")),
    4399               1 :                 &ctx,
    4400               1 :             )
    4401 UBC           0 :             .await?;
    4402 CBC           1 :         writer.finish_write(Lsn(0x10));
    4403               1 :         drop(writer);
    4404               1 : 
    4405               1 :         tline.freeze_and_flush().await?;
    4406               1 :         tline.compact(&CancellationToken::new(), &ctx).await?;
    4407                 : 
    4408               1 :         let writer = tline.writer().await;
    4409               1 :         writer
    4410               1 :             .put(
    4411               1 :                 *TEST_KEY,
    4412               1 :                 Lsn(0x20),
    4413               1 :                 &Value::Image(TEST_IMG("foo at 0x20")),
    4414               1 :                 &ctx,
    4415               1 :             )
    4416 UBC           0 :             .await?;
    4417 CBC           1 :         writer.finish_write(Lsn(0x20));
    4418               1 :         drop(writer);
    4419               1 : 
    4420               1 :         tline.freeze_and_flush().await?;
    4421               1 :         tline.compact(&CancellationToken::new(), &ctx).await?;
    4422                 : 
    4423               1 :         let writer = tline.writer().await;
    4424               1 :         writer
    4425               1 :             .put(
    4426               1 :                 *TEST_KEY,
    4427               1 :                 Lsn(0x30),
    4428               1 :                 &Value::Image(TEST_IMG("foo at 0x30")),
    4429               1 :                 &ctx,
    4430               1 :             )
    4431 UBC           0 :             .await?;
    4432 CBC           1 :         writer.finish_write(Lsn(0x30));
    4433               1 :         drop(writer);
    4434               1 : 
    4435               1 :         tline.freeze_and_flush().await?;
    4436               1 :         tline.compact(&CancellationToken::new(), &ctx).await?;
    4437                 : 
    4438               1 :         let writer = tline.writer().await;
    4439               1 :         writer
    4440               1 :             .put(
    4441               1 :                 *TEST_KEY,
    4442               1 :                 Lsn(0x40),
    4443               1 :                 &Value::Image(TEST_IMG("foo at 0x40")),
    4444               1 :                 &ctx,
    4445               1 :             )
    4446 UBC           0 :             .await?;
    4447 CBC           1 :         writer.finish_write(Lsn(0x40));
    4448               1 :         drop(writer);
    4449               1 : 
    4450               1 :         tline.freeze_and_flush().await?;
    4451               1 :         tline.compact(&CancellationToken::new(), &ctx).await?;
    4452                 : 
    4453               1 :         assert_eq!(
    4454               1 :             tline.get(*TEST_KEY, Lsn(0x10), &ctx).await?,
    4455               1 :             TEST_IMG("foo at 0x10")
    4456                 :         );
    4457               1 :         assert_eq!(
    4458               1 :             tline.get(*TEST_KEY, Lsn(0x1f), &ctx).await?,
    4459               1 :             TEST_IMG("foo at 0x10")
    4460                 :         );
    4461               1 :         assert_eq!(
    4462               1 :             tline.get(*TEST_KEY, Lsn(0x20), &ctx).await?,
    4463               1 :             TEST_IMG("foo at 0x20")
    4464                 :         );
    4465               1 :         assert_eq!(
    4466               1 :             tline.get(*TEST_KEY, Lsn(0x30), &ctx).await?,
    4467               1 :             TEST_IMG("foo at 0x30")
    4468                 :         );
    4469               1 :         assert_eq!(
    4470               1 :             tline.get(*TEST_KEY, Lsn(0x40), &ctx).await?,
    4471               1 :             TEST_IMG("foo at 0x40")
    4472                 :         );
    4473                 : 
    4474               1 :         Ok(())
    4475                 :     }
    4476                 : 
    4477                 :     //
    4478                 :     // Insert 1000 key-value pairs with increasing keys, flush, compact, GC.
    4479                 :     // Repeat 50 times.
    4480                 :     //
    4481               1 :     #[tokio::test]
    4482               1 :     async fn test_bulk_insert() -> anyhow::Result<()> {
    4483               1 :         let harness = TenantHarness::create("test_bulk_insert")?;
    4484               1 :         let (tenant, ctx) = harness.load().await;
    4485               1 :         let tline = tenant
    4486               1 :             .create_test_timeline(TIMELINE_ID, Lsn(0x08), DEFAULT_PG_VERSION, &ctx)
    4487               2 :             .await?;
    4488                 : 
    4489               1 :         let mut lsn = Lsn(0x10);
    4490               1 : 
    4491               1 :         let mut keyspace = KeySpaceAccum::new();
    4492               1 : 
    4493               1 :         let mut test_key = Key::from_hex("012222222233333333444444445500000000").unwrap();
    4494               1 :         let mut blknum = 0;
    4495              51 :         for _ in 0..50 {
    4496          500050 :             for _ in 0..10000 {
    4497          500000 :                 test_key.field6 = blknum;
    4498          500000 :                 let writer = tline.writer().await;
    4499          500000 :                 writer
    4500          500000 :                     .put(
    4501          500000 :                         test_key,
    4502          500000 :                         lsn,
    4503          500000 :                         &Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))),
    4504          500000 :                         &ctx,
    4505          500000 :                     )
    4506            7804 :                     .await?;
    4507          500000 :                 writer.finish_write(lsn);
    4508          500000 :                 drop(writer);
    4509          500000 : 
    4510          500000 :                 keyspace.add_key(test_key);
    4511          500000 : 
    4512          500000 :                 lsn = Lsn(lsn.0 + 0x10);
    4513          500000 :                 blknum += 1;
    4514                 :             }
    4515                 : 
    4516              50 :             let cutoff = tline.get_last_record_lsn();
    4517              50 : 
    4518              50 :             tline
    4519              50 :                 .update_gc_info(Vec::new(), cutoff, Duration::ZERO, &ctx)
    4520 UBC           0 :                 .await?;
    4521 CBC          50 :             tline.freeze_and_flush().await?;
    4522            7920 :             tline.compact(&CancellationToken::new(), &ctx).await?;
    4523              50 :             tline.gc().await?;
    4524                 :         }
    4525                 : 
    4526               1 :         Ok(())
    4527                 :     }
    4528                 : 
    4529               1 :     #[tokio::test]
    4530               1 :     async fn test_random_updates() -> anyhow::Result<()> {
    4531               1 :         let harness = TenantHarness::create("test_random_updates")?;
    4532               1 :         let (tenant, ctx) = harness.load().await;
    4533               1 :         let tline = tenant
    4534               1 :             .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
    4535               2 :             .await?;
    4536                 : 
    4537                 :         const NUM_KEYS: usize = 1000;
    4538                 : 
    4539               1 :         let mut test_key = Key::from_hex("012222222233333333444444445500000000").unwrap();
    4540               1 : 
    4541               1 :         let mut keyspace = KeySpaceAccum::new();
    4542               1 : 
    4543               1 :         // Track when each page was last modified. Used to assert that
    4544               1 :         // a read sees the latest page version.
    4545               1 :         let mut updated = [Lsn(0); NUM_KEYS];
    4546               1 : 
    4547               1 :         let mut lsn = Lsn(0x10);
    4548                 :         #[allow(clippy::needless_range_loop)]
    4549            1001 :         for blknum in 0..NUM_KEYS {
    4550            1000 :             lsn = Lsn(lsn.0 + 0x10);
    4551            1000 :             test_key.field6 = blknum as u32;
    4552            1000 :             let writer = tline.writer().await;
    4553            1000 :             writer
    4554            1000 :                 .put(
    4555            1000 :                     test_key,
    4556            1000 :                     lsn,
    4557            1000 :                     &Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))),
    4558            1000 :                     &ctx,
    4559            1000 :                 )
    4560              17 :                 .await?;
    4561            1000 :             writer.finish_write(lsn);
    4562            1000 :             updated[blknum] = lsn;
    4563            1000 :             drop(writer);
    4564            1000 : 
    4565            1000 :             keyspace.add_key(test_key);
    4566                 :         }
    4567                 : 
    4568              51 :         for _ in 0..50 {
    4569           50050 :             for _ in 0..NUM_KEYS {
    4570           50000 :                 lsn = Lsn(lsn.0 + 0x10);
    4571           50000 :                 let blknum = thread_rng().gen_range(0..NUM_KEYS);
    4572           50000 :                 test_key.field6 = blknum as u32;
    4573           50000 :                 let writer = tline.writer().await;
    4574           50000 :                 writer
    4575           50000 :                     .put(
    4576           50000 :                         test_key,
    4577           50000 :                         lsn,
    4578           50000 :                         &Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))),
    4579           50000 :                         &ctx,
    4580           50000 :                     )
    4581             704 :                     .await?;
    4582           50000 :                 writer.finish_write(lsn);
    4583           50000 :                 drop(writer);
    4584           50000 :                 updated[blknum] = lsn;
    4585                 :             }
    4586                 : 
    4587                 :             // Read all the blocks
    4588           50000 :             for (blknum, last_lsn) in updated.iter().enumerate() {
    4589           50000 :                 test_key.field6 = blknum as u32;
    4590           50000 :                 assert_eq!(
    4591           50000 :                     tline.get(test_key, lsn, &ctx).await?,
    4592           50000 :                     TEST_IMG(&format!("{} at {}", blknum, last_lsn))
    4593                 :                 );
    4594                 :             }
    4595                 : 
    4596                 :             // Perform a cycle of flush, compact, and GC
    4597              50 :             let cutoff = tline.get_last_record_lsn();
    4598              50 :             tline
    4599              50 :                 .update_gc_info(Vec::new(), cutoff, Duration::ZERO, &ctx)
    4600 UBC           0 :                 .await?;
    4601 CBC          51 :             tline.freeze_and_flush().await?;
    4602             851 :             tline.compact(&CancellationToken::new(), &ctx).await?;
    4603              50 :             tline.gc().await?;
    4604                 :         }
    4605                 : 
    4606               1 :         Ok(())
    4607                 :     }
    4608                 : 
    4609               1 :     #[tokio::test]
    4610               1 :     async fn test_traverse_branches() -> anyhow::Result<()> {
    4611               1 :         let (tenant, ctx) = TenantHarness::create("test_traverse_branches")?
    4612               1 :             .load()
    4613               1 :             .await;
    4614               1 :         let mut tline = tenant
    4615               1 :             .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
    4616               2 :             .await?;
    4617                 : 
    4618                 :         const NUM_KEYS: usize = 1000;
    4619                 : 
    4620               1 :         let mut test_key = Key::from_hex("012222222233333333444444445500000000").unwrap();
    4621               1 : 
    4622               1 :         let mut keyspace = KeySpaceAccum::new();
    4623               1 : 
    4624               1 :         // Track when each page was last modified. Used to assert that
    4625               1 :         // a read sees the latest page version.
    4626               1 :         let mut updated = [Lsn(0); NUM_KEYS];
    4627               1 : 
    4628               1 :         let mut lsn = Lsn(0x10);
    4629                 :         #[allow(clippy::needless_range_loop)]
    4630            1001 :         for blknum in 0..NUM_KEYS {
    4631            1000 :             lsn = Lsn(lsn.0 + 0x10);
    4632            1000 :             test_key.field6 = blknum as u32;
    4633            1000 :             let writer = tline.writer().await;
    4634            1000 :             writer
    4635            1000 :                 .put(
    4636            1000 :                     test_key,
    4637            1000 :                     lsn,
    4638            1000 :                     &Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))),
    4639            1000 :                     &ctx,
    4640            1000 :                 )
    4641              17 :                 .await?;
    4642            1000 :             writer.finish_write(lsn);
    4643            1000 :             updated[blknum] = lsn;
    4644            1000 :             drop(writer);
    4645            1000 : 
    4646            1000 :             keyspace.add_key(test_key);
    4647                 :         }
    4648                 : 
    4649              51 :         for _ in 0..50 {
    4650              50 :             let new_tline_id = TimelineId::generate();
    4651              50 :             tenant
    4652              50 :                 .branch_timeline_test(&tline, new_tline_id, Some(lsn), &ctx)
    4653 UBC           0 :                 .await?;
    4654 CBC          50 :             tline = tenant
    4655              50 :                 .get_timeline(new_tline_id, true)
    4656              50 :                 .expect("Should have the branched timeline");
    4657                 : 
    4658           50050 :             for _ in 0..NUM_KEYS {
    4659           50000 :                 lsn = Lsn(lsn.0 + 0x10);
    4660           50000 :                 let blknum = thread_rng().gen_range(0..NUM_KEYS);
    4661           50000 :                 test_key.field6 = blknum as u32;
    4662           50000 :                 let writer = tline.writer().await;
    4663           50000 :                 writer
    4664           50000 :                     .put(
    4665           50000 :                         test_key,
    4666           50000 :                         lsn,
    4667           50000 :                         &Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))),
    4668           50000 :                         &ctx,
    4669           50000 :                     )
    4670             799 :                     .await?;
    4671           50000 :                 println!("updating {} at {}", blknum, lsn);
    4672           50000 :                 writer.finish_write(lsn);
    4673           50000 :                 drop(writer);
    4674           50000 :                 updated[blknum] = lsn;
    4675                 :             }
    4676                 : 
    4677                 :             // Read all the blocks
    4678           50000 :             for (blknum, last_lsn) in updated.iter().enumerate() {
    4679           50000 :                 test_key.field6 = blknum as u32;
    4680           50000 :                 assert_eq!(
    4681           50000 :                     tline.get(test_key, lsn, &ctx).await?,
    4682           50000 :                     TEST_IMG(&format!("{} at {}", blknum, last_lsn))
    4683                 :                 );
    4684                 :             }
    4685                 : 
    4686                 :             // Perform a cycle of flush, compact, and GC
    4687              50 :             let cutoff = tline.get_last_record_lsn();
    4688              50 :             tline
    4689              50 :                 .update_gc_info(Vec::new(), cutoff, Duration::ZERO, &ctx)
    4690 UBC           0 :                 .await?;
    4691 CBC          51 :             tline.freeze_and_flush().await?;
    4692             168 :             tline.compact(&CancellationToken::new(), &ctx).await?;
    4693              50 :             tline.gc().await?;
    4694                 :         }
    4695                 : 
    4696               1 :         Ok(())
    4697                 :     }
    4698                 : 
    4699               1 :     #[tokio::test]
    4700               1 :     async fn test_traverse_ancestors() -> anyhow::Result<()> {
    4701               1 :         let (tenant, ctx) = TenantHarness::create("test_traverse_ancestors")?
    4702               1 :             .load()
    4703               1 :             .await;
    4704               1 :         let mut tline = tenant
    4705               1 :             .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
    4706               2 :             .await?;
    4707                 : 
    4708                 :         const NUM_KEYS: usize = 100;
    4709                 :         const NUM_TLINES: usize = 50;
    4710                 : 
    4711               1 :         let mut test_key = Key::from_hex("012222222233333333444444445500000000").unwrap();
    4712               1 :         // Track page mutation lsns across different timelines.
    4713               1 :         let mut updated = [[Lsn(0); NUM_KEYS]; NUM_TLINES];
    4714               1 : 
    4715               1 :         let mut lsn = Lsn(0x10);
    4716                 : 
    4717                 :         #[allow(clippy::needless_range_loop)]
    4718              51 :         for idx in 0..NUM_TLINES {
    4719              50 :             let new_tline_id = TimelineId::generate();
    4720              50 :             tenant
    4721              50 :                 .branch_timeline_test(&tline, new_tline_id, Some(lsn), &ctx)
    4722 UBC           0 :                 .await?;
    4723 CBC          50 :             tline = tenant
    4724              50 :                 .get_timeline(new_tline_id, true)
    4725              50 :                 .expect("Should have the branched timeline");
    4726                 : 
    4727            5050 :             for _ in 0..NUM_KEYS {
    4728            5000 :                 lsn = Lsn(lsn.0 + 0x10);
    4729            5000 :                 let blknum = thread_rng().gen_range(0..NUM_KEYS);
    4730            5000 :                 test_key.field6 = blknum as u32;
    4731            5000 :                 let writer = tline.writer().await;
    4732            5000 :                 writer
    4733            5000 :                     .put(
    4734            5000 :                         test_key,
    4735            5000 :                         lsn,
    4736            5000 :                         &Value::Image(TEST_IMG(&format!("{} {} at {}", idx, blknum, lsn))),
    4737            5000 :                         &ctx,
    4738            5000 :                     )
    4739              78 :                     .await?;
    4740            5000 :                 println!("updating [{}][{}] at {}", idx, blknum, lsn);
    4741            5000 :                 writer.finish_write(lsn);
    4742            5000 :                 drop(writer);
    4743            5000 :                 updated[idx][blknum] = lsn;
    4744                 :             }
    4745                 :         }
    4746                 : 
    4747                 :         // Read pages from leaf timeline across all ancestors.
    4748              50 :         for (idx, lsns) in updated.iter().enumerate() {
    4749            5000 :             for (blknum, lsn) in lsns.iter().enumerate() {
    4750                 :                 // Skip empty mutations.
    4751            5000 :                 if lsn.0 == 0 {
    4752            1810 :                     continue;
    4753            3190 :                 }
    4754            3190 :                 println!("checking [{idx}][{blknum}] at {lsn}");
    4755            3190 :                 test_key.field6 = blknum as u32;
    4756            3190 :                 assert_eq!(
    4757            3190 :                     tline.get(test_key, *lsn, &ctx).await?,
    4758            3190 :                     TEST_IMG(&format!("{idx} {blknum} at {lsn}"))
    4759                 :                 );
    4760                 :             }
    4761                 :         }
    4762               1 :         Ok(())
    4763                 :     }
    4764                 : 
    4765               1 :     #[tokio::test]
    4766               1 :     async fn test_write_at_initdb_lsn_takes_optimization_code_path() -> anyhow::Result<()> {
    4767               1 :         let (tenant, ctx) = TenantHarness::create("test_empty_test_timeline_is_usable")?
    4768               1 :             .load()
    4769               1 :             .await;
    4770                 : 
    4771               1 :         let initdb_lsn = Lsn(0x20);
    4772               1 :         let utline = tenant
    4773               1 :             .create_empty_timeline(TIMELINE_ID, initdb_lsn, DEFAULT_PG_VERSION, &ctx)
    4774 UBC           0 :             .await?;
    4775 CBC           1 :         let tline = utline.raw_timeline().unwrap();
    4776               1 : 
    4777               1 :         // Spawn flush loop now so that we can set the `expect_initdb_optimization`
    4778               1 :         tline.maybe_spawn_flush_loop();
    4779               1 : 
    4780               1 :         // Make sure the timeline has the minimum set of required keys for operation.
    4781               1 :         // The only operation you can always do on an empty timeline is to `put` new data.
    4782               1 :         // Except if you `put` at `initdb_lsn`.
    4783               1 :         // In that case, there's an optimization to directly create image layers instead of delta layers.
    4784               1 :         // It uses `repartition()`, which assumes some keys to be present.
    4785               1 :         // Let's make sure the test timeline can handle that case.
    4786               1 :         {
    4787               1 :             let mut state = tline.flush_loop_state.lock().unwrap();
    4788               1 :             assert_eq!(
    4789               1 :                 timeline::FlushLoopState::Running {
    4790               1 :                     expect_initdb_optimization: false,
    4791               1 :                     initdb_optimization_count: 0,
    4792               1 :                 },
    4793               1 :                 *state
    4794               1 :             );
    4795               1 :             *state = timeline::FlushLoopState::Running {
    4796               1 :                 expect_initdb_optimization: true,
    4797               1 :                 initdb_optimization_count: 0,
    4798               1 :             };
    4799               1 :         }
    4800               1 : 
    4801               1 :         // Make writes at the initdb_lsn. When we flush it below, it should be handled by the optimization.
    4802               1 :         // As explained above, the optimization requires some keys to be present.
    4803               1 :         // As per `create_empty_timeline` documentation, use init_empty to set them.
    4804               1 :         // This is what `create_test_timeline` does, by the way.
    4805               1 :         let mut modification = tline.begin_modification(initdb_lsn);
    4806               1 :         modification
    4807               1 :             .init_empty_test_timeline()
    4808               1 :             .context("init_empty_test_timeline")?;
    4809               1 :         modification
    4810               1 :             .commit(&ctx)
    4811 UBC           0 :             .await
    4812 CBC           1 :             .context("commit init_empty_test_timeline modification")?;
    4813                 : 
    4814                 :         // Do the flush. The flush code will check the expectations that we set above.
    4815               1 :         tline.freeze_and_flush().await?;
    4816                 : 
    4817                 :         // assert freeze_and_flush exercised the initdb optimization
    4818                 :         {
    4819               1 :             let state = tline.flush_loop_state.lock().unwrap();
    4820                 :             let timeline::FlushLoopState::Running {
    4821               1 :                 expect_initdb_optimization,
    4822               1 :                 initdb_optimization_count,
    4823               1 :             } = *state
    4824                 :             else {
    4825 UBC           0 :                 panic!("unexpected state: {:?}", *state);
    4826                 :             };
    4827 CBC           1 :             assert!(expect_initdb_optimization);
    4828               1 :             assert!(initdb_optimization_count > 0);
    4829                 :         }
    4830               1 :         Ok(())
    4831                 :     }
    4832                 : 
    4833               1 :     #[tokio::test]
    4834               1 :     async fn test_uninit_mark_crash() -> anyhow::Result<()> {
    4835               1 :         let name = "test_uninit_mark_crash";
    4836               1 :         let harness = TenantHarness::create(name)?;
    4837                 :         {
    4838               1 :             let (tenant, ctx) = harness.load().await;
    4839               1 :             let tline = tenant
    4840               1 :                 .create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)
    4841 UBC           0 :                 .await?;
    4842                 :             // Keeps uninit mark in place
    4843 CBC           1 :             let raw_tline = tline.raw_timeline().unwrap();
    4844               1 :             raw_tline
    4845               1 :                 .shutdown(false)
    4846               1 :                 .instrument(info_span!("test_shutdown", tenant_id=%raw_tline.tenant_id))
    4847 UBC           0 :                 .await;
    4848 CBC           1 :             std::mem::forget(tline);
    4849                 :         }
    4850                 : 
    4851               1 :         let (tenant, _) = harness.load().await;
    4852               1 :         match tenant.get_timeline(TIMELINE_ID, false) {
    4853 UBC           0 :             Ok(_) => panic!("timeline should've been removed during load"),
    4854 CBC           1 :             Err(e) => {
    4855               1 :                 assert_eq!(
    4856               1 :                     e,
    4857               1 :                     GetTimelineError::NotFound {
    4858               1 :                         tenant_id: tenant.tenant_id,
    4859               1 :                         timeline_id: TIMELINE_ID,
    4860               1 :                     }
    4861               1 :                 )
    4862                 :             }
    4863                 :         }
    4864                 : 
    4865               1 :         assert!(!harness
    4866               1 :             .conf
    4867               1 :             .timeline_path(&tenant.tenant_id, &TIMELINE_ID)
    4868               1 :             .exists());
    4869                 : 
    4870               1 :         assert!(!harness
    4871               1 :             .conf
    4872               1 :             .timeline_uninit_mark_file_path(tenant.tenant_id, TIMELINE_ID)
    4873               1 :             .exists());
    4874                 : 
    4875               1 :         Ok(())
    4876                 :     }
    4877                 : }
        

Generated by: LCOV version 2.1-beta