LCOV - code coverage report
Current view: top level - pageserver/src - tenant.rs (source / functions)
Test: 8ac049b474321fdc72ddcb56d7165153a1a900e8.info
Test Date: 2023-09-06 10:18:01

             Coverage   Total   Hit
Lines:       86.7 %     2854    2473
Functions:   74.3 %     323     240

            Line data    Source code
       1              : //!
       2              : //! Timeline repository implementation that keeps old data in files on disk, and
       3              : //! the recent changes in memory. See tenant/*_layer.rs files.
       4              : //! The functions here are responsible for locating the correct layer for the
       5              : //! get/put call, walking back the timeline branching history as needed.
       6              : //!
       7              : //! The files are stored in the .neon/tenants/<tenant_id>/timelines/<timeline_id>
       8              : //! directory. See docs/pageserver-storage.md for how the files are managed.
       9              : //! In addition to the layer files, there is a metadata file in the same
      10              : //! directory that contains information about the timeline, in particular its
      11              : //! parent timeline, and the last LSN that has been written to disk.
      12              : //!
      13              : 
      14              : use anyhow::{bail, Context};
      15              : use futures::FutureExt;
      16              : use pageserver_api::models::TimelineState;
      17              : use remote_storage::DownloadError;
      18              : use remote_storage::GenericRemoteStorage;
      19              : use storage_broker::BrokerClientChannel;
      20              : use tokio::sync::watch;
      21              : use tokio::task::JoinSet;
      22              : use tokio_util::sync::CancellationToken;
      23              : use tracing::*;
      24              : use utils::completion;
      25              : use utils::crashsafe::path_with_suffix_extension;
      26              : 
      27              : use std::cmp::min;
      28              : use std::collections::hash_map::Entry;
      29              : use std::collections::BTreeSet;
      30              : use std::collections::HashMap;
      31              : use std::fmt::Debug;
      32              : use std::fmt::Display;
      33              : use std::fs;
      34              : use std::fs::File;
      35              : use std::io;
      36              : use std::ops::Bound::Included;
      37              : use std::path::Path;
      38              : use std::path::PathBuf;
      39              : use std::process::Command;
      40              : use std::process::Stdio;
      41              : use std::sync::atomic::AtomicU64;
      42              : use std::sync::atomic::Ordering;
      43              : use std::sync::Arc;
      44              : use std::sync::MutexGuard;
      45              : use std::sync::{Mutex, RwLock};
      46              : use std::time::{Duration, Instant};
      47              : 
      48              : use self::config::TenantConf;
      49              : use self::delete::DeleteTenantFlow;
      50              : use self::metadata::LoadMetadataError;
      51              : use self::metadata::TimelineMetadata;
      52              : use self::mgr::TenantsMap;
      53              : use self::remote_timeline_client::RemoteTimelineClient;
      54              : use self::timeline::uninit::TimelineUninitMark;
      55              : use self::timeline::uninit::UninitializedTimeline;
      56              : use self::timeline::EvictionTaskTenantState;
      57              : use self::timeline::TimelineResources;
      58              : use crate::config::PageServerConf;
      59              : use crate::context::{DownloadBehavior, RequestContext};
      60              : use crate::import_datadir;
      61              : use crate::is_uninit_mark;
      62              : use crate::metrics::TENANT_ACTIVATION;
      63              : use crate::metrics::{remove_tenant_metrics, TENANT_STATE_METRIC, TENANT_SYNTHETIC_SIZE_METRIC};
      64              : use crate::repository::GcResult;
      65              : use crate::task_mgr;
      66              : use crate::task_mgr::TaskKind;
      67              : use crate::tenant::config::TenantConfOpt;
      68              : use crate::tenant::metadata::load_metadata;
      69              : pub use crate::tenant::remote_timeline_client::index::IndexPart;
      70              : use crate::tenant::remote_timeline_client::MaybeDeletedIndexPart;
      71              : use crate::tenant::storage_layer::DeltaLayer;
      72              : use crate::tenant::storage_layer::ImageLayer;
      73              : use crate::InitializationOrder;
      74              : 
      75              : use crate::tenant::timeline::delete::DeleteTimelineFlow;
      76              : use crate::tenant::timeline::uninit::cleanup_timeline_directory;
      77              : use crate::virtual_file::VirtualFile;
      78              : use crate::walredo::PostgresRedoManager;
      79              : use crate::walredo::WalRedoManager;
      80              : use crate::TEMP_FILE_SUFFIX;
      81              : pub use pageserver_api::models::TenantState;
      82              : 
      83              : use toml_edit;
      84              : use utils::{
      85              :     crashsafe,
      86              :     generation::Generation,
      87              :     id::{TenantId, TimelineId},
      88              :     lsn::{Lsn, RecordLsn},
      89              : };
      90              : 
      91              : /// Declare a failpoint that can use the `pause` failpoint action.
       92              : /// We don't want to block the executor thread, hence we use spawn_blocking + await.
      93              : macro_rules! pausable_failpoint {
      94              :     ($name:literal) => {
      95              :         if cfg!(feature = "testing") {
      96              :             tokio::task::spawn_blocking({
      97              :                 let current = tracing::Span::current();
      98              :                 move || {
      99              :                     let _entered = current.entered();
     100              :                     tracing::info!("at failpoint {}", $name);
     101              :                     fail::fail_point!($name);
     102              :                 }
     103              :             })
     104              :             .await
     105              :             .expect("spawn_blocking");
     106              :         }
     107              :     };
     108              : }
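
// --- Editorial sketch (addition, not part of the coverage listing) ---
// Minimal usage example for `pausable_failpoint!`; the function and failpoint
// names below are hypothetical. Inside any async fn, the macro is awaited at
// the point where a test may want to pause execution:
#[cfg(feature = "testing")]
async fn do_work_with_pause_point() {
    // If a test has armed "before-do-work" with the `pause` action, the
    // spawn_blocking task inside the macro parks until the action is cleared,
    // so no tokio executor thread is ever blocked.
    pausable_failpoint!("before-do-work");
    // ... the actual work would follow here ...
}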
     109              : 
     110              : pub mod blob_io;
     111              : pub mod block_io;
     112              : 
     113              : pub mod disk_btree;
     114              : pub(crate) mod ephemeral_file;
     115              : pub mod layer_map;
     116              : mod span;
     117              : 
     118              : pub mod metadata;
     119              : mod par_fsync;
     120              : mod remote_timeline_client;
     121              : pub mod storage_layer;
     122              : 
     123              : pub mod config;
     124              : pub mod delete;
     125              : pub mod mgr;
     126              : pub mod tasks;
     127              : pub mod upload_queue;
     128              : 
     129              : pub(crate) mod timeline;
     130              : 
     131              : pub mod size;
     132              : 
     133              : pub(crate) use timeline::span::debug_assert_current_span_has_tenant_and_timeline_id;
     134              : pub use timeline::{
     135              :     LocalLayerInfoForDiskUsageEviction, LogicalSizeCalculationCause, PageReconstructError, Timeline,
     136              : };
     137              : 
     138              : // re-export for use in remote_timeline_client.rs
     139              : pub use crate::tenant::metadata::save_metadata;
     140              : 
     141              : // re-export for use in walreceiver
     142              : pub use crate::tenant::timeline::WalReceiverInfo;
     143              : 
     144              : /// The "tenants" part of `tenants/<tenant>/timelines...`
     145              : pub const TENANTS_SEGMENT_NAME: &str = "tenants";
     146              : 
      147              : /// The "timelines" part of the `.neon/tenants/<tenant_id>/timelines/<timeline_id>` path.
     148              : pub const TIMELINES_SEGMENT_NAME: &str = "timelines";
     149              : 
     150              : pub const TENANT_ATTACHING_MARKER_FILENAME: &str = "attaching";
     151              : 
     152              : pub const TENANT_DELETED_MARKER_FILE_NAME: &str = "deleted";
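
// --- Editorial sketch (addition, not part of the coverage listing) ---
// How the segment constants compose into the on-disk layout described in the
// module docs (`.neon/tenants/<tenant_id>/timelines/<timeline_id>`). The real
// path construction lives in PageServerConf; `workdir` here is hypothetical.
fn example_timeline_path(workdir: &Path, tenant_id: TenantId, timeline_id: TimelineId) -> PathBuf {
    workdir
        .join(TENANTS_SEGMENT_NAME)     // ".../tenants"
        .join(tenant_id.to_string())    // ".../tenants/<tenant_id>"
        .join(TIMELINES_SEGMENT_NAME)   // ".../timelines"
        .join(timeline_id.to_string())  // ".../timelines/<timeline_id>"
}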
     153              : 
     154              : /// References to shared objects that are passed into each tenant, such
     155              : /// as the shared remote storage client and process initialization state.
     156          215 : #[derive(Clone)]
     157              : pub struct TenantSharedResources {
     158              :     pub broker_client: storage_broker::BrokerClientChannel,
     159              :     pub remote_storage: Option<GenericRemoteStorage>,
     160              : }
     161              : 
     162              : ///
      163              : /// A tenant consists of multiple timelines. Keep them in a hash table.
     164              : ///
     165              : pub struct Tenant {
     166              :     // Global pageserver config parameters
     167              :     pub conf: &'static PageServerConf,
     168              : 
      169              :     /// The timestamp when this value was created; used to measure activation delay, see:
     170              :     /// <https://github.com/neondatabase/neon/issues/4025>
     171              :     loading_started_at: Instant,
     172              : 
     173              :     state: watch::Sender<TenantState>,
     174              : 
     175              :     // Overridden tenant-specific config parameters.
      176              :     // We keep the TenantConfOpt struct here to preserve the information
      177              :     // about which parameters are not set.
     178              :     // This is necessary to allow global config updates.
     179              :     tenant_conf: Arc<RwLock<TenantConfOpt>>,
     180              : 
     181              :     tenant_id: TenantId,
     182              : 
     183              :     /// The remote storage generation, used to protect S3 objects from split-brain.
     184              :     /// Does not change over the lifetime of the [`Tenant`] object.
     185              :     generation: Generation,
     186              : 
     187              :     timelines: Mutex<HashMap<TimelineId, Arc<Timeline>>>,
     188              :     // This mutex prevents creation of new timelines during GC.
     189              :     // Adding yet another mutex (in addition to `timelines`) is needed because holding
      190              :     // the `timelines` mutex for the whole GC iteration
      191              :     // could block `get_timeline`, `get_timelines_state`, and other timeline
      192              :     // operations for a long time, which in turn could drop replication connections,
      193              :     // expire `wait_for_lsn` timeouts, and so on.
     194              :     gc_cs: tokio::sync::Mutex<()>,
     195              :     walredo_mgr: Arc<dyn WalRedoManager + Send + Sync>,
     196              : 
     197              :     // provides access to timeline data sitting in the remote storage
     198              :     pub(crate) remote_storage: Option<GenericRemoteStorage>,
     199              : 
      200              :     /// Cached logical sizes, updated on each [`Tenant::gather_size_inputs`].
     201              :     cached_logical_sizes: tokio::sync::Mutex<HashMap<(TimelineId, Lsn), u64>>,
     202              :     cached_synthetic_tenant_size: Arc<AtomicU64>,
     203              : 
     204              :     eviction_task_tenant_state: tokio::sync::Mutex<EvictionTaskTenantState>,
     205              : 
     206              :     pub(crate) delete_progress: Arc<tokio::sync::Mutex<DeleteTenantFlow>>,
     207              : }
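
// --- Editorial sketch (addition, not part of the coverage listing) ---
// The lock discipline implied by the `gc_cs` comment above, as a hypothetical
// GC entry point (not the real one): `gc_cs` is held for the entire pass,
// while the `timelines` mutex is taken only briefly to snapshot the map, so
// `get_timeline`, `get_timelines_state`, etc. stay responsive throughout.
async fn gc_pass_sketch(tenant: &Tenant) {
    // Serializes with timeline creation for the whole pass.
    let _gc_guard = tenant.gc_cs.lock().await;
    // Snapshot the timelines under the short-lived std mutex; the guard is
    // dropped at the end of this block, before any later .await point.
    let timelines: Vec<Arc<Timeline>> = {
        let map = tenant.timelines.lock().unwrap();
        map.values().cloned().collect()
    };
    for _timeline in timelines {
        // ... per-timeline GC work would run here, without holding `timelines` ...
    }
}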
     208              : 
      209              : // We should not blindly overwrite local metadata with the remote one.
      210              : // For example, consider the following case:
      211              : //     A layer is flushed to disk as a new delta layer, we update the local metadata and start the upload task, but after that
      212              : //     the pageserver crashes. During startup we'll load the new metadata, and then reset it
      213              : //     to the state of the remote one. But the current layer map will have layers from the old
      214              : //     metadata, which is inconsistent.
      215              : //     And with the current logic it won't discard them during load, because during layer map
      216              : //     load it sees the local disk_consistent_lsn, which is ahead of the layer LSNs.
      217              : //     If we treat the remote as the source of truth, we need to sync with it completely,
      218              : //     i.e. delete local files which are missing on the remote. That would add extra work;
      219              : //     the WAL for these layers would need to be reingested, for example.
      220              : //
      221              : // So the solution is to take the remote metadata only when we're attaching.
     222              : pub fn merge_local_remote_metadata<'a>(
     223              :     local: Option<&'a TimelineMetadata>,
     224              :     remote: Option<&'a TimelineMetadata>,
     225              : ) -> anyhow::Result<(&'a TimelineMetadata, bool)> {
     226          326 :     match (local, remote) {
     227            0 :         (None, None) => anyhow::bail!("we should have either local metadata or remote"),
     228          148 :         (Some(local), None) => Ok((local, true)),
     229              :         // happens if we crash during attach, before writing out the metadata file
     230           49 :         (None, Some(remote)) => Ok((remote, false)),
     231              :         // This is the regular case where we crash/exit before finishing queued uploads.
     232              :         // Also, it happens if we crash during attach after writing the metadata file
     233              :         // but before removing the attaching marker file.
     234          129 :         (Some(local), Some(remote)) => {
     235          129 :             let consistent_lsn_cmp = local
     236          129 :                 .disk_consistent_lsn()
     237          129 :                 .cmp(&remote.disk_consistent_lsn());
     238          129 :             let gc_cutoff_lsn_cmp = local
     239          129 :                 .latest_gc_cutoff_lsn()
     240          129 :                 .cmp(&remote.latest_gc_cutoff_lsn());
     241          129 :             use std::cmp::Ordering::*;
     242          129 :             match (consistent_lsn_cmp, gc_cutoff_lsn_cmp) {
     243              :                 // It wouldn't matter, but pick the local one so that we don't rewrite the metadata file.
     244          127 :                 (Equal, Equal) => Ok((local, true)),
     245              :                 // Local state is clearly ahead of the remote.
     246            0 :                 (Greater, Greater) => Ok((local, true)),
     247              :                 // We have local layer files that aren't on the remote, but GC horizon is on par.
     248            2 :                 (Greater, Equal) => Ok((local, true)),
     249              :                 // Local GC started running but we couldn't sync it to the remote.
     250            0 :                 (Equal, Greater) => Ok((local, true)),
     251              : 
     252              :                 // We always update the local value first, so something else must have
     253              :                 // updated the remote value, probably a different pageserver.
     254              :                 // The control plane is supposed to prevent this from happening.
     255              :                 // Bail out.
     256              :                 (Less, Less)
     257              :                 | (Less, Equal)
     258              :                 | (Equal, Less)
     259              :                 | (Less, Greater)
     260              :                 | (Greater, Less) => {
     261            0 :                     anyhow::bail!(
     262            0 :                         r#"remote metadata appears to be ahead of local metadata:
     263            0 : local:
     264            0 :   {local:#?}
     265            0 : remote:
     266            0 :   {remote:#?}
     267            0 : "#
     268            0 :                     );
     269              :                 }
     270              :             }
     271              :         }
     272              :     }
     273          326 : }
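
// --- Editorial sketch (addition, not part of the coverage listing) ---
// The (Some(local), Some(remote)) decision table above, restated as a
// standalone predicate over the two comparisons (disk_consistent_lsn,
// latest_gc_cutoff_lsn). `false` corresponds to the split-brain bail-out;
// this illustration is not used by the real code.
fn keep_local_metadata(
    consistent_lsn_cmp: std::cmp::Ordering,
    gc_cutoff_lsn_cmp: std::cmp::Ordering,
) -> bool {
    use std::cmp::Ordering::*;
    // Local wins whenever it is equal to or ahead of the remote on both axes;
    // the remote being ahead on any axis means another node wrote to remote
    // storage, and the caller bails out.
    matches!(
        (consistent_lsn_cmp, gc_cutoff_lsn_cmp),
        (Equal, Equal) | (Greater, Greater) | (Greater, Equal) | (Equal, Greater)
    )
}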
     274              : 
     275          176 : #[derive(Debug, thiserror::Error, PartialEq, Eq)]
     276              : pub enum GetTimelineError {
     277              :     #[error("Timeline {tenant_id}/{timeline_id} is not active, state: {state:?}")]
     278              :     NotActive {
     279              :         tenant_id: TenantId,
     280              :         timeline_id: TimelineId,
     281              :         state: TimelineState,
     282              :     },
     283              :     #[error("Timeline {tenant_id}/{timeline_id} was not found")]
     284              :     NotFound {
     285              :         tenant_id: TenantId,
     286              :         timeline_id: TimelineId,
     287              :     },
     288              : }
     289              : 
     290            0 : #[derive(Debug, thiserror::Error)]
     291              : pub enum LoadLocalTimelineError {
     292              :     #[error("FailedToLoad")]
     293              :     Load(#[source] anyhow::Error),
     294              :     #[error("FailedToResumeDeletion")]
     295              :     ResumeDeletion(#[source] anyhow::Error),
     296              : }
     297              : 
     298          239 : #[derive(thiserror::Error)]
     299              : pub enum DeleteTimelineError {
     300              :     #[error("NotFound")]
     301              :     NotFound,
     302              : 
     303              :     #[error("HasChildren")]
     304              :     HasChildren(Vec<TimelineId>),
     305              : 
     306              :     #[error("Timeline deletion is already in progress")]
     307              :     AlreadyInProgress(Arc<tokio::sync::Mutex<DeleteTimelineFlow>>),
     308              : 
     309              :     #[error(transparent)]
     310              :     Other(#[from] anyhow::Error),
     311              : }
     312              : 
     313              : impl Debug for DeleteTimelineError {
     314            0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     315            0 :         match self {
     316            0 :             Self::NotFound => write!(f, "NotFound"),
     317            0 :             Self::HasChildren(c) => f.debug_tuple("HasChildren").field(c).finish(),
     318            0 :             Self::AlreadyInProgress(_) => f.debug_tuple("AlreadyInProgress").finish(),
     319            0 :             Self::Other(e) => f.debug_tuple("Other").field(e).finish(),
     320              :         }
     321            0 :     }
     322              : }
     323              : 
     324              : pub enum SetStoppingError {
     325              :     AlreadyStopping(completion::Barrier),
     326              :     Broken,
     327              : }
     328              : 
     329              : impl Debug for SetStoppingError {
     330            0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     331            0 :         match self {
     332            0 :             Self::AlreadyStopping(_) => f.debug_tuple("AlreadyStopping").finish(),
     333            0 :             Self::Broken => write!(f, "Broken"),
     334              :         }
     335            0 :     }
     336              : }
     337              : 
     338              : struct RemoteStartupData {
     339              :     index_part: IndexPart,
     340              :     remote_metadata: TimelineMetadata,
     341              : }
     342              : 
     343            0 : #[derive(Debug, thiserror::Error)]
     344              : pub(crate) enum WaitToBecomeActiveError {
     345              :     WillNotBecomeActive {
     346              :         tenant_id: TenantId,
     347              :         state: TenantState,
     348              :     },
     349              :     TenantDropped {
     350              :         tenant_id: TenantId,
     351              :     },
     352              : }
     353              : 
     354              : impl std::fmt::Display for WaitToBecomeActiveError {
     355          132 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     356          132 :         match self {
     357          132 :             WaitToBecomeActiveError::WillNotBecomeActive { tenant_id, state } => {
     358          132 :                 write!(
     359          132 :                     f,
     360          132 :                     "Tenant {} will not become active. Current state: {:?}",
     361          132 :                     tenant_id, state
     362          132 :                 )
     363              :             }
     364            0 :             WaitToBecomeActiveError::TenantDropped { tenant_id } => {
     365            0 :                 write!(f, "Tenant {tenant_id} will not become active (dropped)")
     366              :             }
     367              :         }
     368          132 :     }
     369              : }
     370              : 
     371            0 : #[derive(thiserror::Error, Debug)]
     372              : pub enum CreateTimelineError {
     373              :     #[error("a timeline with the given ID already exists")]
     374              :     AlreadyExists,
     375              :     #[error(transparent)]
     376              :     AncestorLsn(anyhow::Error),
     377              :     #[error(transparent)]
     378              :     Other(#[from] anyhow::Error),
     379              : }
     380              : 
     381              : struct TenantDirectoryScan {
     382              :     sorted_timelines_to_load: Vec<(TimelineId, TimelineMetadata)>,
     383              :     timelines_to_resume_deletion: Vec<(TimelineId, Option<TimelineMetadata>)>,
     384              : }
     385              : 
     386              : enum CreateTimelineCause {
     387              :     Load,
     388              :     Delete,
     389              : }
     390              : 
     391              : impl Tenant {
     392              :     /// Yet another helper for timeline initialization.
     393              :     /// Contains the common part of `load_local_timeline` and `load_remote_timeline`.
     394              :     ///
     395              :     /// - Initializes the Timeline struct and inserts it into the tenant's hash map
     396              :     /// - Scans the local timeline directory for layer files and builds the layer map
     397              :     /// - Downloads remote index file and adds remote files to the layer map
     398              :     /// - Schedules remote upload tasks for any files that are present locally but missing from remote storage.
     399              :     ///
     400              :     /// If the operation fails, the timeline is left in the tenant's hash map in Broken state. On success,
     401              :     /// it is marked as Active.
     402              :     #[allow(clippy::too_many_arguments)]
     403          326 :     async fn timeline_init_and_sync(
     404          326 :         &self,
     405          326 :         timeline_id: TimelineId,
     406          326 :         resources: TimelineResources,
     407          326 :         remote_startup_data: Option<RemoteStartupData>,
     408          326 :         local_metadata: Option<TimelineMetadata>,
     409          326 :         ancestor: Option<Arc<Timeline>>,
     410          326 :         init_order: Option<&InitializationOrder>,
     411          326 :         _ctx: &RequestContext,
     412          326 :     ) -> anyhow::Result<()> {
     413          326 :         let tenant_id = self.tenant_id;
     414              : 
     415          326 :         let (up_to_date_metadata, picked_local) = merge_local_remote_metadata(
     416          326 :             local_metadata.as_ref(),
     417          326 :             remote_startup_data.as_ref().map(|r| &r.remote_metadata),
     418          326 :         )
     419          326 :         .context("merge_local_remote_metadata")?
     420          326 :         .to_owned();
     421              : 
     422          326 :         let timeline = self.create_timeline_struct(
     423          326 :             timeline_id,
     424          326 :             up_to_date_metadata,
     425          326 :             ancestor.clone(),
     426          326 :             resources,
     427          326 :             init_order,
     428          326 :             CreateTimelineCause::Load,
     429          326 :         )?;
     430          326 :         let disk_consistent_lsn = timeline.get_disk_consistent_lsn();
     431          326 :         anyhow::ensure!(
     432          326 :             disk_consistent_lsn.is_valid(),
     433            0 :             "Timeline {tenant_id}/{timeline_id} has invalid disk_consistent_lsn"
     434              :         );
     435          326 :         assert_eq!(
     436          326 :             disk_consistent_lsn,
     437          326 :             up_to_date_metadata.disk_consistent_lsn(),
     438            0 :             "these are used interchangeably"
     439              :         );
     440              : 
     441              :         // Save the metadata file to local disk.
     442          326 :         if !picked_local {
     443           49 :             save_metadata(self.conf, &tenant_id, &timeline_id, up_to_date_metadata)
     444            0 :                 .await
     445           49 :                 .context("save_metadata")?;
     446          277 :         }
     447              : 
     448          326 :         let index_part = remote_startup_data.as_ref().map(|x| &x.index_part);
     449              : 
     450          326 :         if let Some(index_part) = index_part {
     451          178 :             timeline
     452          178 :                 .remote_client
     453          178 :                 .as_ref()
     454          178 :                 .unwrap()
     455          178 :                 .init_upload_queue(index_part)?;
     456          148 :         } else if self.remote_storage.is_some() {
      457              :             // No data on the remote storage, but we have a local metadata file. We can end up
      458              :             // here if timeline creation was interrupted before finishing the index part upload.
      459              :             // By doing what we do here, the index part upload is retried.
      460              :             // If the control plane retries timeline creation in the meantime, the mgmt API handler
     461              :             // for timeline creation will coalesce on the upload we queue here.
     462            1 :             let rtc = timeline.remote_client.as_ref().unwrap();
     463            1 :             rtc.init_upload_queue_for_empty_remote(up_to_date_metadata)?;
     464            1 :             rtc.schedule_index_upload_for_metadata_update(up_to_date_metadata)?;
     465          147 :         }
     466              : 
     467          326 :         timeline
     468          326 :             .load_layer_map(
     469          326 :                 disk_consistent_lsn,
     470          326 :                 remote_startup_data.map(|x| x.index_part),
     471          326 :             )
     472          324 :             .await
     473          326 :             .with_context(|| {
     474            0 :                 format!("Failed to load layermap for timeline {tenant_id}/{timeline_id}")
     475          326 :             })?;
     476              : 
     477              :         {
      478              :             // avoid holding the lock across awaits
     479          326 :             let mut timelines_accessor = self.timelines.lock().unwrap();
     480          326 :             match timelines_accessor.entry(timeline_id) {
     481              :                 Entry::Occupied(_) => {
     482              :                     // The uninit mark file acts as a lock that prevents another task from
     483              :                     // initializing the timeline at the same time.
     484            0 :                     unreachable!(
     485            0 :                         "Timeline {tenant_id}/{timeline_id} already exists in the tenant map"
     486            0 :                     );
     487              :                 }
     488          326 :                 Entry::Vacant(v) => {
     489          326 :                     v.insert(Arc::clone(&timeline));
     490          326 :                     timeline.maybe_spawn_flush_loop();
     491          326 :                 }
     492          326 :             }
     493          326 :         };
     494          326 : 
     495          326 :         // Sanity check: a timeline should have some content.
     496          326 :         anyhow::ensure!(
     497          326 :             ancestor.is_some()
     498          274 :                 || timeline
     499          274 :                     .layers
     500          274 :                     .read()
     501            0 :                     .await
     502          274 :                     .layer_map()
     503          274 :                     .iter_historic_layers()
     504          274 :                     .next()
     505          274 :                     .is_some(),
     506            1 :             "Timeline has no ancestor and no layer files"
     507              :         );
     508              : 
     509          325 :         Ok(())
     510          326 :     }
     511              : 
     512              :     ///
     513              :     /// Attach a tenant that's available in cloud storage.
     514              :     ///
      515              :     /// This returns quickly, after just creating the in-memory
     516              :     /// Tenant struct and launching a background task to download
     517              :     /// the remote index files.  On return, the tenant is most likely still in
     518              :     /// Attaching state, and it will become Active once the background task
     519              :     /// finishes. You can use wait_until_active() to wait for the task to
     520              :     /// complete.
     521              :     ///
     522           42 :     pub(crate) fn spawn_attach(
     523           42 :         conf: &'static PageServerConf,
     524           42 :         tenant_id: TenantId,
     525           42 :         generation: Generation,
     526           42 :         broker_client: storage_broker::BrokerClientChannel,
     527           42 :         tenants: &'static tokio::sync::RwLock<TenantsMap>,
     528           42 :         remote_storage: GenericRemoteStorage,
     529           42 :         ctx: &RequestContext,
     530           42 :     ) -> anyhow::Result<Arc<Tenant>> {
     531              :         // TODO dedup with spawn_load
     532           42 :         let tenant_conf =
     533           42 :             Self::load_tenant_config(conf, &tenant_id).context("load tenant config")?;
     534              : 
     535           42 :         let wal_redo_manager = Arc::new(PostgresRedoManager::new(conf, tenant_id));
     536           42 :         let tenant = Arc::new(Tenant::new(
     537           42 :             TenantState::Attaching,
     538           42 :             conf,
     539           42 :             tenant_conf,
     540           42 :             wal_redo_manager,
     541           42 :             tenant_id,
     542           42 :             generation,
     543           42 :             Some(remote_storage.clone()),
     544           42 :         ));
     545           42 : 
     546           42 :         // Do all the hard work in the background
     547           42 :         let tenant_clone = Arc::clone(&tenant);
     548           42 : 
     549           42 :         let ctx = ctx.detached_child(TaskKind::Attach, DownloadBehavior::Warn);
     550           42 :         task_mgr::spawn(
     551           42 :             &tokio::runtime::Handle::current(),
     552           42 :             TaskKind::Attach,
     553           42 :             Some(tenant_id),
     554           42 :             None,
     555           42 :             "attach tenant",
     556              :             false,
     557           42 :             async move {
      558           42 :                 // Ideally we should use Tenant::set_broken_no_wait, but it is not supposed to be used when the tenant is in the Loading state.
     559           42 :                 let make_broken = |t: &Tenant, err: anyhow::Error| {
     560            3 :                     error!("attach failed, setting tenant state to Broken: {err:?}");
     561            3 :                     t.state.send_modify(|state| {
     562            3 :                         assert_eq!(
     563              :                             *state,
     564              :                             TenantState::Attaching,
     565            0 :                             "the attach task owns the tenant state until activation is complete"
     566              :                         );
     567            3 :                         *state = TenantState::broken_from_reason(err.to_string());
     568            3 :                     });
     569            3 :                 };
     570              : 
     571           42 :                 let pending_deletion = {
     572           42 :                     match DeleteTenantFlow::should_resume_deletion(
     573           42 :                         conf,
     574           42 :                         Some(&remote_storage),
     575           42 :                         &tenant_clone,
     576           42 :                     )
     577          121 :                     .await
     578              :                     {
     579           42 :                         Ok(should_resume_deletion) => should_resume_deletion,
     580            0 :                         Err(err) => {
     581            0 :                             make_broken(&tenant_clone, anyhow::anyhow!(err));
     582            0 :                             return Ok(());
     583              :                         }
     584              :                     }
     585              :                 };
     586              : 
     587           42 :                 info!("pending_deletion {}", pending_deletion.is_some());
     588              : 
     589           42 :                 if let Some(deletion) = pending_deletion {
     590            3 :                     match DeleteTenantFlow::resume_from_attach(
     591            3 :                         deletion,
     592            3 :                         &tenant_clone,
     593            3 :                         tenants,
     594            3 :                         &ctx,
     595            3 :                     )
     596          199 :                     .await
     597              :                     {
     598            0 :                         Err(err) => {
     599            0 :                             make_broken(&tenant_clone, anyhow::anyhow!(err));
     600            0 :                             return Ok(());
     601              :                         }
     602            3 :                         Ok(()) => return Ok(()),
     603              :                     }
     604           39 :                 }
     605           39 : 
     606          296 :                 match tenant_clone.attach(&ctx).await {
     607              :                     Ok(()) => {
     608           36 :                         info!("attach finished, activating");
     609           36 :                         tenant_clone.activate(broker_client, None, &ctx);
     610              :                     }
     611            3 :                     Err(e) => {
     612            3 :                         make_broken(&tenant_clone, anyhow::anyhow!(e));
     613            3 :                     }
     614              :                 }
     615           39 :                 Ok(())
     616           42 :             }
     617           42 :             .instrument({
     618           42 :                 let span = tracing::info_span!(parent: None, "attach", tenant_id=%tenant_id);
     619           42 :                 span.follows_from(Span::current());
     620           42 :                 span
     621           42 :             }),
     622           42 :         );
     623           42 :         Ok(tenant)
     624           42 :     }
     625              : 
     626              :     ///
     627              :     /// Background task that downloads all data for a tenant and brings it to Active state.
     628              :     ///
     629              :     /// No background tasks are started as part of this routine.
     630              :     ///
     631           42 :     async fn attach(self: &Arc<Tenant>, ctx: &RequestContext) -> anyhow::Result<()> {
     632           42 :         span::debug_assert_current_span_has_tenant_id();
     633           42 : 
     634           42 :         let marker_file = self.conf.tenant_attaching_mark_file_path(&self.tenant_id);
     635           42 :         if !tokio::fs::try_exists(&marker_file)
     636           42 :             .await
     637           42 :             .context("check for existence of marker file")?
     638              :         {
     639            0 :             anyhow::bail!(
     640            0 :                 "implementation error: marker file should exist at beginning of this function"
     641            0 :             );
     642           42 :         }
     643              : 
      644              :         // Get the list of remote timelines and
      645              :         // download the index files for every tenant timeline.
     646           42 :         info!("listing remote timelines");
     647              : 
     648           42 :         let remote_storage = self
     649           42 :             .remote_storage
     650           42 :             .as_ref()
     651           42 :             .ok_or_else(|| anyhow::anyhow!("cannot attach without remote storage"))?;
     652              : 
     653           39 :         let remote_timeline_ids =
     654          132 :             remote_timeline_client::list_remote_timelines(remote_storage, self.tenant_id).await?;
     655              : 
     656           39 :         info!("found {} timelines", remote_timeline_ids.len());
     657              : 
     658              :         // Download & parse index parts
     659           39 :         let mut part_downloads = JoinSet::new();
     660           94 :         for timeline_id in remote_timeline_ids {
     661           55 :             let client = RemoteTimelineClient::new(
     662           55 :                 remote_storage.clone(),
     663           55 :                 self.conf,
     664           55 :                 self.tenant_id,
     665           55 :                 timeline_id,
     666           55 :                 self.generation,
     667           55 :             );
     668           55 :             part_downloads.spawn(
     669           55 :                 async move {
     670           55 :                     debug!("starting index part download");
     671              : 
     672           55 :                     let index_part = client
     673           55 :                         .download_index_file()
     674          211 :                         .await
     675           55 :                         .context("download index file")?;
     676              : 
     677           55 :                     debug!("finished index part download");
     678              : 
     679           55 :                     Result::<_, anyhow::Error>::Ok((timeline_id, client, index_part))
     680           55 :                 }
     681           55 :                 .map(move |res| {
     682           55 :                     res.with_context(|| format!("download index part for timeline {timeline_id}"))
     683           55 :                 })
     684           55 :                 .instrument(info_span!("download_index_part", %timeline_id)),
     685              :             );
     686              :         }
     687              : 
     688           39 :         let mut timelines_to_resume_deletions = vec![];
     689           39 : 
     690           39 :         // Wait for all the download tasks to complete & collect results.
     691           39 :         let mut remote_index_and_client = HashMap::new();
     692           39 :         let mut timeline_ancestors = HashMap::new();
     693           94 :         while let Some(result) = part_downloads.join_next().await {
     694              :             // NB: we already added timeline_id as context to the error
     695           55 :             let result: Result<_, anyhow::Error> = result.context("joinset task join")?;
     696           55 :             let (timeline_id, client, index_part) = result?;
     697            0 :             debug!("successfully downloaded index part for timeline {timeline_id}");
     698           55 :             match index_part {
     699           49 :                 MaybeDeletedIndexPart::IndexPart(index_part) => {
     700           49 :                     timeline_ancestors.insert(timeline_id, index_part.metadata.clone());
     701           49 :                     remote_index_and_client.insert(timeline_id, (index_part, client));
     702           49 :                 }
     703            6 :                 MaybeDeletedIndexPart::Deleted(index_part) => {
     704            6 :                     info!(
     705            6 :                         "timeline {} is deleted, picking to resume deletion",
     706            6 :                         timeline_id
     707            6 :                     );
     708            6 :                     timelines_to_resume_deletions.push((timeline_id, index_part, client));
     709              :                 }
     710              :             }
     711              :         }
     712              : 
     713              :         // For every timeline, download the metadata file, scan the local directory,
     714              :         // and build a layer map that contains an entry for each remote and local
     715              :         // layer file.
     716           49 :         let sorted_timelines = tree_sort_timelines(timeline_ancestors, |m| m.ancestor_timeline())?;
     717           88 :         for (timeline_id, remote_metadata) in sorted_timelines {
     718           49 :             let (index_part, remote_client) = remote_index_and_client
     719           49 :                 .remove(&timeline_id)
     720           49 :                 .expect("just put it in above");
     721           49 : 
     722           49 :             // TODO again handle early failure
     723           49 :             self.load_remote_timeline(
     724           49 :                 timeline_id,
     725           49 :                 index_part,
     726           49 :                 remote_metadata,
     727           49 :                 TimelineResources {
     728           49 :                     remote_client: Some(remote_client),
     729           49 :                 },
     730           49 :                 ctx,
     731           49 :             )
     732           98 :             .await
     733           49 :             .with_context(|| {
     734            0 :                 format!(
     735            0 :                     "failed to load remote timeline {} for tenant {}",
     736            0 :                     timeline_id, self.tenant_id
     737            0 :                 )
     738           49 :             })?;
     739              :         }
     740              : 
     741              :         // Walk through deleted timelines, resume deletion
     742           45 :         for (timeline_id, index_part, remote_timeline_client) in timelines_to_resume_deletions {
     743            6 :             remote_timeline_client
     744            6 :                 .init_upload_queue_stopped_to_continue_deletion(&index_part)
     745            6 :                 .context("init queue stopped")
     746            6 :                 .map_err(LoadLocalTimelineError::ResumeDeletion)?;
     747              : 
     748            6 :             DeleteTimelineFlow::resume_deletion(
     749            6 :                 Arc::clone(self),
     750            6 :                 timeline_id,
     751            6 :                 &index_part.metadata,
     752            6 :                 Some(remote_timeline_client),
     753            6 :                 None,
     754            6 :             )
     755            0 :             .await
     756            6 :             .context("resume_deletion")
     757            6 :             .map_err(LoadLocalTimelineError::ResumeDeletion)?;
     758              :         }
     759              : 
     760           39 :         std::fs::remove_file(&marker_file)
     761           39 :             .with_context(|| format!("unlink attach marker file {}", marker_file.display()))?;
     762           39 :         crashsafe::fsync(marker_file.parent().expect("marker file has parent dir"))
     763           39 :             .context("fsync tenant directory after unlinking attach marker file")?;
     764              : 
     765           21 :         crate::failpoint_support::sleep_millis_async!("attach-before-activate");
     766              : 
     767           39 :         info!("Done");
     768              : 
     769           39 :         Ok(())
     770           42 :     }
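
// --- Editorial note (addition, not part of the coverage listing) ---
// Why attach() tree-sorts the timelines before loading them:
// load_remote_timeline() resolves a timeline's ancestor via `self.timelines`,
// so ancestors must be inserted before their children. With hypothetical IDs:
//
//     main      (no ancestor)         -> loaded first
//     branch_a  (ancestor: main)      -> loaded after main
//     branch_b  (ancestor: branch_a)  -> loaded after branch_a
//
// tree_sort_timelines() derives such an ancestors-first order from the
// (timeline_id -> ancestor) mapping collected during the index downloads.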
     771              : 
      772              :     /// Get the sum of all remote timeline sizes.
      773              :     ///
      774              :     /// This function relies on the index_part files instead of listing remote storage.
     775           40 :     pub fn remote_size(&self) -> u64 {
     776           40 :         let mut size = 0;
     777              : 
     778           46 :         for timeline in self.list_timelines() {
     779           46 :             if let Some(remote_client) = &timeline.remote_client {
     780           38 :                 size += remote_client.get_remote_physical_size();
     781           38 :             }
     782              :         }
     783              : 
     784           40 :         size
     785           40 :     }
     786              : 
     787          147 :     #[instrument(skip_all, fields(timeline_id=%timeline_id))]
     788              :     async fn load_remote_timeline(
     789              :         &self,
     790              :         timeline_id: TimelineId,
     791              :         index_part: IndexPart,
     792              :         remote_metadata: TimelineMetadata,
     793              :         resources: TimelineResources,
     794              :         ctx: &RequestContext,
     795              :     ) -> anyhow::Result<()> {
     796              :         span::debug_assert_current_span_has_tenant_id();
     797              : 
     798           49 :         info!("downloading index file for timeline {}", timeline_id);
     799              :         tokio::fs::create_dir_all(self.conf.timeline_path(&self.tenant_id, &timeline_id))
     800              :             .await
     801              :             .context("Failed to create new timeline directory")?;
     802              : 
     803              :         let ancestor = if let Some(ancestor_id) = remote_metadata.ancestor_timeline() {
     804              :             let timelines = self.timelines.lock().unwrap();
     805              :             Some(Arc::clone(timelines.get(&ancestor_id).ok_or_else(
     806            0 :                 || {
     807            0 :                     anyhow::anyhow!(
     808            0 :                         "cannot find ancestor timeline {ancestor_id} for timeline {timeline_id}"
     809            0 :                     )
     810            0 :                 },
     811              :             )?))
     812              :         } else {
     813              :             None
     814              :         };
     815              : 
      816              :         // Even if there is local metadata, it cannot be ahead of the remote one,
      817              :         // since we're attaching. Even if we resume an interrupted attach, the remote one
      818              :         // cannot be older than the local one.
     819              :         let local_metadata = None;
     820              : 
     821              :         self.timeline_init_and_sync(
     822              :             timeline_id,
     823              :             resources,
     824              :             Some(RemoteStartupData {
     825              :                 index_part,
     826              :                 remote_metadata,
     827              :             }),
     828              :             local_metadata,
     829              :             ancestor,
     830              :             None,
     831              :             ctx,
     832              :         )
     833              :         .await
     834              :     }
     835              : 
     836              :     /// Create a placeholder Tenant object for a broken tenant
     837            0 :     pub fn create_broken_tenant(
     838            0 :         conf: &'static PageServerConf,
     839            0 :         tenant_id: TenantId,
     840            0 :         reason: String,
     841            0 :     ) -> Arc<Tenant> {
     842            0 :         let wal_redo_manager = Arc::new(PostgresRedoManager::new(conf, tenant_id));
     843            0 :         Arc::new(Tenant::new(
     844            0 :             TenantState::Broken {
     845            0 :                 reason,
     846            0 :                 backtrace: String::new(),
     847            0 :             },
     848            0 :             conf,
     849            0 :             TenantConfOpt::default(),
     850            0 :             wal_redo_manager,
     851            0 :             tenant_id,
     852            0 :             Generation::broken(),
     853            0 :             None,
     854            0 :         ))
     855            0 :     }
     856              : 
     857              :     /// Load a tenant that's available on local disk
     858              :     ///
     859              :     /// This is used at pageserver startup, to rebuild the in-memory
     860              :     /// structures from on-disk state. This is similar to attaching a tenant,
     861              :     /// but the index files already exist on local disk, as well as some layer
     862              :     /// files.
     863              :     ///
     864              :     /// If the loading fails for some reason, the Tenant will go into Broken
     865              :     /// state.
     866          697 :     #[instrument(skip_all, fields(tenant_id=%tenant_id))]
     867              :     pub(crate) fn spawn_load(
     868              :         conf: &'static PageServerConf,
     869              :         tenant_id: TenantId,
     870              :         generation: Generation,
     871              :         resources: TenantSharedResources,
     872              :         init_order: Option<InitializationOrder>,
     873              :         tenants: &'static tokio::sync::RwLock<TenantsMap>,
     874              :         ctx: &RequestContext,
     875              :     ) -> Arc<Tenant> {
     876              :         span::debug_assert_current_span_has_tenant_id();
     877              : 
     878              :         let tenant_conf = match Self::load_tenant_config(conf, &tenant_id) {
     879              :             Ok(conf) => conf,
     880              :             Err(e) => {
     881            0 :                 error!("load tenant config failed: {:?}", e);
     882              :                 return Tenant::create_broken_tenant(conf, tenant_id, format!("{e:#}"));
     883              :             }
     884              :         };
     885              : 
     886              :         let broker_client = resources.broker_client;
     887              :         let remote_storage = resources.remote_storage;
     888              : 
     889              :         let wal_redo_manager = Arc::new(PostgresRedoManager::new(conf, tenant_id));
     890              :         let tenant = Tenant::new(
     891              :             TenantState::Loading,
     892              :             conf,
     893              :             tenant_conf,
     894              :             wal_redo_manager,
     895              :             tenant_id,
     896              :             generation,
     897              :             remote_storage.clone(),
     898              :         );
     899              :         let tenant = Arc::new(tenant);
     900              : 
     901              :         // Do all the hard work in a background task
     902              :         let tenant_clone = Arc::clone(&tenant);
     903              : 
     904              :         let ctx = ctx.detached_child(TaskKind::InitialLoad, DownloadBehavior::Warn);
     905              :         let _ = task_mgr::spawn(
     906              :             &tokio::runtime::Handle::current(),
     907              :             TaskKind::InitialLoad,
     908              :             Some(tenant_id),
     909              :             None,
     910              :             "initial tenant load",
     911              :             false,
     912          697 :             async move {
      913          697 :                 // Ideally we should use Tenant::set_broken_no_wait, but it is not supposed to be used when the tenant is in the Loading state.
     914          697 :                 let make_broken = |t: &Tenant, err: anyhow::Error| {
     915            7 :                     error!("load failed, setting tenant state to Broken: {err:?}");
     916            7 :                     t.state.send_modify(|state| {
     917            7 :                         assert!(
     918            7 :                             matches!(*state, TenantState::Loading | TenantState::Stopping { .. }),
     919            0 :                             "the loading task owns the tenant state until activation is complete"
     920              :                         );
     921            7 :                         *state = TenantState::broken_from_reason(err.to_string());
     922            7 :                     });
     923            7 :                 };
     924              : 
     925          697 :                 let mut init_order = init_order;
     926          697 : 
     927          697 :                 // take the completion because initial tenant loading will complete when all of
     928          697 :                 // these tasks complete.
     929          697 :                 let _completion = init_order
     930          697 :                     .as_mut()
     931          697 :                     .and_then(|x| x.initial_tenant_load.take());
     932              : 
     933              :                 // Don't block pageserver startup on figuring out the deletion status
     934          697 :                 let pending_deletion = {
     935          697 :                     match DeleteTenantFlow::should_resume_deletion(
     936          697 :                         conf,
     937          697 :                         remote_storage.as_ref(),
     938          697 :                         &tenant_clone,
     939          697 :                     )
     940         1498 :                     .await
     941              :                     {
     942          697 :                         Ok(should_resume_deletion) => should_resume_deletion,
     943            0 :                         Err(err) => {
     944            0 :                             make_broken(&tenant_clone, anyhow::anyhow!(err));
     945            0 :                             return Ok(());
     946              :                         }
     947              :                     }
     948              :                 };
     949              : 
     950          697 :                 info!("pending deletion {}", pending_deletion.is_some());
     951              : 
     952          697 :                 if let Some(deletion) = pending_deletion {
     953              :                     // as we are no longer loading, signal completion by dropping
     954              :                     // the completion while we resume deletion
     955           28 :                     drop(_completion);
     956           28 :                     // do not hold on to initial_logical_size_attempt, as that would prevent loading from proceeding without a timeout
     957           28 :                     let _ = init_order
     958           28 :                         .as_mut()
     959           28 :                         .and_then(|x| x.initial_logical_size_attempt.take());
     960           28 : 
     961           28 :                     match DeleteTenantFlow::resume_from_load(
     962           28 :                         deletion,
     963           28 :                         &tenant_clone,
     964           28 :                         init_order.as_ref(),
     965           28 :                         tenants,
     966           28 :                         &ctx,
     967           28 :                     )
     968         1467 :                     .await
     969              :                     {
     970            0 :                         Err(err) => {
     971            0 :                             make_broken(&tenant_clone, anyhow::anyhow!(err));
     972            0 :                             return Ok(());
     973              :                         }
     974           28 :                         Ok(()) => return Ok(()),
     975              :                     }
     976          669 :                 }
     977          669 : 
     978          669 :                 let background_jobs_can_start =
     979          669 :                     init_order.as_ref().map(|x| &x.background_jobs_can_start);
     980          669 : 
     981         1304 :                 match tenant_clone.load(init_order.as_ref(), &ctx).await {
     982              :                     Ok(()) => {
     983          662 :                         debug!("load finished");
     984              : 
     985          662 :                         tenant_clone.activate(broker_client, background_jobs_can_start, &ctx);
     986              :                     }
     987            7 :                     Err(err) => make_broken(&tenant_clone, err),
     988              :                 }
     989              : 
     990          669 :                 Ok(())
     991          697 :             }
     992              :             .instrument({
     993              :                 let span = tracing::info_span!(parent: None, "load", tenant_id=%tenant_id);
     994              :                 span.follows_from(Span::current());
     995              :                 span
     996              :             }),
     997              :         );
     998              : 
     999              :         tenant
    1000              :     }
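
    // A minimal sketch (illustrative, not from this file) of the drop-guard
    // completion pattern used by `_completion` above: holding the object keeps
    // the startup barrier pending, and dropping it (on success, error, or
    // panic) signals completion. A tokio oneshot channel stands in for the
    // `utils::completion` pair here, with the dropped sender acting as the signal.
    async fn completion_drop_guard_sketch() {
        let (completion, barrier) = tokio::sync::oneshot::channel::<()>();
        tokio::spawn(async move {
            let _completion = completion; // dropped when the task ends, however it ends
            // ... initial tenant load work would happen here ...
        });
        // The waiting side treats "sender dropped" (an Err) as the completion signal.
        let _ = barrier.await;
    }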
    1001              : 
    1002          734 :     fn scan_and_sort_timelines_dir(self: Arc<Tenant>) -> anyhow::Result<TenantDirectoryScan> {
    1003          734 :         let mut timelines_to_load: HashMap<TimelineId, TimelineMetadata> = HashMap::new();
    1004          734 :         // Note: timelines_to_resume_deletion needs to be kept separate because it may not be
    1005          734 :         // sortable by `tree_sort_timelines`, i.e. some parents can be missing because deletion
    1006          734 :         // completed in non-topological order (for example because a parent had a smaller number of layer files in it); see the sorting sketch after this function
    1007          734 :         let mut timelines_to_resume_deletion: Vec<(TimelineId, Option<TimelineMetadata>)> = vec![];
    1008          734 : 
    1009          734 :         let timelines_dir = self.conf.timelines_path(&self.tenant_id);
    1010              : 
    1011          338 :         for entry in
    1012          734 :             std::fs::read_dir(&timelines_dir).context("list timelines directory for tenant")?
    1013              :         {
    1014          338 :             let entry = entry.context("read timeline dir entry")?;
    1015          338 :             let timeline_dir = entry.path();
    1016          338 : 
    1017          338 :             if crate::is_temporary(&timeline_dir) {
    1018            0 :                 info!(
    1019            0 :                     "Found temporary timeline directory, removing: {}",
    1020            0 :                     timeline_dir.display()
    1021            0 :                 );
    1022            0 :                 if let Err(e) = std::fs::remove_dir_all(&timeline_dir) {
    1023            0 :                     error!(
    1024            0 :                         "Failed to remove temporary directory '{}': {:?}",
    1025            0 :                         timeline_dir.display(),
    1026            0 :                         e
    1027            0 :                     );
    1028            0 :                 }
    1029          338 :             } else if is_uninit_mark(&timeline_dir) {
    1030            1 :                 if !timeline_dir.exists() {
    1031            0 :                     warn!(
    1032            0 :                         "Timeline dir entry became invalid: {}",
    1033            0 :                         timeline_dir.display()
    1034            0 :                     );
    1035            0 :                     continue;
    1036            1 :                 }
    1037            1 : 
    1038            1 :                 let timeline_uninit_mark_file = &timeline_dir;
    1039            1 :                 info!(
    1040            1 :                     "Found an uninit mark file {}, removing the timeline and its uninit mark",
    1041            1 :                     timeline_uninit_mark_file.display()
    1042            1 :                 );
    1043            1 :                 let timeline_id = TimelineId::try_from(timeline_uninit_mark_file.file_stem())
    1044            1 :                     .with_context(|| {
    1045            0 :                         format!(
    1046            0 :                             "Could not parse timeline id out of the timeline uninit mark name {}",
    1047            0 :                             timeline_uninit_mark_file.display()
    1048            0 :                         )
    1049            1 :                     })?;
    1050            1 :                 let timeline_dir = self.conf.timeline_path(&self.tenant_id, &timeline_id);
    1051            0 :                 if let Err(e) =
    1052            1 :                     remove_timeline_and_uninit_mark(&timeline_dir, timeline_uninit_mark_file)
    1053              :                 {
    1054            0 :                     error!("Failed to clean up uninit marked timeline: {e:?}");
    1055            1 :                 }
    1056          337 :             } else if crate::is_delete_mark(&timeline_dir) {
    1057              :                 // If metadata exists, load as usual, continue deletion
    1058           31 :                 let timeline_id =
    1059           31 :                     TimelineId::try_from(timeline_dir.file_stem()).with_context(|| {
    1060            0 :                         format!(
    1061            0 :                             "Could not parse timeline id out of the timeline delete mark name {}",
    1062            0 :                             timeline_dir.display()
    1063            0 :                         )
    1064           31 :                     })?;
    1065              : 
    1066           31 :                 info!("Found deletion mark for timeline {}", timeline_id);
    1067              : 
    1068           31 :                 match load_metadata(self.conf, &self.tenant_id, &timeline_id) {
    1069           22 :                     Ok(metadata) => {
    1070           22 :                         timelines_to_resume_deletion.push((timeline_id, Some(metadata)))
    1071              :                     }
    1072            9 :                     Err(e) => match &e {
    1073            9 :                         LoadMetadataError::Read(r) => {
    1074            9 :                             if r.kind() != io::ErrorKind::NotFound {
    1075            0 :                                 return Err(anyhow::anyhow!(e)).with_context(|| {
    1076            0 :                                     format!("Failed to load metadata for timeline_id {timeline_id}")
    1077            0 :                                 });
    1078            9 :                             }
    1079            9 : 
    1080            9 :                             // If the metadata doesn't exist, it means that we crashed without
    1081            9 :                             // completing cleanup_remaining_timeline_fs_traces in DeleteTimelineFlow.
    1082            9 :                             // So save the timeline_id for a later call to `DeleteTimelineFlow::cleanup_remaining_timeline_fs_traces`.
    1083            9 :                             // We can't do it here because the method is async, so we'd need block_on,
    1084            9 :                             // and here we're in spawn_blocking. cleanup_remaining_timeline_fs_traces uses fs operations,
    1085            9 :                             // so that basically results in a cycle:
    1086            9 :                             // spawn_blocking
    1087            9 :                             // - block_on
    1088            9 :                             //   - spawn_blocking
    1089            9 :                             // which can lead to running out of threads in the blocking pool.
    1090            9 :                             timelines_to_resume_deletion.push((timeline_id, None));
    1091              :                         }
    1092              :                         _ => {
    1093            0 :                             return Err(anyhow::anyhow!(e)).with_context(|| {
    1094            0 :                                 format!("Failed to load metadata for timeline_id {timeline_id}")
    1095            0 :                             })
    1096              :                         }
    1097              :                     },
    1098              :                 }
    1099              :             } else {
    1100          306 :                 if !timeline_dir.exists() {
    1101            1 :                     warn!(
    1102            1 :                         "Timeline dir entry become invalid: {}",
    1103            1 :                         timeline_dir.display()
    1104            1 :                     );
    1105            1 :                     continue;
    1106          305 :                 }
    1107          305 :                 let timeline_id =
    1108          305 :                     TimelineId::try_from(timeline_dir.file_name()).with_context(|| {
    1109            0 :                         format!(
    1110            0 :                             "Could not parse timeline id out of the timeline dir name {}",
    1111            0 :                             timeline_dir.display()
    1112            0 :                         )
    1113          305 :                     })?;
    1114          305 :                 let timeline_uninit_mark_file = self
    1115          305 :                     .conf
    1116          305 :                     .timeline_uninit_mark_file_path(self.tenant_id, timeline_id);
    1117          305 :                 if timeline_uninit_mark_file.exists() {
    1118            0 :                     info!(
    1119            0 :                         %timeline_id,
    1120            0 :                         "Found an uninit mark file, removing the timeline and its uninit mark",
    1121            0 :                     );
    1122            0 :                     if let Err(e) =
    1123            0 :                         remove_timeline_and_uninit_mark(&timeline_dir, &timeline_uninit_mark_file)
    1124              :                     {
    1125            0 :                         error!("Failed to clean up uninit marked timeline: {e:?}");
    1126            0 :                     }
    1127            0 :                     continue;
    1128          305 :                 }
    1129          305 : 
    1130          305 :                 let timeline_delete_mark_file = self
    1131          305 :                     .conf
    1132          305 :                     .timeline_delete_mark_file_path(self.tenant_id, timeline_id);
    1133          305 :                 if timeline_delete_mark_file.exists() {
    1134              :                     // Cleanup should be done in `is_delete_mark` branch above
    1135           25 :                     continue;
    1136          280 :                 }
    1137          280 : 
    1138          280 :                 let file_name = entry.file_name();
    1139          280 :                 if let Ok(timeline_id) =
    1140          280 :                     file_name.to_str().unwrap_or_default().parse::<TimelineId>()
    1141          277 :                 {
    1142          280 :                     let metadata = load_metadata(self.conf, &self.tenant_id, &timeline_id)
    1143          280 :                         .context("failed to load metadata")?;
    1144          277 :                     timelines_to_load.insert(timeline_id, metadata);
    1145              :                 } else {
    1146              :                     // A file or directory that doesn't look like a timeline ID
    1147            0 :                     warn!(
    1148            0 :                         "unexpected file or directory in timelines directory: {}",
    1149            0 :                         file_name.to_string_lossy()
    1150            0 :                     );
    1151              :                 }
    1152              :             }
    1153              :         }
    1154              : 
    1155              :         // Sort the array of timeline IDs into tree-order, so that parent comes before
    1156              :         // all its children.
    1157          727 :         tree_sort_timelines(timelines_to_load, |m| m.ancestor_timeline()).map(|sorted_timelines| {
    1158          727 :             TenantDirectoryScan {
    1159          727 :                 sorted_timelines_to_load: sorted_timelines,
    1160          727 :                 timelines_to_resume_deletion,
    1161          727 :             }
    1162          727 :         })
    1163          734 :     }
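
    // A minimal sketch (invented names, simplified types) of the tree-order
    // sorting that `tree_sort_timelines` performs on the scan result above:
    // repeatedly emit entries whose ancestor has already been emitted (or that
    // have no ancestor), so every parent precedes its children. Returning None
    // models the "not sortable" case: a parent that is missing from the map.
    fn tree_sort_sketch<M>(
        mut remaining: std::collections::HashMap<u64, (Option<u64>, M)>,
    ) -> Option<Vec<(u64, M)>> {
        let mut sorted = Vec::with_capacity(remaining.len());
        let mut emitted = std::collections::HashSet::new();
        while !remaining.is_empty() {
            // Collect every entry whose parent (if any) has already been emitted.
            let ready: Vec<u64> = remaining
                .iter()
                .filter(|(_, (parent, _))| parent.map_or(true, |p| emitted.contains(&p)))
                .map(|(id, _)| *id)
                .collect();
            if ready.is_empty() {
                return None; // a parent is missing (or there is a cycle)
            }
            for id in ready {
                let (_parent, meta) = remaining.remove(&id).unwrap();
                emitted.insert(id);
                sorted.push((id, meta));
            }
        }
        Some(sorted)
    }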
    1164              : 
    1165              :     ///
    1166              :     /// Background task to load in-memory data structures for this tenant, from
    1167              :     /// files on disk. Used at pageserver startup.
    1168              :     ///
    1169              :     /// No background tasks are started as part of this routine.
    1170          734 :     async fn load(
    1171          734 :         self: &Arc<Tenant>,
    1172          734 :         init_order: Option<&InitializationOrder>,
    1173          734 :         ctx: &RequestContext,
    1174          734 :     ) -> anyhow::Result<()> {
    1175          734 :         span::debug_assert_current_span_has_tenant_id();
    1176              : 
    1177            0 :         debug!("loading tenant task");
    1178              : 
    1179            3 :         crate::failpoint_support::sleep_millis_async!("before-loading-tenant");
    1180              : 
    1181              :         // Load in-memory state to reflect the local files on disk
    1182              :         //
    1183              :         // Scan the directory, peek into the metadata file of each timeline, and
    1184              :         // collect a list of timelines and their ancestors.
    1185          734 :         let span = info_span!("blocking");
    1186          734 :         let cloned = Arc::clone(self);
    1187              : 
    1188          734 :         let scan = tokio::task::spawn_blocking(move || {
    1189          734 :             let _g = span.entered();
    1190          734 :             cloned.scan_and_sort_timelines_dir()
    1191          734 :         })
    1192          734 :         .await
    1193          734 :         .context("load spawn_blocking")
    1194          734 :         .and_then(|res| res)?;
    1195              : 
    1196              :         // FIXME original collect_timeline_files contained one more check:
    1197              :         //    1. "Timeline has no ancestor and no layer files"
    1198              : 
    1199              :         // Process loadable timelines first
    1200         1003 :         for (timeline_id, local_metadata) in scan.sorted_timelines_to_load {
    1201          277 :             if let Err(e) = self
    1202          277 :                 .load_local_timeline(timeline_id, local_metadata, init_order, ctx, false)
    1203          676 :                 .await
    1204              :             {
    1205            1 :                 match e {
    1206            1 :                     LoadLocalTimelineError::Load(source) => {
    1207            1 :                         return Err(anyhow::anyhow!(source)).with_context(|| {
    1208            1 :                             format!("Failed to load local timeline: {timeline_id}")
    1209            1 :                         })
    1210              :                     }
    1211            0 :                     LoadLocalTimelineError::ResumeDeletion(source) => {
    1212              :                         // Make sure resumed deletion wont fail loading for entire tenant.
    1213            0 :                         error!("Failed to resume timeline deletion: {source:#}")
    1214              :                     }
    1215              :                 }
    1216          276 :             }
    1217              :         }
    1218              : 
    1219              :         // Resume deletion ones with deleted_mark
    1220          757 :         for (timeline_id, maybe_local_metadata) in scan.timelines_to_resume_deletion {
    1221           31 :             match maybe_local_metadata {
    1222              :                 None => {
    1223              :                     // See comment in `scan_and_sort_timelines_dir`.
    1224            0 :                     if let Err(e) =
    1225            9 :                         DeleteTimelineFlow::cleanup_remaining_timeline_fs_traces(self, timeline_id)
    1226           45 :                             .await
    1227              :                     {
    1228            0 :                         warn!(
    1229            0 :                             "cannot clean up deleted timeline dir timeline_id: {} error: {:#}",
    1230            0 :                             timeline_id, e
    1231            0 :                         );
    1232            9 :                     }
    1233              :                 }
    1234           22 :                 Some(local_metadata) => {
    1235           22 :                     if let Err(e) = self
    1236           22 :                         .load_local_timeline(timeline_id, local_metadata, init_order, ctx, true)
    1237           59 :                         .await
    1238              :                     {
    1239            0 :                         match e {
    1240            0 :                             LoadLocalTimelineError::Load(source) => {
    1241            0 :                                 // We tried to load deleted timeline, this is a bug.
    1242            0 :                                 return Err(anyhow::anyhow!(source).context(
    1243            0 :                                 "This is a bug. We tried to load deleted timeline which is wrong and loading failed. Timeline: {timeline_id}"
    1244            0 :                             ));
    1245              :                             }
    1246            0 :                             LoadLocalTimelineError::ResumeDeletion(source) => {
    1247              :                                 // Make sure resumed deletion wont fail loading for entire tenant.
    1248            0 :                                 error!("Failed to resume timeline deletion: {source:#}")
    1249              :                             }
    1250              :                         }
    1251           22 :                     }
    1252              :                 }
    1253              :             }
    1254              :         }
    1255              : 
    1256            0 :         trace!("Done");
    1257              : 
    1258          726 :         Ok(())
    1259          734 :     }
    1260              : 
    1261              :     /// Subroutine of `load_tenant`, to load an individual timeline
    1262              :     ///
    1263              :     /// NB: The parent is assumed to be already loaded!
    1264          598 :     #[instrument(skip(self, local_metadata, init_order, ctx))]
    1265              :     async fn load_local_timeline(
    1266              :         self: &Arc<Self>,
    1267              :         timeline_id: TimelineId,
    1268              :         local_metadata: TimelineMetadata,
    1269              :         init_order: Option<&InitializationOrder>,
    1270              :         ctx: &RequestContext,
    1271              :         found_delete_mark: bool,
    1272              :     ) -> Result<(), LoadLocalTimelineError> {
    1273              :         span::debug_assert_current_span_has_tenant_id();
    1274              : 
    1275              :         let mut resources = self.build_timeline_resources(timeline_id);
    1276              : 
    1277              :         let (remote_startup_data, remote_client) = match resources.remote_client {
    1278              :             Some(remote_client) => match remote_client.download_index_file().await {
    1279              :                 Ok(index_part) => {
    1280              :                     let index_part = match index_part {
    1281              :                         MaybeDeletedIndexPart::IndexPart(index_part) => index_part,
    1282              :                         MaybeDeletedIndexPart::Deleted(index_part) => {
    1283              :                             // TODO: we won't reach here if remote storage gets de-configured after start of the deletion operation.
    1284              :                             // Example:
    1285              :                             //  start deletion operation
    1286              :                             //  finishes upload of index part
    1287              :                             //  pageserver crashes
    1288              :                             //  remote storage gets de-configured
    1289              :                             //  pageserver starts
    1290              :                             //
    1291              :                             // We don't really anticipate remote storage to be de-configured, so, for now, this is fine.
    1292              :                             // Also, maybe we'll remove that option entirely in the future, see https://github.com/neondatabase/neon/issues/4099.
    1293           15 :                             info!("is_deleted is set on remote, resuming removal of timeline data originally done by timeline deletion handler");
    1294              : 
    1295              :                             remote_client
    1296              :                                 .init_upload_queue_stopped_to_continue_deletion(&index_part)
    1297              :                                 .context("init queue stopped")
    1298              :                                 .map_err(LoadLocalTimelineError::ResumeDeletion)?;
    1299              : 
    1300              :                             DeleteTimelineFlow::resume_deletion(
    1301              :                                 Arc::clone(self),
    1302              :                                 timeline_id,
    1303              :                                 &local_metadata,
    1304              :                                 Some(remote_client),
    1305              :                                 init_order,
    1306              :                             )
    1307              :                             .await
    1308              :                             .context("resume deletion")
    1309              :                             .map_err(LoadLocalTimelineError::ResumeDeletion)?;
    1310              : 
    1311              :                             return Ok(());
    1312              :                         }
    1313              :                     };
    1314              : 
    1315              :                     let remote_metadata = index_part.metadata.clone();
    1316              :                     (
    1317              :                         Some(RemoteStartupData {
    1318              :                             index_part,
    1319              :                             remote_metadata,
    1320              :                         }),
    1321              :                         Some(remote_client),
    1322              :                     )
    1323              :                 }
    1324              :                 Err(DownloadError::NotFound) => {
    1325            3 :                     info!("no index file was found on the remote, found_delete_mark: {found_delete_mark}");
    1326              : 
    1327              :                     if found_delete_mark {
    1328              :                         // We could've resumed at a point where remote index was deleted, but metadata file wasnt.
    1329              :                         // Cleanup:
    1330              :                         return DeleteTimelineFlow::cleanup_remaining_timeline_fs_traces(
    1331              :                             self,
    1332              :                             timeline_id,
    1333              :                         )
    1334              :                         .await
    1335              :                         .context("cleanup_remaining_timeline_fs_traces")
    1336              :                         .map_err(LoadLocalTimelineError::ResumeDeletion);
    1337              :                     }
    1338              : 
    1339              :                     // We're loading fresh timeline that didnt yet make it into remote.
    1340              :                     (None, Some(remote_client))
    1341              :                 }
    1342              :                 Err(e) => return Err(LoadLocalTimelineError::Load(anyhow::Error::new(e))),
    1343              :             },
    1344              :             None => {
    1345              :                 // No remote client
    1346              :                 if found_delete_mark {
    1347              :                     // There is no remote client, we found local metadata.
    1348              :                     // Continue cleaning up local disk.
    1349              :                     DeleteTimelineFlow::resume_deletion(
    1350              :                         Arc::clone(self),
    1351              :                         timeline_id,
    1352              :                         &local_metadata,
    1353              :                         None,
    1354              :                         init_order,
    1355              :                     )
    1356              :                     .await
    1357              :                     .context("resume deletion")
    1358              :                     .map_err(LoadLocalTimelineError::ResumeDeletion)?;
    1359              :                     return Ok(());
    1360              :                 }
    1361              : 
    1362              :                 (None, resources.remote_client)
    1363              :             }
    1364              :         };
    1365              :         resources.remote_client = remote_client;
    1366              : 
    1367              :         let ancestor = if let Some(ancestor_timeline_id) = local_metadata.ancestor_timeline() {
    1368              :             let ancestor_timeline = self.get_timeline(ancestor_timeline_id, false)
    1369            0 :                 .with_context(|| anyhow::anyhow!("cannot find ancestor timeline {ancestor_timeline_id} for timeline {timeline_id}"))
    1370              :                 .map_err(LoadLocalTimelineError::Load)?;
    1371              :             Some(ancestor_timeline)
    1372              :         } else {
    1373              :             None
    1374              :         };
    1375              : 
    1376              :         self.timeline_init_and_sync(
    1377              :             timeline_id,
    1378              :             resources,
    1379              :             remote_startup_data,
    1380              :             Some(local_metadata),
    1381              :             ancestor,
    1382              :             init_order,
    1383              :             ctx,
    1384              :         )
    1385              :         .await
    1386              :         .map_err(LoadLocalTimelineError::Load)
    1387              :     }
    1388              : 
    1389         1213 :     pub fn tenant_id(&self) -> TenantId {
    1390         1213 :         self.tenant_id
    1391         1213 :     }
    1392              : 
    1393              :     /// Get Timeline handle for given Neon timeline ID.
    1394              :     /// This function is idempotent. It doesn't change internal state in any way.
    1395        10280 :     pub fn get_timeline(
    1396        10280 :         &self,
    1397        10280 :         timeline_id: TimelineId,
    1398        10280 :         active_only: bool,
    1399        10280 :     ) -> Result<Arc<Timeline>, GetTimelineError> {
    1400        10280 :         let timelines_accessor = self.timelines.lock().unwrap();
    1401        10280 :         let timeline = timelines_accessor
    1402        10280 :             .get(&timeline_id)
    1403        10280 :             .ok_or(GetTimelineError::NotFound {
    1404        10280 :                 tenant_id: self.tenant_id,
    1405        10280 :                 timeline_id,
    1406        10280 :             })?;
    1407              : 
    1408         9286 :         if active_only && !timeline.is_active() {
    1409            0 :             Err(GetTimelineError::NotActive {
    1410            0 :                 tenant_id: self.tenant_id,
    1411            0 :                 timeline_id,
    1412            0 :                 state: timeline.current_state(),
    1413            0 :             })
    1414              :         } else {
    1415         9286 :             Ok(Arc::clone(timeline))
    1416              :         }
    1417        10280 :     }
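
    // Hypothetical caller-side sketch (the function name is invented): since
    // `get_timeline` is a pure, idempotent lookup, callers can retry it freely;
    // `active_only = true` additionally rejects timelines that exist but are
    // not in the Active state. Assumes the error type implements Display.
    fn describe_timeline_sketch(tenant: &Tenant, timeline_id: TimelineId) -> String {
        match tenant.get_timeline(timeline_id, true) {
            Ok(timeline) => format!("active at {}", timeline.get_last_record_lsn()),
            Err(e) => format!("unavailable: {e}"),
        }
    }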
    1418              : 
    1419              :     /// Lists timelines the tenant contains.
    1420              :     /// It is up to the tenant's implementation to omit certain timelines that are not considered ready for use.
    1421          661 :     pub fn list_timelines(&self) -> Vec<Arc<Timeline>> {
    1422          661 :         self.timelines
    1423          661 :             .lock()
    1424          661 :             .unwrap()
    1425          661 :             .values()
    1426          661 :             .map(Arc::clone)
    1427          661 :             .collect()
    1428          661 :     }
    1429              : 
    1430              :     /// This is used to create the initial 'main' timeline during bootstrapping,
    1431              :     /// or when importing a new base backup. The caller is expected to load an
    1432              :     /// initial image of the datadir to the new timeline after this.
    1433              :     ///
    1434              :     /// Until that happens, the on-disk state is invalid (disk_consistent_lsn=Lsn(0))
    1435              :     /// and the timeline will fail to load at a restart.
    1436              :     ///
    1437              :     /// That's why we add an uninit mark file, and wrap it together with the Timeline
    1438              :     /// in-memory object into UninitializedTimeline.
    1439              :     /// Once the caller is done setting up the timeline, they should call
    1440              :     /// `UninitializedTimeline::initialize_with_lock` to remove the uninit mark.
    1441              :     ///
    1442              :     /// For tests, use `DatadirModification::init_empty_test_timeline` + `commit` to setup the
    1443              :     /// minimum amount of keys required to get a writable timeline.
    1444              :     /// (Without it, `put` might fail due to `repartition` failing.)
    1445           41 :     pub async fn create_empty_timeline(
    1446           41 :         &self,
    1447           41 :         new_timeline_id: TimelineId,
    1448           41 :         initdb_lsn: Lsn,
    1449           41 :         pg_version: u32,
    1450           41 :         _ctx: &RequestContext,
    1451           41 :     ) -> anyhow::Result<UninitializedTimeline> {
    1452           41 :         anyhow::ensure!(
    1453           41 :             self.is_active(),
    1454            0 :             "Cannot create empty timelines on inactive tenant"
    1455              :         );
    1456              : 
    1457           40 :         let timeline_uninit_mark = {
    1458           41 :             let timelines = self.timelines.lock().unwrap();
    1459           41 :             self.create_timeline_uninit_mark(new_timeline_id, &timelines)?
    1460              :         };
    1461           40 :         let new_metadata = TimelineMetadata::new(
    1462           40 :             // Initialize disk_consistent_lsn to 0. The caller must import some data to
    1463           40 :             // make it valid before calling finish_creation().
    1464           40 :             Lsn(0),
    1465           40 :             None,
    1466           40 :             None,
    1467           40 :             Lsn(0),
    1468           40 :             initdb_lsn,
    1469           40 :             initdb_lsn,
    1470           40 :             pg_version,
    1471           40 :         );
    1472           40 :         self.prepare_new_timeline(
    1473           40 :             new_timeline_id,
    1474           40 :             &new_metadata,
    1475           40 :             timeline_uninit_mark,
    1476           40 :             initdb_lsn,
    1477           40 :             None,
    1478           40 :         )
    1479            0 :         .await
    1480           41 :     }
    1481              : 
    1482              :     /// Helper for unit tests to create an empty timeline.
    1483              :     ///
    1484              :     /// The timeline has state value `Active`, but its background loops are not running.
    1485              :     // This makes the various functions which anyhow::ensure! for Active state work in tests.
    1486              :     // Our current tests don't need the background loops.
    1487              :     #[cfg(test)]
    1488           33 :     pub async fn create_test_timeline(
    1489           33 :         &self,
    1490           33 :         new_timeline_id: TimelineId,
    1491           33 :         initdb_lsn: Lsn,
    1492           33 :         pg_version: u32,
    1493           33 :         ctx: &RequestContext,
    1494           33 :     ) -> anyhow::Result<Arc<Timeline>> {
    1495           33 :         let uninit_tl = self
    1496           33 :             .create_empty_timeline(new_timeline_id, initdb_lsn, pg_version, ctx)
    1497            0 :             .await?;
    1498           33 :         let tline = uninit_tl.raw_timeline().expect("we just created it");
    1499           33 :         assert_eq!(tline.get_last_record_lsn(), Lsn(0));
    1500              : 
    1501              :         // Setup minimum keys required for the timeline to be usable.
    1502           33 :         let mut modification = tline.begin_modification(initdb_lsn);
    1503           33 :         modification
    1504           33 :             .init_empty_test_timeline()
    1505           33 :             .context("init_empty_test_timeline")?;
    1506           33 :         modification
    1507           33 :             .commit()
    1508            0 :             .await
    1509           33 :             .context("commit init_empty_test_timeline modification")?;
    1510              : 
    1511              :         // Flush to disk so that uninit_tl's check for valid disk_consistent_lsn passes.
    1512           33 :         tline.maybe_spawn_flush_loop();
    1513           33 :         tline.freeze_and_flush().await.context("freeze_and_flush")?;
    1514              : 
    1515              :         // Make sure the freeze_and_flush reaches remote storage.
    1516           33 :         tline
    1517           33 :             .remote_client
    1518           33 :             .as_ref()
    1519           33 :             .unwrap()
    1520           33 :             .wait_completion()
    1521           33 :             .await
    1522           33 :             .unwrap();
    1523              : 
    1524           33 :         let tl = uninit_tl.finish_creation()?;
    1525              :         // The non-test code would call tl.activate() here.
    1526           33 :         tl.set_state(TimelineState::Active);
    1527           33 :         Ok(tl)
    1528           33 :     }
    1529              : 
    1530              :     /// Create a new timeline.
    1531              :     ///
    1532              :     /// Returns the new timeline ID and reference to its Timeline object.
    1533              :     ///
    1534              :     /// If the caller specified the timeline ID to use (`new_timeline_id`), and a timeline with
    1535              :     /// the same timeline ID already exists, returns CreateTimelineError::AlreadyExists.
    1536          906 :     pub async fn create_timeline(
    1537          906 :         &self,
    1538          906 :         new_timeline_id: TimelineId,
    1539          906 :         ancestor_timeline_id: Option<TimelineId>,
    1540          906 :         mut ancestor_start_lsn: Option<Lsn>,
    1541          906 :         pg_version: u32,
    1542          906 :         broker_client: storage_broker::BrokerClientChannel,
    1543          906 :         ctx: &RequestContext,
    1544          906 :     ) -> Result<Arc<Timeline>, CreateTimelineError> {
    1545          906 :         if !self.is_active() {
    1546            0 :             return Err(CreateTimelineError::Other(anyhow::anyhow!(
    1547            0 :                 "Cannot create timelines on inactive tenant"
    1548            0 :             )));
    1549          906 :         }
    1550              : 
    1551          906 :         if let Ok(existing) = self.get_timeline(new_timeline_id, false) {
    1552            0 :             debug!("timeline {new_timeline_id} already exists");
    1553              : 
    1554            1 :             if let Some(remote_client) = existing.remote_client.as_ref() {
    1555              :                 // Wait for uploads to complete, so that when we return Ok, the timeline
    1556              :                 // is known to be durable on remote storage. Just like we do at the end of
    1557              :                 // this function, after we have created the timeline ourselves.
    1558              :                 //
    1559              :                 // We only really care that the initial version of `index_part.json` has
    1560              :                 // been uploaded. That's enough to remember that the timeline
    1561              :                 // exists. However, there is no function to wait specifically for that so
    1562              :                 // we just wait for all in-progress uploads to finish.
    1563            1 :                 remote_client
    1564            1 :                     .wait_completion()
    1565            1 :                     .await
    1566            1 :                     .context("wait for timeline uploads to complete")?;
    1567            0 :             }
    1568              : 
    1569            1 :             return Err(CreateTimelineError::AlreadyExists);
    1570          905 :         }
    1571              : 
    1572          905 :         let loaded_timeline = match ancestor_timeline_id {
    1573          261 :             Some(ancestor_timeline_id) => {
    1574          261 :                 let ancestor_timeline = self
    1575          261 :                     .get_timeline(ancestor_timeline_id, false)
    1576          261 :                     .context("Cannot branch off the timeline that's not present in pageserver")?;
    1577              : 
    1578          261 :                 if let Some(lsn) = ancestor_start_lsn.as_mut() {
    1579           35 :                     *lsn = lsn.align();
    1580           35 : 
    1581           35 :                     let ancestor_ancestor_lsn = ancestor_timeline.get_ancestor_lsn();
    1582           35 :                     if ancestor_ancestor_lsn > *lsn {
    1583              :                         // can we safely just branch from the ancestor instead?
    1584            2 :                         return Err(CreateTimelineError::AncestorLsn(anyhow::anyhow!(
    1585            2 :                             "invalid start lsn {} for ancestor timeline {}: less than timeline ancestor lsn {}",
    1586            2 :                             lsn,
    1587            2 :                             ancestor_timeline_id,
    1588            2 :                             ancestor_ancestor_lsn,
    1589            2 :                         )));
    1590           33 :                     }
    1591           33 : 
    1592           33 :                     // Wait for the WAL to arrive and be processed on the parent branch up
    1593           33 :                     // to the requested branch point. The repository code itself doesn't
    1594           33 :                     // require it, but if we start to receive WAL on the new timeline,
    1595           33 :                     // decoding the new WAL might need to look up previous pages, relation
    1596           33 :                     // sizes etc. and that would get confused if the previous page versions
    1597           33 :                     // are not in the repository yet.
    1598           33 :                     ancestor_timeline.wait_lsn(*lsn, ctx).await?;
    1599          226 :                 }
    1600              : 
    1601          259 :                 self.branch_timeline(&ancestor_timeline, new_timeline_id, ancestor_start_lsn, ctx)
    1602            6 :                     .await?
    1603              :             }
    1604              :             None => {
    1605          644 :                 self.bootstrap_timeline(new_timeline_id, pg_version, ctx)
    1606      3210901 :                     .await?
    1607              :             }
    1608              :         };
    1609              : 
    1610          895 :         loaded_timeline.activate(broker_client, None, ctx);
    1611              : 
    1612          895 :         if let Some(remote_client) = loaded_timeline.remote_client.as_ref() {
    1613              :             // Wait for the upload of the `index_part.json` file to finish, so that when we return
    1614              :             // Ok, the timeline is durable in remote storage.
    1615          418 :             let kind = ancestor_timeline_id
    1616          418 :                 .map(|_| "branched")
    1617          418 :                 .unwrap_or("bootstrapped");
    1618          418 :             remote_client.wait_completion().await.with_context(|| {
    1619            0 :                 format!("wait for {} timeline initial uploads to complete", kind)
    1620          417 :             })?;
    1621          477 :         }
    1622              : 
    1623          894 :         Ok(loaded_timeline)
    1624          905 :     }
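
    // Hypothetical usage sketch for the branching path of `create_timeline`
    // (the function name and the pg_version value are illustrative): branch
    // `child` off `parent` at `lsn`, treating `AlreadyExists` as success so
    // that a retried request stays idempotent.
    async fn branch_sketch(
        tenant: &Tenant,
        parent: TimelineId,
        child: TimelineId,
        lsn: Lsn,
        broker_client: storage_broker::BrokerClientChannel,
        ctx: &RequestContext,
    ) -> anyhow::Result<()> {
        match tenant
            .create_timeline(child, Some(parent), Some(lsn), 15, broker_client, ctx)
            .await
        {
            // On Ok, the timeline is also durable in remote storage already.
            Ok(_timeline) => Ok(()),
            // The timeline was created (and made durable) by an earlier attempt.
            Err(CreateTimelineError::AlreadyExists) => Ok(()),
            Err(e) => Err(anyhow::anyhow!(e)),
        }
    }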
    1625              : 
    1626              :     /// Perform one garbage collection iteration, removing old data files from disk.
    1627              :     /// This function is periodically called by the GC task.
    1628              :     /// It can also be explicitly requested through the page server api's 'do_gc' command.
    1629              :     ///
    1630              :     /// `target_timeline_id` specifies the timeline to GC, or None for all.
    1631              :     ///
    1632              :     /// The `horizon` and `pitr` parameters determine how much WAL history needs to be retained.
    1633              :     /// Also known as the retention period, or the GC cutoff point. `horizon` specifies
    1634              :     /// the amount of history, as an LSN difference from the current latest LSN on each timeline.
    1635              :     /// `pitr` specifies the same as a time difference from the current time. The effective
    1636              :     /// GC cutoff point is determined conservatively by whichever of `horizon` and `pitr`
    1637              :     /// requires more history to be retained (see the cutoff sketch after this function).
    1638              :     //
    1639          544 :     pub async fn gc_iteration(
    1640          544 :         &self,
    1641          544 :         target_timeline_id: Option<TimelineId>,
    1642          544 :         horizon: u64,
    1643          544 :         pitr: Duration,
    1644          544 :         ctx: &RequestContext,
    1645          544 :     ) -> anyhow::Result<GcResult> {
    1646          544 :         // there is a global allowed_error for this
    1647          544 :         anyhow::ensure!(
    1648          544 :             self.is_active(),
    1649            0 :             "Cannot run GC iteration on inactive tenant"
    1650              :         );
    1651              : 
    1652          544 :         self.gc_iteration_internal(target_timeline_id, horizon, pitr, ctx)
    1653       213123 :             .await
    1654          539 :     }
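
    // A sketch (invented name, u64 standing in for Lsn) of the conservative
    // cutoff rule described in the doc comment above: `horizon` yields a cutoff
    // as an LSN distance below the latest LSN, `pitr` yields one via a
    // time-based lookup, and the smaller (older) of the two wins because it
    // retains more history.
    fn effective_gc_cutoff_sketch(latest_lsn: u64, horizon: u64, pitr_cutoff_lsn: u64) -> u64 {
        let horizon_cutoff = latest_lsn.saturating_sub(horizon);
        std::cmp::min(horizon_cutoff, pitr_cutoff_lsn)
    }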
    1655              : 
    1656              :     /// Perform one compaction iteration.
    1657              :     /// This function is periodically called by the compactor task.
    1658              :     /// It can also be explicitly requested per timeline through the page server
    1659              :     /// api's 'compact' command.
    1660          263 :     pub async fn compaction_iteration(
    1661          263 :         &self,
    1662          263 :         cancel: &CancellationToken,
    1663          263 :         ctx: &RequestContext,
    1664          263 :     ) -> anyhow::Result<()> {
    1665          263 :         anyhow::ensure!(
    1666          263 :             self.is_active(),
    1667            0 :             "Cannot run compaction iteration on inactive tenant"
    1668              :         );
    1669              : 
    1670              :         // Scan through the hashmap and collect a list of all the timelines
    1671              :         // while holding the lock. Then drop the lock and actually perform the
    1672              :         // compactions. We don't want to block everything else while the
    1673              :         // compaction runs (see the lock-scope sketch after this function).
    1674          263 :         let timelines_to_compact = {
    1675          263 :             let timelines = self.timelines.lock().unwrap();
    1676          263 :             let timelines_to_compact = timelines
    1677          263 :                 .iter()
    1678          435 :                 .filter_map(|(timeline_id, timeline)| {
    1679          435 :                     if timeline.is_active() {
    1680          435 :                         Some((*timeline_id, timeline.clone()))
    1681              :                     } else {
    1682            0 :                         None
    1683              :                     }
    1684          435 :                 })
    1685          263 :                 .collect::<Vec<_>>();
    1686          263 :             drop(timelines);
    1687          263 :             timelines_to_compact
    1688              :         };
    1689              : 
    1690          683 :         for (timeline_id, timeline) in &timelines_to_compact {
    1691          430 :             timeline
    1692          430 :                 .compact(cancel, ctx)
    1693          430 :                 .instrument(info_span!("compact_timeline", %timeline_id))
    1694       820174 :                 .await?;
    1695              :         }
    1696              : 
    1697          253 :         Ok(())
    1698          253 :     }
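
    // The lock-scope pattern used above, shown in isolation with illustrative
    // types: a std::sync::Mutex guard must not be held across an .await point,
    // so the Arcs are cloned out inside a block, the guard is dropped, and only
    // then does the per-item async work run.
    async fn for_each_snapshot_sketch(items: &std::sync::Mutex<Vec<std::sync::Arc<String>>>) {
        let snapshot: Vec<std::sync::Arc<String>> = {
            let guard = items.lock().unwrap();
            guard.iter().cloned().collect()
        }; // guard is dropped here, before any await
        for item in snapshot {
            // ... per-item async work (e.g. compaction) would go here ...
            tokio::task::yield_now().await;
            drop(item);
        }
    }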
    1699              : 
    1700         7784 :     pub fn current_state(&self) -> TenantState {
    1701         7784 :         self.state.borrow().clone()
    1702         7784 :     }
    1703              : 
    1704         5433 :     pub fn is_active(&self) -> bool {
    1705         5433 :         self.current_state() == TenantState::Active
    1706         5433 :     }
    1707              : 
    1708              :     /// Changes tenant status to active, unless shutdown was already requested.
    1709              :     ///
    1710              :     /// `background_jobs_can_start` is an optional barrier set to a value during pageserver startup
    1711              :     /// to delay background jobs. Background jobs can be started right away when None is given.
    1712          698 :     fn activate(
    1713          698 :         self: &Arc<Self>,
    1714          698 :         broker_client: BrokerClientChannel,
    1715          698 :         background_jobs_can_start: Option<&completion::Barrier>,
    1716          698 :         ctx: &RequestContext,
    1717          698 :     ) {
    1718          698 :         span::debug_assert_current_span_has_tenant_id();
    1719          698 : 
    1720          698 :         let mut activating = false;
    1721          698 :         self.state.send_modify(|current_state| {
    1722          698 :             use pageserver_api::models::ActivatingFrom;
    1723          698 :             match &*current_state {
    1724              :                 TenantState::Activating(_) | TenantState::Active | TenantState::Broken { .. } | TenantState::Stopping { .. } => {
    1725            0 :                     panic!("caller is responsible for calling activate() only on Loading / Attaching tenants, got {state:?}", state = current_state);
    1726              :                 }
    1727          662 :                 TenantState::Loading => {
    1728          662 :                     *current_state = TenantState::Activating(ActivatingFrom::Loading);
    1729          662 :                 }
    1730           36 :                 TenantState::Attaching => {
    1731           36 :                     *current_state = TenantState::Activating(ActivatingFrom::Attaching);
    1732           36 :                 }
    1733              :             }
    1734          698 :             debug!(tenant_id = %self.tenant_id, "Activating tenant");
    1735          698 :             activating = true;
    1736          698 :             // Continue outside the closure. We need to grab timelines.lock()
    1737          698 :             // and we plan to turn it into a tokio::sync::Mutex in a future patch.
    1738          698 :         });
    1739          698 : 
    1740          698 :         if activating {
    1741          698 :             let timelines_accessor = self.timelines.lock().unwrap();
    1742          698 :             let timelines_to_activate = timelines_accessor
    1743          698 :                 .values()
    1744          698 :                 .filter(|timeline| !(timeline.is_broken() || timeline.is_stopping()));
    1745          698 : 
    1746          698 :             // Spawn gc and compaction loops. The loops will shut themselves
    1747          698 :             // down when they notice that the tenant is inactive.
    1748          698 :             tasks::start_background_loops(self, background_jobs_can_start);
    1749          698 : 
    1750          698 :             let mut activated_timelines = 0;
    1751              : 
    1752          990 :             for timeline in timelines_to_activate {
    1753          292 :                 timeline.activate(broker_client.clone(), background_jobs_can_start, ctx);
    1754          292 :                 activated_timelines += 1;
    1755          292 :             }
    1756              : 
    1757          698 :             self.state.send_modify(move |current_state| {
    1758          698 :                 assert!(
    1759          698 :                     matches!(current_state, TenantState::Activating(_)),
    1760            0 :                     "set_stopping and set_broken wait for us to leave Activating state",
    1761              :                 );
    1762          698 :                 *current_state = TenantState::Active;
    1763          698 : 
    1764          698 :                 let elapsed = self.loading_started_at.elapsed();
    1765          698 :                 let total_timelines = timelines_accessor.len();
    1766          698 : 
    1767          698 :                 // log a lot of stuff, because some tenants sometimes suffer from long, user-visible
    1768          698 :                 // activation times. See https://github.com/neondatabase/neon/issues/4025
    1769          698 :                 info!(
    1770          698 :                     since_creation_millis = elapsed.as_millis(),
    1771          698 :                     tenant_id = %self.tenant_id,
    1772          698 :                     activated_timelines,
    1773          698 :                     total_timelines,
    1774          698 :                     post_state = <&'static str>::from(&*current_state),
    1775          698 :                     "activation attempt finished"
    1776          698 :                 );
    1777              : 
    1778          698 :                 TENANT_ACTIVATION.observe(elapsed.as_secs_f64());
    1779          698 :             });
    1780            0 :         }
    1781          698 :     }
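
    // A minimal sketch (illustrative state type) of the tokio::sync::watch
    // `send_modify` pattern that `activate` relies on: the closure mutates the
    // shared state in place and receivers are notified once per call, so a
    // multi-step transition is observed atomically.
    fn transition_sketch(state: &tokio::sync::watch::Sender<String>) {
        state.send_modify(|s| {
            assert!(
                *s == "Loading" || *s == "Attaching",
                "only loading/attaching tenants may activate"
            );
            *s = "Activating".to_string();
        });
    }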
    1782              : 
    1783              :     /// Shut down the tenant and join all of the spawned tasks.
    1784              :     ///
    1785              :     /// The method caters for all use-cases:
    1786              :     /// - pageserver shutdown (freeze_and_flush == true)
    1787              :     /// - detach + ignore (freeze_and_flush == false)
    1788              :     ///
    1789              :     /// This will attempt to shut down even if the tenant is broken.
    1790              :     ///
    1791              :     /// `shutdown_progress` is a [`completion::Barrier`] for the shutdown initiated by this call.
    1792              :     /// If the tenant is already shutting down, we return a clone of the first shutdown call's
    1793              :     /// `Barrier` as an `Err`. This not-first caller can use the returned barrier to join with
    1794              :     /// the ongoing shutdown.
    1795          336 :     async fn shutdown(
    1796          336 :         &self,
    1797          336 :         shutdown_progress: completion::Barrier,
    1798          336 :         freeze_and_flush: bool,
    1799          336 :     ) -> Result<(), completion::Barrier> {
    1800          336 :         span::debug_assert_current_span_has_tenant_id();
    1801          336 :         // Set the tenant (and its timelines) to Stopping state.
    1802          336 :         //
    1803          336 :         // Since we can only transition into Stopping state after activation is complete,
    1804          336 :         // run it in a JoinSet so all tenants have a chance to stop before we get SIGKILLed.
    1805          336 :         //
    1806          336 :         // Transitioning tenants to Stopping state has a couple of non-obvious side effects:
    1807          336 :         // 1. Lock out any new requests to the tenants.
    1808          336 :         // 2. Signal cancellation to WAL receivers (we wait on it below).
    1809          336 :         // 3. Signal cancellation for other tenant background loops.
    1810          336 :         // 4. ???
    1811          336 :         //
    1812          336 :         // Waiting for the cancellation is not done uniformly.
    1813          336 :         // We certainly wait for WAL receivers to shut down.
    1814          336 :         // That is necessary so that no new data comes in before the freeze_and_flush.
    1815          336 :         // But the tenant background loops are joined-on in our caller.
    1816          336 :         // It's messed up.
    1817          336 :         // We just ignore the failure to stop.
    1818          336 : 
    1819          336 :         match self.set_stopping(shutdown_progress, false, false).await {
    1820          256 :             Ok(()) => {}
    1821           77 :             Err(SetStoppingError::Broken) => {
    1822           77 :                 // assume that this is acceptable
    1823           77 :             }
    1824            3 :             Err(SetStoppingError::AlreadyStopping(other)) => {
    1825            3 :                 // give the caller the option to wait for this shutdown
    1826            3 :                 return Err(other);
    1827              :             }
    1828              :         };
    1829              : 
    1830          333 :         let mut js = tokio::task::JoinSet::new();
    1831          333 :         {
    1832          333 :             let timelines = self.timelines.lock().unwrap();
    1833          525 :             timelines.values().for_each(|timeline| {
    1834          525 :                 let timeline = Arc::clone(timeline);
    1835          525 :                 let span = Span::current();
    1836          525 :                 js.spawn(async move { timeline.shutdown(freeze_and_flush).instrument(span).await });
    1837          525 :             })
    1838              :         };
    1839          858 :         while let Some(res) = js.join_next().await {
    1840            0 :             match res {
    1841          525 :                 Ok(()) => {}
    1842            0 :                 Err(je) if je.is_cancelled() => unreachable!("no cancelling used"),
    1843            0 :                 Err(je) if je.is_panic() => { /* logged already */ }
    1844            0 :                 Err(je) => warn!("unexpected JoinError: {je:?}"),
    1845              :             }
    1846              :         }
    1847              : 
    1848              :         // Shut down all tenant and timeline tasks: gc, compaction, page service.
    1849              :         // No new tasks will be started for this tenant because it's in `Stopping` state.
    1850              :         //
    1851              :         // This will additionally shut down and await all timeline tasks.
    1852          616 :         task_mgr::shutdown_tasks(None, Some(self.tenant_id), None).await;
    1853              : 
    1854          333 :         Ok(())
    1855          336 :     }
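
    // Editor's sketch of the caller-side contract documented above. This is
    // illustrative only: it assumes `completion::Barrier` exposes an async
    // `wait()`, which is not confirmed by this listing.
    //
    //     match tenant.shutdown(barrier, /* freeze_and_flush */ true).await {
    //         Ok(()) => { /* this call drove the shutdown to completion */ }
    //         // someone else started the shutdown first: join it instead
    //         Err(ongoing) => ongoing.wait().await,
    //     }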
    1856              : 
    1857              :     /// Change tenant status to Stopping, to mark that it is being shut down.
    1858              :     ///
    1859              :     /// This function waits for the tenant to become active if it isn't already, before transitioning it into Stopping state.
    1860              :     ///
    1861              :     /// This function is not cancel-safe!
    1862              :     ///
    1863              :     /// `allow_transition_from_loading` is needed for the special case of the loading task deleting the tenant.
    1864              :     /// `allow_transition_from_attaching` is needed for the special case of attaching a deleted tenant.
    1865          367 :     async fn set_stopping(
    1866          367 :         &self,
    1867          367 :         progress: completion::Barrier,
    1868          367 :         allow_transition_from_loading: bool,
    1869          367 :         allow_transition_from_attaching: bool,
    1870          367 :     ) -> Result<(), SetStoppingError> {
    1871          367 :         let mut rx = self.state.subscribe();
    1872          367 : 
    1873          367 :         // cannot stop before we're done activating, so wait out until we're done activating
    1874          367 :         rx.wait_for(|state| match state {
    1875            3 :             TenantState::Attaching if allow_transition_from_attaching => true,
    1876              :             TenantState::Activating(_) | TenantState::Attaching => {
    1877            7 :                 info!(
    1878            7 :                     "waiting for {} to turn Active|Broken|Stopping",
    1879            7 :                     <&'static str>::from(state)
    1880            7 :                 );
    1881            7 :                 false
    1882              :             }
    1883           28 :             TenantState::Loading => allow_transition_from_loading,
    1884          336 :             TenantState::Active | TenantState::Broken { .. } | TenantState::Stopping { .. } => true,
    1885          374 :         })
    1886            7 :         .await
    1887          367 :         .expect("cannot drop self.state while on a &self method");
    1888          367 : 
    1889          367 :         // we now know we're done activating, let's see whether this task is the winner to transition into Stopping
    1890          367 :         let mut err = None;
    1891          367 :         let stopping = self.state.send_if_modified(|current_state| match current_state {
    1892              :             TenantState::Activating(_) => {
    1893            0 :                 unreachable!("1we ensured above that we're done with activation, and, there is no re-activation")
    1894              :             }
    1895              :             TenantState::Attaching => {
    1896            3 :                 if !allow_transition_from_attaching {
    1897            0 :                     unreachable!("2we ensured above that we're done with activation, and, there is no re-activation")
    1898            3 :                 };
    1899            3 :                 *current_state = TenantState::Stopping { progress };
    1900            3 :                 true
    1901              :             }
    1902              :             TenantState::Loading => {
    1903           28 :                 if !allow_transition_from_loading {
    1904            0 :                     unreachable!("3we ensured above that we're done with activation, and, there is no re-activation")
    1905           28 :                 };
    1906           28 :                 *current_state = TenantState::Stopping { progress };
    1907           28 :                 true
    1908              :             }
    1909              :             TenantState::Active => {
    1910              :                 // FIXME: due to time-of-check vs time-of-use issues, it can happen that new timelines
    1911              :                 // are created after the transition to Stopping. That's harmless, as the Timelines
    1912              :                 // won't be accessible to anyone afterwards, because the Tenant is in Stopping state.
    1913          256 :                 *current_state = TenantState::Stopping { progress };
    1914          256 :                 // Continue stopping outside the closure. We need to grab timelines.lock()
    1915          256 :                 // and we plan to turn it into a tokio::sync::Mutex in a future patch.
    1916          256 :                 true
    1917              :             }
    1918           77 :             TenantState::Broken { reason, .. } => {
    1919           77 :                 info!(
    1920           77 :                     "Cannot set tenant to Stopping state, it is in Broken state due to: {reason}"
    1921           77 :                 );
    1922           77 :                 err = Some(SetStoppingError::Broken);
    1923           77 :                 false
    1924              :             }
    1925            3 :             TenantState::Stopping { progress } => {
    1926            3 :                 info!("Tenant is already in Stopping state");
    1927            3 :                 err = Some(SetStoppingError::AlreadyStopping(progress.clone()));
    1928            3 :                 false
    1929              :             }
    1930          367 :         });
    1931          367 :         match (stopping, err) {
    1932          287 :             (true, None) => {} // continue
    1933           80 :             (false, Some(err)) => return Err(err),
    1934            0 :             (true, Some(_)) => unreachable!(
    1935            0 :                 "send_if_modified closure must error out if not transitioning to Stopping"
    1936            0 :             ),
    1937            0 :             (false, None) => unreachable!(
    1938            0 :                 "send_if_modified closure must return true if transitioning to Stopping"
    1939            0 :             ),
    1940              :         }
    1941              : 
    1942          287 :         let timelines_accessor = self.timelines.lock().unwrap();
    1943          287 :         let not_broken_timelines = timelines_accessor
    1944          287 :             .values()
    1945          435 :             .filter(|timeline| !timeline.is_broken());
    1946          699 :         for timeline in not_broken_timelines {
    1947          412 :             timeline.set_state(TimelineState::Stopping);
    1948          412 :         }
    1949          287 :         Ok(())
    1950          367 :     }
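
    // Editor's note: inferred from the variants used above, `SetStoppingError`
    // has this shape (a sketch reconstructed from usage, not necessarily the
    // actual definition):
    //
    //     enum SetStoppingError {
    //         Broken,
    //         AlreadyStopping(completion::Barrier),
    //     }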
    1951              : 
    1952              :     /// Method for tenant::mgr to transition us into Broken state in case of a late failure in
    1953              :     /// `remove_tenant_from_memory`
    1954              :     ///
    1955              :     /// This function waits for the tenant to become active if it isn't already, before transitioning it into Broken state.
    1956              :     ///
    1957              :     /// In tests, we also use this to set tenants to Broken state on purpose.
    1958           75 :     pub(crate) async fn set_broken(&self, reason: String) {
    1959           75 :         let mut rx = self.state.subscribe();
    1960           75 : 
    1961           75 :         // The load & attach routines own the tenant state until it has reached `Active`.
    1962           75 :         // So, wait until it's done.
    1963           75 :         rx.wait_for(|state| match state {
    1964              :             TenantState::Activating(_) | TenantState::Loading | TenantState::Attaching => {
    1965            0 :                 info!(
    1966            0 :                     "waiting for {} to turn Active|Broken|Stopping",
    1967            0 :                     <&'static str>::from(state)
    1968            0 :                 );
    1969            0 :                 false
    1970              :             }
    1971           75 :             TenantState::Active | TenantState::Broken { .. } | TenantState::Stopping { .. } => true,
    1972           75 :         })
    1973            0 :         .await
    1974           75 :         .expect("cannot drop self.state while on a &self method");
    1975           75 : 
    1976           75 :         // we now know we're done activating, let's see whether this task is the winner to transition into Broken
    1977           75 :         self.set_broken_no_wait(reason)
    1978           75 :     }
    1979              : 
    1980           75 :     pub(crate) fn set_broken_no_wait(&self, reason: impl Display) {
    1981           75 :         let reason = reason.to_string();
    1982           75 :         self.state.send_modify(|current_state| {
    1983           75 :             match *current_state {
    1984              :                 TenantState::Activating(_) | TenantState::Loading | TenantState::Attaching => {
    1985            0 :                     unreachable!("we ensured above that we're done with activation, and, there is no re-activation")
    1986              :                 }
    1987              :                 TenantState::Active => {
    1988            2 :                     if cfg!(feature = "testing") {
    1989            2 :                         warn!("Changing Active tenant to Broken state, reason: {}", reason);
    1990            2 :                         *current_state = TenantState::broken_from_reason(reason);
    1991              :                     } else {
    1992            0 :                         unreachable!("not allowed to call set_broken on Active tenants in non-testing builds")
    1993              :                     }
    1994              :                 }
    1995              :                 TenantState::Broken { .. } => {
    1996            0 :                     warn!("Tenant is already in Broken state");
    1997              :                 }
    1998              :                 // This is the only "expected" path, any other path is a bug.
    1999              :                 TenantState::Stopping { .. } => {
    2000           73 :                     warn!(
    2001           73 :                         "Marking Stopping tenant as Broken state, reason: {}",
    2002           73 :                         reason
    2003           73 :                     );
    2004           73 :                     *current_state = TenantState::broken_from_reason(reason);
    2005              :                 }
    2006              :             }
    2007           75 :         });
    2008           75 :     }
    2009              : 
    2010           39 :     pub fn subscribe_for_state_updates(&self) -> watch::Receiver<TenantState> {
    2011           39 :         self.state.subscribe()
    2012           39 :     }
    2013              : 
    2014         5786 :     pub(crate) async fn wait_to_become_active(&self) -> Result<(), WaitToBecomeActiveError> {
    2015         5786 :         let mut receiver = self.state.subscribe();
    2016         6505 :         loop {
    2017         6505 :             let current_state = receiver.borrow_and_update().clone();
    2018         6505 :             match current_state {
    2019              :                 TenantState::Loading | TenantState::Attaching | TenantState::Activating(_) => {
    2020              :                     // in these states, there's a chance that we can reach ::Active
    2021          737 :                     receiver.changed().await.map_err(
    2022          719 :                         |_e: tokio::sync::watch::error::RecvError| {
    2023            0 :                             WaitToBecomeActiveError::TenantDropped {
    2024            0 :                                 tenant_id: self.tenant_id,
    2025            0 :                             }
    2026          719 :                         },
    2027          719 :                     )?;
    2028              :                 }
    2029              :                 TenantState::Active { .. } => {
    2030         5720 :                     return Ok(());
    2031              :                 }
    2032              :                 TenantState::Broken { .. } | TenantState::Stopping { .. } => {
    2033              :                     // There's no chance the tenant can transition back into ::Active
    2034           66 :                     return Err(WaitToBecomeActiveError::WillNotBecomeActive {
    2035           66 :                         tenant_id: self.tenant_id,
    2036           66 :                         state: current_state,
    2037           66 :                     });
    2038              :                 }
    2039              :             }
    2040              :         }
    2041         5786 :     }
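
    // Editor's note: the loop above is the standard `tokio::sync::watch`
    // "wait for a terminal state" pattern. A minimal self-contained sketch
    // (the `Phase` type is illustrative, not part of this crate):
    //
    //     use tokio::sync::watch;
    //
    //     #[derive(Clone)]
    //     enum Phase { Starting, Ready, Failed }
    //
    //     async fn wait_ready(mut rx: watch::Receiver<Phase>) -> Result<(), ()> {
    //         loop {
    //             match rx.borrow_and_update().clone() {
    //                 Phase::Ready => return Ok(()),
    //                 Phase::Failed => return Err(()),
    //                 // Still starting: wait for the next update. If the sender
    //                 // was dropped, the state can never become Ready.
    //                 Phase::Starting => rx.changed().await.map_err(|_| ())?,
    //             }
    //         }
    //     }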
    2042              : }
    2043              : 
    2044              : /// Given a map of timelines and their ancestors (timeline_id, ancestor_id),
    2045              : /// perform a topological sort, so that the parent of each timeline comes
    2046              : /// before its children.
    2047              : /// `E` extracts the ancestor from `T`.
    2048              : /// This allows `T` to vary: it can be `TimelineMetadata`, the `Timeline` itself, etc.
    2049          901 : fn tree_sort_timelines<T, E>(
    2050          901 :     timelines: HashMap<TimelineId, T>,
    2051          901 :     extractor: E,
    2052          901 : ) -> anyhow::Result<Vec<(TimelineId, T)>>
    2053          901 : where
    2054          901 :     E: Fn(&T) -> Option<TimelineId>,
    2055          901 : {
    2056          901 :     let mut result = Vec::with_capacity(timelines.len());
    2057          901 : 
    2058          901 :     let mut now = Vec::with_capacity(timelines.len());
    2059          901 :     // (ancestor, children)
    2060          901 :     let mut later: HashMap<TimelineId, Vec<(TimelineId, T)>> =
    2061          901 :         HashMap::with_capacity(timelines.len());
    2062              : 
    2063         1450 :     for (timeline_id, value) in timelines {
    2064          549 :         if let Some(ancestor_id) = extractor(&value) {
    2065           60 :             let children = later.entry(ancestor_id).or_default();
    2066           60 :             children.push((timeline_id, value));
    2067          489 :         } else {
    2068          489 :             now.push((timeline_id, value));
    2069          489 :         }
    2070              :     }
    2071              : 
    2072         1450 :     while let Some((timeline_id, metadata)) = now.pop() {
    2073          549 :         result.push((timeline_id, metadata));
    2074              :         // All children of this can be loaded now
    2075          549 :         if let Some(mut children) = later.remove(&timeline_id) {
    2076           49 :             now.append(&mut children);
    2077          500 :         }
    2078              :     }
    2079              : 
    2080              :     // All timelines should have been visited now, unless some timelines had missing ancestors.
    2081          901 :     if !later.is_empty() {
    2082            0 :         for (missing_id, orphan_ids) in later {
    2083            0 :             for (orphan_id, _) in orphan_ids {
    2084            0 :                 error!("could not load timeline {orphan_id} because its ancestor timeline {missing_id} could not be loaded");
    2085              :             }
    2086              :         }
    2087            0 :         bail!("could not load tenant because some timelines are missing ancestors");
    2088          901 :     }
    2089          901 : 
    2090          901 :     Ok(result)
    2091          901 : }
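
// Editor's sketch of how the extractor is used (ids are hypothetical; it
// assumes `TimelineId: Copy`). Here `T` is `Option<TimelineId>`, i.e. the
// stored value is the ancestor itself:
//
//     let mut timelines = HashMap::new();
//     timelines.insert(parent_id, None);
//     timelines.insert(child_id, Some(parent_id));
//     let sorted = tree_sort_timelines(timelines, |ancestor| *ancestor)?;
//     // `parent_id` is guaranteed to appear before `child_id` in `sorted`.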
    2092              : 
    2093              : impl Tenant {
    2094           80 :     pub fn tenant_specific_overrides(&self) -> TenantConfOpt {
    2095           80 :         *self.tenant_conf.read().unwrap()
    2096           80 :     }
    2097              : 
    2098           40 :     pub fn effective_config(&self) -> TenantConf {
    2099           40 :         self.tenant_specific_overrides()
    2100           40 :             .merge(self.conf.default_tenant_conf)
    2101           40 :     }
    2102              : 
    2103            5 :     pub fn get_checkpoint_distance(&self) -> u64 {
    2104            5 :         let tenant_conf = self.tenant_conf.read().unwrap();
    2105            5 :         tenant_conf
    2106            5 :             .checkpoint_distance
    2107            5 :             .unwrap_or(self.conf.default_tenant_conf.checkpoint_distance)
    2108            5 :     }
    2109              : 
    2110            5 :     pub fn get_checkpoint_timeout(&self) -> Duration {
    2111            5 :         let tenant_conf = self.tenant_conf.read().unwrap();
    2112            5 :         tenant_conf
    2113            5 :             .checkpoint_timeout
    2114            5 :             .unwrap_or(self.conf.default_tenant_conf.checkpoint_timeout)
    2115            5 :     }
    2116              : 
    2117            5 :     pub fn get_compaction_target_size(&self) -> u64 {
    2118            5 :         let tenant_conf = self.tenant_conf.read().unwrap();
    2119            5 :         tenant_conf
    2120            5 :             .compaction_target_size
    2121            5 :             .unwrap_or(self.conf.default_tenant_conf.compaction_target_size)
    2122            5 :     }
    2123              : 
    2124          845 :     pub fn get_compaction_period(&self) -> Duration {
    2125          845 :         let tenant_conf = self.tenant_conf.read().unwrap();
    2126          845 :         tenant_conf
    2127          845 :             .compaction_period
    2128          845 :             .unwrap_or(self.conf.default_tenant_conf.compaction_period)
    2129          845 :     }
    2130              : 
    2131            5 :     pub fn get_compaction_threshold(&self) -> usize {
    2132            5 :         let tenant_conf = self.tenant_conf.read().unwrap();
    2133            5 :         tenant_conf
    2134            5 :             .compaction_threshold
    2135            5 :             .unwrap_or(self.conf.default_tenant_conf.compaction_threshold)
    2136            5 :     }
    2137              : 
    2138          396 :     pub fn get_gc_horizon(&self) -> u64 {
    2139          396 :         let tenant_conf = self.tenant_conf.read().unwrap();
    2140          396 :         tenant_conf
    2141          396 :             .gc_horizon
    2142          396 :             .unwrap_or(self.conf.default_tenant_conf.gc_horizon)
    2143          396 :     }
    2144              : 
    2145          748 :     pub fn get_gc_period(&self) -> Duration {
    2146          748 :         let tenant_conf = self.tenant_conf.read().unwrap();
    2147          748 :         tenant_conf
    2148          748 :             .gc_period
    2149          748 :             .unwrap_or(self.conf.default_tenant_conf.gc_period)
    2150          748 :     }
    2151              : 
    2152            5 :     pub fn get_image_creation_threshold(&self) -> usize {
    2153            5 :         let tenant_conf = self.tenant_conf.read().unwrap();
    2154            5 :         tenant_conf
    2155            5 :             .image_creation_threshold
    2156            5 :             .unwrap_or(self.conf.default_tenant_conf.image_creation_threshold)
    2157            5 :     }
    2158              : 
    2159          618 :     pub fn get_pitr_interval(&self) -> Duration {
    2160          618 :         let tenant_conf = self.tenant_conf.read().unwrap();
    2161          618 :         tenant_conf
    2162          618 :             .pitr_interval
    2163          618 :             .unwrap_or(self.conf.default_tenant_conf.pitr_interval)
    2164          618 :     }
    2165              : 
    2166         4553 :     pub fn get_trace_read_requests(&self) -> bool {
    2167         4553 :         let tenant_conf = self.tenant_conf.read().unwrap();
    2168         4553 :         tenant_conf
    2169         4553 :             .trace_read_requests
    2170         4553 :             .unwrap_or(self.conf.default_tenant_conf.trace_read_requests)
    2171         4553 :     }
    2172              : 
    2173           13 :     pub fn get_min_resident_size_override(&self) -> Option<u64> {
    2174           13 :         let tenant_conf = self.tenant_conf.read().unwrap();
    2175           13 :         tenant_conf
    2176           13 :             .min_resident_size_override
    2177           13 :             .or(self.conf.default_tenant_conf.min_resident_size_override)
    2178           13 :     }
    2179              : 
    2180           27 :     pub fn set_new_tenant_config(&self, new_tenant_conf: TenantConfOpt) {
    2181           27 :         *self.tenant_conf.write().unwrap() = new_tenant_conf;
    2182           27 :         // Don't hold self.timelines.lock() during the notifies.
    2183           27 :         // There's no risk of deadlock right now, but there could be if we consolidate
    2184           27 :         // mutexes in struct Timeline in the future.
    2185           27 :         let timelines = self.list_timelines();
    2186           56 :         for timeline in timelines {
    2187           29 :             timeline.tenant_conf_updated();
    2188           29 :         }
    2189           27 :     }
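
    // Editor's note: the comment above describes the snapshot-then-notify
    // pattern, presumably what `list_timelines` enables here: clone the values
    // out under the mutex, drop the guard, then run callbacks that may take
    // other locks. A sketch:
    //
    //     let snapshot: Vec<Arc<Timeline>> = {
    //         let guard = self.timelines.lock().unwrap();
    //         guard.values().cloned().collect()
    //     }; // guard dropped here, before any callback runs
    //     for timeline in snapshot {
    //         timeline.tenant_conf_updated();
    //     }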
    2190              : 
    2191              :     /// Helper function to create a new Timeline struct.
    2192              :     ///
    2193              :     /// The returned Timeline is in Loading state. The caller is responsible for
    2194              :     /// initializing any on-disk state, and for inserting the Timeline into the 'timelines'
    2195              :     /// map.
    2196              :     ///
    2197              :     /// `validate_ancestor == false` is used when a timeline is created for deletion
    2198              :     /// and we might not have the ancestor present anymore, which is fine for
    2199              :     /// to-be-deleted timelines.
    2200         1394 :     fn create_timeline_struct(
    2201         1394 :         &self,
    2202         1394 :         new_timeline_id: TimelineId,
    2203         1394 :         new_metadata: &TimelineMetadata,
    2204         1394 :         ancestor: Option<Arc<Timeline>>,
    2205         1394 :         resources: TimelineResources,
    2206         1394 :         init_order: Option<&InitializationOrder>,
    2207         1394 :         cause: CreateTimelineCause,
    2208         1394 :     ) -> anyhow::Result<Arc<Timeline>> {
    2209         1394 :         let state = match cause {
    2210              :             CreateTimelineCause::Load => {
    2211         1368 :                 let ancestor_id = new_metadata.ancestor_timeline();
    2212         1368 :                 anyhow::ensure!(
    2213         1368 :                     ancestor_id == ancestor.as_ref().map(|t| t.timeline_id),
    2214            0 :                     "Timeline's {new_timeline_id} ancestor {ancestor_id:?} was not found"
    2215              :                 );
    2216         1368 :                 TimelineState::Loading
    2217              :             }
    2218           26 :             CreateTimelineCause::Delete => TimelineState::Stopping,
    2219              :         };
    2220              : 
    2221         1394 :         let initial_logical_size_can_start = init_order.map(|x| &x.initial_logical_size_can_start);
    2222         1394 :         let initial_logical_size_attempt = init_order.map(|x| &x.initial_logical_size_attempt);
    2223         1394 : 
    2224         1394 :         let pg_version = new_metadata.pg_version();
    2225         1394 : 
    2226         1394 :         let timeline = Timeline::new(
    2227         1394 :             self.conf,
    2228         1394 :             Arc::clone(&self.tenant_conf),
    2229         1394 :             new_metadata,
    2230         1394 :             ancestor,
    2231         1394 :             new_timeline_id,
    2232         1394 :             self.tenant_id,
    2233         1394 :             self.generation,
    2234         1394 :             Arc::clone(&self.walredo_mgr),
    2235         1394 :             resources,
    2236         1394 :             pg_version,
    2237         1394 :             initial_logical_size_can_start.cloned(),
    2238         1394 :             initial_logical_size_attempt.cloned().flatten(),
    2239         1394 :             state,
    2240         1394 :         );
    2241         1394 : 
    2242         1394 :         Ok(timeline)
    2243         1394 :     }
    2244              : 
    2245          779 :     fn new(
    2246          779 :         state: TenantState,
    2247          779 :         conf: &'static PageServerConf,
    2248          779 :         tenant_conf: TenantConfOpt,
    2249          779 :         walredo_mgr: Arc<dyn WalRedoManager + Send + Sync>,
    2250          779 :         tenant_id: TenantId,
    2251          779 :         generation: Generation,
    2252          779 :         remote_storage: Option<GenericRemoteStorage>,
    2253          779 :     ) -> Tenant {
    2254          779 :         let (state, mut rx) = watch::channel(state);
    2255          779 : 
    2256          779 :         tokio::spawn(async move {
    2257          779 :             let tid = tenant_id.to_string();
    2258          779 : 
    2259         2238 :             fn inspect_state(state: &TenantState) -> ([&'static str; 1], bool) {
    2260         2238 :                 ([state.into()], matches!(state, TenantState::Broken { .. }))
    2261         2238 :             }
    2262          779 : 
    2263          779 :             let mut tuple = inspect_state(&rx.borrow_and_update());
    2264          779 : 
    2265          779 :             let is_broken = tuple.1;
    2266          779 :             let mut counted_broken = if !is_broken {
    2267              :                 // The tenant might be ignored and reloaded, so first remove any previous set
    2268              :                 // element. It has most likely already been scraped, as these are manual operations
    2269              :                 // right now. Most likely we will add it back very soon.
    2270          779 :                 drop(crate::metrics::BROKEN_TENANTS_SET.remove_label_values(&[&tid]));
    2271          779 :                 false
    2272              :             } else {
    2273              :                 // add the id to the set right away; there should not be any updates on the
    2274              :                 // channel afterwards
    2275            0 :                 crate::metrics::BROKEN_TENANTS_SET
    2276            0 :                     .with_label_values(&[&tid])
    2277            0 :                     .set(1);
    2278            0 :                 true
    2279              :             };
    2280              : 
    2281         2238 :             loop {
    2282         2238 :                 let labels = &tuple.0;
    2283         2238 :                 let current = TENANT_STATE_METRIC.with_label_values(labels);
    2284         2238 :                 current.inc();
    2285         2238 : 
    2286         2238 :                 if rx.changed().await.is_err() {
    2287              :                     // The tenant has been dropped; decrement the counter because a tenant with
    2288              :                     // that state is no longer in the tenant map, but allow any broken-set item
    2289              :                     // to remain.
    2290          128 :                     current.dec();
    2291          128 :                     break;
    2292         1459 :                 }
    2293         1459 : 
    2294         1459 :                 current.dec();
    2295         1459 :                 tuple = inspect_state(&rx.borrow_and_update());
    2296         1459 : 
    2297         1459 :                 let is_broken = tuple.1;
    2298         1459 :                 if is_broken && !counted_broken {
    2299           85 :                     counted_broken = true;
    2300           85 :                     // insert the tenant_id (back) into the set
    2301           85 :                     crate::metrics::BROKEN_TENANTS_SET
    2302           85 :                         .with_label_values(&[&tid])
    2303           85 :                         .inc();
    2304         1374 :                 }
    2305              :             }
    2306          779 :         });
    2307          779 : 
    2308          779 :         Tenant {
    2309          779 :             tenant_id,
    2310          779 :             generation,
    2311          779 :             conf,
    2312          779 :             // Using `Instant::now()` here is a good enough approximation to catch tenants
    2313          779 :             // with really long activation times.
    2314          779 :             loading_started_at: Instant::now(),
    2315          779 :             tenant_conf: Arc::new(RwLock::new(tenant_conf)),
    2316          779 :             timelines: Mutex::new(HashMap::new()),
    2317          779 :             gc_cs: tokio::sync::Mutex::new(()),
    2318          779 :             walredo_mgr,
    2319          779 :             remote_storage,
    2320          779 :             state,
    2321          779 :             cached_logical_sizes: tokio::sync::Mutex::new(HashMap::new()),
    2322          779 :             cached_synthetic_tenant_size: Arc::new(AtomicU64::new(0)),
    2323          779 :             eviction_task_tenant_state: tokio::sync::Mutex::new(EvictionTaskTenantState::default()),
    2324          779 :             delete_progress: Arc::new(tokio::sync::Mutex::new(DeleteTenantFlow::default())),
    2325          779 :         }
    2326          779 :     }
    2327              : 
    2328              :     /// Locate and load config
    2329          739 :     pub(super) fn load_tenant_config(
    2330          739 :         conf: &'static PageServerConf,
    2331          739 :         tenant_id: &TenantId,
    2332          739 :     ) -> anyhow::Result<TenantConfOpt> {
    2333          739 :         let target_config_path = conf.tenant_config_path(tenant_id);
    2334          739 :         let target_config_display = target_config_path.display();
    2335          739 : 
    2336          739 :         info!("loading tenantconf from {target_config_display}");
    2337              : 
    2338              :         // FIXME If the config file is not found, assume that we're attaching
    2339              :         // a detached tenant and config is passed via attach command.
    2340              :         // https://github.com/neondatabase/neon/issues/1555
    2341              :         // OR: we're loading after an incomplete deletion that managed to remove the config.
    2342          739 :         if !target_config_path.exists() {
    2343            6 :             info!("tenant config not found in {target_config_display}");
    2344            6 :             return Ok(TenantConfOpt::default());
    2345          733 :         }
    2346              : 
    2347              :         // load and parse file
    2348          733 :         let config = fs::read_to_string(&target_config_path).with_context(|| {
    2349            0 :             format!("Failed to load config from path '{target_config_display}'")
    2350          733 :         })?;
    2351              : 
    2352          733 :         let toml = config.parse::<toml_edit::Document>().with_context(|| {
    2353            0 :             format!("Failed to parse config from file '{target_config_display}' as toml file")
    2354          733 :         })?;
    2355              : 
    2356          733 :         let mut tenant_conf = TenantConfOpt::default();
    2357          733 :         for (key, item) in toml.iter() {
    2358          733 :             match key {
    2359          733 :                 "tenant_config" => {
    2360          733 :                     tenant_conf = PageServerConf::parse_toml_tenant_conf(item).with_context(|| {
    2361            0 :                         format!("Failed to parse config from file '{target_config_display}' as pageserver config")
    2362          733 :                     })?;
    2363              :                 }
    2364            0 :                 _ => bail!("config file {target_config_display} has unrecognized pageserver option '{key}'"),
    2365              : 
    2366              :             }
    2367              :         }
    2368              : 
    2369          733 :         Ok(tenant_conf)
    2370          739 :     }
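
    // Editor's sketch of the on-disk file parsed above (the key and value shown
    // are illustrative, not taken from this listing):
    //
    //     # This file contains a specific per-tenant's config.
    //     #  It is read in case of pageserver restart.
    //
    //     [tenant_config]
    //     gc_horizon = 67108864
    //
    // Parsing goes through `toml_edit` exactly as in the function body:
    //
    //     let toml = config.parse::<toml_edit::Document>()?;
    //     for (key, item) in toml.iter() {
    //         // key == "tenant_config", item is the table of overrides
    //     }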
    2371              : 
    2372         1638 :     #[tracing::instrument(skip_all, fields(%tenant_id))]
    2373              :     pub(super) async fn persist_tenant_config(
    2374              :         tenant_id: &TenantId,
    2375              :         target_config_path: &Path,
    2376              :         tenant_conf: TenantConfOpt,
    2377              :     ) -> anyhow::Result<()> {
    2378              :         // imitate a try-block with a closure
    2379          546 :         info!("persisting tenantconf to {}", target_config_path.display());
    2380              : 
    2381              :         let mut conf_content = r#"# This file contains a specific per-tenant's config.
    2382              : #  It is read in case of pageserver restart.
    2383              : 
    2384              : [tenant_config]
    2385              : "#
    2386              :         .to_string();
    2387              : 
    2388              :         // Convert the config to a toml file.
    2389              :         conf_content += &toml_edit::ser::to_string(&tenant_conf)?;
    2390              : 
    2391              :         let conf_content = conf_content.as_bytes();
    2392              : 
    2393              :         let temp_path = path_with_suffix_extension(target_config_path, TEMP_FILE_SUFFIX);
    2394              :         VirtualFile::crashsafe_overwrite(target_config_path, &temp_path, conf_content)
    2395              :             .await
    2396            0 :             .with_context(|| {
    2397            0 :                 format!(
    2398            0 :                     "write tenant {tenant_id} config to {}",
    2399            0 :                     target_config_path.display()
    2400            0 :                 )
    2401            0 :             })?;
    2402              :         Ok(())
    2403              :     }
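
    // Editor's note: `crashsafe_overwrite` is the classic temp-file-then-rename
    // technique. A generic sketch of that technique (not this crate's
    // implementation):
    //
    //     let mut f = std::fs::File::create(&temp_path)?;
    //     std::io::Write::write_all(&mut f, content)?;
    //     f.sync_all()?;                        // make the data durable first
    //     std::fs::rename(&temp_path, target)?; // atomic replace on POSIX
    //     // full crash safety also requires fsyncing the parent directory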
    2404              : 
    2405              :     //
    2406              :     // How garbage collection works:
    2407              :     //
    2408              :     //                    +--bar------------->
    2409              :     //                   /
    2410              :     //             +----+-----foo---------------->
    2411              :     //            /
    2412              :     // ----main--+-------------------------->
    2413              :     //                \
    2414              :     //                 +-----baz-------->
    2415              :     //
    2416              :     //
    2417              :     // 1. Grab 'gc_cs' mutex to prevent new timelines from being created while Timeline's
    2418              :     //    `gc_infos` are being refreshed
    2419              :     // 2. Scan the collected timelines, and on each timeline, make note of
    2420              :     //    all the points where other timelines have been branched off.
    2421              :     //    We will refrain from removing page versions at those LSNs.
    2422              :     // 3. For each timeline, scan all layer files on the timeline.
    2423              :     //    Remove all files for which a newer file exists and which
    2424              :     //    don't cover any branch point LSNs.
    2425              :     //
    2426              :     // TODO:
    2427              :     // - if a relation has a non-incremental persistent layer on a child branch, then we
    2428              :     //   don't need to keep that in the parent anymore. But currently
    2429              :     //   we do.
    2430          544 :     async fn gc_iteration_internal(
    2431          544 :         &self,
    2432          544 :         target_timeline_id: Option<TimelineId>,
    2433          544 :         horizon: u64,
    2434          544 :         pitr: Duration,
    2435          544 :         ctx: &RequestContext,
    2436          544 :     ) -> anyhow::Result<GcResult> {
    2437          544 :         let mut totals: GcResult = Default::default();
    2438          544 :         let now = Instant::now();
    2439              : 
    2440          544 :         let gc_timelines = self
    2441          544 :             .refresh_gc_info_internal(target_timeline_id, horizon, pitr, ctx)
    2442       212999 :             .await?;
    2443              : 
    2444            3 :         crate::failpoint_support::sleep_millis_async!(
    2445            3 :             "gc_iteration_internal_after_getting_gc_timelines"
    2446            3 :         );
    2447              : 
    2448              :         // If there is nothing to GC, we don't want any messages in the INFO log.
    2449          543 :         if !gc_timelines.is_empty() {
    2450          537 :             info!("{} timelines need GC", gc_timelines.len());
    2451              :         } else {
    2452            0 :             debug!("{} timelines need GC", gc_timelines.len());
    2453              :         }
    2454              : 
    2455              :         // Perform GC for each timeline.
    2456              :         //
    2457              :         // Note that we don't hold the GC lock here because we don't want
    2458              :         // to delay the branch creation task, which requires the GC lock.
    2459              :         // A timeline GC iteration can be slow because it may need to wait for
    2460              :         // compaction (both require `layer_removal_cs` lock),
    2461              :         // but the GC iteration can run concurrently with branch creation.
    2462              :         //
    2463              :         // See comments in [`Tenant::branch_timeline`] for more information
    2464              :         // about why branch creation task can run concurrently with timeline's GC iteration.
    2465         1224 :         for timeline in gc_timelines {
    2466          687 :             if task_mgr::is_shutdown_requested() {
    2467              :                 // We were requested to shut down. Stop and return with the progress we
    2468              :                 // made.
    2469            1 :                 break;
    2470          686 :             }
    2471          686 :             let result = timeline.gc().await?;
    2472          681 :             totals += result;
    2473              :         }
    2474              : 
    2475          538 :         totals.elapsed = now.elapsed();
    2476          538 :         Ok(totals)
    2477          539 :     }
    2478              : 
    2479              :     /// Refreshes the Timeline::gc_info for all timelines, returning the
    2480              :     /// vector of timelines which have [`Timeline::get_last_record_lsn`] past
    2481              :     /// [`Tenant::get_gc_horizon`].
    2482              :     ///
    2483              :     /// This is usually executed as part of periodic gc, but can now be triggered more often.
    2484           73 :     pub async fn refresh_gc_info(
    2485           73 :         &self,
    2486           73 :         ctx: &RequestContext,
    2487           73 :     ) -> anyhow::Result<Vec<Arc<Timeline>>> {
    2488           73 :         // Since this method can now be called at a different rate than the configured gc loop,
    2489           73 :         // these configuration values may take effect sooner than they previously did,
    2490           73 :         // when they were only read from the gc task.
    2491           73 :         let horizon = self.get_gc_horizon();
    2492           73 :         let pitr = self.get_pitr_interval();
    2493           73 : 
    2494           73 :         // refresh all timelines
    2495           73 :         let target_timeline_id = None;
    2496           73 : 
    2497           73 :         self.refresh_gc_info_internal(target_timeline_id, horizon, pitr, ctx)
    2498           12 :             .await
    2499           73 :     }
    2500              : 
    2501          617 :     async fn refresh_gc_info_internal(
    2502          617 :         &self,
    2503          617 :         target_timeline_id: Option<TimelineId>,
    2504          617 :         horizon: u64,
    2505          617 :         pitr: Duration,
    2506          617 :         ctx: &RequestContext,
    2507          617 :     ) -> anyhow::Result<Vec<Arc<Timeline>>> {
    2508              :         // grab mutex to prevent new timelines from being created here.
    2509          617 :         let gc_cs = self.gc_cs.lock().await;
    2510              : 
    2511              :         // Scan all timelines. For each timeline, remember the timeline ID and
    2512              :         // the branch point where it was created.
    2513          616 :         let (all_branchpoints, timeline_ids): (BTreeSet<(TimelineId, Lsn)>, _) = {
    2514          617 :             let timelines = self.timelines.lock().unwrap();
    2515          617 :             let mut all_branchpoints = BTreeSet::new();
    2516          616 :             let timeline_ids = {
    2517          617 :                 if let Some(target_timeline_id) = target_timeline_id.as_ref() {
    2518          508 :                     if timelines.get(target_timeline_id).is_none() {
    2519            1 :                         bail!("gc target timeline does not exist")
    2520          507 :                     }
    2521          109 :                 };
    2522              : 
    2523          616 :                 timelines
    2524          616 :                     .iter()
    2525          616 :                     .map(|(timeline_id, timeline_entry)| {
    2526          691 :                         if let Some(ancestor_timeline_id) =
    2527         1306 :                             &timeline_entry.get_ancestor_timeline_id()
    2528              :                         {
    2529              :                             // If target_timeline is specified, we only need to know branchpoints of its children
    2530          691 :                             if let Some(timeline_id) = target_timeline_id {
    2531          500 :                                 if ancestor_timeline_id == &timeline_id {
    2532            6 :                                     all_branchpoints.insert((
    2533            6 :                                         *ancestor_timeline_id,
    2534            6 :                                         timeline_entry.get_ancestor_lsn(),
    2535            6 :                                     ));
    2536          494 :                                 }
    2537              :                             }
    2538              :                             // Collect branchpoints for all timelines
    2539          191 :                             else {
    2540          191 :                                 all_branchpoints.insert((
    2541          191 :                                     *ancestor_timeline_id,
    2542          191 :                                     timeline_entry.get_ancestor_lsn(),
    2543          191 :                                 ));
    2544          191 :                             }
    2545          615 :                         }
    2546              : 
    2547         1306 :                         *timeline_id
    2548         1306 :                     })
    2549          616 :                     .collect::<Vec<_>>()
    2550          616 :             };
    2551          616 :             (all_branchpoints, timeline_ids)
    2552          616 :         };
    2553          616 : 
    2554          616 :         // Ok, we now know all the branch points.
    2555          616 :         // Update the GC information for each timeline.
    2556          616 :         let mut gc_timelines = Vec::with_capacity(timeline_ids.len());
    2557         1922 :         for timeline_id in timeline_ids {
    2558              :             // Timeline is known to be local and loaded.
    2559         1306 :             let timeline = self
    2560         1306 :                 .get_timeline(timeline_id, false)
    2561         1306 :                 .with_context(|| format!("Timeline {timeline_id} was not found"))?;
    2562              : 
    2563              :             // If target_timeline is specified, ignore all other timelines
    2564         1306 :             if let Some(target_timeline_id) = target_timeline_id {
    2565         1008 :                 if timeline_id != target_timeline_id {
    2566          501 :                     continue;
    2567          507 :                 }
    2568          298 :             }
    2569              : 
    2570          805 :             if let Some(cutoff) = timeline.get_last_record_lsn().checked_sub(horizon) {
    2571          743 :                 let branchpoints: Vec<Lsn> = all_branchpoints
    2572          743 :                     .range((
    2573          743 :                         Included((timeline_id, Lsn(0))),
    2574          743 :                         Included((timeline_id, Lsn(u64::MAX))),
    2575          743 :                     ))
    2576          743 :                     .map(|&x| x.1)
    2577          743 :                     .collect();
    2578          743 :                 timeline
    2579          743 :                     .update_gc_info(branchpoints, cutoff, pitr, ctx)
    2580       213011 :                     .await?;
    2581              : 
    2582          743 :                 gc_timelines.push(timeline);
    2583           62 :             }
    2584              :         }
    2585          616 :         drop(gc_cs);
    2586          616 :         Ok(gc_timelines)
    2587          617 :     }
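
    // Editor's sketch of the branch-point lookup above: keeping (TimelineId, Lsn)
    // pairs in a `BTreeSet` makes "all branch points of one timeline" a single
    // ordered range scan. Plain integers stand in for the id/lsn types:
    //
    //     use std::{collections::BTreeSet, ops::Bound::Included};
    //
    //     let mut bp = BTreeSet::new();
    //     bp.insert((7u64, 100u64));
    //     bp.insert((7, 250));
    //     bp.insert((9, 50));
    //     let of_7: Vec<u64> = bp
    //         .range((Included((7, 0)), Included((7, u64::MAX))))
    //         .map(|&(_, lsn)| lsn)
    //         .collect();
    //     assert_eq!(of_7, vec![100, 250]);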
    2588              : 
    2589              :     /// A substitute for `branch_timeline` for use in unit tests.
    2590              :     /// The returned timeline will have state value `Active` to make various `anyhow::ensure!()`
    2591              :     /// calls pass, but, we do not actually call `.activate()` under the hood. So, none of the
    2592              :     /// timeline background tasks are launched, except the flush loop.
    2593              :     #[cfg(test)]
    2594          107 :     async fn branch_timeline_test(
    2595          107 :         &self,
    2596          107 :         src_timeline: &Arc<Timeline>,
    2597          107 :         dst_id: TimelineId,
    2598          107 :         start_lsn: Option<Lsn>,
    2599          107 :         ctx: &RequestContext,
    2600          107 :     ) -> Result<Arc<Timeline>, CreateTimelineError> {
    2601          107 :         let tl = self
    2602          107 :             .branch_timeline_impl(src_timeline, dst_id, start_lsn, ctx)
    2603            2 :             .await?;
    2604          105 :         tl.set_state(TimelineState::Active);
    2605          105 :         Ok(tl)
    2606          107 :     }
    2607              : 
    2608              :     /// Branch an existing timeline.
    2609              :     ///
    2610              :     /// The caller is responsible for activating the returned timeline.
    2611          259 :     async fn branch_timeline(
    2612          259 :         &self,
    2613          259 :         src_timeline: &Arc<Timeline>,
    2614          259 :         dst_id: TimelineId,
    2615          259 :         start_lsn: Option<Lsn>,
    2616          259 :         ctx: &RequestContext,
    2617          259 :     ) -> Result<Arc<Timeline>, CreateTimelineError> {
    2618          259 :         self.branch_timeline_impl(src_timeline, dst_id, start_lsn, ctx)
    2619            2 :             .await
    2620          259 :     }
    2621              : 
    2622          366 :     async fn branch_timeline_impl(
    2623          366 :         &self,
    2624          366 :         src_timeline: &Arc<Timeline>,
    2625          366 :         dst_id: TimelineId,
    2626          366 :         start_lsn: Option<Lsn>,
    2627          366 :         _ctx: &RequestContext,
    2628          366 :     ) -> Result<Arc<Timeline>, CreateTimelineError> {
    2629          366 :         let src_id = src_timeline.timeline_id;
    2630          366 : 
    2631          366 :         // If no start LSN is specified, we branch the new timeline from the source timeline's last record LSN
    2632          366 :         let start_lsn = start_lsn.unwrap_or_else(|| {
    2633          226 :             let lsn = src_timeline.get_last_record_lsn();
    2634          226 :             info!("branching timeline {dst_id} from timeline {src_id} at last record LSN: {lsn}");
    2635          226 :             lsn
    2636          366 :         });
    2637              : 
    2638              :         // First acquire the GC lock so that another task cannot advance the GC
    2639              :         // cutoff in 'gc_info', and make 'start_lsn' invalid, while we are
    2640              :         // creating the branch.
    2641          366 :         let _gc_cs = self.gc_cs.lock().await;
    2642              : 
    2643              :         // Create a placeholder for the new branch. This will error
    2644              :         // out if the new timeline ID is already in use.
    2645          366 :         let timeline_uninit_mark = {
    2646          366 :             let timelines = self.timelines.lock().unwrap();
    2647          366 :             self.create_timeline_uninit_mark(dst_id, &timelines)?
    2648              :         };
    2649              : 
    2650              :         // Ensure that `start_lsn` is valid, i.e. the LSN is within the PITR
    2651              :         // horizon on the source timeline
    2652              :         //
    2653              :         // We check it against both the planned GC cutoff stored in 'gc_info',
    2654              :         // and the 'latest_gc_cutoff' of the last GC that was performed.  The
    2655              :         // planned GC cutoff in 'gc_info' is normally larger than
    2656              :         // 'latest_gc_cutoff_lsn', but beware of corner cases like if you just
    2657              :         // changed the GC settings for the tenant to make the PITR window
    2658              :         // larger, but some of the data was already removed by an earlier GC
    2659              :         // iteration.
    2660              : 
    2661              :         // check against last actual 'latest_gc_cutoff' first
    2662          366 :         let latest_gc_cutoff_lsn = src_timeline.get_latest_gc_cutoff_lsn();
    2663          366 :         src_timeline
    2664          366 :             .check_lsn_is_in_scope(start_lsn, &latest_gc_cutoff_lsn)
    2665          366 :             .context(format!(
    2666          366 :                 "invalid branch start lsn: less than latest GC cutoff {}",
    2667          366 :                 *latest_gc_cutoff_lsn,
    2668          366 :             ))
    2669          366 :             .map_err(CreateTimelineError::AncestorLsn)?;
    2670              : 
    2671              :         // and then the planned GC cutoff
    2672              :         {
    2673          358 :             let gc_info = src_timeline.gc_info.read().unwrap();
    2674          358 :             let cutoff = min(gc_info.pitr_cutoff, gc_info.horizon_cutoff);
    2675          358 :             if start_lsn < cutoff {
    2676            0 :                 return Err(CreateTimelineError::AncestorLsn(anyhow::anyhow!(
    2677            0 :                     "invalid branch start lsn: less than planned GC cutoff {cutoff}"
    2678            0 :                 )));
    2679          358 :             }
    2680          358 :         }
    2681          358 : 
    2682          358 :         //
    2683          358 :         // The branch point is valid, and we are still holding the 'gc_cs' lock
    2684          358 :         // so that GC cannot advance the GC cutoff until we are finished.
    2685          358 :         // Proceed with the branch creation.
    2686          358 :         //
    2687          358 : 
    2688          358 :         // Determine prev-LSN for the new timeline. We can only determine it if
    2689          358 :         // the timeline was branched at the current end of the source timeline.
    2690          358 :         let RecordLsn {
    2691          358 :             last: src_last,
    2692          358 :             prev: src_prev,
    2693          358 :         } = src_timeline.get_last_record_rlsn();
    2694          358 :         let dst_prev = if src_last == start_lsn {
    2695          331 :             Some(src_prev)
    2696              :         } else {
    2697           27 :             None
    2698              :         };
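                      : 
                      :         // Worked example (illustrative numbers): if src_last == 0/50 and
                      :         // src_prev == 0/40, branching at start_lsn == 0/50 gives the branch
                      :         // prev = Some(0/40); branching at any older LSN gives None, because
                      :         // the prev-LSN at an arbitrary historical point is not known here.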
    2699              : 
    2700              :         // Create the metadata file, noting the ancestor of the new timeline.
    2701              :         // There is initially no data in it, but all the read calls know to look
    2702              :         // into the ancestor.
    2703          358 :         let metadata = TimelineMetadata::new(
    2704          358 :             start_lsn,
    2705          358 :             dst_prev,
    2706          358 :             Some(src_id),
    2707          358 :             start_lsn,
    2708          358 :             *src_timeline.latest_gc_cutoff_lsn.read(), // FIXME: should we hold onto this guard longer?
    2709          358 :             src_timeline.initdb_lsn,
    2710          358 :             src_timeline.pg_version,
    2711          358 :         );
    2712              : 
    2713          358 :         let uninitialized_timeline = self
    2714          358 :             .prepare_new_timeline(
    2715          358 :                 dst_id,
    2716          358 :                 &metadata,
    2717          358 :                 timeline_uninit_mark,
    2718          358 :                 start_lsn + 1,
    2719          358 :                 Some(Arc::clone(src_timeline)),
    2720          358 :             )
    2721            0 :             .await?;
    2722              : 
    2723          358 :         let new_timeline = uninitialized_timeline.finish_creation()?;
    2724              : 
    2725              :         // Root timeline gets its layers during creation and uploads them along with the metadata.
    2726              :         // A branch timeline, though, may get no writes for some time after creation, and hence no layers created yet.
    2727              :         // We still need to upload its metadata eagerly: if other nodes `attach` the tenant and miss this timeline, their GC
    2728              :         // could get incorrect information and remove more layers than needed.
    2729              :         // See also https://github.com/neondatabase/neon/issues/3865
    2730          358 :         if let Some(remote_client) = new_timeline.remote_client.as_ref() {
    2731          163 :             remote_client
    2732          163 :                 .schedule_index_upload_for_metadata_update(&metadata)
    2733          163 :                 .context("branch initial metadata upload")?;
    2734          195 :         }
    2735              : 
    2736          358 :         info!("branched timeline {dst_id} from {src_id} at {start_lsn}");
    2737              : 
    2738          358 :         Ok(new_timeline)
    2739          366 :     }
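                      : 
                      :     // The planned-cutoff check above, condensed into a self-contained sketch
                      :     // (hypothetical free function; the real path also validates against the
                      :     // already-applied `latest_gc_cutoff_lsn` via `check_lsn_is_in_scope`):
                      :     //
                      :     //     fn branch_point_ok(start_lsn: Lsn, pitr: Lsn, horizon: Lsn) -> bool {
                      :     //         // data below min(pitr, horizon) may already be garbage-collected
                      :     //         start_lsn >= std::cmp::min(pitr, horizon)
                      :     //     }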
    2740              : 
    2741              :     /// - run initdb to initialize a temporary instance and get bootstrap data
    2742              :     /// - after initialization completes, remove the temp dir.
    2743              :     ///
    2744              :     /// The caller is responsible for activating the returned timeline.
    2745          644 :     async fn bootstrap_timeline(
    2746          644 :         &self,
    2747          644 :         timeline_id: TimelineId,
    2748          644 :         pg_version: u32,
    2749          644 :         ctx: &RequestContext,
    2750          644 :     ) -> anyhow::Result<Arc<Timeline>> {
    2751          644 :         let timeline_uninit_mark = {
    2752          644 :             let timelines = self.timelines.lock().unwrap();
    2753          644 :             self.create_timeline_uninit_mark(timeline_id, &timelines)?
    2754              :         };
    2755              :         // create a `tenant/{tenant_id}/timelines/basebackup-{timeline_id}.{TEMP_FILE_SUFFIX}/`
    2756              :         // temporary directory for basebackup files for the given timeline.
    2757          644 :         let initdb_path = path_with_suffix_extension(
    2758          644 :             self.conf
    2759          644 :                 .timelines_path(&self.tenant_id)
    2760          644 :                 .join(format!("basebackup-{timeline_id}")),
    2761          644 :             TEMP_FILE_SUFFIX,
    2762          644 :         );
    2763          644 : 
    2764          644 :         // An uninit mark was placed earlier, so nothing else can access this timeline's files.
    2765          644 :         // initdb has not run yet for this attempt, so remove whatever was left over from previous runs.
    2766          644 :         if initdb_path.exists() {
    2767            0 :             fs::remove_dir_all(&initdb_path).with_context(|| {
    2768            0 :                 format!(
    2769            0 :                     "Failed to remove already existing initdb directory: {}",
    2770            0 :                     initdb_path.display()
    2771            0 :                 )
    2772            0 :             })?;
    2773          644 :         }
    2774              :         // Init a temporary repo to get bootstrap data; this creates a directory at `initdb_path`
    2775          644 :         run_initdb(self.conf, &initdb_path, pg_version)?;
    2776          644 :         // this directory is only needed during bootstrap; schedule its removal as soon as bootstrap is done
    2777          644 :         scopeguard::defer! {
    2778          644 :             if let Err(e) = fs::remove_dir_all(&initdb_path) {
    2779              :                 // this is unlikely, but we will remove the directory on pageserver restart or another bootstrap call
    2780            0 :                 error!("Failed to remove temporary initdb directory '{}': {}", initdb_path.display(), e);
    2781          644 :             }
    2782              :         }
    2783          644 :         let pgdata_path = &initdb_path;
    2784          644 :         let pgdata_lsn = import_datadir::get_lsn_from_controlfile(pgdata_path)?.align();
    2785          644 : 
    2786          644 :         // Import the contents of the data directory at the initial checkpoint
    2787          644 :         // LSN, and any WAL after that.
    2788          644 :         // The initdb LSN will equal last_record_lsn, which is set after import.
    2789          644 :         // Because we know it upfront, pass it to the metadata now instead of using an Option or a dummy zero value.
    2790          644 :         let new_metadata = TimelineMetadata::new(
    2791          644 :             Lsn(0),
    2792          644 :             None,
    2793          644 :             None,
    2794          644 :             Lsn(0),
    2795          644 :             pgdata_lsn,
    2796          644 :             pgdata_lsn,
    2797          644 :             pg_version,
    2798          644 :         );
    2799          644 :         let raw_timeline = self
    2800          644 :             .prepare_new_timeline(
    2801          644 :                 timeline_id,
    2802          644 :                 &new_metadata,
    2803          644 :                 timeline_uninit_mark,
    2804          644 :                 pgdata_lsn,
    2805          644 :                 None,
    2806          644 :             )
    2807            1 :             .await?;
    2808              : 
    2809          643 :         let tenant_id = raw_timeline.owning_tenant.tenant_id;
    2810          643 :         let unfinished_timeline = raw_timeline.raw_timeline()?;
    2811              : 
    2812          643 :         import_datadir::import_timeline_from_postgres_datadir(
    2813          643 :             unfinished_timeline,
    2814          643 :             pgdata_path,
    2815          643 :             pgdata_lsn,
    2816          643 :             ctx,
    2817          643 :         )
    2818      3210259 :         .await
    2819          643 :         .with_context(|| {
    2820            0 :             format!("Failed to import pgdatadir for timeline {tenant_id}/{timeline_id}")
    2821          643 :         })?;
    2822              : 
    2823              :         // Flush the new layer files to disk, before we make the timeline available to
    2824              :         // the outside world.
    2825              :         //
    2826              :         // The flush loop must be spawned before the flush below can make progress.
    2827          643 :         unfinished_timeline.maybe_spawn_flush_loop();
    2828          643 : 
    2829          643 :         fail::fail_point!("before-checkpoint-new-timeline", |_| {
    2830            1 :             anyhow::bail!("failpoint before-checkpoint-new-timeline");
    2831          643 :         });
    2832              : 
    2833          642 :         unfinished_timeline
    2834          642 :             .freeze_and_flush()
    2835          642 :             .await
    2836          642 :             .with_context(|| {
    2837            0 :                 format!(
    2838            0 :                     "Failed to flush after pgdatadir import for timeline {tenant_id}/{timeline_id}"
    2839            0 :                 )
    2840          642 :             })?;
    2841              : 
    2842              :         // All done!
    2843          642 :         let timeline = raw_timeline.finish_creation()?;
    2844              : 
    2845          642 :         info!(
    2846          642 :             "created root timeline {} timeline.lsn {}",
    2847          642 :             timeline_id,
    2848          642 :             timeline.get_last_record_lsn()
    2849          642 :         );
    2850              : 
    2851          642 :         Ok(timeline)
    2852          644 :     }
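                      : 
                      :     // A minimal usage sketch (illustrative), assuming hypothetical bindings
                      :     // `tenant`, `new_id`, and `ctx`:
                      :     //
                      :     //     let tl = tenant.bootstrap_timeline(new_id, DEFAULT_PG_VERSION, &ctx).await?;
                      :     //     // As with branches, the caller must activate the returned timeline.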
    2853              : 
    2854              :     /// Call this before constructing a timeline, to build its required structures
    2855         1341 :     fn build_timeline_resources(&self, timeline_id: TimelineId) -> TimelineResources {
    2856         1341 :         let remote_client = if let Some(remote_storage) = self.remote_storage.as_ref() {
    2857          710 :             let remote_client = RemoteTimelineClient::new(
    2858          710 :                 remote_storage.clone(),
    2859          710 :                 self.conf,
    2860          710 :                 self.tenant_id,
    2861          710 :                 timeline_id,
    2862          710 :                 self.generation,
    2863          710 :             );
    2864          710 :             Some(remote_client)
    2865              :         } else {
    2866          631 :             None
    2867              :         };
    2868              : 
    2869         1341 :         TimelineResources { remote_client }
    2870         1341 :     }
    2871              : 
    2872              :     /// Creates intermediate timeline structure and its files.
    2873              :     ///
    2874              :     /// An empty layer map is initialized, and new data and WAL can be imported starting
    2875              :     /// at 'disk_consistent_lsn'. After any initial data has been imported, call
    2876              :     /// `finish_creation` to insert the Timeline into the timelines map and to remove the
    2877              :     /// uninit mark file.
    2878         1042 :     async fn prepare_new_timeline(
    2879         1042 :         &self,
    2880         1042 :         new_timeline_id: TimelineId,
    2881         1042 :         new_metadata: &TimelineMetadata,
    2882         1042 :         uninit_mark: TimelineUninitMark,
    2883         1042 :         start_lsn: Lsn,
    2884         1042 :         ancestor: Option<Arc<Timeline>>,
    2885         1042 :     ) -> anyhow::Result<UninitializedTimeline> {
    2886         1042 :         let tenant_id = self.tenant_id;
    2887         1042 : 
    2888         1042 :         let resources = self.build_timeline_resources(new_timeline_id);
    2889         1042 :         if let Some(remote_client) = &resources.remote_client {
    2890          563 :             remote_client.init_upload_queue_for_empty_remote(new_metadata)?;
    2891          479 :         }
    2892              : 
    2893         1042 :         let timeline_struct = self
    2894         1042 :             .create_timeline_struct(
    2895         1042 :                 new_timeline_id,
    2896         1042 :                 new_metadata,
    2897         1042 :                 ancestor,
    2898         1042 :                 resources,
    2899         1042 :                 None,
    2900         1042 :                 CreateTimelineCause::Load,
    2901         1042 :             )
    2902         1042 :             .context("Failed to create timeline data structure")?;
    2903              : 
    2904         1042 :         timeline_struct.init_empty_layer_map(start_lsn);
    2905              : 
    2906         1042 :         if let Err(e) = self
    2907         1042 :             .create_timeline_files(&uninit_mark.timeline_path, &new_timeline_id, new_metadata)
    2908            0 :             .await
    2909              :         {
    2910            1 :             error!("Failed to create initial files for timeline {tenant_id}/{new_timeline_id}, cleaning up: {e:?}");
    2911            1 :             cleanup_timeline_directory(uninit_mark);
    2912            1 :             return Err(e);
    2913         1041 :         }
    2914              : 
    2915            0 :         debug!("Successfully created initial files for timeline {tenant_id}/{new_timeline_id}");
    2916              : 
    2917         1041 :         Ok(UninitializedTimeline::new(
    2918         1041 :             self,
    2919         1041 :             new_timeline_id,
    2920         1041 :             Some((timeline_struct, uninit_mark)),
    2921         1041 :         ))
    2922         1042 :     }
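                      : 
                      :     // Timeline creation is two-phase; a condensed sketch of how the branch and
                      :     // bootstrap paths above drive it (names as used in this file):
                      :     //
                      :     //     let uninit = self
                      :     //         .prepare_new_timeline(id, &metadata, uninit_mark, start_lsn, ancestor)
                      :     //         .await?;
                      :     //     // ... import any initial data via uninit.raw_timeline()? ...
                      :     //     let timeline = uninit.finish_creation()?; // publish it, drop the uninit mark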
    2923              : 
    2924         1042 :     async fn create_timeline_files(
    2925         1042 :         &self,
    2926         1042 :         timeline_path: &Path,
    2927         1042 :         new_timeline_id: &TimelineId,
    2928         1042 :         new_metadata: &TimelineMetadata,
    2929         1042 :     ) -> anyhow::Result<()> {
    2930         1042 :         crashsafe::create_dir(timeline_path).context("Failed to create timeline directory")?;
    2931              : 
    2932         1042 :         fail::fail_point!("after-timeline-uninit-mark-creation", |_| {
    2933            1 :             anyhow::bail!("failpoint after-timeline-uninit-mark-creation");
    2934         1042 :         });
    2935              : 
    2936         1041 :         save_metadata(self.conf, &self.tenant_id, new_timeline_id, new_metadata)
    2937            0 :             .await
    2938         1041 :             .context("Failed to create timeline metadata")?;
    2939         1041 :         Ok(())
    2940         1042 :     }
    2941              : 
    2942              :     /// Attempts to create an uninit mark file for the timeline initialization.
    2943              :     /// Bails if the timeline is already loaded into memory (i.e. initialized before), or if the uninit mark file already exists.
    2944              :     ///
    2945              :     /// This way, we only need to hold the timelines lock for a short time during the mark check/creation per timeline init.
    2946         1051 :     fn create_timeline_uninit_mark(
    2947         1051 :         &self,
    2948         1051 :         timeline_id: TimelineId,
    2949         1051 :         timelines: &MutexGuard<HashMap<TimelineId, Arc<Timeline>>>,
    2950         1051 :     ) -> anyhow::Result<TimelineUninitMark> {
    2951         1051 :         let tenant_id = self.tenant_id;
    2952         1051 : 
    2953         1051 :         anyhow::ensure!(
    2954         1051 :             timelines.get(&timeline_id).is_none(),
    2955            1 :             "Timeline {tenant_id}/{timeline_id} already exists in pageserver's memory"
    2956              :         );
    2957         1050 :         let timeline_path = self.conf.timeline_path(&tenant_id, &timeline_id);
    2958         1050 :         anyhow::ensure!(
    2959         1050 :             !timeline_path.exists(),
    2960            0 :             "Timeline {} already exists, cannot create its uninit mark file",
    2961            0 :             timeline_path.display()
    2962              :         );
    2963              : 
    2964         1050 :         let uninit_mark_path = self
    2965         1050 :             .conf
    2966         1050 :             .timeline_uninit_mark_file_path(tenant_id, timeline_id);
    2967         1050 :         fs::File::create(&uninit_mark_path)
    2968         1050 :             .context("Failed to create uninit mark file")
    2969         1050 :             .and_then(|_| {
    2970         1050 :                 crashsafe::fsync_file_and_parent(&uninit_mark_path)
    2971         1050 :                     .context("Failed to fsync uninit mark file")
    2972         1050 :             })
    2973         1050 :             .with_context(|| {
    2974            0 :                 format!("Failed to create uninit mark for timeline {tenant_id}/{new_timeline_id}")
    2975         1050 :             })?;
    2976              : 
    2977         1050 :         let uninit_mark = TimelineUninitMark::new(uninit_mark_path, timeline_path);
    2978         1050 : 
    2979         1050 :         Ok(uninit_mark)
    2980         1051 :     }
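                      : 
                      :     // The durability idiom above, as a stand-alone sketch: a marker file is only
                      :     // crash-safe once both the file and its parent directory entry are fsynced
                      :     // (hypothetical helper, std-only):
                      :     //
                      :     //     fn create_marker_durably(path: &std::path::Path) -> std::io::Result<()> {
                      :     //         std::fs::File::create(path)?.sync_all()?; // persist the (empty) file
                      :     //         // fsync the parent so the new directory entry itself is durable
                      :     //         std::fs::File::open(path.parent().expect("marker has a parent"))?.sync_all()
                      :     //     }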
    2981              : 
    2982              :     /// Gathers inputs from all of the timelines to produce a sizing model input.
    2983              :     ///
    2984              :     /// Future is cancellation safe. Only one calculation can be running at once per tenant.
    2985          268 :     #[instrument(skip_all, fields(tenant_id=%self.tenant_id))]
    2986              :     pub async fn gather_size_inputs(
    2987              :         &self,
    2988              :         // `max_retention_period` overrides the cutoff that is used to calculate the size
    2989              :         // (only if it is shorter than the real cutoff).
    2990              :         max_retention_period: Option<u64>,
    2991              :         cause: LogicalSizeCalculationCause,
    2992              :         ctx: &RequestContext,
    2993              :     ) -> anyhow::Result<size::ModelInputs> {
    2994              :         let logical_sizes_at_once = self
    2995              :             .conf
    2996              :             .concurrent_tenant_size_logical_size_queries
    2997              :             .inner();
    2998              : 
    2999              :         // TODO: Having a single mutex block concurrent reads is not great for performance.
    3000              :         //
    3001              :         // But the only case where we need to run multiple of these at once is when we
    3002              :         // request a size for a tenant manually via API, while another background calculation
    3003              :         // is in progress (which is not a common case).
    3004              :         //
    3005              :         // See issue #2748, condensed out of the initial PR review, for more.
    3006              :         let mut shared_cache = self.cached_logical_sizes.lock().await;
    3007              : 
    3008              :         size::gather_inputs(
    3009              :             self,
    3010              :             logical_sizes_at_once,
    3011              :             max_retention_period,
    3012              :             &mut shared_cache,
    3013              :             cause,
    3014              :             ctx,
    3015              :         )
    3016              :         .await
    3017              :     }
    3018              : 
    3019              :     /// Calculate synthetic tenant size and cache the result.
    3020              :     /// This is called periodically by a background worker.
    3021              :     /// The result is cached in the tenant struct.
    3022           56 :     #[instrument(skip_all, fields(tenant_id=%self.tenant_id))]
    3023              :     pub async fn calculate_synthetic_size(
    3024              :         &self,
    3025              :         cause: LogicalSizeCalculationCause,
    3026              :         ctx: &RequestContext,
    3027              :     ) -> anyhow::Result<u64> {
    3028              :         let inputs = self.gather_size_inputs(None, cause, ctx).await?;
    3029              : 
    3030              :         let size = inputs.calculate()?;
    3031              : 
    3032              :         self.set_cached_synthetic_size(size);
    3033              : 
    3034              :         Ok(size)
    3035              :     }
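                      : 
                      :     // Illustrative call sequence (hypothetical `tenant`, `cause`, and `ctx`
                      :     // bindings); this is effectively what the background worker does:
                      :     //
                      :     //     let size = tenant.calculate_synthetic_size(cause, &ctx).await?;
                      :     //     assert_eq!(size, tenant.cached_synthetic_size());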
    3036              : 
    3037              :     /// Cache the given synthetic size and update the metric value
    3038           14 :     pub fn set_cached_synthetic_size(&self, size: u64) {
    3039           14 :         self.cached_synthetic_tenant_size
    3040           14 :             .store(size, Ordering::Relaxed);
    3041           14 : 
    3042           14 :         TENANT_SYNTHETIC_SIZE_METRIC
    3043           14 :             .get_metric_with_label_values(&[&self.tenant_id.to_string()])
    3044           14 :             .unwrap()
    3045           14 :             .set(size);
    3046           14 :     }
    3047              : 
    3048           40 :     pub fn cached_synthetic_size(&self) -> u64 {
    3049           40 :         self.cached_synthetic_tenant_size.load(Ordering::Relaxed)
    3050           40 :     }
    3051              : }
    3052              : 
    3053              : fn remove_timeline_and_uninit_mark(timeline_dir: &Path, uninit_mark: &Path) -> anyhow::Result<()> {
    3054            1 :     fs::remove_dir_all(timeline_dir)
    3055            1 :         .or_else(|e| {
    3056            0 :             if e.kind() == std::io::ErrorKind::NotFound {
    3057              :                 // the uninit mark may exist without a timeline dir;
    3058              :                 // in that case, just remove the mark
    3059            0 :                 Ok(())
    3060              :             } else {
    3061            0 :                 Err(e)
    3062              :             }
    3063            1 :         })
    3064            1 :         .with_context(|| {
    3065            0 :             format!(
    3066            0 :                 "Failed to remove uninit-marked timeline directory {}",
    3067            0 :                 timeline_dir.display()
    3068            0 :             )
    3069            1 :         })?;
    3070            1 :     fs::remove_file(uninit_mark).with_context(|| {
    3071            0 :         format!(
    3072            0 :             "Failed to remove timeline uninit mark file {}",
    3073            0 :             uninit_mark.display()
    3074            0 :         )
    3075            1 :     })?;
    3076              : 
    3077            1 :     Ok(())
    3078            1 : }
    3079              : 
    3080              : pub(crate) enum CreateTenantFilesMode {
    3081              :     Create,
    3082              :     Attach,
    3083              : }
    3084              : 
    3085          521 : pub(crate) async fn create_tenant_files(
    3086          521 :     conf: &'static PageServerConf,
    3087          521 :     tenant_conf: TenantConfOpt,
    3088          521 :     tenant_id: &TenantId,
    3089          521 :     mode: CreateTenantFilesMode,
    3090          521 : ) -> anyhow::Result<PathBuf> {
    3091          521 :     let target_tenant_directory = conf.tenant_path(tenant_id);
    3092          521 :     anyhow::ensure!(
    3093          521 :         !target_tenant_directory
    3094          521 :             .try_exists()
    3095          521 :             .context("check existence of tenant directory")?,
    3096            2 :         "tenant directory already exists",
    3097              :     );
    3098              : 
    3099          519 :     let temporary_tenant_dir =
    3100          519 :         path_with_suffix_extension(&target_tenant_directory, TEMP_FILE_SUFFIX);
    3101            0 :     debug!(
    3102            0 :         "Creating temporary directory structure in {}",
    3103            0 :         temporary_tenant_dir.display()
    3104            0 :     );
    3105              : 
    3106              :     // top-level dir may exist if we are creating it through CLI
    3107          519 :     crashsafe::create_dir_all(&temporary_tenant_dir).with_context(|| {
    3108            0 :         format!(
    3109            0 :             "could not create temporary tenant directory {}",
    3110            0 :             temporary_tenant_dir.display()
    3111            0 :         )
    3112          519 :     })?;
    3113              : 
    3114          519 :     let creation_result = try_create_target_tenant_dir(
    3115          519 :         conf,
    3116          519 :         tenant_conf,
    3117          519 :         tenant_id,
    3118          519 :         mode,
    3119          519 :         &temporary_tenant_dir,
    3120          519 :         &target_tenant_directory,
    3121          519 :     )
    3122            0 :     .await;
    3123              : 
    3124          519 :     if creation_result.is_err() {
    3125            1 :         error!("Failed to create directory structure for tenant {tenant_id}, cleaning tmp data");
    3126            1 :         if let Err(e) = fs::remove_dir_all(&temporary_tenant_dir) {
    3127            0 :             error!("Failed to remove temporary tenant directory {temporary_tenant_dir:?}: {e}")
    3128            1 :         } else if let Err(e) = crashsafe::fsync(&temporary_tenant_dir) {
    3129            1 :             error!(
    3130            1 :                 "Failed to fsync removed temporary tenant directory {temporary_tenant_dir:?}: {e}"
    3131            1 :             )
    3132            0 :         }
    3133          518 :     }
    3134              : 
    3135          519 :     creation_result?;
    3136              : 
    3137          518 :     Ok(target_tenant_directory)
    3138          521 : }
    3139              : 
    3140          519 : async fn try_create_target_tenant_dir(
    3141          519 :     conf: &'static PageServerConf,
    3142          519 :     tenant_conf: TenantConfOpt,
    3143          519 :     tenant_id: &TenantId,
    3144          519 :     mode: CreateTenantFilesMode,
    3145          519 :     temporary_tenant_dir: &Path,
    3146          519 :     target_tenant_directory: &Path,
    3147          519 : ) -> Result<(), anyhow::Error> {
    3148          519 :     match mode {
    3149          480 :         CreateTenantFilesMode::Create => {} // needs no attach marker, writing tenant conf + atomic rename of dir is good enough
    3150              :         CreateTenantFilesMode::Attach => {
    3151           39 :             let attach_marker_path = temporary_tenant_dir.join(TENANT_ATTACHING_MARKER_FILENAME);
    3152           39 :             let file = std::fs::OpenOptions::new()
    3153           39 :                 .create_new(true)
    3154           39 :                 .write(true)
    3155           39 :                 .open(&attach_marker_path)
    3156           39 :                 .with_context(|| {
    3157            0 :                     format!("could not create attach marker file {attach_marker_path:?}")
    3158           39 :                 })?;
    3159           39 :             file.sync_all().with_context(|| {
    3160            0 :                 format!("could not sync attach marker file: {attach_marker_path:?}")
    3161           39 :             })?;
    3162              :             // fsync of the directory in which the file resides comes later in this function
    3163              :         }
    3164              :     }
    3165              : 
    3166          519 :     let temporary_tenant_timelines_dir = rebase_directory(
    3167          519 :         &conf.timelines_path(tenant_id),
    3168          519 :         target_tenant_directory,
    3169          519 :         temporary_tenant_dir,
    3170          519 :     )
    3171          519 :     .with_context(|| format!("resolve tenant {tenant_id} temporary timelines dir"))?;
    3172          519 :     let temporary_tenant_config_path = rebase_directory(
    3173          519 :         &conf.tenant_config_path(tenant_id),
    3174          519 :         target_tenant_directory,
    3175          519 :         temporary_tenant_dir,
    3176          519 :     )
    3177          519 :     .with_context(|| format!("resolve tenant {tenant_id} temporary config path"))?;
    3178              : 
    3179          519 :     Tenant::persist_tenant_config(tenant_id, &temporary_tenant_config_path, tenant_conf).await?;
    3180              : 
    3181          519 :     crashsafe::create_dir(&temporary_tenant_timelines_dir).with_context(|| {
    3182            0 :         format!(
    3183            0 :             "create tenant {} temporary timelines directory {}",
    3184            0 :             tenant_id,
    3185            0 :             temporary_tenant_timelines_dir.display()
    3186            0 :         )
    3187          519 :     })?;
    3188          519 :     fail::fail_point!("tenant-creation-before-tmp-rename", |_| {
    3189            1 :         anyhow::bail!("failpoint tenant-creation-before-tmp-rename");
    3190          519 :     });
    3191              : 
    3192              :     // Make sure the current tenant directory entries are durable before renaming.
    3193              :     // Without this, a crash may reorder any of the directory entry creations above.
    3194          518 :     crashsafe::fsync(temporary_tenant_dir)
    3195          518 :         .with_context(|| format!("sync temporary tenant directory {temporary_tenant_dir:?}"))?;
    3196              : 
    3197          518 :     fs::rename(temporary_tenant_dir, target_tenant_directory).with_context(|| {
    3198            0 :         format!(
    3199            0 :             "move tenant {} temporary directory {} into the permanent one {}",
    3200            0 :             tenant_id,
    3201            0 :             temporary_tenant_dir.display(),
    3202            0 :             target_tenant_directory.display()
    3203            0 :         )
    3204          518 :     })?;
    3205          518 :     let target_dir_parent = target_tenant_directory.parent().with_context(|| {
    3206            0 :         format!(
    3207            0 :             "get tenant {} dir parent for {}",
    3208            0 :             tenant_id,
    3209            0 :             target_tenant_directory.display()
    3210            0 :         )
    3211          518 :     })?;
    3212          518 :     crashsafe::fsync(target_dir_parent).with_context(|| {
    3213            0 :         format!(
    3214            0 :             "fsync renamed directory's parent {} for tenant {}",
    3215            0 :             target_dir_parent.display(),
    3216            0 :             tenant_id,
    3217            0 :         )
    3218          518 :     })?;
    3219              : 
    3220          518 :     Ok(())
    3221          519 : }
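                      : 
                      : // The crash-safety pattern above in miniature: build everything in a temporary
                      : // directory, fsync it, rename it into place, then fsync the parent so the
                      : // rename itself is durable (hypothetical helper, std-only):
                      : //
                      : //     fn publish_dir_atomically(tmp: &std::path::Path, dst: &std::path::Path) -> std::io::Result<()> {
                      : //         std::fs::File::open(tmp)?.sync_all()?; // 1. directory contents durable
                      : //         std::fs::rename(tmp, dst)?;            // 2. atomic publish
                      : //         std::fs::File::open(dst.parent().expect("has a parent"))?.sync_all() // 3. entry durable
                      : //     }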
    3222              : 
    3223         1038 : fn rebase_directory(original_path: &Path, base: &Path, new_base: &Path) -> anyhow::Result<PathBuf> {
    3224         1038 :     let relative_path = original_path.strip_prefix(base).with_context(|| {
    3225            0 :         format!(
    3226            0 :             "Failed to strip base prefix '{}' off path '{}'",
    3227            0 :             base.display(),
    3228            0 :             original_path.display()
    3229            0 :         )
    3230         1038 :     })?;
    3231         1038 :     Ok(new_base.join(relative_path))
    3232         1038 : }
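                      : 
                      : // For example (illustrative paths): rebase_directory("/a/b/c", "/a", "/t")
                      : // yields "/t/b/c"; it errors if the original path does not start with `base`.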
    3233              : 
    3234              : /// Create the cluster temporarily in the 'initdb_target_dir' directory inside the repository
    3235              : /// to get bootstrap data for timeline initialization.
    3236          644 : fn run_initdb(
    3237          644 :     conf: &'static PageServerConf,
    3238          644 :     initdb_target_dir: &Path,
    3239          644 :     pg_version: u32,
    3240          644 : ) -> anyhow::Result<()> {
    3241          644 :     let initdb_bin_path = conf.pg_bin_dir(pg_version)?.join("initdb");
    3242          644 :     let initdb_lib_dir = conf.pg_lib_dir(pg_version)?;
    3243          644 :     info!(
    3244          644 :         "running {} in {}, libdir: {}",
    3245          644 :         initdb_bin_path.display(),
    3246          644 :         initdb_target_dir.display(),
    3247          644 :         initdb_lib_dir.display(),
    3248          644 :     );
    3249              : 
    3250          644 :     let initdb_output = Command::new(&initdb_bin_path)
    3251          644 :         .args(["-D", &initdb_target_dir.to_string_lossy()])
    3252          644 :         .args(["-U", &conf.superuser])
    3253          644 :         .args(["-E", "utf8"])
    3254          644 :         .arg("--no-instructions")
    3255          644 :         // This is only used for a temporary installation that is deleted shortly after,
    3256          644 :         // so no need to fsync it
    3257          644 :         .arg("--no-sync")
    3258          644 :         .env_clear()
    3259          644 :         .env("LD_LIBRARY_PATH", &initdb_lib_dir)
    3260          644 :         .env("DYLD_LIBRARY_PATH", &initdb_lib_dir)
    3261          644 :         .stdout(Stdio::null())
    3262          644 :         .output()
    3263          644 :         .with_context(|| {
    3264            0 :             format!(
    3265            0 :                 "failed to execute {} at target dir {}",
    3266            0 :                 initdb_bin_path.display(),
    3267            0 :                 initdb_target_dir.display()
    3268            0 :             )
    3269          644 :         })?;
    3270          644 :     if !initdb_output.status.success() {
    3271            0 :         bail!(
    3272            0 :             "initdb failed: '{}'",
    3273            0 :             String::from_utf8_lossy(&initdb_output.stderr)
    3274            0 :         );
    3275          644 :     }
    3276          644 : 
    3277          644 :     Ok(())
    3278          644 : }
    3279              : 
    3280              : impl Drop for Tenant {
    3281          164 :     fn drop(&mut self) {
    3282          164 :         remove_tenant_metrics(&self.tenant_id);
    3283          164 :     }
    3284              : }
    3285              : /// Dump contents of a layer file to stdout.
    3286            0 : pub async fn dump_layerfile_from_path(
    3287            0 :     path: &Path,
    3288            0 :     verbose: bool,
    3289            0 :     ctx: &RequestContext,
    3290            0 : ) -> anyhow::Result<()> {
    3291              :     use std::os::unix::fs::FileExt;
    3292              : 
    3293              :     // All layer files start with a two-byte "magic" value, to identify the kind of
    3294              :     // file.
    3295            0 :     let file = File::open(path)?;
    3296            0 :     let mut header_buf = [0u8; 2];
    3297            0 :     file.read_exact_at(&mut header_buf, 0)?;
    3298              : 
    3299            0 :     match u16::from_be_bytes(header_buf) {
    3300              :         crate::IMAGE_FILE_MAGIC => {
    3301            0 :             ImageLayer::new_for_path(path, file)?
    3302            0 :                 .dump(verbose, ctx)
    3303            0 :                 .await?
    3304              :         }
    3305              :         crate::DELTA_FILE_MAGIC => {
    3306            0 :             DeltaLayer::new_for_path(path, file)?
    3307            0 :                 .dump(verbose, ctx)
    3308            0 :                 .await?
    3309              :         }
    3310            0 :         magic => bail!("unrecognized magic identifier: {:?}", magic),
    3311              :     }
    3312              : 
    3313            0 :     Ok(())
    3314            0 : }
    3315              : 
    3316              : #[cfg(test)]
    3317              : pub mod harness {
    3318              :     use bytes::{Bytes, BytesMut};
    3319              :     use once_cell::sync::OnceCell;
    3320              :     use std::sync::Arc;
    3321              :     use std::{fs, path::PathBuf};
    3322              :     use utils::logging;
    3323              :     use utils::lsn::Lsn;
    3324              : 
    3325              :     use crate::{
    3326              :         config::PageServerConf,
    3327              :         repository::Key,
    3328              :         tenant::Tenant,
    3329              :         walrecord::NeonWalRecord,
    3330              :         walredo::{WalRedoError, WalRedoManager},
    3331              :     };
    3332              : 
    3333              :     use super::*;
    3334              :     use crate::tenant::config::{TenantConf, TenantConfOpt};
    3335              :     use hex_literal::hex;
    3336              :     use utils::id::{TenantId, TimelineId};
    3337              : 
    3338              :     pub const TIMELINE_ID: TimelineId =
    3339              :         TimelineId::from_array(hex!("11223344556677881122334455667788"));
    3340              :     pub const NEW_TIMELINE_ID: TimelineId =
    3341              :         TimelineId::from_array(hex!("AA223344556677881122334455667788"));
    3342              : 
    3343              :     /// Convenience function to create a page image with the given string as the only content
    3344              :     #[allow(non_snake_case)]
    3345       854153 :     pub fn TEST_IMG(s: &str) -> Bytes {
    3346       854153 :         let mut buf = BytesMut::new();
    3347       854153 :         buf.extend_from_slice(s.as_bytes());
    3348       854153 :         buf.resize(64, 0);
    3349       854153 : 
    3350       854153 :         buf.freeze()
    3351       854153 :     }
    3352              : 
    3353              :     impl From<TenantConf> for TenantConfOpt {
    3354           40 :         fn from(tenant_conf: TenantConf) -> Self {
    3355           40 :             Self {
    3356           40 :                 checkpoint_distance: Some(tenant_conf.checkpoint_distance),
    3357           40 :                 checkpoint_timeout: Some(tenant_conf.checkpoint_timeout),
    3358           40 :                 compaction_target_size: Some(tenant_conf.compaction_target_size),
    3359           40 :                 compaction_period: Some(tenant_conf.compaction_period),
    3360           40 :                 compaction_threshold: Some(tenant_conf.compaction_threshold),
    3361           40 :                 gc_horizon: Some(tenant_conf.gc_horizon),
    3362           40 :                 gc_period: Some(tenant_conf.gc_period),
    3363           40 :                 image_creation_threshold: Some(tenant_conf.image_creation_threshold),
    3364           40 :                 pitr_interval: Some(tenant_conf.pitr_interval),
    3365           40 :                 walreceiver_connect_timeout: Some(tenant_conf.walreceiver_connect_timeout),
    3366           40 :                 lagging_wal_timeout: Some(tenant_conf.lagging_wal_timeout),
    3367           40 :                 max_lsn_wal_lag: Some(tenant_conf.max_lsn_wal_lag),
    3368           40 :                 trace_read_requests: Some(tenant_conf.trace_read_requests),
    3369           40 :                 eviction_policy: Some(tenant_conf.eviction_policy),
    3370           40 :                 min_resident_size_override: tenant_conf.min_resident_size_override,
    3371           40 :                 evictions_low_residence_duration_metric_threshold: Some(
    3372           40 :                     tenant_conf.evictions_low_residence_duration_metric_threshold,
    3373           40 :                 ),
    3374           40 :                 gc_feedback: Some(tenant_conf.gc_feedback),
    3375           40 :             }
    3376           40 :         }
    3377              :     }
    3378              : 
    3379              :     pub struct TenantHarness {
    3380              :         pub conf: &'static PageServerConf,
    3381              :         pub tenant_conf: TenantConf,
    3382              :         pub tenant_id: TenantId,
    3383              :         pub generation: Generation,
    3384              :         remote_storage: GenericRemoteStorage,
    3385              :         pub remote_fs_dir: PathBuf,
    3386              :     }
    3387              : 
    3388              :     static LOG_HANDLE: OnceCell<()> = OnceCell::new();
    3389              : 
    3390              :     impl TenantHarness {
    3391            1 :                     // enable it in case the tests exercise code paths that use
    3392           36 :             LOG_HANDLE.get_or_init(|| {
    3393            1 :                 logging::init(
    3394            1 :                     logging::LogFormat::Test,
    3395            1 :                     // enable it in case in case the tests exercise code paths that use
    3396            1 :                     // debug_assert_current_span_has_tenant_and_timeline_id
    3397            1 :                     logging::TracingErrorLayerEnablement::EnableWithRustLogFilter,
    3398            1 :                 )
    3399            1 :                 .expect("Failed to init test logging")
    3400           36 :             });
    3401           36 : 
    3402           36 :             let repo_dir = PageServerConf::test_repo_dir(test_name);
    3403           36 :             let _ = fs::remove_dir_all(&repo_dir);
    3404           36 :             fs::create_dir_all(&repo_dir)?;
    3405              : 
    3406           36 :             let conf = PageServerConf::dummy_conf(repo_dir);
    3407           36 :             // Make a static copy of the config. This can never be freed, but that's
    3408           36 :             // OK in a test.
    3409           36 :             let conf: &'static PageServerConf = Box::leak(Box::new(conf));
    3410           36 : 
    3411           36 :             // Disable automatic GC and compaction to make the unit tests more deterministic.
    3412           36 :             // The tests perform them manually if needed.
    3413           36 :             let tenant_conf = TenantConf {
    3414           36 :                 gc_period: Duration::ZERO,
    3415           36 :                 compaction_period: Duration::ZERO,
    3416           36 :                 ..TenantConf::default()
    3417           36 :             };
    3418           36 : 
    3419           36 :             let tenant_id = TenantId::generate();
    3420           36 :             fs::create_dir_all(conf.tenant_path(&tenant_id))?;
    3421           36 :             fs::create_dir_all(conf.timelines_path(&tenant_id))?;
    3422              : 
    3423              :             use remote_storage::{RemoteStorageConfig, RemoteStorageKind};
    3424           36 :             let remote_fs_dir = conf.workdir.join("localfs");
    3425           36 :             std::fs::create_dir_all(&remote_fs_dir).unwrap();
    3426           36 :             let config = RemoteStorageConfig {
    3427           36 :                 // TODO: why not remote_storage::DEFAULT_REMOTE_STORAGE_MAX_CONCURRENT_SYNCS,
    3428           36 :                 max_concurrent_syncs: std::num::NonZeroUsize::new(2_000_000).unwrap(),
    3429           36 :                 // TODO: why not remote_storage::DEFAULT_REMOTE_STORAGE_MAX_SYNC_ERRORS,
    3430           36 :                 max_sync_errors: std::num::NonZeroU32::new(3_000_000).unwrap(),
    3431           36 :                 storage: RemoteStorageKind::LocalFs(remote_fs_dir.clone()),
    3432           36 :             };
    3433           36 :             let remote_storage = GenericRemoteStorage::from_config(&config).unwrap();
    3434           36 : 
    3435           36 :             Ok(Self {
    3436           36 :                 conf,
    3437           36 :                 tenant_conf,
    3438           36 :                 tenant_id,
    3439           36 :                 generation: Generation::new(0xdeadbeef),
    3440           36 :                 remote_storage,
    3441           36 :                 remote_fs_dir,
    3442           36 :             })
    3443           36 :         }
    3444              : 
    3445           37 :         pub async fn load(&self) -> (Arc<Tenant>, RequestContext) {
    3446           37 :             let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
    3447           37 :             (
    3448           37 :                 self.try_load(&ctx)
    3449           49 :                     .await
    3450           37 :                     .expect("failed to load test tenant"),
    3451           37 :                 ctx,
    3452           37 :             )
    3453           37 :         }
    3454              : 
    3455           40 :         pub async fn try_load(&self, ctx: &RequestContext) -> anyhow::Result<Arc<Tenant>> {
    3456           40 :             let walredo_mgr = Arc::new(TestRedoManager);
    3457           40 : 
    3458           40 :             let tenant = Arc::new(Tenant::new(
    3459           40 :                 TenantState::Loading,
    3460           40 :                 self.conf,
    3461           40 :                 TenantConfOpt::from(self.tenant_conf),
    3462           40 :                 walredo_mgr,
    3463           40 :                 self.tenant_id,
    3464           40 :                 self.generation,
    3465           40 :                 Some(self.remote_storage.clone()),
    3466           40 :             ));
    3467           40 :             tenant
    3468           40 :                 .load(None, ctx)
    3469           40 :                 .instrument(info_span!("try_load", tenant_id=%self.tenant_id))
    3470           52 :                 .await?;
    3471              : 
    3472              :             // TODO reuse Tenant::activate (needs broker)
    3473           39 :             tenant.state.send_replace(TenantState::Active);
    3474           39 :             for timeline in tenant.timelines.lock().unwrap().values() {
    3475            3 :                 timeline.set_state(TimelineState::Active);
    3476            3 :             }
    3477           39 :             Ok(tenant)
    3478           40 :         }
    3479              : 
    3480            3 :         pub fn timeline_path(&self, timeline_id: &TimelineId) -> PathBuf {
    3481            3 :             self.conf.timeline_path(&self.tenant_id, timeline_id)
    3482            3 :         }
    3483              :     }
    3484              : 
    3485              :     // Mock WAL redo manager that doesn't do much
    3486              :     pub struct TestRedoManager;
    3487              : 
    3488              :     impl WalRedoManager for TestRedoManager {
    3489            0 :         fn request_redo(
    3490            0 :             &self,
    3491            0 :             key: Key,
    3492            0 :             lsn: Lsn,
    3493            0 :             base_img: Option<(Lsn, Bytes)>,
    3494            0 :             records: Vec<(Lsn, NeonWalRecord)>,
    3495            0 :             _pg_version: u32,
    3496            0 :         ) -> Result<Bytes, WalRedoError> {
    3497            0 :             let s = format!(
    3498            0 :                 "redo for {} to get to {}, with {} and {} records",
    3499            0 :                 key,
    3500            0 :                 lsn,
    3501            0 :                 if base_img.is_some() {
    3502            0 :                     "base image"
    3503              :                 } else {
    3504            0 :                     "no base image"
    3505              :                 },
    3506            0 :                 records.len()
    3507            0 :             );
    3508            0 :             println!("{s}");
    3509            0 : 
    3510            0 :             Ok(TEST_IMG(&s))
    3511            0 :         }
    3512              :     }
    3513              : }
    3514              : 
    3515              : #[cfg(test)]
    3516              : mod tests {
    3517              :     use super::*;
    3518              :     use crate::keyspace::KeySpaceAccum;
    3519              :     use crate::repository::{Key, Value};
    3520              :     use crate::tenant::harness::*;
    3521              :     use crate::DEFAULT_PG_VERSION;
    3522              :     use crate::METADATA_FILE_NAME;
    3523              :     use bytes::BytesMut;
    3524              :     use hex_literal::hex;
    3525              :     use once_cell::sync::Lazy;
    3526              :     use rand::{thread_rng, Rng};
    3527              :     use tokio_util::sync::CancellationToken;
    3528              : 
    3529              :     static TEST_KEY: Lazy<Key> =
    3530            1 :         Lazy::new(|| Key::from_slice(&hex!("112222222233333333444444445500000001")));
    3531              : 
    3532            1 :     #[tokio::test]
    3533            1 :     async fn test_basic() -> anyhow::Result<()> {
    3534            1 :         let (tenant, ctx) = TenantHarness::create("test_basic")?.load().await;
    3535            1 :         let tline = tenant
    3536            1 :             .create_test_timeline(TIMELINE_ID, Lsn(0x08), DEFAULT_PG_VERSION, &ctx)
    3537            2 :             .await?;
    3538              : 
    3539            1 :         let writer = tline.writer().await;
    3540            1 :         writer
    3541            1 :             .put(*TEST_KEY, Lsn(0x10), &Value::Image(TEST_IMG("foo at 0x10")))
    3542            0 :             .await?;
    3543            1 :         writer.finish_write(Lsn(0x10));
    3544            1 :         drop(writer);
    3545              : 
    3546            1 :         let writer = tline.writer().await;
    3547            1 :         writer
    3548            1 :             .put(*TEST_KEY, Lsn(0x20), &Value::Image(TEST_IMG("foo at 0x20")))
    3549            0 :             .await?;
    3550            1 :         writer.finish_write(Lsn(0x20));
    3551            1 :         drop(writer);
    3552              : 
    3553            1 :         assert_eq!(
    3554            1 :             tline.get(*TEST_KEY, Lsn(0x10), &ctx).await?,
    3555            1 :             TEST_IMG("foo at 0x10")
    3556              :         );
    3557            1 :         assert_eq!(
    3558            1 :             tline.get(*TEST_KEY, Lsn(0x1f), &ctx).await?,
    3559            1 :             TEST_IMG("foo at 0x10")
    3560              :         );
    3561            1 :         assert_eq!(
    3562            1 :             tline.get(*TEST_KEY, Lsn(0x20), &ctx).await?,
    3563            1 :             TEST_IMG("foo at 0x20")
    3564              :         );
    3565              : 
    3566            1 :         Ok(())
    3567              :     }
    3568              : 
    3569            1 :     #[tokio::test]
    3570            1 :     async fn no_duplicate_timelines() -> anyhow::Result<()> {
    3571            1 :         let (tenant, ctx) = TenantHarness::create("no_duplicate_timelines")?
    3572            1 :             .load()
    3573            1 :             .await;
    3574            1 :         let _ = tenant
    3575            1 :             .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
    3576            2 :             .await?;
    3577              : 
    3578            1 :         match tenant
    3579            1 :             .create_empty_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
    3580            0 :             .await
    3581              :         {
    3582            0 :             Ok(_) => panic!("duplicate timeline creation should fail"),
    3583            1 :             Err(e) => assert_eq!(
    3584            1 :                 e.to_string(),
    3585            1 :                 format!(
    3586            1 :                     "Timeline {}/{} already exists in pageserver's memory",
    3587            1 :                     tenant.tenant_id, TIMELINE_ID
    3588            1 :                 )
    3589            1 :             ),
    3590              :         }
    3591              : 
    3592            1 :         Ok(())
    3593              :     }
    3594              : 
    3595              :     /// Convenience function to create a page-image Value with the given string as the only content
    3596            5 :     pub fn test_value(s: &str) -> Value {
    3597            5 :         let mut buf = BytesMut::new();
    3598            5 :         buf.extend_from_slice(s.as_bytes());
    3599            5 :         Value::Image(buf.freeze())
    3600            5 :     }
    3601              : 
    3602              :     ///
    3603              :     /// Test branch creation
    3604              :     ///
    3605            1 :     #[tokio::test]
    3606            1 :     async fn test_branch() -> anyhow::Result<()> {
    3607              :         use std::str::from_utf8;
    3608              : 
    3609            1 :         let (tenant, ctx) = TenantHarness::create("test_branch")?.load().await;
    3610            1 :         let tline = tenant
    3611            1 :             .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
    3612            2 :             .await?;
    3613            1 :         let writer = tline.writer().await;
    3614              : 
    3615              :         #[allow(non_snake_case)]
    3616            1 :         let TEST_KEY_A: Key = Key::from_hex("112222222233333333444444445500000001").unwrap();
    3617            1 :         #[allow(non_snake_case)]
    3618            1 :         let TEST_KEY_B: Key = Key::from_hex("112222222233333333444444445500000002").unwrap();
    3619            1 : 
    3620            1 :         // Insert a value on the timeline
    3621            1 :         writer
    3622            1 :             .put(TEST_KEY_A, Lsn(0x20), &test_value("foo at 0x20"))
    3623            0 :             .await?;
    3624            1 :         writer
    3625            1 :             .put(TEST_KEY_B, Lsn(0x20), &test_value("foobar at 0x20"))
    3626            0 :             .await?;
    3627            1 :         writer.finish_write(Lsn(0x20));
    3628            1 : 
    3629            1 :         writer
    3630            1 :             .put(TEST_KEY_A, Lsn(0x30), &test_value("foo at 0x30"))
    3631            0 :             .await?;
    3632            1 :         writer.finish_write(Lsn(0x30));
    3633            1 :         writer
    3634            1 :             .put(TEST_KEY_A, Lsn(0x40), &test_value("foo at 0x40"))
    3635            0 :             .await?;
    3636            1 :         writer.finish_write(Lsn(0x40));
    3637            1 : 
    3638            1 :         //assert_current_logical_size(&tline, Lsn(0x40));
    3639            1 : 
    3640            1 :         // Branch the history, modify relation differently on the new timeline
    3641            1 :         tenant
    3642            1 :             .branch_timeline_test(&tline, NEW_TIMELINE_ID, Some(Lsn(0x30)), &ctx)
    3643            0 :             .await?;
    3644            1 :         let newtline = tenant
    3645            1 :             .get_timeline(NEW_TIMELINE_ID, true)
    3646            1 :             .expect("Should have a local timeline");
    3647            1 :         let new_writer = newtline.writer().await;
    3648            1 :         new_writer
    3649            1 :             .put(TEST_KEY_A, Lsn(0x40), &test_value("bar at 0x40"))
    3650            0 :             .await?;
    3651            1 :         new_writer.finish_write(Lsn(0x40));
    3652              : 
    3653              :         // Check page contents on both branches
    3654            1 :         assert_eq!(
    3655            1 :             from_utf8(&tline.get(TEST_KEY_A, Lsn(0x40), &ctx).await?)?,
    3656              :             "foo at 0x40"
    3657              :         );
    3658            1 :         assert_eq!(
    3659            1 :             from_utf8(&newtline.get(TEST_KEY_A, Lsn(0x40), &ctx).await?)?,
    3660              :             "bar at 0x40"
    3661              :         );
    3662            1 :         assert_eq!(
    3663            1 :             from_utf8(&newtline.get(TEST_KEY_B, Lsn(0x40), &ctx).await?)?,
    3664              :             "foobar at 0x20"
    3665              :         );
    3666              : 
    3667              :         //assert_current_logical_size(&tline, Lsn(0x40));
    3668              : 
    3669            1 :         Ok(())
    3670              :     }
    3671              : 
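                      :     /// Write the same test key at four successive LSNs, calling
                      :     /// `freeze_and_flush` after each pair of writes, so the timeline ends up
                      :     /// with a couple of flushed delta layers for the GC and branching tests
                      :     /// below to work against.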
    3672           10 :     async fn make_some_layers(tline: &Timeline, start_lsn: Lsn) -> anyhow::Result<()> {
    3673           10 :         let mut lsn = start_lsn;
    3675              :         {
    3676           10 :             let writer = tline.writer().await;
    3677              :             // Create a relation on the timeline
    3678           10 :             writer
    3679           10 :                 .put(
    3680           10 :                     *TEST_KEY,
    3681           10 :                     lsn,
    3682           10 :                     &Value::Image(TEST_IMG(&format!("foo at {}", lsn))),
    3683           10 :                 )
    3684            0 :                 .await?;
    3685           10 :             writer.finish_write(lsn);
    3686           10 :             lsn += 0x10;
    3687           10 :             writer
    3688           10 :                 .put(
    3689           10 :                     *TEST_KEY,
    3690           10 :                     lsn,
    3691           10 :                     &Value::Image(TEST_IMG(&format!("foo at {}", lsn))),
    3692           10 :                 )
    3693            0 :                 .await?;
    3694           10 :             writer.finish_write(lsn);
    3695           10 :             lsn += 0x10;
    3696           10 :         }
    3697           10 :         tline.freeze_and_flush().await?;
    3698              :         {
    3699           10 :             let writer = tline.writer().await;
    3700           10 :             writer
    3701           10 :                 .put(
    3702           10 :                     *TEST_KEY,
    3703           10 :                     lsn,
    3704           10 :                     &Value::Image(TEST_IMG(&format!("foo at {}", lsn))),
    3705           10 :                 )
    3706            0 :                 .await?;
    3707           10 :             writer.finish_write(lsn);
    3708           10 :             lsn += 0x10;
    3709           10 :             writer
    3710           10 :                 .put(
    3711           10 :                     *TEST_KEY,
    3712           10 :                     lsn,
    3713           10 :                     &Value::Image(TEST_IMG(&format!("foo at {}", lsn))),
    3714           10 :                 )
    3715            0 :                 .await?;
    3716           10 :             writer.finish_write(lsn);
    3717           10 :         }
    3718           10 :         tline.freeze_and_flush().await
    3719           10 :     }
    3720              : 
    3721            1 :     #[tokio::test]
    3722            1 :     async fn test_prohibit_branch_creation_on_garbage_collected_data() -> anyhow::Result<()> {
    3723            1 :         let (tenant, ctx) =
    3724            1 :             TenantHarness::create("test_prohibit_branch_creation_on_garbage_collected_data")?
    3725            1 :                 .load()
    3726            1 :                 .await;
    3727            1 :         let tline = tenant
    3728            1 :             .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
    3729            2 :             .await?;
    3730            2 :         make_some_layers(tline.as_ref(), Lsn(0x20)).await?;
    3731              : 
    3732              :         // this removes layers before LSN 0x40 (last record 0x50 minus the 0x10 horizon),
    3733              :         // so two layers remain: an image and a delta covering 0x31-0x50
    3734              :         // FIXME: this doesn't actually remove any layer currently, given how flushing and
    3735              :         // compaction work. But it does set the 'cutoff' point so that the cross check below should fail.
    3736            1 :         tenant
    3737            1 :             .gc_iteration(Some(TIMELINE_ID), 0x10, Duration::ZERO, &ctx)
    3738            1 :             .await?;
    3739              : 
    3740              :         // try to branch at lsn 0x25, should fail because we already garbage collected the data
    3741            1 :         match tenant
    3742            1 :             .branch_timeline_test(&tline, NEW_TIMELINE_ID, Some(Lsn(0x25)), &ctx)
    3743            0 :             .await
    3744              :         {
    3745            0 :             Ok(_) => panic!("branching should have failed"),
    3746            1 :             Err(err) => {
    3747            1 :                 let CreateTimelineError::AncestorLsn(err) = err else {
    3748            0 :                     panic!("wrong error type")
    3749              :                 };
    3750            1 :                 assert!(err.to_string().contains("invalid branch start lsn"));
    3751            1 :                 assert!(err
    3752            1 :                     .source()
    3753            1 :                     .unwrap()
    3754            1 :                     .to_string()
    3755            1 :                     .contains("we might've already garbage collected needed data"))
    3756              :             }
    3757              :         }
    3758              : 
    3759            1 :         Ok(())
    3760              :     }
    3761              : 
    3762            1 :     #[tokio::test]
    3763            1 :     async fn test_prohibit_branch_creation_on_pre_initdb_lsn() -> anyhow::Result<()> {
    3764            1 :         let (tenant, ctx) =
    3765            1 :             TenantHarness::create("test_prohibit_branch_creation_on_pre_initdb_lsn")?
    3766            1 :                 .load()
    3767            1 :                 .await;
    3768              : 
    3769            1 :         let tline = tenant
    3770            1 :             .create_test_timeline(TIMELINE_ID, Lsn(0x50), DEFAULT_PG_VERSION, &ctx)
    3771            2 :             .await?;
    3772              :         // try to branch at lsn 0x25, should fail because initdb lsn is 0x50
    3773            1 :         match tenant
    3774            1 :             .branch_timeline_test(&tline, NEW_TIMELINE_ID, Some(Lsn(0x25)), &ctx)
    3775            0 :             .await
    3776              :         {
    3777            0 :             Ok(_) => panic!("branching should have failed"),
    3778            1 :             Err(err) => {
    3779            1 :                 let CreateTimelineError::AncestorLsn(err) = err else {
    3780            0 :                     panic!("wrong error type");
    3781              :                 };
    3782            1 :                 assert!(err.to_string().contains("invalid branch start lsn"));
    3783            1 :                 assert!(err
    3784            1 :                     .source()
    3785            1 :                     .unwrap()
    3786            1 :                     .to_string()
    3787            1 :                     .contains("is earlier than latest GC horizon"));
    3788              :             }
    3789              :         }
    3790              : 
    3791            1 :         Ok(())
    3792              :     }
    3793              : 
    3794              :     /*
    3795              :     // FIXME: This currently fails to error out. Calling GC doesn't currently
    3796              :     // remove the old value, we'd need to work a little harder
    3797              :     #[tokio::test]
    3798              :     async fn test_prohibit_get_for_garbage_collected_data() -> anyhow::Result<()> {
    3799              :         let (tenant, ctx) =
    3800              :             TenantHarness::create("test_prohibit_get_for_garbage_collected_data")?
    3801              :                 .load().await;
    3802              :         let tline = tenant
    3803              :             .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx).await?;
    3804              :         make_some_layers(tline.as_ref(), Lsn(0x20)).await?;
    3805              : 
    3806              :         tenant.gc_iteration(Some(TIMELINE_ID), 0x10, Duration::ZERO, &ctx).await?;
    3807              :         let latest_gc_cutoff_lsn = tline.get_latest_gc_cutoff_lsn();
    3808              :         assert!(*latest_gc_cutoff_lsn > Lsn(0x25));
    3809              :         match tline.get(*TEST_KEY, Lsn(0x25), &ctx).await {
    3810              :             Ok(_) => panic!("request for page should have failed"),
    3811              :             Err(err) => assert!(err.to_string().contains("not found at")),
    3812              :         }
    3813              :         Ok(())
    3814              :     }
    3815              :      */
    3816              : 
    3817            1 :     #[tokio::test]
    3818            1 :     async fn test_get_branchpoints_from_an_inactive_timeline() -> anyhow::Result<()> {
    3819            1 :         let (tenant, ctx) =
    3820            1 :             TenantHarness::create("test_get_branchpoints_from_an_inactive_timeline")?
    3821            1 :                 .load()
    3822            1 :                 .await;
    3823            1 :         let tline = tenant
    3824            1 :             .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
    3825            2 :             .await?;
    3826            2 :         make_some_layers(tline.as_ref(), Lsn(0x20)).await?;
    3827              : 
    3828            1 :         tenant
    3829            1 :             .branch_timeline_test(&tline, NEW_TIMELINE_ID, Some(Lsn(0x40)), &ctx)
    3830            0 :             .await?;
    3831            1 :         let newtline = tenant
    3832            1 :             .get_timeline(NEW_TIMELINE_ID, true)
    3833            1 :             .expect("Should have a local timeline");
    3834            1 : 
    3835            2 :         make_some_layers(newtline.as_ref(), Lsn(0x60)).await?;
    3836              : 
    3837            1 :         tline.set_broken("test".to_owned());
    3838            1 : 
    3839            1 :         tenant
    3840            1 :             .gc_iteration(Some(TIMELINE_ID), 0x10, Duration::ZERO, &ctx)
    3841            0 :             .await?;
    3842              : 
    3843              :         // The branchpoints should contain all timelines, even ones marked
    3844              :         // as Broken.
    3845              :         {
    3846            1 :             let branchpoints = &tline.gc_info.read().unwrap().retain_lsns;
    3847            1 :             assert_eq!(branchpoints.len(), 1);
    3848            1 :             assert_eq!(branchpoints[0], Lsn(0x40));
    3849              :         }
    3850              : 
    3851              :         // You can read the key from the child branch even though the parent is
    3852              :         // Broken, as long as you don't need to access data from the parent.
    3853            1 :         assert_eq!(
    3854            1 :             newtline.get(*TEST_KEY, Lsn(0x70), &ctx).await?,
    3855            1 :             TEST_IMG(&format!("foo at {}", Lsn(0x70)))
    3856              :         );
    3857              : 
    3858              :         // This needs to traverse to the parent, and fails.
    3859            1 :         let err = newtline.get(*TEST_KEY, Lsn(0x50), &ctx).await.unwrap_err();
    3860            1 :         assert!(err
    3861            1 :             .to_string()
    3862            1 :             .contains("will not become active. Current state: Broken"));
    3863              : 
    3864            1 :         Ok(())
    3865              :     }
    3866              : 
    3867            1 :     #[tokio::test]
    3868            1 :     async fn test_retain_data_in_parent_which_is_needed_for_child() -> anyhow::Result<()> {
    3869            1 :         let (tenant, ctx) =
    3870            1 :             TenantHarness::create("test_retain_data_in_parent_which_is_needed_for_child")?
    3871            1 :                 .load()
    3872            1 :                 .await;
    3873            1 :         let tline = tenant
    3874            1 :             .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
    3875            2 :             .await?;
    3876            2 :         make_some_layers(tline.as_ref(), Lsn(0x20)).await?;
    3877              : 
    3878            1 :         tenant
    3879            1 :             .branch_timeline_test(&tline, NEW_TIMELINE_ID, Some(Lsn(0x40)), &ctx)
    3880            0 :             .await?;
    3881            1 :         let newtline = tenant
    3882            1 :             .get_timeline(NEW_TIMELINE_ID, true)
    3883            1 :             .expect("Should have a local timeline");
    3884            1 :         // this removes layers before LSN 0x40 (last record 0x50 minus the 0x10 horizon), so two layers remain: an image and a delta covering 0x31-0x50
    3885            1 :         tenant
    3886            1 :             .gc_iteration(Some(TIMELINE_ID), 0x10, Duration::ZERO, &ctx)
    3887            1 :             .await?;
    3888            1 :         assert!(newtline.get(*TEST_KEY, Lsn(0x25), &ctx).await.is_ok());
    3889              : 
    3890            1 :         Ok(())
    3891              :     }
                      : 
    3892            1 :     #[tokio::test]
    3893            1 :     async fn test_parent_keeps_data_forever_after_branching() -> anyhow::Result<()> {
    3894            1 :         let (tenant, ctx) =
    3895            1 :             TenantHarness::create("test_parent_keeps_data_forever_after_branching")?
    3896            1 :                 .load()
    3897            1 :                 .await;
    3898            1 :         let tline = tenant
    3899            1 :             .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
    3900            2 :             .await?;
    3901            2 :         make_some_layers(tline.as_ref(), Lsn(0x20)).await?;
    3902              : 
    3903            1 :         tenant
    3904            1 :             .branch_timeline_test(&tline, NEW_TIMELINE_ID, Some(Lsn(0x40)), &ctx)
    3905            0 :             .await?;
    3906            1 :         let newtline = tenant
    3907            1 :             .get_timeline(NEW_TIMELINE_ID, true)
    3908            1 :             .expect("Should have a local timeline");
    3909            1 : 
    3910            2 :         make_some_layers(newtline.as_ref(), Lsn(0x60)).await?;
    3911              : 
    3912              :         // run gc on parent
    3913            1 :         tenant
    3914            1 :             .gc_iteration(Some(TIMELINE_ID), 0x10, Duration::ZERO, &ctx)
    3915            0 :             .await?;
    3916              : 
    3917              :         // Check that the data is still accessible on the branch.
    3918            1 :         assert_eq!(
    3919            1 :             newtline.get(*TEST_KEY, Lsn(0x50), &ctx).await?,
    3920            1 :             TEST_IMG(&format!("foo at {}", Lsn(0x40)))
    3921              :         );
    3922              : 
    3923            1 :         Ok(())
    3924              :     }
    3925              : 
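                      :     // Create a timeline, write some layers, shut the tenant down cleanly,
                      :     // then load the tenant again and check the timeline comes back from disk.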
    3926            1 :     #[tokio::test]
    3927            1 :     async fn timeline_load() -> anyhow::Result<()> {
    3928              :         const TEST_NAME: &str = "timeline_load";
    3929            1 :         let harness = TenantHarness::create(TEST_NAME)?;
    3930              :         {
    3931            1 :             let (tenant, ctx) = harness.load().await;
    3932            1 :             let tline = tenant
    3933            1 :                 .create_test_timeline(TIMELINE_ID, Lsn(0x7000), DEFAULT_PG_VERSION, &ctx)
    3934            2 :                 .await?;
    3935            2 :             make_some_layers(tline.as_ref(), Lsn(0x8000)).await?;
    3936              :             // so that all uploads finish & we can call harness.load() below again
    3937            1 :             tenant
    3938            1 :                 .shutdown(Default::default(), true)
    3939            1 :                 .instrument(info_span!("test_shutdown", tenant_id=%tenant.tenant_id))
    3940            2 :                 .await
    3941            1 :                 .ok()
    3942            1 :                 .unwrap();
    3943              :         }
    3944              : 
    3945            5 :         let (tenant, _ctx) = harness.load().await;
    3946            1 :         tenant
    3947            1 :             .get_timeline(TIMELINE_ID, true)
    3948            1 :             .expect("cannot load timeline");
    3949            1 : 
    3950            1 :         Ok(())
    3951              :     }
    3952              : 
    3953            1 :     #[tokio::test]
    3954            1 :     async fn timeline_load_with_ancestor() -> anyhow::Result<()> {
    3955              :         const TEST_NAME: &str = "timeline_load_with_ancestor";
    3956            1 :         let harness = TenantHarness::create(TEST_NAME)?;
    3957              :         // create two timelines
    3958              :         {
    3959            1 :             let (tenant, ctx) = harness.load().await;
    3960            1 :             let tline = tenant
    3961            1 :                 .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
    3962            2 :                 .await?;
    3963              : 
    3964            2 :             make_some_layers(tline.as_ref(), Lsn(0x20)).await?;
    3965              : 
    3966            1 :             let child_tline = tenant
    3967            1 :                 .branch_timeline_test(&tline, NEW_TIMELINE_ID, Some(Lsn(0x40)), &ctx)
    3968            0 :                 .await?;
    3969            1 :             child_tline.set_state(TimelineState::Active);
    3970            1 : 
    3971            1 :             let newtline = tenant
    3972            1 :                 .get_timeline(NEW_TIMELINE_ID, true)
    3973            1 :                 .expect("Should have a local timeline");
    3974            1 : 
    3975            2 :             make_some_layers(newtline.as_ref(), Lsn(0x60)).await?;
    3976              : 
    3977              :             // so that all uploads finish & we can call harness.load() below again
    3978            1 :             tenant
    3979            1 :                 .shutdown(Default::default(), true)
    3980            1 :                 .instrument(info_span!("test_shutdown", tenant_id=%tenant.tenant_id))
    3981            4 :                 .await
    3982            1 :                 .ok()
    3983            1 :                 .unwrap();
    3984              :         }
    3985              : 
    3986              :         // check that both of them are initially unloaded
    3987            9 :         let (tenant, _ctx) = harness.load().await;
    3988              : 
    3989              :         // check that both, child and ancestor are loaded
    3990            1 :         let _child_tline = tenant
    3991            1 :             .get_timeline(NEW_TIMELINE_ID, true)
    3992            1 :             .expect("cannot get child timeline loaded");
    3993            1 : 
    3994            1 :         let _ancestor_tline = tenant
    3995            1 :             .get_timeline(TIMELINE_ID, true)
    3996            1 :             .expect("cannot get ancestor timeline loaded");
    3997            1 : 
    3998            1 :         Ok(())
    3999              :     }
    4000              : 
    4001            1 :     #[tokio::test]
    4002            1 :     async fn delta_layer_dumping() -> anyhow::Result<()> {
    4003            1 :         let (tenant, ctx) = TenantHarness::create("delta_layer_dumping")?.load().await;
    4004            1 :         let tline = tenant
    4005            1 :             .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
    4006            2 :             .await?;
    4007            2 :         make_some_layers(tline.as_ref(), Lsn(0x20)).await?;
    4008              : 
    4009            1 :         let layer_map = tline.layers.read().await;
    4010            1 :         let level0_deltas = layer_map.layer_map().get_level0_deltas()?;
    4011              : 
    4012            1 :         assert!(!level0_deltas.is_empty());
    4013              : 
    4014            3 :         for delta in level0_deltas {
    4015            2 :             let delta = layer_map.get_from_desc(&delta);
    4016            2 :             // Ensure we are dumping a delta layer here
    4017            2 :             let delta = delta.downcast_delta_layer().unwrap();
    4018            2 : 
    4019            2 :             delta.dump(false, &ctx).await.unwrap();
    4020            2 :             delta.dump(true, &ctx).await.unwrap();
    4021              :         }
    4022              : 
    4023            1 :         Ok(())
    4024              :     }
    4025              : 
    4026            1 :     #[tokio::test]
    4027            1 :     async fn corrupt_metadata() -> anyhow::Result<()> {
    4028              :         const TEST_NAME: &str = "corrupt_metadata";
    4029            1 :         let harness = TenantHarness::create(TEST_NAME)?;
    4030            1 :         let (tenant, ctx) = harness.load().await;
    4031              : 
    4032            1 :         let tline = tenant
    4033            1 :             .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
    4034            2 :             .await?;
    4035            1 :         drop(tline);
    4036            1 :         // so that all uploads finish & we can call harness.try_load() below again
    4037            1 :         tenant
    4038            1 :             .shutdown(Default::default(), true)
    4039            1 :             .instrument(info_span!("test_shutdown", tenant_id=%tenant.tenant_id))
    4040            2 :             .await
    4041            1 :             .ok()
    4042            1 :             .unwrap();
    4043            1 :         drop(tenant);
    4044            1 : 
    4045            1 :         let metadata_path = harness.timeline_path(&TIMELINE_ID).join(METADATA_FILE_NAME);
    4046            1 : 
    4047            1 :         assert!(metadata_path.is_file());
    4048              : 
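                      :         // Flip a single bit in the serialized metadata so that the checksum
                      :         // no longer matches; the reload below must notice the corruption.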
    4049            1 :         let mut metadata_bytes = std::fs::read(&metadata_path)?;
    4050            1 :         assert_eq!(metadata_bytes.len(), 512);
    4051            1 :         metadata_bytes[8] ^= 1;
    4052            1 :         std::fs::write(metadata_path, metadata_bytes)?;
    4053              : 
    4054            1 :         let err = harness.try_load(&ctx).await.err().expect("should fail");
    4055            1 :         // format with {:#} to get the whole error chain with every .context, not only the last one
    4056            1 :         let message = format!("{err:#}");
    4057            1 :         let expected = "failed to load metadata";
    4058            1 :         assert!(
    4059            1 :             message.contains(expected),
    4060            0 :             "message '{message}' expected to contain {expected}"
    4061              :         );
    4062              : 
    4063            1 :         let mut found_error_message = false;
    4064            1 :         let mut err_source = err.source();
    4065            1 :         while let Some(source) = err_source {
    4066            1 :             if source.to_string().contains("metadata checksum mismatch") {
    4067            1 :                 found_error_message = true;
    4068            1 :                 break;
    4069            0 :             }
    4070            0 :             err_source = source.source();
    4071              :         }
    4072            1 :         assert!(
    4073            1 :             found_error_message,
    4074            0 :             "didn't find the corrupted metadata error in {}",
    4075              :             message
    4076              :         );
    4077              : 
    4078            1 :         Ok(())
    4079              :     }
    4080              : 
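                      :     // Write the same key at four successive LSNs, freezing, flushing and
                      :     // compacting after every write, then check that each historical version
                      :     // is still readable at its LSN.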
    4081            1 :     #[tokio::test]
    4082            1 :     async fn test_images() -> anyhow::Result<()> {
    4083            1 :         let (tenant, ctx) = TenantHarness::create("test_images")?.load().await;
    4084            1 :         let tline = tenant
    4085            1 :             .create_test_timeline(TIMELINE_ID, Lsn(0x08), DEFAULT_PG_VERSION, &ctx)
    4086            2 :             .await?;
    4087              : 
    4088            1 :         let writer = tline.writer().await;
    4089            1 :         writer
    4090            1 :             .put(*TEST_KEY, Lsn(0x10), &Value::Image(TEST_IMG("foo at 0x10")))
    4091            0 :             .await?;
    4092            1 :         writer.finish_write(Lsn(0x10));
    4093            1 :         drop(writer);
    4094            1 : 
    4095            1 :         tline.freeze_and_flush().await?;
    4096            1 :         tline.compact(&CancellationToken::new(), &ctx).await?;
    4097              : 
    4098            1 :         let writer = tline.writer().await;
    4099            1 :         writer
    4100            1 :             .put(*TEST_KEY, Lsn(0x20), &Value::Image(TEST_IMG("foo at 0x20")))
    4101            0 :             .await?;
    4102            1 :         writer.finish_write(Lsn(0x20));
    4103            1 :         drop(writer);
    4104            1 : 
    4105            1 :         tline.freeze_and_flush().await?;
    4106            1 :         tline.compact(&CancellationToken::new(), &ctx).await?;
    4107              : 
    4108            1 :         let writer = tline.writer().await;
    4109            1 :         writer
    4110            1 :             .put(*TEST_KEY, Lsn(0x30), &Value::Image(TEST_IMG("foo at 0x30")))
    4111            0 :             .await?;
    4112            1 :         writer.finish_write(Lsn(0x30));
    4113            1 :         drop(writer);
    4114            1 : 
    4115            1 :         tline.freeze_and_flush().await?;
    4116            1 :         tline.compact(&CancellationToken::new(), &ctx).await?;
    4117              : 
    4118            1 :         let writer = tline.writer().await;
    4119            1 :         writer
    4120            1 :             .put(*TEST_KEY, Lsn(0x40), &Value::Image(TEST_IMG("foo at 0x40")))
    4121            0 :             .await?;
    4122            1 :         writer.finish_write(Lsn(0x40));
    4123            1 :         drop(writer);
    4124            1 : 
    4125            1 :         tline.freeze_and_flush().await?;
    4126            1 :         tline.compact(&CancellationToken::new(), &ctx).await?;
    4127              : 
    4128            1 :         assert_eq!(
    4129            1 :             tline.get(*TEST_KEY, Lsn(0x10), &ctx).await?,
    4130            1 :             TEST_IMG("foo at 0x10")
    4131              :         );
    4132            1 :         assert_eq!(
    4133            1 :             tline.get(*TEST_KEY, Lsn(0x1f), &ctx).await?,
    4134            1 :             TEST_IMG("foo at 0x10")
    4135              :         );
    4136            1 :         assert_eq!(
    4137            1 :             tline.get(*TEST_KEY, Lsn(0x20), &ctx).await?,
    4138            1 :             TEST_IMG("foo at 0x20")
    4139              :         );
    4140            1 :         assert_eq!(
    4141            1 :             tline.get(*TEST_KEY, Lsn(0x30), &ctx).await?,
    4142            1 :             TEST_IMG("foo at 0x30")
    4143              :         );
    4144            1 :         assert_eq!(
    4145            1 :             tline.get(*TEST_KEY, Lsn(0x40), &ctx).await?,
    4146            1 :             TEST_IMG("foo at 0x40")
    4147              :         );
    4148              : 
    4149            1 :         Ok(())
    4150              :     }
    4151              : 
    4152              :     //
    4153              :     // Insert 10,000 key-value pairs with increasing keys, flush, compact, GC.
    4154              :     // Repeat 50 times.
    4155              :     //
    4156            1 :     #[tokio::test]
    4157            1 :     async fn test_bulk_insert() -> anyhow::Result<()> {
    4158            1 :         let (tenant, ctx) = TenantHarness::create("test_bulk_insert")?.load().await;
    4159            1 :         let tline = tenant
    4160            1 :             .create_test_timeline(TIMELINE_ID, Lsn(0x08), DEFAULT_PG_VERSION, &ctx)
    4161            2 :             .await?;
    4162              : 
    4163            1 :         let mut lsn = Lsn(0x10);
    4164            1 : 
    4165            1 :         let mut keyspace = KeySpaceAccum::new();
    4166            1 : 
    4167            1 :         let mut test_key = Key::from_hex("012222222233333333444444445500000000").unwrap();
    4168            1 :         let mut blknum = 0;
    4169           51 :         for _ in 0..50 {
    4170       500050 :             for _ in 0..10000 {
    4171       500000 :                 test_key.field6 = blknum;
    4172       500000 :                 let writer = tline.writer().await;
    4173       500000 :                 writer
    4174       500000 :                     .put(
    4175       500000 :                         test_key,
    4176       500000 :                         lsn,
    4177       500000 :                         &Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))),
    4178       500000 :                     )
    4179         7800 :                     .await?;
    4180       500000 :                 writer.finish_write(lsn);
    4181       500000 :                 drop(writer);
    4182       500000 : 
    4183       500000 :                 keyspace.add_key(test_key);
    4184       500000 : 
    4185       500000 :                 lsn = Lsn(lsn.0 + 0x10);
    4186       500000 :                 blknum += 1;
    4187              :             }
    4188              : 
    4189           50 :             let cutoff = tline.get_last_record_lsn();
    4190           50 : 
    4191           50 :             tline
    4192           50 :                 .update_gc_info(Vec::new(), cutoff, Duration::ZERO, &ctx)
    4193            0 :                 .await?;
    4194           50 :             tline.freeze_and_flush().await?;
    4195         4005 :             tline.compact(&CancellationToken::new(), &ctx).await?;
    4196           50 :             tline.gc().await?;
    4197              :         }
    4198              : 
    4199            1 :         Ok(())
    4200              :     }
    4201              : 
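                      :     // Overwrite random keys at increasing LSNs, interleaving the updates with
                      :     // flush, compaction and GC cycles, and verify every read sees the value
                      :     // from the key's latest write.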
    4202            1 :     #[tokio::test]
    4203            1 :     async fn test_random_updates() -> anyhow::Result<()> {
    4204            1 :         let (tenant, ctx) = TenantHarness::create("test_random_updates")?.load().await;
    4205            1 :         let tline = tenant
    4206            1 :             .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
    4207            2 :             .await?;
    4208              : 
    4209              :         const NUM_KEYS: usize = 1000;
    4210              : 
    4211            1 :         let mut test_key = Key::from_hex("012222222233333333444444445500000000").unwrap();
    4212            1 : 
    4213            1 :         let mut keyspace = KeySpaceAccum::new();
    4214            1 : 
    4215            1 :         // Track when each page was last modified. Used to assert that
    4216            1 :         // a read sees the latest page version.
    4217            1 :         let mut updated = [Lsn(0); NUM_KEYS];
    4218            1 : 
    4219            1 :         let mut lsn = Lsn(0x10);
    4220              :         #[allow(clippy::needless_range_loop)]
    4221         1001 :         for blknum in 0..NUM_KEYS {
    4222         1000 :             lsn = Lsn(lsn.0 + 0x10);
    4223         1000 :             test_key.field6 = blknum as u32;
    4224         1000 :             let writer = tline.writer().await;
    4225         1000 :             writer
    4226         1000 :                 .put(
    4227         1000 :                     test_key,
    4228         1000 :                     lsn,
    4229         1000 :                     &Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))),
    4230         1000 :                 )
    4231           16 :                 .await?;
    4232         1000 :             writer.finish_write(lsn);
    4233         1000 :             updated[blknum] = lsn;
    4234         1000 :             drop(writer);
    4235         1000 : 
    4236         1000 :             keyspace.add_key(test_key);
    4237              :         }
    4238              : 
    4239           51 :         for _ in 0..50 {
    4240        50050 :             for _ in 0..NUM_KEYS {
    4241        50000 :                 lsn = Lsn(lsn.0 + 0x10);
    4242        50000 :                 let blknum = thread_rng().gen_range(0..NUM_KEYS);
    4243        50000 :                 test_key.field6 = blknum as u32;
    4244        50000 :                 let writer = tline.writer().await;
    4245        50000 :                 writer
    4246        50000 :                     .put(
    4247        50000 :                         test_key,
    4248        50000 :                         lsn,
    4249        50000 :                         &Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))),
    4250        50000 :                     )
    4251          750 :                     .await?;
    4252        50000 :                 writer.finish_write(lsn);
    4253        50000 :                 drop(writer);
    4254        50000 :                 updated[blknum] = lsn;
    4255              :             }
    4256              : 
    4257              :             // Read all the blocks
    4258        50000 :             for (blknum, last_lsn) in updated.iter().enumerate() {
    4259        50000 :                 test_key.field6 = blknum as u32;
    4260        50000 :                 assert_eq!(
    4261        50000 :                     tline.get(test_key, lsn, &ctx).await?,
    4262        50000 :                     TEST_IMG(&format!("{} at {}", blknum, last_lsn))
    4263              :                 );
    4264              :             }
    4265              : 
    4266              :             // Perform a cycle of flush, compact, and GC
    4267           50 :             let cutoff = tline.get_last_record_lsn();
    4268           50 :             tline
    4269           50 :                 .update_gc_info(Vec::new(), cutoff, Duration::ZERO, &ctx)
    4270            0 :                 .await?;
    4271           52 :             tline.freeze_and_flush().await?;
    4272          453 :             tline.compact(&CancellationToken::new(), &ctx).await?;
    4273           50 :             tline.gc().await?;
    4274              :         }
    4275              : 
    4276            1 :         Ok(())
    4277              :     }
    4278              : 
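                      :     // Like test_random_updates, but each round of updates lands on a freshly
                      :     // created branch, so reads have to traverse the whole branch ancestry.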
    4279            1 :     #[tokio::test]
    4280            1 :     async fn test_traverse_branches() -> anyhow::Result<()> {
    4281            1 :         let (tenant, ctx) = TenantHarness::create("test_traverse_branches")?
    4282            1 :             .load()
    4283            1 :             .await;
    4284            1 :         let mut tline = tenant
    4285            1 :             .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
    4286            2 :             .await?;
    4287              : 
    4288              :         const NUM_KEYS: usize = 1000;
    4289              : 
    4290            1 :         let mut test_key = Key::from_hex("012222222233333333444444445500000000").unwrap();
    4291            1 : 
    4292            1 :         let mut keyspace = KeySpaceAccum::new();
    4293            1 : 
    4294            1 :         // Track when each page was last modified. Used to assert that
    4295            1 :         // a read sees the latest page version.
    4296            1 :         let mut updated = [Lsn(0); NUM_KEYS];
    4297            1 : 
    4298            1 :         let mut lsn = Lsn(0x10);
    4299              :         #[allow(clippy::needless_range_loop)]
    4300         1001 :         for blknum in 0..NUM_KEYS {
    4301         1000 :             lsn = Lsn(lsn.0 + 0x10);
    4302         1000 :             test_key.field6 = blknum as u32;
    4303         1000 :             let writer = tline.writer().await;
    4304         1000 :             writer
    4305         1000 :                 .put(
    4306         1000 :                     test_key,
    4307         1000 :                     lsn,
    4308         1000 :                     &Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))),
    4309         1000 :                 )
    4310           16 :                 .await?;
    4311         1000 :             writer.finish_write(lsn);
    4312         1000 :             updated[blknum] = lsn;
    4313         1000 :             drop(writer);
    4314         1000 : 
    4315         1000 :             keyspace.add_key(test_key);
    4316              :         }
    4317              : 
    4318           51 :         for _ in 0..50 {
    4319           50 :             let new_tline_id = TimelineId::generate();
    4320           50 :             tenant
    4321           50 :                 .branch_timeline_test(&tline, new_tline_id, Some(lsn), &ctx)
    4322            0 :                 .await?;
    4323           50 :             tline = tenant
    4324           50 :                 .get_timeline(new_tline_id, true)
    4325           50 :                 .expect("Should have the branched timeline");
    4326              : 
    4327        50050 :             for _ in 0..NUM_KEYS {
    4328        50000 :                 lsn = Lsn(lsn.0 + 0x10);
    4329        50000 :                 let blknum = thread_rng().gen_range(0..NUM_KEYS);
    4330        50000 :                 test_key.field6 = blknum as u32;
    4331        50000 :                 let writer = tline.writer().await;
    4332        50000 :                 writer
    4333        50000 :                     .put(
    4334        50000 :                         test_key,
    4335        50000 :                         lsn,
    4336        50000 :                         &Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))),
    4337        50000 :                     )
    4338          751 :                     .await?;
    4339        50000 :                 println!("updating {} at {}", blknum, lsn);
    4340        50000 :                 writer.finish_write(lsn);
    4341        50000 :                 drop(writer);
    4342        50000 :                 updated[blknum] = lsn;
    4343              :             }
    4344              : 
    4345              :             // Read all the blocks
    4346        50000 :             for (blknum, last_lsn) in updated.iter().enumerate() {
    4347        50000 :                 test_key.field6 = blknum as u32;
    4348        50000 :                 assert_eq!(
    4349        50000 :                     tline.get(test_key, lsn, &ctx).await?,
    4350        50000 :                     TEST_IMG(&format!("{} at {}", blknum, last_lsn))
    4351              :                 );
    4352              :             }
    4353              : 
    4354              :             // Perform a cycle of flush, compact, and GC
    4355           50 :             let cutoff = tline.get_last_record_lsn();
    4356           50 :             tline
    4357           50 :                 .update_gc_info(Vec::new(), cutoff, Duration::ZERO, &ctx)
    4358            0 :                 .await?;
    4359           51 :             tline.freeze_and_flush().await?;
    4360          147 :             tline.compact(&CancellationToken::new(), &ctx).await?;
    4361           50 :             tline.gc().await?;
    4362              :         }
    4363              : 
    4364            1 :         Ok(())
    4365              :     }
    4366              : 
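                      :     // Build a chain of 50 branches, each with its own page mutations, then
                      :     // read every recorded version through the leaf timeline so the lookups
                      :     // walk up the ancestor chain.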
    4367            1 :     #[tokio::test]
    4368            1 :     async fn test_traverse_ancestors() -> anyhow::Result<()> {
    4369            1 :         let (tenant, ctx) = TenantHarness::create("test_traverse_ancestors")?
    4370            1 :             .load()
    4371            1 :             .await;
    4372            1 :         let mut tline = tenant
    4373            1 :             .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
    4374            2 :             .await?;
    4375              : 
    4376              :         const NUM_KEYS: usize = 100;
    4377              :         const NUM_TLINES: usize = 50;
    4378              : 
    4379            1 :         let mut test_key = Key::from_hex("012222222233333333444444445500000000").unwrap();
    4380            1 :         // Track page mutation lsns across different timelines.
    4381            1 :         let mut updated = [[Lsn(0); NUM_KEYS]; NUM_TLINES];
    4382            1 : 
    4383            1 :         let mut lsn = Lsn(0x10);
    4384              : 
    4385              :         #[allow(clippy::needless_range_loop)]
    4386           51 :         for idx in 0..NUM_TLINES {
    4387           50 :             let new_tline_id = TimelineId::generate();
    4388           50 :             tenant
    4389           50 :                 .branch_timeline_test(&tline, new_tline_id, Some(lsn), &ctx)
    4390            0 :                 .await?;
    4391           50 :             tline = tenant
    4392           50 :                 .get_timeline(new_tline_id, true)
    4393           50 :                 .expect("Should have the branched timeline");
    4394              : 
    4395         5050 :             for _ in 0..NUM_KEYS {
    4396         5000 :                 lsn = Lsn(lsn.0 + 0x10);
    4397         5000 :                 let blknum = thread_rng().gen_range(0..NUM_KEYS);
    4398         5000 :                 test_key.field6 = blknum as u32;
    4399         5000 :                 let writer = tline.writer().await;
    4400         5000 :                 writer
    4401         5000 :                     .put(
    4402         5000 :                         test_key,
    4403         5000 :                         lsn,
    4404         5000 :                         &Value::Image(TEST_IMG(&format!("{} {} at {}", idx, blknum, lsn))),
    4405         5000 :                     )
    4406           78 :                     .await?;
    4407         5000 :                 println!("updating [{}][{}] at {}", idx, blknum, lsn);
    4408         5000 :                 writer.finish_write(lsn);
    4409         5000 :                 drop(writer);
    4410         5000 :                 updated[idx][blknum] = lsn;
    4411              :             }
    4412              :         }
    4413              : 
    4414              :         // Read pages from leaf timeline across all ancestors.
    4415           50 :         for (idx, lsns) in updated.iter().enumerate() {
    4416         5000 :             for (blknum, lsn) in lsns.iter().enumerate() {
    4417              :                 // Skip empty mutations.
    4418         5000 :                 if lsn.0 == 0 {
    4419         1797 :                     continue;
    4420         3203 :                 }
    4421         3203 :                 println!("checking [{idx}][{blknum}] at {lsn}");
    4422         3203 :                 test_key.field6 = blknum as u32;
    4423         3203 :                 assert_eq!(
    4424         3203 :                     tline.get(test_key, *lsn, &ctx).await?,
    4425         3203 :                     TEST_IMG(&format!("{idx} {blknum} at {lsn}"))
    4426              :                 );
    4427              :             }
    4428              :         }
    4429            1 :         Ok(())
    4430              :     }
    4431              : 
    4432            1 :     #[tokio::test]
    4433            1 :     async fn test_write_at_initdb_lsn_takes_optimization_code_path() -> anyhow::Result<()> {
    4434            1 :         let (tenant, ctx) = TenantHarness::create("test_write_at_initdb_lsn_takes_optimization_code_path")?
    4435            1 :             .load()
    4436            1 :             .await;
    4437              : 
    4438            1 :         let initdb_lsn = Lsn(0x20);
    4439            1 :         let utline = tenant
    4440            1 :             .create_empty_timeline(TIMELINE_ID, initdb_lsn, DEFAULT_PG_VERSION, &ctx)
    4441            0 :             .await?;
    4442            1 :         let tline = utline.raw_timeline().unwrap();
    4443            1 : 
    4444            1 :         // Spawn the flush loop now so that we can set the `expect_initdb_optimization` flag below
    4445            1 :         tline.maybe_spawn_flush_loop();
    4446            1 : 
    4447            1 :         // Make sure the timeline has the minimum set of required keys for operation.
    4448            1 :         // The only operation you can always do on an empty timeline is to `put` new data,
    4449            1 :         // unless you `put` at `initdb_lsn`.
    4450            1 :         // In that case, an optimization creates image layers directly instead of delta layers.
    4451            1 :         // It uses `repartition()`, which assumes some keys are present.
    4452            1 :         // Let's make sure the test timeline can handle that case.
    4453            1 :         {
    4454            1 :             let mut state = tline.flush_loop_state.lock().unwrap();
    4455            1 :             assert_eq!(
    4456            1 :                 timeline::FlushLoopState::Running {
    4457            1 :                     expect_initdb_optimization: false,
    4458            1 :                     initdb_optimization_count: 0,
    4459            1 :                 },
    4460            1 :                 *state
    4461            1 :             );
    4462            1 :             *state = timeline::FlushLoopState::Running {
    4463            1 :                 expect_initdb_optimization: true,
    4464            1 :                 initdb_optimization_count: 0,
    4465            1 :             };
    4466            1 :         }
    4467            1 : 
    4468            1 :         // Make writes at the initdb_lsn. When we flush it below, it should be handled by the optimization.
    4469            1 :         // As explained above, the optimization requires some keys to be present.
    4470            1 :         // As per `create_empty_timeline` documentation, use init_empty to set them.
    4471            1 :         // This is what `create_test_timeline` does, by the way.
    4472            1 :         let mut modification = tline.begin_modification(initdb_lsn);
    4473            1 :         modification
    4474            1 :             .init_empty_test_timeline()
    4475            1 :             .context("init_empty_test_timeline")?;
    4476            1 :         modification
    4477            1 :             .commit()
    4478            0 :             .await
    4479            1 :             .context("commit init_empty_test_timeline modification")?;
    4480              : 
    4481              :         // Do the flush. The flush code will check the expectations that we set above.
    4482            1 :         tline.freeze_and_flush().await?;
    4483              : 
    4484              :         // assert freeze_and_flush exercised the initdb optimization
    4485              :         {
    4486            1 :             let state = tline.flush_loop_state.lock().unwrap();
    4487              :             let timeline::FlushLoopState::Running {
    4488            1 :                 expect_initdb_optimization,
    4489            1 :                 initdb_optimization_count,
    4490            1 :             } = *state
    4491              :             else {
    4492            0 :                 panic!("unexpected state: {:?}", *state);
    4493              :             };
    4494            1 :             assert!(expect_initdb_optimization);
    4495            1 :             assert!(initdb_optimization_count > 0);
    4496              :         }
    4497            1 :         Ok(())
    4498              :     }
    4499              : 
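                      :     // Simulate a crash in the middle of timeline creation: the uninit mark is
                      :     // left on disk (via `std::mem::forget`), and on the next load the
                      :     // half-created timeline must be cleaned up rather than resurrected.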
    4500            1 :     #[tokio::test]
    4501            1 :     async fn test_uninit_mark_crash() -> anyhow::Result<()> {
    4502            1 :         let name = "test_uninit_mark_crash";
    4503            1 :         let harness = TenantHarness::create(name)?;
    4504              :         {
    4505            1 :             let (tenant, ctx) = harness.load().await;
    4506            1 :             let tline = tenant
    4507            1 :                 .create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)
    4508            0 :                 .await?;
    4509              :             // Keeps uninit mark in place
    4510            1 :             let raw_tline = tline.raw_timeline().unwrap();
    4511            1 :             raw_tline
    4512            1 :                 .shutdown(false)
    4513            1 :                 .instrument(info_span!("test_shutdown", tenant_id=%raw_tline.tenant_id))
    4514            0 :                 .await;
    4515            1 :             std::mem::forget(tline);
    4516              :         }
    4517              : 
    4518            1 :         let (tenant, _) = harness.load().await;
    4519            1 :         match tenant.get_timeline(TIMELINE_ID, false) {
    4520            0 :             Ok(_) => panic!("timeline should've been removed during load"),
    4521            1 :             Err(e) => {
    4522            1 :                 assert_eq!(
    4523            1 :                     e,
    4524            1 :                     GetTimelineError::NotFound {
    4525            1 :                         tenant_id: tenant.tenant_id,
    4526            1 :                         timeline_id: TIMELINE_ID,
    4527            1 :                     }
    4528            1 :                 )
    4529              :             }
    4530              :         }
    4531              : 
    4532            1 :         assert!(!harness
    4533            1 :             .conf
    4534            1 :             .timeline_path(&tenant.tenant_id, &TIMELINE_ID)
    4535            1 :             .exists());
    4536              : 
    4537            1 :         assert!(!harness
    4538            1 :             .conf
    4539            1 :             .timeline_uninit_mark_file_path(tenant.tenant_id, TIMELINE_ID)
    4540            1 :             .exists());
    4541              : 
    4542            1 :         Ok(())
    4543              :     }
    4544              : }
        

Generated by: LCOV version 2.1-beta