LCOV - code coverage report
Current view: top level - pageserver/src - tenant.rs (source / functions) Coverage Total Hit
Test: 32f4a56327bc9da697706839ed4836b2a00a408f.info Lines: 86.8 % 3298 2863
Test Date: 2024-02-07 07:37:29 Functions: 72.2 % 378 273

            Line data    Source code
       1              : //!
       2              : //! Timeline repository implementation that keeps old data in files on disk, and
       3              : //! the recent changes in memory. See tenant/*_layer.rs files.
       4              : //! The functions here are responsible for locating the correct layer for the
       5              : //! get/put call, walking back the timeline branching history as needed.
       6              : //!
       7              : //! The files are stored in the .neon/tenants/<tenant_id>/timelines/<timeline_id>
       8              : //! directory. See docs/pageserver-storage.md for how the files are managed.
       9              : //! In addition to the layer files, there is a metadata file in the same
      10              : //! directory that contains information about the timeline, in particular its
      11              : //! parent timeline, and the last LSN that has been written to disk.
      12              : //!
      13              : 
      14              : use anyhow::{bail, Context};
      15              : use camino::Utf8Path;
      16              : use camino::Utf8PathBuf;
      17              : use enumset::EnumSet;
      18              : use futures::stream::FuturesUnordered;
      19              : use futures::FutureExt;
      20              : use futures::StreamExt;
      21              : use pageserver_api::models;
      22              : use pageserver_api::models::TimelineState;
      23              : use pageserver_api::models::WalRedoManagerStatus;
      24              : use pageserver_api::shard::ShardIdentity;
      25              : use pageserver_api::shard::TenantShardId;
      26              : use remote_storage::DownloadError;
      27              : use remote_storage::GenericRemoteStorage;
      28              : use std::fmt;
      29              : use storage_broker::BrokerClientChannel;
      30              : use tokio::io::BufReader;
      31              : use tokio::runtime::Handle;
      32              : use tokio::sync::watch;
      33              : use tokio::task::JoinSet;
      34              : use tokio_util::sync::CancellationToken;
      35              : use tracing::*;
      36              : use utils::backoff;
      37              : use utils::completion;
      38              : use utils::crashsafe::path_with_suffix_extension;
      39              : use utils::failpoint_support;
      40              : use utils::fs_ext;
      41              : use utils::sync::gate::Gate;
      42              : use utils::sync::gate::GateGuard;
      43              : use utils::timeout::timeout_cancellable;
      44              : use utils::timeout::TimeoutCancellableError;
      45              : 
      46              : use self::config::AttachedLocationConfig;
      47              : use self::config::AttachmentMode;
      48              : use self::config::LocationConf;
      49              : use self::config::TenantConf;
      50              : use self::delete::DeleteTenantFlow;
      51              : use self::metadata::LoadMetadataError;
      52              : use self::metadata::TimelineMetadata;
      53              : use self::mgr::GetActiveTenantError;
      54              : use self::mgr::GetTenantError;
      55              : use self::mgr::TenantsMap;
      56              : use self::remote_timeline_client::RemoteTimelineClient;
      57              : use self::timeline::uninit::TimelineExclusionError;
      58              : use self::timeline::uninit::TimelineUninitMark;
      59              : use self::timeline::uninit::UninitializedTimeline;
      60              : use self::timeline::EvictionTaskTenantState;
      61              : use self::timeline::TimelineResources;
      62              : use self::timeline::WaitLsnError;
      63              : use crate::config::PageServerConf;
      64              : use crate::context::{DownloadBehavior, RequestContext};
      65              : use crate::deletion_queue::DeletionQueueClient;
      66              : use crate::deletion_queue::DeletionQueueError;
      67              : use crate::import_datadir;
      68              : use crate::is_uninit_mark;
      69              : use crate::metrics::TENANT;
      70              : use crate::metrics::{
      71              :     remove_tenant_metrics, BROKEN_TENANTS_SET, TENANT_STATE_METRIC, TENANT_SYNTHETIC_SIZE_METRIC,
      72              : };
      73              : use crate::repository::GcResult;
      74              : use crate::task_mgr;
      75              : use crate::task_mgr::TaskKind;
      76              : use crate::tenant::config::LocationMode;
      77              : use crate::tenant::config::TenantConfOpt;
      78              : use crate::tenant::metadata::load_metadata;
      79              : pub use crate::tenant::remote_timeline_client::index::IndexPart;
      80              : use crate::tenant::remote_timeline_client::remote_initdb_archive_path;
      81              : use crate::tenant::remote_timeline_client::MaybeDeletedIndexPart;
      82              : use crate::tenant::remote_timeline_client::INITDB_PATH;
      83              : use crate::tenant::storage_layer::DeltaLayer;
      84              : use crate::tenant::storage_layer::ImageLayer;
      85              : use crate::InitializationOrder;
      86              : use std::cmp::min;
      87              : use std::collections::hash_map::Entry;
      88              : use std::collections::BTreeSet;
      89              : use std::collections::HashMap;
      90              : use std::collections::HashSet;
      91              : use std::fmt::Debug;
      92              : use std::fmt::Display;
      93              : use std::fs;
      94              : use std::fs::File;
      95              : use std::io;
      96              : use std::ops::Bound::Included;
      97              : use std::sync::atomic::AtomicU64;
      98              : use std::sync::atomic::Ordering;
      99              : use std::sync::Arc;
     100              : use std::sync::{Mutex, RwLock};
     101              : use std::time::{Duration, Instant};
     102              : 
     103              : use crate::span;
     104              : use crate::tenant::timeline::delete::DeleteTimelineFlow;
     105              : use crate::tenant::timeline::uninit::cleanup_timeline_directory;
     106              : use crate::virtual_file::VirtualFile;
     107              : use crate::walredo::PostgresRedoManager;
     108              : use crate::TEMP_FILE_SUFFIX;
     109              : use once_cell::sync::Lazy;
     110              : pub use pageserver_api::models::TenantState;
     111              : use tokio::sync::Semaphore;
     112              : 
     113          372 : static INIT_DB_SEMAPHORE: Lazy<Semaphore> = Lazy::new(|| Semaphore::new(8));
     114              : use toml_edit;
     115              : use utils::{
     116              :     crashsafe,
     117              :     generation::Generation,
     118              :     id::TimelineId,
     119              :     lsn::{Lsn, RecordLsn},
     120              : };
     121              : 
     122              : /// Declare a failpoint that can use the `pause` failpoint action.
     123              : /// We don't want to block the executor thread, hence, spawn_blocking + await.
     124              : macro_rules! pausable_failpoint {
     125              :     ($name:literal) => {
     126              :         if cfg!(feature = "testing") {
     127              :             tokio::task::spawn_blocking({
     128              :                 let current = tracing::Span::current();
     129              :                 move || {
     130              :                     let _entered = current.entered();
     131              :                     tracing::info!("at failpoint {}", $name);
     132              :                     fail::fail_point!($name);
     133              :                 }
     134              :             })
     135              :             .await
     136              :             .expect("spawn_blocking");
     137              :         }
     138              :     };
     139              :     ($name:literal, $cond:expr) => {
     140              :         if cfg!(feature = "testing") {
     141              :             if $cond {
     142              :                 pausable_failpoint!($name)
     143              :             }
     144              :         }
     145              :     };
     146              : }
     147              : 
     148              : pub mod blob_io;
     149              : pub mod block_io;
     150              : 
     151              : pub mod disk_btree;
     152              : pub(crate) mod ephemeral_file;
     153              : pub mod layer_map;
     154              : 
     155              : pub mod metadata;
     156              : mod par_fsync;
     157              : pub mod remote_timeline_client;
     158              : pub mod storage_layer;
     159              : 
     160              : pub mod config;
     161              : pub mod delete;
     162              : pub mod mgr;
     163              : pub mod secondary;
     164              : pub mod tasks;
     165              : pub mod upload_queue;
     166              : 
     167              : pub(crate) mod timeline;
     168              : 
     169              : pub mod size;
     170              : 
     171              : pub(crate) use crate::span::debug_assert_current_span_has_tenant_and_timeline_id;
     172              : pub(crate) use timeline::{LogicalSizeCalculationCause, PageReconstructError, Timeline};
     173              : 
     174              : // re-export for use in remote_timeline_client.rs
     175              : pub use crate::tenant::metadata::save_metadata;
     176              : 
     177              : // re-export for use in walreceiver
     178              : pub use crate::tenant::timeline::WalReceiverInfo;
     179              : 
     180              : /// The "tenants" part of `tenants/<tenant>/timelines...`
     181              : pub const TENANTS_SEGMENT_NAME: &str = "tenants";
     182              : 
     183              : /// Parts of the `.neon/tenants/<tenant_id>/timelines/<timeline_id>` directory prefix.
     184              : pub const TIMELINES_SEGMENT_NAME: &str = "timelines";
     185              : 
     186              : pub const TENANT_DELETED_MARKER_FILE_NAME: &str = "deleted";
     187              : 
     188              : /// References to shared objects that are passed into each tenant, such
     189              : /// as the shared remote storage client and process initialization state.
     190          855 : #[derive(Clone)]
     191              : pub struct TenantSharedResources {
     192              :     pub broker_client: storage_broker::BrokerClientChannel,
     193              :     pub remote_storage: Option<GenericRemoteStorage>,
     194              :     pub deletion_queue_client: DeletionQueueClient,
     195              : }
     196              : 
     197              : /// A [`Tenant`] is really an _attached_ tenant.  The configuration
     198              : /// for an attached tenant is a subset of the [`LocationConf`], represented
     199              : /// in this struct.
     200              : pub(super) struct AttachedTenantConf {
     201              :     tenant_conf: TenantConfOpt,
     202              :     location: AttachedLocationConfig,
     203              : }
     204              : 
     205              : impl AttachedTenantConf {
     206          970 :     fn try_from(location_conf: LocationConf) -> anyhow::Result<Self> {
     207          970 :         match &location_conf.mode {
     208          970 :             LocationMode::Attached(attach_conf) => Ok(Self {
     209          970 :                 tenant_conf: location_conf.tenant_conf,
     210          970 :                 location: *attach_conf,
     211          970 :             }),
     212              :             LocationMode::Secondary(_) => {
     213            0 :                 anyhow::bail!("Attempted to construct AttachedTenantConf from a LocationConf in secondary mode")
     214              :             }
     215              :         }
     216          970 :     }
     217              : }
     218              : struct TimelinePreload {
     219              :     timeline_id: TimelineId,
     220              :     client: RemoteTimelineClient,
     221              :     index_part: Result<MaybeDeletedIndexPart, DownloadError>,
     222              : }
     223              : 
     224              : pub(crate) struct TenantPreload {
     225              :     deleting: bool,
     226              :     timelines: HashMap<TimelineId, TimelinePreload>,
     227              : }
     228              : 
     229              : /// When we spawn a tenant, there is a special mode for tenant creation that
     230              : /// avoids trying to read anything from remote storage.
     231              : pub(crate) enum SpawnMode {
     232              :     Normal,
     233              :     Create,
     234              : }
     235              : 
     236              : ///
     237              : /// Tenant consists of multiple timelines. Keep them in a hash table.
     238              : ///
     239              : pub struct Tenant {
     240              :     // Global pageserver config parameters
     241              :     pub conf: &'static PageServerConf,
     242              : 
     243              :     /// The value creation timestamp, used to measure activation delay, see:
     244              :     /// <https://github.com/neondatabase/neon/issues/4025>
     245              :     constructed_at: Instant,
     246              : 
     247              :     state: watch::Sender<TenantState>,
     248              : 
     249              :     // Overridden tenant-specific config parameters.
     250              :     // We keep TenantConfOpt sturct here to preserve the information
     251              :     // about parameters that are not set.
     252              :     // This is necessary to allow global config updates.
     253              :     tenant_conf: Arc<RwLock<AttachedTenantConf>>,
     254              : 
     255              :     tenant_shard_id: TenantShardId,
     256              : 
     257              :     // The detailed sharding information, beyond the number/count in tenant_shard_id
     258              :     shard_identity: ShardIdentity,
     259              : 
     260              :     /// The remote storage generation, used to protect S3 objects from split-brain.
     261              :     /// Does not change over the lifetime of the [`Tenant`] object.
     262              :     ///
     263              :     /// This duplicates the generation stored in LocationConf, but that structure is mutable:
     264              :     /// this copy enforces the invariant that generatio doesn't change during a Tenant's lifetime.
     265              :     generation: Generation,
     266              : 
     267              :     timelines: Mutex<HashMap<TimelineId, Arc<Timeline>>>,
     268              : 
     269              :     /// During timeline creation, we first insert the TimelineId to the
     270              :     /// creating map, then `timelines`, then remove it from the creating map.
     271              :     /// **Lock order**: if acquring both, acquire`timelines` before `timelines_creating`
     272              :     timelines_creating: std::sync::Mutex<HashSet<TimelineId>>,
     273              : 
     274              :     // This mutex prevents creation of new timelines during GC.
     275              :     // Adding yet another mutex (in addition to `timelines`) is needed because holding
     276              :     // `timelines` mutex during all GC iteration
     277              :     // may block for a long time `get_timeline`, `get_timelines_state`,... and other operations
     278              :     // with timelines, which in turn may cause dropping replication connection, expiration of wait_for_lsn
     279              :     // timeout...
     280              :     gc_cs: tokio::sync::Mutex<()>,
     281              :     walredo_mgr: Option<Arc<WalRedoManager>>,
     282              : 
     283              :     // provides access to timeline data sitting in the remote storage
     284              :     pub(crate) remote_storage: Option<GenericRemoteStorage>,
     285              : 
     286              :     // Access to global deletion queue for when this tenant wants to schedule a deletion
     287              :     deletion_queue_client: DeletionQueueClient,
     288              : 
     289              :     /// Cached logical sizes updated updated on each [`Tenant::gather_size_inputs`].
     290              :     cached_logical_sizes: tokio::sync::Mutex<HashMap<(TimelineId, Lsn), u64>>,
     291              :     cached_synthetic_tenant_size: Arc<AtomicU64>,
     292              : 
     293              :     eviction_task_tenant_state: tokio::sync::Mutex<EvictionTaskTenantState>,
     294              : 
     295              :     /// If the tenant is in Activating state, notify this to encourage it
     296              :     /// to proceed to Active as soon as possible, rather than waiting for lazy
     297              :     /// background warmup.
     298              :     pub(crate) activate_now_sem: tokio::sync::Semaphore,
     299              : 
     300              :     pub(crate) delete_progress: Arc<tokio::sync::Mutex<DeleteTenantFlow>>,
     301              : 
     302              :     // Cancellation token fires when we have entered shutdown().  This is a parent of
     303              :     // Timelines' cancellation token.
     304              :     pub(crate) cancel: CancellationToken,
     305              : 
     306              :     // Users of the Tenant such as the page service must take this Gate to avoid
     307              :     // trying to use a Tenant which is shutting down.
     308              :     pub(crate) gate: Gate,
     309              : }
     310              : 
     311              : impl std::fmt::Debug for Tenant {
     312            2 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     313            2 :         write!(f, "{} ({})", self.tenant_shard_id, self.current_state())
     314            2 :     }
     315              : }
     316              : 
     317              : pub(crate) enum WalRedoManager {
     318              :     Prod(PostgresRedoManager),
     319              :     #[cfg(test)]
     320              :     Test(harness::TestRedoManager),
     321              : }
     322              : 
     323              : impl From<PostgresRedoManager> for WalRedoManager {
     324          858 :     fn from(mgr: PostgresRedoManager) -> Self {
     325          858 :         Self::Prod(mgr)
     326          858 :     }
     327              : }
     328              : 
     329              : #[cfg(test)]
     330              : impl From<harness::TestRedoManager> for WalRedoManager {
     331           84 :     fn from(mgr: harness::TestRedoManager) -> Self {
     332           84 :         Self::Test(mgr)
     333           84 :     }
     334              : }
     335              : 
     336              : impl WalRedoManager {
     337            0 :     pub(crate) fn maybe_quiesce(&self, idle_timeout: Duration) {
     338            0 :         match self {
     339          775 :             Self::Prod(mgr) => mgr.maybe_quiesce(idle_timeout),
     340          775 :             #[cfg(test)]
     341          775 :             Self::Test(_) => {
     342            0 :                 // Not applicable to test redo manager
     343            0 :             }
     344          775 :         }
     345          775 :     }
     346              : 
     347              :     /// # Cancel-Safety
     348              :     ///
     349              :     /// This method is cancellation-safe.
     350      2860917 :     pub async fn request_redo(
     351      2860917 :         &self,
     352      2860917 :         key: crate::repository::Key,
     353      2860917 :         lsn: Lsn,
     354      2860917 :         base_img: Option<(Lsn, bytes::Bytes)>,
     355      2860917 :         records: Vec<(Lsn, crate::walrecord::NeonWalRecord)>,
     356      2860917 :         pg_version: u32,
     357      2860917 :     ) -> anyhow::Result<bytes::Bytes> {
     358            0 :         match self {
     359      2860917 :             Self::Prod(mgr) => {
     360            0 :                 mgr.request_redo(key, lsn, base_img, records, pg_version)
     361            0 :                     .await
     362              :             }
     363              :             #[cfg(test)]
     364            0 :             Self::Test(mgr) => {
     365            0 :                 mgr.request_redo(key, lsn, base_img, records, pg_version)
     366            0 :                     .await
     367              :             }
     368              :         }
     369      2860917 :     }
     370              : 
     371            0 :     pub(crate) fn status(&self) -> Option<WalRedoManagerStatus> {
     372            0 :         match self {
     373          486 :             WalRedoManager::Prod(m) => m.status(),
     374          486 :             #[cfg(test)]
     375          486 :             WalRedoManager::Test(_) => None,
     376          486 :         }
     377          486 :     }
     378              : }
     379              : 
     380          106 : #[derive(Debug, thiserror::Error, PartialEq, Eq)]
     381              : pub enum GetTimelineError {
     382              :     #[error("Timeline {tenant_id}/{timeline_id} is not active, state: {state:?}")]
     383              :     NotActive {
     384              :         tenant_id: TenantShardId,
     385              :         timeline_id: TimelineId,
     386              :         state: TimelineState,
     387              :     },
     388              :     #[error("Timeline {tenant_id}/{timeline_id} was not found")]
     389              :     NotFound {
     390              :         tenant_id: TenantShardId,
     391              :         timeline_id: TimelineId,
     392              :     },
     393              : }
     394              : 
     395            0 : #[derive(Debug, thiserror::Error)]
     396              : pub enum LoadLocalTimelineError {
     397              :     #[error("FailedToLoad")]
     398              :     Load(#[source] anyhow::Error),
     399              :     #[error("FailedToResumeDeletion")]
     400              :     ResumeDeletion(#[source] anyhow::Error),
     401              : }
     402              : 
     403          139 : #[derive(thiserror::Error)]
     404              : pub enum DeleteTimelineError {
     405              :     #[error("NotFound")]
     406              :     NotFound,
     407              : 
     408              :     #[error("HasChildren")]
     409              :     HasChildren(Vec<TimelineId>),
     410              : 
     411              :     #[error("Timeline deletion is already in progress")]
     412              :     AlreadyInProgress(Arc<tokio::sync::Mutex<DeleteTimelineFlow>>),
     413              : 
     414              :     #[error(transparent)]
     415              :     Other(#[from] anyhow::Error),
     416              : }
     417              : 
     418              : impl Debug for DeleteTimelineError {
     419            0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     420            0 :         match self {
     421            0 :             Self::NotFound => write!(f, "NotFound"),
     422            0 :             Self::HasChildren(c) => f.debug_tuple("HasChildren").field(c).finish(),
     423            0 :             Self::AlreadyInProgress(_) => f.debug_tuple("AlreadyInProgress").finish(),
     424            0 :             Self::Other(e) => f.debug_tuple("Other").field(e).finish(),
     425              :         }
     426            0 :     }
     427              : }
     428              : 
     429              : pub enum SetStoppingError {
     430              :     AlreadyStopping(completion::Barrier),
     431              :     Broken,
     432              : }
     433              : 
     434              : impl Debug for SetStoppingError {
     435            0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     436            0 :         match self {
     437            0 :             Self::AlreadyStopping(_) => f.debug_tuple("AlreadyStopping").finish(),
     438            0 :             Self::Broken => write!(f, "Broken"),
     439              :         }
     440            0 :     }
     441              : }
     442              : 
     443            6 : #[derive(thiserror::Error, Debug)]
     444              : pub enum CreateTimelineError {
     445              :     #[error("creation of timeline with the given ID is in progress")]
     446              :     AlreadyCreating,
     447              :     #[error("timeline already exists with different parameters")]
     448              :     Conflict,
     449              :     #[error(transparent)]
     450              :     AncestorLsn(anyhow::Error),
     451              :     #[error("ancestor timeline is not active")]
     452              :     AncestorNotActive,
     453              :     #[error("tenant shutting down")]
     454              :     ShuttingDown,
     455              :     #[error(transparent)]
     456              :     Other(#[from] anyhow::Error),
     457              : }
     458              : 
     459            0 : #[derive(thiserror::Error, Debug)]
     460              : enum InitdbError {
     461              :     Other(anyhow::Error),
     462              :     Cancelled,
     463              :     Spawn(std::io::Result<()>),
     464              :     Failed(std::process::ExitStatus, Vec<u8>),
     465              : }
     466              : 
     467              : impl fmt::Display for InitdbError {
     468            0 :     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
     469            0 :         match self {
     470            0 :             InitdbError::Cancelled => write!(f, "Operation was cancelled"),
     471            0 :             InitdbError::Spawn(e) => write!(f, "Spawn error: {:?}", e),
     472            0 :             InitdbError::Failed(status, stderr) => write!(
     473            0 :                 f,
     474            0 :                 "Command failed with status {:?}: {}",
     475            0 :                 status,
     476            0 :                 String::from_utf8_lossy(stderr)
     477            0 :             ),
     478            0 :             InitdbError::Other(e) => write!(f, "Error: {:?}", e),
     479              :         }
     480            0 :     }
     481              : }
     482              : 
     483              : impl From<std::io::Error> for InitdbError {
     484            0 :     fn from(error: std::io::Error) -> Self {
     485            0 :         InitdbError::Spawn(Err(error))
     486            0 :     }
     487              : }
     488              : 
     489              : struct TenantDirectoryScan {
     490              :     sorted_timelines_to_load: Vec<(TimelineId, TimelineMetadata)>,
     491              :     timelines_to_resume_deletion: Vec<(TimelineId, Option<TimelineMetadata>)>,
     492              : }
     493              : 
     494              : enum CreateTimelineCause {
     495              :     Load,
     496              :     Delete,
     497              : }
     498              : 
     499              : impl Tenant {
     500              :     /// Yet another helper for timeline initialization.
     501              :     ///
     502              :     /// - Initializes the Timeline struct and inserts it into the tenant's hash map
     503              :     /// - Scans the local timeline directory for layer files and builds the layer map
     504              :     /// - Downloads remote index file and adds remote files to the layer map
     505              :     /// - Schedules remote upload tasks for any files that are present locally but missing from remote storage.
     506              :     ///
     507              :     /// If the operation fails, the timeline is left in the tenant's hash map in Broken state. On success,
     508              :     /// it is marked as Active.
     509              :     #[allow(clippy::too_many_arguments)]
     510          412 :     async fn timeline_init_and_sync(
     511          412 :         &self,
     512          412 :         timeline_id: TimelineId,
     513          412 :         resources: TimelineResources,
     514          412 :         index_part: Option<IndexPart>,
     515          412 :         metadata: TimelineMetadata,
     516          412 :         ancestor: Option<Arc<Timeline>>,
     517          412 :         _ctx: &RequestContext,
     518          412 :     ) -> anyhow::Result<()> {
     519          412 :         let tenant_id = self.tenant_shard_id;
     520              : 
     521          412 :         let timeline = self.create_timeline_struct(
     522          412 :             timeline_id,
     523          412 :             &metadata,
     524          412 :             ancestor.clone(),
     525          412 :             resources,
     526          412 :             CreateTimelineCause::Load,
     527          412 :         )?;
     528          412 :         let disk_consistent_lsn = timeline.get_disk_consistent_lsn();
     529          412 :         anyhow::ensure!(
     530          412 :             disk_consistent_lsn.is_valid(),
     531            0 :             "Timeline {tenant_id}/{timeline_id} has invalid disk_consistent_lsn"
     532              :         );
     533          412 :         assert_eq!(
     534          412 :             disk_consistent_lsn,
     535          412 :             metadata.disk_consistent_lsn(),
     536            0 :             "these are used interchangeably"
     537              :         );
     538              : 
     539          412 :         if let Some(index_part) = index_part.as_ref() {
     540          412 :             timeline
     541          412 :                 .remote_client
     542          412 :                 .as_ref()
     543          412 :                 .unwrap()
     544          412 :                 .init_upload_queue(index_part)?;
     545            0 :         } else if self.remote_storage.is_some() {
     546              :             // No data on the remote storage, but we have local metadata file. We can end up
     547              :             // here with timeline_create being interrupted before finishing index part upload.
     548              :             // By doing what we do here, the index part upload is retried.
     549              :             // If control plane retries timeline creation in the meantime, the mgmt API handler
     550              :             // for timeline creation will coalesce on the upload we queue here.
     551            0 :             let rtc = timeline.remote_client.as_ref().unwrap();
     552            0 :             rtc.init_upload_queue_for_empty_remote(&metadata)?;
     553            0 :             rtc.schedule_index_upload_for_metadata_update(&metadata)?;
     554            0 :         }
     555              : 
     556          412 :         timeline
     557          412 :             .load_layer_map(disk_consistent_lsn, index_part)
     558          408 :             .await
     559          412 :             .with_context(|| {
     560            0 :                 format!("Failed to load layermap for timeline {tenant_id}/{timeline_id}")
     561          412 :             })?;
     562              : 
     563              :         {
     564              :             // avoiding holding it across awaits
     565          412 :             let mut timelines_accessor = self.timelines.lock().unwrap();
     566          412 :             match timelines_accessor.entry(timeline_id) {
     567              :                 Entry::Occupied(_) => {
     568              :                     // The uninit mark file acts as a lock that prevents another task from
     569              :                     // initializing the timeline at the same time.
     570            0 :                     unreachable!(
     571            0 :                         "Timeline {tenant_id}/{timeline_id} already exists in the tenant map"
     572            0 :                     );
     573              :                 }
     574          412 :                 Entry::Vacant(v) => {
     575          412 :                     v.insert(Arc::clone(&timeline));
     576          412 :                     timeline.maybe_spawn_flush_loop();
     577          412 :                 }
     578          412 :             }
     579          412 :         };
     580          412 : 
     581          412 :         // Sanity check: a timeline should have some content.
     582          412 :         anyhow::ensure!(
     583          412 :             ancestor.is_some()
     584          359 :                 || timeline
     585          359 :                     .layers
     586          359 :                     .read()
     587            0 :                     .await
     588          359 :                     .layer_map()
     589          359 :                     .iter_historic_layers()
     590          359 :                     .next()
     591          359 :                     .is_some(),
     592            0 :             "Timeline has no ancestor and no layer files"
     593              :         );
     594              : 
     595          412 :         Ok(())
     596          412 :     }
     597              : 
     598              :     /// Attach a tenant that's available in cloud storage.
     599              :     ///
     600              :     /// This returns quickly, after just creating the in-memory object
     601              :     /// Tenant struct and launching a background task to download
     602              :     /// the remote index files.  On return, the tenant is most likely still in
     603              :     /// Attaching state, and it will become Active once the background task
     604              :     /// finishes. You can use wait_until_active() to wait for the task to
     605              :     /// complete.
     606              :     ///
     607              :     #[allow(clippy::too_many_arguments)]
     608          858 :     pub(crate) fn spawn(
     609          858 :         conf: &'static PageServerConf,
     610          858 :         tenant_shard_id: TenantShardId,
     611          858 :         resources: TenantSharedResources,
     612          858 :         attached_conf: AttachedTenantConf,
     613          858 :         shard_identity: ShardIdentity,
     614          858 :         init_order: Option<InitializationOrder>,
     615          858 :         tenants: &'static std::sync::RwLock<TenantsMap>,
     616          858 :         mode: SpawnMode,
     617          858 :         ctx: &RequestContext,
     618          858 :     ) -> anyhow::Result<Arc<Tenant>> {
     619          858 :         let wal_redo_manager = Arc::new(WalRedoManager::from(PostgresRedoManager::new(
     620          858 :             conf,
     621          858 :             tenant_shard_id,
     622          858 :         )));
     623          858 : 
     624          858 :         let TenantSharedResources {
     625          858 :             broker_client,
     626          858 :             remote_storage,
     627          858 :             deletion_queue_client,
     628          858 :         } = resources;
     629          858 : 
     630          858 :         let attach_mode = attached_conf.location.attach_mode;
     631          858 :         let generation = attached_conf.location.generation;
     632          858 : 
     633          858 :         let tenant = Arc::new(Tenant::new(
     634          858 :             TenantState::Attaching,
     635          858 :             conf,
     636          858 :             attached_conf,
     637          858 :             shard_identity,
     638          858 :             Some(wal_redo_manager),
     639          858 :             tenant_shard_id,
     640          858 :             remote_storage.clone(),
     641          858 :             deletion_queue_client,
     642          858 :         ));
     643              : 
     644              :         // The attach task will carry a GateGuard, so that shutdown() reliably waits for it to drop out if
     645              :         // we shut down while attaching.
     646          858 :         let Ok(attach_gate_guard) = tenant.gate.enter() else {
     647              :             // We just created the Tenant: nothing else can have shut it down yet
     648            0 :             unreachable!();
     649              :         };
     650              : 
     651              :         // Do all the hard work in the background
     652          858 :         let tenant_clone = Arc::clone(&tenant);
     653          858 :         let ctx = ctx.detached_child(TaskKind::Attach, DownloadBehavior::Warn);
     654          858 :         task_mgr::spawn(
     655          858 :             &tokio::runtime::Handle::current(),
     656          858 :             TaskKind::Attach,
     657          858 :             Some(tenant_shard_id),
     658          858 :             None,
     659          858 :             "attach tenant",
     660              :             false,
     661          858 :             async move {
     662          858 : 
     663          858 :                 info!(
     664          858 :                     ?attach_mode,
     665          858 :                     "Attaching tenant"
     666          858 :                 );
     667              : 
     668          858 :                 let _gate_guard = attach_gate_guard;
     669          858 : 
     670          858 :                 // Is this tenant being spawned as part of process startup?
     671          858 :                 let starting_up = init_order.is_some();
     672          850 :                 scopeguard::defer! {
     673          850 :                     if starting_up {
     674          199 :                         TENANT.startup_complete.inc();
     675          651 :                     }
     676              :                 }
     677              : 
     678              :                 // Ideally we should use Tenant::set_broken_no_wait, but it is not supposed to be used when tenant is in loading state.
     679          858 :                 let make_broken =
     680            7 :                     |t: &Tenant, err: anyhow::Error| {
     681            7 :                         error!("attach failed, setting tenant state to Broken: {err:?}");
     682            7 :                         t.state.send_modify(|state| {
     683              :                             // The Stopping case is for when we have passed control on to DeleteTenantFlow:
     684              :                             // if it errors, we will call make_broken when tenant is already in Stopping.
     685            7 :                             assert!(
     686            7 :                             matches!(*state, TenantState::Attaching | TenantState::Stopping { .. }),
     687            0 :                             "the attach task owns the tenant state until activation is complete"
     688              :                         );
     689              : 
     690            7 :                             *state = TenantState::broken_from_reason(err.to_string());
     691            7 :                         });
     692            7 :                     };
     693              : 
     694          858 :                 let mut init_order = init_order;
     695          858 :                 // take the completion because initial tenant loading will complete when all of
     696          858 :                 // these tasks complete.
     697          858 :                 let _completion = init_order
     698          858 :                     .as_mut()
     699          858 :                     .and_then(|x| x.initial_tenant_load.take());
     700          858 :                 let remote_load_completion = init_order
     701          858 :                     .as_mut()
     702          858 :                     .and_then(|x| x.initial_tenant_load_remote.take());
     703              : 
     704              :                 enum AttachType<'a> {
     705              :                     // During pageserver startup, we are attaching this tenant lazily in the background
     706              :                     Warmup(tokio::sync::SemaphorePermit<'a>),
     707              :                     // During pageserver startup, we are attaching this tenant as soon as we can,
     708              :                     // because a client tried to access it.
     709              :                     OnDemand,
     710              :                     // During normal operations after startup, we are attaching a tenant.
     711              :                     Normal,
     712              :                 }
     713              : 
     714              :                 // Before doing any I/O, wait for either or:
     715              :                 // - A client to attempt to access to this tenant (on-demand loading)
     716              :                 // - A permit to become available in the warmup semaphore (background warmup)
     717              :                 //
     718              :                 // Some-ness of init_order is how we know if we're attaching during startup or later
     719              :                 // in process lifetime.
     720          858 :                 let attach_type = if init_order.is_some() {
     721          207 :                     tokio::select!(
     722              :                         _ = tenant_clone.activate_now_sem.acquire() => {
     723            3 :                             tracing::info!("Activating tenant (on-demand)");
     724              :                             AttachType::OnDemand
     725              :                         },
     726          203 :                         permit_result = conf.concurrent_tenant_warmup.inner().acquire() => {
     727              :                             match permit_result {
     728              :                                 Ok(p) => {
     729          203 :                                     tracing::info!("Activating tenant (warmup)");
     730              :                                     AttachType::Warmup(p)
     731              :                                 }
     732              :                                 Err(_) => {
     733              :                                     // This is unexpected: the warmup semaphore should stay alive
     734              :                                     // for the lifetime of init_order.  Log a warning and proceed.
     735            0 :                                     tracing::warn!("warmup_limit semaphore unexpectedly closed");
     736              :                                     AttachType::Normal
     737              :                                 }
     738              :                             }
     739              : 
     740              :                         }
     741              :                         _ = tenant_clone.cancel.cancelled() => {
     742              :                             // This is safe, but should be pretty rare: it is interesting if a tenant
     743              :                             // stayed in Activating for such a long time that shutdown found it in
     744              :                             // that state.
     745            1 :                             tracing::info!(state=%tenant_clone.current_state(), "Tenant shut down before activation");
     746              :                             // Make the tenant broken so that set_stopping will not hang waiting for it to leave
     747              :                             // the Attaching state.  This is an over-reaction (nothing really broke, the tenant is
     748              :                             // just shutting down), but ensures progress.
     749              :                             make_broken(&tenant_clone, anyhow::anyhow!("Shut down while Attaching"));
     750              :                             return Ok(());
     751              :                         },
     752              :                     )
     753              :                 } else {
     754          651 :                     AttachType::Normal
     755              :                 };
     756              : 
     757          857 :                 let preload_timer = TENANT.preload.start_timer();
     758          857 :                 let preload = match mode {
     759              :                     SpawnMode::Create => {
     760              :                         // Don't count the skipped preload into the histogram of preload durations
     761           15 :                         preload_timer.stop_and_discard();
     762           15 :                         None
     763              :                     },
     764              :                     SpawnMode::Normal => {
     765          842 :                         match &remote_storage {
     766          842 :                             Some(remote_storage) => Some(
     767          842 :                                 match tenant_clone
     768          842 :                                     .preload(remote_storage, task_mgr::shutdown_token())
     769          842 :                                     .instrument(
     770          842 :                                         tracing::info_span!(parent: None, "attach_preload", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug()),
     771              :                                     )
     772         2419 :                                     .await {
     773          836 :                                         Ok(p) => {
     774          836 :                                             preload_timer.observe_duration();
     775          836 :                                             p
     776              :                                         }
     777              :                                             ,
     778            6 :                                         Err(e) => {
     779            6 :                                             make_broken(&tenant_clone, anyhow::anyhow!(e));
     780            6 :                                                 return Ok(());
     781              :                                         }
     782              :                                     },
     783              :                             ),
     784            0 :                             None => None,
     785              :                         }
     786              :                     }
     787              :                 };
     788              : 
     789              :                 // Remote preload is complete.
     790          851 :                 drop(remote_load_completion);
     791              : 
     792          851 :                 let pending_deletion = {
     793          851 :                     match DeleteTenantFlow::should_resume_deletion(
     794          851 :                         conf,
     795          851 :                         preload.as_ref().map(|p| p.deleting).unwrap_or(false),
     796          851 :                         &tenant_clone,
     797          851 :                     )
     798            0 :                     .await
     799              :                     {
     800          851 :                         Ok(should_resume_deletion) => should_resume_deletion,
     801            0 :                         Err(err) => {
     802            0 :                             make_broken(&tenant_clone, anyhow::anyhow!(err));
     803            0 :                             return Ok(());
     804              :                         }
     805              :                     }
     806              :                 };
     807              : 
     808          851 :                 info!("pending_deletion {}", pending_deletion.is_some());
     809              : 
     810          851 :                 if let Some(deletion) = pending_deletion {
     811              :                     // as we are no longer loading, signal completion by dropping
     812              :                     // the completion while we resume deletion
     813           21 :                     drop(_completion);
     814           21 :                     let background_jobs_can_start =
     815           21 :                         init_order.as_ref().map(|x| &x.background_jobs_can_start);
     816           21 :                     if let Some(background) = background_jobs_can_start {
     817           20 :                         info!("waiting for backgound jobs barrier");
     818           20 :                         background.clone().wait().await;
     819           20 :                         info!("ready for backgound jobs barrier");
     820            1 :                     }
     821              : 
     822           21 :                     match DeleteTenantFlow::resume_from_attach(
     823           21 :                         deletion,
     824           21 :                         &tenant_clone,
     825           21 :                         preload,
     826           21 :                         tenants,
     827           21 :                         &ctx,
     828           21 :                     )
     829          789 :                     .await
     830              :                     {
     831            0 :                         Err(err) => {
     832            0 :                             make_broken(&tenant_clone, anyhow::anyhow!(err));
     833            0 :                             return Ok(());
     834              :                         }
     835           21 :                         Ok(()) => return Ok(()),
     836              :                     }
     837          830 :                 }
     838              : 
     839              :                 // We will time the duration of the attach phase unless this is a creation (attach will do no work)
     840          830 :                 let attach_timer = match mode {
     841           15 :                     SpawnMode::Create => None,
     842          815 :                     SpawnMode::Normal => {Some(TENANT.attach.start_timer())}
     843              :                 };
     844          830 :                 match tenant_clone.attach(preload, mode, &ctx).await {
     845              :                     Ok(()) => {
     846          830 :                         info!("attach finished, activating");
     847          830 :                         if let Some(t)=  attach_timer {t.observe_duration();}
     848          830 :                         tenant_clone.activate(broker_client, None, &ctx);
     849              :                     }
     850            0 :                     Err(e) => {
     851            0 :                         if let Some(t)=  attach_timer {t.observe_duration();}
     852            0 :                         make_broken(&tenant_clone, anyhow::anyhow!(e));
     853              :                     }
     854              :                 }
     855              : 
     856              :                 // If we are doing an opportunistic warmup attachment at startup, initialize
     857              :                 // logical size at the same time.  This is better than starting a bunch of idle tenants
     858              :                 // with cold caches and then coming back later to initialize their logical sizes.
     859              :                 //
     860              :                 // It also prevents the warmup proccess competing with the concurrency limit on
     861              :                 // logical size calculations: if logical size calculation semaphore is saturated,
     862              :                 // then warmup will wait for that before proceeding to the next tenant.
     863          830 :                 if let AttachType::Warmup(_permit) = attach_type {
     864          183 :                     let mut futs = FuturesUnordered::new();
     865          183 :                     let timelines: Vec<_> = tenant_clone.timelines.lock().unwrap().values().cloned().collect();
     866          417 :                     for t in timelines {
     867          234 :                         futs.push(t.await_initial_logical_size())
     868              :                     }
     869          183 :                     tracing::info!("Waiting for initial logical sizes while warming up...");
     870          409 :                     while futs.next().await.is_some() {
     871          226 : 
     872          226 :                     }
     873          175 :                     tracing::info!("Warm-up complete");
     874          647 :                 }
     875              : 
     876          822 :                 Ok(())
     877          850 :             }
     878          858 :             .instrument({
     879          858 :                 let span = tracing::info_span!(parent: None, "attach", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), gen=?generation);
     880          858 :                 span.follows_from(Span::current());
     881          858 :                 span
     882          858 :             }),
     883          858 :         );
     884          858 :         Ok(tenant)
     885          858 :     }
     886              : 
     887          846 :     pub(crate) async fn preload(
     888          846 :         self: &Arc<Tenant>,
     889          846 :         remote_storage: &GenericRemoteStorage,
     890          846 :         cancel: CancellationToken,
     891          846 :     ) -> anyhow::Result<TenantPreload> {
     892              :         // Get list of remote timelines
     893              :         // download index files for every tenant timeline
     894          846 :         info!("listing remote timelines");
     895          846 :         let (remote_timeline_ids, other_keys) = remote_timeline_client::list_remote_timelines(
     896          846 :             remote_storage,
     897          846 :             self.tenant_shard_id,
     898          846 :             cancel.clone(),
     899          846 :         )
     900         2031 :         .await?;
     901              : 
     902          840 :         let deleting = other_keys.contains(TENANT_DELETED_MARKER_FILE_NAME);
     903          840 :         info!(
     904          840 :             "found {} timelines, deleting={}",
     905          840 :             remote_timeline_ids.len(),
     906          840 :             deleting
     907          840 :         );
     908              : 
     909          855 :         for k in other_keys {
     910           15 :             if k != TENANT_DELETED_MARKER_FILE_NAME {
     911            0 :                 warn!("Unexpected non timeline key {k}");
     912           15 :             }
     913              :         }
     914              : 
     915              :         Ok(TenantPreload {
     916          840 :             deleting,
     917          840 :             timelines: self
     918          840 :                 .load_timeline_metadata(remote_timeline_ids, remote_storage, cancel)
     919          403 :                 .await?,
     920              :         })
     921          846 :     }
     922              : 
     923              :     ///
     924              :     /// Background task that downloads all data for a tenant and brings it to Active state.
     925              :     ///
     926              :     /// No background tasks are started as part of this routine.
     927              :     ///
     928          855 :     async fn attach(
     929          855 :         self: &Arc<Tenant>,
     930          855 :         preload: Option<TenantPreload>,
     931          855 :         mode: SpawnMode,
     932          855 :         ctx: &RequestContext,
     933          855 :     ) -> anyhow::Result<()> {
     934          855 :         span::debug_assert_current_span_has_tenant_id();
     935              : 
     936            3 :         failpoint_support::sleep_millis_async!("before-attaching-tenant");
     937              : 
     938          855 :         let preload = match (preload, mode) {
     939          840 :             (Some(p), _) => p,
     940           15 :             (None, SpawnMode::Create) => TenantPreload {
     941           15 :                 deleting: false,
     942           15 :                 timelines: HashMap::new(),
     943           15 :             },
     944              :             (None, SpawnMode::Normal) => {
     945              :                 // Deprecated dev mode: load from local disk state instead of remote storage
     946              :                 // https://github.com/neondatabase/neon/issues/5624
     947            0 :                 return self.load_local(ctx).await;
     948              :             }
     949              :         };
     950              : 
     951          855 :         let mut timelines_to_resume_deletions = vec![];
     952          855 : 
     953          855 :         let mut remote_index_and_client = HashMap::new();
     954          855 :         let mut timeline_ancestors = HashMap::new();
     955          855 :         let mut existent_timelines = HashSet::new();
     956         1282 :         for (timeline_id, preload) in preload.timelines {
     957          424 :             let index_part = match preload.index_part {
     958          424 :                 Ok(i) => {
     959            0 :                     debug!("remote index part exists for timeline {timeline_id}");
     960              :                     // We found index_part on the remote, this is the standard case.
     961          424 :                     existent_timelines.insert(timeline_id);
     962          424 :                     i
     963              :                 }
     964              :                 Err(DownloadError::NotFound) => {
     965              :                     // There is no index_part on the remote. We only get here
     966              :                     // if there is some prefix for the timeline in the remote storage.
     967              :                     // This can e.g. be the initdb.tar.zst archive, maybe a
     968              :                     // remnant from a prior incomplete creation or deletion attempt.
     969              :                     // Delete the local directory as the deciding criterion for a
     970              :                     // timeline's existence is presence of index_part.
     971            3 :                     info!(%timeline_id, "index_part not found on remote");
     972            3 :                     continue;
     973              :                 }
     974            0 :                 Err(e) => {
     975              :                     // Some (possibly ephemeral) error happened during index_part download.
     976              :                     // Pretend the timeline exists to not delete the timeline directory,
     977              :                     // as it might be a temporary issue and we don't want to re-download
     978              :                     // everything after it resolves.
     979            0 :                     warn!(%timeline_id, "Failed to load index_part from remote storage, failed creation? ({e})");
     980              : 
     981            0 :                     existent_timelines.insert(timeline_id);
     982            0 :                     continue;
     983              :                 }
     984              :             };
     985          424 :             match index_part {
     986          412 :                 MaybeDeletedIndexPart::IndexPart(index_part) => {
     987          412 :                     timeline_ancestors.insert(timeline_id, index_part.metadata.clone());
     988          412 :                     remote_index_and_client.insert(timeline_id, (index_part, preload.client));
     989          412 :                 }
     990           12 :                 MaybeDeletedIndexPart::Deleted(index_part) => {
     991           12 :                     info!(
     992           12 :                         "timeline {} is deleted, picking to resume deletion",
     993           12 :                         timeline_id
     994           12 :                     );
     995           12 :                     timelines_to_resume_deletions.push((timeline_id, index_part, preload.client));
     996              :                 }
     997              :             }
     998              :         }
     999              : 
    1000              :         // For every timeline, download the metadata file, scan the local directory,
    1001              :         // and build a layer map that contains an entry for each remote and local
    1002              :         // layer file.
    1003          855 :         let sorted_timelines = tree_sort_timelines(timeline_ancestors, |m| m.ancestor_timeline())?;
    1004         1267 :         for (timeline_id, remote_metadata) in sorted_timelines {
    1005          412 :             let (index_part, remote_client) = remote_index_and_client
    1006          412 :                 .remove(&timeline_id)
    1007          412 :                 .expect("just put it in above");
    1008          412 : 
    1009          412 :             // TODO again handle early failure
    1010          412 :             self.load_remote_timeline(
    1011          412 :                 timeline_id,
    1012          412 :                 index_part,
    1013          412 :                 remote_metadata,
    1014          412 :                 TimelineResources {
    1015          412 :                     remote_client: Some(remote_client),
    1016          412 :                     deletion_queue_client: self.deletion_queue_client.clone(),
    1017          412 :                 },
    1018          412 :                 ctx,
    1019          412 :             )
    1020          818 :             .await
    1021          412 :             .with_context(|| {
    1022            0 :                 format!(
    1023            0 :                     "failed to load remote timeline {} for tenant {}",
    1024            0 :                     timeline_id, self.tenant_shard_id
    1025            0 :                 )
    1026          412 :             })?;
    1027              :         }
    1028              : 
    1029              :         // Walk through deleted timelines, resume deletion
    1030          867 :         for (timeline_id, index_part, remote_timeline_client) in timelines_to_resume_deletions {
    1031           12 :             remote_timeline_client
    1032           12 :                 .init_upload_queue_stopped_to_continue_deletion(&index_part)
    1033           12 :                 .context("init queue stopped")
    1034           12 :                 .map_err(LoadLocalTimelineError::ResumeDeletion)?;
    1035              : 
    1036           12 :             DeleteTimelineFlow::resume_deletion(
    1037           12 :                 Arc::clone(self),
    1038           12 :                 timeline_id,
    1039           12 :                 &index_part.metadata,
    1040           12 :                 Some(remote_timeline_client),
    1041           12 :                 self.deletion_queue_client.clone(),
    1042           12 :             )
    1043           12 :             .instrument(tracing::info_span!("timeline_delete", %timeline_id))
    1044            0 :             .await
    1045           12 :             .context("resume_deletion")
    1046           12 :             .map_err(LoadLocalTimelineError::ResumeDeletion)?;
    1047              :         }
    1048              : 
    1049              :         // The local filesystem contents are a cache of what's in the remote IndexPart;
    1050              :         // IndexPart is the source of truth.
    1051          855 :         self.clean_up_timelines(&existent_timelines)?;
    1052              : 
    1053          855 :         fail::fail_point!("attach-before-activate", |_| {
    1054            0 :             anyhow::bail!("attach-before-activate");
    1055          855 :         });
    1056            3 :         failpoint_support::sleep_millis_async!("attach-before-activate-sleep", &self.cancel);
    1057              : 
    1058          855 :         info!("Done");
    1059              : 
    1060          855 :         Ok(())
    1061          855 :     }
    1062              : 
    1063              :     /// Check for any local timeline directories that are temporary, or do not correspond to a
    1064              :     /// timeline that still exists: this can happen if we crashed during a deletion/creation, or
    1065              :     /// if a timeline was deleted while the tenant was attached to a different pageserver.
    1066          855 :     fn clean_up_timelines(&self, existent_timelines: &HashSet<TimelineId>) -> anyhow::Result<()> {
    1067          855 :         let timelines_dir = self.conf.timelines_path(&self.tenant_shard_id);
    1068              : 
    1069          855 :         let entries = match timelines_dir.read_dir_utf8() {
    1070          853 :             Ok(d) => d,
    1071            2 :             Err(e) => {
    1072            2 :                 if e.kind() == std::io::ErrorKind::NotFound {
    1073            2 :                     return Ok(());
    1074              :                 } else {
    1075            0 :                     return Err(e).context("list timelines directory for tenant");
    1076              :                 }
    1077              :             }
    1078              :         };
    1079              : 
    1080         1285 :         for entry in entries {
    1081          432 :             let entry = entry.context("read timeline dir entry")?;
    1082          432 :             let entry_path = entry.path();
    1083              : 
    1084          432 :             let purge = if crate::is_temporary(entry_path)
    1085              :                 // TODO: uninit_mark isn't needed any more, since uninitialized timelines are already
    1086              :                 // covered by the check that the timeline must exist in remote storage.
    1087          429 :                 || is_uninit_mark(entry_path)
    1088          427 :                 || crate::is_delete_mark(entry_path)
    1089              :             {
    1090            5 :                 true
    1091              :             } else {
    1092          427 :                 match TimelineId::try_from(entry_path.file_name()) {
    1093          427 :                     Ok(i) => {
    1094          427 :                         // Purge if the timeline ID does not exist in remote storage: remote storage is the authority.
    1095          427 :                         !existent_timelines.contains(&i)
    1096              :                     }
    1097            0 :                     Err(e) => {
    1098            0 :                         tracing::warn!(
    1099            0 :                             "Unparseable directory in timelines directory: {entry_path}, ignoring ({e})"
    1100            0 :                         );
    1101              :                         // Do not purge junk: if we don't recognize it, be cautious and leave it for a human.
    1102            0 :                         false
    1103              :                     }
    1104              :                 }
    1105              :             };
    1106              : 
    1107          432 :             if purge {
    1108           10 :                 tracing::info!("Purging stale timeline dentry {entry_path}");
    1109           10 :                 if let Err(e) = match entry.file_type() {
    1110           10 :                     Ok(t) => if t.is_dir() {
    1111            7 :                         std::fs::remove_dir_all(entry_path)
    1112              :                     } else {
    1113            3 :                         std::fs::remove_file(entry_path)
    1114              :                     }
    1115           10 :                     .or_else(fs_ext::ignore_not_found),
    1116            0 :                     Err(e) => Err(e),
    1117              :                 } {
    1118            0 :                     tracing::warn!("Failed to purge stale timeline dentry {entry_path}: {e}");
    1119           10 :                 }
    1120          422 :             }
    1121              :         }
    1122              : 
    1123          853 :         Ok(())
    1124          855 :     }
    1125              : 
    1126              :     /// Get sum of all remote timelines sizes
    1127              :     ///
    1128              :     /// This function relies on the index_part instead of listing the remote storage
    1129           23 :     pub fn remote_size(&self) -> u64 {
    1130           23 :         let mut size = 0;
    1131              : 
    1132           23 :         for timeline in self.list_timelines() {
    1133           15 :             if let Some(remote_client) = &timeline.remote_client {
    1134           15 :                 size += remote_client.get_remote_physical_size();
    1135           15 :             }
    1136              :         }
    1137              : 
    1138           23 :         size
    1139           23 :     }
    1140              : 
    1141          412 :     #[instrument(skip_all, fields(timeline_id=%timeline_id))]
    1142              :     async fn load_remote_timeline(
    1143              :         &self,
    1144              :         timeline_id: TimelineId,
    1145              :         index_part: IndexPart,
    1146              :         remote_metadata: TimelineMetadata,
    1147              :         resources: TimelineResources,
    1148              :         ctx: &RequestContext,
    1149              :     ) -> anyhow::Result<()> {
    1150              :         span::debug_assert_current_span_has_tenant_id();
    1151              : 
    1152          412 :         info!("downloading index file for timeline {}", timeline_id);
    1153              :         tokio::fs::create_dir_all(self.conf.timeline_path(&self.tenant_shard_id, &timeline_id))
    1154              :             .await
    1155              :             .context("Failed to create new timeline directory")?;
    1156              : 
    1157              :         let ancestor = if let Some(ancestor_id) = remote_metadata.ancestor_timeline() {
    1158              :             let timelines = self.timelines.lock().unwrap();
    1159              :             Some(Arc::clone(timelines.get(&ancestor_id).ok_or_else(
    1160            0 :                 || {
    1161            0 :                     anyhow::anyhow!(
    1162            0 :                         "cannot find ancestor timeline {ancestor_id} for timeline {timeline_id}"
    1163            0 :                     )
    1164            0 :                 },
    1165              :             )?))
    1166              :         } else {
    1167              :             None
    1168              :         };
    1169              : 
    1170              :         // timeline loading after attach expects to find metadata file for each metadata
    1171              :         save_metadata(
    1172              :             self.conf,
    1173              :             &self.tenant_shard_id,
    1174              :             &timeline_id,
    1175              :             &remote_metadata,
    1176              :         )
    1177              :         .await
    1178              :         .context("save_metadata")
    1179              :         .map_err(LoadLocalTimelineError::Load)?;
    1180              : 
    1181              :         self.timeline_init_and_sync(
    1182              :             timeline_id,
    1183              :             resources,
    1184              :             Some(index_part),
    1185              :             remote_metadata,
    1186              :             ancestor,
    1187              :             ctx,
    1188              :         )
    1189              :         .await
    1190              :     }
    1191              : 
    1192              :     /// Create a placeholder Tenant object for a broken tenant
    1193            0 :     pub fn create_broken_tenant(
    1194            0 :         conf: &'static PageServerConf,
    1195            0 :         tenant_shard_id: TenantShardId,
    1196            0 :         reason: String,
    1197            0 :     ) -> Arc<Tenant> {
    1198            0 :         Arc::new(Tenant::new(
    1199            0 :             TenantState::Broken {
    1200            0 :                 reason,
    1201            0 :                 backtrace: String::new(),
    1202            0 :             },
    1203            0 :             conf,
    1204            0 :             AttachedTenantConf::try_from(LocationConf::default()).unwrap(),
    1205            0 :             // Shard identity isn't meaningful for a broken tenant: it's just a placeholder
    1206            0 :             // to occupy the slot for this TenantShardId.
    1207            0 :             ShardIdentity::broken(tenant_shard_id.shard_number, tenant_shard_id.shard_count),
    1208            0 :             None,
    1209            0 :             tenant_shard_id,
    1210            0 :             None,
    1211            0 :             DeletionQueueClient::broken(),
    1212            0 :         ))
    1213            0 :     }
    1214              : 
    1215           80 :     fn scan_and_sort_timelines_dir(self: Arc<Tenant>) -> anyhow::Result<TenantDirectoryScan> {
    1216           80 :         let mut timelines_to_load: HashMap<TimelineId, TimelineMetadata> = HashMap::new();
    1217           80 :         // Note timelines_to_resume_deletion needs to be separate because it can be not sortable
    1218           80 :         // from the point of `tree_sort_timelines`. I e some parents can be missing because deletion
    1219           80 :         // completed in non topological order (for example because parent has smaller number of layer files in it)
    1220           80 :         let mut timelines_to_resume_deletion: Vec<(TimelineId, Option<TimelineMetadata>)> = vec![];
    1221           80 : 
    1222           80 :         let timelines_dir = self.conf.timelines_path(&self.tenant_shard_id);
    1223              : 
    1224           80 :         for entry in timelines_dir
    1225           80 :             .read_dir_utf8()
    1226           80 :             .context("list timelines directory for tenant")?
    1227              :         {
    1228            6 :             let entry = entry.context("read timeline dir entry")?;
    1229            6 :             let timeline_dir = entry.path();
    1230            6 : 
    1231            6 :             if crate::is_temporary(timeline_dir) {
    1232            0 :                 info!("Found temporary timeline directory, removing: {timeline_dir}");
    1233            0 :                 if let Err(e) = std::fs::remove_dir_all(timeline_dir) {
    1234            0 :                     error!("Failed to remove temporary directory '{timeline_dir}': {e:?}");
    1235            0 :                 }
    1236            6 :             } else if is_uninit_mark(timeline_dir) {
    1237            2 :                 if !timeline_dir.exists() {
    1238            0 :                     warn!("Timeline dir entry become invalid: {timeline_dir}");
    1239            0 :                     continue;
    1240            2 :                 }
    1241            2 : 
    1242            2 :                 let timeline_uninit_mark_file = &timeline_dir;
    1243            2 :                 info!(
    1244            2 :                     "Found an uninit mark file {timeline_uninit_mark_file}, removing the timeline and its uninit mark",
    1245            2 :                 );
    1246            2 :                 let timeline_id =
    1247            2 :                     TimelineId::try_from(timeline_uninit_mark_file.file_stem())
    1248            2 :                         .with_context(|| {
    1249            0 :                             format!(
    1250            0 :                                 "Could not parse timeline id out of the timeline uninit mark name {timeline_uninit_mark_file}",
    1251            0 :                             )
    1252            2 :                         })?;
    1253            2 :                 let timeline_dir = self.conf.timeline_path(&self.tenant_shard_id, &timeline_id);
    1254            0 :                 if let Err(e) =
    1255            2 :                     remove_timeline_and_uninit_mark(&timeline_dir, timeline_uninit_mark_file)
    1256              :                 {
    1257            0 :                     error!("Failed to clean up uninit marked timeline: {e:?}");
    1258            2 :                 }
    1259            4 :             } else if crate::is_delete_mark(timeline_dir) {
    1260              :                 // If metadata exists, load as usual, continue deletion
    1261            0 :                 let timeline_id = TimelineId::try_from(timeline_dir.file_stem())
    1262            0 :                     .with_context(|| {
    1263            0 :                         format!(
    1264            0 :                             "Could not parse timeline id out of the timeline uninit mark name {timeline_dir}",
    1265            0 :                         )
    1266            0 :                     })?;
    1267              : 
    1268            0 :                 info!("Found deletion mark for timeline {}", timeline_id);
    1269              : 
    1270            0 :                 match load_metadata(self.conf, &self.tenant_shard_id, &timeline_id) {
    1271            0 :                     Ok(metadata) => {
    1272            0 :                         timelines_to_resume_deletion.push((timeline_id, Some(metadata)))
    1273              :                     }
    1274            0 :                     Err(e) => match &e {
    1275            0 :                         LoadMetadataError::Read(r) => {
    1276            0 :                             if r.kind() != io::ErrorKind::NotFound {
    1277            0 :                                 return Err(anyhow::anyhow!(e)).with_context(|| {
    1278            0 :                                     format!("Failed to load metadata for timeline_id {timeline_id}")
    1279            0 :                                 });
    1280            0 :                             }
    1281            0 : 
    1282            0 :                             // If metadata doesnt exist it means that we've crashed without
    1283            0 :                             // completing cleanup_remaining_timeline_fs_traces in DeleteTimelineFlow.
    1284            0 :                             // So save timeline_id for later call to `DeleteTimelineFlow::cleanup_remaining_timeline_fs_traces`.
    1285            0 :                             // We cant do it here because the method is async so we'd need block_on
    1286            0 :                             // and here we're in spawn_blocking. cleanup_remaining_timeline_fs_traces uses fs operations
    1287            0 :                             // so that basically results in a cycle:
    1288            0 :                             // spawn_blocking
    1289            0 :                             // - block_on
    1290            0 :                             //   - spawn_blocking
    1291            0 :                             // which can lead to running out of threads in blocing pool.
    1292            0 :                             timelines_to_resume_deletion.push((timeline_id, None));
    1293              :                         }
    1294              :                         _ => {
    1295            0 :                             return Err(anyhow::anyhow!(e)).with_context(|| {
    1296            0 :                                 format!("Failed to load metadata for timeline_id {timeline_id}")
    1297            0 :                             })
    1298              :                         }
    1299              :                     },
    1300              :                 }
    1301              :             } else {
    1302            4 :                 if !timeline_dir.exists() {
    1303            2 :                     warn!("Timeline dir entry become invalid: {timeline_dir}");
    1304            2 :                     continue;
    1305            2 :                 }
    1306            2 :                 let timeline_id = TimelineId::try_from(timeline_dir.file_name())
    1307            2 :                     .with_context(|| {
    1308            0 :                         format!(
    1309            0 :                             "Could not parse timeline id out of the timeline dir name {timeline_dir}",
    1310            0 :                         )
    1311            2 :                     })?;
    1312            2 :                 let timeline_uninit_mark_file = self
    1313            2 :                     .conf
    1314            2 :                     .timeline_uninit_mark_file_path(self.tenant_shard_id, timeline_id);
    1315            2 :                 if timeline_uninit_mark_file.exists() {
    1316            0 :                     info!(
    1317            0 :                         %timeline_id,
    1318            0 :                         "Found an uninit mark file, removing the timeline and its uninit mark",
    1319            0 :                     );
    1320            0 :                     if let Err(e) =
    1321            0 :                         remove_timeline_and_uninit_mark(timeline_dir, &timeline_uninit_mark_file)
    1322              :                     {
    1323            0 :                         error!("Failed to clean up uninit marked timeline: {e:?}");
    1324            0 :                     }
    1325            0 :                     continue;
    1326            2 :                 }
    1327            2 : 
    1328            2 :                 let timeline_delete_mark_file = self
    1329            2 :                     .conf
    1330            2 :                     .timeline_delete_mark_file_path(self.tenant_shard_id, timeline_id);
    1331            2 :                 if timeline_delete_mark_file.exists() {
    1332              :                     // Cleanup should be done in `is_delete_mark` branch above
    1333            0 :                     continue;
    1334            2 :                 }
    1335            2 : 
    1336            2 :                 let file_name = entry.file_name();
    1337            2 :                 if let Ok(timeline_id) = file_name.parse::<TimelineId>() {
    1338            2 :                     let metadata = load_metadata(self.conf, &self.tenant_shard_id, &timeline_id)
    1339            2 :                         .context("failed to load metadata")?;
    1340            0 :                     timelines_to_load.insert(timeline_id, metadata);
    1341              :                 } else {
    1342              :                     // A file or directory that doesn't look like a timeline ID
    1343            0 :                     warn!("unexpected file or directory in timelines directory: {file_name}");
    1344              :                 }
    1345              :             }
    1346              :         }
    1347              : 
    1348              :         // Sort the array of timeline IDs into tree-order, so that parent comes before
    1349              :         // all its children.
    1350           78 :         tree_sort_timelines(timelines_to_load, |m| m.ancestor_timeline()).map(|sorted_timelines| {
    1351           78 :             TenantDirectoryScan {
    1352           78 :                 sorted_timelines_to_load: sorted_timelines,
    1353           78 :                 timelines_to_resume_deletion,
    1354           78 :             }
    1355           78 :         })
    1356           80 :     }
    1357              : 
    1358          840 :     async fn load_timeline_metadata(
    1359          840 :         self: &Arc<Tenant>,
    1360          840 :         timeline_ids: HashSet<TimelineId>,
    1361          840 :         remote_storage: &GenericRemoteStorage,
    1362          840 :         cancel: CancellationToken,
    1363          840 :     ) -> anyhow::Result<HashMap<TimelineId, TimelinePreload>> {
    1364          840 :         let mut part_downloads = JoinSet::new();
    1365         1267 :         for timeline_id in timeline_ids {
    1366          427 :             let client = RemoteTimelineClient::new(
    1367          427 :                 remote_storage.clone(),
    1368          427 :                 self.deletion_queue_client.clone(),
    1369          427 :                 self.conf,
    1370          427 :                 self.tenant_shard_id,
    1371          427 :                 timeline_id,
    1372          427 :                 self.generation,
    1373          427 :             );
    1374          427 :             let cancel_clone = cancel.clone();
    1375          427 :             part_downloads.spawn(
    1376          427 :                 async move {
    1377          427 :                     debug!("starting index part download");
    1378              : 
    1379         2931 :                     let index_part = client.download_index_file(cancel_clone).await;
    1380              : 
    1381          427 :                     debug!("finished index part download");
    1382              : 
    1383          427 :                     Result::<_, anyhow::Error>::Ok(TimelinePreload {
    1384          427 :                         client,
    1385          427 :                         timeline_id,
    1386          427 :                         index_part,
    1387          427 :                     })
    1388          427 :                 }
    1389          427 :                 .map(move |res| {
    1390          427 :                     res.with_context(|| format!("download index part for timeline {timeline_id}"))
    1391          427 :                 })
    1392          427 :                 .instrument(info_span!("download_index_part", %timeline_id)),
    1393              :             );
    1394              :         }
    1395              : 
    1396          840 :         let mut timeline_preloads: HashMap<TimelineId, TimelinePreload> = HashMap::new();
    1397              : 
    1398         1267 :         loop {
    1399         1670 :             tokio::select!(
    1400         1267 :                 next = part_downloads.join_next() => {
    1401              :                     match next {
    1402              :                         Some(result) => {
    1403              :                             let preload_result = result.context("join preload task")?;
    1404              :                             let preload = preload_result?;
    1405              :                             timeline_preloads.insert(preload.timeline_id, preload);
    1406              :                         },
    1407              :                         None => {
    1408              :                             break;
    1409              :                         }
    1410              :                     }
    1411              :                 },
    1412              :                 _ = cancel.cancelled() => {
    1413              :                     anyhow::bail!("Cancelled while waiting for remote index download")
    1414              :                 }
    1415         1267 :             )
    1416         1267 :         }
    1417              : 
    1418          840 :         Ok(timeline_preloads)
    1419          840 :     }
    1420              : 
    1421              :     ///
    1422              :     /// Background task to load in-memory data structures for this tenant, from
    1423              :     /// files on disk. Used at pageserver startup.
    1424              :     ///
    1425              :     /// No background tasks are started as part of this routine.
    1426           80 :     async fn load_local(self: &Arc<Tenant>, ctx: &RequestContext) -> anyhow::Result<()> {
    1427           80 :         span::debug_assert_current_span_has_tenant_id();
    1428              : 
    1429            0 :         debug!("loading tenant task");
    1430              : 
    1431              :         // Load in-memory state to reflect the local files on disk
    1432              :         //
    1433              :         // Scan the directory, peek into the metadata file of each timeline, and
    1434              :         // collect a list of timelines and their ancestors.
    1435           80 :         let span = info_span!("blocking");
    1436           80 :         let cloned = Arc::clone(self);
    1437              : 
    1438           80 :         let scan = tokio::task::spawn_blocking(move || {
    1439           80 :             let _g = span.entered();
    1440           80 :             cloned.scan_and_sort_timelines_dir()
    1441           80 :         })
    1442           80 :         .await
    1443           80 :         .context("load spawn_blocking")
    1444           80 :         .and_then(|res| res)?;
    1445              : 
    1446              :         // FIXME original collect_timeline_files contained one more check:
    1447              :         //    1. "Timeline has no ancestor and no layer files"
    1448              : 
    1449              :         // Process loadable timelines first
    1450           78 :         for (timeline_id, local_metadata) in scan.sorted_timelines_to_load {
    1451            0 :             if let Err(e) = self
    1452            0 :                 .load_local_timeline(timeline_id, local_metadata, ctx, false)
    1453            0 :                 .await
    1454              :             {
    1455            0 :                 match e {
    1456            0 :                     LoadLocalTimelineError::Load(source) => {
    1457            0 :                         return Err(anyhow::anyhow!(source)).with_context(|| {
    1458            0 :                             format!("Failed to load local timeline: {timeline_id}")
    1459            0 :                         })
    1460              :                     }
    1461            0 :                     LoadLocalTimelineError::ResumeDeletion(source) => {
    1462              :                         // Make sure resumed deletion wont fail loading for entire tenant.
    1463            0 :                         error!("Failed to resume timeline deletion: {source:#}")
    1464              :                     }
    1465              :                 }
    1466            0 :             }
    1467              :         }
    1468              : 
    1469              :         // Resume deletion ones with deleted_mark
    1470           78 :         for (timeline_id, maybe_local_metadata) in scan.timelines_to_resume_deletion {
    1471            0 :             match maybe_local_metadata {
    1472              :                 None => {
    1473              :                     // See comment in `scan_and_sort_timelines_dir`.
    1474            0 :                     if let Err(e) =
    1475            0 :                         DeleteTimelineFlow::cleanup_remaining_timeline_fs_traces(self, timeline_id)
    1476            0 :                             .await
    1477              :                     {
    1478            0 :                         warn!(
    1479            0 :                             "cannot clean up deleted timeline dir timeline_id: {} error: {:#}",
    1480            0 :                             timeline_id, e
    1481            0 :                         );
    1482            0 :                     }
    1483              :                 }
    1484            0 :                 Some(local_metadata) => {
    1485            0 :                     if let Err(e) = self
    1486            0 :                         .load_local_timeline(timeline_id, local_metadata, ctx, true)
    1487            0 :                         .await
    1488              :                     {
    1489            0 :                         match e {
    1490            0 :                             LoadLocalTimelineError::Load(source) => {
    1491            0 :                                 // We tried to load deleted timeline, this is a bug.
    1492            0 :                                 return Err(anyhow::anyhow!(source).context(
    1493            0 :                                     format!("This is a bug. We tried to load deleted timeline which is wrong and loading failed. Timeline: {timeline_id}")
    1494            0 :                                 ));
    1495              :                             }
    1496            0 :                             LoadLocalTimelineError::ResumeDeletion(source) => {
    1497              :                                 // Make sure resumed deletion wont fail loading for entire tenant.
    1498            0 :                                 error!("Failed to resume timeline deletion: {source:#}")
    1499              :                             }
    1500              :                         }
    1501            0 :                     }
    1502              :                 }
    1503              :             }
    1504              :         }
    1505              : 
    1506            0 :         trace!("Done");
    1507              : 
    1508           78 :         Ok(())
    1509           80 :     }
    1510              : 
    1511              :     /// Subroutine of `load_tenant`, to load an individual timeline
    1512              :     ///
    1513              :     /// NB: The parent is assumed to be already loaded!
    1514            0 :     #[instrument(skip(self, local_metadata, ctx))]
    1515              :     async fn load_local_timeline(
    1516              :         self: &Arc<Self>,
    1517              :         timeline_id: TimelineId,
    1518              :         local_metadata: TimelineMetadata,
    1519              :         ctx: &RequestContext,
    1520              :         found_delete_mark: bool,
    1521              :     ) -> Result<(), LoadLocalTimelineError> {
    1522              :         span::debug_assert_current_span_has_tenant_id();
    1523              : 
    1524              :         let resources = self.build_timeline_resources(timeline_id);
    1525              : 
    1526              :         if found_delete_mark {
    1527              :             // There is no remote client, we found local metadata.
    1528              :             // Continue cleaning up local disk.
    1529              :             DeleteTimelineFlow::resume_deletion(
    1530              :                 Arc::clone(self),
    1531              :                 timeline_id,
    1532              :                 &local_metadata,
    1533              :                 None,
    1534              :                 self.deletion_queue_client.clone(),
    1535              :             )
    1536              :             .await
    1537              :             .context("resume deletion")
    1538              :             .map_err(LoadLocalTimelineError::ResumeDeletion)?;
    1539              :             return Ok(());
    1540              :         }
    1541              : 
    1542              :         let ancestor = if let Some(ancestor_timeline_id) = local_metadata.ancestor_timeline() {
    1543              :             let ancestor_timeline = self.get_timeline(ancestor_timeline_id, false)
    1544            0 :                 .with_context(|| anyhow::anyhow!("cannot find ancestor timeline {ancestor_timeline_id} for timeline {timeline_id}"))
    1545              :                 .map_err(LoadLocalTimelineError::Load)?;
    1546              :             Some(ancestor_timeline)
    1547              :         } else {
    1548              :             None
    1549              :         };
    1550              : 
    1551              :         self.timeline_init_and_sync(timeline_id, resources, None, local_metadata, ancestor, ctx)
    1552              :             .await
    1553              :             .map_err(LoadLocalTimelineError::Load)
    1554              :     }
    1555              : 
    1556          672 :     pub(crate) fn tenant_shard_id(&self) -> TenantShardId {
    1557          672 :         self.tenant_shard_id
    1558          672 :     }
    1559              : 
    1560              :     /// Get Timeline handle for given Neon timeline ID.
    1561              :     /// This function is idempotent. It doesn't change internal state in any way.
    1562        17379 :     pub fn get_timeline(
    1563        17379 :         &self,
    1564        17379 :         timeline_id: TimelineId,
    1565        17379 :         active_only: bool,
    1566        17379 :     ) -> Result<Arc<Timeline>, GetTimelineError> {
    1567        17379 :         let timelines_accessor = self.timelines.lock().unwrap();
    1568        17379 :         let timeline = timelines_accessor
    1569        17379 :             .get(&timeline_id)
    1570        17379 :             .ok_or(GetTimelineError::NotFound {
    1571        17379 :                 tenant_id: self.tenant_shard_id,
    1572        17379 :                 timeline_id,
    1573        17379 :             })?;
    1574              : 
    1575        17326 :         if active_only && !timeline.is_active() {
    1576            6 :             Err(GetTimelineError::NotActive {
    1577            6 :                 tenant_id: self.tenant_shard_id,
    1578            6 :                 timeline_id,
    1579            6 :                 state: timeline.current_state(),
    1580            6 :             })
    1581              :         } else {
    1582        17320 :             Ok(Arc::clone(timeline))
    1583              :         }
    1584        17379 :     }
    1585              : 
    1586              :     /// Lists timelines the tenant contains.
    1587              :     /// Up to tenant's implementation to omit certain timelines that ar not considered ready for use.
    1588          759 :     pub fn list_timelines(&self) -> Vec<Arc<Timeline>> {
    1589          759 :         self.timelines
    1590          759 :             .lock()
    1591          759 :             .unwrap()
    1592          759 :             .values()
    1593          759 :             .map(Arc::clone)
    1594          759 :             .collect()
    1595          759 :     }
    1596              : 
    1597          486 :     pub fn list_timeline_ids(&self) -> Vec<TimelineId> {
    1598          486 :         self.timelines.lock().unwrap().keys().cloned().collect()
    1599          486 :     }
    1600              : 
    1601              :     /// This is used to create the initial 'main' timeline during bootstrapping,
    1602              :     /// or when importing a new base backup. The caller is expected to load an
    1603              :     /// initial image of the datadir to the new timeline after this.
    1604              :     ///
    1605              :     /// Until that happens, the on-disk state is invalid (disk_consistent_lsn=Lsn(0))
    1606              :     /// and the timeline will fail to load at a restart.
    1607              :     ///
    1608              :     /// That's why we add an uninit mark file, and wrap it together witht the Timeline
    1609              :     /// in-memory object into UninitializedTimeline.
    1610              :     /// Once the caller is done setting up the timeline, they should call
    1611              :     /// `UninitializedTimeline::initialize_with_lock` to remove the uninit mark.
    1612              :     ///
    1613              :     /// For tests, use `DatadirModification::init_empty_test_timeline` + `commit` to setup the
    1614              :     /// minimum amount of keys required to get a writable timeline.
    1615              :     /// (Without it, `put` might fail due to `repartition` failing.)
    1616           86 :     pub(crate) async fn create_empty_timeline(
    1617           86 :         &self,
    1618           86 :         new_timeline_id: TimelineId,
    1619           86 :         initdb_lsn: Lsn,
    1620           86 :         pg_version: u32,
    1621           86 :         _ctx: &RequestContext,
    1622           86 :     ) -> anyhow::Result<UninitializedTimeline> {
    1623           86 :         anyhow::ensure!(
    1624           86 :             self.is_active(),
    1625            0 :             "Cannot create empty timelines on inactive tenant"
    1626              :         );
    1627              : 
    1628           86 :         let timeline_uninit_mark = self.create_timeline_uninit_mark(new_timeline_id)?;
    1629           84 :         let new_metadata = TimelineMetadata::new(
    1630           84 :             // Initialize disk_consistent LSN to 0, The caller must import some data to
    1631           84 :             // make it valid, before calling finish_creation()
    1632           84 :             Lsn(0),
    1633           84 :             None,
    1634           84 :             None,
    1635           84 :             Lsn(0),
    1636           84 :             initdb_lsn,
    1637           84 :             initdb_lsn,
    1638           84 :             pg_version,
    1639           84 :         );
    1640           84 :         self.prepare_new_timeline(
    1641           84 :             new_timeline_id,
    1642           84 :             &new_metadata,
    1643           84 :             timeline_uninit_mark,
    1644           84 :             initdb_lsn,
    1645           84 :             None,
    1646           84 :         )
    1647          108 :         .await
    1648           86 :     }
    1649              : 
    1650              :     /// Helper for unit tests to create an empty timeline.
    1651              :     ///
    1652              :     /// The timeline is has state value `Active` but its background loops are not running.
    1653              :     // This makes the various functions which anyhow::ensure! for Active state work in tests.
    1654              :     // Our current tests don't need the background loops.
    1655              :     #[cfg(test)]
    1656           68 :     pub async fn create_test_timeline(
    1657           68 :         &self,
    1658           68 :         new_timeline_id: TimelineId,
    1659           68 :         initdb_lsn: Lsn,
    1660           68 :         pg_version: u32,
    1661           68 :         ctx: &RequestContext,
    1662           68 :     ) -> anyhow::Result<Arc<Timeline>> {
    1663           68 :         let uninit_tl = self
    1664           68 :             .create_empty_timeline(new_timeline_id, initdb_lsn, pg_version, ctx)
    1665          102 :             .await?;
    1666           68 :         let tline = uninit_tl.raw_timeline().expect("we just created it");
    1667           68 :         assert_eq!(tline.get_last_record_lsn(), Lsn(0));
    1668              : 
    1669              :         // Setup minimum keys required for the timeline to be usable.
    1670           68 :         let mut modification = tline.begin_modification(initdb_lsn);
    1671           68 :         modification
    1672           68 :             .init_empty_test_timeline()
    1673           68 :             .context("init_empty_test_timeline")?;
    1674           68 :         modification
    1675           68 :             .commit(ctx)
    1676           34 :             .await
    1677           68 :             .context("commit init_empty_test_timeline modification")?;
    1678              : 
    1679              :         // Flush to disk so that uninit_tl's check for valid disk_consistent_lsn passes.
    1680           68 :         tline.maybe_spawn_flush_loop();
    1681           68 :         tline.freeze_and_flush().await.context("freeze_and_flush")?;
    1682              : 
    1683              :         // Make sure the freeze_and_flush reaches remote storage.
    1684           68 :         tline
    1685           68 :             .remote_client
    1686           68 :             .as_ref()
    1687           68 :             .unwrap()
    1688           68 :             .wait_completion()
    1689           31 :             .await
    1690           68 :             .unwrap();
    1691              : 
    1692           68 :         let tl = uninit_tl.finish_creation()?;
    1693              :         // The non-test code would call tl.activate() here.
    1694           68 :         tl.set_state(TimelineState::Active);
    1695           68 :         Ok(tl)
    1696           68 :     }
    1697              : 
    1698              :     /// Create a new timeline.
    1699              :     ///
    1700              :     /// Returns the new timeline ID and reference to its Timeline object.
    1701              :     ///
    1702              :     /// If the caller specified the timeline ID to use (`new_timeline_id`), and timeline with
    1703              :     /// the same timeline ID already exists, returns CreateTimelineError::AlreadyExists.
    1704              :     #[allow(clippy::too_many_arguments)]
    1705          893 :     pub(crate) async fn create_timeline(
    1706          893 :         &self,
    1707          893 :         new_timeline_id: TimelineId,
    1708          893 :         ancestor_timeline_id: Option<TimelineId>,
    1709          893 :         mut ancestor_start_lsn: Option<Lsn>,
    1710          893 :         pg_version: u32,
    1711          893 :         load_existing_initdb: Option<TimelineId>,
    1712          893 :         broker_client: storage_broker::BrokerClientChannel,
    1713          893 :         ctx: &RequestContext,
    1714          893 :     ) -> Result<Arc<Timeline>, CreateTimelineError> {
    1715          893 :         if !self.is_active() {
    1716            0 :             if matches!(self.current_state(), TenantState::Stopping { .. }) {
    1717            0 :                 return Err(CreateTimelineError::ShuttingDown);
    1718              :             } else {
    1719            0 :                 return Err(CreateTimelineError::Other(anyhow::anyhow!(
    1720            0 :                     "Cannot create timelines on inactive tenant"
    1721            0 :                 )));
    1722              :             }
    1723          893 :         }
    1724              : 
    1725          893 :         let _gate = self
    1726          893 :             .gate
    1727          893 :             .enter()
    1728          893 :             .map_err(|_| CreateTimelineError::ShuttingDown)?;
    1729              : 
    1730              :         // Get exclusive access to the timeline ID: this ensures that it does not already exist,
    1731              :         // and that no other creation attempts will be allowed in while we are working.  The
    1732              :         // uninit_mark is a guard.
    1733          893 :         let uninit_mark = match self.create_timeline_uninit_mark(new_timeline_id) {
    1734          865 :             Ok(m) => m,
    1735              :             Err(TimelineExclusionError::AlreadyCreating) => {
    1736              :                 // Creation is in progress, we cannot create it again, and we cannot
    1737              :                 // check if this request matches the existing one, so caller must try
    1738              :                 // again later.
    1739            1 :                 return Err(CreateTimelineError::AlreadyCreating);
    1740              :             }
    1741            0 :             Err(TimelineExclusionError::Other(e)) => {
    1742            0 :                 return Err(CreateTimelineError::Other(e));
    1743              :             }
    1744           27 :             Err(TimelineExclusionError::AlreadyExists(existing)) => {
    1745            0 :                 debug!("timeline {new_timeline_id} already exists");
    1746              : 
    1747              :                 // Idempotency: creating the same timeline twice is not an error, unless
    1748              :                 // the second creation has different parameters.
    1749           27 :                 if existing.get_ancestor_timeline_id() != ancestor_timeline_id
    1750           27 :                     || existing.pg_version != pg_version
    1751           27 :                     || (ancestor_start_lsn.is_some()
    1752            0 :                         && ancestor_start_lsn != Some(existing.get_ancestor_lsn()))
    1753              :                 {
    1754            0 :                     return Err(CreateTimelineError::Conflict);
    1755           27 :                 }
    1756              : 
    1757           27 :                 if let Some(remote_client) = existing.remote_client.as_ref() {
    1758              :                     // Wait for uploads to complete, so that when we return Ok, the timeline
    1759              :                     // is known to be durable on remote storage. Just like we do at the end of
    1760              :                     // this function, after we have created the timeline ourselves.
    1761              :                     //
    1762              :                     // We only really care that the initial version of `index_part.json` has
    1763              :                     // been uploaded. That's enough to remember that the timeline
    1764              :                     // exists. However, there is no function to wait specifically for that so
    1765              :                     // we just wait for all in-progress uploads to finish.
    1766           27 :                     remote_client
    1767           27 :                         .wait_completion()
    1768            0 :                         .await
    1769           27 :                         .context("wait for timeline uploads to complete")?;
    1770            0 :                 }
    1771              : 
    1772           27 :                 return Ok(existing);
    1773              :             }
    1774              :         };
    1775              : 
    1776          865 :         let loaded_timeline = match ancestor_timeline_id {
    1777          270 :             Some(ancestor_timeline_id) => {
    1778          270 :                 let ancestor_timeline = self
    1779          270 :                     .get_timeline(ancestor_timeline_id, false)
    1780          270 :                     .context("Cannot branch off the timeline that's not present in pageserver")?;
    1781              : 
    1782              :                 // instead of waiting around, just deny the request because ancestor is not yet
    1783              :                 // ready for other purposes either.
    1784          270 :                 if !ancestor_timeline.is_active() {
    1785            6 :                     return Err(CreateTimelineError::AncestorNotActive);
    1786          264 :                 }
    1787              : 
    1788          264 :                 if let Some(lsn) = ancestor_start_lsn.as_mut() {
    1789           33 :                     *lsn = lsn.align();
    1790           33 : 
    1791           33 :                     let ancestor_ancestor_lsn = ancestor_timeline.get_ancestor_lsn();
    1792           33 :                     if ancestor_ancestor_lsn > *lsn {
    1793              :                         // can we safely just branch from the ancestor instead?
    1794            2 :                         return Err(CreateTimelineError::AncestorLsn(anyhow::anyhow!(
    1795            2 :                             "invalid start lsn {} for ancestor timeline {}: less than timeline ancestor lsn {}",
    1796            2 :                             lsn,
    1797            2 :                             ancestor_timeline_id,
    1798            2 :                             ancestor_ancestor_lsn,
    1799            2 :                         )));
    1800           31 :                     }
    1801           31 : 
    1802           31 :                     // Wait for the WAL to arrive and be processed on the parent branch up
    1803           31 :                     // to the requested branch point. The repository code itself doesn't
    1804           31 :                     // require it, but if we start to receive WAL on the new timeline,
    1805           31 :                     // decoding the new WAL might need to look up previous pages, relation
    1806           31 :                     // sizes etc. and that would get confused if the previous page versions
    1807           31 :                     // are not in the repository yet.
    1808           31 :                     ancestor_timeline
    1809           31 :                         .wait_lsn(*lsn, ctx)
    1810            5 :                         .await
    1811           31 :                         .map_err(|e| match e {
    1812            0 :                             e @ (WaitLsnError::Timeout(_) | WaitLsnError::BadState) => {
    1813            0 :                                 CreateTimelineError::AncestorLsn(anyhow::anyhow!(e))
    1814              :                             }
    1815            0 :                             WaitLsnError::Shutdown => CreateTimelineError::ShuttingDown,
    1816           31 :                         })?;
    1817          231 :                 }
    1818              : 
    1819          262 :                 self.branch_timeline(
    1820          262 :                     &ancestor_timeline,
    1821          262 :                     new_timeline_id,
    1822          262 :                     ancestor_start_lsn,
    1823          262 :                     uninit_mark,
    1824          262 :                     ctx,
    1825          262 :                 )
    1826            6 :                 .await?
    1827              :             }
    1828              :             None => {
    1829          595 :                 self.bootstrap_timeline(
    1830          595 :                     new_timeline_id,
    1831          595 :                     pg_version,
    1832          595 :                     load_existing_initdb,
    1833          595 :                     uninit_mark,
    1834          595 :                     ctx,
    1835          595 :                 )
    1836      7640121 :                 .await?
    1837              :             }
    1838              :         };
    1839              : 
    1840              :         // At this point we have dropped our guard on [`Self::timelines_creating`], and
    1841              :         // the timeline is visible in [`Self::timelines`], but it is _not_ durable yet.  We must
    1842              :         // not send a success to the caller until it is.  The same applies to handling retries,
    1843              :         // see the handling of [`TimelineExclusionError::AlreadyExists`] above.
    1844          843 :         if let Some(remote_client) = loaded_timeline.remote_client.as_ref() {
    1845          843 :             let kind = ancestor_timeline_id
    1846          843 :                 .map(|_| "branched")
    1847          843 :                 .unwrap_or("bootstrapped");
    1848          843 :             remote_client.wait_completion().await.with_context(|| {
    1849            0 :                 format!("wait for {} timeline initial uploads to complete", kind)
    1850          839 :             })?;
    1851            0 :         }
    1852              : 
    1853          839 :         loaded_timeline.activate(broker_client, None, ctx);
    1854          839 : 
    1855          839 :         Ok(loaded_timeline)
    1856          887 :     }
    1857              : 
    1858           64 :     pub(crate) async fn delete_timeline(
    1859           64 :         self: Arc<Self>,
    1860           64 :         timeline_id: TimelineId,
    1861           64 :     ) -> Result<(), DeleteTimelineError> {
    1862          378 :         DeleteTimelineFlow::run(&self, timeline_id, false).await?;
    1863              : 
    1864           53 :         Ok(())
    1865           64 :     }
    1866              : 
    1867              :     /// perform one garbage collection iteration, removing old data files from disk.
    1868              :     /// this function is periodically called by gc task.
    1869              :     /// also it can be explicitly requested through page server api 'do_gc' command.
    1870              :     ///
    1871              :     /// `target_timeline_id` specifies the timeline to GC, or None for all.
    1872              :     ///
    1873              :     /// The `horizon` an `pitr` parameters determine how much WAL history needs to be retained.
    1874              :     /// Also known as the retention period, or the GC cutoff point. `horizon` specifies
    1875              :     /// the amount of history, as LSN difference from current latest LSN on each timeline.
    1876              :     /// `pitr` specifies the same as a time difference from the current time. The effective
    1877              :     /// GC cutoff point is determined conservatively by either `horizon` and `pitr`, whichever
    1878              :     /// requires more history to be retained.
    1879              :     //
    1880          304 :     pub async fn gc_iteration(
    1881          304 :         &self,
    1882          304 :         target_timeline_id: Option<TimelineId>,
    1883          304 :         horizon: u64,
    1884          304 :         pitr: Duration,
    1885          304 :         cancel: &CancellationToken,
    1886          304 :         ctx: &RequestContext,
    1887          304 :     ) -> anyhow::Result<GcResult> {
    1888          304 :         // Don't start doing work during shutdown
    1889          304 :         if let TenantState::Stopping { .. } = self.current_state() {
    1890            0 :             return Ok(GcResult::default());
    1891          304 :         }
    1892          304 : 
    1893          304 :         // there is a global allowed_error for this
    1894          304 :         anyhow::ensure!(
    1895          304 :             self.is_active(),
    1896            0 :             "Cannot run GC iteration on inactive tenant"
    1897              :         );
    1898              : 
    1899              :         {
    1900          304 :             let conf = self.tenant_conf.read().unwrap();
    1901          304 : 
    1902          304 :             if !conf.location.may_delete_layers_hint() {
    1903            2 :                 info!("Skipping GC in location state {:?}", conf.location);
    1904            2 :                 return Ok(GcResult::default());
    1905          302 :             }
    1906          302 :         }
    1907          302 : 
    1908          302 :         self.gc_iteration_internal(target_timeline_id, horizon, pitr, cancel, ctx)
    1909       264491 :             .await
    1910          303 :     }
    1911              : 
    1912              :     /// Perform one compaction iteration.
    1913              :     /// This function is periodically called by compactor task.
    1914              :     /// Also it can be explicitly requested per timeline through page server
    1915              :     /// api's 'compact' command.
    1916          390 :     async fn compaction_iteration(
    1917          390 :         &self,
    1918          390 :         cancel: &CancellationToken,
    1919          390 :         ctx: &RequestContext,
    1920          390 :     ) -> anyhow::Result<(), timeline::CompactionError> {
    1921          390 :         // Don't start doing work during shutdown, or when broken, we do not need those in the logs
    1922          390 :         if !self.is_active() {
    1923            1 :             return Ok(());
    1924          389 :         }
    1925          389 : 
    1926          389 :         {
    1927          389 :             let conf = self.tenant_conf.read().unwrap();
    1928          389 :             if !conf.location.may_delete_layers_hint() || !conf.location.may_upload_layers_hint() {
    1929            6 :                 info!("Skipping compaction in location state {:?}", conf.location);
    1930            6 :                 return Ok(());
    1931          383 :             }
    1932          383 :         }
    1933          383 : 
    1934          383 :         // Scan through the hashmap and collect a list of all the timelines,
    1935          383 :         // while holding the lock. Then drop the lock and actually perform the
    1936          383 :         // compactions.  We don't want to block everything else while the
    1937          383 :         // compaction runs.
    1938          383 :         let timelines_to_compact = {
    1939          383 :             let timelines = self.timelines.lock().unwrap();
    1940          383 :             let timelines_to_compact = timelines
    1941          383 :                 .iter()
    1942          566 :                 .filter_map(|(timeline_id, timeline)| {
    1943          566 :                     if timeline.is_active() {
    1944          563 :                         Some((*timeline_id, timeline.clone()))
    1945              :                     } else {
    1946            3 :                         None
    1947              :                     }
    1948          566 :                 })
    1949          383 :                 .collect::<Vec<_>>();
    1950          383 :             drop(timelines);
    1951          383 :             timelines_to_compact
    1952              :         };
    1953              : 
    1954          942 :         for (timeline_id, timeline) in &timelines_to_compact {
    1955          561 :             timeline
    1956          561 :                 .compact(cancel, EnumSet::empty(), ctx)
    1957          561 :                 .instrument(info_span!("compact_timeline", %timeline_id))
    1958       128297 :                 .await?;
    1959              :         }
    1960              : 
    1961          381 :         Ok(())
    1962          388 :     }
    1963              : 
    1964        34368 :     pub fn current_state(&self) -> TenantState {
    1965        34368 :         self.state.borrow().clone()
    1966        34368 :     }
    1967              : 
    1968         2633 :     pub fn is_active(&self) -> bool {
    1969         2633 :         self.current_state() == TenantState::Active
    1970         2633 :     }
    1971              : 
    1972          726 :     pub fn generation(&self) -> Generation {
    1973          726 :         self.generation
    1974          726 :     }
    1975              : 
    1976          486 :     pub(crate) fn wal_redo_manager_status(&self) -> Option<WalRedoManagerStatus> {
    1977          486 :         self.walredo_mgr.as_ref().and_then(|mgr| mgr.status())
    1978          486 :     }
    1979              : 
    1980              :     /// Changes tenant status to active, unless shutdown was already requested.
    1981              :     ///
    1982              :     /// `background_jobs_can_start` is an optional barrier set to a value during pageserver startup
    1983              :     /// to delay background jobs. Background jobs can be started right away when None is given.
    1984          830 :     fn activate(
    1985          830 :         self: &Arc<Self>,
    1986          830 :         broker_client: BrokerClientChannel,
    1987          830 :         background_jobs_can_start: Option<&completion::Barrier>,
    1988          830 :         ctx: &RequestContext,
    1989          830 :     ) {
    1990          830 :         span::debug_assert_current_span_has_tenant_id();
    1991          830 : 
    1992          830 :         let mut activating = false;
    1993          830 :         self.state.send_modify(|current_state| {
    1994          830 :             use pageserver_api::models::ActivatingFrom;
    1995          830 :             match &*current_state {
    1996              :                 TenantState::Activating(_) | TenantState::Active | TenantState::Broken { .. } | TenantState::Stopping { .. } => {
    1997            0 :                     panic!("caller is responsible for calling activate() only on Loading / Attaching tenants, got {state:?}", state = current_state);
    1998              :                 }
    1999            0 :                 TenantState::Loading => {
    2000            0 :                     *current_state = TenantState::Activating(ActivatingFrom::Loading);
    2001            0 :                 }
    2002          830 :                 TenantState::Attaching => {
    2003          830 :                     *current_state = TenantState::Activating(ActivatingFrom::Attaching);
    2004          830 :                 }
    2005              :             }
    2006          830 :             debug!(tenant_id = %self.tenant_shard_id.tenant_id, shard_id = %self.tenant_shard_id.shard_slug(), "Activating tenant");
    2007          830 :             activating = true;
    2008          830 :             // Continue outside the closure. We need to grab timelines.lock()
    2009          830 :             // and we plan to turn it into a tokio::sync::Mutex in a future patch.
    2010          830 :         });
    2011          830 : 
    2012          830 :         if activating {
    2013          830 :             let timelines_accessor = self.timelines.lock().unwrap();
    2014          830 :             let timelines_to_activate = timelines_accessor
    2015          830 :                 .values()
    2016          830 :                 .filter(|timeline| !(timeline.is_broken() || timeline.is_stopping()));
    2017          830 : 
    2018          830 :             // Spawn gc and compaction loops. The loops will shut themselves
    2019          830 :             // down when they notice that the tenant is inactive.
    2020          830 :             tasks::start_background_loops(self, background_jobs_can_start);
    2021          830 : 
    2022          830 :             let mut activated_timelines = 0;
    2023              : 
    2024         1216 :             for timeline in timelines_to_activate {
    2025          386 :                 timeline.activate(broker_client.clone(), background_jobs_can_start, ctx);
    2026          386 :                 activated_timelines += 1;
    2027          386 :             }
    2028              : 
    2029          830 :             self.state.send_modify(move |current_state| {
    2030          830 :                 assert!(
    2031          830 :                     matches!(current_state, TenantState::Activating(_)),
    2032            0 :                     "set_stopping and set_broken wait for us to leave Activating state",
    2033              :                 );
    2034          830 :                 *current_state = TenantState::Active;
    2035          830 : 
    2036          830 :                 let elapsed = self.constructed_at.elapsed();
    2037          830 :                 let total_timelines = timelines_accessor.len();
    2038          830 : 
    2039          830 :                 // log a lot of stuff, because some tenants sometimes suffer from user-visible
    2040          830 :                 // times to activate. see https://github.com/neondatabase/neon/issues/4025
    2041          830 :                 info!(
    2042          830 :                     since_creation_millis = elapsed.as_millis(),
    2043          830 :                     tenant_id = %self.tenant_shard_id.tenant_id,
    2044          830 :                     shard_id = %self.tenant_shard_id.shard_slug(),
    2045          830 :                     activated_timelines,
    2046          830 :                     total_timelines,
    2047          830 :                     post_state = <&'static str>::from(&*current_state),
    2048          830 :                     "activation attempt finished"
    2049          830 :                 );
    2050              : 
    2051          830 :                 TENANT.activation.observe(elapsed.as_secs_f64());
    2052          830 :             });
    2053            0 :         }
    2054          830 :     }
    2055              : 
    2056              :     /// Shutdown the tenant and join all of the spawned tasks.
    2057              :     ///
    2058              :     /// The method caters for all use-cases:
    2059              :     /// - pageserver shutdown (freeze_and_flush == true)
    2060              :     /// - detach + ignore (freeze_and_flush == false)
    2061              :     ///
    2062              :     /// This will attempt to shutdown even if tenant is broken.
    2063              :     ///
    2064              :     /// `shutdown_progress` is a [`completion::Barrier`] for the shutdown initiated by this call.
    2065              :     /// If the tenant is already shutting down, we return a clone of the first shutdown call's
    2066              :     /// `Barrier` as an `Err`. This not-first caller can use the returned barrier to join with
    2067              :     /// the ongoing shutdown.
    2068          449 :     async fn shutdown(
    2069          449 :         &self,
    2070          449 :         shutdown_progress: completion::Barrier,
    2071          449 :         freeze_and_flush: bool,
    2072          449 :     ) -> Result<(), completion::Barrier> {
    2073          449 :         span::debug_assert_current_span_has_tenant_id();
    2074          449 : 
    2075          449 :         // Set tenant (and its timlines) to Stoppping state.
    2076          449 :         //
    2077          449 :         // Since we can only transition into Stopping state after activation is complete,
    2078          449 :         // run it in a JoinSet so all tenants have a chance to stop before we get SIGKILLed.
    2079          449 :         //
    2080          449 :         // Transitioning tenants to Stopping state has a couple of non-obvious side effects:
    2081          449 :         // 1. Lock out any new requests to the tenants.
    2082          449 :         // 2. Signal cancellation to WAL receivers (we wait on it below).
    2083          449 :         // 3. Signal cancellation for other tenant background loops.
    2084          449 :         // 4. ???
    2085          449 :         //
    2086          449 :         // The waiting for the cancellation is not done uniformly.
    2087          449 :         // We certainly wait for WAL receivers to shut down.
    2088          449 :         // That is necessary so that no new data comes in before the freeze_and_flush.
    2089          449 :         // But the tenant background loops are joined-on in our caller.
    2090          449 :         // It's mesed up.
    2091          449 :         // we just ignore the failure to stop
    2092          449 : 
    2093          449 :         // If we're still attaching, fire the cancellation token early to drop out: this
    2094          449 :         // will prevent us flushing, but ensures timely shutdown if some I/O during attach
    2095          449 :         // is very slow.
    2096          449 :         if matches!(self.current_state(), TenantState::Attaching) {
    2097           14 :             self.cancel.cancel();
    2098          435 :         }
    2099              : 
    2100          449 :         match self.set_stopping(shutdown_progress, false, false).await {
    2101          392 :             Ok(()) => {}
    2102           57 :             Err(SetStoppingError::Broken) => {
    2103           57 :                 // assume that this is acceptable
    2104           57 :             }
    2105            0 :             Err(SetStoppingError::AlreadyStopping(other)) => {
    2106              :                 // give caller the option to wait for this this shutdown
    2107            0 :                 info!("Tenant::shutdown: AlreadyStopping");
    2108            0 :                 return Err(other);
    2109              :             }
    2110              :         };
    2111              : 
    2112          449 :         let mut js = tokio::task::JoinSet::new();
    2113          449 :         {
    2114          449 :             let timelines = self.timelines.lock().unwrap();
    2115          575 :             timelines.values().for_each(|timeline| {
    2116          575 :                 let timeline = Arc::clone(timeline);
    2117          575 :                 let timeline_id = timeline.timeline_id;
    2118              : 
    2119          575 :                 let span =
    2120          575 :                     tracing::info_span!("timeline_shutdown", %timeline_id, ?freeze_and_flush);
    2121          575 :                 js.spawn(async move {
    2122          575 :                     if freeze_and_flush {
    2123          829 :                         timeline.flush_and_shutdown().instrument(span).await
    2124              :                     } else {
    2125          786 :                         timeline.shutdown().instrument(span).await
    2126              :                     }
    2127          575 :                 });
    2128          575 :             })
    2129              :         };
    2130              :         // test_long_timeline_create_then_tenant_delete is leaning on this message
    2131          449 :         tracing::info!("Waiting for timelines...");
    2132         1024 :         while let Some(res) = js.join_next().await {
    2133            0 :             match res {
    2134          575 :                 Ok(()) => {}
    2135            0 :                 Err(je) if je.is_cancelled() => unreachable!("no cancelling used"),
    2136            0 :                 Err(je) if je.is_panic() => { /* logged already */ }
    2137            0 :                 Err(je) => warn!("unexpected JoinError: {je:?}"),
    2138              :             }
    2139              :         }
    2140              : 
    2141              :         // We cancel the Tenant's cancellation token _after_ the timelines have all shut down.  This permits
    2142              :         // them to continue to do work during their shutdown methods, e.g. flushing data.
    2143            0 :         tracing::debug!("Cancelling CancellationToken");
    2144          449 :         self.cancel.cancel();
    2145              : 
    2146              :         // shutdown all tenant and timeline tasks: gc, compaction, page service
    2147              :         // No new tasks will be started for this tenant because it's in `Stopping` state.
    2148              :         //
    2149              :         // this will additionally shutdown and await all timeline tasks.
    2150            0 :         tracing::debug!("Waiting for tasks...");
    2151          449 :         task_mgr::shutdown_tasks(None, Some(self.tenant_shard_id), None).await;
    2152              : 
    2153              :         // Wait for any in-flight operations to complete
    2154          449 :         self.gate.close().await;
    2155              : 
    2156          449 :         Ok(())
    2157          449 :     }
    2158              : 
    2159              :     /// Change tenant status to Stopping, to mark that it is being shut down.
    2160              :     ///
    2161              :     /// This function waits for the tenant to become active if it isn't already, before transitioning it into Stopping state.
    2162              :     ///
    2163              :     /// This function is not cancel-safe!
    2164              :     ///
    2165              :     /// `allow_transition_from_loading` is needed for the special case of loading task deleting the tenant.
    2166              :     /// `allow_transition_from_attaching` is needed for the special case of attaching deleted tenant.
    2167          470 :     async fn set_stopping(
    2168          470 :         &self,
    2169          470 :         progress: completion::Barrier,
    2170          470 :         allow_transition_from_loading: bool,
    2171          470 :         allow_transition_from_attaching: bool,
    2172          470 :     ) -> Result<(), SetStoppingError> {
    2173          470 :         let mut rx = self.state.subscribe();
    2174          470 : 
    2175          470 :         // cannot stop before we're done activating, so wait out until we're done activating
    2176          493 :         rx.wait_for(|state| match state {
    2177           21 :             TenantState::Attaching if allow_transition_from_attaching => true,
    2178              :             TenantState::Activating(_) | TenantState::Attaching => {
    2179           23 :                 info!(
    2180           23 :                     "waiting for {} to turn Active|Broken|Stopping",
    2181           23 :                     <&'static str>::from(state)
    2182           23 :                 );
    2183           23 :                 false
    2184              :             }
    2185            0 :             TenantState::Loading => allow_transition_from_loading,
    2186          449 :             TenantState::Active | TenantState::Broken { .. } | TenantState::Stopping { .. } => true,
    2187          493 :         })
    2188           23 :         .await
    2189          470 :         .expect("cannot drop self.state while on a &self method");
    2190          470 : 
    2191          470 :         // we now know we're done activating, let's see whether this task is the winner to transition into Stopping
    2192          470 :         let mut err = None;
    2193          470 :         let stopping = self.state.send_if_modified(|current_state| match current_state {
    2194              :             TenantState::Activating(_) => {
    2195            0 :                 unreachable!("1we ensured above that we're done with activation, and, there is no re-activation")
    2196              :             }
    2197              :             TenantState::Attaching => {
    2198           21 :                 if !allow_transition_from_attaching {
    2199            0 :                     unreachable!("2we ensured above that we're done with activation, and, there is no re-activation")
    2200           21 :                 };
    2201           21 :                 *current_state = TenantState::Stopping { progress };
    2202           21 :                 true
    2203              :             }
    2204              :             TenantState::Loading => {
    2205            0 :                 if !allow_transition_from_loading {
    2206            0 :                     unreachable!("3we ensured above that we're done with activation, and, there is no re-activation")
    2207            0 :                 };
    2208            0 :                 *current_state = TenantState::Stopping { progress };
    2209            0 :                 true
    2210              :             }
    2211              :             TenantState::Active => {
    2212              :                 // FIXME: due to time-of-check vs time-of-use issues, it can happen that new timelines
    2213              :                 // are created after the transition to Stopping. That's harmless, as the Timelines
    2214              :                 // won't be accessible to anyone afterwards, because the Tenant is in Stopping state.
    2215          392 :                 *current_state = TenantState::Stopping { progress };
    2216          392 :                 // Continue stopping outside the closure. We need to grab timelines.lock()
    2217          392 :                 // and we plan to turn it into a tokio::sync::Mutex in a future patch.
    2218          392 :                 true
    2219              :             }
    2220           57 :             TenantState::Broken { reason, .. } => {
    2221           57 :                 info!(
    2222           57 :                     "Cannot set tenant to Stopping state, it is in Broken state due to: {reason}"
    2223           57 :                 );
    2224           57 :                 err = Some(SetStoppingError::Broken);
    2225           57 :                 false
    2226              :             }
    2227            0 :             TenantState::Stopping { progress } => {
    2228            0 :                 info!("Tenant is already in Stopping state");
    2229            0 :                 err = Some(SetStoppingError::AlreadyStopping(progress.clone()));
    2230            0 :                 false
    2231              :             }
    2232          470 :         });
    2233          470 :         match (stopping, err) {
    2234          413 :             (true, None) => {} // continue
    2235           57 :             (false, Some(err)) => return Err(err),
    2236            0 :             (true, Some(_)) => unreachable!(
    2237            0 :                 "send_if_modified closure must error out if not transitioning to Stopping"
    2238            0 :             ),
    2239            0 :             (false, None) => unreachable!(
    2240            0 :                 "send_if_modified closure must return true if transitioning to Stopping"
    2241            0 :             ),
    2242              :         }
    2243              : 
    2244          413 :         let timelines_accessor = self.timelines.lock().unwrap();
    2245          413 :         let not_broken_timelines = timelines_accessor
    2246          413 :             .values()
    2247          515 :             .filter(|timeline| !timeline.is_broken());
    2248          919 :         for timeline in not_broken_timelines {
    2249          506 :             timeline.set_state(TimelineState::Stopping);
    2250          506 :         }
    2251          413 :         Ok(())
    2252          470 :     }
    2253              : 
    2254              :     /// Method for tenant::mgr to transition us into Broken state in case of a late failure in
    2255              :     /// `remove_tenant_from_memory`
    2256              :     ///
    2257              :     /// This function waits for the tenant to become active if it isn't already, before transitioning it into Stopping state.
    2258              :     ///
    2259              :     /// In tests, we also use this to set tenants to Broken state on purpose.
    2260           51 :     pub(crate) async fn set_broken(&self, reason: String) {
    2261           51 :         let mut rx = self.state.subscribe();
    2262           51 : 
    2263           51 :         // The load & attach routines own the tenant state until it has reached `Active`.
    2264           51 :         // So, wait until it's done.
    2265           51 :         rx.wait_for(|state| match state {
    2266              :             TenantState::Activating(_) | TenantState::Loading | TenantState::Attaching => {
    2267            0 :                 info!(
    2268            0 :                     "waiting for {} to turn Active|Broken|Stopping",
    2269            0 :                     <&'static str>::from(state)
    2270            0 :                 );
    2271            0 :                 false
    2272              :             }
    2273           51 :             TenantState::Active | TenantState::Broken { .. } | TenantState::Stopping { .. } => true,
    2274           51 :         })
    2275            0 :         .await
    2276           51 :         .expect("cannot drop self.state while on a &self method");
    2277           51 : 
    2278           51 :         // we now know we're done activating, let's see whether this task is the winner to transition into Broken
    2279           51 :         self.set_broken_no_wait(reason)
    2280           51 :     }
    2281              : 
    2282           51 :     pub(crate) fn set_broken_no_wait(&self, reason: impl Display) {
    2283           51 :         let reason = reason.to_string();
    2284           51 :         self.state.send_modify(|current_state| {
    2285           51 :             match *current_state {
    2286              :                 TenantState::Activating(_) | TenantState::Loading | TenantState::Attaching => {
    2287            0 :                     unreachable!("we ensured above that we're done with activation, and, there is no re-activation")
    2288              :                 }
    2289              :                 TenantState::Active => {
    2290            2 :                     if cfg!(feature = "testing") {
    2291            2 :                         warn!("Changing Active tenant to Broken state, reason: {}", reason);
    2292            2 :                         *current_state = TenantState::broken_from_reason(reason);
    2293              :                     } else {
    2294            0 :                         unreachable!("not allowed to call set_broken on Active tenants in non-testing builds")
    2295              :                     }
    2296              :                 }
    2297              :                 TenantState::Broken { .. } => {
    2298            0 :                     warn!("Tenant is already in Broken state");
    2299              :                 }
    2300              :                 // This is the only "expected" path, any other path is a bug.
    2301              :                 TenantState::Stopping { .. } => {
    2302           49 :                     warn!(
    2303           49 :                         "Marking Stopping tenant as Broken state, reason: {}",
    2304           49 :                         reason
    2305           49 :                     );
    2306           49 :                     *current_state = TenantState::broken_from_reason(reason);
    2307              :                 }
    2308              :            }
    2309           51 :         });
    2310           51 :     }
    2311              : 
    2312          387 :     pub fn subscribe_for_state_updates(&self) -> watch::Receiver<TenantState> {
    2313          387 :         self.state.subscribe()
    2314          387 :     }
    2315              : 
    2316              :     /// The activate_now semaphore is initialized with zero units.  As soon as
    2317              :     /// we add a unit, waiters will be able to acquire a unit and proceed.
    2318          593 :     pub(crate) fn activate_now(&self) {
    2319          593 :         self.activate_now_sem.add_permits(1);
    2320          593 :     }
    2321              : 
    2322         1216 :     pub(crate) async fn wait_to_become_active(
    2323         1216 :         &self,
    2324         1216 :         timeout: Duration,
    2325         1216 :     ) -> Result<(), GetActiveTenantError> {
    2326         1216 :         let mut receiver = self.state.subscribe();
    2327         1672 :         loop {
    2328         1672 :             let current_state = receiver.borrow_and_update().clone();
    2329         1672 :             match current_state {
    2330              :                 TenantState::Loading | TenantState::Attaching | TenantState::Activating(_) => {
    2331              :                     // in these states, there's a chance that we can reach ::Active
    2332          460 :                     self.activate_now();
    2333          597 :                     match timeout_cancellable(timeout, &self.cancel, receiver.changed()).await {
    2334          456 :                         Ok(r) => {
    2335          456 :                             r.map_err(
    2336          456 :                             |_e: tokio::sync::watch::error::RecvError|
    2337              :                                 // Tenant existed but was dropped: report it as non-existent
    2338          456 :                                 GetActiveTenantError::NotFound(GetTenantError::NotFound(self.tenant_shard_id.tenant_id))
    2339          456 :                         )?
    2340              :                         }
    2341              :                         Err(TimeoutCancellableError::Cancelled) => {
    2342            4 :                             return Err(GetActiveTenantError::Cancelled);
    2343              :                         }
    2344              :                         Err(TimeoutCancellableError::Timeout) => {
    2345            0 :                             return Err(GetActiveTenantError::WaitForActiveTimeout {
    2346            0 :                                 latest_state: Some(self.current_state()),
    2347            0 :                                 wait_time: timeout,
    2348            0 :                             });
    2349              :                         }
    2350              :                     }
    2351              :                 }
    2352              :                 TenantState::Active { .. } => {
    2353         1211 :                     return Ok(());
    2354              :                 }
    2355              :                 TenantState::Broken { .. } | TenantState::Stopping { .. } => {
    2356              :                     // There's no chance the tenant can transition back into ::Active
    2357            1 :                     return Err(GetActiveTenantError::WillNotBecomeActive(current_state));
    2358              :                 }
    2359              :             }
    2360              :         }
    2361         1216 :     }
    2362              : 
    2363           74 :     pub(crate) fn get_attach_mode(&self) -> AttachmentMode {
    2364           74 :         self.tenant_conf.read().unwrap().location.attach_mode
    2365           74 :     }
    2366              : 
    2367              :     /// For API access: generate a LocationConfig equivalent to the one that would be used to
    2368              :     /// create a Tenant in the same state.  Do not use this in hot paths: it's for relatively
    2369              :     /// rare external API calls, like a reconciliation at startup.
    2370            4 :     pub(crate) fn get_location_conf(&self) -> models::LocationConfig {
    2371            4 :         let conf = self.tenant_conf.read().unwrap();
    2372              : 
    2373            4 :         let location_config_mode = match conf.location.attach_mode {
    2374            4 :             AttachmentMode::Single => models::LocationConfigMode::AttachedSingle,
    2375            0 :             AttachmentMode::Multi => models::LocationConfigMode::AttachedMulti,
    2376            0 :             AttachmentMode::Stale => models::LocationConfigMode::AttachedStale,
    2377              :         };
    2378              : 
    2379              :         // We have a pageserver TenantConf, we need the API-facing TenantConfig.
    2380            4 :         let tenant_config: models::TenantConfig = conf.tenant_conf.into();
    2381            4 : 
    2382            4 :         models::LocationConfig {
    2383            4 :             mode: location_config_mode,
    2384            4 :             generation: self.generation.into(),
    2385            4 :             secondary_conf: None,
    2386            4 :             shard_number: self.shard_identity.number.0,
    2387            4 :             shard_count: self.shard_identity.count.0,
    2388            4 :             shard_stripe_size: self.shard_identity.stripe_size.0,
    2389            4 :             tenant_conf: tenant_config,
    2390            4 :         }
    2391            4 :     }
    2392              : 
    2393           48 :     pub(crate) fn get_tenant_shard_id(&self) -> &TenantShardId {
    2394           48 :         &self.tenant_shard_id
    2395           48 :     }
    2396              : 
    2397            8 :     pub(crate) fn get_generation(&self) -> Generation {
    2398            8 :         self.generation
    2399            8 :     }
    2400              : }
    2401              : 
    2402              : /// Given a Vec of timelines and their ancestors (timeline_id, ancestor_id),
    2403              : /// perform a topological sort, so that the parent of each timeline comes
    2404              : /// before the children.
    2405              : /// E extracts the ancestor from T
    2406              : /// This allows for T to be different. It can be TimelineMetadata, can be Timeline itself, etc.
    2407         1040 : fn tree_sort_timelines<T, E>(
    2408         1040 :     timelines: HashMap<TimelineId, T>,
    2409         1040 :     extractor: E,
    2410         1040 : ) -> anyhow::Result<Vec<(TimelineId, T)>>
    2411         1040 : where
    2412         1040 :     E: Fn(&T) -> Option<TimelineId>,
    2413         1040 : {
    2414         1040 :     let mut result = Vec::with_capacity(timelines.len());
    2415         1040 : 
    2416         1040 :     let mut now = Vec::with_capacity(timelines.len());
    2417         1040 :     // (ancestor, children)
    2418         1040 :     let mut later: HashMap<TimelineId, Vec<(TimelineId, T)>> =
    2419         1040 :         HashMap::with_capacity(timelines.len());
    2420              : 
    2421         1615 :     for (timeline_id, value) in timelines {
    2422          575 :         if let Some(ancestor_id) = extractor(&value) {
    2423           57 :             let children = later.entry(ancestor_id).or_default();
    2424           57 :             children.push((timeline_id, value));
    2425          518 :         } else {
    2426          518 :             now.push((timeline_id, value));
    2427          518 :         }
    2428              :     }
    2429              : 
    2430         1615 :     while let Some((timeline_id, metadata)) = now.pop() {
    2431          575 :         result.push((timeline_id, metadata));
    2432              :         // All children of this can be loaded now
    2433          575 :         if let Some(mut children) = later.remove(&timeline_id) {
    2434           46 :             now.append(&mut children);
    2435          529 :         }
    2436              :     }
    2437              : 
    2438              :     // All timelines should be visited now. Unless there were timelines with missing ancestors.
    2439         1040 :     if !later.is_empty() {
    2440            0 :         for (missing_id, orphan_ids) in later {
    2441            0 :             for (orphan_id, _) in orphan_ids {
    2442            0 :                 error!("could not load timeline {orphan_id} because its ancestor timeline {missing_id} could not be loaded");
    2443              :             }
    2444              :         }
    2445            0 :         bail!("could not load tenant because some timelines are missing ancestors");
    2446         1040 :     }
    2447         1040 : 
    2448         1040 :     Ok(result)
    2449         1040 : }
    2450              : 
    2451              : impl Tenant {
    2452           90 :     pub fn tenant_specific_overrides(&self) -> TenantConfOpt {
    2453           90 :         self.tenant_conf.read().unwrap().tenant_conf
    2454           90 :     }
    2455              : 
    2456           45 :     pub fn effective_config(&self) -> TenantConf {
    2457           45 :         self.tenant_specific_overrides()
    2458           45 :             .merge(self.conf.default_tenant_conf)
    2459           45 :     }
    2460              : 
    2461            5 :     pub fn get_checkpoint_distance(&self) -> u64 {
    2462            5 :         let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
    2463            5 :         tenant_conf
    2464            5 :             .checkpoint_distance
    2465            5 :             .unwrap_or(self.conf.default_tenant_conf.checkpoint_distance)
    2466            5 :     }
    2467              : 
    2468            5 :     pub fn get_checkpoint_timeout(&self) -> Duration {
    2469            5 :         let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
    2470            5 :         tenant_conf
    2471            5 :             .checkpoint_timeout
    2472            5 :             .unwrap_or(self.conf.default_tenant_conf.checkpoint_timeout)
    2473            5 :     }
    2474              : 
    2475            5 :     pub fn get_compaction_target_size(&self) -> u64 {
    2476            5 :         let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
    2477            5 :         tenant_conf
    2478            5 :             .compaction_target_size
    2479            5 :             .unwrap_or(self.conf.default_tenant_conf.compaction_target_size)
    2480            5 :     }
    2481              : 
    2482         1210 :     pub fn get_compaction_period(&self) -> Duration {
    2483         1210 :         let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
    2484         1210 :         tenant_conf
    2485         1210 :             .compaction_period
    2486         1210 :             .unwrap_or(self.conf.default_tenant_conf.compaction_period)
    2487         1210 :     }
    2488              : 
    2489            5 :     pub fn get_compaction_threshold(&self) -> usize {
    2490            5 :         let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
    2491            5 :         tenant_conf
    2492            5 :             .compaction_threshold
    2493            5 :             .unwrap_or(self.conf.default_tenant_conf.compaction_threshold)
    2494            5 :     }
    2495              : 
    2496          522 :     pub fn get_gc_horizon(&self) -> u64 {
    2497          522 :         let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
    2498          522 :         tenant_conf
    2499          522 :             .gc_horizon
    2500          522 :             .unwrap_or(self.conf.default_tenant_conf.gc_horizon)
    2501          522 :     }
    2502              : 
    2503         1076 :     pub fn get_gc_period(&self) -> Duration {
    2504         1076 :         let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
    2505         1076 :         tenant_conf
    2506         1076 :             .gc_period
    2507         1076 :             .unwrap_or(self.conf.default_tenant_conf.gc_period)
    2508         1076 :     }
    2509              : 
    2510            5 :     pub fn get_image_creation_threshold(&self) -> usize {
    2511            5 :         let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
    2512            5 :         tenant_conf
    2513            5 :             .image_creation_threshold
    2514            5 :             .unwrap_or(self.conf.default_tenant_conf.image_creation_threshold)
    2515            5 :     }
    2516              : 
    2517          383 :     pub fn get_pitr_interval(&self) -> Duration {
    2518          383 :         let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
    2519          383 :         tenant_conf
    2520          383 :             .pitr_interval
    2521          383 :             .unwrap_or(self.conf.default_tenant_conf.pitr_interval)
    2522          383 :     }
    2523              : 
    2524        10025 :     pub fn get_trace_read_requests(&self) -> bool {
    2525        10025 :         let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
    2526        10025 :         tenant_conf
    2527        10025 :             .trace_read_requests
    2528        10025 :             .unwrap_or(self.conf.default_tenant_conf.trace_read_requests)
    2529        10025 :     }
    2530              : 
    2531           35 :     pub fn get_min_resident_size_override(&self) -> Option<u64> {
    2532           35 :         let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
    2533           35 :         tenant_conf
    2534           35 :             .min_resident_size_override
    2535           35 :             .or(self.conf.default_tenant_conf.min_resident_size_override)
    2536           35 :     }
    2537              : 
    2538          940 :     pub fn get_heatmap_period(&self) -> Option<Duration> {
    2539          940 :         let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
    2540          940 :         let heatmap_period = tenant_conf
    2541          940 :             .heatmap_period
    2542          940 :             .unwrap_or(self.conf.default_tenant_conf.heatmap_period);
    2543          940 :         if heatmap_period.is_zero() {
    2544          940 :             None
    2545              :         } else {
    2546            0 :             Some(heatmap_period)
    2547              :         }
    2548          940 :     }
    2549              : 
    2550           30 :     pub fn set_new_tenant_config(&self, new_tenant_conf: TenantConfOpt) {
    2551           30 :         self.tenant_conf.write().unwrap().tenant_conf = new_tenant_conf;
    2552           30 :         // Don't hold self.timelines.lock() during the notifies.
    2553           30 :         // There's no risk of deadlock right now, but there could be if we consolidate
    2554           30 :         // mutexes in struct Timeline in the future.
    2555           30 :         let timelines = self.list_timelines();
    2556           62 :         for timeline in timelines {
    2557           32 :             timeline.tenant_conf_updated();
    2558           32 :         }
    2559           30 :     }
    2560              : 
    2561           28 :     pub(crate) fn set_new_location_config(&self, new_conf: AttachedTenantConf) {
    2562           28 :         *self.tenant_conf.write().unwrap() = new_conf;
    2563           28 :         // Don't hold self.timelines.lock() during the notifies.
    2564           28 :         // There's no risk of deadlock right now, but there could be if we consolidate
    2565           28 :         // mutexes in struct Timeline in the future.
    2566           28 :         let timelines = self.list_timelines();
    2567           49 :         for timeline in timelines {
    2568           21 :             timeline.tenant_conf_updated();
    2569           21 :         }
    2570           28 :     }
    2571              : 
    2572              :     /// Helper function to create a new Timeline struct.
    2573              :     ///
    2574              :     /// The returned Timeline is in Loading state. The caller is responsible for
    2575              :     /// initializing any on-disk state, and for inserting the Timeline to the 'timelines'
    2576              :     /// map.
    2577              :     ///
    2578              :     /// `validate_ancestor == false` is used when a timeline is created for deletion
    2579              :     /// and we might not have the ancestor present anymore which is fine for to be
    2580              :     /// deleted timelines.
    2581         1568 :     fn create_timeline_struct(
    2582         1568 :         &self,
    2583         1568 :         new_timeline_id: TimelineId,
    2584         1568 :         new_metadata: &TimelineMetadata,
    2585         1568 :         ancestor: Option<Arc<Timeline>>,
    2586         1568 :         resources: TimelineResources,
    2587         1568 :         cause: CreateTimelineCause,
    2588         1568 :     ) -> anyhow::Result<Arc<Timeline>> {
    2589         1568 :         let state = match cause {
    2590              :             CreateTimelineCause::Load => {
    2591         1556 :                 let ancestor_id = new_metadata.ancestor_timeline();
    2592         1556 :                 anyhow::ensure!(
    2593         1556 :                     ancestor_id == ancestor.as_ref().map(|t| t.timeline_id),
    2594            0 :                     "Timeline's {new_timeline_id} ancestor {ancestor_id:?} was not found"
    2595              :                 );
    2596         1556 :                 TimelineState::Loading
    2597              :             }
    2598           12 :             CreateTimelineCause::Delete => TimelineState::Stopping,
    2599              :         };
    2600              : 
    2601         1568 :         let pg_version = new_metadata.pg_version();
    2602         1568 : 
    2603         1568 :         let timeline = Timeline::new(
    2604         1568 :             self.conf,
    2605         1568 :             Arc::clone(&self.tenant_conf),
    2606         1568 :             new_metadata,
    2607         1568 :             ancestor,
    2608         1568 :             new_timeline_id,
    2609         1568 :             self.tenant_shard_id,
    2610         1568 :             self.generation,
    2611         1568 :             self.shard_identity,
    2612         1568 :             self.walredo_mgr.as_ref().map(Arc::clone),
    2613         1568 :             resources,
    2614         1568 :             pg_version,
    2615         1568 :             state,
    2616         1568 :             self.cancel.child_token(),
    2617         1568 :         );
    2618         1568 : 
    2619         1568 :         Ok(timeline)
    2620         1568 :     }
    2621              : 
    2622              :     // Allow too_many_arguments because a constructor's argument list naturally grows with the
    2623              :     // number of attributes in the struct: breaking these out into a builder wouldn't be helpful.
    2624              :     #[allow(clippy::too_many_arguments)]
    2625          942 :     fn new(
    2626          942 :         state: TenantState,
    2627          942 :         conf: &'static PageServerConf,
    2628          942 :         attached_conf: AttachedTenantConf,
    2629          942 :         shard_identity: ShardIdentity,
    2630          942 :         walredo_mgr: Option<Arc<WalRedoManager>>,
    2631          942 :         tenant_shard_id: TenantShardId,
    2632          942 :         remote_storage: Option<GenericRemoteStorage>,
    2633          942 :         deletion_queue_client: DeletionQueueClient,
    2634          942 :     ) -> Tenant {
    2635          942 :         let (state, mut rx) = watch::channel(state);
    2636          942 : 
    2637          942 :         tokio::spawn(async move {
    2638          942 :             // reflect tenant state in metrics:
    2639          942 :             // - global per tenant state: TENANT_STATE_METRIC
    2640          942 :             // - "set" of broken tenants: BROKEN_TENANTS_SET
    2641          942 :             //
    2642          942 :             // set of broken tenants should not have zero counts so that it remains accessible for
    2643          942 :             // alerting.
    2644          942 : 
    2645          942 :             let tid = tenant_shard_id.to_string();
    2646          942 :             let shard_id = tenant_shard_id.shard_slug().to_string();
    2647          942 :             let set_key = &[tid.as_str(), shard_id.as_str()][..];
    2648          942 : 
    2649         2583 :             fn inspect_state(state: &TenantState) -> ([&'static str; 1], bool) {
    2650         2583 :                 ([state.into()], matches!(state, TenantState::Broken { .. }))
    2651         2583 :             }
    2652          942 : 
    2653          942 :             let mut tuple = inspect_state(&rx.borrow_and_update());
    2654          942 : 
    2655          942 :             let is_broken = tuple.1;
    2656          942 :             let mut counted_broken = if is_broken {
    2657              :                 // add the id to the set right away, there should not be any updates on the channel
    2658              :                 // after before tenant is removed, if ever
    2659            0 :                 BROKEN_TENANTS_SET.with_label_values(set_key).set(1);
    2660            0 :                 true
    2661              :             } else {
    2662          942 :                 false
    2663              :             };
    2664              : 
    2665         2583 :             loop {
    2666         2583 :                 let labels = &tuple.0;
    2667         2583 :                 let current = TENANT_STATE_METRIC.with_label_values(labels);
    2668         2583 :                 current.inc();
    2669         2583 : 
    2670         2583 :                 if rx.changed().await.is_err() {
    2671              :                     // tenant has been dropped
    2672          238 :                     current.dec();
    2673          238 :                     drop(BROKEN_TENANTS_SET.remove_label_values(set_key));
    2674          238 :                     break;
    2675         1641 :                 }
    2676         1641 : 
    2677         1641 :                 current.dec();
    2678         1641 :                 tuple = inspect_state(&rx.borrow_and_update());
    2679         1641 : 
    2680         1641 :                 let is_broken = tuple.1;
    2681         1641 :                 if is_broken && !counted_broken {
    2682           58 :                     counted_broken = true;
    2683           58 :                     // insert the tenant_id (back) into the set while avoiding needless counter
    2684           58 :                     // access
    2685           58 :                     BROKEN_TENANTS_SET.with_label_values(set_key).set(1);
    2686         1583 :                 }
    2687              :             }
    2688          942 :         });
    2689          942 : 
    2690          942 :         Tenant {
    2691          942 :             tenant_shard_id,
    2692          942 :             shard_identity,
    2693          942 :             generation: attached_conf.location.generation,
    2694          942 :             conf,
    2695          942 :             // using now here is good enough approximation to catch tenants with really long
    2696          942 :             // activation times.
    2697          942 :             constructed_at: Instant::now(),
    2698          942 :             tenant_conf: Arc::new(RwLock::new(attached_conf)),
    2699          942 :             timelines: Mutex::new(HashMap::new()),
    2700          942 :             timelines_creating: Mutex::new(HashSet::new()),
    2701          942 :             gc_cs: tokio::sync::Mutex::new(()),
    2702          942 :             walredo_mgr,
    2703          942 :             remote_storage,
    2704          942 :             deletion_queue_client,
    2705          942 :             state,
    2706          942 :             cached_logical_sizes: tokio::sync::Mutex::new(HashMap::new()),
    2707          942 :             cached_synthetic_tenant_size: Arc::new(AtomicU64::new(0)),
    2708          942 :             eviction_task_tenant_state: tokio::sync::Mutex::new(EvictionTaskTenantState::default()),
    2709          942 :             activate_now_sem: tokio::sync::Semaphore::new(0),
    2710          942 :             delete_progress: Arc::new(tokio::sync::Mutex::new(DeleteTenantFlow::default())),
    2711          942 :             cancel: CancellationToken::default(),
    2712          942 :             gate: Gate::default(),
    2713          942 :         }
    2714          942 :     }
    2715              : 
    2716              :     /// Locate and load config
    2717          231 :     pub(super) fn load_tenant_config(
    2718          231 :         conf: &'static PageServerConf,
    2719          231 :         tenant_shard_id: &TenantShardId,
    2720          231 :     ) -> anyhow::Result<LocationConf> {
    2721          231 :         let legacy_config_path = conf.tenant_config_path(tenant_shard_id);
    2722          231 :         let config_path = conf.tenant_location_config_path(tenant_shard_id);
    2723          231 : 
    2724          231 :         if config_path.exists() {
    2725              :             // New-style config takes precedence
    2726          227 :             let deserialized = Self::read_config(&config_path)?;
    2727          227 :             Ok(toml_edit::de::from_document::<LocationConf>(deserialized)?)
    2728            4 :         } else if legacy_config_path.exists() {
    2729              :             // Upgrade path: found an old-style configuration only
    2730            0 :             let deserialized = Self::read_config(&legacy_config_path)?;
    2731              : 
    2732            0 :             let mut tenant_conf = TenantConfOpt::default();
    2733            0 :             for (key, item) in deserialized.iter() {
    2734            0 :                 match key {
    2735            0 :                     "tenant_config" => {
    2736            0 :                         tenant_conf = TenantConfOpt::try_from(item.to_owned()).context(format!("Failed to parse config from file '{legacy_config_path}' as pageserver config"))?;
    2737              :                     }
    2738            0 :                     _ => bail!(
    2739            0 :                         "config file {legacy_config_path} has unrecognized pageserver option '{key}'"
    2740            0 :                     ),
    2741              :                 }
    2742              :             }
    2743              : 
    2744              :             // Legacy configs are implicitly in attached state, and do not support sharding
    2745            0 :             Ok(LocationConf::attached_single(
    2746            0 :                 tenant_conf,
    2747            0 :                 Generation::none(),
    2748            0 :                 &models::ShardParameters::default(),
    2749            0 :             ))
    2750              :         } else {
    2751              :             // FIXME If the config file is not found, assume that we're attaching
    2752              :             // a detached tenant and config is passed via attach command.
    2753              :             // https://github.com/neondatabase/neon/issues/1555
    2754              :             // OR: we're loading after incomplete deletion that managed to remove config.
    2755            4 :             info!(
    2756            4 :                 "tenant config not found in {} or {}",
    2757            4 :                 config_path, legacy_config_path
    2758            4 :             );
    2759            4 :             Ok(LocationConf::default())
    2760              :         }
    2761          231 :     }
    2762              : 
    2763          227 :     fn read_config(path: &Utf8Path) -> anyhow::Result<toml_edit::Document> {
    2764          227 :         info!("loading tenant configuration from {path}");
    2765              : 
    2766              :         // load and parse file
    2767          227 :         let config = fs::read_to_string(path)
    2768          227 :             .with_context(|| format!("Failed to load config from path '{path}'"))?;
    2769              : 
    2770          227 :         config
    2771          227 :             .parse::<toml_edit::Document>()
    2772          227 :             .with_context(|| format!("Failed to parse config from file '{path}' as toml file"))
    2773          227 :     }
    2774              : 
    2775          946 :     #[tracing::instrument(skip_all, fields(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug()))]
    2776              :     pub(super) async fn persist_tenant_config(
    2777              :         conf: &'static PageServerConf,
    2778              :         tenant_shard_id: &TenantShardId,
    2779              :         location_conf: &LocationConf,
    2780              :     ) -> anyhow::Result<()> {
    2781              :         let legacy_config_path = conf.tenant_config_path(tenant_shard_id);
    2782              :         let config_path = conf.tenant_location_config_path(tenant_shard_id);
    2783              : 
    2784              :         Self::persist_tenant_config_at(
    2785              :             tenant_shard_id,
    2786              :             &config_path,
    2787              :             &legacy_config_path,
    2788              :             location_conf,
    2789              :         )
    2790              :         .await
    2791              :     }
    2792              : 
    2793            0 :     #[tracing::instrument(skip_all, fields(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug()))]
    2794              :     pub(super) async fn persist_tenant_config_at(
    2795              :         tenant_shard_id: &TenantShardId,
    2796              :         config_path: &Utf8Path,
    2797              :         legacy_config_path: &Utf8Path,
    2798              :         location_conf: &LocationConf,
    2799              :     ) -> anyhow::Result<()> {
    2800              :         // Forward compat: write out an old-style configuration that old versions can read, in case we roll back
    2801              :         Self::persist_tenant_config_legacy(
    2802              :             tenant_shard_id,
    2803              :             legacy_config_path,
    2804              :             &location_conf.tenant_conf,
    2805              :         )
    2806              :         .await?;
    2807              : 
    2808              :         if let LocationMode::Attached(attach_conf) = &location_conf.mode {
    2809              :             // Once we use LocationMode, generations are mandatory.  If we aren't using generations,
    2810              :             // then drop out after writing legacy-style config.
    2811              :             if attach_conf.generation.is_none() {
    2812            0 :                 tracing::debug!("Running without generations, not writing new-style LocationConf");
    2813              :                 return Ok(());
    2814              :             }
    2815              :         }
    2816              : 
    2817            0 :         debug!("persisting tenantconf to {config_path}");
    2818              : 
    2819              :         let mut conf_content = r#"# This file contains a specific per-tenant's config.
    2820              : #  It is read in case of pageserver restart.
    2821              : "#
    2822              :         .to_string();
    2823              : 
    2824            1 :         fail::fail_point!("tenant-config-before-write", |_| {
    2825            1 :             anyhow::bail!("tenant-config-before-write");
    2826            1 :         });
    2827              : 
    2828              :         // Convert the config to a toml file.
    2829              :         conf_content += &toml_edit::ser::to_string_pretty(&location_conf)?;
    2830              : 
    2831              :         let temp_path = path_with_suffix_extension(config_path, TEMP_FILE_SUFFIX);
    2832              : 
    2833              :         let tenant_shard_id = *tenant_shard_id;
    2834              :         let config_path = config_path.to_owned();
    2835          945 :         tokio::task::spawn_blocking(move || {
    2836          945 :             Handle::current().block_on(async move {
    2837          945 :                 let conf_content = conf_content.as_bytes();
    2838          945 :                 VirtualFile::crashsafe_overwrite(&config_path, &temp_path, conf_content)
    2839            0 :                     .await
    2840          945 :                     .with_context(|| {
    2841            0 :                         format!("write tenant {tenant_shard_id} config to {config_path}")
    2842          945 :                     })
    2843          945 :             })
    2844          945 :         })
    2845              :         .await??;
    2846              : 
    2847              :         Ok(())
    2848              :     }
    2849              : 
    2850          946 :     #[tracing::instrument(skip_all, fields(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug()))]
    2851              :     async fn persist_tenant_config_legacy(
    2852              :         tenant_shard_id: &TenantShardId,
    2853              :         target_config_path: &Utf8Path,
    2854              :         tenant_conf: &TenantConfOpt,
    2855              :     ) -> anyhow::Result<()> {
    2856            0 :         debug!("persisting tenantconf to {target_config_path}");
    2857              : 
    2858              :         let mut conf_content = r#"# This file contains a specific per-tenant's config.
    2859              : #  It is read in case of pageserver restart.
    2860              : 
    2861              : [tenant_config]
    2862              : "#
    2863              :         .to_string();
    2864              : 
    2865              :         // Convert the config to a toml file.
    2866              :         conf_content += &toml_edit::ser::to_string(&tenant_conf)?;
    2867              : 
    2868              :         let temp_path = path_with_suffix_extension(target_config_path, TEMP_FILE_SUFFIX);
    2869              : 
    2870              :         let tenant_shard_id = *tenant_shard_id;
    2871              :         let target_config_path = target_config_path.to_owned();
    2872          946 :         tokio::task::spawn_blocking(move || {
    2873          946 :             Handle::current().block_on(async move {
    2874          946 :                 let conf_content = conf_content.as_bytes();
    2875          946 :                 VirtualFile::crashsafe_overwrite(&target_config_path, &temp_path, conf_content)
    2876            0 :                     .await
    2877          946 :                     .with_context(|| {
    2878            0 :                         format!("write tenant {tenant_shard_id} config to {target_config_path}")
    2879          946 :                     })
    2880          946 :             })
    2881          946 :         })
    2882              :         .await??;
    2883              :         Ok(())
    2884              :     }
    2885              : 
    2886              :     //
    2887              :     // How garbage collection works:
    2888              :     //
    2889              :     //                    +--bar------------->
    2890              :     //                   /
    2891              :     //             +----+-----foo---------------->
    2892              :     //            /
    2893              :     // ----main--+-------------------------->
    2894              :     //                \
    2895              :     //                 +-----baz-------->
    2896              :     //
    2897              :     //
    2898              :     // 1. Grab 'gc_cs' mutex to prevent new timelines from being created while Timeline's
    2899              :     //    `gc_infos` are being refreshed
    2900              :     // 2. Scan collected timelines, and on each timeline, make note of the
    2901              :     //    all the points where other timelines have been branched off.
    2902              :     //    We will refrain from removing page versions at those LSNs.
    2903              :     // 3. For each timeline, scan all layer files on the timeline.
    2904              :     //    Remove all files for which a newer file exists and which
    2905              :     //    don't cover any branch point LSNs.
    2906              :     //
    2907              :     // TODO:
    2908              :     // - if a relation has a non-incremental persistent layer on a child branch, then we
    2909              :     //   don't need to keep that in the parent anymore. But currently
    2910              :     //   we do.
    2911          302 :     async fn gc_iteration_internal(
    2912          302 :         &self,
    2913          302 :         target_timeline_id: Option<TimelineId>,
    2914          302 :         horizon: u64,
    2915          302 :         pitr: Duration,
    2916          302 :         cancel: &CancellationToken,
    2917          302 :         ctx: &RequestContext,
    2918          302 :     ) -> anyhow::Result<GcResult> {
    2919          302 :         let mut totals: GcResult = Default::default();
    2920          302 :         let now = Instant::now();
    2921              : 
    2922          302 :         let gc_timelines = match self
    2923          302 :             .refresh_gc_info_internal(target_timeline_id, horizon, pitr, cancel, ctx)
    2924       264392 :             .await
    2925              :         {
    2926          300 :             Ok(result) => result,
    2927            1 :             Err(e) => {
    2928            0 :                 if let Some(PageReconstructError::Cancelled) =
    2929            1 :                     e.downcast_ref::<PageReconstructError>()
    2930              :                 {
    2931              :                     // Handle cancellation
    2932            0 :                     totals.elapsed = now.elapsed();
    2933            0 :                     return Ok(totals);
    2934              :                 } else {
    2935              :                     // Propagate other errors
    2936            1 :                     return Err(e);
    2937              :                 }
    2938              :             }
    2939              :         };
    2940              : 
    2941            3 :         failpoint_support::sleep_millis_async!("gc_iteration_internal_after_getting_gc_timelines");
    2942              : 
    2943              :         // If there is nothing to GC, we don't want any messages in the INFO log.
    2944          300 :         if !gc_timelines.is_empty() {
    2945          296 :             info!("{} timelines need GC", gc_timelines.len());
    2946              :         } else {
    2947            0 :             debug!("{} timelines need GC", gc_timelines.len());
    2948              :         }
    2949              : 
    2950              :         // Perform GC for each timeline.
    2951              :         //
    2952              :         // Note that we don't hold the `Tenant::gc_cs` lock here because we don't want to delay the
    2953              :         // branch creation task, which requires the GC lock. A GC iteration can run concurrently
    2954              :         // with branch creation.
    2955              :         //
    2956              :         // See comments in [`Tenant::branch_timeline`] for more information about why branch
    2957              :         // creation task can run concurrently with timeline's GC iteration.
    2958          720 :         for timeline in gc_timelines {
    2959          420 :             if task_mgr::is_shutdown_requested() || cancel.is_cancelled() {
    2960              :                 // We were requested to shut down. Stop and return with the progress we
    2961              :                 // made.
    2962            0 :                 break;
    2963          420 :             }
    2964          420 :             let result = timeline.gc().await?;
    2965          420 :             totals += result;
    2966              :         }
    2967              : 
    2968          300 :         totals.elapsed = now.elapsed();
    2969          300 :         Ok(totals)
    2970          301 :     }
    2971              : 
    2972              :     /// Refreshes the Timeline::gc_info for all timelines, returning the
    2973              :     /// vector of timelines which have [`Timeline::get_last_record_lsn`] past
    2974              :     /// [`Tenant::get_gc_horizon`].
    2975              :     ///
    2976              :     /// This is usually executed as part of periodic gc, but can now be triggered more often.
    2977           82 :     pub async fn refresh_gc_info(
    2978           82 :         &self,
    2979           82 :         cancel: &CancellationToken,
    2980           82 :         ctx: &RequestContext,
    2981           82 :     ) -> anyhow::Result<Vec<Arc<Timeline>>> {
    2982           82 :         // since this method can now be called at different rates than the configured gc loop, it
    2983           82 :         // might be that these configuration values get applied faster than what it was previously,
    2984           82 :         // since these were only read from the gc task.
    2985           82 :         let horizon = self.get_gc_horizon();
    2986           82 :         let pitr = self.get_pitr_interval();
    2987           82 : 
    2988           82 :         // refresh all timelines
    2989           82 :         let target_timeline_id = None;
    2990           82 : 
    2991           82 :         self.refresh_gc_info_internal(target_timeline_id, horizon, pitr, cancel, ctx)
    2992           24 :             .await
    2993           82 :     }
    2994              : 
    2995          384 :     async fn refresh_gc_info_internal(
    2996          384 :         &self,
    2997          384 :         target_timeline_id: Option<TimelineId>,
    2998          384 :         horizon: u64,
    2999          384 :         pitr: Duration,
    3000          384 :         cancel: &CancellationToken,
    3001          384 :         ctx: &RequestContext,
    3002          384 :     ) -> anyhow::Result<Vec<Arc<Timeline>>> {
    3003              :         // grab mutex to prevent new timelines from being created here.
    3004          384 :         let gc_cs = self.gc_cs.lock().await;
    3005              : 
    3006              :         // Scan all timelines. For each timeline, remember the timeline ID and
    3007              :         // the branch point where it was created.
    3008          383 :         let (all_branchpoints, timeline_ids): (BTreeSet<(TimelineId, Lsn)>, _) = {
    3009          384 :             let timelines = self.timelines.lock().unwrap();
    3010          384 :             let mut all_branchpoints = BTreeSet::new();
    3011          383 :             let timeline_ids = {
    3012          384 :                 if let Some(target_timeline_id) = target_timeline_id.as_ref() {
    3013          258 :                     if timelines.get(target_timeline_id).is_none() {
    3014            1 :                         bail!("gc target timeline does not exist")
    3015          257 :                     }
    3016          126 :                 };
    3017              : 
    3018          383 :                 timelines
    3019          383 :                     .iter()
    3020          791 :                     .map(|(timeline_id, timeline_entry)| {
    3021          413 :                         if let Some(ancestor_timeline_id) =
    3022          791 :                             &timeline_entry.get_ancestor_timeline_id()
    3023              :                         {
    3024              :                             // If target_timeline is specified, we only need to know branchpoints of its children
    3025          413 :                             if let Some(timeline_id) = target_timeline_id {
    3026          236 :                                 if ancestor_timeline_id == &timeline_id {
    3027            9 :                                     all_branchpoints.insert((
    3028            9 :                                         *ancestor_timeline_id,
    3029            9 :                                         timeline_entry.get_ancestor_lsn(),
    3030            9 :                                     ));
    3031          227 :                                 }
    3032              :                             }
    3033              :                             // Collect branchpoints for all timelines
    3034          177 :                             else {
    3035          177 :                                 all_branchpoints.insert((
    3036          177 :                                     *ancestor_timeline_id,
    3037          177 :                                     timeline_entry.get_ancestor_lsn(),
    3038          177 :                                 ));
    3039          177 :                             }
    3040          378 :                         }
    3041              : 
    3042          791 :                         *timeline_id
    3043          791 :                     })
    3044          383 :                     .collect::<Vec<_>>()
    3045          383 :             };
    3046          383 :             (all_branchpoints, timeline_ids)
    3047          383 :         };
    3048          383 : 
    3049          383 :         // Ok, we now know all the branch points.
    3050          383 :         // Update the GC information for each timeline.
    3051          383 :         let mut gc_timelines = Vec::with_capacity(timeline_ids.len());
    3052         1166 :         for timeline_id in timeline_ids {
    3053              :             // Timeline is known to be local and loaded.
    3054          784 :             let timeline = self
    3055          784 :                 .get_timeline(timeline_id, false)
    3056          784 :                 .with_context(|| format!("Timeline {timeline_id} was not found"))?;
    3057              : 
    3058              :             // If target_timeline is specified, ignore all other timelines
    3059          784 :             if let Some(target_timeline_id) = target_timeline_id {
    3060          493 :                 if timeline_id != target_timeline_id {
    3061          236 :                     continue;
    3062          257 :                 }
    3063          291 :             }
    3064              : 
    3065          548 :             if let Some(cutoff) = timeline.get_last_record_lsn().checked_sub(horizon) {
    3066          481 :                 let branchpoints: Vec<Lsn> = all_branchpoints
    3067          481 :                     .range((
    3068          481 :                         Included((timeline_id, Lsn(0))),
    3069          481 :                         Included((timeline_id, Lsn(u64::MAX))),
    3070          481 :                     ))
    3071          481 :                     .map(|&x| x.1)
    3072          481 :                     .collect();
    3073          481 :                 timeline
    3074          481 :                     .update_gc_info(branchpoints, cutoff, pitr, cancel, ctx)
    3075       264416 :                     .await?;
    3076              : 
    3077          480 :                 gc_timelines.push(timeline);
    3078           67 :             }
    3079              :         }
    3080          382 :         drop(gc_cs);
    3081          382 :         Ok(gc_timelines)
    3082          383 :     }
    3083              : 
    3084              :     /// A substitute for `branch_timeline` for use in unit tests.
    3085              :     /// The returned timeline will have state value `Active` to make various `anyhow::ensure!()`
    3086              :     /// calls pass, but, we do not actually call `.activate()` under the hood. So, none of the
    3087              :     /// timeline background tasks are launched, except the flush loop.
    3088              :     #[cfg(test)]
    3089          214 :     async fn branch_timeline_test(
    3090          214 :         &self,
    3091          214 :         src_timeline: &Arc<Timeline>,
    3092          214 :         dst_id: TimelineId,
    3093          214 :         start_lsn: Option<Lsn>,
    3094          214 :         ctx: &RequestContext,
    3095          214 :     ) -> Result<Arc<Timeline>, CreateTimelineError> {
    3096          214 :         let uninit_mark = self.create_timeline_uninit_mark(dst_id).unwrap();
    3097          214 :         let tl = self
    3098          214 :             .branch_timeline_impl(src_timeline, dst_id, start_lsn, uninit_mark, ctx)
    3099          217 :             .await?;
    3100          210 :         tl.set_state(TimelineState::Active);
    3101          210 :         Ok(tl)
    3102          214 :     }
    3103              : 
    3104              :     /// Branch an existing timeline.
    3105              :     ///
    3106              :     /// The caller is responsible for activating the returned timeline.
    3107          262 :     async fn branch_timeline(
    3108          262 :         &self,
    3109          262 :         src_timeline: &Arc<Timeline>,
    3110          262 :         dst_id: TimelineId,
    3111          262 :         start_lsn: Option<Lsn>,
    3112          262 :         timeline_uninit_mark: TimelineUninitMark<'_>,
    3113          262 :         ctx: &RequestContext,
    3114          262 :     ) -> Result<Arc<Timeline>, CreateTimelineError> {
    3115          262 :         self.branch_timeline_impl(src_timeline, dst_id, start_lsn, timeline_uninit_mark, ctx)
    3116            6 :             .await
    3117          262 :     }
    3118              : 
    3119          476 :     async fn branch_timeline_impl(
    3120          476 :         &self,
    3121          476 :         src_timeline: &Arc<Timeline>,
    3122          476 :         dst_id: TimelineId,
    3123          476 :         start_lsn: Option<Lsn>,
    3124          476 :         timeline_uninit_mark: TimelineUninitMark<'_>,
    3125          476 :         _ctx: &RequestContext,
    3126          476 :     ) -> Result<Arc<Timeline>, CreateTimelineError> {
    3127          476 :         let src_id = src_timeline.timeline_id;
    3128              : 
    3129              :         // We will validate our ancestor LSN in this function.  Acquire the GC lock so that
    3130              :         // this check cannot race with GC, and the ancestor LSN is guaranteed to remain
    3131              :         // valid while we are creating the branch.
    3132          476 :         let _gc_cs = self.gc_cs.lock().await;
    3133              : 
    3134              :         // If no start LSN is specified, we branch the new timeline from the source timeline's last record LSN
    3135          476 :         let start_lsn = start_lsn.unwrap_or_else(|| {
    3136          231 :             let lsn = src_timeline.get_last_record_lsn();
    3137          231 :             info!("branching timeline {dst_id} from timeline {src_id} at last record LSN: {lsn}");
    3138          231 :             lsn
    3139          476 :         });
    3140          476 : 
    3141          476 :         // Ensure that `start_lsn` is valid, i.e. the LSN is within the PITR
    3142          476 :         // horizon on the source timeline
    3143          476 :         //
    3144          476 :         // We check it against both the planned GC cutoff stored in 'gc_info',
    3145          476 :         // and the 'latest_gc_cutoff' of the last GC that was performed.  The
    3146          476 :         // planned GC cutoff in 'gc_info' is normally larger than
    3147          476 :         // 'latest_gc_cutoff_lsn', but beware of corner cases like if you just
    3148          476 :         // changed the GC settings for the tenant to make the PITR window
    3149          476 :         // larger, but some of the data was already removed by an earlier GC
    3150          476 :         // iteration.
    3151          476 : 
    3152          476 :         // check against last actual 'latest_gc_cutoff' first
    3153          476 :         let latest_gc_cutoff_lsn = src_timeline.get_latest_gc_cutoff_lsn();
    3154          476 :         src_timeline
    3155          476 :             .check_lsn_is_in_scope(start_lsn, &latest_gc_cutoff_lsn)
    3156          476 :             .context(format!(
    3157          476 :                 "invalid branch start lsn: less than latest GC cutoff {}",
    3158          476 :                 *latest_gc_cutoff_lsn,
    3159          476 :             ))
    3160          476 :             .map_err(CreateTimelineError::AncestorLsn)?;
    3161              : 
    3162              :         // and then the planned GC cutoff
    3163              :         {
    3164          466 :             let gc_info = src_timeline.gc_info.read().unwrap();
    3165          466 :             let cutoff = min(gc_info.pitr_cutoff, gc_info.horizon_cutoff);
    3166          466 :             if start_lsn < cutoff {
    3167            0 :                 return Err(CreateTimelineError::AncestorLsn(anyhow::anyhow!(
    3168            0 :                     "invalid branch start lsn: less than planned GC cutoff {cutoff}"
    3169            0 :                 )));
    3170          466 :             }
    3171          466 :         }
    3172          466 : 
    3173          466 :         //
    3174          466 :         // The branch point is valid, and we are still holding the 'gc_cs' lock
    3175          466 :         // so that GC cannot advance the GC cutoff until we are finished.
    3176          466 :         // Proceed with the branch creation.
    3177          466 :         //
    3178          466 : 
    3179          466 :         // Determine prev-LSN for the new timeline. We can only determine it if
    3180          466 :         // the timeline was branched at the current end of the source timeline.
    3181          466 :         let RecordLsn {
    3182          466 :             last: src_last,
    3183          466 :             prev: src_prev,
    3184          466 :         } = src_timeline.get_last_record_rlsn();
    3185          466 :         let dst_prev = if src_last == start_lsn {
    3186          446 :             Some(src_prev)
    3187              :         } else {
    3188           20 :             None
    3189              :         };
    3190              : 
    3191              :         // Create the metadata file, noting the ancestor of the new timeline.
    3192              :         // There is initially no data in it, but all the read-calls know to look
    3193              :         // into the ancestor.
    3194          466 :         let metadata = TimelineMetadata::new(
    3195          466 :             start_lsn,
    3196          466 :             dst_prev,
    3197          466 :             Some(src_id),
    3198          466 :             start_lsn,
    3199          466 :             *src_timeline.latest_gc_cutoff_lsn.read(), // FIXME: should we hold onto this guard longer?
    3200          466 :             src_timeline.initdb_lsn,
    3201          466 :             src_timeline.pg_version,
    3202          466 :         );
    3203              : 
    3204          466 :         let uninitialized_timeline = self
    3205          466 :             .prepare_new_timeline(
    3206          466 :                 dst_id,
    3207          466 :                 &metadata,
    3208          466 :                 timeline_uninit_mark,
    3209          466 :                 start_lsn + 1,
    3210          466 :                 Some(Arc::clone(src_timeline)),
    3211          466 :             )
    3212          217 :             .await?;
    3213              : 
    3214          466 :         let new_timeline = uninitialized_timeline.finish_creation()?;
    3215              : 
    3216              :         // Root timeline gets its layers during creation and uploads them along with the metadata.
    3217              :         // A branch timeline though, when created, can get no writes for some time, hence won't get any layers created.
    3218              :         // We still need to upload its metadata eagerly: if other nodes `attach` the tenant and miss this timeline, their GC
    3219              :         // could get incorrect information and remove more layers, than needed.
    3220              :         // See also https://github.com/neondatabase/neon/issues/3865
    3221          466 :         if let Some(remote_client) = new_timeline.remote_client.as_ref() {
    3222          466 :             remote_client
    3223          466 :                 .schedule_index_upload_for_metadata_update(&metadata)
    3224          466 :                 .context("branch initial metadata upload")?;
    3225            0 :         }
    3226              : 
    3227          466 :         Ok(new_timeline)
    3228          476 :     }
    3229              : 
    3230              :     /// For unit tests, make this visible so that other modules can directly create timelines
    3231              :     #[cfg(test)]
    3232            2 :     #[tracing::instrument(fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(), %timeline_id))]
    3233              :     pub(crate) async fn bootstrap_timeline_test(
    3234              :         &self,
    3235              :         timeline_id: TimelineId,
    3236              :         pg_version: u32,
    3237              :         load_existing_initdb: Option<TimelineId>,
    3238              :         ctx: &RequestContext,
    3239              :     ) -> anyhow::Result<Arc<Timeline>> {
    3240              :         let uninit_mark = self.create_timeline_uninit_mark(timeline_id).unwrap();
    3241              :         self.bootstrap_timeline(
    3242              :             timeline_id,
    3243              :             pg_version,
    3244              :             load_existing_initdb,
    3245              :             uninit_mark,
    3246              :             ctx,
    3247              :         )
    3248              :         .await
    3249              :     }
    3250              : 
    3251          566 :     async fn upload_initdb(
    3252          566 :         &self,
    3253          566 :         timelines_path: &Utf8PathBuf,
    3254          566 :         pgdata_path: &Utf8PathBuf,
    3255          566 :         timeline_id: &TimelineId,
    3256          566 :     ) -> anyhow::Result<()> {
    3257          566 :         let Some(storage) = &self.remote_storage else {
    3258              :             // No remote storage?  No upload.
    3259            0 :             return Ok(());
    3260              :         };
    3261              : 
    3262          566 :         let temp_path = timelines_path.join(format!(
    3263          566 :             "{INITDB_PATH}.upload-{timeline_id}.{TEMP_FILE_SUFFIX}"
    3264          566 :         ));
    3265              : 
    3266          565 :         scopeguard::defer! {
    3267          565 :             if let Err(e) = fs::remove_file(&temp_path) {
    3268            0 :                 error!("Failed to remove temporary initdb archive '{temp_path}': {e}");
    3269          565 :             }
    3270              :         }
    3271              : 
    3272          565 :         let (pgdata_zstd, tar_zst_size) =
    3273      4640603 :             import_datadir::create_tar_zst(pgdata_path, &temp_path).await?;
    3274              : 
    3275          565 :         pausable_failpoint!("before-initdb-upload");
    3276              : 
    3277          565 :         backoff::retry(
    3278          632 :             || async {
    3279          632 :                 self::remote_timeline_client::upload_initdb_dir(
    3280          632 :                     storage,
    3281          632 :                     &self.tenant_shard_id.tenant_id,
    3282          632 :                     timeline_id,
    3283          632 :                     pgdata_zstd.try_clone().await?,
    3284          632 :                     tar_zst_size,
    3285          632 :                     &self.cancel,
    3286              :                 )
    3287        27428 :                 .await
    3288          632 :             },
    3289          565 :             |_| false,
    3290          565 :             3,
    3291          565 :             u32::MAX,
    3292          565 :             "persist_initdb_tar_zst",
    3293          565 :             &self.cancel,
    3294          565 :         )
    3295        28056 :         .await
    3296          565 :         .ok_or_else(|| anyhow::anyhow!("Cancelled"))
    3297          565 :         .and_then(|x| x)
    3298          565 :     }
    3299              : 
    3300              :     /// - run initdb to init temporary instance and get bootstrap data
    3301              :     /// - after initialization completes, tar up the temp dir and upload it to S3.
    3302              :     ///
    3303              :     /// The caller is responsible for activating the returned timeline.
    3304          597 :     async fn bootstrap_timeline(
    3305          597 :         &self,
    3306          597 :         timeline_id: TimelineId,
    3307          597 :         pg_version: u32,
    3308          597 :         load_existing_initdb: Option<TimelineId>,
    3309          597 :         timeline_uninit_mark: TimelineUninitMark<'_>,
    3310          597 :         ctx: &RequestContext,
    3311          597 :     ) -> anyhow::Result<Arc<Timeline>> {
    3312          597 :         // create a `tenant/{tenant_id}/timelines/basebackup-{timeline_id}.{TEMP_FILE_SUFFIX}/`
    3313          597 :         // temporary directory for basebackup files for the given timeline.
    3314          597 : 
    3315          597 :         let timelines_path = self.conf.timelines_path(&self.tenant_shard_id);
    3316          597 :         let pgdata_path = path_with_suffix_extension(
    3317          597 :             timelines_path.join(format!("basebackup-{timeline_id}")),
    3318          597 :             TEMP_FILE_SUFFIX,
    3319          597 :         );
    3320          597 : 
    3321          597 :         // an uninit mark was placed before, nothing else can access this timeline files
    3322          597 :         // current initdb was not run yet, so remove whatever was left from the previous runs
    3323          597 :         if pgdata_path.exists() {
    3324            0 :             fs::remove_dir_all(&pgdata_path).with_context(|| {
    3325            0 :                 format!("Failed to remove already existing initdb directory: {pgdata_path}")
    3326            0 :             })?;
    3327          597 :         }
    3328              :         // this new directory is very temporary, set to remove it immediately after bootstrap, we don't need it
    3329          595 :         scopeguard::defer! {
    3330          595 :             if let Err(e) = fs::remove_dir_all(&pgdata_path) {
    3331              :                 // this is unlikely, but we will remove the directory on pageserver restart or another bootstrap call
    3332            0 :                 error!("Failed to remove temporary initdb directory '{pgdata_path}': {e}");
    3333          595 :             }
    3334              :         }
    3335          597 :         if let Some(existing_initdb_timeline_id) = load_existing_initdb {
    3336            4 :             let Some(storage) = &self.remote_storage else {
    3337            0 :                 bail!("no storage configured but load_existing_initdb set to {existing_initdb_timeline_id}");
    3338              :             };
    3339            4 :             if existing_initdb_timeline_id != timeline_id {
    3340            0 :                 let source_path = &remote_initdb_archive_path(
    3341            0 :                     &self.tenant_shard_id.tenant_id,
    3342            0 :                     &existing_initdb_timeline_id,
    3343            0 :                 );
    3344            0 :                 let dest_path =
    3345            0 :                     &remote_initdb_archive_path(&self.tenant_shard_id.tenant_id, &timeline_id);
    3346            0 :                 storage
    3347            0 :                     .copy_object(source_path, dest_path)
    3348            0 :                     .await
    3349            0 :                     .context("copy initdb tar")?;
    3350            4 :             }
    3351            4 :             let (initdb_tar_zst_path, initdb_tar_zst) =
    3352            4 :                 self::remote_timeline_client::download_initdb_tar_zst(
    3353            4 :                     self.conf,
    3354            4 :                     storage,
    3355            4 :                     &self.tenant_shard_id,
    3356            4 :                     &existing_initdb_timeline_id,
    3357            4 :                     &self.cancel,
    3358            4 :                 )
    3359         1471 :                 .await
    3360            4 :                 .context("download initdb tar")?;
    3361              : 
    3362            4 :             scopeguard::defer! {
    3363            4 :                 if let Err(e) = fs::remove_file(&initdb_tar_zst_path) {
    3364            0 :                     error!("Failed to remove temporary initdb archive '{initdb_tar_zst_path}': {e}");
    3365            4 :                 }
    3366              :             }
    3367              : 
    3368            4 :             let buf_read =
    3369            4 :                 BufReader::with_capacity(remote_timeline_client::BUFFER_SIZE, initdb_tar_zst);
    3370            4 :             import_datadir::extract_tar_zst(&pgdata_path, buf_read)
    3371        21517 :                 .await
    3372            4 :                 .context("extract initdb tar")?;
    3373              :         } else {
    3374              :             // Init temporarily repo to get bootstrap data, this creates a directory in the `pgdata_path` path
    3375         1161 :             run_initdb(self.conf, &pgdata_path, pg_version, &self.cancel).await?;
    3376              : 
    3377              :             // Upload the created data dir to S3
    3378          591 :             if self.tenant_shard_id().is_zero() {
    3379          566 :                 self.upload_initdb(&timelines_path, &pgdata_path, &timeline_id)
    3380      4669218 :                     .await?;
    3381           25 :             }
    3382              :         }
    3383          594 :         let pgdata_lsn = import_datadir::get_lsn_from_controlfile(&pgdata_path)?.align();
    3384          594 : 
    3385          594 :         // Import the contents of the data directory at the initial checkpoint
    3386          594 :         // LSN, and any WAL after that.
    3387          594 :         // Initdb lsn will be equal to last_record_lsn which will be set after import.
    3388          594 :         // Because we know it upfront avoid having an option or dummy zero value by passing it to the metadata.
    3389          594 :         let new_metadata = TimelineMetadata::new(
    3390          594 :             Lsn(0),
    3391          594 :             None,
    3392          594 :             None,
    3393          594 :             Lsn(0),
    3394          594 :             pgdata_lsn,
    3395          594 :             pgdata_lsn,
    3396          594 :             pg_version,
    3397          594 :         );
    3398          594 :         let raw_timeline = self
    3399          594 :             .prepare_new_timeline(
    3400          594 :                 timeline_id,
    3401          594 :                 &new_metadata,
    3402          594 :                 timeline_uninit_mark,
    3403          594 :                 pgdata_lsn,
    3404          594 :                 None,
    3405          594 :             )
    3406            3 :             .await?;
    3407              : 
    3408          593 :         let tenant_shard_id = raw_timeline.owning_tenant.tenant_shard_id;
    3409          593 :         let unfinished_timeline = raw_timeline.raw_timeline()?;
    3410              : 
    3411          593 :         import_datadir::import_timeline_from_postgres_datadir(
    3412          593 :             unfinished_timeline,
    3413          593 :             &pgdata_path,
    3414          593 :             pgdata_lsn,
    3415          593 :             ctx,
    3416          593 :         )
    3417      2966584 :         .await
    3418          593 :         .with_context(|| {
    3419            0 :             format!("Failed to import pgdatadir for timeline {tenant_shard_id}/{timeline_id}")
    3420          593 :         })?;
    3421              : 
    3422              :         // Flush the new layer files to disk, before we make the timeline as available to
    3423              :         // the outside world.
    3424              :         //
    3425              :         // Flush loop needs to be spawned in order to be able to flush.
    3426          593 :         unfinished_timeline.maybe_spawn_flush_loop();
    3427          593 : 
    3428          593 :         fail::fail_point!("before-checkpoint-new-timeline", |_| {
    3429            2 :             anyhow::bail!("failpoint before-checkpoint-new-timeline");
    3430          593 :         });
    3431              : 
    3432          591 :         unfinished_timeline
    3433          591 :             .freeze_and_flush()
    3434          590 :             .await
    3435          590 :             .with_context(|| {
    3436            0 :                 format!(
    3437            0 :                     "Failed to flush after pgdatadir import for timeline {tenant_shard_id}/{timeline_id}"
    3438            0 :                 )
    3439          590 :             })?;
    3440              : 
    3441              :         // All done!
    3442          590 :         let timeline = raw_timeline.finish_creation()?;
    3443              : 
    3444          589 :         Ok(timeline)
    3445          595 :     }
    3446              : 
    3447              :     /// Call this before constructing a timeline, to build its required structures
    3448         1144 :     fn build_timeline_resources(&self, timeline_id: TimelineId) -> TimelineResources {
    3449         1144 :         let remote_client = if let Some(remote_storage) = self.remote_storage.as_ref() {
    3450         1144 :             let remote_client = RemoteTimelineClient::new(
    3451         1144 :                 remote_storage.clone(),
    3452         1144 :                 self.deletion_queue_client.clone(),
    3453         1144 :                 self.conf,
    3454         1144 :                 self.tenant_shard_id,
    3455         1144 :                 timeline_id,
    3456         1144 :                 self.generation,
    3457         1144 :             );
    3458         1144 :             Some(remote_client)
    3459              :         } else {
    3460            0 :             None
    3461              :         };
    3462              : 
    3463         1144 :         TimelineResources {
    3464         1144 :             remote_client,
    3465         1144 :             deletion_queue_client: self.deletion_queue_client.clone(),
    3466         1144 :         }
    3467         1144 :     }
    3468              : 
    3469              :     /// Creates intermediate timeline structure and its files.
    3470              :     ///
    3471              :     /// An empty layer map is initialized, and new data and WAL can be imported starting
    3472              :     /// at 'disk_consistent_lsn'. After any initial data has been imported, call
    3473              :     /// `finish_creation` to insert the Timeline into the timelines map and to remove the
    3474              :     /// uninit mark file.
    3475         1144 :     async fn prepare_new_timeline<'a>(
    3476         1144 :         &'a self,
    3477         1144 :         new_timeline_id: TimelineId,
    3478         1144 :         new_metadata: &TimelineMetadata,
    3479         1144 :         uninit_mark: TimelineUninitMark<'a>,
    3480         1144 :         start_lsn: Lsn,
    3481         1144 :         ancestor: Option<Arc<Timeline>>,
    3482         1144 :     ) -> anyhow::Result<UninitializedTimeline> {
    3483         1144 :         let tenant_shard_id = self.tenant_shard_id;
    3484         1144 : 
    3485         1144 :         let resources = self.build_timeline_resources(new_timeline_id);
    3486         1144 :         if let Some(remote_client) = &resources.remote_client {
    3487         1144 :             remote_client.init_upload_queue_for_empty_remote(new_metadata)?;
    3488            0 :         }
    3489              : 
    3490         1144 :         let timeline_struct = self
    3491         1144 :             .create_timeline_struct(
    3492         1144 :                 new_timeline_id,
    3493         1144 :                 new_metadata,
    3494         1144 :                 ancestor,
    3495         1144 :                 resources,
    3496         1144 :                 CreateTimelineCause::Load,
    3497         1144 :             )
    3498         1144 :             .context("Failed to create timeline data structure")?;
    3499              : 
    3500         1144 :         timeline_struct.init_empty_layer_map(start_lsn);
    3501              : 
    3502         1144 :         if let Err(e) = self
    3503         1144 :             .create_timeline_files(&uninit_mark.timeline_path, &new_timeline_id, new_metadata)
    3504          328 :             .await
    3505              :         {
    3506            1 :             error!("Failed to create initial files for timeline {tenant_shard_id}/{new_timeline_id}, cleaning up: {e:?}");
    3507            1 :             cleanup_timeline_directory(uninit_mark);
    3508            1 :             return Err(e);
    3509         1143 :         }
    3510              : 
    3511            0 :         debug!(
    3512            0 :             "Successfully created initial files for timeline {tenant_shard_id}/{new_timeline_id}"
    3513            0 :         );
    3514              : 
    3515         1143 :         Ok(UninitializedTimeline::new(
    3516         1143 :             self,
    3517         1143 :             new_timeline_id,
    3518         1143 :             Some((timeline_struct, uninit_mark)),
    3519         1143 :         ))
    3520         1144 :     }
    3521              : 
    3522         1144 :     async fn create_timeline_files(
    3523         1144 :         &self,
    3524         1144 :         timeline_path: &Utf8Path,
    3525         1144 :         new_timeline_id: &TimelineId,
    3526         1144 :         new_metadata: &TimelineMetadata,
    3527         1144 :     ) -> anyhow::Result<()> {
    3528         1144 :         crashsafe::create_dir(timeline_path).context("Failed to create timeline directory")?;
    3529              : 
    3530         1144 :         fail::fail_point!("after-timeline-uninit-mark-creation", |_| {
    3531            1 :             anyhow::bail!("failpoint after-timeline-uninit-mark-creation");
    3532         1144 :         });
    3533              : 
    3534         1143 :         save_metadata(
    3535         1143 :             self.conf,
    3536         1143 :             &self.tenant_shard_id,
    3537         1143 :             new_timeline_id,
    3538         1143 :             new_metadata,
    3539         1143 :         )
    3540          328 :         .await
    3541         1143 :         .context("Failed to create timeline metadata")?;
    3542         1143 :         Ok(())
    3543         1144 :     }
    3544              : 
    3545              :     /// Attempts to create an uninit mark file for the timeline initialization.
    3546              :     /// Bails, if the timeline is already loaded into the memory (i.e. initialized before), or the uninit mark file already exists.
    3547              :     ///
    3548              :     /// This way, we need to hold the timelines lock only for small amount of time during the mark check/creation per timeline init.
    3549         1195 :     fn create_timeline_uninit_mark(
    3550         1195 :         &self,
    3551         1195 :         timeline_id: TimelineId,
    3552         1195 :     ) -> Result<TimelineUninitMark, TimelineExclusionError> {
    3553         1195 :         let tenant_shard_id = self.tenant_shard_id;
    3554         1195 : 
    3555         1195 :         let uninit_mark_path = self
    3556         1195 :             .conf
    3557         1195 :             .timeline_uninit_mark_file_path(tenant_shard_id, timeline_id);
    3558         1195 :         let timeline_path = self.conf.timeline_path(&tenant_shard_id, &timeline_id);
    3559              : 
    3560         1195 :         let uninit_mark = TimelineUninitMark::new(
    3561         1195 :             self,
    3562         1195 :             timeline_id,
    3563         1195 :             uninit_mark_path.clone(),
    3564         1195 :             timeline_path.clone(),
    3565         1195 :         )?;
    3566              : 
    3567              :         // At this stage, we have got exclusive access to in-memory state for this timeline ID
    3568              :         // for creation.
    3569              :         // A timeline directory should never exist on disk already:
    3570              :         // - a previous failed creation would have cleaned up after itself
    3571              :         // - a pageserver restart would clean up timeline directories that don't have valid remote state
    3572              :         //
    3573              :         // Therefore it is an unexpected internal error to encounter a timeline directory already existing here,
    3574              :         // this error may indicate a bug in cleanup on failed creations.
    3575         1165 :         if timeline_path.exists() {
    3576            0 :             return Err(TimelineExclusionError::Other(anyhow::anyhow!(
    3577            0 :                 "Timeline directory already exists! This is a bug."
    3578            0 :             )));
    3579         1165 :         }
    3580         1165 : 
    3581         1165 :         // Create the on-disk uninit mark _after_ the in-memory acquisition of the tenant ID: guarantees
    3582         1165 :         // that during process runtime, colliding creations will be caught in-memory without getting
    3583         1165 :         // as far as failing to write a file.
    3584         1165 :         fs::OpenOptions::new()
    3585         1165 :             .write(true)
    3586         1165 :             .create_new(true)
    3587         1165 :             .open(&uninit_mark_path)
    3588         1165 :             .context("Failed to create uninit mark file")
    3589         1165 :             .and_then(|_| {
    3590         1165 :                 crashsafe::fsync_file_and_parent(&uninit_mark_path)
    3591         1165 :                     .context("Failed to fsync uninit mark file")
    3592         1165 :             })
    3593         1165 :             .with_context(|| {
    3594            0 :                 format!("Failed to crate uninit mark for timeline {tenant_shard_id}/{timeline_id}")
    3595         1165 :             })?;
    3596              : 
    3597         1165 :         Ok(uninit_mark)
    3598         1195 :     }
    3599              : 
    3600              :     /// Gathers inputs from all of the timelines to produce a sizing model input.
    3601              :     ///
    3602              :     /// Future is cancellation safe. Only one calculation can be running at once per tenant.
    3603           76 :     #[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()))]
    3604              :     pub async fn gather_size_inputs(
    3605              :         &self,
    3606              :         // `max_retention_period` overrides the cutoff that is used to calculate the size
    3607              :         // (only if it is shorter than the real cutoff).
    3608              :         max_retention_period: Option<u64>,
    3609              :         cause: LogicalSizeCalculationCause,
    3610              :         cancel: &CancellationToken,
    3611              :         ctx: &RequestContext,
    3612              :     ) -> anyhow::Result<size::ModelInputs> {
    3613              :         let logical_sizes_at_once = self
    3614              :             .conf
    3615              :             .concurrent_tenant_size_logical_size_queries
    3616              :             .inner();
    3617              : 
    3618              :         // TODO: Having a single mutex block concurrent reads is not great for performance.
    3619              :         //
    3620              :         // But the only case where we need to run multiple of these at once is when we
    3621              :         // request a size for a tenant manually via API, while another background calculation
    3622              :         // is in progress (which is not a common case).
    3623              :         //
    3624              :         // See more for on the issue #2748 condenced out of the initial PR review.
    3625              :         let mut shared_cache = self.cached_logical_sizes.lock().await;
    3626              : 
    3627              :         size::gather_inputs(
    3628              :             self,
    3629              :             logical_sizes_at_once,
    3630              :             max_retention_period,
    3631              :             &mut shared_cache,
    3632              :             cause,
    3633              :             cancel,
    3634              :             ctx,
    3635              :         )
    3636              :         .await
    3637              :     }
    3638              : 
    3639              :     /// Calculate synthetic tenant size and cache the result.
    3640              :     /// This is periodically called by background worker.
    3641              :     /// result is cached in tenant struct
    3642           23 :     #[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()))]
    3643              :     pub async fn calculate_synthetic_size(
    3644              :         &self,
    3645              :         cause: LogicalSizeCalculationCause,
    3646              :         cancel: &CancellationToken,
    3647              :         ctx: &RequestContext,
    3648              :     ) -> anyhow::Result<u64> {
    3649              :         let inputs = self.gather_size_inputs(None, cause, cancel, ctx).await?;
    3650              : 
    3651              :         let size = inputs.calculate()?;
    3652              : 
    3653              :         self.set_cached_synthetic_size(size);
    3654              : 
    3655              :         Ok(size)
    3656              :     }
    3657              : 
    3658              :     /// Cache given synthetic size and update the metric value
    3659           23 :     pub fn set_cached_synthetic_size(&self, size: u64) {
    3660           23 :         self.cached_synthetic_tenant_size
    3661           23 :             .store(size, Ordering::Relaxed);
    3662              : 
    3663              :         // Only shard zero should be calculating synthetic sizes
    3664           23 :         debug_assert!(self.shard_identity.is_zero());
    3665              : 
    3666           23 :         TENANT_SYNTHETIC_SIZE_METRIC
    3667           23 :             .get_metric_with_label_values(&[&self.tenant_shard_id.tenant_id.to_string()])
    3668           23 :             .unwrap()
    3669           23 :             .set(size);
    3670           23 :     }
    3671              : 
    3672           23 :     pub fn cached_synthetic_size(&self) -> u64 {
    3673           23 :         self.cached_synthetic_tenant_size.load(Ordering::Relaxed)
    3674           23 :     }
    3675              : 
    3676              :     /// Flush any in-progress layers, schedule uploads, and wait for uploads to complete.
    3677              :     ///
    3678              :     /// This function can take a long time: callers should wrap it in a timeout if calling
    3679              :     /// from an external API handler.
    3680              :     ///
    3681              :     /// Cancel-safety: cancelling this function may leave I/O running, but such I/O is
    3682              :     /// still bounded by tenant/timeline shutdown.
    3683            0 :     #[tracing::instrument(skip_all)]
    3684              :     pub(crate) async fn flush_remote(&self) -> anyhow::Result<()> {
    3685              :         let timelines = self.timelines.lock().unwrap().clone();
    3686              : 
    3687            1 :         async fn flush_timeline(_gate: GateGuard, timeline: Arc<Timeline>) -> anyhow::Result<()> {
    3688            1 :             tracing::info!(timeline_id=%timeline.timeline_id, "Flushing...");
    3689            1 :             timeline.freeze_and_flush().await?;
    3690            1 :             tracing::info!(timeline_id=%timeline.timeline_id, "Waiting for uploads...");
    3691            1 :             if let Some(client) = &timeline.remote_client {
    3692            1 :                 client.wait_completion().await?;
    3693            0 :             }
    3694              : 
    3695            1 :             Ok(())
    3696            1 :         }
    3697              : 
    3698              :         // We do not use a JoinSet for these tasks, because we don't want them to be
    3699              :         // aborted when this function's future is cancelled: they should stay alive
    3700              :         // holding their GateGuard until they complete, to ensure their I/Os complete
    3701              :         // before Timeline shutdown completes.
    3702              :         let mut results = FuturesUnordered::new();
    3703              : 
    3704              :         for (_timeline_id, timeline) in timelines {
    3705              :             // Run each timeline's flush in a task holding the timeline's gate: this
    3706              :             // means that if this function's future is cancelled, the Timeline shutdown
    3707              :             // will still wait for any I/O in here to complete.
    3708              :             let gate = match timeline.gate.enter() {
    3709              :                 Ok(g) => g,
    3710              :                 Err(_) => continue,
    3711              :             };
    3712            2 :             let jh = tokio::task::spawn(async move { flush_timeline(gate, timeline).await });
    3713              :             results.push(jh);
    3714              :         }
    3715              : 
    3716              :         while let Some(r) = results.next().await {
    3717              :             if let Err(e) = r {
    3718              :                 if !e.is_cancelled() && !e.is_panic() {
    3719            0 :                     tracing::error!("unexpected join error: {e:?}");
    3720              :                 }
    3721              :             }
    3722              :         }
    3723              : 
    3724              :         // The flushes we did above were just writes, but the Tenant might have had
    3725              :         // pending deletions as well from recent compaction/gc: we want to flush those
    3726              :         // as well.  This requires flushing the global delete queue.  This is cheap
    3727              :         // because it's typically a no-op.
    3728              :         match self.deletion_queue_client.flush_execute().await {
    3729              :             Ok(_) => {}
    3730              :             Err(DeletionQueueError::ShuttingDown) => {}
    3731              :         }
    3732              : 
    3733              :         Ok(())
    3734              :     }
    3735              : }
    3736              : 
    3737            2 : fn remove_timeline_and_uninit_mark(
    3738            2 :     timeline_dir: &Utf8Path,
    3739            2 :     uninit_mark: &Utf8Path,
    3740            2 : ) -> anyhow::Result<()> {
    3741            2 :     fs::remove_dir_all(timeline_dir)
    3742            2 :         .or_else(|e| {
    3743            0 :             if e.kind() == std::io::ErrorKind::NotFound {
    3744              :                 // we can leave the uninit mark without a timeline dir,
    3745              :                 // just remove the mark then
    3746            0 :                 Ok(())
    3747              :             } else {
    3748            0 :                 Err(e)
    3749              :             }
    3750            2 :         })
    3751            2 :         .with_context(|| {
    3752            0 :             format!("Failed to remove unit marked timeline directory {timeline_dir}")
    3753            2 :         })?;
    3754            2 :     fs::remove_file(uninit_mark)
    3755            2 :         .with_context(|| format!("Failed to remove timeline uninit mark file {uninit_mark}"))?;
    3756              : 
    3757            2 :     Ok(())
    3758            2 : }
    3759              : 
    3760              : /// Create the cluster temporarily in 'initdbpath' directory inside the repository
    3761              : /// to get bootstrap data for timeline initialization.
    3762          593 : async fn run_initdb(
    3763          593 :     conf: &'static PageServerConf,
    3764          593 :     initdb_target_dir: &Utf8Path,
    3765          593 :     pg_version: u32,
    3766          593 :     cancel: &CancellationToken,
    3767          593 : ) -> Result<(), InitdbError> {
    3768          593 :     let initdb_bin_path = conf
    3769          593 :         .pg_bin_dir(pg_version)
    3770          593 :         .map_err(InitdbError::Other)?
    3771          593 :         .join("initdb");
    3772          593 :     let initdb_lib_dir = conf.pg_lib_dir(pg_version).map_err(InitdbError::Other)?;
    3773          593 :     info!(
    3774          593 :         "running {} in {}, libdir: {}",
    3775          593 :         initdb_bin_path, initdb_target_dir, initdb_lib_dir,
    3776          593 :     );
    3777              : 
    3778          593 :     let _permit = INIT_DB_SEMAPHORE.acquire().await;
    3779              : 
    3780          593 :     let initdb_command = tokio::process::Command::new(&initdb_bin_path)
    3781          593 :         .args(["-D", initdb_target_dir.as_ref()])
    3782          593 :         .args(["-U", &conf.superuser])
    3783          593 :         .args(["-E", "utf8"])
    3784          593 :         .arg("--no-instructions")
    3785          593 :         .arg("--no-sync")
    3786          593 :         .env_clear()
    3787          593 :         .env("LD_LIBRARY_PATH", &initdb_lib_dir)
    3788          593 :         .env("DYLD_LIBRARY_PATH", &initdb_lib_dir)
    3789          593 :         .stdin(std::process::Stdio::null())
    3790          593 :         // stdout invocation produces the same output every time, we don't need it
    3791          593 :         .stdout(std::process::Stdio::null())
    3792          593 :         // we would be interested in the stderr output, if there was any
    3793          593 :         .stderr(std::process::Stdio::piped())
    3794          593 :         .spawn()?;
    3795              : 
    3796              :     // Ideally we'd select here with the cancellation token, but the problem is that
    3797              :     // we can't safely terminate initdb: it launches processes of its own, and killing
    3798              :     // initdb doesn't kill them. After we return from this function, we want the target
    3799              :     // directory to be able to be cleaned up.
    3800              :     // See https://github.com/neondatabase/neon/issues/6385
    3801         1161 :     let initdb_output = initdb_command.wait_with_output().await?;
    3802          593 :     if !initdb_output.status.success() {
    3803            0 :         return Err(InitdbError::Failed(
    3804            0 :             initdb_output.status,
    3805            0 :             initdb_output.stderr,
    3806            0 :         ));
    3807          593 :     }
    3808          593 : 
    3809          593 :     // This isn't true cancellation support, see above. Still return an error to
    3810          593 :     // excercise the cancellation code path.
    3811          593 :     if cancel.is_cancelled() {
    3812            2 :         return Err(InitdbError::Cancelled);
    3813          591 :     }
    3814          591 : 
    3815          591 :     Ok(())
    3816          593 : }
    3817              : 
    3818              : impl Drop for Tenant {
    3819          314 :     fn drop(&mut self) {
    3820          314 :         remove_tenant_metrics(&self.tenant_shard_id);
    3821          314 :     }
    3822              : }
    3823              : /// Dump contents of a layer file to stdout.
    3824            0 : pub async fn dump_layerfile_from_path(
    3825            0 :     path: &Utf8Path,
    3826            0 :     verbose: bool,
    3827            0 :     ctx: &RequestContext,
    3828            0 : ) -> anyhow::Result<()> {
    3829              :     use std::os::unix::fs::FileExt;
    3830              : 
    3831              :     // All layer files start with a two-byte "magic" value, to identify the kind of
    3832              :     // file.
    3833            0 :     let file = File::open(path)?;
    3834            0 :     let mut header_buf = [0u8; 2];
    3835            0 :     file.read_exact_at(&mut header_buf, 0)?;
    3836              : 
    3837            0 :     match u16::from_be_bytes(header_buf) {
    3838              :         crate::IMAGE_FILE_MAGIC => {
    3839            0 :             ImageLayer::new_for_path(path, file)?
    3840            0 :                 .dump(verbose, ctx)
    3841            0 :                 .await?
    3842              :         }
    3843              :         crate::DELTA_FILE_MAGIC => {
    3844            0 :             DeltaLayer::new_for_path(path, file)?
    3845            0 :                 .dump(verbose, ctx)
    3846            0 :                 .await?
    3847              :         }
    3848            0 :         magic => bail!("unrecognized magic identifier: {:?}", magic),
    3849              :     }
    3850              : 
    3851            0 :     Ok(())
    3852            0 : }
    3853              : 
    3854              : #[cfg(test)]
    3855              : pub(crate) mod harness {
    3856              :     use bytes::{Bytes, BytesMut};
    3857              :     use camino::Utf8PathBuf;
    3858              :     use once_cell::sync::OnceCell;
    3859              :     use pageserver_api::models::ShardParameters;
    3860              :     use pageserver_api::shard::ShardIndex;
    3861              :     use std::fs;
    3862              :     use std::sync::Arc;
    3863              :     use utils::logging;
    3864              :     use utils::lsn::Lsn;
    3865              : 
    3866              :     use crate::deletion_queue::mock::MockDeletionQueue;
    3867              :     use crate::{
    3868              :         config::PageServerConf, repository::Key, tenant::Tenant, walrecord::NeonWalRecord,
    3869              :     };
    3870              : 
    3871              :     use super::*;
    3872              :     use crate::tenant::config::{TenantConf, TenantConfOpt};
    3873              :     use hex_literal::hex;
    3874              :     use utils::id::{TenantId, TimelineId};
    3875              : 
    3876              :     pub const TIMELINE_ID: TimelineId =
    3877              :         TimelineId::from_array(hex!("11223344556677881122334455667788"));
    3878              :     pub const NEW_TIMELINE_ID: TimelineId =
    3879              :         TimelineId::from_array(hex!("AA223344556677881122334455667788"));
    3880              : 
    3881              :     /// Convenience function to create a page image with given string as the only content
    3882              :     #[allow(non_snake_case)]
    3883      1708258 :     pub fn TEST_IMG(s: &str) -> Bytes {
    3884      1708258 :         let mut buf = BytesMut::new();
    3885      1708258 :         buf.extend_from_slice(s.as_bytes());
    3886      1708258 :         buf.resize(64, 0);
    3887      1708258 : 
    3888      1708258 :         buf.freeze()
    3889      1708258 :     }
    3890              : 
    3891              :     impl From<TenantConf> for TenantConfOpt {
    3892           84 :         fn from(tenant_conf: TenantConf) -> Self {
    3893           84 :             Self {
    3894           84 :                 checkpoint_distance: Some(tenant_conf.checkpoint_distance),
    3895           84 :                 checkpoint_timeout: Some(tenant_conf.checkpoint_timeout),
    3896           84 :                 compaction_target_size: Some(tenant_conf.compaction_target_size),
    3897           84 :                 compaction_period: Some(tenant_conf.compaction_period),
    3898           84 :                 compaction_threshold: Some(tenant_conf.compaction_threshold),
    3899           84 :                 gc_horizon: Some(tenant_conf.gc_horizon),
    3900           84 :                 gc_period: Some(tenant_conf.gc_period),
    3901           84 :                 image_creation_threshold: Some(tenant_conf.image_creation_threshold),
    3902           84 :                 pitr_interval: Some(tenant_conf.pitr_interval),
    3903           84 :                 walreceiver_connect_timeout: Some(tenant_conf.walreceiver_connect_timeout),
    3904           84 :                 lagging_wal_timeout: Some(tenant_conf.lagging_wal_timeout),
    3905           84 :                 max_lsn_wal_lag: Some(tenant_conf.max_lsn_wal_lag),
    3906           84 :                 trace_read_requests: Some(tenant_conf.trace_read_requests),
    3907           84 :                 eviction_policy: Some(tenant_conf.eviction_policy),
    3908           84 :                 min_resident_size_override: tenant_conf.min_resident_size_override,
    3909           84 :                 evictions_low_residence_duration_metric_threshold: Some(
    3910           84 :                     tenant_conf.evictions_low_residence_duration_metric_threshold,
    3911           84 :                 ),
    3912           84 :                 gc_feedback: Some(tenant_conf.gc_feedback),
    3913           84 :                 heatmap_period: Some(tenant_conf.heatmap_period),
    3914           84 :                 lazy_slru_download: Some(tenant_conf.lazy_slru_download),
    3915           84 :             }
    3916           84 :         }
    3917              :     }
    3918              : 
    3919              :     enum LoadMode {
    3920              :         Local,
    3921              :         Remote,
    3922              :     }
    3923              : 
    3924              :     pub struct TenantHarness {
    3925              :         pub conf: &'static PageServerConf,
    3926              :         pub tenant_conf: TenantConf,
    3927              :         pub tenant_shard_id: TenantShardId,
    3928              :         pub generation: Generation,
    3929              :         pub shard: ShardIndex,
    3930              :         pub remote_storage: GenericRemoteStorage,
    3931              :         pub remote_fs_dir: Utf8PathBuf,
    3932              :         pub deletion_queue: MockDeletionQueue,
    3933              :     }
    3934              : 
    3935              :     static LOG_HANDLE: OnceCell<()> = OnceCell::new();
    3936              : 
    3937           88 :     pub(crate) fn setup_logging() {
    3938           88 :         LOG_HANDLE.get_or_init(|| {
    3939           88 :             logging::init(
    3940           88 :                 logging::LogFormat::Test,
    3941           88 :                 // enable it in case the tests exercise code paths that use
    3942           88 :                 // debug_assert_current_span_has_tenant_and_timeline_id
    3943           88 :                 logging::TracingErrorLayerEnablement::EnableWithRustLogFilter,
    3944           88 :                 logging::Output::Stdout,
    3945           88 :             )
    3946           88 :             .expect("Failed to init test logging")
    3947           88 :         });
    3948           88 :     }
    3949              : 
    3950              :     impl TenantHarness {
    3951           82 :         pub fn create(test_name: &'static str) -> anyhow::Result<Self> {
    3952           82 :             setup_logging();
    3953           82 : 
    3954           82 :             let repo_dir = PageServerConf::test_repo_dir(test_name);
    3955           82 :             let _ = fs::remove_dir_all(&repo_dir);
    3956           82 :             fs::create_dir_all(&repo_dir)?;
    3957              : 
    3958           82 :             let conf = PageServerConf::dummy_conf(repo_dir);
    3959           82 :             // Make a static copy of the config. This can never be free'd, but that's
    3960           82 :             // OK in a test.
    3961           82 :             let conf: &'static PageServerConf = Box::leak(Box::new(conf));
    3962           82 : 
    3963           82 :             // Disable automatic GC and compaction to make the unit tests more deterministic.
    3964           82 :             // The tests perform them manually if needed.
    3965           82 :             let tenant_conf = TenantConf {
    3966           82 :                 gc_period: Duration::ZERO,
    3967           82 :                 compaction_period: Duration::ZERO,
    3968           82 :                 ..TenantConf::default()
    3969           82 :             };
    3970           82 : 
    3971           82 :             let tenant_id = TenantId::generate();
    3972           82 :             let tenant_shard_id = TenantShardId::unsharded(tenant_id);
    3973           82 :             fs::create_dir_all(conf.tenant_path(&tenant_shard_id))?;
    3974           82 :             fs::create_dir_all(conf.timelines_path(&tenant_shard_id))?;
    3975              : 
    3976              :             use remote_storage::{RemoteStorageConfig, RemoteStorageKind};
    3977           82 :             let remote_fs_dir = conf.workdir.join("localfs");
    3978           82 :             std::fs::create_dir_all(&remote_fs_dir).unwrap();
    3979           82 :             let config = RemoteStorageConfig {
    3980           82 :                 storage: RemoteStorageKind::LocalFs(remote_fs_dir.clone()),
    3981           82 :             };
    3982           82 :             let remote_storage = GenericRemoteStorage::from_config(&config).unwrap();
    3983           82 :             let deletion_queue = MockDeletionQueue::new(Some(remote_storage.clone()));
    3984           82 : 
    3985           82 :             Ok(Self {
    3986           82 :                 conf,
    3987           82 :                 tenant_conf,
    3988           82 :                 tenant_shard_id,
    3989           82 :                 generation: Generation::new(0xdeadbeef),
    3990           82 :                 shard: ShardIndex::unsharded(),
    3991           82 :                 remote_storage,
    3992           82 :                 remote_fs_dir,
    3993           82 :                 deletion_queue,
    3994           82 :             })
    3995           82 :         }
    3996              : 
    3997           10 :         pub fn span(&self) -> tracing::Span {
    3998           10 :             info_span!("TenantHarness", tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug())
    3999           10 :         }
    4000              : 
    4001           80 :         pub async fn load(&self) -> (Arc<Tenant>, RequestContext) {
    4002           80 :             let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
    4003           80 :             (
    4004           80 :                 self.try_load(&ctx)
    4005          107 :                     .await
    4006           80 :                     .expect("failed to load test tenant"),
    4007           80 :                 ctx,
    4008           80 :             )
    4009           80 :         }
    4010              : 
    4011           82 :         fn remote_empty(&self) -> bool {
    4012           82 :             let tenant_path = self.conf.tenant_path(&self.tenant_shard_id);
    4013           82 :             let remote_tenant_dir = self
    4014           82 :                 .remote_fs_dir
    4015           82 :                 .join(tenant_path.strip_prefix(&self.conf.workdir).unwrap());
    4016           82 :             if std::fs::metadata(&remote_tenant_dir).is_err() {
    4017           78 :                 return true;
    4018            4 :             }
    4019            4 : 
    4020            4 :             match std::fs::read_dir(remote_tenant_dir)
    4021            4 :                 .unwrap()
    4022            4 :                 .flatten()
    4023            4 :                 .next()
    4024              :             {
    4025            4 :                 Some(entry) => {
    4026            4 :                     tracing::debug!(
    4027            0 :                         "remote_empty: not empty, found file {}",
    4028            0 :                         entry.file_name().to_string_lossy(),
    4029            0 :                     );
    4030            4 :                     false
    4031              :                 }
    4032            0 :                 None => true,
    4033              :             }
    4034           82 :         }
    4035              : 
    4036           84 :         async fn do_try_load(
    4037           84 :             &self,
    4038           84 :             ctx: &RequestContext,
    4039           84 :             mode: LoadMode,
    4040           84 :         ) -> anyhow::Result<Arc<Tenant>> {
    4041           84 :             let walredo_mgr = Arc::new(WalRedoManager::from(TestRedoManager));
    4042           84 : 
    4043           84 :             let tenant = Arc::new(Tenant::new(
    4044           84 :                 TenantState::Loading,
    4045           84 :                 self.conf,
    4046           84 :                 AttachedTenantConf::try_from(LocationConf::attached_single(
    4047           84 :                     TenantConfOpt::from(self.tenant_conf),
    4048           84 :                     self.generation,
    4049           84 :                     &ShardParameters::default(),
    4050           84 :                 ))
    4051           84 :                 .unwrap(),
    4052           84 :                 // This is a legacy/test code path: sharding isn't supported here.
    4053           84 :                 ShardIdentity::unsharded(),
    4054           84 :                 Some(walredo_mgr),
    4055           84 :                 self.tenant_shard_id,
    4056           84 :                 Some(self.remote_storage.clone()),
    4057           84 :                 self.deletion_queue.new_client(),
    4058           84 :             ));
    4059           84 : 
    4060           84 :             match mode {
    4061              :                 LoadMode::Local => {
    4062           80 :                     tenant
    4063           80 :                         .load_local(ctx)
    4064           80 :                         .instrument(info_span!("try_load", tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()))
    4065           80 :                         .await?;
    4066              :                 }
    4067              :                 LoadMode::Remote => {
    4068            4 :                     let preload = tenant
    4069            4 :                         .preload(&self.remote_storage, CancellationToken::new())
    4070            4 :                         .instrument(info_span!("try_load_preload", tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()))
    4071           15 :                         .await?;
    4072            4 :                     tenant
    4073            4 :                         .attach(Some(preload), SpawnMode::Normal, ctx)
    4074            4 :                         .instrument(info_span!("try_load", tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()))
    4075           16 :                         .await?;
    4076              :                 }
    4077              :             }
    4078              : 
    4079           82 :             tenant.state.send_replace(TenantState::Active);
    4080           82 :             for timeline in tenant.timelines.lock().unwrap().values() {
    4081            6 :                 timeline.set_state(TimelineState::Active);
    4082            6 :             }
    4083           82 :             Ok(tenant)
    4084           84 :         }
    4085              : 
    4086              :         /// For tests that specifically want to exercise the local load path, which does
    4087              :         /// not use remote storage.
    4088            2 :         pub async fn try_load_local(&self, ctx: &RequestContext) -> anyhow::Result<Arc<Tenant>> {
    4089            2 :             self.do_try_load(ctx, LoadMode::Local).await
    4090            2 :         }
    4091              : 
    4092              :         /// The 'load' in this function is either a local load or a normal attachment,
    4093           82 :         pub async fn try_load(&self, ctx: &RequestContext) -> anyhow::Result<Arc<Tenant>> {
    4094              :             // If we have nothing in remote storage, must use load_local instead of attach: attach
    4095              :             // will error out if there are no timelines.
    4096              :             //
    4097              :             // See https://github.com/neondatabase/neon/issues/5456 for how we will eliminate
    4098              :             // this weird state of a Tenant which exists but doesn't have any timelines.
    4099           82 :             let mode = match self.remote_empty() {
    4100           78 :                 true => LoadMode::Local,
    4101            4 :                 false => LoadMode::Remote,
    4102              :             };
    4103              : 
    4104          109 :             self.do_try_load(ctx, mode).await
    4105           82 :         }
    4106              : 
    4107            6 :         pub fn timeline_path(&self, timeline_id: &TimelineId) -> Utf8PathBuf {
    4108            6 :             self.conf.timeline_path(&self.tenant_shard_id, timeline_id)
    4109            6 :         }
    4110              :     }
    4111              : 
    4112              :     // Mock WAL redo manager that doesn't do much
    4113              :     pub(crate) struct TestRedoManager;
    4114              : 
    4115              :     impl TestRedoManager {
    4116              :         /// # Cancel-Safety
    4117              :         ///
    4118              :         /// This method is cancellation-safe.
    4119            0 :         pub async fn request_redo(
    4120            0 :             &self,
    4121            0 :             key: Key,
    4122            0 :             lsn: Lsn,
    4123            0 :             base_img: Option<(Lsn, Bytes)>,
    4124            0 :             records: Vec<(Lsn, NeonWalRecord)>,
    4125            0 :             _pg_version: u32,
    4126            0 :         ) -> anyhow::Result<Bytes> {
    4127            0 :             let s = format!(
    4128            0 :                 "redo for {} to get to {}, with {} and {} records",
    4129            0 :                 key,
    4130            0 :                 lsn,
    4131            0 :                 if base_img.is_some() {
    4132            0 :                     "base image"
    4133              :                 } else {
    4134            0 :                     "no base image"
    4135              :                 },
    4136            0 :                 records.len()
    4137            0 :             );
    4138            0 :             println!("{s}");
    4139            0 : 
    4140            0 :             Ok(TEST_IMG(&s))
    4141            0 :         }
    4142              :     }
    4143              : }
    4144              : 
    4145              : #[cfg(test)]
    4146              : mod tests {
    4147              :     use super::*;
    4148              :     use crate::keyspace::KeySpaceAccum;
    4149              :     use crate::repository::{Key, Value};
    4150              :     use crate::tenant::harness::*;
    4151              :     use crate::DEFAULT_PG_VERSION;
    4152              :     use crate::METADATA_FILE_NAME;
    4153              :     use bytes::BytesMut;
    4154              :     use hex_literal::hex;
    4155              :     use once_cell::sync::Lazy;
    4156              :     use rand::{thread_rng, Rng};
    4157              :     use tokio_util::sync::CancellationToken;
    4158              : 
    4159              :     static TEST_KEY: Lazy<Key> =
    4160           18 :         Lazy::new(|| Key::from_slice(&hex!("010000000033333333444444445500000001")));
    4161              : 
    4162            2 :     #[tokio::test]
    4163            2 :     async fn test_basic() -> anyhow::Result<()> {
    4164            2 :         let (tenant, ctx) = TenantHarness::create("test_basic")?.load().await;
    4165            2 :         let tline = tenant
    4166            2 :             .create_test_timeline(TIMELINE_ID, Lsn(0x08), DEFAULT_PG_VERSION, &ctx)
    4167            6 :             .await?;
    4168              : 
    4169            2 :         let writer = tline.writer().await;
    4170            2 :         writer
    4171            2 :             .put(
    4172            2 :                 *TEST_KEY,
    4173            2 :                 Lsn(0x10),
    4174            2 :                 &Value::Image(TEST_IMG("foo at 0x10")),
    4175            2 :                 &ctx,
    4176            2 :             )
    4177            1 :             .await?;
    4178            2 :         writer.finish_write(Lsn(0x10));
    4179            2 :         drop(writer);
    4180              : 
    4181            2 :         let writer = tline.writer().await;
    4182            2 :         writer
    4183            2 :             .put(
    4184            2 :                 *TEST_KEY,
    4185            2 :                 Lsn(0x20),
    4186            2 :                 &Value::Image(TEST_IMG("foo at 0x20")),
    4187            2 :                 &ctx,
    4188            2 :             )
    4189            0 :             .await?;
    4190            2 :         writer.finish_write(Lsn(0x20));
    4191            2 :         drop(writer);
    4192              : 
    4193            2 :         assert_eq!(
    4194            2 :             tline.get(*TEST_KEY, Lsn(0x10), &ctx).await?,
    4195            2 :             TEST_IMG("foo at 0x10")
    4196              :         );
    4197            2 :         assert_eq!(
    4198            2 :             tline.get(*TEST_KEY, Lsn(0x1f), &ctx).await?,
    4199            2 :             TEST_IMG("foo at 0x10")
    4200              :         );
    4201            2 :         assert_eq!(
    4202            2 :             tline.get(*TEST_KEY, Lsn(0x20), &ctx).await?,
    4203            2 :             TEST_IMG("foo at 0x20")
    4204              :         );
    4205              : 
    4206            2 :         Ok(())
    4207              :     }
    4208              : 
    4209            2 :     #[tokio::test]
    4210            2 :     async fn no_duplicate_timelines() -> anyhow::Result<()> {
    4211            2 :         let (tenant, ctx) = TenantHarness::create("no_duplicate_timelines")?
    4212            2 :             .load()
    4213            2 :             .await;
    4214            2 :         let _ = tenant
    4215            2 :             .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
    4216            8 :             .await?;
    4217              : 
    4218            2 :         match tenant
    4219            2 :             .create_empty_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
    4220            0 :             .await
    4221              :         {
    4222            0 :             Ok(_) => panic!("duplicate timeline creation should fail"),
    4223            2 :             Err(e) => assert_eq!(e.to_string(), "Already exists".to_string()),
    4224              :         }
    4225              : 
    4226            2 :         Ok(())
    4227              :     }
    4228              : 
    4229              :     /// Convenience function to create a page image with given string as the only content
    4230           10 :     pub fn test_value(s: &str) -> Value {
    4231           10 :         let mut buf = BytesMut::new();
    4232           10 :         buf.extend_from_slice(s.as_bytes());
    4233           10 :         Value::Image(buf.freeze())
    4234           10 :     }
    4235              : 
    4236              :     ///
    4237              :     /// Test branch creation
    4238              :     ///
    4239            2 :     #[tokio::test]
    4240            2 :     async fn test_branch() -> anyhow::Result<()> {
    4241              :         use std::str::from_utf8;
    4242              : 
    4243            2 :         let (tenant, ctx) = TenantHarness::create("test_branch")?.load().await;
    4244            2 :         let tline = tenant
    4245            2 :             .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
    4246            7 :             .await?;
    4247            2 :         let writer = tline.writer().await;
    4248              : 
    4249              :         #[allow(non_snake_case)]
    4250            2 :         let TEST_KEY_A: Key = Key::from_hex("110000000033333333444444445500000001").unwrap();
    4251            2 :         #[allow(non_snake_case)]
    4252            2 :         let TEST_KEY_B: Key = Key::from_hex("110000000033333333444444445500000002").unwrap();
    4253            2 : 
    4254            2 :         // Insert a value on the timeline
    4255            2 :         writer
    4256            2 :             .put(TEST_KEY_A, Lsn(0x20), &test_value("foo at 0x20"), &ctx)
    4257            1 :             .await?;
    4258            2 :         writer
    4259            2 :             .put(TEST_KEY_B, Lsn(0x20), &test_value("foobar at 0x20"), &ctx)
    4260            0 :             .await?;
    4261            2 :         writer.finish_write(Lsn(0x20));
    4262            2 : 
    4263            2 :         writer
    4264            2 :             .put(TEST_KEY_A, Lsn(0x30), &test_value("foo at 0x30"), &ctx)
    4265            0 :             .await?;
    4266            2 :         writer.finish_write(Lsn(0x30));
    4267            2 :         writer
    4268            2 :             .put(TEST_KEY_A, Lsn(0x40), &test_value("foo at 0x40"), &ctx)
    4269            0 :             .await?;
    4270            2 :         writer.finish_write(Lsn(0x40));
    4271            2 : 
    4272            2 :         //assert_current_logical_size(&tline, Lsn(0x40));
    4273            2 : 
    4274            2 :         // Branch the history, modify relation differently on the new timeline
    4275            2 :         tenant
    4276            2 :             .branch_timeline_test(&tline, NEW_TIMELINE_ID, Some(Lsn(0x30)), &ctx)
    4277            2 :             .await?;
    4278            2 :         let newtline = tenant
    4279            2 :             .get_timeline(NEW_TIMELINE_ID, true)
    4280            2 :             .expect("Should have a local timeline");
    4281            2 :         let new_writer = newtline.writer().await;
    4282            2 :         new_writer
    4283            2 :             .put(TEST_KEY_A, Lsn(0x40), &test_value("bar at 0x40"), &ctx)
    4284            1 :             .await?;
    4285            2 :         new_writer.finish_write(Lsn(0x40));
    4286              : 
    4287              :         // Check page contents on both branches
    4288            2 :         assert_eq!(
    4289            2 :             from_utf8(&tline.get(TEST_KEY_A, Lsn(0x40), &ctx).await?)?,
    4290              :             "foo at 0x40"
    4291              :         );
    4292            2 :         assert_eq!(
    4293            2 :             from_utf8(&newtline.get(TEST_KEY_A, Lsn(0x40), &ctx).await?)?,
    4294              :             "bar at 0x40"
    4295              :         );
    4296            2 :         assert_eq!(
    4297            2 :             from_utf8(&newtline.get(TEST_KEY_B, Lsn(0x40), &ctx).await?)?,
    4298              :             "foobar at 0x20"
    4299              :         );
    4300              : 
    4301              :         //assert_current_logical_size(&tline, Lsn(0x40));
    4302              : 
    4303            2 :         Ok(())
    4304              :     }
    4305              : 
    4306           20 :     async fn make_some_layers(
    4307           20 :         tline: &Timeline,
    4308           20 :         start_lsn: Lsn,
    4309           20 :         ctx: &RequestContext,
    4310           20 :     ) -> anyhow::Result<()> {
    4311           20 :         let mut lsn = start_lsn;
    4312              :         #[allow(non_snake_case)]
    4313              :         {
    4314           20 :             let writer = tline.writer().await;
    4315              :             // Create a relation on the timeline
    4316           20 :             writer
    4317           20 :                 .put(
    4318           20 :                     *TEST_KEY,
    4319           20 :                     lsn,
    4320           20 :                     &Value::Image(TEST_IMG(&format!("foo at {}", lsn))),
    4321           20 :                     ctx,
    4322           20 :                 )
    4323           10 :                 .await?;
    4324           20 :             writer.finish_write(lsn);
    4325           20 :             lsn += 0x10;
    4326           20 :             writer
    4327           20 :                 .put(
    4328           20 :                     *TEST_KEY,
    4329           20 :                     lsn,
    4330           20 :                     &Value::Image(TEST_IMG(&format!("foo at {}", lsn))),
    4331           20 :                     ctx,
    4332           20 :                 )
    4333            0 :                 .await?;
    4334           20 :             writer.finish_write(lsn);
    4335           20 :             lsn += 0x10;
    4336           20 :         }
    4337           20 :         tline.freeze_and_flush().await?;
    4338              :         {
    4339           20 :             let writer = tline.writer().await;
    4340           20 :             writer
    4341           20 :                 .put(
    4342           20 :                     *TEST_KEY,
    4343           20 :                     lsn,
    4344           20 :                     &Value::Image(TEST_IMG(&format!("foo at {}", lsn))),
    4345           20 :                     ctx,
    4346           20 :                 )
    4347           10 :                 .await?;
    4348           20 :             writer.finish_write(lsn);
    4349           20 :             lsn += 0x10;
    4350           20 :             writer
    4351           20 :                 .put(
    4352           20 :                     *TEST_KEY,
    4353           20 :                     lsn,
    4354           20 :                     &Value::Image(TEST_IMG(&format!("foo at {}", lsn))),
    4355           20 :                     ctx,
    4356           20 :                 )
    4357            0 :                 .await?;
    4358           20 :             writer.finish_write(lsn);
    4359           20 :         }
    4360           20 :         tline.freeze_and_flush().await
    4361           20 :     }
    4362              : 
    4363            2 :     #[tokio::test]
    4364            2 :     async fn test_prohibit_branch_creation_on_garbage_collected_data() -> anyhow::Result<()> {
    4365            2 :         let (tenant, ctx) =
    4366            2 :             TenantHarness::create("test_prohibit_branch_creation_on_garbage_collected_data")?
    4367            2 :                 .load()
    4368            2 :                 .await;
    4369            2 :         let tline = tenant
    4370            2 :             .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
    4371            7 :             .await?;
    4372            6 :         make_some_layers(tline.as_ref(), Lsn(0x20), &ctx).await?;
    4373              : 
    4374              :         // this removes layers before lsn 40 (50 minus 10), so there are two remaining layers, image and delta for 31-50
    4375              :         // FIXME: this doesn't actually remove any layer currently, given how the flushing
    4376              :         // and compaction works. But it does set the 'cutoff' point so that the cross check
    4377              :         // below should fail.
    4378            2 :         tenant
    4379            2 :             .gc_iteration(
    4380            2 :                 Some(TIMELINE_ID),
    4381            2 :                 0x10,
    4382            2 :                 Duration::ZERO,
    4383            2 :                 &CancellationToken::new(),
    4384            2 :                 &ctx,
    4385            2 :             )
    4386            0 :             .await?;
    4387              : 
    4388              :         // try to branch at lsn 25, should fail because we already garbage collected the data
    4389            2 :         match tenant
    4390            2 :             .branch_timeline_test(&tline, NEW_TIMELINE_ID, Some(Lsn(0x25)), &ctx)
    4391            0 :             .await
    4392              :         {
    4393            0 :             Ok(_) => panic!("branching should have failed"),
    4394            2 :             Err(err) => {
    4395            2 :                 let CreateTimelineError::AncestorLsn(err) = err else {
    4396            0 :                     panic!("wrong error type")
    4397              :                 };
    4398            2 :                 assert!(err.to_string().contains("invalid branch start lsn"));
    4399            2 :                 assert!(err
    4400            2 :                     .source()
    4401            2 :                     .unwrap()
    4402            2 :                     .to_string()
    4403            2 :                     .contains("we might've already garbage collected needed data"))
    4404              :             }
    4405              :         }
    4406              : 
    4407            2 :         Ok(())
    4408              :     }
    4409              : 
    4410            2 :     #[tokio::test]
    4411            2 :     async fn test_prohibit_branch_creation_on_pre_initdb_lsn() -> anyhow::Result<()> {
    4412            2 :         let (tenant, ctx) =
    4413            2 :             TenantHarness::create("test_prohibit_branch_creation_on_pre_initdb_lsn")?
    4414            2 :                 .load()
    4415            2 :                 .await;
    4416              : 
    4417            2 :         let tline = tenant
    4418            2 :             .create_test_timeline(TIMELINE_ID, Lsn(0x50), DEFAULT_PG_VERSION, &ctx)
    4419            7 :             .await?;
    4420              :         // try to branch at lsn 0x25, should fail because initdb lsn is 0x50
    4421            2 :         match tenant
    4422            2 :             .branch_timeline_test(&tline, NEW_TIMELINE_ID, Some(Lsn(0x25)), &ctx)
    4423            0 :             .await
    4424              :         {
    4425            0 :             Ok(_) => panic!("branching should have failed"),
    4426            2 :             Err(err) => {
    4427            2 :                 let CreateTimelineError::AncestorLsn(err) = err else {
    4428            0 :                     panic!("wrong error type");
    4429              :                 };
    4430            2 :                 assert!(&err.to_string().contains("invalid branch start lsn"));
    4431            2 :                 assert!(&err
    4432            2 :                     .source()
    4433            2 :                     .unwrap()
    4434            2 :                     .to_string()
    4435            2 :                     .contains("is earlier than latest GC horizon"));
    4436              :             }
    4437              :         }
    4438              : 
    4439            2 :         Ok(())
    4440              :     }
    4441              : 
    4442              :     /*
    4443              :     // FIXME: This currently fails to error out. Calling GC doesn't currently
    4444              :     // remove the old value, we'd need to work a little harder
    4445              :     #[tokio::test]
    4446              :     async fn test_prohibit_get_for_garbage_collected_data() -> anyhow::Result<()> {
    4447              :         let repo =
    4448              :             RepoHarness::create("test_prohibit_get_for_garbage_collected_data")?
    4449              :             .load();
    4450              : 
    4451              :         let tline = repo.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION)?;
    4452              :         make_some_layers(tline.as_ref(), Lsn(0x20), &ctx).await?;
    4453              : 
    4454              :         repo.gc_iteration(Some(TIMELINE_ID), 0x10, Duration::ZERO)?;
    4455              :         let latest_gc_cutoff_lsn = tline.get_latest_gc_cutoff_lsn();
    4456              :         assert!(*latest_gc_cutoff_lsn > Lsn(0x25));
    4457              :         match tline.get(*TEST_KEY, Lsn(0x25)) {
    4458              :             Ok(_) => panic!("request for page should have failed"),
    4459              :             Err(err) => assert!(err.to_string().contains("not found at")),
    4460              :         }
    4461              :         Ok(())
    4462              :     }
    4463              :      */
    4464              : 
    4465            2 :     #[tokio::test]
    4466            2 :     async fn test_get_branchpoints_from_an_inactive_timeline() -> anyhow::Result<()> {
    4467            2 :         let (tenant, ctx) =
    4468            2 :             TenantHarness::create("test_get_branchpoints_from_an_inactive_timeline")?
    4469            2 :                 .load()
    4470            2 :                 .await;
    4471            2 :         let tline = tenant
    4472            2 :             .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
    4473            8 :             .await?;
    4474            6 :         make_some_layers(tline.as_ref(), Lsn(0x20), &ctx).await?;
    4475              : 
    4476            2 :         tenant
    4477            2 :             .branch_timeline_test(&tline, NEW_TIMELINE_ID, Some(Lsn(0x40)), &ctx)
    4478            2 :             .await?;
    4479            2 :         let newtline = tenant
    4480            2 :             .get_timeline(NEW_TIMELINE_ID, true)
    4481            2 :             .expect("Should have a local timeline");
    4482            2 : 
    4483            6 :         make_some_layers(newtline.as_ref(), Lsn(0x60), &ctx).await?;
    4484              : 
    4485            2 :         tline.set_broken("test".to_owned());
    4486            2 : 
    4487            2 :         tenant
    4488            2 :             .gc_iteration(
    4489            2 :                 Some(TIMELINE_ID),
    4490            2 :                 0x10,
    4491            2 :                 Duration::ZERO,
    4492            2 :                 &CancellationToken::new(),
    4493            2 :                 &ctx,
    4494            2 :             )
    4495            0 :             .await?;
    4496              : 
    4497              :         // The branchpoints should contain all timelines, even ones marked
    4498              :         // as Broken.
    4499              :         {
    4500            2 :             let branchpoints = &tline.gc_info.read().unwrap().retain_lsns;
    4501            2 :             assert_eq!(branchpoints.len(), 1);
    4502            2 :             assert_eq!(branchpoints[0], Lsn(0x40));
    4503              :         }
    4504              : 
    4505              :         // You can read the key from the child branch even though the parent is
    4506              :         // Broken, as long as you don't need to access data from the parent.
    4507            2 :         assert_eq!(
    4508            4 :             newtline.get(*TEST_KEY, Lsn(0x70), &ctx).await?,
    4509            2 :             TEST_IMG(&format!("foo at {}", Lsn(0x70)))
    4510              :         );
    4511              : 
    4512              :         // This needs to traverse to the parent, and fails.
    4513            2 :         let err = newtline.get(*TEST_KEY, Lsn(0x50), &ctx).await.unwrap_err();
    4514            2 :         assert!(err
    4515            2 :             .to_string()
    4516            2 :             .contains("will not become active. Current state: Broken"));
    4517              : 
    4518            2 :         Ok(())
    4519              :     }
    4520              : 
    4521            2 :     #[tokio::test]
    4522            2 :     async fn test_retain_data_in_parent_which_is_needed_for_child() -> anyhow::Result<()> {
    4523            2 :         let (tenant, ctx) =
    4524            2 :             TenantHarness::create("test_retain_data_in_parent_which_is_needed_for_child")?
    4525            2 :                 .load()
    4526            2 :                 .await;
    4527            2 :         let tline = tenant
    4528            2 :             .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
    4529            7 :             .await?;
    4530            6 :         make_some_layers(tline.as_ref(), Lsn(0x20), &ctx).await?;
    4531              : 
    4532            2 :         tenant
    4533            2 :             .branch_timeline_test(&tline, NEW_TIMELINE_ID, Some(Lsn(0x40)), &ctx)
    4534            2 :             .await?;
    4535            2 :         let newtline = tenant
    4536            2 :             .get_timeline(NEW_TIMELINE_ID, true)
    4537            2 :             .expect("Should have a local timeline");
    4538            2 :         // this removes layers before lsn 40 (50 minus 10), so there are two remaining layers, image and delta for 31-50
    4539            2 :         tenant
    4540            2 :             .gc_iteration(
    4541            2 :                 Some(TIMELINE_ID),
    4542            2 :                 0x10,
    4543            2 :                 Duration::ZERO,
    4544            2 :                 &CancellationToken::new(),
    4545            2 :                 &ctx,
    4546            2 :             )
    4547            0 :             .await?;
    4548            4 :         assert!(newtline.get(*TEST_KEY, Lsn(0x25), &ctx).await.is_ok());
    4549              : 
    4550            2 :         Ok(())
    4551              :     }
    4552            2 :     #[tokio::test]
    4553            2 :     async fn test_parent_keeps_data_forever_after_branching() -> anyhow::Result<()> {
    4554            2 :         let (tenant, ctx) =
    4555            2 :             TenantHarness::create("test_parent_keeps_data_forever_after_branching")?
    4556            2 :                 .load()
    4557            2 :                 .await;
    4558            2 :         let tline = tenant
    4559            2 :             .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
    4560            6 :             .await?;
    4561            6 :         make_some_layers(tline.as_ref(), Lsn(0x20), &ctx).await?;
    4562              : 
    4563            2 :         tenant
    4564            2 :             .branch_timeline_test(&tline, NEW_TIMELINE_ID, Some(Lsn(0x40)), &ctx)
    4565            2 :             .await?;
    4566            2 :         let newtline = tenant
    4567            2 :             .get_timeline(NEW_TIMELINE_ID, true)
    4568            2 :             .expect("Should have a local timeline");
    4569            2 : 
    4570            6 :         make_some_layers(newtline.as_ref(), Lsn(0x60), &ctx).await?;
    4571              : 
    4572              :         // run gc on parent
    4573            2 :         tenant
    4574            2 :             .gc_iteration(
    4575            2 :                 Some(TIMELINE_ID),
    4576            2 :                 0x10,
    4577            2 :                 Duration::ZERO,
    4578            2 :                 &CancellationToken::new(),
    4579            2 :                 &ctx,
    4580            2 :             )
    4581            0 :             .await?;
    4582              : 
    4583              :         // Check that the data is still accessible on the branch.
    4584            2 :         assert_eq!(
    4585            7 :             newtline.get(*TEST_KEY, Lsn(0x50), &ctx).await?,
    4586            2 :             TEST_IMG(&format!("foo at {}", Lsn(0x40)))
    4587              :         );
    4588              : 
    4589            2 :         Ok(())
    4590              :     }
    4591              : 
    4592            2 :     #[tokio::test]
    4593            2 :     async fn timeline_load() -> anyhow::Result<()> {
    4594              :         const TEST_NAME: &str = "timeline_load";
    4595            2 :         let harness = TenantHarness::create(TEST_NAME)?;
    4596              :         {
    4597            2 :             let (tenant, ctx) = harness.load().await;
    4598            2 :             let tline = tenant
    4599            2 :                 .create_test_timeline(TIMELINE_ID, Lsn(0x7000), DEFAULT_PG_VERSION, &ctx)
    4600            7 :                 .await?;
    4601            6 :             make_some_layers(tline.as_ref(), Lsn(0x8000), &ctx).await?;
    4602              :             // so that all uploads finish & we can call harness.load() below again
    4603            2 :             tenant
    4604            2 :                 .shutdown(Default::default(), true)
    4605            2 :                 .instrument(harness.span())
    4606            2 :                 .await
    4607            2 :                 .ok()
    4608            2 :                 .unwrap();
    4609              :         }
    4610              : 
    4611           12 :         let (tenant, _ctx) = harness.load().await;
    4612            2 :         tenant
    4613            2 :             .get_timeline(TIMELINE_ID, true)
    4614            2 :             .expect("cannot load timeline");
    4615            2 : 
    4616            2 :         Ok(())
    4617              :     }
    4618              : 
    4619            2 :     #[tokio::test]
    4620            2 :     async fn timeline_load_with_ancestor() -> anyhow::Result<()> {
    4621              :         const TEST_NAME: &str = "timeline_load_with_ancestor";
    4622            2 :         let harness = TenantHarness::create(TEST_NAME)?;
    4623              :         // create two timelines
    4624              :         {
    4625            2 :             let (tenant, ctx) = harness.load().await;
    4626            2 :             let tline = tenant
    4627            2 :                 .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
    4628            6 :                 .await?;
    4629              : 
    4630            6 :             make_some_layers(tline.as_ref(), Lsn(0x20), &ctx).await?;
    4631              : 
    4632            2 :             let child_tline = tenant
    4633            2 :                 .branch_timeline_test(&tline, NEW_TIMELINE_ID, Some(Lsn(0x40)), &ctx)
    4634            2 :                 .await?;
    4635            2 :             child_tline.set_state(TimelineState::Active);
    4636            2 : 
    4637            2 :             let newtline = tenant
    4638            2 :                 .get_timeline(NEW_TIMELINE_ID, true)
    4639            2 :                 .expect("Should have a local timeline");
    4640            2 : 
    4641            6 :             make_some_layers(newtline.as_ref(), Lsn(0x60), &ctx).await?;
    4642              : 
    4643              :             // so that all uploads finish & we can call harness.load() below again
    4644            2 :             tenant
    4645            2 :                 .shutdown(Default::default(), true)
    4646            2 :                 .instrument(harness.span())
    4647            4 :                 .await
    4648            2 :                 .ok()
    4649            2 :                 .unwrap();
    4650              :         }
    4651              : 
    4652              :         // check that both of them are initially unloaded
    4653           19 :         let (tenant, _ctx) = harness.load().await;
    4654              : 
    4655              :         // check that both, child and ancestor are loaded
    4656            2 :         let _child_tline = tenant
    4657            2 :             .get_timeline(NEW_TIMELINE_ID, true)
    4658            2 :             .expect("cannot get child timeline loaded");
    4659            2 : 
    4660            2 :         let _ancestor_tline = tenant
    4661            2 :             .get_timeline(TIMELINE_ID, true)
    4662            2 :             .expect("cannot get ancestor timeline loaded");
    4663            2 : 
    4664            2 :         Ok(())
    4665              :     }
    4666              : 
    4667            2 :     #[tokio::test]
    4668            2 :     async fn delta_layer_dumping() -> anyhow::Result<()> {
    4669              :         use storage_layer::AsLayerDesc;
    4670            2 :         let (tenant, ctx) = TenantHarness::create("test_layer_dumping")?.load().await;
    4671            2 :         let tline = tenant
    4672            2 :             .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
    4673            7 :             .await?;
    4674            6 :         make_some_layers(tline.as_ref(), Lsn(0x20), &ctx).await?;
    4675              : 
    4676            2 :         let layer_map = tline.layers.read().await;
    4677            2 :         let level0_deltas = layer_map
    4678            2 :             .layer_map()
    4679            2 :             .get_level0_deltas()?
    4680            2 :             .into_iter()
    4681            4 :             .map(|desc| layer_map.get_from_desc(&desc))
    4682            2 :             .collect::<Vec<_>>();
    4683              : 
    4684            2 :         assert!(!level0_deltas.is_empty());
    4685              : 
    4686            6 :         for delta in level0_deltas {
    4687              :             // Ensure we are dumping a delta layer here
    4688            4 :             assert!(delta.layer_desc().is_delta);
    4689            8 :             delta.dump(true, &ctx).await.unwrap();
    4690              :         }
    4691              : 
    4692            2 :         Ok(())
    4693              :     }
    4694              : 
    4695            2 :     #[tokio::test]
    4696            2 :     async fn corrupt_local_metadata() -> anyhow::Result<()> {
    4697              :         const TEST_NAME: &str = "corrupt_metadata";
    4698            2 :         let harness = TenantHarness::create(TEST_NAME)?;
    4699            2 :         let (tenant, ctx) = harness.load().await;
    4700              : 
    4701            2 :         let tline = tenant
    4702            2 :             .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
    4703            8 :             .await?;
    4704            2 :         drop(tline);
    4705            2 :         // so that all uploads finish & we can call harness.try_load() below again
    4706            2 :         tenant
    4707            2 :             .shutdown(Default::default(), true)
    4708            2 :             .instrument(harness.span())
    4709            2 :             .await
    4710            2 :             .ok()
    4711            2 :             .unwrap();
    4712            2 :         drop(tenant);
    4713            2 : 
    4714            2 :         // Corrupt local metadata
    4715            2 :         let metadata_path = harness.timeline_path(&TIMELINE_ID).join(METADATA_FILE_NAME);
    4716            2 :         assert!(metadata_path.is_file());
    4717            2 :         let mut metadata_bytes = std::fs::read(&metadata_path)?;
    4718            2 :         assert_eq!(metadata_bytes.len(), 512);
    4719            2 :         metadata_bytes[8] ^= 1;
    4720            2 :         std::fs::write(metadata_path, metadata_bytes)?;
    4721              : 
    4722            2 :         let err = harness.try_load_local(&ctx).await.expect_err("should fail");
    4723            2 :         // get all the stack with all .context, not only the last one
    4724            2 :         let message = format!("{err:#}");
    4725            2 :         let expected = "failed to load metadata";
    4726            2 :         assert!(
    4727            2 :             message.contains(expected),
    4728            0 :             "message '{message}' expected to contain {expected}"
    4729              :         );
    4730              : 
    4731            2 :         let mut found_error_message = false;
    4732            2 :         let mut err_source = err.source();
    4733            2 :         while let Some(source) = err_source {
    4734            2 :             if source.to_string().contains("metadata checksum mismatch") {
    4735            2 :                 found_error_message = true;
    4736            2 :                 break;
    4737            0 :             }
    4738            0 :             err_source = source.source();
    4739              :         }
    4740            2 :         assert!(
    4741            2 :             found_error_message,
    4742            0 :             "didn't find the corrupted metadata error in {}",
    4743              :             message
    4744              :         );
    4745              : 
    4746            2 :         Ok(())
    4747              :     }
    4748              : 
    4749            2 :     #[tokio::test]
    4750            2 :     async fn test_images() -> anyhow::Result<()> {
    4751            2 :         let (tenant, ctx) = TenantHarness::create("test_images")?.load().await;
    4752            2 :         let tline = tenant
    4753            2 :             .create_test_timeline(TIMELINE_ID, Lsn(0x08), DEFAULT_PG_VERSION, &ctx)
    4754            7 :             .await?;
    4755              : 
    4756            2 :         let writer = tline.writer().await;
    4757            2 :         writer
    4758            2 :             .put(
    4759            2 :                 *TEST_KEY,
    4760            2 :                 Lsn(0x10),
    4761            2 :                 &Value::Image(TEST_IMG("foo at 0x10")),
    4762            2 :                 &ctx,
    4763            2 :             )
    4764            1 :             .await?;
    4765            2 :         writer.finish_write(Lsn(0x10));
    4766            2 :         drop(writer);
    4767            2 : 
    4768            2 :         tline.freeze_and_flush().await?;
    4769            2 :         tline
    4770            2 :             .compact(&CancellationToken::new(), EnumSet::empty(), &ctx)
    4771            0 :             .await?;
    4772              : 
    4773            2 :         let writer = tline.writer().await;
    4774            2 :         writer
    4775            2 :             .put(
    4776            2 :                 *TEST_KEY,
    4777            2 :                 Lsn(0x20),
    4778            2 :                 &Value::Image(TEST_IMG("foo at 0x20")),
    4779            2 :                 &ctx,
    4780            2 :             )
    4781            1 :             .await?;
    4782            2 :         writer.finish_write(Lsn(0x20));
    4783            2 :         drop(writer);
    4784            2 : 
    4785            2 :         tline.freeze_and_flush().await?;
    4786            2 :         tline
    4787            2 :             .compact(&CancellationToken::new(), EnumSet::empty(), &ctx)
    4788            0 :             .await?;
    4789              : 
    4790            2 :         let writer = tline.writer().await;
    4791            2 :         writer
    4792            2 :             .put(
    4793            2 :                 *TEST_KEY,
    4794            2 :                 Lsn(0x30),
    4795            2 :                 &Value::Image(TEST_IMG("foo at 0x30")),
    4796            2 :                 &ctx,
    4797            2 :             )
    4798            1 :             .await?;
    4799            2 :         writer.finish_write(Lsn(0x30));
    4800            2 :         drop(writer);
    4801            2 : 
    4802            2 :         tline.freeze_and_flush().await?;
    4803            2 :         tline
    4804            2 :             .compact(&CancellationToken::new(), EnumSet::empty(), &ctx)
    4805            0 :             .await?;
    4806              : 
    4807            2 :         let writer = tline.writer().await;
    4808            2 :         writer
    4809            2 :             .put(
    4810            2 :                 *TEST_KEY,
    4811            2 :                 Lsn(0x40),
    4812            2 :                 &Value::Image(TEST_IMG("foo at 0x40")),
    4813            2 :                 &ctx,
    4814            2 :             )
    4815            1 :             .await?;
    4816            2 :         writer.finish_write(Lsn(0x40));
    4817            2 :         drop(writer);
    4818            2 : 
    4819            2 :         tline.freeze_and_flush().await?;
    4820            2 :         tline
    4821            2 :             .compact(&CancellationToken::new(), EnumSet::empty(), &ctx)
    4822            0 :             .await?;
    4823              : 
    4824            2 :         assert_eq!(
    4825            4 :             tline.get(*TEST_KEY, Lsn(0x10), &ctx).await?,
    4826            2 :             TEST_IMG("foo at 0x10")
    4827              :         );
    4828            2 :         assert_eq!(
    4829            3 :             tline.get(*TEST_KEY, Lsn(0x1f), &ctx).await?,
    4830            2 :             TEST_IMG("foo at 0x10")
    4831              :         );
    4832            2 :         assert_eq!(
    4833            2 :             tline.get(*TEST_KEY, Lsn(0x20), &ctx).await?,
    4834            2 :             TEST_IMG("foo at 0x20")
    4835              :         );
    4836            2 :         assert_eq!(
    4837            4 :             tline.get(*TEST_KEY, Lsn(0x30), &ctx).await?,
    4838            2 :             TEST_IMG("foo at 0x30")
    4839              :         );
    4840            2 :         assert_eq!(
    4841            4 :             tline.get(*TEST_KEY, Lsn(0x40), &ctx).await?,
    4842            2 :             TEST_IMG("foo at 0x40")
    4843              :         );
    4844              : 
    4845            2 :         Ok(())
    4846              :     }
    4847              : 
    4848              :     //
    4849              :     // Insert 1000 key-value pairs with increasing keys, flush, compact, GC.
    4850              :     // Repeat 50 times.
    4851              :     //
    4852            2 :     #[tokio::test]
    4853            2 :     async fn test_bulk_insert() -> anyhow::Result<()> {
    4854            2 :         let harness = TenantHarness::create("test_bulk_insert")?;
    4855            2 :         let (tenant, ctx) = harness.load().await;
    4856            2 :         let tline = tenant
    4857            2 :             .create_test_timeline(TIMELINE_ID, Lsn(0x08), DEFAULT_PG_VERSION, &ctx)
    4858            8 :             .await?;
    4859              : 
    4860            2 :         let mut lsn = Lsn(0x10);
    4861            2 : 
    4862            2 :         let mut keyspace = KeySpaceAccum::new();
    4863            2 : 
    4864            2 :         let mut test_key = Key::from_hex("010000000033333333444444445500000000").unwrap();
    4865            2 :         let mut blknum = 0;
    4866          102 :         for _ in 0..50 {
    4867      1000100 :             for _ in 0..10000 {
    4868      1000000 :                 test_key.field6 = blknum;
    4869      1000000 :                 let writer = tline.writer().await;
    4870      1000000 :                 writer
    4871      1000000 :                     .put(
    4872      1000000 :                         test_key,
    4873      1000000 :                         lsn,
    4874      1000000 :                         &Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))),
    4875      1000000 :                         &ctx,
    4876      1000000 :                     )
    4877        15854 :                     .await?;
    4878      1000000 :                 writer.finish_write(lsn);
    4879      1000000 :                 drop(writer);
    4880      1000000 : 
    4881      1000000 :                 keyspace.add_key(test_key);
    4882      1000000 : 
    4883      1000000 :                 lsn = Lsn(lsn.0 + 0x10);
    4884      1000000 :                 blknum += 1;
    4885              :             }
    4886              : 
    4887          100 :             let cutoff = tline.get_last_record_lsn();
    4888          100 : 
    4889          100 :             tline
    4890          100 :                 .update_gc_info(
    4891          100 :                     Vec::new(),
    4892          100 :                     cutoff,
    4893          100 :                     Duration::ZERO,
    4894          100 :                     &CancellationToken::new(),
    4895          100 :                     &ctx,
    4896          100 :                 )
    4897            0 :                 .await?;
    4898          100 :             tline.freeze_and_flush().await?;
    4899          100 :             tline
    4900          100 :                 .compact(&CancellationToken::new(), EnumSet::empty(), &ctx)
    4901        18642 :                 .await?;
    4902          100 :             tline.gc().await?;
    4903              :         }
    4904              : 
    4905            2 :         Ok(())
    4906              :     }
    4907              : 
    4908            2 :     #[tokio::test]
    4909            2 :     async fn test_random_updates() -> anyhow::Result<()> {
    4910            2 :         let harness = TenantHarness::create("test_random_updates")?;
    4911            2 :         let (tenant, ctx) = harness.load().await;
    4912            2 :         let tline = tenant
    4913            2 :             .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
    4914            6 :             .await?;
    4915              : 
    4916              :         const NUM_KEYS: usize = 1000;
    4917              : 
    4918            2 :         let mut test_key = Key::from_hex("010000000033333333444444445500000000").unwrap();
    4919            2 : 
    4920            2 :         let mut keyspace = KeySpaceAccum::new();
    4921            2 : 
    4922            2 :         // Track when each page was last modified. Used to assert that
    4923            2 :         // a read sees the latest page version.
    4924            2 :         let mut updated = [Lsn(0); NUM_KEYS];
    4925            2 : 
    4926            2 :         let mut lsn = Lsn(0x10);
    4927              :         #[allow(clippy::needless_range_loop)]
    4928         2002 :         for blknum in 0..NUM_KEYS {
    4929         2000 :             lsn = Lsn(lsn.0 + 0x10);
    4930         2000 :             test_key.field6 = blknum as u32;
    4931         2000 :             let writer = tline.writer().await;
    4932         2000 :             writer
    4933         2000 :                 .put(
    4934         2000 :                     test_key,
    4935         2000 :                     lsn,
    4936         2000 :                     &Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))),
    4937         2000 :                     &ctx,
    4938         2000 :                 )
    4939           33 :                 .await?;
    4940         2000 :             writer.finish_write(lsn);
    4941         2000 :             updated[blknum] = lsn;
    4942         2000 :             drop(writer);
    4943         2000 : 
    4944         2000 :             keyspace.add_key(test_key);
    4945              :         }
    4946              : 
    4947          102 :         for _ in 0..50 {
    4948       100100 :             for _ in 0..NUM_KEYS {
    4949       100000 :                 lsn = Lsn(lsn.0 + 0x10);
    4950       100000 :                 let blknum = thread_rng().gen_range(0..NUM_KEYS);
    4951       100000 :                 test_key.field6 = blknum as u32;
    4952       100000 :                 let writer = tline.writer().await;
    4953       100000 :                 writer
    4954       100000 :                     .put(
    4955       100000 :                         test_key,
    4956       100000 :                         lsn,
    4957       100000 :                         &Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))),
    4958       100000 :                         &ctx,
    4959       100000 :                     )
    4960         1604 :                     .await?;
    4961       100000 :                 writer.finish_write(lsn);
    4962       100000 :                 drop(writer);
    4963       100000 :                 updated[blknum] = lsn;
    4964              :             }
    4965              : 
    4966              :             // Read all the blocks
    4967       100000 :             for (blknum, last_lsn) in updated.iter().enumerate() {
    4968       100000 :                 test_key.field6 = blknum as u32;
    4969       100000 :                 assert_eq!(
    4970       100000 :                     tline.get(test_key, lsn, &ctx).await?,
    4971       100000 :                     TEST_IMG(&format!("{} at {}", blknum, last_lsn))
    4972              :                 );
    4973              :             }
    4974              : 
    4975              :             // Perform a cycle of flush, compact, and GC
    4976          100 :             let cutoff = tline.get_last_record_lsn();
    4977          100 :             tline
    4978          100 :                 .update_gc_info(
    4979          100 :                     Vec::new(),
    4980          100 :                     cutoff,
    4981          100 :                     Duration::ZERO,
    4982          100 :                     &CancellationToken::new(),
    4983          100 :                     &ctx,
    4984          100 :                 )
    4985            0 :                 .await?;
    4986          100 :             tline.freeze_and_flush().await?;
    4987          100 :             tline
    4988          100 :                 .compact(&CancellationToken::new(), EnumSet::empty(), &ctx)
    4989         2279 :                 .await?;
    4990          100 :             tline.gc().await?;
    4991              :         }
    4992              : 
    4993            2 :         Ok(())
    4994              :     }
    4995              : 
    4996            2 :     #[tokio::test]
    4997            2 :     async fn test_traverse_branches() -> anyhow::Result<()> {
    4998            2 :         let (tenant, ctx) = TenantHarness::create("test_traverse_branches")?
    4999            2 :             .load()
    5000            2 :             .await;
    5001            2 :         let mut tline = tenant
    5002            2 :             .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
    5003            7 :             .await?;
    5004              : 
    5005              :         const NUM_KEYS: usize = 1000;
    5006              : 
    5007            2 :         let mut test_key = Key::from_hex("010000000033333333444444445500000000").unwrap();
    5008            2 : 
    5009            2 :         let mut keyspace = KeySpaceAccum::new();
    5010            2 : 
    5011            2 :         // Track when each page was last modified. Used to assert that
    5012            2 :         // a read sees the latest page version.
    5013            2 :         let mut updated = [Lsn(0); NUM_KEYS];
    5014            2 : 
    5015            2 :         let mut lsn = Lsn(0x10);
    5016              :         #[allow(clippy::needless_range_loop)]
    5017         2002 :         for blknum in 0..NUM_KEYS {
    5018         2000 :             lsn = Lsn(lsn.0 + 0x10);
    5019         2000 :             test_key.field6 = blknum as u32;
    5020         2000 :             let writer = tline.writer().await;
    5021         2000 :             writer
    5022         2000 :                 .put(
    5023         2000 :                     test_key,
    5024         2000 :                     lsn,
    5025         2000 :                     &Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))),
    5026         2000 :                     &ctx,
    5027         2000 :                 )
    5028           33 :                 .await?;
    5029         2000 :             writer.finish_write(lsn);
    5030         2000 :             updated[blknum] = lsn;
    5031         2000 :             drop(writer);
    5032         2000 : 
    5033         2000 :             keyspace.add_key(test_key);
    5034              :         }
    5035              : 
    5036          102 :         for _ in 0..50 {
    5037          100 :             let new_tline_id = TimelineId::generate();
    5038          100 :             tenant
    5039          100 :                 .branch_timeline_test(&tline, new_tline_id, Some(lsn), &ctx)
    5040          101 :                 .await?;
    5041          100 :             tline = tenant
    5042          100 :                 .get_timeline(new_tline_id, true)
    5043          100 :                 .expect("Should have the branched timeline");
    5044              : 
    5045       100100 :             for _ in 0..NUM_KEYS {
    5046       100000 :                 lsn = Lsn(lsn.0 + 0x10);
    5047       100000 :                 let blknum = thread_rng().gen_range(0..NUM_KEYS);
    5048       100000 :                 test_key.field6 = blknum as u32;
    5049       100000 :                 let writer = tline.writer().await;
    5050       100000 :                 writer
    5051       100000 :                     .put(
    5052       100000 :                         test_key,
    5053       100000 :                         lsn,
    5054       100000 :                         &Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))),
    5055       100000 :                         &ctx,
    5056       100000 :                     )
    5057         1640 :                     .await?;
    5058       100000 :                 println!("updating {} at {}", blknum, lsn);
    5059       100000 :                 writer.finish_write(lsn);
    5060       100000 :                 drop(writer);
    5061       100000 :                 updated[blknum] = lsn;
    5062              :             }
    5063              : 
    5064              :             // Read all the blocks
    5065       100000 :             for (blknum, last_lsn) in updated.iter().enumerate() {
    5066       100000 :                 test_key.field6 = blknum as u32;
    5067       100000 :                 assert_eq!(
    5068       100000 :                     tline.get(test_key, lsn, &ctx).await?,
    5069       100000 :                     TEST_IMG(&format!("{} at {}", blknum, last_lsn))
    5070              :                 );
    5071              :             }
    5072              : 
    5073              :             // Perform a cycle of flush, compact, and GC
    5074          100 :             let cutoff = tline.get_last_record_lsn();
    5075          100 :             tline
    5076          100 :                 .update_gc_info(
    5077          100 :                     Vec::new(),
    5078          100 :                     cutoff,
    5079          100 :                     Duration::ZERO,
    5080          100 :                     &CancellationToken::new(),
    5081          100 :                     &ctx,
    5082          100 :                 )
    5083            0 :                 .await?;
    5084          102 :             tline.freeze_and_flush().await?;
    5085          100 :             tline
    5086          100 :                 .compact(&CancellationToken::new(), EnumSet::empty(), &ctx)
    5087        13317 :                 .await?;
    5088          100 :             tline.gc().await?;
    5089              :         }
    5090              : 
    5091            2 :         Ok(())
    5092              :     }
    5093              : 
    5094            2 :     #[tokio::test]
    5095            2 :     async fn test_traverse_ancestors() -> anyhow::Result<()> {
    5096            2 :         let (tenant, ctx) = TenantHarness::create("test_traverse_ancestors")?
    5097            2 :             .load()
    5098            2 :             .await;
    5099            2 :         let mut tline = tenant
    5100            2 :             .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
    5101            7 :             .await?;
    5102              : 
    5103              :         const NUM_KEYS: usize = 100;
    5104              :         const NUM_TLINES: usize = 50;
    5105              : 
    5106            2 :         let mut test_key = Key::from_hex("010000000033333333444444445500000000").unwrap();
    5107            2 :         // Track page mutation lsns across different timelines.
    5108            2 :         let mut updated = [[Lsn(0); NUM_KEYS]; NUM_TLINES];
    5109            2 : 
    5110            2 :         let mut lsn = Lsn(0x10);
    5111              : 
    5112              :         #[allow(clippy::needless_range_loop)]
    5113          102 :         for idx in 0..NUM_TLINES {
    5114          100 :             let new_tline_id = TimelineId::generate();
    5115          100 :             tenant
    5116          100 :                 .branch_timeline_test(&tline, new_tline_id, Some(lsn), &ctx)
    5117          106 :                 .await?;
    5118          100 :             tline = tenant
    5119          100 :                 .get_timeline(new_tline_id, true)
    5120          100 :                 .expect("Should have the branched timeline");
    5121              : 
    5122        10100 :             for _ in 0..NUM_KEYS {
    5123        10000 :                 lsn = Lsn(lsn.0 + 0x10);
    5124        10000 :                 let blknum = thread_rng().gen_range(0..NUM_KEYS);
    5125        10000 :                 test_key.field6 = blknum as u32;
    5126        10000 :                 let writer = tline.writer().await;
    5127        10000 :                 writer
    5128        10000 :                     .put(
    5129        10000 :                         test_key,
    5130        10000 :                         lsn,
    5131        10000 :                         &Value::Image(TEST_IMG(&format!("{} {} at {}", idx, blknum, lsn))),
    5132        10000 :                         &ctx,
    5133        10000 :                     )
    5134          174 :                     .await?;
    5135        10000 :                 println!("updating [{}][{}] at {}", idx, blknum, lsn);
    5136        10000 :                 writer.finish_write(lsn);
    5137        10000 :                 drop(writer);
    5138        10000 :                 updated[idx][blknum] = lsn;
    5139              :             }
    5140              :         }
    5141              : 
    5142              :         // Read pages from leaf timeline across all ancestors.
    5143          100 :         for (idx, lsns) in updated.iter().enumerate() {
    5144        10000 :             for (blknum, lsn) in lsns.iter().enumerate() {
    5145              :                 // Skip empty mutations.
    5146        10000 :                 if lsn.0 == 0 {
    5147         3642 :                     continue;
    5148         6358 :                 }
    5149         6358 :                 println!("checking [{idx}][{blknum}] at {lsn}");
    5150         6358 :                 test_key.field6 = blknum as u32;
    5151         6358 :                 assert_eq!(
    5152         6358 :                     tline.get(test_key, *lsn, &ctx).await?,
    5153         6358 :                     TEST_IMG(&format!("{idx} {blknum} at {lsn}"))
    5154              :                 );
    5155              :             }
    5156              :         }
    5157            2 :         Ok(())
    5158              :     }
    5159              : 
    5160            2 :     #[tokio::test]
    5161            2 :     async fn test_write_at_initdb_lsn_takes_optimization_code_path() -> anyhow::Result<()> {
    5162            2 :         let (tenant, ctx) = TenantHarness::create("test_empty_test_timeline_is_usable")?
    5163            2 :             .load()
    5164            2 :             .await;
    5165              : 
    5166            2 :         let initdb_lsn = Lsn(0x20);
    5167            2 :         let utline = tenant
    5168            2 :             .create_empty_timeline(TIMELINE_ID, initdb_lsn, DEFAULT_PG_VERSION, &ctx)
    5169            3 :             .await?;
    5170            2 :         let tline = utline.raw_timeline().unwrap();
    5171            2 : 
    5172            2 :         // Spawn flush loop now so that we can set the `expect_initdb_optimization`
    5173            2 :         tline.maybe_spawn_flush_loop();
    5174            2 : 
    5175            2 :         // Make sure the timeline has the minimum set of required keys for operation.
    5176            2 :         // The only operation you can always do on an empty timeline is to `put` new data.
    5177            2 :         // Except if you `put` at `initdb_lsn`.
    5178            2 :         // In that case, there's an optimization to directly create image layers instead of delta layers.
    5179            2 :         // It uses `repartition()`, which assumes some keys to be present.
    5180            2 :         // Let's make sure the test timeline can handle that case.
    5181            2 :         {
    5182            2 :             let mut state = tline.flush_loop_state.lock().unwrap();
    5183            2 :             assert_eq!(
    5184            2 :                 timeline::FlushLoopState::Running {
    5185            2 :                     expect_initdb_optimization: false,
    5186            2 :                     initdb_optimization_count: 0,
    5187            2 :                 },
    5188            2 :                 *state
    5189            2 :             );
    5190            2 :             *state = timeline::FlushLoopState::Running {
    5191            2 :                 expect_initdb_optimization: true,
    5192            2 :                 initdb_optimization_count: 0,
    5193            2 :             };
    5194            2 :         }
    5195            2 : 
    5196            2 :         // Make writes at the initdb_lsn. When we flush it below, it should be handled by the optimization.
    5197            2 :         // As explained above, the optimization requires some keys to be present.
    5198            2 :         // As per `create_empty_timeline` documentation, use init_empty to set them.
    5199            2 :         // This is what `create_test_timeline` does, by the way.
    5200            2 :         let mut modification = tline.begin_modification(initdb_lsn);
    5201            2 :         modification
    5202            2 :             .init_empty_test_timeline()
    5203            2 :             .context("init_empty_test_timeline")?;
    5204            2 :         modification
    5205            2 :             .commit(&ctx)
    5206            1 :             .await
    5207            2 :             .context("commit init_empty_test_timeline modification")?;
    5208              : 
    5209              :         // Do the flush. The flush code will check the expectations that we set above.
    5210            2 :         tline.freeze_and_flush().await?;
    5211              : 
    5212              :         // assert freeze_and_flush exercised the initdb optimization
    5213              :         {
    5214            2 :             let state = tline.flush_loop_state.lock().unwrap();
    5215              :             let timeline::FlushLoopState::Running {
    5216            2 :                 expect_initdb_optimization,
    5217            2 :                 initdb_optimization_count,
    5218            2 :             } = *state
    5219              :             else {
    5220            0 :                 panic!("unexpected state: {:?}", *state);
    5221              :             };
    5222            2 :             assert!(expect_initdb_optimization);
    5223            2 :             assert!(initdb_optimization_count > 0);
    5224              :         }
    5225            2 :         Ok(())
    5226              :     }
    5227              : 
    5228            2 :     #[tokio::test]
    5229            2 :     async fn test_uninit_mark_crash() -> anyhow::Result<()> {
    5230            2 :         let name = "test_uninit_mark_crash";
    5231            2 :         let harness = TenantHarness::create(name)?;
    5232              :         {
    5233            2 :             let (tenant, ctx) = harness.load().await;
    5234            2 :             let tline = tenant
    5235            2 :                 .create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)
    5236            3 :                 .await?;
    5237              :             // Keeps uninit mark in place
    5238            2 :             let raw_tline = tline.raw_timeline().unwrap();
    5239            2 :             raw_tline
    5240            2 :                 .shutdown()
    5241            2 :                 .instrument(info_span!("test_shutdown", tenant_id=%raw_tline.tenant_shard_id, shard_id=%raw_tline.tenant_shard_id.shard_slug(), timeline_id=%TIMELINE_ID))
    5242            0 :                 .await;
    5243            2 :             std::mem::forget(tline);
    5244              :         }
    5245              : 
    5246            2 :         let (tenant, _) = harness.load().await;
    5247            2 :         match tenant.get_timeline(TIMELINE_ID, false) {
    5248            0 :             Ok(_) => panic!("timeline should've been removed during load"),
    5249            2 :             Err(e) => {
    5250            2 :                 assert_eq!(
    5251            2 :                     e,
    5252            2 :                     GetTimelineError::NotFound {
    5253            2 :                         tenant_id: tenant.tenant_shard_id,
    5254            2 :                         timeline_id: TIMELINE_ID,
    5255            2 :                     }
    5256            2 :                 )
    5257              :             }
    5258              :         }
    5259              : 
    5260            2 :         assert!(!harness
    5261            2 :             .conf
    5262            2 :             .timeline_path(&tenant.tenant_shard_id, &TIMELINE_ID)
    5263            2 :             .exists());
    5264              : 
    5265            2 :         assert!(!harness
    5266            2 :             .conf
    5267            2 :             .timeline_uninit_mark_file_path(tenant.tenant_shard_id, TIMELINE_ID)
    5268            2 :             .exists());
    5269              : 
    5270            2 :         Ok(())
    5271              :     }
    5272              : }
        

Generated by: LCOV version 2.1-beta