LCOV - code coverage report
Current view: top level - pageserver/src - tenant.rs (source / functions)
Test:      aca8877be6ceba750c1be359ed71bc1799d52b30.info
Test Date: 2024-02-14 18:05:35
Coverage:  Lines: 88.4 % (3065 of 3466 hit)    Functions: 74.2 % (291 of 392 hit)

            Line data    Source code
       1              : //!
       2              : //! Timeline repository implementation that keeps old data in files on disk, and
       3              : //! the recent changes in memory. See tenant/*_layer.rs files.
       4              : //! The functions here are responsible for locating the correct layer for the
       5              : //! get/put call, walking back the timeline branching history as needed.
       6              : //!
       7              : //! The files are stored in the .neon/tenants/<tenant_id>/timelines/<timeline_id>
       8              : //! directory. See docs/pageserver-storage.md for how the files are managed.
       9              : //! In addition to the layer files, there is a metadata file in the same
      10              : //! directory that contains information about the timeline, in particular its
      11              : //! parent timeline, and the last LSN that has been written to disk.
      12              : //!
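                       : //!
                       : //! For orientation, a hypothetical timeline directory might look roughly like
                       : //! this (an illustrative sketch only; the actual layer file naming scheme is
                       : //! described in docs/pageserver-storage.md):
                       : //!
                       : //! ```text
                       : //! .neon/tenants/<tenant_id>/timelines/<timeline_id>/
                       : //!     <image layer files>
                       : //!     <delta layer files>
                       : //!     <timeline metadata file>
                       : //! ```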
      13              : 
      14              : use anyhow::{bail, Context};
      15              : use camino::Utf8Path;
      16              : use camino::Utf8PathBuf;
      17              : use enumset::EnumSet;
      18              : use futures::stream::FuturesUnordered;
      19              : use futures::FutureExt;
      20              : use futures::StreamExt;
      21              : use pageserver_api::models;
      22              : use pageserver_api::models::TimelineState;
      23              : use pageserver_api::models::WalRedoManagerStatus;
      24              : use pageserver_api::shard::ShardIdentity;
      25              : use pageserver_api::shard::TenantShardId;
      26              : use remote_storage::DownloadError;
      27              : use remote_storage::GenericRemoteStorage;
      28              : use std::fmt;
      29              : use storage_broker::BrokerClientChannel;
      30              : use tokio::io::BufReader;
      31              : use tokio::sync::watch;
      32              : use tokio::task::JoinSet;
      33              : use tokio_util::sync::CancellationToken;
      34              : use tracing::*;
      35              : use utils::backoff;
      36              : use utils::completion;
      37              : use utils::crashsafe::path_with_suffix_extension;
      38              : use utils::failpoint_support;
      39              : use utils::fs_ext;
      40              : use utils::sync::gate::Gate;
      41              : use utils::sync::gate::GateGuard;
      42              : use utils::timeout::timeout_cancellable;
      43              : use utils::timeout::TimeoutCancellableError;
      44              : 
      45              : use self::config::AttachedLocationConfig;
      46              : use self::config::AttachmentMode;
      47              : use self::config::LocationConf;
      48              : use self::config::TenantConf;
      49              : use self::delete::DeleteTenantFlow;
      50              : use self::metadata::LoadMetadataError;
      51              : use self::metadata::TimelineMetadata;
      52              : use self::mgr::GetActiveTenantError;
      53              : use self::mgr::GetTenantError;
      54              : use self::mgr::TenantsMap;
      55              : use self::remote_timeline_client::upload::upload_index_part;
      56              : use self::remote_timeline_client::RemoteTimelineClient;
      57              : use self::timeline::uninit::TimelineExclusionError;
      58              : use self::timeline::uninit::TimelineUninitMark;
      59              : use self::timeline::uninit::UninitializedTimeline;
      60              : use self::timeline::EvictionTaskTenantState;
      61              : use self::timeline::TimelineResources;
      62              : use self::timeline::WaitLsnError;
      63              : use crate::config::PageServerConf;
      64              : use crate::context::{DownloadBehavior, RequestContext};
      65              : use crate::deletion_queue::DeletionQueueClient;
      66              : use crate::deletion_queue::DeletionQueueError;
      67              : use crate::import_datadir;
      68              : use crate::is_uninit_mark;
      69              : use crate::metrics::TENANT;
      70              : use crate::metrics::{
      71              :     remove_tenant_metrics, BROKEN_TENANTS_SET, TENANT_STATE_METRIC, TENANT_SYNTHETIC_SIZE_METRIC,
      72              : };
      73              : use crate::repository::GcResult;
      74              : use crate::task_mgr;
      75              : use crate::task_mgr::TaskKind;
      76              : use crate::tenant::config::LocationMode;
      77              : use crate::tenant::config::TenantConfOpt;
      78              : use crate::tenant::metadata::load_metadata;
      79              : pub use crate::tenant::remote_timeline_client::index::IndexPart;
      80              : use crate::tenant::remote_timeline_client::remote_initdb_archive_path;
      81              : use crate::tenant::remote_timeline_client::MaybeDeletedIndexPart;
      82              : use crate::tenant::remote_timeline_client::INITDB_PATH;
      83              : use crate::tenant::storage_layer::DeltaLayer;
      84              : use crate::tenant::storage_layer::ImageLayer;
      85              : use crate::InitializationOrder;
      86              : use std::cmp::min;
      87              : use std::collections::hash_map::Entry;
      88              : use std::collections::BTreeSet;
      89              : use std::collections::HashMap;
      90              : use std::collections::HashSet;
      91              : use std::fmt::Debug;
      92              : use std::fmt::Display;
      93              : use std::fs;
      94              : use std::fs::File;
      95              : use std::io;
      96              : use std::ops::Bound::Included;
      97              : use std::sync::atomic::AtomicU64;
      98              : use std::sync::atomic::Ordering;
      99              : use std::sync::Arc;
     100              : use std::sync::{Mutex, RwLock};
     101              : use std::time::{Duration, Instant};
     102              : 
     103              : use crate::span;
     104              : use crate::tenant::timeline::delete::DeleteTimelineFlow;
     105              : use crate::tenant::timeline::uninit::cleanup_timeline_directory;
     106              : use crate::virtual_file::VirtualFile;
     107              : use crate::walredo::PostgresRedoManager;
     108              : use crate::TEMP_FILE_SUFFIX;
     109              : use once_cell::sync::Lazy;
     110              : pub use pageserver_api::models::TenantState;
     111              : use tokio::sync::Semaphore;
     112              : 
     113          378 : static INIT_DB_SEMAPHORE: Lazy<Semaphore> = Lazy::new(|| Semaphore::new(8));
     114              : use toml_edit;
     115              : use utils::{
     116              :     crashsafe,
     117              :     generation::Generation,
     118              :     id::TimelineId,
     119              :     lsn::{Lsn, RecordLsn},
     120              : };
     121              : 
     122              : /// Declare a failpoint that can use the `pause` failpoint action.
     123              : /// We don't want to block the executor thread, hence, spawn_blocking + await.
     124              : macro_rules! pausable_failpoint {
     125              :     ($name:literal) => {
     126              :         if cfg!(feature = "testing") {
     127              :             tokio::task::spawn_blocking({
     128              :                 let current = tracing::Span::current();
     129              :                 move || {
     130              :                     let _entered = current.entered();
     131              :                     tracing::info!("at failpoint {}", $name);
     132              :                     fail::fail_point!($name);
     133              :                 }
     134              :             })
     135              :             .await
     136              :             .expect("spawn_blocking");
     137              :         }
     138              :     };
     139              :     ($name:literal, $cond:expr) => {
     140              :         if cfg!(feature = "testing") {
     141              :             if $cond {
     142              :                 pausable_failpoint!($name)
     143              :             }
     144              :         }
     145              :     };
     146              : }
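                       : 
                       : // Illustrative usage of the macro above (a sketch with a hypothetical failpoint
                       : // name, not a prescribed call site). Inside an async fn, the expansion runs the
                       : // failpoint on a blocking thread so a `pause` action never stalls the executor:
                       : //
                       : //     async fn do_work() {
                       : //         pausable_failpoint!("example-before-work");
                       : //         // ... proceed once the failpoint is released ...
                       : //     }
                       : //
                       : // The two-argument form only evaluates the failpoint when the condition holds:
                       : //
                       : //     pausable_failpoint!("example-before-work", should_pause);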
     147              : 
     148              : pub mod blob_io;
     149              : pub mod block_io;
     150              : 
     151              : pub mod disk_btree;
     152              : pub(crate) mod ephemeral_file;
     153              : pub mod layer_map;
     154              : 
     155              : pub mod metadata;
     156              : mod par_fsync;
     157              : pub mod remote_timeline_client;
     158              : pub mod storage_layer;
     159              : 
     160              : pub mod config;
     161              : pub mod delete;
     162              : pub mod mgr;
     163              : pub mod secondary;
     164              : pub mod tasks;
     165              : pub mod upload_queue;
     166              : 
     167              : pub(crate) mod timeline;
     168              : 
     169              : pub mod size;
     170              : 
     171              : pub(crate) use crate::span::debug_assert_current_span_has_tenant_and_timeline_id;
     172              : pub(crate) use timeline::{LogicalSizeCalculationCause, PageReconstructError, Timeline};
     173              : 
     174              : // re-export for use in remote_timeline_client.rs
     175              : pub use crate::tenant::metadata::save_metadata;
     176              : 
     177              : // re-export for use in walreceiver
     178              : pub use crate::tenant::timeline::WalReceiverInfo;
     179              : 
     180              : /// The "tenants" part of `tenants/<tenant>/timelines...`
     181              : pub const TENANTS_SEGMENT_NAME: &str = "tenants";
     182              : 
     183              : /// Parts of the `.neon/tenants/<tenant_id>/timelines/<timeline_id>` directory prefix.
     184              : pub const TIMELINES_SEGMENT_NAME: &str = "timelines";
     185              : 
     186              : pub const TENANT_DELETED_MARKER_FILE_NAME: &str = "deleted";
     187              : 
     188              : /// References to shared objects that are passed into each tenant, such
     189              : /// as the shared remote storage client and process initialization state.
     190          881 : #[derive(Clone)]
     191              : pub struct TenantSharedResources {
     192              :     pub broker_client: storage_broker::BrokerClientChannel,
     193              :     pub remote_storage: Option<GenericRemoteStorage>,
     194              :     pub deletion_queue_client: DeletionQueueClient,
     195              : }
     196              : 
     197              : /// A [`Tenant`] is really an _attached_ tenant.  The configuration
     198              : /// for an attached tenant is a subset of the [`LocationConf`], represented
     199              : /// in this struct.
     200              : pub(super) struct AttachedTenantConf {
     201              :     tenant_conf: TenantConfOpt,
     202              :     location: AttachedLocationConfig,
     203              : }
     204              : 
     205              : impl AttachedTenantConf {
     206         1007 :     fn try_from(location_conf: LocationConf) -> anyhow::Result<Self> {
     207         1007 :         match &location_conf.mode {
     208         1007 :             LocationMode::Attached(attach_conf) => Ok(Self {
     209         1007 :                 tenant_conf: location_conf.tenant_conf,
     210         1007 :                 location: *attach_conf,
     211         1007 :             }),
     212              :             LocationMode::Secondary(_) => {
     213            0 :                 anyhow::bail!("Attempted to construct AttachedTenantConf from a LocationConf in secondary mode")
     214              :             }
     215              :         }
     216         1007 :     }
     217              : }
     218              : struct TimelinePreload {
     219              :     timeline_id: TimelineId,
     220              :     client: RemoteTimelineClient,
     221              :     index_part: Result<MaybeDeletedIndexPart, DownloadError>,
     222              : }
     223              : 
     224              : pub(crate) struct TenantPreload {
     225              :     deleting: bool,
     226              :     timelines: HashMap<TimelineId, TimelinePreload>,
     227              : }
     228              : 
     229              : /// When we spawn a tenant, there is a special mode for tenant creation that
     230              : /// avoids trying to read anything from remote storage.
     231              : pub(crate) enum SpawnMode {
     232              :     Normal,
     233              :     Create,
     234              : }
     235              : 
     236              : ///
      237              : /// A Tenant consists of multiple timelines, kept in a hash table.
     238              : ///
     239              : pub struct Tenant {
     240              :     // Global pageserver config parameters
     241              :     pub conf: &'static PageServerConf,
     242              : 
      243              :     /// Timestamp when this `Tenant` value was constructed, used to measure activation delay, see:
     244              :     /// <https://github.com/neondatabase/neon/issues/4025>
     245              :     constructed_at: Instant,
     246              : 
     247              :     state: watch::Sender<TenantState>,
     248              : 
     249              :     // Overridden tenant-specific config parameters.
      250              :     // We keep the TenantConfOpt struct here to preserve the information
     251              :     // about parameters that are not set.
     252              :     // This is necessary to allow global config updates.
     253              :     tenant_conf: Arc<RwLock<AttachedTenantConf>>,
     254              : 
     255              :     tenant_shard_id: TenantShardId,
     256              : 
     257              :     // The detailed sharding information, beyond the number/count in tenant_shard_id
     258              :     shard_identity: ShardIdentity,
     259              : 
     260              :     /// The remote storage generation, used to protect S3 objects from split-brain.
     261              :     /// Does not change over the lifetime of the [`Tenant`] object.
     262              :     ///
     263              :     /// This duplicates the generation stored in LocationConf, but that structure is mutable:
      264              :     /// this copy enforces the invariant that generation doesn't change during a Tenant's lifetime.
     265              :     generation: Generation,
     266              : 
     267              :     timelines: Mutex<HashMap<TimelineId, Arc<Timeline>>>,
     268              : 
      269              :     /// During timeline creation, we first insert the TimelineId into the
      270              :     /// creating map, then into `timelines`, then remove it from the creating map.
      271              :     /// **Lock order**: if acquiring both, acquire `timelines` before `timelines_creating`
     272              :     timelines_creating: std::sync::Mutex<HashSet<TimelineId>>,
     273              : 
     274              :     // This mutex prevents creation of new timelines during GC.
      275              :     // Adding yet another mutex (in addition to `timelines`) is needed because holding the
      276              :     // `timelines` mutex for the whole GC iteration
      277              :     // could block `get_timeline`, `get_timelines_state`, ... and other timeline operations
      278              :     // for a long time, which in turn may cause replication connections to be dropped and wait_for_lsn
      279              :     // timeouts to expire.
     280              :     gc_cs: tokio::sync::Mutex<()>,
     281              :     walredo_mgr: Option<Arc<WalRedoManager>>,
     282              : 
     283              :     // provides access to timeline data sitting in the remote storage
     284              :     pub(crate) remote_storage: Option<GenericRemoteStorage>,
     285              : 
     286              :     // Access to global deletion queue for when this tenant wants to schedule a deletion
     287              :     deletion_queue_client: DeletionQueueClient,
     288              : 
      289              :     /// Cached logical sizes, updated on each [`Tenant::gather_size_inputs`].
     290              :     cached_logical_sizes: tokio::sync::Mutex<HashMap<(TimelineId, Lsn), u64>>,
     291              :     cached_synthetic_tenant_size: Arc<AtomicU64>,
     292              : 
     293              :     eviction_task_tenant_state: tokio::sync::Mutex<EvictionTaskTenantState>,
     294              : 
     295              :     /// If the tenant is in Activating state, notify this to encourage it
     296              :     /// to proceed to Active as soon as possible, rather than waiting for lazy
     297              :     /// background warmup.
     298              :     pub(crate) activate_now_sem: tokio::sync::Semaphore,
     299              : 
     300              :     pub(crate) delete_progress: Arc<tokio::sync::Mutex<DeleteTenantFlow>>,
     301              : 
     302              :     // Cancellation token fires when we have entered shutdown().  This is a parent of
     303              :     // Timelines' cancellation token.
     304              :     pub(crate) cancel: CancellationToken,
     305              : 
     306              :     // Users of the Tenant such as the page service must take this Gate to avoid
     307              :     // trying to use a Tenant which is shutting down.
     308              :     pub(crate) gate: Gate,
     309              : }
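                       : 
                       : // Sketch of the intended access pattern for the `gate` field above; the exact
                       : // guard handling is an assumption, modeled on the `tenant.gate.enter()` call in
                       : // `Tenant::spawn` below:
                       : //
                       : //     let _guard = tenant.gate.enter()?;   // fails once shutdown() has begun
                       : //     // ... it is safe to use `tenant` while the guard is held ...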
     310              : 
     311              : impl std::fmt::Debug for Tenant {
     312            2 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     313            2 :         write!(f, "{} ({})", self.tenant_shard_id, self.current_state())
     314            2 :     }
     315              : }
     316              : 
     317              : pub(crate) enum WalRedoManager {
     318              :     Prod(PostgresRedoManager),
     319              :     #[cfg(test)]
     320              :     Test(harness::TestRedoManager),
     321              : }
     322              : 
     323              : impl From<PostgresRedoManager> for WalRedoManager {
     324          884 :     fn from(mgr: PostgresRedoManager) -> Self {
     325          884 :         Self::Prod(mgr)
     326          884 :     }
     327              : }
     328              : 
     329              : #[cfg(test)]
     330              : impl From<harness::TestRedoManager> for WalRedoManager {
     331           86 :     fn from(mgr: harness::TestRedoManager) -> Self {
     332           86 :         Self::Test(mgr)
     333           86 :     }
     334              : }
     335              : 
     336              : impl WalRedoManager {
     337            0 :     pub(crate) fn maybe_quiesce(&self, idle_timeout: Duration) {
     338            0 :         match self {
     339          806 :             Self::Prod(mgr) => mgr.maybe_quiesce(idle_timeout),
     340          806 :             #[cfg(test)]
     341          806 :             Self::Test(_) => {
     342            0 :                 // Not applicable to test redo manager
     343            0 :             }
     344          806 :         }
     345          806 :     }
     346              : 
     347              :     /// # Cancel-Safety
     348              :     ///
     349              :     /// This method is cancellation-safe.
     350      2319802 :     pub async fn request_redo(
     351      2319802 :         &self,
     352      2319802 :         key: crate::repository::Key,
     353      2319802 :         lsn: Lsn,
     354      2319802 :         base_img: Option<(Lsn, bytes::Bytes)>,
     355      2319802 :         records: Vec<(Lsn, crate::walrecord::NeonWalRecord)>,
     356      2319802 :         pg_version: u32,
     357      2319802 :     ) -> anyhow::Result<bytes::Bytes> {
     358            8 :         match self {
     359      2319794 :             Self::Prod(mgr) => {
     360            0 :                 mgr.request_redo(key, lsn, base_img, records, pg_version)
     361            0 :                     .await
     362              :             }
     363              :             #[cfg(test)]
     364            8 :             Self::Test(mgr) => {
     365            8 :                 mgr.request_redo(key, lsn, base_img, records, pg_version)
     366            0 :                     .await
     367              :             }
     368              :         }
     369      2319801 :     }
     370              : 
     371            0 :     pub(crate) fn status(&self) -> Option<WalRedoManagerStatus> {
     372            0 :         match self {
     373          482 :             WalRedoManager::Prod(m) => m.status(),
     374          482 :             #[cfg(test)]
     375          482 :             WalRedoManager::Test(_) => None,
     376          482 :         }
     377          482 :     }
     378              : }
     379              : 
     380          106 : #[derive(Debug, thiserror::Error, PartialEq, Eq)]
     381              : pub enum GetTimelineError {
     382              :     #[error("Timeline {tenant_id}/{timeline_id} is not active, state: {state:?}")]
     383              :     NotActive {
     384              :         tenant_id: TenantShardId,
     385              :         timeline_id: TimelineId,
     386              :         state: TimelineState,
     387              :     },
     388              :     #[error("Timeline {tenant_id}/{timeline_id} was not found")]
     389              :     NotFound {
     390              :         tenant_id: TenantShardId,
     391              :         timeline_id: TimelineId,
     392              :     },
     393              : }
     394              : 
     395            0 : #[derive(Debug, thiserror::Error)]
     396              : pub enum LoadLocalTimelineError {
     397              :     #[error("FailedToLoad")]
     398              :     Load(#[source] anyhow::Error),
     399              :     #[error("FailedToResumeDeletion")]
     400              :     ResumeDeletion(#[source] anyhow::Error),
     401              : }
     402              : 
     403          139 : #[derive(thiserror::Error)]
     404              : pub enum DeleteTimelineError {
     405              :     #[error("NotFound")]
     406              :     NotFound,
     407              : 
     408              :     #[error("HasChildren")]
     409              :     HasChildren(Vec<TimelineId>),
     410              : 
     411              :     #[error("Timeline deletion is already in progress")]
     412              :     AlreadyInProgress(Arc<tokio::sync::Mutex<DeleteTimelineFlow>>),
     413              : 
     414              :     #[error(transparent)]
     415              :     Other(#[from] anyhow::Error),
     416              : }
     417              : 
     418              : impl Debug for DeleteTimelineError {
     419            0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     420            0 :         match self {
     421            0 :             Self::NotFound => write!(f, "NotFound"),
     422            0 :             Self::HasChildren(c) => f.debug_tuple("HasChildren").field(c).finish(),
     423            0 :             Self::AlreadyInProgress(_) => f.debug_tuple("AlreadyInProgress").finish(),
     424            0 :             Self::Other(e) => f.debug_tuple("Other").field(e).finish(),
     425              :         }
     426            0 :     }
     427              : }
     428              : 
     429              : pub enum SetStoppingError {
     430              :     AlreadyStopping(completion::Barrier),
     431              :     Broken,
     432              : }
     433              : 
     434              : impl Debug for SetStoppingError {
     435            0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     436            0 :         match self {
     437            0 :             Self::AlreadyStopping(_) => f.debug_tuple("AlreadyStopping").finish(),
     438            0 :             Self::Broken => write!(f, "Broken"),
     439              :         }
     440            0 :     }
     441              : }
     442              : 
     443            6 : #[derive(thiserror::Error, Debug)]
     444              : pub enum CreateTimelineError {
     445              :     #[error("creation of timeline with the given ID is in progress")]
     446              :     AlreadyCreating,
     447              :     #[error("timeline already exists with different parameters")]
     448              :     Conflict,
     449              :     #[error(transparent)]
     450              :     AncestorLsn(anyhow::Error),
     451              :     #[error("ancestor timeline is not active")]
     452              :     AncestorNotActive,
     453              :     #[error("tenant shutting down")]
     454              :     ShuttingDown,
     455              :     #[error(transparent)]
     456              :     Other(#[from] anyhow::Error),
     457              : }
     458              : 
     459            0 : #[derive(thiserror::Error, Debug)]
     460              : enum InitdbError {
     461              :     Other(anyhow::Error),
     462              :     Cancelled,
     463              :     Spawn(std::io::Result<()>),
     464              :     Failed(std::process::ExitStatus, Vec<u8>),
     465              : }
     466              : 
     467              : impl fmt::Display for InitdbError {
     468            0 :     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
     469            0 :         match self {
     470            0 :             InitdbError::Cancelled => write!(f, "Operation was cancelled"),
     471            0 :             InitdbError::Spawn(e) => write!(f, "Spawn error: {:?}", e),
     472            0 :             InitdbError::Failed(status, stderr) => write!(
     473            0 :                 f,
     474            0 :                 "Command failed with status {:?}: {}",
     475            0 :                 status,
     476            0 :                 String::from_utf8_lossy(stderr)
     477            0 :             ),
     478            0 :             InitdbError::Other(e) => write!(f, "Error: {:?}", e),
     479              :         }
     480            0 :     }
     481              : }
     482              : 
     483              : impl From<std::io::Error> for InitdbError {
     484            0 :     fn from(error: std::io::Error) -> Self {
     485            0 :         InitdbError::Spawn(Err(error))
     486            0 :     }
     487              : }
     488              : 
     489              : struct TenantDirectoryScan {
     490              :     sorted_timelines_to_load: Vec<(TimelineId, TimelineMetadata)>,
     491              :     timelines_to_resume_deletion: Vec<(TimelineId, Option<TimelineMetadata>)>,
     492              : }
     493              : 
     494              : enum CreateTimelineCause {
     495              :     Load,
     496              :     Delete,
     497              : }
     498              : 
     499              : impl Tenant {
     500              :     /// Yet another helper for timeline initialization.
     501              :     ///
     502              :     /// - Initializes the Timeline struct and inserts it into the tenant's hash map
     503              :     /// - Scans the local timeline directory for layer files and builds the layer map
     504              :     /// - Downloads remote index file and adds remote files to the layer map
     505              :     /// - Schedules remote upload tasks for any files that are present locally but missing from remote storage.
     506              :     ///
     507              :     /// If the operation fails, the timeline is left in the tenant's hash map in Broken state. On success,
     508              :     /// it is marked as Active.
     509              :     #[allow(clippy::too_many_arguments)]
     510          431 :     async fn timeline_init_and_sync(
     511          431 :         &self,
     512          431 :         timeline_id: TimelineId,
     513          431 :         resources: TimelineResources,
     514          431 :         index_part: Option<IndexPart>,
     515          431 :         metadata: TimelineMetadata,
     516          431 :         ancestor: Option<Arc<Timeline>>,
     517          431 :         _ctx: &RequestContext,
     518          431 :     ) -> anyhow::Result<()> {
     519          431 :         let tenant_id = self.tenant_shard_id;
     520              : 
     521          431 :         let timeline = self.create_timeline_struct(
     522          431 :             timeline_id,
     523          431 :             &metadata,
     524          431 :             ancestor.clone(),
     525          431 :             resources,
     526          431 :             CreateTimelineCause::Load,
     527          431 :         )?;
     528          431 :         let disk_consistent_lsn = timeline.get_disk_consistent_lsn();
     529          431 :         anyhow::ensure!(
     530          431 :             disk_consistent_lsn.is_valid(),
     531            0 :             "Timeline {tenant_id}/{timeline_id} has invalid disk_consistent_lsn"
     532              :         );
     533          431 :         assert_eq!(
     534          431 :             disk_consistent_lsn,
     535          431 :             metadata.disk_consistent_lsn(),
     536            0 :             "these are used interchangeably"
     537              :         );
     538              : 
     539          431 :         if let Some(index_part) = index_part.as_ref() {
     540          431 :             timeline
     541          431 :                 .remote_client
     542          431 :                 .as_ref()
     543          431 :                 .unwrap()
     544          431 :                 .init_upload_queue(index_part)?;
     545            0 :         } else if self.remote_storage.is_some() {
      546              :             // No data on the remote storage, but we have a local metadata file. We can end up
      547              :             // here with timeline_create being interrupted before finishing the index part upload.
      548              :             // Scheduling the upload here retries that index part upload.
     549              :             // If control plane retries timeline creation in the meantime, the mgmt API handler
     550              :             // for timeline creation will coalesce on the upload we queue here.
     551            0 :             let rtc = timeline.remote_client.as_ref().unwrap();
     552            0 :             rtc.init_upload_queue_for_empty_remote(&metadata)?;
     553            0 :             rtc.schedule_index_upload_for_metadata_update(&metadata)?;
     554            0 :         }
     555              : 
     556          431 :         timeline
     557          431 :             .load_layer_map(disk_consistent_lsn, index_part)
     558          423 :             .await
     559          431 :             .with_context(|| {
     560            0 :                 format!("Failed to load layermap for timeline {tenant_id}/{timeline_id}")
     561          431 :             })?;
     562              : 
     563              :         {
     564              :             // avoiding holding it across awaits
     565          431 :             let mut timelines_accessor = self.timelines.lock().unwrap();
     566          431 :             match timelines_accessor.entry(timeline_id) {
     567              :                 Entry::Occupied(_) => {
     568              :                     // The uninit mark file acts as a lock that prevents another task from
     569              :                     // initializing the timeline at the same time.
     570            0 :                     unreachable!(
     571            0 :                         "Timeline {tenant_id}/{timeline_id} already exists in the tenant map"
     572            0 :                     );
     573              :                 }
     574          431 :                 Entry::Vacant(v) => {
     575          431 :                     v.insert(Arc::clone(&timeline));
     576          431 :                     timeline.maybe_spawn_flush_loop();
     577          431 :                 }
     578          431 :             }
     579          431 :         };
     580          431 : 
     581          431 :         // Sanity check: a timeline should have some content.
     582          431 :         anyhow::ensure!(
     583          431 :             ancestor.is_some()
     584          378 :                 || timeline
     585          378 :                     .layers
     586          378 :                     .read()
     587            0 :                     .await
     588          378 :                     .layer_map()
     589          378 :                     .iter_historic_layers()
     590          378 :                     .next()
     591          378 :                     .is_some(),
     592            0 :             "Timeline has no ancestor and no layer files"
     593              :         );
     594              : 
     595          431 :         Ok(())
     596          431 :     }
     597              : 
     598              :     /// Attach a tenant that's available in cloud storage.
     599              :     ///
      600              :     /// This returns quickly, after just creating the in-memory
      601              :     /// Tenant object and launching a background task to download
     602              :     /// the remote index files.  On return, the tenant is most likely still in
     603              :     /// Attaching state, and it will become Active once the background task
     604              :     /// finishes. You can use wait_until_active() to wait for the task to
     605              :     /// complete.
     606              :     ///
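                       :     /// Illustrative call sequence (a sketch only; argument construction is elided
                       :     /// and the exact `wait_until_active` signature is an assumption based on the
                       :     /// note above):
                       :     ///
                       :     /// ```ignore
                       :     /// let tenant = Tenant::spawn(
                       :     ///     conf, tenant_shard_id, resources, attached_conf, shard_identity,
                       :     ///     None, tenants, SpawnMode::Normal, &ctx,
                       :     /// )?;
                       :     /// // Most likely still Attaching here; wait for the background task:
                       :     /// tenant.wait_until_active(timeout).await?;
                       :     /// ```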
     607              :     #[allow(clippy::too_many_arguments)]
     608          884 :     pub(crate) fn spawn(
     609          884 :         conf: &'static PageServerConf,
     610          884 :         tenant_shard_id: TenantShardId,
     611          884 :         resources: TenantSharedResources,
     612          884 :         attached_conf: AttachedTenantConf,
     613          884 :         shard_identity: ShardIdentity,
     614          884 :         init_order: Option<InitializationOrder>,
     615          884 :         tenants: &'static std::sync::RwLock<TenantsMap>,
     616          884 :         mode: SpawnMode,
     617          884 :         ctx: &RequestContext,
     618          884 :     ) -> anyhow::Result<Arc<Tenant>> {
     619          884 :         let wal_redo_manager = Arc::new(WalRedoManager::from(PostgresRedoManager::new(
     620          884 :             conf,
     621          884 :             tenant_shard_id,
     622          884 :         )));
     623          884 : 
     624          884 :         let TenantSharedResources {
     625          884 :             broker_client,
     626          884 :             remote_storage,
     627          884 :             deletion_queue_client,
     628          884 :         } = resources;
     629          884 : 
     630          884 :         let attach_mode = attached_conf.location.attach_mode;
     631          884 :         let generation = attached_conf.location.generation;
     632          884 : 
     633          884 :         let tenant = Arc::new(Tenant::new(
     634          884 :             TenantState::Attaching,
     635          884 :             conf,
     636          884 :             attached_conf,
     637          884 :             shard_identity,
     638          884 :             Some(wal_redo_manager),
     639          884 :             tenant_shard_id,
     640          884 :             remote_storage.clone(),
     641          884 :             deletion_queue_client,
     642          884 :         ));
     643          884 : 
     644          884 :         // The attach task will carry a GateGuard, so that shutdown() reliably waits for it to drop out if
     645          884 :         // we shut down while attaching.
     646          884 :         let attach_gate_guard = tenant
     647          884 :             .gate
     648          884 :             .enter()
     649          884 :             .expect("We just created the Tenant: nothing else can have shut it down yet");
     650          884 : 
     651          884 :         // Do all the hard work in the background
     652          884 :         let tenant_clone = Arc::clone(&tenant);
     653          884 :         let ctx = ctx.detached_child(TaskKind::Attach, DownloadBehavior::Warn);
     654          884 :         task_mgr::spawn(
     655          884 :             &tokio::runtime::Handle::current(),
     656          884 :             TaskKind::Attach,
     657          884 :             Some(tenant_shard_id),
     658          884 :             None,
     659          884 :             "attach tenant",
     660              :             false,
     661          884 :             async move {
     662          884 : 
     663          884 :                 info!(
     664          884 :                     ?attach_mode,
     665          884 :                     "Attaching tenant"
     666          884 :                 );
     667              : 
     668          884 :                 let _gate_guard = attach_gate_guard;
     669          884 : 
     670          884 :                 // Is this tenant being spawned as part of process startup?
     671          884 :                 let starting_up = init_order.is_some();
     672          870 :                 scopeguard::defer! {
     673          870 :                     if starting_up {
     674          204 :                         TENANT.startup_complete.inc();
     675          666 :                     }
     676              :                 }
     677              : 
     678              :                 // Ideally we should use Tenant::set_broken_no_wait, but it is not supposed to be used when tenant is in loading state.
     679          884 :                 let make_broken =
     680            7 :                     |t: &Tenant, err: anyhow::Error| {
     681            7 :                         error!("attach failed, setting tenant state to Broken: {err:?}");
     682            7 :                         t.state.send_modify(|state| {
     683            7 :                             // The Stopping case is for when we have passed control on to DeleteTenantFlow:
     684            7 :                             // if it errors, we will call make_broken when tenant is already in Stopping.
     685            7 :                             assert!(
     686            7 :                             matches!(*state, TenantState::Attaching | TenantState::Stopping { .. }),
     687            0 :                             "the attach task owns the tenant state until activation is complete"
     688              :                         );
     689              : 
     690            7 :                             *state = TenantState::broken_from_reason(err.to_string());
     691            7 :                         });
     692            7 :                     };
     693              : 
     694          884 :                 let mut init_order = init_order;
     695          884 :                 // take the completion because initial tenant loading will complete when all of
     696          884 :                 // these tasks complete.
     697          884 :                 let _completion = init_order
     698          884 :                     .as_mut()
     699          884 :                     .and_then(|x| x.initial_tenant_load.take());
     700          884 :                 let remote_load_completion = init_order
     701          884 :                     .as_mut()
     702          884 :                     .and_then(|x| x.initial_tenant_load_remote.take());
     703              : 
     704              :                 enum AttachType<'a> {
     705              :                     // During pageserver startup, we are attaching this tenant lazily in the background
     706              :                     Warmup(tokio::sync::SemaphorePermit<'a>),
     707              :                     // During pageserver startup, we are attaching this tenant as soon as we can,
     708              :                     // because a client tried to access it.
     709              :                     OnDemand,
     710              :                     // During normal operations after startup, we are attaching a tenant.
     711              :                     Normal,
     712              :                 }
     713              : 
      714              :                 // Before doing any I/O, wait for one of:
      715              :                 // - A client attempting to access this tenant (on-demand loading)
     716              :                 // - A permit to become available in the warmup semaphore (background warmup)
     717              :                 //
      718              :                 // Whether init_order is Some tells us if we're attaching during startup or later
      719              :                 // in the process lifetime.
     720          884 :                 let attach_type = if init_order.is_some() {
     721          218 :                     tokio::select!(
     722              :                         _ = tenant_clone.activate_now_sem.acquire() => {
     723            3 :                             tracing::info!("Activating tenant (on-demand)");
     724              :                             AttachType::OnDemand
     725              :                         },
     726          214 :                         permit_result = conf.concurrent_tenant_warmup.inner().acquire() => {
     727              :                             match permit_result {
     728              :                                 Ok(p) => {
     729          214 :                                     tracing::info!("Activating tenant (warmup)");
     730              :                                     AttachType::Warmup(p)
     731              :                                 }
     732              :                                 Err(_) => {
     733              :                                     // This is unexpected: the warmup semaphore should stay alive
     734              :                                     // for the lifetime of init_order.  Log a warning and proceed.
     735            0 :                                     tracing::warn!("warmup_limit semaphore unexpectedly closed");
     736              :                                     AttachType::Normal
     737              :                                 }
     738              :                             }
     739              : 
     740              :                         }
     741              :                         _ = tenant_clone.cancel.cancelled() => {
     742              :                             // This is safe, but should be pretty rare: it is interesting if a tenant
     743              :                             // stayed in Activating for such a long time that shutdown found it in
     744              :                             // that state.
     745            1 :                             tracing::info!(state=%tenant_clone.current_state(), "Tenant shut down before activation");
     746              :                             // Make the tenant broken so that set_stopping will not hang waiting for it to leave
     747              :                             // the Attaching state.  This is an over-reaction (nothing really broke, the tenant is
     748              :                             // just shutting down), but ensures progress.
     749              :                             make_broken(&tenant_clone, anyhow::anyhow!("Shut down while Attaching"));
     750              :                             return Ok(());
     751              :                         },
     752              :                     )
     753              :                 } else {
     754          666 :                     AttachType::Normal
     755              :                 };
     756              : 
     757          883 :                 let preload = match (&mode, &remote_storage) {
     758              :                     (SpawnMode::Create, _) => {
     759           15 :                         None
     760              :                     },
     761          868 :                     (SpawnMode::Normal, Some(remote_storage)) => {
     762          868 :                         let _preload_timer = TENANT.preload.start_timer();
     763          868 :                         let res = tenant_clone
     764          868 :                             .preload(remote_storage, task_mgr::shutdown_token())
     765         2653 :                             .await;
     766          868 :                         match res {
     767          862 :                             Ok(p) => Some(p),
     768            6 :                             Err(e) => {
     769            6 :                                 make_broken(&tenant_clone, anyhow::anyhow!(e));
     770            6 :                                 return Ok(());
     771              :                             }
     772              :                         }
     773              :                     }
     774              :                     (SpawnMode::Normal, None) => {
     775            0 :                         let _preload_timer = TENANT.preload.start_timer();
     776            0 :                         None
     777              :                     }
     778              :                 };
     779              : 
     780              :                 // Remote preload is complete.
     781          877 :                 drop(remote_load_completion);
     782              : 
     783          877 :                 let pending_deletion = {
     784          877 :                     match DeleteTenantFlow::should_resume_deletion(
     785          877 :                         conf,
     786          877 :                         preload.as_ref().map(|p| p.deleting).unwrap_or(false),
     787          877 :                         &tenant_clone,
     788          877 :                     )
     789            0 :                     .await
     790              :                     {
     791          877 :                         Ok(should_resume_deletion) => should_resume_deletion,
     792            0 :                         Err(err) => {
     793            0 :                             make_broken(&tenant_clone, anyhow::anyhow!(err));
     794            0 :                             return Ok(());
     795              :                         }
     796              :                     }
     797              :                 };
     798              : 
     799          877 :                 info!("pending_deletion {}", pending_deletion.is_some());
     800              : 
     801          877 :                 if let Some(deletion) = pending_deletion {
     802              :                     // as we are no longer loading, signal completion by dropping
     803              :                     // the completion while we resume deletion
     804           21 :                     drop(_completion);
     805           21 :                     let background_jobs_can_start =
     806           21 :                         init_order.as_ref().map(|x| &x.background_jobs_can_start);
     807           21 :                     if let Some(background) = background_jobs_can_start {
      808           20 :                         info!("waiting for background jobs barrier");
     809           20 :                         background.clone().wait().await;
      810           20 :                         info!("ready for background jobs barrier");
     811            1 :                     }
     812              : 
     813           21 :                     let deleted = DeleteTenantFlow::resume_from_attach(
     814           21 :                         deletion,
     815           21 :                         &tenant_clone,
     816           21 :                         preload,
     817           21 :                         tenants,
     818           21 :                         &ctx,
     819           21 :                     )
     820          898 :                     .await;
     821              : 
     822           21 :                     if let Err(e) = deleted {
     823            0 :                         make_broken(&tenant_clone, anyhow::anyhow!(e));
     824           21 :                     }
     825              : 
     826           21 :                     return Ok(());
     827          856 :                 }
     828              : 
     829              :                 // We will time the duration of the attach phase unless this is a creation (attach will do no work)
     830          856 :                 let attached = {
     831          856 :                     let _attach_timer = match mode {
     832           15 :                         SpawnMode::Create => None,
     833          841 :                         SpawnMode::Normal => {Some(TENANT.attach.start_timer())}
     834              :                     };
     835         1203 :                     tenant_clone.attach(preload, mode, &ctx).await
     836              :                 };
     837              : 
     838          856 :                 match attached {
     839              :                     Ok(()) => {
     840          856 :                         info!("attach finished, activating");
     841          856 :                         tenant_clone.activate(broker_client, None, &ctx);
     842              :                     }
     843            0 :                     Err(e) => {
     844            0 :                         make_broken(&tenant_clone, anyhow::anyhow!(e));
     845            0 :                     }
     846              :                 }
     847              : 
     848              :                 // If we are doing an opportunistic warmup attachment at startup, initialize
     849              :                 // logical size at the same time.  This is better than starting a bunch of idle tenants
     850              :                 // with cold caches and then coming back later to initialize their logical sizes.
     851              :                 //
      852              :                 // It also prevents the warmup process from competing with the concurrency limit on
      853              :                 // logical size calculations: if the logical size calculation semaphore is saturated,
     854              :                 // then warmup will wait for that before proceeding to the next tenant.
     855          856 :                 if let AttachType::Warmup(_permit) = attach_type {
     856          246 :                     let mut futs: FuturesUnordered<_> = tenant_clone.timelines.lock().unwrap().values().cloned().map(|t| t.await_initial_logical_size()).collect();
     857          194 :                     tracing::info!("Waiting for initial logical sizes while warming up...");
     858          426 :                     while futs.next().await.is_some() {}
     859          180 :                     tracing::info!("Warm-up complete");
     860          662 :                 }
     861              : 
     862          842 :                 Ok(())
     863          870 :             }
     864          884 :             .instrument(tracing::info_span!(parent: None, "attach", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), gen=?generation)),
     865              :         );
     866          884 :         Ok(tenant)
     867          884 :     }
     868              : 
     869         1744 :     #[instrument(skip_all)]
     870              :     pub(crate) async fn preload(
     871              :         self: &Arc<Tenant>,
     872              :         remote_storage: &GenericRemoteStorage,
     873              :         cancel: CancellationToken,
     874              :     ) -> anyhow::Result<TenantPreload> {
     875              :         span::debug_assert_current_span_has_tenant_id();
     876              :         // Get list of remote timelines
     877              :         // download index files for every tenant timeline
     878          872 :         info!("listing remote timelines");
     879              :         let (remote_timeline_ids, other_keys) = remote_timeline_client::list_remote_timelines(
     880              :             remote_storage,
     881              :             self.tenant_shard_id,
     882              :             cancel.clone(),
     883              :         )
     884              :         .await?;
     885              : 
     886              :         let deleting = other_keys.contains(TENANT_DELETED_MARKER_FILE_NAME);
     887          866 :         info!(
     888          866 :             "found {} timelines, deleting={}",
     889          866 :             remote_timeline_ids.len(),
     890          866 :             deleting
     891          866 :         );
     892              : 
     893              :         for k in other_keys {
     894              :             if k != TENANT_DELETED_MARKER_FILE_NAME {
     895            0 :                 warn!("Unexpected non timeline key {k}");
     896              :             }
     897              :         }
     898              : 
     899              :         Ok(TenantPreload {
     900              :             deleting,
     901              :             timelines: self
     902              :                 .load_timeline_metadata(remote_timeline_ids, remote_storage, cancel)
     903              :                 .await?,
     904              :         })
     905              :     }
     906              : 
     907              :     ///
     908              :     /// Background task that downloads all data for a tenant and brings it to Active state.
     909              :     ///
     910              :     /// No background tasks are started as part of this routine.
     911              :     ///
     912          881 :     async fn attach(
     913          881 :         self: &Arc<Tenant>,
     914          881 :         preload: Option<TenantPreload>,
     915          881 :         mode: SpawnMode,
     916          881 :         ctx: &RequestContext,
     917          881 :     ) -> anyhow::Result<()> {
     918          881 :         span::debug_assert_current_span_has_tenant_id();
     919              : 
     920            3 :         failpoint_support::sleep_millis_async!("before-attaching-tenant");
     921              : 
     922          881 :         let preload = match (preload, mode) {
     923          866 :             (Some(p), _) => p,
     924           15 :             (None, SpawnMode::Create) => TenantPreload {
     925           15 :                 deleting: false,
     926           15 :                 timelines: HashMap::new(),
     927           15 :             },
     928              :             (None, SpawnMode::Normal) => {
     929              :                 // Deprecated dev mode: load from local disk state instead of remote storage
     930              :                 // https://github.com/neondatabase/neon/issues/5624
     931            0 :                 return self.load_local(ctx).await;
     932              :             }
     933              :         };
     934              : 
     935          881 :         let mut timelines_to_resume_deletions = vec![];
     936          881 : 
     937          881 :         let mut remote_index_and_client = HashMap::new();
     938          881 :         let mut timeline_ancestors = HashMap::new();
     939          881 :         let mut existent_timelines = HashSet::new();
     940         1327 :         for (timeline_id, preload) in preload.timelines {
     941          443 :             let index_part = match preload.index_part {
     942          443 :                 Ok(i) => {
     943            0 :                     debug!("remote index part exists for timeline {timeline_id}");
     944              :                     // We found index_part on the remote, this is the standard case.
     945          443 :                     existent_timelines.insert(timeline_id);
     946          443 :                     i
     947              :                 }
     948              :                 Err(DownloadError::NotFound) => {
     949              :                     // There is no index_part on the remote. We only get here
     950              :                     // if there is some prefix for the timeline in the remote storage.
     951              :                     // This can e.g. be the initdb.tar.zst archive, maybe a
     952              :                     // remnant from a prior incomplete creation or deletion attempt.
      953              :                     // Delete the local directory, since the deciding criterion for a
      954              :                     // timeline's existence is the presence of index_part.
     955            3 :                     info!(%timeline_id, "index_part not found on remote");
     956            3 :                     continue;
     957              :                 }
     958            0 :                 Err(e) => {
     959              :                     // Some (possibly ephemeral) error happened during index_part download.
     960              :                     // Pretend the timeline exists to not delete the timeline directory,
     961              :                     // as it might be a temporary issue and we don't want to re-download
     962              :                     // everything after it resolves.
     963            0 :                     warn!(%timeline_id, "Failed to load index_part from remote storage, failed creation? ({e})");
     964              : 
     965            0 :                     existent_timelines.insert(timeline_id);
     966            0 :                     continue;
     967              :                 }
     968              :             };
     969          443 :             match index_part {
     970          431 :                 MaybeDeletedIndexPart::IndexPart(index_part) => {
     971          431 :                     timeline_ancestors.insert(timeline_id, index_part.metadata.clone());
     972          431 :                     remote_index_and_client.insert(timeline_id, (index_part, preload.client));
     973          431 :                 }
     974           12 :                 MaybeDeletedIndexPart::Deleted(index_part) => {
     975           12 :                     info!(
     976           12 :                         "timeline {} is deleted, picking to resume deletion",
     977           12 :                         timeline_id
     978           12 :                     );
     979           12 :                     timelines_to_resume_deletions.push((timeline_id, index_part, preload.client));
     980              :                 }
     981              :             }
     982              :         }
     983              : 
     984              :         // For every timeline, download the metadata file, scan the local directory,
     985              :         // and build a layer map that contains an entry for each remote and local
     986              :         // layer file.
     987          881 :         let sorted_timelines = tree_sort_timelines(timeline_ancestors, |m| m.ancestor_timeline())?;
     988         1312 :         for (timeline_id, remote_metadata) in sorted_timelines {
     989          431 :             let (index_part, remote_client) = remote_index_and_client
     990          431 :                 .remove(&timeline_id)
     991          431 :                 .expect("just put it in above");
     992          431 : 
     993          431 :             // TODO again handle early failure
     994          431 :             self.load_remote_timeline(
     995          431 :                 timeline_id,
     996          431 :                 index_part,
     997          431 :                 remote_metadata,
     998          431 :                 TimelineResources {
     999          431 :                     remote_client: Some(remote_client),
    1000          431 :                     deletion_queue_client: self.deletion_queue_client.clone(),
    1001          431 :                 },
    1002          431 :                 ctx,
    1003          431 :             )
    1004         1279 :             .await
    1005          431 :             .with_context(|| {
    1006            0 :                 format!(
    1007            0 :                     "failed to load remote timeline {} for tenant {}",
    1008            0 :                     timeline_id, self.tenant_shard_id
    1009            0 :                 )
    1010          431 :             })?;
    1011              :         }
    1012              : 
    1013              :         // Walk through deleted timelines, resume deletion
    1014          893 :         for (timeline_id, index_part, remote_timeline_client) in timelines_to_resume_deletions {
    1015           12 :             remote_timeline_client
    1016           12 :                 .init_upload_queue_stopped_to_continue_deletion(&index_part)
    1017           12 :                 .context("init queue stopped")
    1018           12 :                 .map_err(LoadLocalTimelineError::ResumeDeletion)?;
    1019              : 
    1020           12 :             DeleteTimelineFlow::resume_deletion(
    1021           12 :                 Arc::clone(self),
    1022           12 :                 timeline_id,
    1023           12 :                 &index_part.metadata,
    1024           12 :                 Some(remote_timeline_client),
    1025           12 :                 self.deletion_queue_client.clone(),
    1026           12 :             )
    1027           12 :             .instrument(tracing::info_span!("timeline_delete", %timeline_id))
    1028            0 :             .await
    1029           12 :             .context("resume_deletion")
    1030           12 :             .map_err(LoadLocalTimelineError::ResumeDeletion)?;
    1031              :         }
    1032              : 
    1033              :         // The local filesystem contents are a cache of what's in the remote IndexPart;
    1034              :         // IndexPart is the source of truth.
    1035          881 :         self.clean_up_timelines(&existent_timelines)?;
    1036              : 
    1037          881 :         fail::fail_point!("attach-before-activate", |_| {
    1038            0 :             anyhow::bail!("attach-before-activate");
    1039          881 :         });
    1040            3 :         failpoint_support::sleep_millis_async!("attach-before-activate-sleep", &self.cancel);
    1041              : 
    1042          881 :         info!("Done");
    1043              : 
    1044          881 :         Ok(())
    1045          881 :     }
    1046              : 
    1047              :     /// Check for any local timeline directories that are temporary, or do not correspond to a
    1048              :     /// timeline that still exists: this can happen if we crashed during a deletion/creation, or
    1049              :     /// if a timeline was deleted while the tenant was attached to a different pageserver.
    1050          881 :     fn clean_up_timelines(&self, existent_timelines: &HashSet<TimelineId>) -> anyhow::Result<()> {
    1051          881 :         let timelines_dir = self.conf.timelines_path(&self.tenant_shard_id);
    1052              : 
    1053          881 :         let entries = match timelines_dir.read_dir_utf8() {
    1054          879 :             Ok(d) => d,
    1055            2 :             Err(e) => {
    1056            2 :                 if e.kind() == std::io::ErrorKind::NotFound {
    1057            2 :                     return Ok(());
    1058              :                 } else {
    1059            0 :                     return Err(e).context("list timelines directory for tenant");
    1060              :                 }
    1061              :             }
    1062              :         };
    1063              : 
    1064         1329 :         for entry in entries {
    1065          450 :             let entry = entry.context("read timeline dir entry")?;
    1066          450 :             let entry_path = entry.path();
    1067              : 
    1068          450 :             let purge = if crate::is_temporary(entry_path)
    1069              :                 // TODO: uninit_mark isn't needed any more, since uninitialized timelines are already
    1070              :                 // covered by the check that the timeline must exist in remote storage.
    1071          448 :                 || is_uninit_mark(entry_path)
    1072          446 :                 || crate::is_delete_mark(entry_path)
    1073              :             {
    1074            4 :                 true
    1075              :             } else {
    1076          446 :                 match TimelineId::try_from(entry_path.file_name()) {
    1077          446 :                     Ok(i) => {
    1078          446 :                         // Purge if the timeline ID does not exist in remote storage: remote storage is the authority.
    1079          446 :                         !existent_timelines.contains(&i)
    1080              :                     }
    1081            0 :                     Err(e) => {
    1082            0 :                         tracing::warn!(
    1083            0 :                             "Unparseable directory in timelines directory: {entry_path}, ignoring ({e})"
    1084            0 :                         );
    1085              :                         // Do not purge junk: if we don't recognize it, be cautious and leave it for a human.
    1086            0 :                         false
    1087              :                     }
    1088              :                 }
    1089              :             };
    1090              : 
    1091          450 :             if purge {
    1092            9 :                 tracing::info!("Purging stale timeline dentry {entry_path}");
    1093            9 :                 if let Err(e) = match entry.file_type() {
    1094            9 :                     Ok(t) => if t.is_dir() {
    1095            7 :                         std::fs::remove_dir_all(entry_path)
    1096              :                     } else {
    1097            2 :                         std::fs::remove_file(entry_path)
    1098              :                     }
    1099            9 :                     .or_else(fs_ext::ignore_not_found),
    1100            0 :                     Err(e) => Err(e),
    1101              :                 } {
    1102            0 :                     tracing::warn!("Failed to purge stale timeline dentry {entry_path}: {e}");
    1103            9 :                 }
    1104          441 :             }
    1105              :         }
    1106              : 
    1107          879 :         Ok(())
    1108          881 :     }
    1109              : 
    1110              :     /// Get the sum of the sizes of all remote timelines.
    1111              :     ///
    1112              :     /// This function relies on the index_part instead of listing remote storage.
    1113           24 :     pub fn remote_size(&self) -> u64 {
    1114           24 :         let mut size = 0;
    1115              : 
    1116           24 :         for timeline in self.list_timelines() {
    1117           17 :             if let Some(remote_client) = &timeline.remote_client {
    1118           17 :                 size += remote_client.get_remote_physical_size();
    1119           17 :             }
    1120              :         }
    1121              : 
    1122           24 :         size
    1123           24 :     }
    1124              : 
    1125          862 :     #[instrument(skip_all, fields(timeline_id=%timeline_id))]
    1126              :     async fn load_remote_timeline(
    1127              :         &self,
    1128              :         timeline_id: TimelineId,
    1129              :         index_part: IndexPart,
    1130              :         remote_metadata: TimelineMetadata,
    1131              :         resources: TimelineResources,
    1132              :         ctx: &RequestContext,
    1133              :     ) -> anyhow::Result<()> {
    1134              :         span::debug_assert_current_span_has_tenant_id();
    1135              : 
    1136          431 :         info!("downloading index file for timeline {}", timeline_id);
    1137              :         tokio::fs::create_dir_all(self.conf.timeline_path(&self.tenant_shard_id, &timeline_id))
    1138              :             .await
    1139              :             .context("Failed to create new timeline directory")?;
    1140              : 
    1141              :         let ancestor = if let Some(ancestor_id) = remote_metadata.ancestor_timeline() {
    1142              :             let timelines = self.timelines.lock().unwrap();
    1143              :             Some(Arc::clone(timelines.get(&ancestor_id).ok_or_else(
    1144            0 :                 || {
    1145            0 :                     anyhow::anyhow!(
    1146            0 :                         "cannot find ancestor timeline {ancestor_id} for timeline {timeline_id}"
    1147            0 :                     )
    1148            0 :                 },
    1149              :             )?))
    1150              :         } else {
    1151              :             None
    1152              :         };
    1153              : 
    1154              :         // Timeline loading after attach expects to find a metadata file for each timeline.
    1155              :         save_metadata(
    1156              :             self.conf,
    1157              :             &self.tenant_shard_id,
    1158              :             &timeline_id,
    1159              :             &remote_metadata,
    1160              :         )
    1161              :         .await
    1162              :         .context("save_metadata")
    1163              :         .map_err(LoadLocalTimelineError::Load)?;
    1164              : 
    1165              :         self.timeline_init_and_sync(
    1166              :             timeline_id,
    1167              :             resources,
    1168              :             Some(index_part),
    1169              :             remote_metadata,
    1170              :             ancestor,
    1171              :             ctx,
    1172              :         )
    1173              :         .await
    1174              :     }
    1175              : 
    1176              :     /// Create a placeholder Tenant object for a broken tenant
    1177            0 :     pub fn create_broken_tenant(
    1178            0 :         conf: &'static PageServerConf,
    1179            0 :         tenant_shard_id: TenantShardId,
    1180            0 :         reason: String,
    1181            0 :     ) -> Arc<Tenant> {
    1182            0 :         Arc::new(Tenant::new(
    1183            0 :             TenantState::Broken {
    1184            0 :                 reason,
    1185            0 :                 backtrace: String::new(),
    1186            0 :             },
    1187            0 :             conf,
    1188            0 :             AttachedTenantConf::try_from(LocationConf::default()).unwrap(),
    1189            0 :             // Shard identity isn't meaningful for a broken tenant: it's just a placeholder
    1190            0 :             // to occupy the slot for this TenantShardId.
    1191            0 :             ShardIdentity::broken(tenant_shard_id.shard_number, tenant_shard_id.shard_count),
    1192            0 :             None,
    1193            0 :             tenant_shard_id,
    1194            0 :             None,
    1195            0 :             DeletionQueueClient::broken(),
    1196            0 :         ))
    1197            0 :     }
    1198              : 
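                      :     /// Scan the local timelines directory (used only by the deprecated local-disk
                      :     /// load path, [`Tenant::load_local`]): temporary and uninit-marked entries are
                      :     /// removed, timelines with a delete mark are collected so their deletion can be
                      :     /// resumed, and the remaining timelines are returned sorted so that parents come
                      :     /// before their children.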
    1199           82 :     fn scan_and_sort_timelines_dir(self: Arc<Tenant>) -> anyhow::Result<TenantDirectoryScan> {
    1200           82 :         let mut timelines_to_load: HashMap<TimelineId, TimelineMetadata> = HashMap::new();
    1201           82 :         // Note: timelines_to_resume_deletion needs to be kept separate because it may not be
    1202           82 :         // sortable by `tree_sort_timelines`, i.e. some parents can be missing because deletion
    1203           82 :         // completed in non-topological order (for example, because a parent has fewer layer files).
    1204           82 :         let mut timelines_to_resume_deletion: Vec<(TimelineId, Option<TimelineMetadata>)> = vec![];
    1205           82 : 
    1206           82 :         let timelines_dir = self.conf.timelines_path(&self.tenant_shard_id);
    1207              : 
    1208           82 :         for entry in timelines_dir
    1209           82 :             .read_dir_utf8()
    1210           82 :             .context("list timelines directory for tenant")?
    1211              :         {
    1212            6 :             let entry = entry.context("read timeline dir entry")?;
    1213            6 :             let timeline_dir = entry.path();
    1214            6 : 
    1215            6 :             if crate::is_temporary(timeline_dir) {
    1216            0 :                 info!("Found temporary timeline directory, removing: {timeline_dir}");
    1217            0 :                 if let Err(e) = std::fs::remove_dir_all(timeline_dir) {
    1218            0 :                     error!("Failed to remove temporary directory '{timeline_dir}': {e:?}");
    1219            0 :                 }
    1220            6 :             } else if is_uninit_mark(timeline_dir) {
    1221            2 :                 if !timeline_dir.exists() {
    1222            2 :                     warn!("Timeline dir entry became invalid: {timeline_dir}");
    1223            2 :                     continue;
    1224            0 :                 }
    1225            0 : 
    1226            0 :                 let timeline_uninit_mark_file = &timeline_dir;
    1227            0 :                 info!(
    1228            0 :                     "Found an uninit mark file {timeline_uninit_mark_file}, removing the timeline and its uninit mark",
    1229            0 :                 );
    1230            0 :                 let timeline_id =
    1231            0 :                     TimelineId::try_from(timeline_uninit_mark_file.file_stem())
    1232            0 :                         .with_context(|| {
    1233            0 :                             format!(
    1234            0 :                                 "Could not parse timeline id out of the timeline uninit mark name {timeline_uninit_mark_file}",
    1235            0 :                             )
    1236            0 :                         })?;
    1237            0 :                 let timeline_dir = self.conf.timeline_path(&self.tenant_shard_id, &timeline_id);
    1238            0 :                 if let Err(e) =
    1239            0 :                     remove_timeline_and_uninit_mark(&timeline_dir, timeline_uninit_mark_file)
    1240              :                 {
    1241            0 :                     error!("Failed to clean up uninit marked timeline: {e:?}");
    1242            0 :                 }
    1243            4 :             } else if crate::is_delete_mark(timeline_dir) {
    1244              :                 // If metadata exists, load as usual, continue deletion
    1245            0 :                 let timeline_id = TimelineId::try_from(timeline_dir.file_stem())
    1246            0 :                     .with_context(|| {
    1247            0 :                         format!(
    1248            0 :                             "Could not parse timeline id out of the timeline uninit mark name {timeline_dir}",
    1249            0 :                         )
    1250            0 :                     })?;
    1251              : 
    1252            0 :                 info!("Found deletion mark for timeline {}", timeline_id);
    1253              : 
    1254            0 :                             "Could not parse timeline id out of the timeline delete mark name {timeline_dir}",
    1255            0 :                     Ok(metadata) => {
    1256            0 :                         timelines_to_resume_deletion.push((timeline_id, Some(metadata)))
    1257              :                     }
    1258            0 :                     Err(e) => match &e {
    1259            0 :                         LoadMetadataError::Read(r) => {
    1260            0 :                             if r.kind() != io::ErrorKind::NotFound {
    1261            0 :                                 return Err(anyhow::anyhow!(e)).with_context(|| {
    1262            0 :                                     format!("Failed to load metadata for timeline_id {timeline_id}")
    1263            0 :                                 });
    1264            0 :                             }
    1265            0 : 
    1266            0 :                             // If the metadata doesn't exist, it means that we've crashed without
    1267            0 :                             // completing cleanup_remaining_timeline_fs_traces in DeleteTimelineFlow.
    1268            0 :                             // So save the timeline_id for a later call to `DeleteTimelineFlow::cleanup_remaining_timeline_fs_traces`.
    1269            0 :                             // We can't do it here because the method is async, so we'd need block_on,
    1270            0 :                             // and here we're in spawn_blocking. cleanup_remaining_timeline_fs_traces uses fs operations,
    1271            0 :                             // so that basically results in a cycle:
    1272            0 :                             // spawn_blocking
    1273            0 :                             // - block_on
    1274            0 :                             //   - spawn_blocking
    1275            0 :                             // which can lead to running out of threads in the blocking pool.
    1276            0 :                             timelines_to_resume_deletion.push((timeline_id, None));
    1277              :                         }
    1278              :                         _ => {
    1279            0 :                             return Err(anyhow::anyhow!(e)).with_context(|| {
    1280            0 :                                 format!("Failed to load metadata for timeline_id {timeline_id}")
    1281            0 :                             })
    1282              :                         }
    1283              :                     },
    1284              :                 }
    1285              :             } else {
    1286            4 :                 if !timeline_dir.exists() {
    1287            0 :                     warn!("Timeline dir entry became invalid: {timeline_dir}");
    1288            0 :                     continue;
    1289            4 :                 }
    1290            4 :                 let timeline_id = TimelineId::try_from(timeline_dir.file_name())
    1291            4 :                     .with_context(|| {
    1292            0 :                         format!(
    1293            0 :                             "Could not parse timeline id out of the timeline dir name {timeline_dir}",
    1294            0 :                         )
    1295            4 :                     })?;
    1296            4 :                 let timeline_uninit_mark_file = self
    1297            4 :                     .conf
    1298            4 :                     .timeline_uninit_mark_file_path(self.tenant_shard_id, timeline_id);
    1299            4 :                 if timeline_uninit_mark_file.exists() {
    1300            2 :                     info!(
    1301            2 :                         %timeline_id,
    1302            2 :                         "Found an uninit mark file, removing the timeline and its uninit mark",
    1303            2 :                     );
    1304            0 :                     if let Err(e) =
    1305            2 :                         remove_timeline_and_uninit_mark(timeline_dir, &timeline_uninit_mark_file)
    1306              :                     {
    1307            0 :                         error!("Failed to clean up uninit marked timeline: {e:?}");
    1308            2 :                     }
    1309            2 :                     continue;
    1310            2 :                 }
    1311            2 : 
    1312            2 :                 let timeline_delete_mark_file = self
    1313            2 :                     .conf
    1314            2 :                     .timeline_delete_mark_file_path(self.tenant_shard_id, timeline_id);
    1315            2 :                 if timeline_delete_mark_file.exists() {
    1316              :                     // Cleanup should be done in `is_delete_mark` branch above
    1317            0 :                     continue;
    1318            2 :                 }
    1319            2 : 
    1320            2 :                 let file_name = entry.file_name();
    1321            2 :                 if let Ok(timeline_id) = file_name.parse::<TimelineId>() {
    1322            2 :                     let metadata = load_metadata(self.conf, &self.tenant_shard_id, &timeline_id)
    1323            2 :                         .context("failed to load metadata")?;
    1324            0 :                     timelines_to_load.insert(timeline_id, metadata);
    1325              :                 } else {
    1326              :                     // A file or directory that doesn't look like a timeline ID
    1327            0 :                     warn!("unexpected file or directory in timelines directory: {file_name}");
    1328              :                 }
    1329              :             }
    1330              :         }
    1331              : 
    1332              :         // Sort the array of timeline IDs into tree-order, so that parent comes before
    1333              :         // all its children.
    1334           80 :         tree_sort_timelines(timelines_to_load, |m| m.ancestor_timeline()).map(|sorted_timelines| {
    1335           80 :             TenantDirectoryScan {
    1336           80 :                 sorted_timelines_to_load: sorted_timelines,
    1337           80 :                 timelines_to_resume_deletion,
    1338           80 :             }
    1339           80 :         })
    1340           82 :     }
    1341              : 
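                      :     /// Download the `index_part` of every given timeline concurrently, one task per
                      :     /// timeline in a [`JoinSet`], and return a [`TimelinePreload`] for each of them.
                      :     /// Fails if any download task fails or if `cancel` fires while waiting.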
    1342          866 :     async fn load_timeline_metadata(
    1343          866 :         self: &Arc<Tenant>,
    1344          866 :         timeline_ids: HashSet<TimelineId>,
    1345          866 :         remote_storage: &GenericRemoteStorage,
    1346          866 :         cancel: CancellationToken,
    1347          866 :     ) -> anyhow::Result<HashMap<TimelineId, TimelinePreload>> {
    1348          866 :         let mut part_downloads = JoinSet::new();
    1349         1312 :         for timeline_id in timeline_ids {
    1350          446 :             let client = RemoteTimelineClient::new(
    1351          446 :                 remote_storage.clone(),
    1352          446 :                 self.deletion_queue_client.clone(),
    1353          446 :                 self.conf,
    1354          446 :                 self.tenant_shard_id,
    1355          446 :                 timeline_id,
    1356          446 :                 self.generation,
    1357          446 :             );
    1358          446 :             let cancel_clone = cancel.clone();
    1359          446 :             part_downloads.spawn(
    1360          446 :                 async move {
    1361          446 :                     debug!("starting index part download");
    1362              : 
    1363         2929 :                     let index_part = client.download_index_file(&cancel_clone).await;
    1364              : 
    1365          446 :                     debug!("finished index part download");
    1366              : 
    1367          446 :                     Result::<_, anyhow::Error>::Ok(TimelinePreload {
    1368          446 :                         client,
    1369          446 :                         timeline_id,
    1370          446 :                         index_part,
    1371          446 :                     })
    1372          446 :                 }
    1373          446 :                 .map(move |res| {
    1374          446 :                     res.with_context(|| format!("download index part for timeline {timeline_id}"))
    1375          446 :                 })
    1376          446 :                 .instrument(info_span!("download_index_part", %timeline_id)),
    1377              :             );
    1378              :         }
    1379              : 
    1380          866 :         let mut timeline_preloads: HashMap<TimelineId, TimelinePreload> = HashMap::new();
    1381              : 
    1382         1312 :         loop {
    1383         1735 :             tokio::select!(
    1384         1312 :                 next = part_downloads.join_next() => {
    1385              :                     match next {
    1386              :                         Some(result) => {
    1387              :                             let preload_result = result.context("join preload task")?;
    1388              :                             let preload = preload_result?;
    1389              :                             timeline_preloads.insert(preload.timeline_id, preload);
    1390              :                         },
    1391              :                         None => {
    1392              :                             break;
    1393              :                         }
    1394              :                     }
    1395              :                 },
    1396              :                 _ = cancel.cancelled() => {
    1397              :                     anyhow::bail!("Cancelled while waiting for remote index download")
    1398              :                 }
    1399         1312 :             )
    1400         1312 :         }
    1401              : 
    1402          866 :         Ok(timeline_preloads)
    1403          866 :     }
    1404              : 
    1405              :     ///
    1406              :     /// Background task to load in-memory data structures for this tenant, from
    1407              :     /// files on disk. Used at pageserver startup.
    1408              :     ///
    1409              :     /// No background tasks are started as part of this routine.
    1410           82 :     async fn load_local(self: &Arc<Tenant>, ctx: &RequestContext) -> anyhow::Result<()> {
    1411           82 :         span::debug_assert_current_span_has_tenant_id();
    1412              : 
    1413            0 :         debug!("loading tenant task");
    1414              : 
    1415              :         // Load in-memory state to reflect the local files on disk
    1416              :         //
    1417              :         // Scan the directory, peek into the metadata file of each timeline, and
    1418              :         // collect a list of timelines and their ancestors.
    1419           82 :         let span = info_span!("blocking");
    1420           82 :         let cloned = Arc::clone(self);
    1421              : 
    1422           82 :         let scan = tokio::task::spawn_blocking(move || {
    1423           82 :             let _g = span.entered();
    1424           82 :             cloned.scan_and_sort_timelines_dir()
    1425           82 :         })
    1426           82 :         .await
    1427           82 :         .context("load spawn_blocking")
    1428           82 :         .and_then(|res| res)?;
    1429              : 
    1430              :         // FIXME original collect_timeline_files contained one more check:
    1431              :         //    1. "Timeline has no ancestor and no layer files"
    1432              : 
    1433              :         // Process loadable timelines first
    1434           80 :         for (timeline_id, local_metadata) in scan.sorted_timelines_to_load {
    1435            0 :             if let Err(e) = self
    1436            0 :                 .load_local_timeline(timeline_id, local_metadata, ctx, false)
    1437            0 :                 .await
    1438              :             {
    1439            0 :                 match e {
    1440            0 :                     LoadLocalTimelineError::Load(source) => {
    1441            0 :                         return Err(anyhow::anyhow!(source)).with_context(|| {
    1442            0 :                             format!("Failed to load local timeline: {timeline_id}")
    1443            0 :                         })
    1444              :                     }
    1445            0 :                     LoadLocalTimelineError::ResumeDeletion(source) => {
    1446              :                         // Make sure a resumed deletion won't fail loading for the entire tenant.
    1447            0 :                         error!("Failed to resume timeline deletion: {source:#}")
    1448              :                     }
    1449              :                 }
    1450            0 :             }
    1451              :         }
    1452              : 
    1453              :         // Resume deletion for the timelines that have a delete mark.
    1454           80 :         for (timeline_id, maybe_local_metadata) in scan.timelines_to_resume_deletion {
    1455            0 :             match maybe_local_metadata {
    1456              :                 None => {
    1457              :                     // See comment in `scan_and_sort_timelines_dir`.
    1458            0 :                     if let Err(e) =
    1459            0 :                         DeleteTimelineFlow::cleanup_remaining_timeline_fs_traces(self, timeline_id)
    1460            0 :                             .await
    1461              :                     {
    1462            0 :                         warn!(
    1463            0 :                             "cannot clean up deleted timeline dir timeline_id: {} error: {:#}",
    1464            0 :                             timeline_id, e
    1465            0 :                         );
    1466            0 :                     }
    1467              :                 }
    1468            0 :                 Some(local_metadata) => {
    1469            0 :                     if let Err(e) = self
    1470            0 :                         .load_local_timeline(timeline_id, local_metadata, ctx, true)
    1471            0 :                         .await
    1472              :                     {
    1473            0 :                         match e {
    1474            0 :                             LoadLocalTimelineError::Load(source) => {
    1475            0 :                                 // We tried to load a deleted timeline; this is a bug.
    1476            0 :                                 return Err(anyhow::anyhow!(source).context(
    1477            0 :                                     format!("This is a bug. We tried to load a deleted timeline, which is wrong, and loading failed. Timeline: {timeline_id}")
    1478            0 :                                 ));
    1479              :                             }
    1480            0 :                             LoadLocalTimelineError::ResumeDeletion(source) => {
    1481              :                                 // Make sure a resumed deletion won't fail loading for the entire tenant.
    1482            0 :                                 error!("Failed to resume timeline deletion: {source:#}")
    1483              :                             }
    1484              :                         }
    1485            0 :                     }
    1486              :                 }
    1487              :             }
    1488              :         }
    1489              : 
    1490            0 :         trace!("Done");
    1491              : 
    1492           80 :         Ok(())
    1493           82 :     }
    1494              : 
    1495              :     /// Subroutine of `load_tenant`, to load an individual timeline
    1496              :     ///
    1497              :     /// NB: The parent is assumed to be already loaded!
    1498            0 :     #[instrument(skip(self, local_metadata, ctx))]
    1499              :     async fn load_local_timeline(
    1500              :         self: &Arc<Self>,
    1501              :         timeline_id: TimelineId,
    1502              :         local_metadata: TimelineMetadata,
    1503              :         ctx: &RequestContext,
    1504              :         found_delete_mark: bool,
    1505              :     ) -> Result<(), LoadLocalTimelineError> {
    1506              :         span::debug_assert_current_span_has_tenant_id();
    1507              : 
    1508              :         let resources = self.build_timeline_resources(timeline_id);
    1509              : 
    1510              :         if found_delete_mark {
    1511              :             // There is no remote client, we found local metadata.
    1512              :             // Continue cleaning up local disk.
    1513              :             DeleteTimelineFlow::resume_deletion(
    1514              :                 Arc::clone(self),
    1515              :                 timeline_id,
    1516              :                 &local_metadata,
    1517              :                 None,
    1518              :                 self.deletion_queue_client.clone(),
    1519              :             )
    1520              :             .await
    1521              :             .context("resume deletion")
    1522              :             .map_err(LoadLocalTimelineError::ResumeDeletion)?;
    1523              :             return Ok(());
    1524              :         }
    1525              : 
    1526              :         let ancestor = if let Some(ancestor_timeline_id) = local_metadata.ancestor_timeline() {
    1527              :             let ancestor_timeline = self.get_timeline(ancestor_timeline_id, false)
    1528            0 :                 .with_context(|| anyhow::anyhow!("cannot find ancestor timeline {ancestor_timeline_id} for timeline {timeline_id}"))
    1529              :                 .map_err(LoadLocalTimelineError::Load)?;
    1530              :             Some(ancestor_timeline)
    1531              :         } else {
    1532              :             None
    1533              :         };
    1534              : 
    1535              :         self.timeline_init_and_sync(timeline_id, resources, None, local_metadata, ancestor, ctx)
    1536              :             .await
    1537              :             .map_err(LoadLocalTimelineError::Load)
    1538              :     }
    1539              : 
    1540          673 :     pub(crate) fn tenant_shard_id(&self) -> TenantShardId {
    1541          673 :         self.tenant_shard_id
    1542          673 :     }
    1543              : 
    1544              :     /// Get a Timeline handle for the given Neon timeline ID.
    1545              :     /// This function is idempotent. It doesn't change internal state in any way.
    1546        17648 :     pub fn get_timeline(
    1547        17648 :         &self,
    1548        17648 :         timeline_id: TimelineId,
    1549        17648 :         active_only: bool,
    1550        17648 :     ) -> Result<Arc<Timeline>, GetTimelineError> {
    1551        17648 :         let timelines_accessor = self.timelines.lock().unwrap();
    1552        17648 :         let timeline = timelines_accessor
    1553        17648 :             .get(&timeline_id)
    1554        17648 :             .ok_or(GetTimelineError::NotFound {
    1555        17648 :                 tenant_id: self.tenant_shard_id,
    1556        17648 :                 timeline_id,
    1557        17648 :             })?;
    1558              : 
    1559        17595 :         if active_only && !timeline.is_active() {
    1560            6 :             Err(GetTimelineError::NotActive {
    1561            6 :                 tenant_id: self.tenant_shard_id,
    1562            6 :                 timeline_id,
    1563            6 :                 state: timeline.current_state(),
    1564            6 :             })
    1565              :         } else {
    1566        17589 :             Ok(Arc::clone(timeline))
    1567              :         }
    1568        17648 :     }
    1569              : 
    1570              :     /// Lists timelines the tenant contains.
    1571              :     /// It is up to the tenant's implementation to omit certain timelines that are not considered ready for use.
    1572          779 :     pub fn list_timelines(&self) -> Vec<Arc<Timeline>> {
    1573          779 :         self.timelines
    1574          779 :             .lock()
    1575          779 :             .unwrap()
    1576          779 :             .values()
    1577          779 :             .map(Arc::clone)
    1578          779 :             .collect()
    1579          779 :     }
    1580              : 
    1581          482 :     pub fn list_timeline_ids(&self) -> Vec<TimelineId> {
    1582          482 :         self.timelines.lock().unwrap().keys().cloned().collect()
    1583          482 :     }
    1584              : 
    1585              :     /// This is used to create the initial 'main' timeline during bootstrapping,
    1586              :     /// or when importing a new base backup. The caller is expected to load an
    1587              :     /// initial image of the datadir into the new timeline after this.
    1588              :     ///
    1589              :     /// Until that happens, the on-disk state is invalid (disk_consistent_lsn=Lsn(0))
    1590              :     /// and the timeline will fail to load at a restart.
    1591              :     ///
    1592              :     /// That's why we add an uninit mark file, and wrap it together with the Timeline
    1593              :     /// in-memory object into UninitializedTimeline.
    1594              :     /// Once the caller is done setting up the timeline, they should call
    1595              :     /// `UninitializedTimeline::initialize_with_lock` to remove the uninit mark.
    1596              :     ///
    1597              :     /// For tests, use `DatadirModification::init_empty_test_timeline` + `commit` to set up the
    1598              :     /// minimum set of keys required to get a writable timeline.
    1599              :     /// (Without it, `put` might fail due to `repartition` failing.)
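    1599              :     ///
    1599              :     /// A rough sketch of that test-only flow, mirroring `create_test_timeline` below
    1599              :     /// (variable names are illustrative and assumed to be in scope):
    1599              :     ///
    1599              :     /// ```ignore
    1599              :     /// let uninit = tenant
    1599              :     ///     .create_empty_timeline(new_timeline_id, initdb_lsn, pg_version, ctx)
    1599              :     ///     .await?;
    1599              :     /// let tline = uninit.raw_timeline().expect("we just created it");
    1599              :     /// let mut modification = tline.begin_modification(initdb_lsn);
    1599              :     /// modification.init_empty_test_timeline()?;
    1599              :     /// modification.commit(ctx).await?;
    1599              :     /// let timeline = uninit.finish_creation()?;
    1599              :     /// ```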
    1600           88 :     pub(crate) async fn create_empty_timeline(
    1601           88 :         &self,
    1602           88 :         new_timeline_id: TimelineId,
    1603           88 :         initdb_lsn: Lsn,
    1604           88 :         pg_version: u32,
    1605           88 :         _ctx: &RequestContext,
    1606           88 :     ) -> anyhow::Result<UninitializedTimeline> {
    1607           88 :         anyhow::ensure!(
    1608           88 :             self.is_active(),
    1609            0 :             "Cannot create empty timelines on inactive tenant"
    1610              :         );
    1611              : 
    1612           88 :         let timeline_uninit_mark = self.create_timeline_uninit_mark(new_timeline_id)?;
    1613           86 :         let new_metadata = TimelineMetadata::new(
    1614           86 :             // Initialize disk_consistent LSN to 0, The caller must import some data to
    1615           86 :             // make it valid, before calling finish_creation()
    1616           86 :             Lsn(0),
    1617           86 :             None,
    1618           86 :             None,
    1619           86 :             Lsn(0),
    1620           86 :             initdb_lsn,
    1621           86 :             initdb_lsn,
    1622           86 :             pg_version,
    1623           86 :         );
    1624           86 :         self.prepare_new_timeline(
    1625           86 :             new_timeline_id,
    1626           86 :             &new_metadata,
    1627           86 :             timeline_uninit_mark,
    1628           86 :             initdb_lsn,
    1629           86 :             None,
    1630           86 :         )
    1631           98 :         .await
    1632           88 :     }
    1633              : 
    1634              :     /// Helper for unit tests to create an empty timeline.
    1635              :     ///
    1636              :     /// The timeline has the state value `Active`, but its background loops are not running.
    1637              :     // This makes the various functions that `anyhow::ensure!` the Active state work in tests.
    1638              :     // Our current tests don't need the background loops.
    1639              :     #[cfg(test)]
    1640           68 :     pub async fn create_test_timeline(
    1641           68 :         &self,
    1642           68 :         new_timeline_id: TimelineId,
    1643           68 :         initdb_lsn: Lsn,
    1644           68 :         pg_version: u32,
    1645           68 :         ctx: &RequestContext,
    1646           68 :     ) -> anyhow::Result<Arc<Timeline>> {
    1647           68 :         let uninit_tl = self
    1648           68 :             .create_empty_timeline(new_timeline_id, initdb_lsn, pg_version, ctx)
    1649           68 :             .await?;
    1650           68 :         let tline = uninit_tl.raw_timeline().expect("we just created it");
    1651           68 :         assert_eq!(tline.get_last_record_lsn(), Lsn(0));
    1652              : 
    1653              :         // Setup minimum keys required for the timeline to be usable.
    1654           68 :         let mut modification = tline.begin_modification(initdb_lsn);
    1655           68 :         modification
    1656           68 :             .init_empty_test_timeline()
    1657           68 :             .context("init_empty_test_timeline")?;
    1658           68 :         modification
    1659           68 :             .commit(ctx)
    1660           68 :             .await
    1661           68 :             .context("commit init_empty_test_timeline modification")?;
    1662              : 
    1663              :         // Flush to disk so that uninit_tl's check for valid disk_consistent_lsn passes.
    1664           68 :         tline.maybe_spawn_flush_loop();
    1665           68 :         tline.freeze_and_flush().await.context("freeze_and_flush")?;
    1666              : 
    1667              :         // Make sure the freeze_and_flush reaches remote storage.
    1668           68 :         tline
    1669           68 :             .remote_client
    1670           68 :             .as_ref()
    1671           68 :             .unwrap()
    1672           68 :             .wait_completion()
    1673           25 :             .await
    1674           68 :             .unwrap();
    1675              : 
    1676           68 :         let tl = uninit_tl.finish_creation()?;
    1677              :         // The non-test code would call tl.activate() here.
    1678           68 :         tl.set_state(TimelineState::Active);
    1679           68 :         Ok(tl)
    1680           68 :     }
    1681              : 
    1682              :     /// Create a new timeline.
    1683              :     ///
    1684              :     /// Returns the new timeline ID and reference to its Timeline object.
    1685              :     /// Returns a handle to the new Timeline object.
    1686              :     /// If the caller specified the timeline ID to use (`new_timeline_id`), and timeline with
    1687              :     /// the same timeline ID already exists, returns CreateTimelineError::AlreadyExists.
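    1687              :     ///
    1687              :     /// A minimal sketch of branching a child timeline off an existing ancestor; the
    1687              :     /// variables (`tenant`, `new_timeline_id`, `ancestor_id`, `branch_lsn`, `pg_version`,
    1687              :     /// `broker_client`, `ctx`) are assumed to be in scope:
    1687              :     ///
    1687              :     /// ```ignore
    1687              :     /// let child = tenant
    1687              :     ///     .create_timeline(
    1687              :     ///         new_timeline_id,
    1687              :     ///         Some(ancestor_id), // branch off this timeline
    1687              :     ///         Some(branch_lsn),  // at this LSN; must not be below the ancestor's own ancestor LSN
    1687              :     ///         pg_version,
    1687              :     ///         None,              // load_existing_initdb
    1687              :     ///         broker_client,
    1687              :     ///         ctx,
    1687              :     ///     )
    1687              :     ///     .await?;
    1687              :     /// ```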
    1688              :     #[allow(clippy::too_many_arguments)]
    1689          892 :     pub(crate) async fn create_timeline(
    1690          892 :         &self,
    1691          892 :         new_timeline_id: TimelineId,
    1692          892 :         ancestor_timeline_id: Option<TimelineId>,
    1693          892 :         mut ancestor_start_lsn: Option<Lsn>,
    1694          892 :         pg_version: u32,
    1695          892 :         load_existing_initdb: Option<TimelineId>,
    1696          892 :         broker_client: storage_broker::BrokerClientChannel,
    1697          892 :         ctx: &RequestContext,
    1698          892 :     ) -> Result<Arc<Timeline>, CreateTimelineError> {
    1699          892 :         if !self.is_active() {
    1700            0 :             if matches!(self.current_state(), TenantState::Stopping { .. }) {
    1701            0 :                 return Err(CreateTimelineError::ShuttingDown);
    1702              :             } else {
    1703            0 :                 return Err(CreateTimelineError::Other(anyhow::anyhow!(
    1704            0 :                     "Cannot create timelines on inactive tenant"
    1705            0 :                 )));
    1706              :             }
    1707          892 :         }
    1708              : 
    1709          892 :         let _gate = self
    1710          892 :             .gate
    1711          892 :             .enter()
    1712          892 :             .map_err(|_| CreateTimelineError::ShuttingDown)?;
    1713              : 
    1714              :         // Get exclusive access to the timeline ID: this ensures that it does not already exist,
    1715              :         // and that no other creation attempts will be allowed in while we are working.  The
    1716              :         // uninit_mark is a guard.
    1717          892 :         let uninit_mark = match self.create_timeline_uninit_mark(new_timeline_id) {
    1718          868 :             Ok(m) => m,
    1719              :             Err(TimelineExclusionError::AlreadyCreating) => {
    1720              :                 // Creation is in progress; we cannot create it again, and we cannot
    1721              :                 // check whether this request matches the existing one, so the caller must try
    1722              :                 // again later.
    1723            1 :                 return Err(CreateTimelineError::AlreadyCreating);
    1724              :             }
    1725            0 :             Err(TimelineExclusionError::Other(e)) => {
    1726            0 :                 return Err(CreateTimelineError::Other(e));
    1727              :             }
    1728           23 :             Err(TimelineExclusionError::AlreadyExists(existing)) => {
    1729            0 :                 debug!("timeline {new_timeline_id} already exists");
    1730              : 
    1731              :                 // Idempotency: creating the same timeline twice is not an error, unless
    1732              :                 // the second creation has different parameters.
    1733           23 :                 if existing.get_ancestor_timeline_id() != ancestor_timeline_id
    1734           23 :                     || existing.pg_version != pg_version
    1735           23 :                     || (ancestor_start_lsn.is_some()
    1736            0 :                         && ancestor_start_lsn != Some(existing.get_ancestor_lsn()))
    1737              :                 {
    1738            0 :                     return Err(CreateTimelineError::Conflict);
    1739           23 :                 }
    1740              : 
    1741           23 :                 if let Some(remote_client) = existing.remote_client.as_ref() {
    1742              :                     // Wait for uploads to complete, so that when we return Ok, the timeline
    1743              :                     // is known to be durable on remote storage. Just like we do at the end of
    1744              :                     // this function, after we have created the timeline ourselves.
    1745              :                     //
    1746              :                     // We only really care that the initial version of `index_part.json` has
    1747              :                     // been uploaded. That's enough to remember that the timeline
    1748              :                     // exists. However, there is no function to wait specifically for that so
    1749              :                     // we just wait for all in-progress uploads to finish.
    1750           23 :                     remote_client
    1751           23 :                         .wait_completion()
    1752            0 :                         .await
    1753           23 :                         .context("wait for timeline uploads to complete")?;
    1754            0 :                 }
    1755              : 
    1756           23 :                 return Ok(existing);
    1757              :             }
    1758              :         };
    1759              : 
    1760          868 :         let loaded_timeline = match ancestor_timeline_id {
    1761          268 :             Some(ancestor_timeline_id) => {
    1762          268 :                 let ancestor_timeline = self
    1763          268 :                     .get_timeline(ancestor_timeline_id, false)
    1764          268 :                     .context("Cannot branch off the timeline that's not present in pageserver")?;
    1765              : 
    1766              :                 // Instead of waiting around, just deny the request, because the ancestor is not
    1767              :                 // yet ready for other purposes either.
    1768          268 :                 if !ancestor_timeline.is_active() {
    1769            6 :                     return Err(CreateTimelineError::AncestorNotActive);
    1770          262 :                 }
    1771              : 
    1772          262 :                 if let Some(lsn) = ancestor_start_lsn.as_mut() {
    1773           33 :                     *lsn = lsn.align();
    1774           33 : 
    1775           33 :                     let ancestor_ancestor_lsn = ancestor_timeline.get_ancestor_lsn();
    1776           33 :                     if ancestor_ancestor_lsn > *lsn {
    1777              :                         // can we safely just branch from the ancestor instead?
    1778            2 :                         return Err(CreateTimelineError::AncestorLsn(anyhow::anyhow!(
    1779            2 :                             "invalid start lsn {} for ancestor timeline {}: less than timeline ancestor lsn {}",
    1780            2 :                             lsn,
    1781            2 :                             ancestor_timeline_id,
    1782            2 :                             ancestor_ancestor_lsn,
    1783            2 :                         )));
    1784           31 :                     }
    1785           31 : 
    1786           31 :                     // Wait for the WAL to arrive and be processed on the parent branch up
    1787           31 :                     // to the requested branch point. The repository code itself doesn't
    1788           31 :                     // require it, but if we start to receive WAL on the new timeline,
    1789           31 :                     // decoding the new WAL might need to look up previous pages, relation
    1790           31 :                     // sizes etc. and that would get confused if the previous page versions
    1791           31 :                     // are not in the repository yet.
    1792           31 :                     ancestor_timeline
    1793           31 :                         .wait_lsn(*lsn, ctx)
    1794            9 :                         .await
    1795           31 :                         .map_err(|e| match e {
    1796            0 :                             e @ (WaitLsnError::Timeout(_) | WaitLsnError::BadState) => {
    1797            0 :                                 CreateTimelineError::AncestorLsn(anyhow::anyhow!(e))
    1798              :                             }
    1799            0 :                             WaitLsnError::Shutdown => CreateTimelineError::ShuttingDown,
    1800           31 :                         })?;
    1801          229 :                 }
    1802              : 
    1803          260 :                 self.branch_timeline(
    1804          260 :                     &ancestor_timeline,
    1805          260 :                     new_timeline_id,
    1806          260 :                     ancestor_start_lsn,
    1807          260 :                     uninit_mark,
    1808          260 :                     ctx,
    1809          260 :                 )
    1810          260 :                 .await?
    1811              :             }
    1812              :             None => {
    1813          600 :                 self.bootstrap_timeline(
    1814          600 :                     new_timeline_id,
    1815          600 :                     pg_version,
    1816          600 :                     load_existing_initdb,
    1817          600 :                     uninit_mark,
    1818          600 :                     ctx,
    1819          600 :                 )
    1820      7751652 :                 .await?
    1821              :             }
    1822              :         };
    1823              : 
    1824              :         // At this point we have dropped our guard on [`Self::timelines_creating`], and
    1825              :         // the timeline is visible in [`Self::timelines`], but it is _not_ durable yet.  We must
    1826              :         // not send a success to the caller until it is.  The same applies to handling retries,
    1827              :         // see the handling of [`TimelineExclusionError::AlreadyExists`] above.
    1828          846 :         if let Some(remote_client) = loaded_timeline.remote_client.as_ref() {
    1829          846 :             let kind = ancestor_timeline_id
    1830          846 :                 .map(|_| "branched")
    1831          846 :                 .unwrap_or("bootstrapped");
    1832          846 :             remote_client.wait_completion().await.with_context(|| {
    1833            0 :                 format!("wait for {} timeline initial uploads to complete", kind)
    1834          842 :             })?;
    1835            0 :         }
    1836              : 
    1837          842 :         loaded_timeline.activate(broker_client, None, ctx);
    1838          842 : 
    1839          842 :         Ok(loaded_timeline)
    1840          886 :     }
    1841              : 
    1842           64 :     pub(crate) async fn delete_timeline(
    1843           64 :         self: Arc<Self>,
    1844           64 :         timeline_id: TimelineId,
    1845           64 :     ) -> Result<(), DeleteTimelineError> {
    1846          370 :         DeleteTimelineFlow::run(&self, timeline_id, false).await?;
    1847              : 
    1848           53 :         Ok(())
    1849           64 :     }
    1850              : 
    1851              :     /// perform one garbage collection iteration, removing old data files from disk.
    1852              :     /// this function is periodically called by gc task.
    1853              :     /// also it can be explicitly requested through page server api 'do_gc' command.
    1854              :     ///
    1855              :     /// `target_timeline_id` specifies the timeline to GC, or None for all.
    1856              :     ///
    1857              :     /// The `horizon` and `pitr` parameters determine how much WAL history needs to be retained.
    1858              :     /// Also known as the retention period, or the GC cutoff point. `horizon` specifies
    1859              :     /// the amount of history, as an LSN difference from the current latest LSN on each timeline.
    1860              :     /// `pitr` specifies the same as a time difference from the current time. The effective
    1861              :     /// GC cutoff point is determined conservatively by either `horizon` or `pitr`, whichever
    1862              :     /// requires more history to be retained.
    1863              :     //
    1864          358 :     pub async fn gc_iteration(
    1865          358 :         &self,
    1866          358 :         target_timeline_id: Option<TimelineId>,
    1867          358 :         horizon: u64,
    1868          358 :         pitr: Duration,
    1869          358 :         cancel: &CancellationToken,
    1870          358 :         ctx: &RequestContext,
    1871          358 :     ) -> anyhow::Result<GcResult> {
    1872          358 :         // Don't start doing work during shutdown
    1873          358 :         if let TenantState::Stopping { .. } = self.current_state() {
    1874            0 :             return Ok(GcResult::default());
    1875          358 :         }
    1876          358 : 
    1877          358 :         // there is a global allowed_error for this
    1878          358 :         anyhow::ensure!(
    1879          358 :             self.is_active(),
    1880            0 :             "Cannot run GC iteration on inactive tenant"
    1881              :         );
    1882              : 
    1883              :         {
    1884          358 :             let conf = self.tenant_conf.read().unwrap();
    1885          358 : 
    1886          358 :             if !conf.location.may_delete_layers_hint() {
    1887            1 :                 info!("Skipping GC in location state {:?}", conf.location);
    1888            1 :                 return Ok(GcResult::default());
    1889          357 :             }
    1890          357 :         }
    1891          357 : 
    1892          357 :         self.gc_iteration_internal(target_timeline_id, horizon, pitr, cancel, ctx)
    1893       275826 :             .await
    1894          357 :     }
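// Editor's sketch (hypothetical, not part of this file): how the effective GC cutoff
// described in the doc comment above can be combined from the two knobs. `horizon`
// yields an LSN-based cutoff, `pitr` a time-based one, and the older of the two wins
// because it retains more history. `Lsn` is stubbed as a plain u64 here.
//
// type Lsn = u64;
//
// fn effective_gc_cutoff(horizon_cutoff: Lsn, pitr_cutoff: Lsn) -> Lsn {
//     // The smaller LSN is the more conservative cutoff: everything newer than it
//     // is kept, so taking the minimum satisfies both retention requirements.
//     std::cmp::min(horizon_cutoff, pitr_cutoff)
// }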
    1895              : 
    1896              :     /// Perform one compaction iteration.
    1897              :     /// This function is periodically called by compactor task.
    1898              :     /// Also it can be explicitly requested per timeline through page server
    1899              :     /// api's 'compact' command.
    1900          408 :     async fn compaction_iteration(
    1901          408 :         &self,
    1902          408 :         cancel: &CancellationToken,
    1903          408 :         ctx: &RequestContext,
    1904          408 :     ) -> anyhow::Result<(), timeline::CompactionError> {
    1905          408 :         // Don't start doing work during shutdown, or when broken, we do not need those in the logs
    1906          408 :         if !self.is_active() {
    1907            0 :             return Ok(());
    1908          408 :         }
    1909          408 : 
    1910          408 :         {
    1911          408 :             let conf = self.tenant_conf.read().unwrap();
    1912          408 :             if !conf.location.may_delete_layers_hint() || !conf.location.may_upload_layers_hint() {
    1913            9 :                 info!("Skipping compaction in location state {:?}", conf.location);
    1914            9 :                 return Ok(());
    1915          399 :             }
    1916          399 :         }
    1917          399 : 
    1918          399 :         // Scan through the hashmap and collect a list of all the timelines,
    1919          399 :         // while holding the lock. Then drop the lock and actually perform the
    1920          399 :         // compactions.  We don't want to block everything else while the
    1921          399 :         // compaction runs.
    1922          399 :         let timelines_to_compact = {
    1923          399 :             let timelines = self.timelines.lock().unwrap();
    1924          399 :             let timelines_to_compact = timelines
    1925          399 :                 .iter()
    1926          580 :                 .filter_map(|(timeline_id, timeline)| {
    1927          580 :                     if timeline.is_active() {
    1928          577 :                         Some((*timeline_id, timeline.clone()))
    1929              :                     } else {
    1930            3 :                         None
    1931              :                     }
    1932          580 :                 })
    1933          399 :                 .collect::<Vec<_>>();
    1934          399 :             drop(timelines);
    1935          399 :             timelines_to_compact
    1936              :         };
    1937              : 
    1938          973 :         for (timeline_id, timeline) in &timelines_to_compact {
    1939          576 :             timeline
    1940          576 :                 .compact(cancel, EnumSet::empty(), ctx)
    1941          576 :                 .instrument(info_span!("compact_timeline", %timeline_id))
    1942        93928 :                 .await?;
    1943              :         }
    1944              : 
    1945          397 :         Ok(())
    1946          406 :     }
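// Editor's sketch of the "collect under the lock, work outside it" pattern used by
// compaction_iteration above. A plain Mutex<HashMap> stands in for `self.timelines`;
// the values are cloned Arcs, so the lock is held only for the scan and not while the
// (potentially slow) per-item work runs. Names here are assumptions of this example.
//
// use std::collections::HashMap;
// use std::sync::{Arc, Mutex};
//
// fn snapshot_items(map: &Mutex<HashMap<u64, Arc<String>>>) -> Vec<(u64, Arc<String>)> {
//     let guard = map.lock().unwrap();
//     let snapshot: Vec<_> = guard
//         .iter()
//         .map(|(id, item)| (*id, Arc::clone(item)))
//         .collect();
//     drop(guard); // release the lock before any long-running work starts
//     snapshot
// }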
    1947              : 
    1948        34614 :     pub fn current_state(&self) -> TenantState {
    1949        34614 :         self.state.borrow().clone()
    1950        34614 :     }
    1951              : 
    1952         2762 :     pub fn is_active(&self) -> bool {
    1953         2762 :         self.current_state() == TenantState::Active
    1954         2762 :     }
    1955              : 
    1956          743 :     pub fn generation(&self) -> Generation {
    1957          743 :         self.generation
    1958          743 :     }
    1959              : 
    1960          482 :     pub(crate) fn wal_redo_manager_status(&self) -> Option<WalRedoManagerStatus> {
    1961          482 :         self.walredo_mgr.as_ref().and_then(|mgr| mgr.status())
    1962          482 :     }
    1963              : 
    1964              :     /// Changes tenant status to active, unless shutdown was already requested.
    1965              :     ///
    1966              :     /// `background_jobs_can_start` is an optional barrier set to a value during pageserver startup
    1967              :     /// to delay background jobs. Background jobs can be started right away when None is given.
    1968          856 :     fn activate(
    1969          856 :         self: &Arc<Self>,
    1970          856 :         broker_client: BrokerClientChannel,
    1971          856 :         background_jobs_can_start: Option<&completion::Barrier>,
    1972          856 :         ctx: &RequestContext,
    1973          856 :     ) {
    1974          856 :         span::debug_assert_current_span_has_tenant_id();
    1975          856 : 
    1976          856 :         let mut activating = false;
    1977          856 :         self.state.send_modify(|current_state| {
    1978          856 :             use pageserver_api::models::ActivatingFrom;
    1979          856 :             match &*current_state {
    1980              :                 TenantState::Activating(_) | TenantState::Active | TenantState::Broken { .. } | TenantState::Stopping { .. } => {
    1981            0 :                     panic!("caller is responsible for calling activate() only on Loading / Attaching tenants, got {state:?}", state = current_state);
    1982              :                 }
    1983            0 :                 TenantState::Loading => {
    1984            0 :                     *current_state = TenantState::Activating(ActivatingFrom::Loading);
    1985            0 :                 }
    1986          856 :                 TenantState::Attaching => {
    1987          856 :                     *current_state = TenantState::Activating(ActivatingFrom::Attaching);
    1988          856 :                 }
    1989              :             }
    1990          856 :             debug!(tenant_id = %self.tenant_shard_id.tenant_id, shard_id = %self.tenant_shard_id.shard_slug(), "Activating tenant");
    1991          856 :             activating = true;
    1992          856 :             // Continue outside the closure. We need to grab timelines.lock()
    1993          856 :             // and we plan to turn it into a tokio::sync::Mutex in a future patch.
    1994          856 :         });
    1995          856 : 
    1996          856 :         if activating {
    1997          856 :             let timelines_accessor = self.timelines.lock().unwrap();
    1998          856 :             let timelines_to_activate = timelines_accessor
    1999          856 :                 .values()
    2000          856 :                 .filter(|timeline| !(timeline.is_broken() || timeline.is_stopping()));
    2001          856 : 
    2002          856 :             // Spawn gc and compaction loops. The loops will shut themselves
    2003          856 :             // down when they notice that the tenant is inactive.
    2004          856 :             tasks::start_background_loops(self, background_jobs_can_start);
    2005          856 : 
    2006          856 :             let mut activated_timelines = 0;
    2007              : 
    2008         1261 :             for timeline in timelines_to_activate {
    2009          405 :                 timeline.activate(broker_client.clone(), background_jobs_can_start, ctx);
    2010          405 :                 activated_timelines += 1;
    2011          405 :             }
    2012              : 
    2013          856 :             self.state.send_modify(move |current_state| {
    2014          856 :                 assert!(
    2015          856 :                     matches!(current_state, TenantState::Activating(_)),
    2016            0 :                     "set_stopping and set_broken wait for us to leave Activating state",
    2017              :                 );
    2018          856 :                 *current_state = TenantState::Active;
    2019          856 : 
    2020          856 :                 let elapsed = self.constructed_at.elapsed();
    2021          856 :                 let total_timelines = timelines_accessor.len();
    2022          856 : 
    2023          856 :                 // Log a lot of stuff, because some tenants sometimes suffer from user-visibly
    2024          856 :                 // long activation times. See https://github.com/neondatabase/neon/issues/4025
    2025          856 :                 info!(
    2026          856 :                     since_creation_millis = elapsed.as_millis(),
    2027          856 :                     tenant_id = %self.tenant_shard_id.tenant_id,
    2028          856 :                     shard_id = %self.tenant_shard_id.shard_slug(),
    2029          856 :                     activated_timelines,
    2030          856 :                     total_timelines,
    2031          856 :                     post_state = <&'static str>::from(&*current_state),
    2032          856 :                     "activation attempt finished"
    2033          856 :                 );
    2034              : 
    2035          856 :                 TENANT.activation.observe(elapsed.as_secs_f64());
    2036          856 :             });
    2037            0 :         }
    2038          856 :     }
    2039              : 
    2040              :     /// Shutdown the tenant and join all of the spawned tasks.
    2041              :     ///
    2042              :     /// The method caters for all use-cases:
    2043              :     /// - pageserver shutdown (freeze_and_flush == true)
    2044              :     /// - detach + ignore (freeze_and_flush == false)
    2045              :     ///
    2046              :     /// This will attempt to shutdown even if tenant is broken.
    2047              :     ///
    2048              :     /// `shutdown_progress` is a [`completion::Barrier`] for the shutdown initiated by this call.
    2049              :     /// If the tenant is already shutting down, we return a clone of the first shutdown call's
    2050              :     /// `Barrier` as an `Err`. This not-first caller can use the returned barrier to join with
    2051              :     /// the ongoing shutdown.
    2052          464 :     async fn shutdown(
    2053          464 :         &self,
    2054          464 :         shutdown_progress: completion::Barrier,
    2055          464 :         freeze_and_flush: bool,
    2056          464 :     ) -> Result<(), completion::Barrier> {
    2057          464 :         // Set tenant (and its timelines) to Stopping state.
    2058          464 : 
    2059          464 :         // Set tenant (and its timlines) to Stoppping state.
    2060          464 :         //
    2061          464 :         // Since we can only transition into Stopping state after activation is complete,
    2062          464 :         // run it in a JoinSet so all tenants have a chance to stop before we get SIGKILLed.
    2063          464 :         //
    2064          464 :         // Transitioning tenants to Stopping state has a couple of non-obvious side effects:
    2065          464 :         // 1. Lock out any new requests to the tenants.
    2066          464 :         // 2. Signal cancellation to WAL receivers (we wait on it below).
    2067          464 :         // 3. Signal cancellation for other tenant background loops.
    2068          464 :         // 4. ???
    2069          464 :         //
    2070          464 :         // The waiting for the cancellation is not done uniformly.
    2071          464 :         // We certainly wait for WAL receivers to shut down.
    2072          464 :         // That is necessary so that no new data comes in before the freeze_and_flush.
    2073          464 :         // But the tenant background loops are joined-on in our caller.
    2074          464 :         // It's messed up.
    2075          464 :         // We just ignore the failure to stop.
    2076          464 : 
    2077          464 :         // If we're still attaching, fire the cancellation token early to drop out: this
    2078          464 :         // will prevent us flushing, but ensures timely shutdown if some I/O during attach
    2079          464 :         // is very slow.
    2080          464 :         if matches!(self.current_state(), TenantState::Attaching) {
    2081           18 :             self.cancel.cancel();
    2082          446 :         }
    2083              : 
    2084          464 :         match self.set_stopping(shutdown_progress, false, false).await {
    2085          407 :             Ok(()) => {}
    2086           57 :             Err(SetStoppingError::Broken) => {
    2087           57 :                 // assume that this is acceptable
    2088           57 :             }
    2089            0 :             Err(SetStoppingError::AlreadyStopping(other)) => {
    2090              :                 // give the caller the option to wait for this shutdown
    2091            0 :                 info!("Tenant::shutdown: AlreadyStopping");
    2092            0 :                 return Err(other);
    2093              :             }
    2094              :         };
    2095              : 
    2096          464 :         let mut js = tokio::task::JoinSet::new();
    2097          464 :         {
    2098          464 :             let timelines = self.timelines.lock().unwrap();
    2099          591 :             timelines.values().for_each(|timeline| {
    2100          591 :                 let timeline = Arc::clone(timeline);
    2101          591 :                 let timeline_id = timeline.timeline_id;
    2102              : 
    2103          591 :                 let span =
    2104          591 :                     tracing::info_span!("timeline_shutdown", %timeline_id, ?freeze_and_flush);
    2105          591 :                 js.spawn(async move {
    2106          591 :                     if freeze_and_flush {
    2107          876 :                         timeline.flush_and_shutdown().instrument(span).await
    2108              :                     } else {
    2109          736 :                         timeline.shutdown().instrument(span).await
    2110              :                     }
    2111          591 :                 });
    2112          591 :             })
    2113              :         };
    2114              :         // test_long_timeline_create_then_tenant_delete is leaning on this message
    2115          464 :         tracing::info!("Waiting for timelines...");
    2116         1055 :         while let Some(res) = js.join_next().await {
    2117            0 :             match res {
    2118          591 :                 Ok(()) => {}
    2119            0 :                 Err(je) if je.is_cancelled() => unreachable!("no cancelling used"),
    2120            0 :                 Err(je) if je.is_panic() => { /* logged already */ }
    2121            0 :                 Err(je) => warn!("unexpected JoinError: {je:?}"),
    2122              :             }
    2123              :         }
    2124              : 
    2125              :         // We cancel the Tenant's cancellation token _after_ the timelines have all shut down.  This permits
    2126              :         // them to continue to do work during their shutdown methods, e.g. flushing data.
    2127            0 :         tracing::debug!("Cancelling CancellationToken");
    2128          464 :         self.cancel.cancel();
    2129              : 
    2130              :         // shutdown all tenant and timeline tasks: gc, compaction, page service
    2131              :         // No new tasks will be started for this tenant because it's in `Stopping` state.
    2132              :         //
    2133              :         // this will additionally shutdown and await all timeline tasks.
    2134            0 :         tracing::debug!("Waiting for tasks...");
    2135          464 :         task_mgr::shutdown_tasks(None, Some(self.tenant_shard_id), None).await;
    2136              : 
    2137              :         // Wait for any in-flight operations to complete
    2138          464 :         self.gate.close().await;
    2139              : 
    2140          464 :         Ok(())
    2141          464 :     }
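// Editor's sketch of the JoinSet fan-out used in shutdown() above: spawn one task per
// item, then drain join_next(), treating panics as already logged and anything else as
// unexpected. Tokio is the only dependency assumed; the per-item work is a stub.
//
// use tokio::task::JoinSet;
//
// async fn shutdown_all(ids: Vec<u64>) {
//     let mut js = JoinSet::new();
//     for id in ids {
//         js.spawn(async move {
//             // per-item shutdown work would run here
//             let _ = id;
//         });
//     }
//     while let Some(res) = js.join_next().await {
//         match res {
//             Ok(()) => {}
//             Err(e) if e.is_panic() => { /* panic payload was already logged */ }
//             Err(e) => eprintln!("unexpected JoinError: {e:?}"),
//         }
//     }
// }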
    2142              : 
    2143              :     /// Change tenant status to Stopping, to mark that it is being shut down.
    2144              :     ///
    2145              :     /// This function waits for the tenant to become active if it isn't already, before transitioning it into Stopping state.
    2146              :     ///
    2147              :     /// This function is not cancel-safe!
    2148              :     ///
    2149              :     /// `allow_transition_from_loading` is needed for the special case of the loading task deleting the tenant.
    2150              :     /// `allow_transition_from_attaching` is needed for the special case of attaching a deleted tenant.
    2151          485 :     async fn set_stopping(
    2152          485 :         &self,
    2153          485 :         progress: completion::Barrier,
    2154          485 :         allow_transition_from_loading: bool,
    2155          485 :         allow_transition_from_attaching: bool,
    2156          485 :     ) -> Result<(), SetStoppingError> {
    2157          485 :         let mut rx = self.state.subscribe();
    2158          485 : 
    2159          485 :         // cannot stop before we're done activating, so wait out until we're done activating
    2160          513 :         rx.wait_for(|state| match state {
    2161           21 :             TenantState::Attaching if allow_transition_from_attaching => true,
    2162              :             TenantState::Activating(_) | TenantState::Attaching => {
    2163           28 :                 info!(
    2164           28 :                     "waiting for {} to turn Active|Broken|Stopping",
    2165           28 :                     <&'static str>::from(state)
    2166           28 :                 );
    2167           28 :                 false
    2168              :             }
    2169            0 :             TenantState::Loading => allow_transition_from_loading,
    2170          464 :             TenantState::Active | TenantState::Broken { .. } | TenantState::Stopping { .. } => true,
    2171          513 :         })
    2172           28 :         .await
    2173          485 :         .expect("cannot drop self.state while on a &self method");
    2174          485 : 
    2175          485 :         // we now know we're done activating, let's see whether this task is the winner to transition into Stopping
    2176          485 :         let mut err = None;
    2177          485 :         let stopping = self.state.send_if_modified(|current_state| match current_state {
    2178              :             TenantState::Activating(_) => {
    2179            0 :                 unreachable!("1we ensured above that we're done with activation, and, there is no re-activation")
    2180              :             }
    2181              :             TenantState::Attaching => {
    2182           21 :                 if !allow_transition_from_attaching {
    2183            0 :                     unreachable!("2we ensured above that we're done with activation, and, there is no re-activation")
    2184           21 :                 };
    2185           21 :                 *current_state = TenantState::Stopping { progress };
    2186           21 :                 true
    2187              :             }
    2188              :             TenantState::Loading => {
    2189            0 :                 if !allow_transition_from_loading {
    2190            0 :                     unreachable!("3we ensured above that we're done with activation, and, there is no re-activation")
    2191            0 :                 };
    2192            0 :                 *current_state = TenantState::Stopping { progress };
    2193            0 :                 true
    2194              :             }
    2195              :             TenantState::Active => {
    2196              :                 // FIXME: due to time-of-check vs time-of-use issues, it can happen that new timelines
    2197              :                 // are created after the transition to Stopping. That's harmless, as the Timelines
    2198              :                 // won't be accessible to anyone afterwards, because the Tenant is in Stopping state.
    2199          407 :                 *current_state = TenantState::Stopping { progress };
    2200          407 :                 // Continue stopping outside the closure. We need to grab timelines.lock()
    2201          407 :                 // and we plan to turn it into a tokio::sync::Mutex in a future patch.
    2202          407 :                 true
    2203              :             }
    2204           57 :             TenantState::Broken { reason, .. } => {
    2205           57 :                 info!(
    2206           57 :                     "Cannot set tenant to Stopping state, it is in Broken state due to: {reason}"
    2207           57 :                 );
    2208           57 :                 err = Some(SetStoppingError::Broken);
    2209           57 :                 false
    2210              :             }
    2211            0 :             TenantState::Stopping { progress } => {
    2212            0 :                 info!("Tenant is already in Stopping state");
    2213            0 :                 err = Some(SetStoppingError::AlreadyStopping(progress.clone()));
    2214            0 :                 false
    2215              :             }
    2216          485 :         });
    2217          485 :         match (stopping, err) {
    2218          428 :             (true, None) => {} // continue
    2219           57 :             (false, Some(err)) => return Err(err),
    2220            0 :             (true, Some(_)) => unreachable!(
    2221            0 :                 "send_if_modified closure must error out if not transitioning to Stopping"
    2222            0 :             ),
    2223            0 :             (false, None) => unreachable!(
    2224            0 :                 "send_if_modified closure must return true if transitioning to Stopping"
    2225            0 :             ),
    2226              :         }
    2227              : 
    2228          428 :         let timelines_accessor = self.timelines.lock().unwrap();
    2229          428 :         let not_broken_timelines = timelines_accessor
    2230          428 :             .values()
    2231          531 :             .filter(|timeline| !timeline.is_broken());
    2232          950 :         for timeline in not_broken_timelines {
    2233          522 :             timeline.set_state(TimelineState::Stopping);
    2234          522 :         }
    2235          428 :         Ok(())
    2236          485 :     }
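// Editor's sketch of the single-winner transition performed by set_stopping above:
// several callers may race to move a watch-channel state machine into Stopping, and
// send_if_modified lets exactly one of them win while the others observe the
// already-updated state. The DemoState enum is an assumption of this example only.
//
// use tokio::sync::watch;
//
// #[derive(Clone, Debug, PartialEq)]
// enum DemoState {
//     Active,
//     Stopping,
// }
//
// fn try_stop(tx: &watch::Sender<DemoState>) -> bool {
//     tx.send_if_modified(|state| match state {
//         DemoState::Active => {
//             *state = DemoState::Stopping;
//             true // value changed: notify receivers, we won the transition
//         }
//         DemoState::Stopping => false, // someone else already transitioned
//     })
// }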
    2237              : 
    2238              :     /// Method for tenant::mgr to transition us into Broken state in case of a late failure in
    2239              :     /// `remove_tenant_from_memory`
    2240              :     ///
    2241              :     /// This function waits for the tenant to become active if it isn't already, before transitioning it into Broken state.
    2242              :     ///
    2243              :     /// In tests, we also use this to set tenants to Broken state on purpose.
    2244           51 :     pub(crate) async fn set_broken(&self, reason: String) {
    2245           51 :         let mut rx = self.state.subscribe();
    2246           51 : 
    2247           51 :         // The load & attach routines own the tenant state until it has reached `Active`.
    2248           51 :         // So, wait until it's done.
    2249           51 :         rx.wait_for(|state| match state {
    2250              :             TenantState::Activating(_) | TenantState::Loading | TenantState::Attaching => {
    2251            0 :                 info!(
    2252            0 :                     "waiting for {} to turn Active|Broken|Stopping",
    2253            0 :                     <&'static str>::from(state)
    2254            0 :                 );
    2255            0 :                 false
    2256              :             }
    2257           51 :             TenantState::Active | TenantState::Broken { .. } | TenantState::Stopping { .. } => true,
    2258           51 :         })
    2259            0 :         .await
    2260           51 :         .expect("cannot drop self.state while on a &self method");
    2261           51 : 
    2262           51 :         // we now know we're done activating, let's see whether this task is the winner to transition into Broken
    2263           51 :         self.set_broken_no_wait(reason)
    2264           51 :     }
    2265              : 
    2266           51 :     pub(crate) fn set_broken_no_wait(&self, reason: impl Display) {
    2267           51 :         let reason = reason.to_string();
    2268           51 :         self.state.send_modify(|current_state| {
    2269           51 :             match *current_state {
    2270              :                 TenantState::Activating(_) | TenantState::Loading | TenantState::Attaching => {
    2271            0 :                     unreachable!("we ensured above that we're done with activation, and, there is no re-activation")
    2272              :                 }
    2273              :                 TenantState::Active => {
    2274            2 :                     if cfg!(feature = "testing") {
    2275            2 :                         warn!("Changing Active tenant to Broken state, reason: {}", reason);
    2276            2 :                         *current_state = TenantState::broken_from_reason(reason);
    2277              :                     } else {
    2278            0 :                         unreachable!("not allowed to call set_broken on Active tenants in non-testing builds")
    2279              :                     }
    2280              :                 }
    2281              :                 TenantState::Broken { .. } => {
    2282            0 :                     warn!("Tenant is already in Broken state");
    2283              :                 }
    2284              :                 // This is the only "expected" path, any other path is a bug.
    2285              :                 TenantState::Stopping { .. } => {
    2286           49 :                     warn!(
    2287           49 :                         "Marking Stopping tenant as Broken state, reason: {}",
    2288           49 :                         reason
    2289           49 :                     );
    2290           49 :                     *current_state = TenantState::broken_from_reason(reason);
    2291              :                 }
    2292              :            }
    2293           51 :         });
    2294           51 :     }
    2295              : 
    2296          413 :     pub fn subscribe_for_state_updates(&self) -> watch::Receiver<TenantState> {
    2297          413 :         self.state.subscribe()
    2298          413 :     }
    2299              : 
    2300              :     /// The activate_now semaphore is initialized with zero units.  As soon as
    2301              :     /// we add a unit, waiters will be able to acquire a unit and proceed.
    2302          571 :     pub(crate) fn activate_now(&self) {
    2303          571 :         self.activate_now_sem.add_permits(1);
    2304          571 :     }
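// Editor's sketch of the zero-permit semaphore idiom described above: the semaphore
// starts with no units, so acquirers park until someone calls add_permits, which acts
// as a wake-up signal. Tokio's Semaphore is assumed; names are illustrative only.
//
// use std::sync::Arc;
// use tokio::sync::Semaphore;
//
// async fn demo_activate_now() {
//     let sem = Arc::new(Semaphore::new(0)); // zero units: nobody can proceed yet
//     let waiter = {
//         let sem = Arc::clone(&sem);
//         tokio::spawn(async move {
//             // Parks here until a permit shows up.
//             let _permit = sem.acquire().await.expect("semaphore not closed");
//         })
//     };
//     sem.add_permits(1); // "activate now": wake the waiter
//     waiter.await.expect("waiter task panicked");
// }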
    2305              : 
    2306         1210 :     pub(crate) async fn wait_to_become_active(
    2307         1210 :         &self,
    2308         1210 :         timeout: Duration,
    2309         1210 :     ) -> Result<(), GetActiveTenantError> {
    2310         1210 :         let mut receiver = self.state.subscribe();
    2311         1645 :         loop {
    2312         1645 :             let current_state = receiver.borrow_and_update().clone();
    2313         1645 :             match current_state {
    2314              :                 TenantState::Loading | TenantState::Attaching | TenantState::Activating(_) => {
    2315              :                     // in these states, there's a chance that we can reach ::Active
    2316          444 :                     self.activate_now();
    2317          575 :                     match timeout_cancellable(timeout, &self.cancel, receiver.changed()).await {
    2318          435 :                         Ok(r) => {
    2319          435 :                             r.map_err(
    2320          435 :                             |_e: tokio::sync::watch::error::RecvError|
    2321              :                                 // Tenant existed but was dropped: report it as non-existent
    2322          435 :                                 GetActiveTenantError::NotFound(GetTenantError::NotFound(self.tenant_shard_id.tenant_id))
    2323          435 :                         )?
    2324              :                         }
    2325              :                         Err(TimeoutCancellableError::Cancelled) => {
    2326            9 :                             return Err(GetActiveTenantError::Cancelled);
    2327              :                         }
    2328              :                         Err(TimeoutCancellableError::Timeout) => {
    2329            0 :                             return Err(GetActiveTenantError::WaitForActiveTimeout {
    2330            0 :                                 latest_state: Some(self.current_state()),
    2331            0 :                                 wait_time: timeout,
    2332            0 :                             });
    2333              :                         }
    2334              :                     }
    2335              :                 }
    2336              :                 TenantState::Active { .. } => {
    2337         1201 :                     return Ok(());
    2338              :                 }
    2339              :                 TenantState::Broken { .. } | TenantState::Stopping { .. } => {
    2340              :                     // There's no chance the tenant can transition back into ::Active
    2341            0 :                     return Err(GetActiveTenantError::WillNotBecomeActive(current_state));
    2342              :                 }
    2343              :             }
    2344              :         }
    2345         1210 :     }
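// Editor's sketch, a simplified stand-in for the wait loop above that uses plain tokio
// primitives instead of timeout_cancellable: re-check the watch channel after every
// change notification, bail out on timeout, and stop early once the state can no longer
// become Ready. The DemoTenantState enum is an assumption of this example.
//
// use std::time::Duration;
// use tokio::sync::watch;
//
// #[derive(Clone, Debug)]
// enum DemoTenantState {
//     Starting,
//     Ready,
//     Broken,
// }
//
// async fn wait_until_ready(
//     mut rx: watch::Receiver<DemoTenantState>,
//     timeout: Duration,
// ) -> Result<(), &'static str> {
//     loop {
//         let current = rx.borrow_and_update().clone();
//         match current {
//             DemoTenantState::Ready => return Ok(()),
//             DemoTenantState::Broken => return Err("will never become ready"),
//             DemoTenantState::Starting => {}
//         }
//         // Wait for the next state change, but not longer than `timeout`.
//         match tokio::time::timeout(timeout, rx.changed()).await {
//             Ok(Ok(())) => continue,
//             Ok(Err(_)) => return Err("sender dropped"),
//             Err(_) => return Err("timed out waiting for Ready"),
//         }
//     }
// }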
    2346              : 
    2347           72 :     pub(crate) fn get_attach_mode(&self) -> AttachmentMode {
    2348           72 :         self.tenant_conf.read().unwrap().location.attach_mode
    2349           72 :     }
    2350              : 
    2351              :     /// For API access: generate a LocationConfig equivalent to the one that would be used to
    2352              :     /// create a Tenant in the same state.  Do not use this in hot paths: it's for relatively
    2353              :     /// rare external API calls, like a reconciliation at startup.
    2354            5 :     pub(crate) fn get_location_conf(&self) -> models::LocationConfig {
    2355            5 :         let conf = self.tenant_conf.read().unwrap();
    2356              : 
    2357            5 :         let location_config_mode = match conf.location.attach_mode {
    2358            5 :             AttachmentMode::Single => models::LocationConfigMode::AttachedSingle,
    2359            0 :             AttachmentMode::Multi => models::LocationConfigMode::AttachedMulti,
    2360            0 :             AttachmentMode::Stale => models::LocationConfigMode::AttachedStale,
    2361              :         };
    2362              : 
    2363              :         // We have a pageserver TenantConf, we need the API-facing TenantConfig.
    2364            5 :         let tenant_config: models::TenantConfig = conf.tenant_conf.into();
    2365            5 : 
    2366            5 :         models::LocationConfig {
    2367            5 :             mode: location_config_mode,
    2368            5 :             generation: self.generation.into(),
    2369            5 :             secondary_conf: None,
    2370            5 :             shard_number: self.shard_identity.number.0,
    2371            5 :             shard_count: self.shard_identity.count.0,
    2372            5 :             shard_stripe_size: self.shard_identity.stripe_size.0,
    2373            5 :             tenant_conf: tenant_config,
    2374            5 :         }
    2375            5 :     }
    2376              : 
    2377           72 :     pub(crate) fn get_tenant_shard_id(&self) -> &TenantShardId {
    2378           72 :         &self.tenant_shard_id
    2379           72 :     }
    2380              : 
    2381           12 :     pub(crate) fn get_generation(&self) -> Generation {
    2382           12 :         self.generation
    2383           12 :     }
    2384              : 
    2385              :     /// This function partially shuts down the tenant (it shuts down the Timelines) and is fallible,
    2386              :     /// and can leave the tenant in a bad state if it fails.  The caller is responsible for
    2387              :     /// resetting this tenant to a valid state if we fail.
    2388            5 :     pub(crate) async fn split_prepare(
    2389            5 :         &self,
    2390            5 :         child_shards: &Vec<TenantShardId>,
    2391            5 :     ) -> anyhow::Result<()> {
    2392            5 :         let timelines = self.timelines.lock().unwrap().clone();
    2393            5 :         for timeline in timelines.values() {
    2394            5 :             let Some(tl_client) = &timeline.remote_client else {
    2395            0 :                 anyhow::bail!("Remote storage is mandatory");
    2396              :             };
    2397              : 
    2398            5 :             let Some(remote_storage) = &self.remote_storage else {
    2399            0 :                 anyhow::bail!("Remote storage is mandatory");
    2400              :             };
    2401              : 
    2402              :             // We do not block timeline creation/deletion during splits inside the pageserver: it is up to higher levels
    2403              :             // to ensure that they do not start a split while timeline creation or deletion is in progress.
    2404              : 
    2405              :             // Upload an index from the parent: this is partly to provide freshness for the
    2406              :             // child tenants that will copy it, and partly for general ease-of-debugging: there will
    2407              :             // always be a parent shard index in the same generation as we wrote the child shard index.
    2408            5 :             tl_client.schedule_index_upload_for_file_changes()?;
    2409            5 :             tl_client.wait_completion().await?;
    2410              : 
    2411              :             // Shut down the timeline's remote client: this means that the indices we write
    2412              :             // for child shards will not be invalidated by the parent shard deleting layers.
    2413            5 :             tl_client.shutdown().await?;
    2414              : 
    2415              :             // Download methods can still be used after shutdown, as they don't flow through the remote client's
    2416              :             // queue.  In principle the RemoteTimelineClient could provide this without downloading it, but this
    2417              :             // operation is rare, so it's simpler to just download it (and robustly guarantees that the index
    2418              :             // we use here really is the remotely persistent one).
    2419            5 :             let result = tl_client
    2420            5 :                 .download_index_file(&self.cancel)
    2421            5 :                 .instrument(info_span!("download_index_file", tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(), timeline_id=%timeline.timeline_id))
    2422            7 :                 .await?;
    2423            5 :             let index_part = match result {
    2424              :                 MaybeDeletedIndexPart::Deleted(_) => {
    2425            0 :                     anyhow::bail!("Timeline deletion happened concurrently with split")
    2426              :                 }
    2427            5 :                 MaybeDeletedIndexPart::IndexPart(p) => p,
    2428              :             };
    2429              : 
    2430           15 :             for child_shard in child_shards {
    2431           10 :                 upload_index_part(
    2432           10 :                     remote_storage,
    2433           10 :                     child_shard,
    2434           10 :                     &timeline.timeline_id,
    2435           10 :                     self.generation,
    2436           10 :                     &index_part,
    2437           10 :                     &self.cancel,
    2438           10 :                 )
    2439           26 :                 .await?;
    2440              :             }
    2441              :         }
    2442              : 
    2443            5 :         Ok(())
    2444            5 :     }
    2445              : }
    2446              : 
    2447              : /// Given a Vec of timelines and their ancestors (timeline_id, ancestor_id),
    2448              : /// perform a topological sort, so that the parent of each timeline comes
    2449              : /// before the children.
    2450              : /// E extracts the ancestor from T
    2451              : /// This allows for T to be different. It can be TimelineMetadata, can be Timeline itself, etc.
    2452         1069 : fn tree_sort_timelines<T, E>(
    2453         1069 :     timelines: HashMap<TimelineId, T>,
    2454         1069 :     extractor: E,
    2455         1069 : ) -> anyhow::Result<Vec<(TimelineId, T)>>
    2456         1069 : where
    2457         1069 :     E: Fn(&T) -> Option<TimelineId>,
    2458         1069 : {
    2459         1069 :     let mut result = Vec::with_capacity(timelines.len());
    2460         1069 : 
    2461         1069 :     let mut now = Vec::with_capacity(timelines.len());
    2462         1069 :     // (ancestor, children)
    2463         1069 :     let mut later: HashMap<TimelineId, Vec<(TimelineId, T)>> =
    2464         1069 :         HashMap::with_capacity(timelines.len());
    2465              : 
    2466         1663 :     for (timeline_id, value) in timelines {
    2467          594 :         if let Some(ancestor_id) = extractor(&value) {
    2468           57 :             let children = later.entry(ancestor_id).or_default();
    2469           57 :             children.push((timeline_id, value));
    2470          537 :         } else {
    2471          537 :             now.push((timeline_id, value));
    2472          537 :         }
    2473              :     }
    2474              : 
    2475         1663 :     while let Some((timeline_id, metadata)) = now.pop() {
    2476          594 :         result.push((timeline_id, metadata));
    2477              :         // All children of this can be loaded now
    2478          594 :         if let Some(mut children) = later.remove(&timeline_id) {
    2479           46 :             now.append(&mut children);
    2480          548 :         }
    2481              :     }
    2482              : 
    2483              :     // All timelines should be visited now. Unless there were timelines with missing ancestors.
    2484         1069 :     if !later.is_empty() {
    2485            0 :         for (missing_id, orphan_ids) in later {
    2486            0 :             for (orphan_id, _) in orphan_ids {
    2487            0 :                 error!("could not load timeline {orphan_id} because its ancestor timeline {missing_id} could not be loaded");
    2488              :             }
    2489              :         }
    2490            0 :         bail!("could not load tenant because some timelines are missing ancestors");
    2491         1069 :     }
    2492         1069 : 
    2493         1069 :     Ok(result)
    2494         1069 : }
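// Editor's sketch, a hypothetical usage of tree_sort_timelines above: the extractor
// closure pulls the optional ancestor id out of whatever value type the map carries
// (a bare Option<TimelineId> in this example), and the result lists every parent before
// its children. TimelineId::generate() is assumed to exist and is used only to obtain
// two distinct ids for the example.
//
// fn example_tree_sort() -> anyhow::Result<()> {
//     let root_id = TimelineId::generate();
//     let child_id = TimelineId::generate();
//
//     let mut timelines: HashMap<TimelineId, Option<TimelineId>> = HashMap::new();
//     timelines.insert(root_id, None);           // no ancestor
//     timelines.insert(child_id, Some(root_id)); // branched off the root
//
//     let sorted = tree_sort_timelines(timelines, |ancestor| *ancestor)?;
//     assert_eq!(sorted.first().map(|(id, _)| *id), Some(root_id));
//     Ok(())
// }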
    2495              : 
    2496              : impl Tenant {
    2497           90 :     pub fn tenant_specific_overrides(&self) -> TenantConfOpt {
    2498           90 :         self.tenant_conf.read().unwrap().tenant_conf
    2499           90 :     }
    2500              : 
    2501           45 :     pub fn effective_config(&self) -> TenantConf {
    2502           45 :         self.tenant_specific_overrides()
    2503           45 :             .merge(self.conf.default_tenant_conf)
    2504           45 :     }
    2505              : 
    2506            5 :     pub fn get_checkpoint_distance(&self) -> u64 {
    2507            5 :         let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
    2508            5 :         tenant_conf
    2509            5 :             .checkpoint_distance
    2510            5 :             .unwrap_or(self.conf.default_tenant_conf.checkpoint_distance)
    2511            5 :     }
    2512              : 
    2513            5 :     pub fn get_checkpoint_timeout(&self) -> Duration {
    2514            5 :         let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
    2515            5 :         tenant_conf
    2516            5 :             .checkpoint_timeout
    2517            5 :             .unwrap_or(self.conf.default_tenant_conf.checkpoint_timeout)
    2518            5 :     }
    2519              : 
    2520            5 :     pub fn get_compaction_target_size(&self) -> u64 {
    2521            5 :         let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
    2522            5 :         tenant_conf
    2523            5 :             .compaction_target_size
    2524            5 :             .unwrap_or(self.conf.default_tenant_conf.compaction_target_size)
    2525            5 :     }
    2526              : 
    2527         1241 :     pub fn get_compaction_period(&self) -> Duration {
    2528         1241 :         let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
    2529         1241 :         tenant_conf
    2530         1241 :             .compaction_period
    2531         1241 :             .unwrap_or(self.conf.default_tenant_conf.compaction_period)
    2532         1241 :     }
    2533              : 
    2534            5 :     pub fn get_compaction_threshold(&self) -> usize {
    2535            5 :         let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
    2536            5 :         tenant_conf
    2537            5 :             .compaction_threshold
    2538            5 :             .unwrap_or(self.conf.default_tenant_conf.compaction_threshold)
    2539            5 :     }
    2540              : 
    2541          529 :     pub fn get_gc_horizon(&self) -> u64 {
    2542          529 :         let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
    2543          529 :         tenant_conf
    2544          529 :             .gc_horizon
    2545          529 :             .unwrap_or(self.conf.default_tenant_conf.gc_horizon)
    2546          529 :     }
    2547              : 
    2548         1097 :     pub fn get_gc_period(&self) -> Duration {
    2549         1097 :         let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
    2550         1097 :         tenant_conf
    2551         1097 :             .gc_period
    2552         1097 :             .unwrap_or(self.conf.default_tenant_conf.gc_period)
    2553         1097 :     }
    2554              : 
    2555            5 :     pub fn get_image_creation_threshold(&self) -> usize {
    2556            5 :         let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
    2557            5 :         tenant_conf
    2558            5 :             .image_creation_threshold
    2559            5 :             .unwrap_or(self.conf.default_tenant_conf.image_creation_threshold)
    2560            5 :     }
    2561              : 
    2562          442 :     pub fn get_pitr_interval(&self) -> Duration {
    2563          442 :         let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
    2564          442 :         tenant_conf
    2565          442 :             .pitr_interval
    2566          442 :             .unwrap_or(self.conf.default_tenant_conf.pitr_interval)
    2567          442 :     }
    2568              : 
    2569         9910 :     pub fn get_trace_read_requests(&self) -> bool {
    2570         9910 :         let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
    2571         9910 :         tenant_conf
    2572         9910 :             .trace_read_requests
    2573         9910 :             .unwrap_or(self.conf.default_tenant_conf.trace_read_requests)
    2574         9910 :     }
    2575              : 
    2576           35 :     pub fn get_min_resident_size_override(&self) -> Option<u64> {
    2577           35 :         let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
    2578           35 :         tenant_conf
    2579           35 :             .min_resident_size_override
    2580           35 :             .or(self.conf.default_tenant_conf.min_resident_size_override)
    2581           35 :     }
    2582              : 
    2583         1000 :     pub fn get_heatmap_period(&self) -> Option<Duration> {
    2584         1000 :         let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
    2585         1000 :         let heatmap_period = tenant_conf
    2586         1000 :             .heatmap_period
    2587         1000 :             .unwrap_or(self.conf.default_tenant_conf.heatmap_period);
    2588         1000 :         if heatmap_period.is_zero() {
    2589         1000 :             None
    2590              :         } else {
    2591            0 :             Some(heatmap_period)
    2592              :         }
    2593         1000 :     }
    2594              : 
    2595           30 :     pub fn set_new_tenant_config(&self, new_tenant_conf: TenantConfOpt) {
    2596           30 :         self.tenant_conf.write().unwrap().tenant_conf = new_tenant_conf;
    2597           30 :         // Don't hold self.timelines.lock() during the notifies.
    2598           30 :         // There's no risk of deadlock right now, but there could be if we consolidate
    2599           30 :         // mutexes in struct Timeline in the future.
    2600           30 :         let timelines = self.list_timelines();
    2601           62 :         for timeline in timelines {
    2602           32 :             timeline.tenant_conf_updated();
    2603           32 :         }
    2604           30 :     }
    2605              : 
    2606           37 :     pub(crate) fn set_new_location_config(&self, new_conf: AttachedTenantConf) {
    2607           37 :         *self.tenant_conf.write().unwrap() = new_conf;
    2608           37 :         // Don't hold self.timelines.lock() during the notifies.
    2609           37 :         // There's no risk of deadlock right now, but there could be if we consolidate
    2610           37 :         // mutexes in struct Timeline in the future.
    2611           37 :         let timelines = self.list_timelines();
    2612           67 :         for timeline in timelines {
    2613           30 :             timeline.tenant_conf_updated();
    2614           30 :         }
    2615           37 :     }
    2616              : 
    2617              :     /// Helper function to create a new Timeline struct.
    2618              :     ///
    2619              :     /// The returned Timeline is in Loading state. The caller is responsible for
    2620              :     /// initializing any on-disk state, and for inserting the Timeline to the 'timelines'
    2621              :     /// map.
    2622              :     ///
    2623              :     /// `validate_ancestor == false` is used when a timeline is created for deletion
    2624              :     /// and we might not have the ancestor present anymore, which is fine for
    2625              :     /// to-be-deleted timelines.
    2626         1592 :     fn create_timeline_struct(
    2627         1592 :         &self,
    2628         1592 :         new_timeline_id: TimelineId,
    2629         1592 :         new_metadata: &TimelineMetadata,
    2630         1592 :         ancestor: Option<Arc<Timeline>>,
    2631         1592 :         resources: TimelineResources,
    2632         1592 :         cause: CreateTimelineCause,
    2633         1592 :     ) -> anyhow::Result<Arc<Timeline>> {
    2634         1592 :         let state = match cause {
    2635              :             CreateTimelineCause::Load => {
    2636         1580 :                 let ancestor_id = new_metadata.ancestor_timeline();
    2637         1580 :                 anyhow::ensure!(
    2638         1580 :                     ancestor_id == ancestor.as_ref().map(|t| t.timeline_id),
    2639            0 :                     "Timeline's {new_timeline_id} ancestor {ancestor_id:?} was not found"
    2640              :                 );
    2641         1580 :                 TimelineState::Loading
    2642              :             }
    2643           12 :             CreateTimelineCause::Delete => TimelineState::Stopping,
    2644              :         };
    2645              : 
    2646         1592 :         let pg_version = new_metadata.pg_version();
    2647         1592 : 
    2648         1592 :         let timeline = Timeline::new(
    2649         1592 :             self.conf,
    2650         1592 :             Arc::clone(&self.tenant_conf),
    2651         1592 :             new_metadata,
    2652         1592 :             ancestor,
    2653         1592 :             new_timeline_id,
    2654         1592 :             self.tenant_shard_id,
    2655         1592 :             self.generation,
    2656         1592 :             self.shard_identity,
    2657         1592 :             self.walredo_mgr.as_ref().map(Arc::clone),
    2658         1592 :             resources,
    2659         1592 :             pg_version,
    2660         1592 :             state,
    2661         1592 :             self.cancel.child_token(),
    2662         1592 :         );
    2663         1592 : 
    2664         1592 :         Ok(timeline)
    2665         1592 :     }
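    // For illustration only: a minimal sketch of the call sequence the doc comment above
    // describes, mirroring `prepare_new_timeline` further down in this file. `new_metadata`,
    // `resources`, `ancestor` and `start_lsn` stand for values the caller already holds.
    //
    //     let timeline = self.create_timeline_struct(
    //         new_timeline_id,
    //         &new_metadata,
    //         ancestor,
    //         resources,
    //         CreateTimelineCause::Load,
    //     )?;
    //     timeline.init_empty_layer_map(start_lsn);
    //     // ... create the on-disk files, then insert the Arc<Timeline> into self.timelines ...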
    2666              : 
    2667              :     // Allow too_many_arguments because a constructor's argument list naturally grows with the
    2668              :     // number of attributes in the struct: breaking these out into a builder wouldn't be helpful.
    2669              :     #[allow(clippy::too_many_arguments)]
    2670          970 :     fn new(
    2671          970 :         state: TenantState,
    2672          970 :         conf: &'static PageServerConf,
    2673          970 :         attached_conf: AttachedTenantConf,
    2674          970 :         shard_identity: ShardIdentity,
    2675          970 :         walredo_mgr: Option<Arc<WalRedoManager>>,
    2676          970 :         tenant_shard_id: TenantShardId,
    2677          970 :         remote_storage: Option<GenericRemoteStorage>,
    2678          970 :         deletion_queue_client: DeletionQueueClient,
    2679          970 :     ) -> Tenant {
    2680          970 :         let (state, mut rx) = watch::channel(state);
    2681          970 : 
    2682          970 :         tokio::spawn(async move {
    2683          970 :             // reflect tenant state in metrics:
    2684          970 :             // - global per tenant state: TENANT_STATE_METRIC
    2685          970 :             // - "set" of broken tenants: BROKEN_TENANTS_SET
    2686          970 :             //
     2687          970 :             // the set of broken tenants should not have zero counts, so that it remains accessible for
    2688          970 :             // alerting.
    2689          970 : 
    2690          970 :             let tid = tenant_shard_id.to_string();
    2691          970 :             let shard_id = tenant_shard_id.shard_slug().to_string();
    2692          970 :             let set_key = &[tid.as_str(), shard_id.as_str()][..];
    2693          970 : 
    2694         2633 :             fn inspect_state(state: &TenantState) -> ([&'static str; 1], bool) {
    2695         2633 :                 ([state.into()], matches!(state, TenantState::Broken { .. }))
    2696         2633 :             }
    2697          970 : 
    2698          970 :             let mut tuple = inspect_state(&rx.borrow_and_update());
    2699          970 : 
    2700          970 :             let is_broken = tuple.1;
    2701          970 :             let mut counted_broken = if is_broken {
    2702              :                 // add the id to the set right away, there should not be any updates on the channel
    2703              :                 // after before tenant is removed, if ever
    2704            0 :                 BROKEN_TENANTS_SET.with_label_values(set_key).set(1);
    2705            0 :                 true
    2706              :             } else {
    2707          970 :                 false
    2708              :             };
    2709              : 
    2710         2633 :             loop {
    2711         2633 :                 let labels = &tuple.0;
    2712         2633 :                 let current = TENANT_STATE_METRIC.with_label_values(labels);
    2713         2633 :                 current.inc();
    2714         2633 : 
    2715         2633 :                 if rx.changed().await.is_err() {
    2716              :                     // tenant has been dropped
    2717          240 :                     current.dec();
    2718          240 :                     drop(BROKEN_TENANTS_SET.remove_label_values(set_key));
    2719          240 :                     break;
    2720         1663 :                 }
    2721         1663 : 
    2722         1663 :                 current.dec();
    2723         1663 :                 tuple = inspect_state(&rx.borrow_and_update());
    2724         1663 : 
    2725         1663 :                 let is_broken = tuple.1;
    2726         1663 :                 if is_broken && !counted_broken {
    2727           58 :                     counted_broken = true;
    2728           58 :                     // insert the tenant_id (back) into the set while avoiding needless counter
    2729           58 :                     // access
    2730           58 :                     BROKEN_TENANTS_SET.with_label_values(set_key).set(1);
    2731         1605 :                 }
    2732              :             }
    2733          970 :         });
    2734          970 : 
    2735          970 :         Tenant {
    2736          970 :             tenant_shard_id,
    2737          970 :             shard_identity,
    2738          970 :             generation: attached_conf.location.generation,
    2739          970 :             conf,
     2740          970 :             // using `now` here is a good enough approximation to catch tenants with really long
    2741          970 :             // activation times.
    2742          970 :             constructed_at: Instant::now(),
    2743          970 :             tenant_conf: Arc::new(RwLock::new(attached_conf)),
    2744          970 :             timelines: Mutex::new(HashMap::new()),
    2745          970 :             timelines_creating: Mutex::new(HashSet::new()),
    2746          970 :             gc_cs: tokio::sync::Mutex::new(()),
    2747          970 :             walredo_mgr,
    2748          970 :             remote_storage,
    2749          970 :             deletion_queue_client,
    2750          970 :             state,
    2751          970 :             cached_logical_sizes: tokio::sync::Mutex::new(HashMap::new()),
    2752          970 :             cached_synthetic_tenant_size: Arc::new(AtomicU64::new(0)),
    2753          970 :             eviction_task_tenant_state: tokio::sync::Mutex::new(EvictionTaskTenantState::default()),
    2754          970 :             activate_now_sem: tokio::sync::Semaphore::new(0),
    2755          970 :             delete_progress: Arc::new(tokio::sync::Mutex::new(DeleteTenantFlow::default())),
    2756          970 :             cancel: CancellationToken::default(),
    2757          970 :             gate: Gate::default(),
    2758          970 :         }
    2759          970 :     }
    2760              : 
    2761              :     /// Locate and load config
    2762          244 :     pub(super) fn load_tenant_config(
    2763          244 :         conf: &'static PageServerConf,
    2764          244 :         tenant_shard_id: &TenantShardId,
    2765          244 :     ) -> anyhow::Result<LocationConf> {
    2766          244 :         let legacy_config_path = conf.tenant_config_path(tenant_shard_id);
    2767          244 :         let config_path = conf.tenant_location_config_path(tenant_shard_id);
    2768          244 : 
    2769          244 :         if config_path.exists() {
    2770              :             // New-style config takes precedence
    2771          240 :             let deserialized = Self::read_config(&config_path)?;
    2772          240 :             Ok(toml_edit::de::from_document::<LocationConf>(deserialized)?)
    2773            4 :         } else if legacy_config_path.exists() {
    2774              :             // Upgrade path: found an old-style configuration only
    2775            0 :             let deserialized = Self::read_config(&legacy_config_path)?;
    2776              : 
    2777            0 :             let mut tenant_conf = TenantConfOpt::default();
    2778            0 :             for (key, item) in deserialized.iter() {
    2779            0 :                 match key {
    2780            0 :                     "tenant_config" => {
    2781            0 :                         tenant_conf = TenantConfOpt::try_from(item.to_owned()).context(format!("Failed to parse config from file '{legacy_config_path}' as pageserver config"))?;
    2782              :                     }
    2783            0 :                     _ => bail!(
    2784            0 :                         "config file {legacy_config_path} has unrecognized pageserver option '{key}'"
    2785            0 :                     ),
    2786              :                 }
    2787              :             }
    2788              : 
    2789              :             // Legacy configs are implicitly in attached state, and do not support sharding
    2790            0 :             Ok(LocationConf::attached_single(
    2791            0 :                 tenant_conf,
    2792            0 :                 Generation::none(),
    2793            0 :                 &models::ShardParameters::default(),
    2794            0 :             ))
    2795              :         } else {
    2796              :             // FIXME If the config file is not found, assume that we're attaching
    2797              :             // a detached tenant and config is passed via attach command.
    2798              :             // https://github.com/neondatabase/neon/issues/1555
    2799              :             // OR: we're loading after incomplete deletion that managed to remove config.
    2800            4 :             info!(
    2801            4 :                 "tenant config not found in {} or {}",
    2802            4 :                 config_path, legacy_config_path
    2803            4 :             );
    2804            4 :             Ok(LocationConf::default())
    2805              :         }
    2806          244 :     }
    2807              : 
    2808          240 :     fn read_config(path: &Utf8Path) -> anyhow::Result<toml_edit::Document> {
    2809          240 :         info!("loading tenant configuration from {path}");
    2810              : 
    2811              :         // load and parse file
    2812          240 :         let config = fs::read_to_string(path)
    2813          240 :             .with_context(|| format!("Failed to load config from path '{path}'"))?;
    2814              : 
    2815          240 :         config
    2816          240 :             .parse::<toml_edit::Document>()
    2817          240 :             .with_context(|| format!("Failed to parse config from file '{path}' as toml file"))
    2818          240 :     }
    2819              : 
    2820         1968 :     #[tracing::instrument(skip_all, fields(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug()))]
    2821              :     pub(super) async fn persist_tenant_config(
    2822              :         conf: &'static PageServerConf,
    2823              :         tenant_shard_id: &TenantShardId,
    2824              :         location_conf: &LocationConf,
    2825              :     ) -> anyhow::Result<()> {
    2826              :         let legacy_config_path = conf.tenant_config_path(tenant_shard_id);
    2827              :         let config_path = conf.tenant_location_config_path(tenant_shard_id);
    2828              : 
    2829              :         Self::persist_tenant_config_at(
    2830              :             tenant_shard_id,
    2831              :             &config_path,
    2832              :             &legacy_config_path,
    2833              :             location_conf,
    2834              :         )
    2835              :         .await
    2836              :     }
    2837              : 
    2838         1968 :     #[tracing::instrument(skip_all, fields(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug()))]
    2839              :     pub(super) async fn persist_tenant_config_at(
    2840              :         tenant_shard_id: &TenantShardId,
    2841              :         config_path: &Utf8Path,
    2842              :         legacy_config_path: &Utf8Path,
    2843              :         location_conf: &LocationConf,
    2844              :     ) -> anyhow::Result<()> {
    2845              :         // Forward compat: write out an old-style configuration that old versions can read, in case we roll back
    2846              :         Self::persist_tenant_config_legacy(
    2847              :             tenant_shard_id,
    2848              :             legacy_config_path,
    2849              :             &location_conf.tenant_conf,
    2850              :         )
    2851              :         .await?;
    2852              : 
    2853              :         if let LocationMode::Attached(attach_conf) = &location_conf.mode {
    2854              :             // Once we use LocationMode, generations are mandatory.  If we aren't using generations,
    2855              :             // then drop out after writing legacy-style config.
    2856              :             if attach_conf.generation.is_none() {
    2857            0 :                 tracing::debug!("Running without generations, not writing new-style LocationConf");
    2858              :                 return Ok(());
    2859              :             }
    2860              :         }
    2861              : 
    2862            0 :         debug!("persisting tenantconf to {config_path}");
    2863              : 
     2864              :         let mut conf_content = r#"# This file contains a specific per-tenant config.
    2865              : #  It is read in case of pageserver restart.
    2866              : "#
    2867              :         .to_string();
    2868              : 
    2869            1 :         fail::fail_point!("tenant-config-before-write", |_| {
    2870            1 :             anyhow::bail!("tenant-config-before-write");
    2871            1 :         });
    2872              : 
    2873              :         // Convert the config to a toml file.
    2874              :         conf_content += &toml_edit::ser::to_string_pretty(&location_conf)?;
    2875              : 
    2876              :         let temp_path = path_with_suffix_extension(config_path, TEMP_FILE_SUFFIX);
    2877              : 
    2878              :         let tenant_shard_id = *tenant_shard_id;
    2879              :         let config_path = config_path.to_owned();
    2880              :         let conf_content = conf_content.into_bytes();
    2881              :         VirtualFile::crashsafe_overwrite(config_path.clone(), temp_path, conf_content)
    2882              :             .await
    2883            0 :             .with_context(|| format!("write tenant {tenant_shard_id} config to {config_path}"))?;
    2884              : 
    2885              :         Ok(())
    2886              :     }
    2887              : 
    2888         1968 :     #[tracing::instrument(skip_all, fields(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug()))]
    2889              :     async fn persist_tenant_config_legacy(
    2890              :         tenant_shard_id: &TenantShardId,
    2891              :         target_config_path: &Utf8Path,
    2892              :         tenant_conf: &TenantConfOpt,
    2893              :     ) -> anyhow::Result<()> {
    2894            0 :         debug!("persisting tenantconf to {target_config_path}");
    2895              : 
     2896              :         let mut conf_content = r#"# This file contains a specific per-tenant config.
    2897              : #  It is read in case of pageserver restart.
    2898              : 
    2899              : [tenant_config]
    2900              : "#
    2901              :         .to_string();
    2902              : 
    2903              :         // Convert the config to a toml file.
    2904              :         conf_content += &toml_edit::ser::to_string(&tenant_conf)?;
    2905              : 
    2906              :         let temp_path = path_with_suffix_extension(target_config_path, TEMP_FILE_SUFFIX);
    2907              : 
    2908              :         let tenant_shard_id = *tenant_shard_id;
    2909              :         let target_config_path = target_config_path.to_owned();
    2910              :         let conf_content = conf_content.into_bytes();
    2911              :         VirtualFile::crashsafe_overwrite(target_config_path.clone(), temp_path, conf_content)
    2912              :             .await
    2913            0 :             .with_context(|| {
    2914            0 :                 format!("write tenant {tenant_shard_id} config to {target_config_path}")
    2915            0 :             })?;
    2916              :         Ok(())
    2917              :     }
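    // For illustration only: roughly what the two files written above look like on disk.
    // The exact keys come from the `TenantConfOpt` / `LocationConf` serialization; `gc_horizon`
    // and `pitr_interval` are shown merely as examples of per-tenant overrides.
    //
    //     # legacy file (readable by older pageserver versions):
    //     #
    //     #   # This file contains a specific per-tenant config.
    //     #   #  It is read in case of pageserver restart.
    //     #
    //     #   [tenant_config]
    //     #   gc_horizon = 67108864
    //     #   pitr_interval = "7 days"
    //
    // The new-style file holds a serialized `LocationConf` instead, which additionally carries
    // the attachment mode and generation, and is therefore only written when generations are in use.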
    2918              : 
    2919              :     //
    2920              :     // How garbage collection works:
    2921              :     //
    2922              :     //                    +--bar------------->
    2923              :     //                   /
    2924              :     //             +----+-----foo---------------->
    2925              :     //            /
    2926              :     // ----main--+-------------------------->
    2927              :     //                \
    2928              :     //                 +-----baz-------->
    2929              :     //
    2930              :     //
    2931              :     // 1. Grab 'gc_cs' mutex to prevent new timelines from being created while Timeline's
    2932              :     //    `gc_infos` are being refreshed
    2933              :     // 2. Scan the collected timelines, and on each timeline, make note of all
    2934              :     //    the points where other timelines have been branched off.
    2935              :     //    We will refrain from removing page versions at those LSNs.
    2936              :     // 3. For each timeline, scan all layer files on the timeline.
    2937              :     //    Remove all files for which a newer file exists and which
    2938              :     //    don't cover any branch point LSNs.
    2939              :     //
    2940              :     // TODO:
    2941              :     // - if a relation has a non-incremental persistent layer on a child branch, then we
    2942              :     //   don't need to keep that in the parent anymore. But currently
    2943              :     //   we do.
    2944          357 :     async fn gc_iteration_internal(
    2945          357 :         &self,
    2946          357 :         target_timeline_id: Option<TimelineId>,
    2947          357 :         horizon: u64,
    2948          357 :         pitr: Duration,
    2949          357 :         cancel: &CancellationToken,
    2950          357 :         ctx: &RequestContext,
    2951          357 :     ) -> anyhow::Result<GcResult> {
    2952          357 :         let mut totals: GcResult = Default::default();
    2953          357 :         let now = Instant::now();
    2954              : 
    2955          357 :         let gc_timelines = match self
    2956          357 :             .refresh_gc_info_internal(target_timeline_id, horizon, pitr, cancel, ctx)
    2957       275618 :             .await
    2958              :         {
    2959          355 :             Ok(result) => result,
    2960            1 :             Err(e) => {
    2961            0 :                 if let Some(PageReconstructError::Cancelled) =
    2962            1 :                     e.downcast_ref::<PageReconstructError>()
    2963              :                 {
    2964              :                     // Handle cancellation
    2965            0 :                     totals.elapsed = now.elapsed();
    2966            0 :                     return Ok(totals);
    2967              :                 } else {
    2968              :                     // Propagate other errors
    2969            1 :                     return Err(e);
    2970              :                 }
    2971              :             }
    2972              :         };
    2973              : 
    2974            3 :         failpoint_support::sleep_millis_async!("gc_iteration_internal_after_getting_gc_timelines");
    2975              : 
    2976              :         // If there is nothing to GC, we don't want any messages in the INFO log.
    2977          355 :         if !gc_timelines.is_empty() {
    2978          344 :             info!("{} timelines need GC", gc_timelines.len());
    2979              :         } else {
    2980            0 :             debug!("{} timelines need GC", gc_timelines.len());
    2981              :         }
    2982              : 
    2983              :         // Perform GC for each timeline.
    2984              :         //
    2985              :         // Note that we don't hold the `Tenant::gc_cs` lock here because we don't want to delay the
    2986              :         // branch creation task, which requires the GC lock. A GC iteration can run concurrently
    2987              :         // with branch creation.
    2988              :         //
    2989              :         // See comments in [`Tenant::branch_timeline`] for more information about why branch
    2990              :         // creation task can run concurrently with timeline's GC iteration.
    2991          831 :         for timeline in gc_timelines {
    2992          476 :             if task_mgr::is_shutdown_requested() || cancel.is_cancelled() {
    2993              :                 // We were requested to shut down. Stop and return with the progress we
    2994              :                 // made.
    2995            0 :                 break;
    2996          476 :             }
    2997          476 :             let result = timeline.gc().await?;
    2998          476 :             totals += result;
    2999              :         }
    3000              : 
    3001          355 :         totals.elapsed = now.elapsed();
    3002          355 :         Ok(totals)
    3003          356 :     }
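    // For illustration only: a worked example of the branch-point rule described above,
    // using the timelines from the diagram. The LSN values are invented.
    //
    //     // Suppose `foo` was branched off `main` at LSN 0/2000 and `baz` at LSN 0/3000.
    //     // Step 2 records those branch points for `main`:
    //     //     branchpoints(main) = { Lsn(0x2000), Lsn(0x3000) }
    //     // Even if GC on `main` could otherwise drop everything below a cutoff of 0/5000,
    //     // step 3 keeps the layers needed to reconstruct pages at 0/2000 and 0/3000, because
    //     // reads on `foo` and `baz` below their own ancestor LSN fall through to `main`.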
    3004              : 
    3005              :     /// Refreshes the Timeline::gc_info for all timelines, returning the
    3006              :     /// vector of timelines which have [`Timeline::get_last_record_lsn`] past
    3007              :     /// [`Tenant::get_gc_horizon`].
    3008              :     ///
    3009              :     /// This is usually executed as part of periodic gc, but can now be triggered more often.
    3010           87 :     pub async fn refresh_gc_info(
    3011           87 :         &self,
    3012           87 :         cancel: &CancellationToken,
    3013           87 :         ctx: &RequestContext,
    3014           87 :     ) -> anyhow::Result<Vec<Arc<Timeline>>> {
     3015           87 :         // since this method can now be called at different rates than the configured gc loop, it
     3016           87 :         // might be that these configuration values get applied faster than previously,
     3017           87 :         // since they used to be read only from the gc task.
    3018           87 :         let horizon = self.get_gc_horizon();
    3019           87 :         let pitr = self.get_pitr_interval();
    3020           87 : 
    3021           87 :         // refresh all timelines
    3022           87 :         let target_timeline_id = None;
    3023           87 : 
    3024           87 :         self.refresh_gc_info_internal(target_timeline_id, horizon, pitr, cancel, ctx)
    3025           24 :             .await
    3026           87 :     }
    3027              : 
    3028          444 :     async fn refresh_gc_info_internal(
    3029          444 :         &self,
    3030          444 :         target_timeline_id: Option<TimelineId>,
    3031          444 :         horizon: u64,
    3032          444 :         pitr: Duration,
    3033          444 :         cancel: &CancellationToken,
    3034          444 :         ctx: &RequestContext,
    3035          444 :     ) -> anyhow::Result<Vec<Arc<Timeline>>> {
    3036              :         // grab mutex to prevent new timelines from being created here.
    3037          444 :         let gc_cs = self.gc_cs.lock().await;
    3038              : 
    3039              :         // Scan all timelines. For each timeline, remember the timeline ID and
    3040              :         // the branch point where it was created.
    3041          443 :         let (all_branchpoints, timeline_ids): (BTreeSet<(TimelineId, Lsn)>, _) = {
    3042          444 :             let timelines = self.timelines.lock().unwrap();
    3043          444 :             let mut all_branchpoints = BTreeSet::new();
    3044          443 :             let timeline_ids = {
    3045          444 :                 if let Some(target_timeline_id) = target_timeline_id.as_ref() {
    3046          331 :                     if timelines.get(target_timeline_id).is_none() {
    3047            1 :                         bail!("gc target timeline does not exist")
    3048          330 :                     }
    3049          113 :                 };
    3050              : 
    3051          443 :                 timelines
    3052          443 :                     .iter()
    3053          926 :                     .map(|(timeline_id, timeline_entry)| {
    3054          486 :                         if let Some(ancestor_timeline_id) =
    3055          926 :                             &timeline_entry.get_ancestor_timeline_id()
    3056              :                         {
    3057              :                             // If target_timeline is specified, we only need to know branchpoints of its children
    3058          486 :                             if let Some(timeline_id) = target_timeline_id {
    3059          301 :                                 if ancestor_timeline_id == &timeline_id {
    3060            9 :                                     all_branchpoints.insert((
    3061            9 :                                         *ancestor_timeline_id,
    3062            9 :                                         timeline_entry.get_ancestor_lsn(),
    3063            9 :                                     ));
    3064          292 :                                 }
    3065              :                             }
    3066              :                             // Collect branchpoints for all timelines
    3067          185 :                             else {
    3068          185 :                                 all_branchpoints.insert((
    3069          185 :                                     *ancestor_timeline_id,
    3070          185 :                                     timeline_entry.get_ancestor_lsn(),
    3071          185 :                                 ));
    3072          185 :                             }
    3073          440 :                         }
    3074              : 
    3075          926 :                         *timeline_id
    3076          926 :                     })
    3077          443 :                     .collect::<Vec<_>>()
    3078          443 :             };
    3079          443 :             (all_branchpoints, timeline_ids)
    3080          443 :         };
    3081          443 : 
    3082          443 :         // Ok, we now know all the branch points.
    3083          443 :         // Update the GC information for each timeline.
    3084          443 :         let mut gc_timelines = Vec::with_capacity(timeline_ids.len());
    3085         1361 :         for timeline_id in timeline_ids {
    3086              :             // Timeline is known to be local and loaded.
    3087          919 :             let timeline = self
    3088          919 :                 .get_timeline(timeline_id, false)
    3089          919 :                 .with_context(|| format!("Timeline {timeline_id} was not found"))?;
    3090              : 
    3091              :             // If target_timeline is specified, ignore all other timelines
    3092          919 :             if let Some(target_timeline_id) = target_timeline_id {
    3093          631 :                 if timeline_id != target_timeline_id {
    3094          301 :                     continue;
    3095          330 :                 }
    3096          288 :             }
    3097              : 
    3098          618 :             if let Some(cutoff) = timeline.get_last_record_lsn().checked_sub(horizon) {
    3099          539 :                 let branchpoints: Vec<Lsn> = all_branchpoints
    3100          539 :                     .range((
    3101          539 :                         Included((timeline_id, Lsn(0))),
    3102          539 :                         Included((timeline_id, Lsn(u64::MAX))),
    3103          539 :                     ))
    3104          539 :                     .map(|&x| x.1)
    3105          539 :                     .collect();
    3106          539 :                 timeline
    3107          539 :                     .update_gc_info(branchpoints, cutoff, pitr, cancel, ctx)
    3108       275642 :                     .await?;
    3109              : 
    3110          538 :                 gc_timelines.push(timeline);
    3111           79 :             }
    3112              :         }
    3113          442 :         drop(gc_cs);
    3114          442 :         Ok(gc_timelines)
    3115          443 :     }
    3116              : 
    3117              :     /// A substitute for `branch_timeline` for use in unit tests.
    3118              :     /// The returned timeline will have state value `Active` to make various `anyhow::ensure!()`
    3119              :     /// calls pass, but we do not actually call `.activate()` under the hood. So, none of the
    3120              :     /// timeline background tasks are launched, except the flush loop.
    3121              :     #[cfg(test)]
    3122          214 :     async fn branch_timeline_test(
    3123          214 :         &self,
    3124          214 :         src_timeline: &Arc<Timeline>,
    3125          214 :         dst_id: TimelineId,
    3126          214 :         start_lsn: Option<Lsn>,
    3127          214 :         ctx: &RequestContext,
    3128          214 :     ) -> Result<Arc<Timeline>, CreateTimelineError> {
    3129          214 :         let uninit_mark = self.create_timeline_uninit_mark(dst_id).unwrap();
    3130          214 :         let tl = self
    3131          214 :             .branch_timeline_impl(src_timeline, dst_id, start_lsn, uninit_mark, ctx)
    3132          212 :             .await?;
    3133          210 :         tl.set_state(TimelineState::Active);
    3134          210 :         Ok(tl)
    3135          214 :     }
    3136              : 
    3137              :     /// Branch an existing timeline.
    3138              :     ///
    3139              :     /// The caller is responsible for activating the returned timeline.
    3140          260 :     async fn branch_timeline(
    3141          260 :         &self,
    3142          260 :         src_timeline: &Arc<Timeline>,
    3143          260 :         dst_id: TimelineId,
    3144          260 :         start_lsn: Option<Lsn>,
    3145          260 :         timeline_uninit_mark: TimelineUninitMark<'_>,
    3146          260 :         ctx: &RequestContext,
    3147          260 :     ) -> Result<Arc<Timeline>, CreateTimelineError> {
    3148          260 :         self.branch_timeline_impl(src_timeline, dst_id, start_lsn, timeline_uninit_mark, ctx)
    3149          260 :             .await
    3150          260 :     }
    3151              : 
    3152          474 :     async fn branch_timeline_impl(
    3153          474 :         &self,
    3154          474 :         src_timeline: &Arc<Timeline>,
    3155          474 :         dst_id: TimelineId,
    3156          474 :         start_lsn: Option<Lsn>,
    3157          474 :         timeline_uninit_mark: TimelineUninitMark<'_>,
    3158          474 :         _ctx: &RequestContext,
    3159          474 :     ) -> Result<Arc<Timeline>, CreateTimelineError> {
    3160          474 :         let src_id = src_timeline.timeline_id;
    3161              : 
    3162              :         // We will validate our ancestor LSN in this function.  Acquire the GC lock so that
    3163              :         // this check cannot race with GC, and the ancestor LSN is guaranteed to remain
    3164              :         // valid while we are creating the branch.
    3165          474 :         let _gc_cs = self.gc_cs.lock().await;
    3166              : 
    3167              :         // If no start LSN is specified, we branch the new timeline from the source timeline's last record LSN
    3168          474 :         let start_lsn = start_lsn.unwrap_or_else(|| {
    3169          229 :             let lsn = src_timeline.get_last_record_lsn();
    3170          229 :             info!("branching timeline {dst_id} from timeline {src_id} at last record LSN: {lsn}");
    3171          229 :             lsn
    3172          474 :         });
    3173          474 : 
    3174          474 :         // Ensure that `start_lsn` is valid, i.e. the LSN is within the PITR
    3175          474 :         // horizon on the source timeline
    3176          474 :         //
    3177          474 :         // We check it against both the planned GC cutoff stored in 'gc_info',
    3178          474 :         // and the 'latest_gc_cutoff' of the last GC that was performed.  The
    3179          474 :         // planned GC cutoff in 'gc_info' is normally larger than
    3180          474 :         // 'latest_gc_cutoff_lsn', but beware of corner cases like if you just
    3181          474 :         // changed the GC settings for the tenant to make the PITR window
    3182          474 :         // larger, but some of the data was already removed by an earlier GC
    3183          474 :         // iteration.
    3184          474 : 
    3185          474 :         // check against last actual 'latest_gc_cutoff' first
    3186          474 :         let latest_gc_cutoff_lsn = src_timeline.get_latest_gc_cutoff_lsn();
    3187          474 :         src_timeline
    3188          474 :             .check_lsn_is_in_scope(start_lsn, &latest_gc_cutoff_lsn)
    3189          474 :             .context(format!(
    3190          474 :                 "invalid branch start lsn: less than latest GC cutoff {}",
    3191          474 :                 *latest_gc_cutoff_lsn,
    3192          474 :             ))
    3193          474 :             .map_err(CreateTimelineError::AncestorLsn)?;
    3194              : 
    3195              :         // and then the planned GC cutoff
    3196              :         {
    3197          464 :             let gc_info = src_timeline.gc_info.read().unwrap();
    3198          464 :             let cutoff = min(gc_info.pitr_cutoff, gc_info.horizon_cutoff);
    3199          464 :             if start_lsn < cutoff {
    3200            0 :                 return Err(CreateTimelineError::AncestorLsn(anyhow::anyhow!(
    3201            0 :                     "invalid branch start lsn: less than planned GC cutoff {cutoff}"
    3202            0 :                 )));
    3203          464 :             }
    3204          464 :         }
    3205          464 : 
    3206          464 :         //
    3207          464 :         // The branch point is valid, and we are still holding the 'gc_cs' lock
    3208          464 :         // so that GC cannot advance the GC cutoff until we are finished.
    3209          464 :         // Proceed with the branch creation.
    3210          464 :         //
    3211          464 : 
    3212          464 :         // Determine prev-LSN for the new timeline. We can only determine it if
    3213          464 :         // the timeline was branched at the current end of the source timeline.
    3214          464 :         let RecordLsn {
    3215          464 :             last: src_last,
    3216          464 :             prev: src_prev,
    3217          464 :         } = src_timeline.get_last_record_rlsn();
    3218          464 :         let dst_prev = if src_last == start_lsn {
    3219          446 :             Some(src_prev)
    3220              :         } else {
    3221           18 :             None
    3222              :         };
    3223              : 
    3224              :         // Create the metadata file, noting the ancestor of the new timeline.
    3225              :         // There is initially no data in it, but all the read-calls know to look
    3226              :         // into the ancestor.
    3227          464 :         let metadata = TimelineMetadata::new(
    3228          464 :             start_lsn,
    3229          464 :             dst_prev,
    3230          464 :             Some(src_id),
    3231          464 :             start_lsn,
    3232          464 :             *src_timeline.latest_gc_cutoff_lsn.read(), // FIXME: should we hold onto this guard longer?
    3233          464 :             src_timeline.initdb_lsn,
    3234          464 :             src_timeline.pg_version,
    3235          464 :         );
    3236              : 
    3237          464 :         let uninitialized_timeline = self
    3238          464 :             .prepare_new_timeline(
    3239          464 :                 dst_id,
    3240          464 :                 &metadata,
    3241          464 :                 timeline_uninit_mark,
    3242          464 :                 start_lsn + 1,
    3243          464 :                 Some(Arc::clone(src_timeline)),
    3244          464 :             )
    3245          466 :             .await?;
    3246              : 
    3247          464 :         let new_timeline = uninitialized_timeline.finish_creation()?;
    3248              : 
    3249              :         // Root timeline gets its layers during creation and uploads them along with the metadata.
    3250              :         // A branch timeline, though, may get no writes for some time after creation, and hence no layers created.
    3251              :         // We still need to upload its metadata eagerly: if other nodes `attach` the tenant and miss this timeline, their GC
    3252              :         // could get incorrect information and remove more layers than needed.
    3253              :         // See also https://github.com/neondatabase/neon/issues/3865
    3254          464 :         if let Some(remote_client) = new_timeline.remote_client.as_ref() {
    3255          464 :             remote_client
    3256          464 :                 .schedule_index_upload_for_metadata_update(&metadata)
    3257          464 :                 .context("branch initial metadata upload")?;
    3258            0 :         }
    3259              : 
    3260          464 :         Ok(new_timeline)
    3261          474 :     }
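    // For illustration only: a hypothetical run through the ancestor-LSN validation above;
    // the numbers are invented.
    //
    //     // latest_gc_cutoff_lsn on the source timeline: 0/4000
    //     // requested start_lsn = 0/3000 -> check_lsn_is_in_scope fails and the request is
    //     //     rejected with CreateTimelineError::AncestorLsn, because the data needed to
    //     //     serve reads at 0/3000 may already have been garbage collected.
    //     // requested start_lsn = 0/5000 -> also >= min(pitr_cutoff, horizon_cutoff), so the
    //     //     branch creation proceeds.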
    3262              : 
    3263              :     /// For unit tests, make this visible so that other modules can directly create timelines
    3264              :     #[cfg(test)]
    3265            4 :     #[tracing::instrument(fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(), %timeline_id))]
    3266              :     pub(crate) async fn bootstrap_timeline_test(
    3267              :         &self,
    3268              :         timeline_id: TimelineId,
    3269              :         pg_version: u32,
    3270              :         load_existing_initdb: Option<TimelineId>,
    3271              :         ctx: &RequestContext,
    3272              :     ) -> anyhow::Result<Arc<Timeline>> {
    3273              :         let uninit_mark = self.create_timeline_uninit_mark(timeline_id).unwrap();
    3274              :         self.bootstrap_timeline(
    3275              :             timeline_id,
    3276              :             pg_version,
    3277              :             load_existing_initdb,
    3278              :             uninit_mark,
    3279              :             ctx,
    3280              :         )
    3281              :         .await
    3282              :     }
    3283              : 
    3284          567 :     async fn upload_initdb(
    3285          567 :         &self,
    3286          567 :         timelines_path: &Utf8PathBuf,
    3287          567 :         pgdata_path: &Utf8PathBuf,
    3288          567 :         timeline_id: &TimelineId,
    3289          567 :     ) -> anyhow::Result<()> {
    3290          567 :         let Some(storage) = &self.remote_storage else {
    3291              :             // No remote storage?  No upload.
    3292            0 :             return Ok(());
    3293              :         };
    3294              : 
    3295          567 :         let temp_path = timelines_path.join(format!(
    3296          567 :             "{INITDB_PATH}.upload-{timeline_id}.{TEMP_FILE_SUFFIX}"
    3297          567 :         ));
    3298              : 
    3299          567 :         scopeguard::defer! {
    3300          567 :             if let Err(e) = fs::remove_file(&temp_path) {
    3301            0 :                 error!("Failed to remove temporary initdb archive '{temp_path}': {e}");
    3302          567 :             }
    3303              :         }
    3304              : 
    3305          567 :         let (pgdata_zstd, tar_zst_size) =
    3306      4715078 :             import_datadir::create_tar_zst(pgdata_path, &temp_path).await?;
    3307              : 
    3308          567 :         pausable_failpoint!("before-initdb-upload");
    3309              : 
    3310          567 :         backoff::retry(
    3311          634 :             || async {
    3312          634 :                 self::remote_timeline_client::upload_initdb_dir(
    3313          634 :                     storage,
    3314          634 :                     &self.tenant_shard_id.tenant_id,
    3315          634 :                     timeline_id,
    3316          634 :                     pgdata_zstd.try_clone().await?,
    3317          634 :                     tar_zst_size,
    3318          634 :                     &self.cancel,
    3319              :                 )
    3320        27392 :                 .await
    3321         1268 :             },
    3322          567 :             |_| false,
    3323          567 :             3,
    3324          567 :             u32::MAX,
    3325          567 :             "persist_initdb_tar_zst",
    3326          567 :             &self.cancel,
    3327          567 :         )
    3328        28021 :         .await
    3329          567 :         .ok_or_else(|| anyhow::anyhow!("Cancelled"))
    3330          567 :         .and_then(|x| x)
    3331          567 :     }
    3332              : 
    3333              :     /// - run initdb to init temporary instance and get bootstrap data
    3334              :     /// - after initialization completes, tar up the temp dir and upload it to S3.
    3335              :     ///
    3336              :     /// The caller is responsible for activating the returned timeline.
    3337          602 :     async fn bootstrap_timeline(
    3338          602 :         &self,
    3339              :     /// - run initdb to init a temporary instance and get bootstrap data
    3340          602 :         pg_version: u32,
    3341          602 :         load_existing_initdb: Option<TimelineId>,
    3342          602 :         timeline_uninit_mark: TimelineUninitMark<'_>,
    3343          602 :         ctx: &RequestContext,
    3344          602 :     ) -> anyhow::Result<Arc<Timeline>> {
    3345          602 :         // create a `tenant/{tenant_id}/timelines/basebackup-{timeline_id}.{TEMP_FILE_SUFFIX}/`
    3346          602 :         // temporary directory for basebackup files for the given timeline.
    3347          602 : 
    3348          602 :         let timelines_path = self.conf.timelines_path(&self.tenant_shard_id);
    3349          602 :         let pgdata_path = path_with_suffix_extension(
    3350          602 :             timelines_path.join(format!("basebackup-{timeline_id}")),
    3351          602 :             TEMP_FILE_SUFFIX,
    3352          602 :         );
    3353          602 : 
    3354          602 :         // an uninit mark was placed before, so nothing else can access this timeline's files;
    3355          602 :         // the current initdb has not run yet, so remove whatever was left from previous runs
    3356          602 :         if pgdata_path.exists() {
    3357            0 :             fs::remove_dir_all(&pgdata_path).with_context(|| {
    3358            0 :                 format!("Failed to remove already existing initdb directory: {pgdata_path}")
    3359            0 :             })?;
    3360          602 :         }
    3361              :         // this new directory is very temporary, set to remove it immediately after bootstrap, we don't need it
    3362          600 :         scopeguard::defer! {
    3363          600 :             if let Err(e) = fs::remove_dir_all(&pgdata_path) {
    3364              :                 // this is unlikely, but we will remove the directory on pageserver restart or another bootstrap call
    3365            0 :                 error!("Failed to remove temporary initdb directory '{pgdata_path}': {e}");
    3366          600 :             }
    3367              :         }
    3368          602 :         if let Some(existing_initdb_timeline_id) = load_existing_initdb {
    3369            4 :             let Some(storage) = &self.remote_storage else {
    3370            0 :                 bail!("no storage configured but load_existing_initdb set to {existing_initdb_timeline_id}");
    3371              :             };
    3372            4 :             if existing_initdb_timeline_id != timeline_id {
    3373            0 :                 let source_path = &remote_initdb_archive_path(
    3374            0 :                     &self.tenant_shard_id.tenant_id,
    3375            0 :                     &existing_initdb_timeline_id,
    3376            0 :                 );
    3377            0 :                 let dest_path =
    3378            0 :                     &remote_initdb_archive_path(&self.tenant_shard_id.tenant_id, &timeline_id);
    3379            0 :                 storage
    3380            0 :                     .copy_object(source_path, dest_path)
    3381            0 :                     .await
    3382            0 :                     .context("copy initdb tar")?;
    3383            4 :             }
    3384            4 :             let (initdb_tar_zst_path, initdb_tar_zst) =
    3385            4 :                 self::remote_timeline_client::download_initdb_tar_zst(
    3386            4 :                     self.conf,
    3387            4 :                     storage,
    3388            4 :                     &self.tenant_shard_id,
    3389            4 :                     &existing_initdb_timeline_id,
    3390            4 :                     &self.cancel,
    3391            4 :                 )
    3392         1630 :                 .await
    3393            4 :                 .context("download initdb tar")?;
    3394              : 
    3395            4 :             scopeguard::defer! {
    3396            4 :                 if let Err(e) = fs::remove_file(&initdb_tar_zst_path) {
    3397            0 :                     error!("Failed to remove temporary initdb archive '{initdb_tar_zst_path}': {e}");
    3398            4 :                 }
    3399              :             }
    3400              : 
    3401            4 :             let buf_read =
    3402            4 :                 BufReader::with_capacity(remote_timeline_client::BUFFER_SIZE, initdb_tar_zst);
    3403            4 :             import_datadir::extract_tar_zst(&pgdata_path, buf_read)
    3404        22132 :                 .await
    3405            4 :                 .context("extract initdb tar")?;
    3406              :         } else {
    3407              :             // Init a temporary repo to get bootstrap data; this creates a directory at `pgdata_path`
    3408         1169 :             run_initdb(self.conf, &pgdata_path, pg_version, &self.cancel).await?;
    3409              : 
    3410              :             // Upload the created data dir to S3
    3411          595 :             if self.tenant_shard_id().is_zero() {
    3412          567 :                 self.upload_initdb(&timelines_path, &pgdata_path, &timeline_id)
    3413      4743662 :                     .await?;
    3414           28 :             }
    3415              :         }
    3416          599 :         let pgdata_lsn = import_datadir::get_lsn_from_controlfile(&pgdata_path)?.align();
    3417          599 : 
    3418          599 :         // Import the contents of the data directory at the initial checkpoint
    3419          599 :         // LSN, and any WAL after that.
    3420          599 :         // Initdb lsn will be equal to last_record_lsn which will be set after import.
    3421          599 :         // Because we know it upfront, we avoid an option or a dummy zero value by passing it to the metadata.
    3422          599 :         let new_metadata = TimelineMetadata::new(
    3423          599 :             Lsn(0),
    3424          599 :             None,
    3425          599 :             None,
    3426          599 :             Lsn(0),
    3427          599 :             pgdata_lsn,
    3428          599 :             pgdata_lsn,
    3429          599 :             pg_version,
    3430          599 :         );
    3431          599 :         let raw_timeline = self
    3432          599 :             .prepare_new_timeline(
    3433          599 :                 timeline_id,
    3434          599 :                 &new_metadata,
    3435          599 :                 timeline_uninit_mark,
    3436          599 :                 pgdata_lsn,
    3437          599 :                 None,
    3438          599 :             )
    3439          620 :             .await?;
    3440              : 
    3441          598 :         let tenant_shard_id = raw_timeline.owning_tenant.tenant_shard_id;
    3442          598 :         let unfinished_timeline = raw_timeline.raw_timeline()?;
    3443              : 
    3444          598 :         import_datadir::import_timeline_from_postgres_datadir(
    3445          598 :             unfinished_timeline,
    3446          598 :             &pgdata_path,
    3447          598 :             pgdata_lsn,
    3448          598 :             ctx,
    3449          598 :         )
    3450      3003066 :         .await
    3451          598 :         .with_context(|| {
    3452            0 :             format!("Failed to import pgdatadir for timeline {tenant_shard_id}/{timeline_id}")
    3453          598 :         })?;
    3454              : 
    3455              :         // Flush the new layer files to disk, before we make the timeline available to
    3456              :         // the outside world.
    3457              :         //
    3458              :         // Flush loop needs to be spawned in order to be able to flush.
    3459          598 :         unfinished_timeline.maybe_spawn_flush_loop();
    3460          598 : 
    3461          598 :         fail::fail_point!("before-checkpoint-new-timeline", |_| {
    3462            2 :             anyhow::bail!("failpoint before-checkpoint-new-timeline");
    3463          598 :         });
    3464              : 
    3465          596 :         unfinished_timeline
    3466          596 :             .freeze_and_flush()
    3467          595 :             .await
    3468          595 :             .with_context(|| {
    3469            0 :                 format!(
    3470            0 :                     "Failed to flush after pgdatadir import for timeline {tenant_shard_id}/{timeline_id}"
    3471            0 :                 )
    3472          595 :             })?;
    3473              : 
    3474              :         // All done!
    3475          595 :         let timeline = raw_timeline.finish_creation()?;
    3476              : 
    3477          594 :         Ok(timeline)
    3478          600 :     }
    3479              : 
    3480              :     /// Call this before constructing a timeline, to build its required structures
    3481         1149 :     fn build_timeline_resources(&self, timeline_id: TimelineId) -> TimelineResources {
    3482         1149 :         let remote_client = if let Some(remote_storage) = self.remote_storage.as_ref() {
    3483         1149 :             let remote_client = RemoteTimelineClient::new(
    3484         1149 :                 remote_storage.clone(),
    3485         1149 :                 self.deletion_queue_client.clone(),
    3486         1149 :                 self.conf,
    3487         1149 :                 self.tenant_shard_id,
    3488         1149 :                 timeline_id,
    3489         1149 :                 self.generation,
    3490         1149 :             );
    3491         1149 :             Some(remote_client)
    3492              :         } else {
    3493            0 :             None
    3494              :         };
    3495              : 
    3496         1149 :         TimelineResources {
    3497         1149 :             remote_client,
    3498         1149 :             deletion_queue_client: self.deletion_queue_client.clone(),
    3499         1149 :         }
    3500         1149 :     }
    3501              : 
    3502              :     /// Creates an intermediate timeline structure and its files.
    3503              :     ///
    3504              :     /// An empty layer map is initialized, and new data and WAL can be imported starting
    3505              :     /// at 'disk_consistent_lsn'. After any initial data has been imported, call
    3506              :     /// `finish_creation` to insert the Timeline into the timelines map and to remove the
    3507              :     /// uninit mark file.
    3508         1149 :     async fn prepare_new_timeline<'a>(
    3509         1149 :         &'a self,
    3510         1149 :         new_timeline_id: TimelineId,
    3511         1149 :         new_metadata: &TimelineMetadata,
    3512         1149 :         uninit_mark: TimelineUninitMark<'a>,
    3513         1149 :         start_lsn: Lsn,
    3514         1149 :         ancestor: Option<Arc<Timeline>>,
    3515         1149 :     ) -> anyhow::Result<UninitializedTimeline> {
    3516         1149 :         let tenant_shard_id = self.tenant_shard_id;
    3517         1149 : 
    3518         1149 :         let resources = self.build_timeline_resources(new_timeline_id);
    3519         1149 :         if let Some(remote_client) = &resources.remote_client {
    3520         1149 :             remote_client.init_upload_queue_for_empty_remote(new_metadata)?;
    3521            0 :         }
    3522              : 
    3523         1149 :         let timeline_struct = self
    3524         1149 :             .create_timeline_struct(
    3525         1149 :                 new_timeline_id,
    3526         1149 :                 new_metadata,
    3527         1149 :                 ancestor,
    3528         1149 :                 resources,
    3529         1149 :                 CreateTimelineCause::Load,
    3530         1149 :             )
    3531         1149 :             .context("Failed to create timeline data structure")?;
    3532              : 
    3533         1149 :         timeline_struct.init_empty_layer_map(start_lsn);
    3534              : 
    3535         1149 :         if let Err(e) = self
    3536         1149 :             .create_timeline_files(&uninit_mark.timeline_path, &new_timeline_id, new_metadata)
    3537         1184 :             .await
    3538              :         {
    3539            1 :             error!("Failed to create initial files for timeline {tenant_shard_id}/{new_timeline_id}, cleaning up: {e:?}");
    3540            1 :             cleanup_timeline_directory(uninit_mark);
    3541            1 :             return Err(e);
    3542         1148 :         }
    3543              : 
    3544            0 :         debug!(
    3545            0 :             "Successfully created initial files for timeline {tenant_shard_id}/{new_timeline_id}"
    3546            0 :         );
    3547              : 
    3548         1148 :         Ok(UninitializedTimeline::new(
    3549         1148 :             self,
    3550         1148 :             new_timeline_id,
    3551         1148 :             Some((timeline_struct, uninit_mark)),
    3552         1148 :         ))
    3553         1149 :     }
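                      : 
                      :     // A minimal sketch of how the pieces above typically compose when creating an empty
                      :     // timeline; this flow is reconstructed from the signatures in this file for illustration
                      :     // only, and the metadata/start LSN values are placeholders:
                      :     //
                      :     //     let uninit_mark = self.create_timeline_uninit_mark(new_timeline_id)?;
                      :     //     let raw_timeline = self
                      :     //         .prepare_new_timeline(new_timeline_id, &new_metadata, uninit_mark, start_lsn, None)
                      :     //         .await?;
                      :     //     // ... import any initial data into the empty layer map here ...
                      :     //     let timeline = raw_timeline.finish_creation()?;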
    3554              : 
    3555         1149 :     async fn create_timeline_files(
    3556         1149 :         &self,
    3557         1149 :         timeline_path: &Utf8Path,
    3558         1149 :         new_timeline_id: &TimelineId,
    3559         1149 :         new_metadata: &TimelineMetadata,
    3560         1149 :     ) -> anyhow::Result<()> {
    3561         1149 :         crashsafe::create_dir(timeline_path).context("Failed to create timeline directory")?;
    3562              : 
    3563         1149 :         fail::fail_point!("after-timeline-uninit-mark-creation", |_| {
    3564            1 :             anyhow::bail!("failpoint after-timeline-uninit-mark-creation");
    3565         1149 :         });
    3566              : 
    3567         1148 :         save_metadata(
    3568         1148 :             self.conf,
    3569         1148 :             &self.tenant_shard_id,
    3570         1148 :             new_timeline_id,
    3571         1148 :             new_metadata,
    3572         1148 :         )
    3573         1184 :         .await
    3574         1148 :         .context("Failed to create timeline metadata")?;
    3575         1148 :         Ok(())
    3576         1149 :     }
    3577              : 
    3578              :     /// Attempts to create an uninit mark file for the timeline initialization.
    3579              :     /// Bails if the timeline is already loaded into memory (i.e. initialized before) or if the uninit mark file already exists.
    3580              :     ///
    3581              :     /// This way, we need to hold the timelines lock only for a small amount of time during the mark check/creation per timeline init.
    3582         1196 :     fn create_timeline_uninit_mark(
    3583         1196 :         &self,
    3584         1196 :         timeline_id: TimelineId,
    3585         1196 :     ) -> Result<TimelineUninitMark, TimelineExclusionError> {
    3586         1196 :         let tenant_shard_id = self.tenant_shard_id;
    3587         1196 : 
    3588         1196 :         let uninit_mark_path = self
    3589         1196 :             .conf
    3590         1196 :             .timeline_uninit_mark_file_path(tenant_shard_id, timeline_id);
    3591         1196 :         let timeline_path = self.conf.timeline_path(&tenant_shard_id, &timeline_id);
    3592              : 
    3593         1196 :         let uninit_mark = TimelineUninitMark::new(
    3594         1196 :             self,
    3595         1196 :             timeline_id,
    3596         1196 :             uninit_mark_path.clone(),
    3597         1196 :             timeline_path.clone(),
    3598         1196 :         )?;
    3599              : 
    3600              :         // At this stage, we have got exclusive access to in-memory state for this timeline ID
    3601              :         // for creation.
    3602              :         // A timeline directory should never exist on disk already:
    3603              :         // - a previous failed creation would have cleaned up after itself
    3604              :         // - a pageserver restart would clean up timeline directories that don't have valid remote state
    3605              :         //
    3606              :         // Therefore it is an unexpected internal error to encounter a timeline directory already existing here;
    3607              :         // this error may indicate a bug in cleanup on failed creations.
    3608         1170 :         if timeline_path.exists() {
    3609            0 :             return Err(TimelineExclusionError::Other(anyhow::anyhow!(
    3610            0 :                 "Timeline directory already exists! This is a bug."
    3611            0 :             )));
    3612         1170 :         }
    3613         1170 : 
    3614         1170 :         // Create the on-disk uninit mark _after_ the in-memory acquisition of the timeline ID: this guarantees
    3615         1170 :         // that during process runtime, colliding creations will be caught in-memory without getting
    3616         1170 :         // as far as failing to write a file.
    3617         1170 :         fs::OpenOptions::new()
    3618         1170 :             .write(true)
    3619         1170 :             .create_new(true)
    3620         1170 :             .open(&uninit_mark_path)
    3621         1170 :             .context("Failed to create uninit mark file")
    3622         1170 :             .and_then(|_| {
    3623         1170 :                 crashsafe::fsync_file_and_parent(&uninit_mark_path)
    3624         1170 :                     .context("Failed to fsync uninit mark file")
    3625         1170 :             })
    3626         1170 :             .with_context(|| {
    3627            0 :                 format!("Failed to create uninit mark for timeline {tenant_shard_id}/{timeline_id}")
    3628         1170 :             })?;
    3629              : 
    3630         1170 :         Ok(uninit_mark)
    3631         1196 :     }
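                      : 
                      :     // In isolation, the exclusion idiom above is just `create_new(true)` plus a durable fsync
                      :     // of the file and its parent; a minimal sketch, assuming a hypothetical `mark_path`:
                      :     //
                      :     //     let _file = fs::OpenOptions::new()
                      :     //         .write(true)
                      :     //         .create_new(true) // errors with AlreadyExists if a competing creation won
                      :     //         .open(&mark_path)?;
                      :     //     crashsafe::fsync_file_and_parent(&mark_path)?;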
    3632              : 
    3633              :     /// Gathers inputs from all of the timelines to produce a sizing model input.
    3634              :     ///
    3635              :     /// The future is cancellation-safe. Only one calculation can be running at once per tenant.
    3636          160 :     #[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()))]
    3637              :     pub async fn gather_size_inputs(
    3638              :         &self,
    3639              :         // `max_retention_period` overrides the cutoff that is used to calculate the size
    3640              :         // (only if it is shorter than the real cutoff).
    3641              :         max_retention_period: Option<u64>,
    3642              :         cause: LogicalSizeCalculationCause,
    3643              :         cancel: &CancellationToken,
    3644              :         ctx: &RequestContext,
    3645              :     ) -> anyhow::Result<size::ModelInputs> {
    3646              :         let logical_sizes_at_once = self
    3647              :             .conf
    3648              :             .concurrent_tenant_size_logical_size_queries
    3649              :             .inner();
    3650              : 
    3651              :         // TODO: Having a single mutex block concurrent reads is not great for performance.
    3652              :         //
    3653              :         // But the only case where we need to run multiple of these at once is when we
    3654              :         // request a size for a tenant manually via API, while another background calculation
    3655              :         // is in progress (which is not a common case).
    3656              :         //
    3657              :         // See more on issue #2748, condensed out of the initial PR review.
    3658              :         let mut shared_cache = self.cached_logical_sizes.lock().await;
    3659              : 
    3660              :         size::gather_inputs(
    3661              :             self,
    3662              :             logical_sizes_at_once,
    3663              :             max_retention_period,
    3664              :             &mut shared_cache,
    3665              :             cause,
    3666              :             cancel,
    3667              :             ctx,
    3668              :         )
    3669              :         .await
    3670              :     }
    3671              : 
    3672              :     /// Calculate synthetic tenant size and cache the result.
    3673              :     /// This is called periodically by a background worker.
    3674              :     /// The result is cached in the tenant struct.
    3675           50 :     #[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()))]
    3676              :     pub async fn calculate_synthetic_size(
    3677              :         &self,
    3678              :         cause: LogicalSizeCalculationCause,
    3679              :         cancel: &CancellationToken,
    3680              :         ctx: &RequestContext,
    3681              :     ) -> anyhow::Result<u64> {
    3682              :         let inputs = self.gather_size_inputs(None, cause, cancel, ctx).await?;
    3683              : 
    3684              :         let size = inputs.calculate()?;
    3685              : 
    3686              :         self.set_cached_synthetic_size(size);
    3687              : 
    3688              :         Ok(size)
    3689              :     }
    3690              : 
    3691              :     /// Cache the given synthetic size and update the metric value.
    3692           24 :     pub fn set_cached_synthetic_size(&self, size: u64) {
    3693           24 :         self.cached_synthetic_tenant_size
    3694           24 :             .store(size, Ordering::Relaxed);
    3695              : 
    3696              :         // Only shard zero should be calculating synthetic sizes
    3697           24 :         debug_assert!(self.shard_identity.is_zero());
    3698              : 
    3699           24 :         TENANT_SYNTHETIC_SIZE_METRIC
    3700           24 :             .get_metric_with_label_values(&[&self.tenant_shard_id.tenant_id.to_string()])
    3701           24 :             .unwrap()
    3702           24 :             .set(size);
    3703           24 :     }
    3704              : 
    3705           24 :     pub fn cached_synthetic_size(&self) -> u64 {
    3706           24 :         self.cached_synthetic_tenant_size.load(Ordering::Relaxed)
    3707           24 :     }
    3708              : 
    3709              :     /// Flush any in-progress layers, schedule uploads, and wait for uploads to complete.
    3710              :     ///
    3711              :     /// This function can take a long time: callers should wrap it in a timeout if calling
    3712              :     /// from an external API handler.
    3713              :     ///
    3714              :     /// Cancel-safety: cancelling this function may leave I/O running, but such I/O is
    3715              :     /// still bounded by tenant/timeline shutdown.
    3716           10 :     #[tracing::instrument(skip_all)]
    3717              :     pub(crate) async fn flush_remote(&self) -> anyhow::Result<()> {
    3718              :         let timelines = self.timelines.lock().unwrap().clone();
    3719              : 
    3720            5 :         async fn flush_timeline(_gate: GateGuard, timeline: Arc<Timeline>) -> anyhow::Result<()> {
    3721            5 :             tracing::info!(timeline_id=%timeline.timeline_id, "Flushing...");
    3722            5 :             timeline.freeze_and_flush().await?;
    3723            5 :             tracing::info!(timeline_id=%timeline.timeline_id, "Waiting for uploads...");
    3724            5 :             if let Some(client) = &timeline.remote_client {
    3725            5 :                 client.wait_completion().await?;
    3726            0 :             }
    3727              : 
    3728            5 :             Ok(())
    3729            5 :         }
    3730              : 
    3731              :         // We do not use a JoinSet for these tasks, because we don't want them to be
    3732              :         // aborted when this function's future is cancelled: they should stay alive
    3733              :         // holding their GateGuard until they complete, to ensure their I/Os complete
    3734              :         // before Timeline shutdown completes.
    3735              :         let mut results = FuturesUnordered::new();
    3736              : 
    3737              :         for (_timeline_id, timeline) in timelines {
    3738              :             // Run each timeline's flush in a task holding the timeline's gate: this
    3739              :             // means that if this function's future is cancelled, the Timeline shutdown
    3740              :             // will still wait for any I/O in here to complete.
    3741              :             let gate = match timeline.gate.enter() {
    3742              :                 Ok(g) => g,
    3743              :                 Err(_) => continue,
    3744              :             };
    3745           10 :             let jh = tokio::task::spawn(async move { flush_timeline(gate, timeline).await });
    3746              :             results.push(jh);
    3747              :         }
    3748              : 
    3749              :         while let Some(r) = results.next().await {
    3750              :             if let Err(e) = r {
    3751              :                 if !e.is_cancelled() && !e.is_panic() {
    3752            0 :                     tracing::error!("unexpected join error: {e:?}");
    3753              :                 }
    3754              :             }
    3755              :         }
    3756              : 
    3757              :         // The flushes we did above were just writes, but the Tenant might have had
    3758              :         // pending deletions as well from recent compaction/gc: we want to flush those
    3759              :         // as well.  This requires flushing the global delete queue.  This is cheap
    3760              :         // because it's typically a no-op.
    3761              :         match self.deletion_queue_client.flush_execute().await {
    3762              :             Ok(_) => {}
    3763              :             Err(DeletionQueueError::ShuttingDown) => {}
    3764              :         }
    3765              : 
    3766              :         Ok(())
    3767              :     }
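                      : 
                      :     // A sketch of the caller-side timeout suggested by the doc comment on `flush_remote`;
                      :     // the 30-second duration is an arbitrary placeholder, not a value from this codebase:
                      :     //
                      :     //     match tokio::time::timeout(Duration::from_secs(30), tenant.flush_remote()).await {
                      :     //         Ok(flush_result) => flush_result?,
                      :     //         Err(_elapsed) => anyhow::bail!("timed out waiting for flush_remote"),
                      :     //     }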
    3768              : 
    3769            5 :     pub(crate) fn get_tenant_conf(&self) -> TenantConfOpt {
    3770            5 :         self.tenant_conf.read().unwrap().tenant_conf
    3771            5 :     }
    3772              : }
    3773              : 
    3774            2 : fn remove_timeline_and_uninit_mark(
    3775            2 :     timeline_dir: &Utf8Path,
    3776            2 :     uninit_mark: &Utf8Path,
    3777            2 : ) -> anyhow::Result<()> {
    3778            2 :     fs::remove_dir_all(timeline_dir)
    3779            2 :         .or_else(|e| {
    3780            0 :             if e.kind() == std::io::ErrorKind::NotFound {
    3781              :                 // we can leave the uninit mark without a timeline dir,
    3782              :                 // just remove the mark then
    3783            0 :                 Ok(())
    3784              :             } else {
    3785            0 :                 Err(e)
    3786              :             }
    3787            2 :         })
    3788            2 :         .with_context(|| {
    3789            0 :             format!("Failed to remove unit marked timeline directory {timeline_dir}")
    3790            2 :         })?;
    3791            2 :     fs::remove_file(uninit_mark)
    3792            2 :         .with_context(|| format!("Failed to remove timeline uninit mark file {uninit_mark}"))?;
    3793              : 
    3794            2 :     Ok(())
    3795            2 : }
    3796              : 
    3797              : /// Create the cluster temporarily in the 'initdb_target_dir' directory inside the repository
    3798              : /// to get bootstrap data for timeline initialization.
    3799          598 : async fn run_initdb(
    3800          598 :     conf: &'static PageServerConf,
    3801          598 :     initdb_target_dir: &Utf8Path,
    3802          598 :     pg_version: u32,
    3803          598 :     cancel: &CancellationToken,
    3804          598 : ) -> Result<(), InitdbError> {
    3805          598 :     let initdb_bin_path = conf
    3806          598 :         .pg_bin_dir(pg_version)
    3807          598 :         .map_err(InitdbError::Other)?
    3808          598 :         .join("initdb");
    3809          598 :     let initdb_lib_dir = conf.pg_lib_dir(pg_version).map_err(InitdbError::Other)?;
    3810          598 :     info!(
    3811          598 :         "running {} in {}, libdir: {}",
    3812          598 :         initdb_bin_path, initdb_target_dir, initdb_lib_dir,
    3813          598 :     );
    3814              : 
    3815          598 :     let _permit = INIT_DB_SEMAPHORE.acquire().await;
    3816              : 
    3817          598 :     let initdb_command = tokio::process::Command::new(&initdb_bin_path)
    3818          598 :         .args(["-D", initdb_target_dir.as_ref()])
    3819          598 :         .args(["-U", &conf.superuser])
    3820          598 :         .args(["-E", "utf8"])
    3821          598 :         .arg("--no-instructions")
    3822          598 :         .arg("--no-sync")
    3823          598 :         .env_clear()
    3824          598 :         .env("LD_LIBRARY_PATH", &initdb_lib_dir)
    3825          598 :         .env("DYLD_LIBRARY_PATH", &initdb_lib_dir)
    3826          598 :         .stdin(std::process::Stdio::null())
    3827          598 :         // stdout invocation produces the same output every time; we don't need it
    3828          598 :         .stdout(std::process::Stdio::null())
    3829          598 :         // we would be interested in the stderr output, if there was any
    3830          598 :         .stderr(std::process::Stdio::piped())
    3831          598 :         .spawn()?;
    3832              : 
    3833              :     // Ideally we'd select here with the cancellation token, but the problem is that
    3834              :     // we can't safely terminate initdb: it launches processes of its own, and killing
    3835              :     // initdb doesn't kill them. After we return from this function, we want the target
    3836              :     // directory to be able to be cleaned up.
    3837              :     // See https://github.com/neondatabase/neon/issues/6385
    3838         1169 :     let initdb_output = initdb_command.wait_with_output().await?;
    3839          597 :     if !initdb_output.status.success() {
    3840            0 :         return Err(InitdbError::Failed(
    3841            0 :             initdb_output.status,
    3842            0 :             initdb_output.stderr,
    3843            0 :         ));
    3844          597 :     }
    3845          597 : 
    3846          597 :     // This isn't true cancellation support; see above. Still return an error to
    3847          597 :     // exercise the cancellation code path.
    3848          597 :     if cancel.is_cancelled() {
    3849            2 :         return Err(InitdbError::Cancelled);
    3850          595 :     }
    3851          595 : 
    3852          595 :     Ok(())
    3853          597 : }
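                      : 
                      : // A hedged sketch of the caller-side handling implied by the comments in `run_initdb`: since
                      : // initdb cannot be interrupted safely, cancellation is only reported after it finishes, and the
                      : // caller is free to remove the target directory afterwards. `initdb_dir` is a placeholder.
                      : //
                      : //     if let Err(e) = run_initdb(conf, &initdb_dir, pg_version, &cancel).await {
                      : //         let _ = std::fs::remove_dir_all(&initdb_dir);
                      : //         return Err(e.into());
                      : //     }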
    3854              : 
    3855              : impl Drop for Tenant {
    3856          318 :     fn drop(&mut self) {
    3857          318 :         remove_tenant_metrics(&self.tenant_shard_id);
    3858          318 :     }
    3859              : }
    3860              : /// Dump contents of a layer file to stdout.
    3861            0 : pub async fn dump_layerfile_from_path(
    3862            0 :     path: &Utf8Path,
    3863            0 :     verbose: bool,
    3864            0 :     ctx: &RequestContext,
    3865            0 : ) -> anyhow::Result<()> {
    3866              :     use std::os::unix::fs::FileExt;
    3867              : 
    3868              :     // All layer files start with a two-byte "magic" value, to identify the kind of
    3869              :     // file.
    3870            0 :     let file = File::open(path)?;
    3871            0 :     let mut header_buf = [0u8; 2];
    3872            0 :     file.read_exact_at(&mut header_buf, 0)?;
    3873              : 
    3874            0 :     match u16::from_be_bytes(header_buf) {
    3875              :         crate::IMAGE_FILE_MAGIC => {
    3876            0 :             ImageLayer::new_for_path(path, file)?
    3877            0 :                 .dump(verbose, ctx)
    3878            0 :                 .await?
    3879              :         }
    3880              :         crate::DELTA_FILE_MAGIC => {
    3881            0 :             DeltaLayer::new_for_path(path, file)?
    3882            0 :                 .dump(verbose, ctx)
    3883            0 :                 .await?
    3884              :         }
    3885            0 :         magic => bail!("unrecognized magic identifier: {:?}", magic),
    3886              :     }
    3887              : 
    3888            0 :     Ok(())
    3889            0 : }
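                      : 
                      : // The magic-byte dispatch above reduces to a two-byte read at offset zero; a standalone sketch
                      : // using only std APIs, with a placeholder path:
                      : //
                      : //     use std::os::unix::fs::FileExt;
                      : //     let file = std::fs::File::open("path/to/layer-file")?;
                      : //     let mut magic = [0u8; 2];
                      : //     file.read_exact_at(&mut magic, 0)?;
                      : //     let is_image_layer = u16::from_be_bytes(magic) == crate::IMAGE_FILE_MAGIC;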
    3890              : 
    3891              : #[cfg(test)]
    3892              : pub(crate) mod harness {
    3893              :     use bytes::{Bytes, BytesMut};
    3894              :     use camino::Utf8PathBuf;
    3895              :     use once_cell::sync::OnceCell;
    3896              :     use pageserver_api::models::ShardParameters;
    3897              :     use pageserver_api::shard::ShardIndex;
    3898              :     use std::fs;
    3899              :     use std::sync::Arc;
    3900              :     use utils::logging;
    3901              :     use utils::lsn::Lsn;
    3902              : 
    3903              :     use crate::deletion_queue::mock::MockDeletionQueue;
    3904              :     use crate::walredo::apply_neon;
    3905              :     use crate::{
    3906              :         config::PageServerConf, repository::Key, tenant::Tenant, walrecord::NeonWalRecord,
    3907              :     };
    3908              : 
    3909              :     use super::*;
    3910              :     use crate::tenant::config::{TenantConf, TenantConfOpt};
    3911              :     use hex_literal::hex;
    3912              :     use utils::id::{TenantId, TimelineId};
    3913              : 
    3914              :     pub const TIMELINE_ID: TimelineId =
    3915              :         TimelineId::from_array(hex!("11223344556677881122334455667788"));
    3916              :     pub const NEW_TIMELINE_ID: TimelineId =
    3917              :         TimelineId::from_array(hex!("AA223344556677881122334455667788"));
    3918              : 
    3919              :     /// Convenience function to create a page image with the given string as the only content
    3920              :     #[allow(non_snake_case)]
    3921      1708261 :     pub fn TEST_IMG(s: &str) -> Bytes {
    3922      1708261 :         let mut buf = BytesMut::new();
    3923      1708261 :         buf.extend_from_slice(s.as_bytes());
    3924      1708261 :         buf.resize(64, 0);
    3925      1708261 : 
    3926      1708261 :         buf.freeze()
    3927      1708261 :     }
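                      : 
                      :     // Illustrative property of TEST_IMG (not an existing assertion in this file): the buffer is
                      :     // always resized to exactly 64 bytes, so the string prefix is followed by zero padding.
                      :     //
                      :     //     assert_eq!(TEST_IMG("foo at 0x10").len(), 64);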
    3928              : 
    3929              :     impl From<TenantConf> for TenantConfOpt {
    3930           86 :         fn from(tenant_conf: TenantConf) -> Self {
    3931           86 :             Self {
    3932           86 :                 checkpoint_distance: Some(tenant_conf.checkpoint_distance),
    3933           86 :                 checkpoint_timeout: Some(tenant_conf.checkpoint_timeout),
    3934           86 :                 compaction_target_size: Some(tenant_conf.compaction_target_size),
    3935           86 :                 compaction_period: Some(tenant_conf.compaction_period),
    3936           86 :                 compaction_threshold: Some(tenant_conf.compaction_threshold),
    3937           86 :                 gc_horizon: Some(tenant_conf.gc_horizon),
    3938           86 :                 gc_period: Some(tenant_conf.gc_period),
    3939           86 :                 image_creation_threshold: Some(tenant_conf.image_creation_threshold),
    3940           86 :                 pitr_interval: Some(tenant_conf.pitr_interval),
    3941           86 :                 walreceiver_connect_timeout: Some(tenant_conf.walreceiver_connect_timeout),
    3942           86 :                 lagging_wal_timeout: Some(tenant_conf.lagging_wal_timeout),
    3943           86 :                 max_lsn_wal_lag: Some(tenant_conf.max_lsn_wal_lag),
    3944           86 :                 trace_read_requests: Some(tenant_conf.trace_read_requests),
    3945           86 :                 eviction_policy: Some(tenant_conf.eviction_policy),
    3946           86 :                 min_resident_size_override: tenant_conf.min_resident_size_override,
    3947           86 :                 evictions_low_residence_duration_metric_threshold: Some(
    3948           86 :                     tenant_conf.evictions_low_residence_duration_metric_threshold,
    3949           86 :                 ),
    3950           86 :                 gc_feedback: Some(tenant_conf.gc_feedback),
    3951           86 :                 heatmap_period: Some(tenant_conf.heatmap_period),
    3952           86 :                 lazy_slru_download: Some(tenant_conf.lazy_slru_download),
    3953           86 :             }
    3954           86 :         }
    3955              :     }
    3956              : 
    3957              :     #[cfg(test)]
    3958           86 :     #[derive(Debug)]
    3959              :     enum LoadMode {
    3960              :         Local,
    3961              :         Remote,
    3962              :     }
    3963              : 
    3964              :     pub struct TenantHarness {
    3965              :         pub conf: &'static PageServerConf,
    3966              :         pub tenant_conf: TenantConf,
    3967              :         pub tenant_shard_id: TenantShardId,
    3968              :         pub generation: Generation,
    3969              :         pub shard: ShardIndex,
    3970              :         pub remote_storage: GenericRemoteStorage,
    3971              :         pub remote_fs_dir: Utf8PathBuf,
    3972              :         pub deletion_queue: MockDeletionQueue,
    3973              :     }
    3974              : 
    3975              :     static LOG_HANDLE: OnceCell<()> = OnceCell::new();
    3976              : 
    3977           90 :     pub(crate) fn setup_logging() {
    3978           90 :         LOG_HANDLE.get_or_init(|| {
    3979           90 :             logging::init(
    3980           90 :                 logging::LogFormat::Test,
    3981           90 :                 // enable it in case the tests exercise code paths that use
    3982           90 :                 // debug_assert_current_span_has_tenant_and_timeline_id
    3983           90 :                 logging::TracingErrorLayerEnablement::EnableWithRustLogFilter,
    3984           90 :                 logging::Output::Stdout,
    3985           90 :             )
    3986           90 :             .expect("Failed to init test logging")
    3987           90 :         });
    3988           90 :     }
    3989              : 
    3990              :     impl TenantHarness {
    3991           84 :         pub fn create(test_name: &'static str) -> anyhow::Result<Self> {
    3992           84 :             setup_logging();
    3993           84 : 
    3994           84 :             let repo_dir = PageServerConf::test_repo_dir(test_name);
    3995           84 :             let _ = fs::remove_dir_all(&repo_dir);
    3996           84 :             fs::create_dir_all(&repo_dir)?;
    3997              : 
    3998           84 :             let conf = PageServerConf::dummy_conf(repo_dir);
    3999           84 :             // Make a static copy of the config. This can never be free'd, but that's
    4000           84 :             // OK in a test.
    4001           84 :             let conf: &'static PageServerConf = Box::leak(Box::new(conf));
    4002           84 : 
    4003           84 :             // Disable automatic GC and compaction to make the unit tests more deterministic.
    4004           84 :             // The tests perform them manually if needed.
    4005           84 :             let tenant_conf = TenantConf {
    4006           84 :                 gc_period: Duration::ZERO,
    4007           84 :                 compaction_period: Duration::ZERO,
    4008           84 :                 ..TenantConf::default()
    4009           84 :             };
    4010           84 : 
    4011           84 :             let tenant_id = TenantId::generate();
    4012           84 :             let tenant_shard_id = TenantShardId::unsharded(tenant_id);
    4013           84 :             fs::create_dir_all(conf.tenant_path(&tenant_shard_id))?;
    4014           84 :             fs::create_dir_all(conf.timelines_path(&tenant_shard_id))?;
    4015              : 
    4016              :             use remote_storage::{RemoteStorageConfig, RemoteStorageKind};
    4017           84 :             let remote_fs_dir = conf.workdir.join("localfs");
    4018           84 :             std::fs::create_dir_all(&remote_fs_dir).unwrap();
    4019           84 :             let config = RemoteStorageConfig {
    4020           84 :                 storage: RemoteStorageKind::LocalFs(remote_fs_dir.clone()),
    4021           84 :             };
    4022           84 :             let remote_storage = GenericRemoteStorage::from_config(&config).unwrap();
    4023           84 :             let deletion_queue = MockDeletionQueue::new(Some(remote_storage.clone()));
    4024           84 : 
    4025           84 :             Ok(Self {
    4026           84 :                 conf,
    4027           84 :                 tenant_conf,
    4028           84 :                 tenant_shard_id,
    4029           84 :                 generation: Generation::new(0xdeadbeef),
    4030           84 :                 shard: ShardIndex::unsharded(),
    4031           84 :                 remote_storage,
    4032           84 :                 remote_fs_dir,
    4033           84 :                 deletion_queue,
    4034           84 :             })
    4035           84 :         }
    4036              : 
    4037           10 :         pub fn span(&self) -> tracing::Span {
    4038           10 :             info_span!("TenantHarness", tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug())
    4039           10 :         }
    4040              : 
    4041           82 :         pub(crate) async fn load(&self) -> (Arc<Tenant>, RequestContext) {
    4042           82 :             let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
    4043           82 :             (
    4044           82 :                 self.try_load(&ctx)
    4045          109 :                     .await
    4046           82 :                     .expect("failed to load test tenant"),
    4047           82 :                 ctx,
    4048           82 :             )
    4049           82 :         }
    4050              : 
    4051              :         /// For tests that specifically want to exercise the local load path, which does
    4052              :         /// not use remote storage.
    4053            2 :         pub(crate) async fn try_load_local(
    4054            2 :             &self,
    4055            2 :             ctx: &RequestContext,
    4056            2 :         ) -> anyhow::Result<Arc<Tenant>> {
    4057            2 :             self.do_try_load(ctx, LoadMode::Local).await
    4058            2 :         }
    4059              : 
    4060              :         /// The 'load' in this function is either a local load or a normal attachment,
    4061           84 :         pub(crate) async fn try_load(&self, ctx: &RequestContext) -> anyhow::Result<Arc<Tenant>> {
    4062              :             // If we have nothing in remote storage, must use load_local instead of attach: attach
    4063              :             // If we have nothing in remote storage, we must use load_local instead of attach: attach
    4064              :             //
    4065              :             // See https://github.com/neondatabase/neon/issues/5456 for how we will eliminate
    4066              :             // this weird state of a Tenant which exists but doesn't have any timelines.
    4067           84 :             let mode = match self.remote_empty() {
    4068           80 :                 true => LoadMode::Local,
    4069            4 :                 false => LoadMode::Remote,
    4070              :             };
    4071              : 
    4072          111 :             self.do_try_load(ctx, mode).await
    4073           84 :         }
    4074              : 
    4075          172 :         #[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(), ?mode))]
    4076              :         async fn do_try_load(
    4077              :             &self,
    4078              :             ctx: &RequestContext,
    4079              :             mode: LoadMode,
    4080              :         ) -> anyhow::Result<Arc<Tenant>> {
    4081              :             let walredo_mgr = Arc::new(WalRedoManager::from(TestRedoManager));
    4082              : 
    4083              :             let tenant = Arc::new(Tenant::new(
    4084              :                 TenantState::Loading,
    4085              :                 self.conf,
    4086              :                 AttachedTenantConf::try_from(LocationConf::attached_single(
    4087              :                     TenantConfOpt::from(self.tenant_conf),
    4088              :                     self.generation,
    4089              :                     &ShardParameters::default(),
    4090              :                 ))
    4091              :                 .unwrap(),
    4092              :                 // This is a legacy/test code path: sharding isn't supported here.
    4093              :                 ShardIdentity::unsharded(),
    4094              :                 Some(walredo_mgr),
    4095              :                 self.tenant_shard_id,
    4096              :                 Some(self.remote_storage.clone()),
    4097              :                 self.deletion_queue.new_client(),
    4098              :             ));
    4099              : 
    4100              :             match mode {
    4101              :                 LoadMode::Local => {
    4102              :                     tenant.load_local(ctx).await?;
    4103              :                 }
    4104              :                 LoadMode::Remote => {
    4105              :                     let preload = tenant
    4106              :                         .preload(&self.remote_storage, CancellationToken::new())
    4107              :                         .await?;
    4108              :                     tenant.attach(Some(preload), SpawnMode::Normal, ctx).await?;
    4109              :                 }
    4110              :             }
    4111              : 
    4112              :             tenant.state.send_replace(TenantState::Active);
    4113              :             for timeline in tenant.timelines.lock().unwrap().values() {
    4114              :                 timeline.set_state(TimelineState::Active);
    4115              :             }
    4116              :             Ok(tenant)
    4117              :         }
    4118              : 
    4119           84 :         fn remote_empty(&self) -> bool {
    4120           84 :             let tenant_path = self.conf.tenant_path(&self.tenant_shard_id);
    4121           84 :             let remote_tenant_dir = self
    4122           84 :                 .remote_fs_dir
    4123           84 :                 .join(tenant_path.strip_prefix(&self.conf.workdir).unwrap());
    4124           84 :             if std::fs::metadata(&remote_tenant_dir).is_err() {
    4125           80 :                 return true;
    4126            4 :             }
    4127            4 : 
    4128            4 :             match std::fs::read_dir(remote_tenant_dir)
    4129            4 :                 .unwrap()
    4130            4 :                 .flatten()
    4131            4 :                 .next()
    4132              :             {
    4133            4 :                 Some(entry) => {
    4134            4 :                     tracing::debug!(
    4135            0 :                         "remote_empty: not empty, found file {}",
    4136            0 :                         entry.file_name().to_string_lossy(),
    4137            0 :                     );
    4138            4 :                     false
    4139              :                 }
    4140            0 :                 None => true,
    4141              :             }
    4142           84 :         }
    4143              : 
    4144            6 :         pub fn timeline_path(&self, timeline_id: &TimelineId) -> Utf8PathBuf {
    4145            6 :             self.conf.timeline_path(&self.tenant_shard_id, timeline_id)
    4146            6 :         }
    4147              :     }
    4148              : 
    4149              :     // Mock WAL redo manager that doesn't do much
    4150              :     pub(crate) struct TestRedoManager;
    4151              : 
    4152              :     impl TestRedoManager {
    4153              :         /// # Cancel-Safety
    4154              :         ///
    4155              :         /// This method is cancellation-safe.
    4156            8 :         pub async fn request_redo(
    4157            8 :             &self,
    4158            8 :             key: Key,
    4159            8 :             lsn: Lsn,
    4160            8 :             base_img: Option<(Lsn, Bytes)>,
    4161            8 :             records: Vec<(Lsn, NeonWalRecord)>,
    4162            8 :             _pg_version: u32,
    4163            8 :         ) -> anyhow::Result<Bytes> {
    4164           12 :             let records_neon = records.iter().all(|r| apply_neon::can_apply_in_neon(&r.1));
    4165            8 : 
    4166            8 :             if records_neon {
    4167              :                 // For Neon wal records, we can decode without spawning postgres, so do so.
    4168            8 :                 let base_img = base_img.expect("Neon WAL redo requires base image").1;
    4169            8 :                 let mut page = BytesMut::new();
    4170            8 :                 page.extend_from_slice(&base_img);
    4171           20 :                 for (_record_lsn, record) in records {
    4172           12 :                     apply_neon::apply_in_neon(&record, key, &mut page)?;
    4173              :                 }
    4174            8 :                 Ok(page.freeze())
    4175              :             } else {
    4176              :                 // We never spawn a postgres walredo process in unit tests: just log what we might have done.
    4177            0 :                 let s = format!(
    4178            0 :                     "redo for {} to get to {}, with {} and {} records",
    4179            0 :                     key,
    4180            0 :                     lsn,
    4181            0 :                     if base_img.is_some() {
    4182            0 :                         "base image"
    4183              :                     } else {
    4184            0 :                         "no base image"
    4185              :                     },
    4186            0 :                     records.len()
    4187            0 :                 );
    4188            0 :                 println!("{s}");
    4189            0 : 
    4190            0 :                 Ok(TEST_IMG(&s))
    4191              :             }
    4192            8 :         }
    4193              :     }
    4194              : }
    4195              : 
    4196              : #[cfg(test)]
    4197              : mod tests {
    4198              :     use super::*;
    4199              :     use crate::keyspace::KeySpaceAccum;
    4200              :     use crate::repository::{Key, Value};
    4201              :     use crate::tenant::harness::*;
    4202              :     use crate::DEFAULT_PG_VERSION;
    4203              :     use crate::METADATA_FILE_NAME;
    4204              :     use bytes::BytesMut;
    4205              :     use hex_literal::hex;
    4206              :     use once_cell::sync::Lazy;
    4207              :     use rand::{thread_rng, Rng};
    4208              :     use tokio_util::sync::CancellationToken;
    4209              : 
    4210              :     static TEST_KEY: Lazy<Key> =
    4211           18 :         Lazy::new(|| Key::from_slice(&hex!("010000000033333333444444445500000001")));
    4212              : 
    4213            2 :     #[tokio::test]
    4214            2 :     async fn test_basic() -> anyhow::Result<()> {
    4215            2 :         let (tenant, ctx) = TenantHarness::create("test_basic")?.load().await;
    4216            2 :         let tline = tenant
    4217            2 :             .create_test_timeline(TIMELINE_ID, Lsn(0x08), DEFAULT_PG_VERSION, &ctx)
    4218            7 :             .await?;
    4219            2 : 
    4220            2 :         let writer = tline.writer().await;
    4221            2 :         writer
    4222            2 :             .put(
    4223            2 :                 *TEST_KEY,
    4224            2 :                 Lsn(0x10),
    4225            2 :                 &Value::Image(TEST_IMG("foo at 0x10")),
    4226            2 :                 &ctx,
    4227            2 :             )
    4228            2 :             .await?;
    4229            2 :         writer.finish_write(Lsn(0x10));
    4230            2 :         drop(writer);
    4231            2 : 
    4232            2 :         let writer = tline.writer().await;
    4233            2 :         writer
    4234            2 :             .put(
    4235            2 :                 *TEST_KEY,
    4236            2 :                 Lsn(0x20),
    4237            2 :                 &Value::Image(TEST_IMG("foo at 0x20")),
    4238            2 :                 &ctx,
    4239            2 :             )
    4240            2 :             .await?;
    4241            2 :         writer.finish_write(Lsn(0x20));
    4242            2 :         drop(writer);
    4243            2 : 
    4244            2 :         assert_eq!(
    4245            2 :             tline.get(*TEST_KEY, Lsn(0x10), &ctx).await?,
    4246            2 :             TEST_IMG("foo at 0x10")
    4247            2 :         );
    4248            2 :         assert_eq!(
    4249            2 :             tline.get(*TEST_KEY, Lsn(0x1f), &ctx).await?,
    4250            2 :             TEST_IMG("foo at 0x10")
    4251            2 :         );
    4252            2 :         assert_eq!(
    4253            2 :             tline.get(*TEST_KEY, Lsn(0x20), &ctx).await?,
    4254            2 :             TEST_IMG("foo at 0x20")
    4255            2 :         );
    4256            2 : 
    4257            2 :         Ok(())
    4258            2 :     }
    4259              : 
    4260            2 :     #[tokio::test]
    4261            2 :     async fn no_duplicate_timelines() -> anyhow::Result<()> {
    4262            2 :         let (tenant, ctx) = TenantHarness::create("no_duplicate_timelines")?
    4263            2 :             .load()
    4264            2 :             .await;
    4265            2 :         let _ = tenant
    4266            2 :             .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
    4267            6 :             .await?;
    4268            2 : 
    4269            2 :         match tenant
    4270            2 :             .create_empty_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
    4271            2 :             .await
    4272            2 :         {
    4273            2 :             Ok(_) => panic!("duplicate timeline creation should fail"),
    4274            2 :             Err(e) => assert_eq!(e.to_string(), "Already exists".to_string()),
    4275            2 :         }
    4276            2 : 
    4277            2 :         Ok(())
    4278            2 :     }
    4279              : 
    4280              :     /// Convenience function to create a `Value::Image` with the given string as the only content
    4281           10 :     pub fn test_value(s: &str) -> Value {
    4282           10 :         let mut buf = BytesMut::new();
    4283           10 :         buf.extend_from_slice(s.as_bytes());
    4284           10 :         Value::Image(buf.freeze())
    4285           10 :     }
    4286              : 
    4287              :     ///
    4288              :     /// Test branch creation
    4289              :     ///
    4290            2 :     #[tokio::test]
    4291            2 :     async fn test_branch() -> anyhow::Result<()> {
    4292            2 :         use std::str::from_utf8;
    4293            2 : 
    4294            2 :         let (tenant, ctx) = TenantHarness::create("test_branch")?.load().await;
    4295            2 :         let tline = tenant
    4296            2 :             .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
    4297            7 :             .await?;
    4298            2 :         let writer = tline.writer().await;
    4299            2 : 
    4300            2 :         #[allow(non_snake_case)]
    4301            2 :         let TEST_KEY_A: Key = Key::from_hex("110000000033333333444444445500000001").unwrap();
    4302            2 :         #[allow(non_snake_case)]
    4303            2 :         let TEST_KEY_B: Key = Key::from_hex("110000000033333333444444445500000002").unwrap();
    4304            2 : 
    4305            2 :         // Insert a value on the timeline
    4306            2 :         writer
    4307            2 :             .put(TEST_KEY_A, Lsn(0x20), &test_value("foo at 0x20"), &ctx)
    4308            2 :             .await?;
    4309            2 :         writer
    4310            2 :             .put(TEST_KEY_B, Lsn(0x20), &test_value("foobar at 0x20"), &ctx)
    4311            2 :             .await?;
    4312            2 :         writer.finish_write(Lsn(0x20));
    4313            2 : 
    4314            2 :         writer
    4315            2 :             .put(TEST_KEY_A, Lsn(0x30), &test_value("foo at 0x30"), &ctx)
    4316            2 :             .await?;
    4317            2 :         writer.finish_write(Lsn(0x30));
    4318            2 :         writer
    4319            2 :             .put(TEST_KEY_A, Lsn(0x40), &test_value("foo at 0x40"), &ctx)
    4320            2 :             .await?;
    4321            2 :         writer.finish_write(Lsn(0x40));
    4322            2 : 
    4323            2 :         //assert_current_logical_size(&tline, Lsn(0x40));
    4324            2 : 
    4325            2 :         // Branch the history, modify relation differently on the new timeline
    4326            2 :         tenant
    4327            2 :             .branch_timeline_test(&tline, NEW_TIMELINE_ID, Some(Lsn(0x30)), &ctx)
    4328            2 :             .await?;
    4329            2 :         let newtline = tenant
    4330            2 :             .get_timeline(NEW_TIMELINE_ID, true)
    4331            2 :             .expect("Should have a local timeline");
    4332            2 :         let new_writer = newtline.writer().await;
    4333            2 :         new_writer
    4334            2 :             .put(TEST_KEY_A, Lsn(0x40), &test_value("bar at 0x40"), &ctx)
    4335            2 :             .await?;
    4336            2 :         new_writer.finish_write(Lsn(0x40));
    4337            2 : 
    4338            2 :         // Check page contents on both branches
    4339            2 :         assert_eq!(
    4340            2 :             from_utf8(&tline.get(TEST_KEY_A, Lsn(0x40), &ctx).await?)?,
    4341            2 :             "foo at 0x40"
    4342            2 :         );
    4343            2 :         assert_eq!(
    4344            2 :             from_utf8(&newtline.get(TEST_KEY_A, Lsn(0x40), &ctx).await?)?,
    4345            2 :             "bar at 0x40"
    4346            2 :         );
    4347            2 :         assert_eq!(
    4348            2 :             from_utf8(&newtline.get(TEST_KEY_B, Lsn(0x40), &ctx).await?)?,
    4349            2 :             "foobar at 0x20"
    4350            2 :         );
    4351            2 : 
    4352            2 :         //assert_current_logical_size(&tline, Lsn(0x40));
    4353            2 : 
    4354            2 :         Ok(())
    4355            2 :     }
    4356              : 
    4357           20 :     async fn make_some_layers(
    4358           20 :         tline: &Timeline,
    4359           20 :         start_lsn: Lsn,
    4360           20 :         ctx: &RequestContext,
    4361           20 :     ) -> anyhow::Result<()> {
    4362           20 :         let mut lsn = start_lsn;
    4363              :         #[allow(non_snake_case)]
    4364              :         {
    4365           20 :             let writer = tline.writer().await;
    4366              :             // Create a relation on the timeline
    4367           20 :             writer
    4368           20 :                 .put(
    4369           20 :                     *TEST_KEY,
    4370           20 :                     lsn,
    4371           20 :                     &Value::Image(TEST_IMG(&format!("foo at {}", lsn))),
    4372           20 :                     ctx,
    4373           20 :                 )
    4374           10 :                 .await?;
    4375           20 :             writer.finish_write(lsn);
    4376           20 :             lsn += 0x10;
    4377           20 :             writer
    4378           20 :                 .put(
    4379           20 :                     *TEST_KEY,
    4380           20 :                     lsn,
    4381           20 :                     &Value::Image(TEST_IMG(&format!("foo at {}", lsn))),
    4382           20 :                     ctx,
    4383           20 :                 )
    4384            0 :                 .await?;
    4385           20 :             writer.finish_write(lsn);
    4386           20 :             lsn += 0x10;
    4387           20 :         }
    4388           20 :         tline.freeze_and_flush().await?;
    4389              :         {
    4390           20 :             let writer = tline.writer().await;
    4391           20 :             writer
    4392           20 :                 .put(
    4393           20 :                     *TEST_KEY,
    4394           20 :                     lsn,
    4395           20 :                     &Value::Image(TEST_IMG(&format!("foo at {}", lsn))),
    4396           20 :                     ctx,
    4397           20 :                 )
    4398           10 :                 .await?;
    4399           20 :             writer.finish_write(lsn);
    4400           20 :             lsn += 0x10;
    4401           20 :             writer
    4402           20 :                 .put(
    4403           20 :                     *TEST_KEY,
    4404           20 :                     lsn,
    4405           20 :                     &Value::Image(TEST_IMG(&format!("foo at {}", lsn))),
    4406           20 :                     ctx,
    4407           20 :                 )
    4408            0 :                 .await?;
    4409           20 :             writer.finish_write(lsn);
    4410           20 :         }
    4411           20 :         tline.freeze_and_flush().await
    4412           20 :     }
    4413              : 
    4414            2 :     #[tokio::test]
    4415            2 :     async fn test_prohibit_branch_creation_on_garbage_collected_data() -> anyhow::Result<()> {
    4416            2 :         let (tenant, ctx) =
    4417            2 :             TenantHarness::create("test_prohibit_branch_creation_on_garbage_collected_data")?
    4418            2 :                 .load()
    4419            2 :                 .await;
    4420            2 :         let tline = tenant
    4421            2 :             .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
    4422            7 :             .await?;
    4423            6 :         make_some_layers(tline.as_ref(), Lsn(0x20), &ctx).await?;
    4424            2 : 
    4425            2 :         // this removes layers before lsn 40 (50 minus 10), so there are two remaining layers, image and delta for 31-50
    4426            2 :         // FIXME: this doesn't actually remove any layer currently, given how the flushing
    4427            2 :         // and compaction works. But it does set the 'cutoff' point so that the cross check
    4428            2 :         // below should fail.
    4429            2 :         tenant
    4430            2 :             .gc_iteration(
    4431            2 :                 Some(TIMELINE_ID),
    4432            2 :                 0x10,
    4433            2 :                 Duration::ZERO,
    4434            2 :                 &CancellationToken::new(),
    4435            2 :                 &ctx,
    4436            2 :             )
    4437            2 :             .await?;
    4438            2 : 
    4439            2 :         // try to branch at lsn 25, should fail because we already garbage collected the data
    4440            2 :         match tenant
    4441            2 :             .branch_timeline_test(&tline, NEW_TIMELINE_ID, Some(Lsn(0x25)), &ctx)
    4442            2 :             .await
    4443            2 :         {
    4444            2 :             Ok(_) => panic!("branching should have failed"),
    4445            2 :             Err(err) => {
    4446            2 :                 let CreateTimelineError::AncestorLsn(err) = err else {
    4447            2 :                     panic!("wrong error type")
    4448            2 :                 };
    4449            2 :                 assert!(err.to_string().contains("invalid branch start lsn"));
    4450            2 :                 assert!(err
    4451            2 :                     .source()
    4452            2 :                     .unwrap()
    4453            2 :                     .to_string()
    4454            2 :                     .contains("we might've already garbage collected needed data"))
    4455            2 :             }
    4456            2 :         }
    4457            2 : 
    4458            2 :         Ok(())
    4459            2 :     }
    4460              : 
    4461            2 :     #[tokio::test]
    4462            2 :     async fn test_prohibit_branch_creation_on_pre_initdb_lsn() -> anyhow::Result<()> {
    4463            2 :         let (tenant, ctx) =
    4464            2 :             TenantHarness::create("test_prohibit_branch_creation_on_pre_initdb_lsn")?
    4465            2 :                 .load()
    4466            2 :                 .await;
    4467            2 : 
    4468            2 :         let tline = tenant
    4469            2 :             .create_test_timeline(TIMELINE_ID, Lsn(0x50), DEFAULT_PG_VERSION, &ctx)
    4470            7 :             .await?;
    4471            2 :         // try to branch at lsn 0x25, should fail because initdb lsn is 0x50
    4472            2 :         match tenant
    4473            2 :             .branch_timeline_test(&tline, NEW_TIMELINE_ID, Some(Lsn(0x25)), &ctx)
    4474            2 :             .await
    4475            2 :         {
    4476            2 :             Ok(_) => panic!("branching should have failed"),
    4477            2 :             Err(err) => {
    4478            2 :                 let CreateTimelineError::AncestorLsn(err) = err else {
    4479            2 :                     panic!("wrong error type");
    4480            2 :                 };
    4481            2 :                 assert!(err.to_string().contains("invalid branch start lsn"));
    4482            2 :                 assert!(err
    4483            2 :                     .source()
    4484            2 :                     .unwrap()
    4485            2 :                     .to_string()
    4486            2 :                     .contains("is earlier than latest GC horizon"));
    4487            2 :             }
    4488            2 :         }
    4489            2 : 
    4490            2 :         Ok(())
    4491            2 :     }
    4492              : 
    4493              :     /*
    4494              :     // FIXME: This currently fails to error out. Calling GC doesn't currently
    4495              :     // remove the old value, we'd need to work a little harder
    4496              :     #[tokio::test]
    4497              :     async fn test_prohibit_get_for_garbage_collected_data() -> anyhow::Result<()> {
    4498              :         let repo =
    4499              :             RepoHarness::create("test_prohibit_get_for_garbage_collected_data")?
    4500              :             .load();
    4501              : 
    4502              :         let tline = repo.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION)?;
    4503              :         make_some_layers(tline.as_ref(), Lsn(0x20), &ctx).await?;
    4504              : 
    4505              :         repo.gc_iteration(Some(TIMELINE_ID), 0x10, Duration::ZERO)?;
    4506              :         let latest_gc_cutoff_lsn = tline.get_latest_gc_cutoff_lsn();
    4507              :         assert!(*latest_gc_cutoff_lsn > Lsn(0x25));
    4508              :         match tline.get(*TEST_KEY, Lsn(0x25)) {
    4509              :             Ok(_) => panic!("request for page should have failed"),
    4510              :             Err(err) => assert!(err.to_string().contains("not found at")),
    4511              :         }
    4512              :         Ok(())
    4513              :     }
    4514              :      */
    4515              : 
    4516            2 :     #[tokio::test]
    4517            2 :     async fn test_get_branchpoints_from_an_inactive_timeline() -> anyhow::Result<()> {
    4518            2 :         let (tenant, ctx) =
    4519            2 :             TenantHarness::create("test_get_branchpoints_from_an_inactive_timeline")?
    4520            2 :                 .load()
    4521            2 :                 .await;
    4522            2 :         let tline = tenant
    4523            2 :             .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
    4524            6 :             .await?;
    4525            6 :         make_some_layers(tline.as_ref(), Lsn(0x20), &ctx).await?;
    4526            2 : 
    4527            2 :         tenant
    4528            2 :             .branch_timeline_test(&tline, NEW_TIMELINE_ID, Some(Lsn(0x40)), &ctx)
    4529            2 :             .await?;
    4530            2 :         let newtline = tenant
    4531            2 :             .get_timeline(NEW_TIMELINE_ID, true)
    4532            2 :             .expect("Should have a local timeline");
    4533            2 : 
    4534            6 :         make_some_layers(newtline.as_ref(), Lsn(0x60), &ctx).await?;
    4535            2 : 
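                      :         // Mark the parent timeline as Broken before running GC and reading from the child.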
    4536            2 :         tline.set_broken("test".to_owned());
    4537            2 : 
    4538            2 :         tenant
    4539            2 :             .gc_iteration(
    4540            2 :                 Some(TIMELINE_ID),
    4541            2 :                 0x10,
    4542            2 :                 Duration::ZERO,
    4543            2 :                 &CancellationToken::new(),
    4544            2 :                 &ctx,
    4545            2 :             )
    4546            2 :             .await?;
    4547            2 : 
    4548            2 :         // The branchpoints should include the branch points of all child timelines,
    4549            2 :         // even ones marked as Broken.
    4550            2 :         {
    4551            2 :             let branchpoints = &tline.gc_info.read().unwrap().retain_lsns;
    4552            2 :             assert_eq!(branchpoints.len(), 1);
    4553            2 :             assert_eq!(branchpoints[0], Lsn(0x40));
    4554            2 :         }
    4555            2 : 
    4556            2 :         // You can read the key from the child branch even though the parent is
    4557            2 :         // Broken, as long as you don't need to access data from the parent.
    4558            2 :         assert_eq!(
    4559            4 :             newtline.get(*TEST_KEY, Lsn(0x70), &ctx).await?,
    4560            2 :             TEST_IMG(&format!("foo at {}", Lsn(0x70)))
    4561            2 :         );
    4562            2 : 
    4563            2 :         // This needs to traverse to the parent, and fails.
    4564            2 :         let err = newtline.get(*TEST_KEY, Lsn(0x50), &ctx).await.unwrap_err();
    4565            2 :         assert!(err
    4566            2 :             .to_string()
    4567            2 :             .contains("will not become active. Current state: Broken"));
    4568            2 : 
    4569            2 :         Ok(())
    4570            2 :     }
    4571              : 
    4572            2 :     #[tokio::test]
    4573            2 :     async fn test_retain_data_in_parent_which_is_needed_for_child() -> anyhow::Result<()> {
    4574            2 :         let (tenant, ctx) =
    4575            2 :             TenantHarness::create("test_retain_data_in_parent_which_is_needed_for_child")?
    4576            2 :                 .load()
    4577            2 :                 .await;
    4578            2 :         let tline = tenant
    4579            2 :             .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
    4580            7 :             .await?;
    4581            6 :         make_some_layers(tline.as_ref(), Lsn(0x20), &ctx).await?;
    4582            2 : 
    4583            2 :         tenant
    4584            2 :             .branch_timeline_test(&tline, NEW_TIMELINE_ID, Some(Lsn(0x40)), &ctx)
    4585            2 :             .await?;
    4586            2 :         let newtline = tenant
    4587            2 :             .get_timeline(NEW_TIMELINE_ID, true)
    4588            2 :             .expect("Should have a local timeline");
    4589            2 :         // this removes layers before lsn 40 (50 minus 10), so there are two remaining layers, image and delta for 31-50
    4590            2 :         tenant
    4591            2 :             .gc_iteration(
    4592            2 :                 Some(TIMELINE_ID),
    4593            2 :                 0x10,
    4594            2 :                 Duration::ZERO,
    4595            2 :                 &CancellationToken::new(),
    4596            2 :                 &ctx,
    4597            2 :             )
    4598            2 :             .await?;
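                      :         // Lsn(0x25) is below the branch point (0x40): the parent must retain the data the child needs.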
    4599            4 :         assert!(newtline.get(*TEST_KEY, Lsn(0x25), &ctx).await.is_ok());
    4600            2 : 
    4601            2 :         Ok(())
    4602            2 :     }
    4603            2 :     #[tokio::test]
    4604            2 :     async fn test_parent_keeps_data_forever_after_branching() -> anyhow::Result<()> {
    4605            2 :         let (tenant, ctx) =
    4606            2 :             TenantHarness::create("test_parent_keeps_data_forever_after_branching")?
    4607            2 :                 .load()
    4608            2 :                 .await;
    4609            2 :         let tline = tenant
    4610            2 :             .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
    4611            6 :             .await?;
    4612            6 :         make_some_layers(tline.as_ref(), Lsn(0x20), &ctx).await?;
    4613            2 : 
    4614            2 :         tenant
    4615            2 :             .branch_timeline_test(&tline, NEW_TIMELINE_ID, Some(Lsn(0x40)), &ctx)
    4616            2 :             .await?;
    4617            2 :         let newtline = tenant
    4618            2 :             .get_timeline(NEW_TIMELINE_ID, true)
    4619            2 :             .expect("Should have a local timeline");
    4620            2 : 
    4621            6 :         make_some_layers(newtline.as_ref(), Lsn(0x60), &ctx).await?;
    4622            2 : 
    4623            2 :         // run gc on parent
    4624            2 :         tenant
    4625            2 :             .gc_iteration(
    4626            2 :                 Some(TIMELINE_ID),
    4627            2 :                 0x10,
    4628            2 :                 Duration::ZERO,
    4629            2 :                 &CancellationToken::new(),
    4630            2 :                 &ctx,
    4631            2 :             )
    4632            2 :             .await?;
    4633            2 : 
    4634            2 :         // Check that the data is still accessible on the branch.
    4635            2 :         assert_eq!(
    4636            7 :             newtline.get(*TEST_KEY, Lsn(0x50), &ctx).await?,
    4637            2 :             TEST_IMG(&format!("foo at {}", Lsn(0x40)))
    4638            2 :         );
    4639            2 : 
    4640            2 :         Ok(())
    4641            2 :     }
    4642              : 
    4643            2 :     #[tokio::test]
    4644            2 :     async fn timeline_load() -> anyhow::Result<()> {
    4645            2 :         const TEST_NAME: &str = "timeline_load";
    4646            2 :         let harness = TenantHarness::create(TEST_NAME)?;
    4647            2 :         {
    4648            2 :             let (tenant, ctx) = harness.load().await;
    4649            2 :             let tline = tenant
    4650            2 :                 .create_test_timeline(TIMELINE_ID, Lsn(0x7000), DEFAULT_PG_VERSION, &ctx)
    4651            7 :                 .await?;
    4652            6 :             make_some_layers(tline.as_ref(), Lsn(0x8000), &ctx).await?;
    4653            2 :             // so that all uploads finish & we can call harness.load() below again
    4654            2 :             tenant
    4655            2 :                 .shutdown(Default::default(), true)
    4656            2 :                 .instrument(harness.span())
    4657            2 :                 .await
    4658            2 :                 .ok()
    4659            2 :                 .unwrap();
    4660            2 :         }
    4661            2 : 
    4662           12 :         let (tenant, _ctx) = harness.load().await;
    4663            2 :         tenant
    4664            2 :             .get_timeline(TIMELINE_ID, true)
    4665            2 :             .expect("cannot load timeline");
    4666            2 : 
    4667            2 :         Ok(())
    4668            2 :     }
    4669              : 
    4670            2 :     #[tokio::test]
    4671            2 :     async fn timeline_load_with_ancestor() -> anyhow::Result<()> {
    4672            2 :         const TEST_NAME: &str = "timeline_load_with_ancestor";
    4673            2 :         let harness = TenantHarness::create(TEST_NAME)?;
    4674            2 :         // create two timelines
    4675            2 :         {
    4676            2 :             let (tenant, ctx) = harness.load().await;
    4677            2 :             let tline = tenant
    4678            2 :                 .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
    4679            7 :                 .await?;
    4680            2 : 
    4681            6 :             make_some_layers(tline.as_ref(), Lsn(0x20), &ctx).await?;
    4682            2 : 
    4683            2 :             let child_tline = tenant
    4684            2 :                 .branch_timeline_test(&tline, NEW_TIMELINE_ID, Some(Lsn(0x40)), &ctx)
    4685            2 :                 .await?;
    4686            2 :             child_tline.set_state(TimelineState::Active);
    4687            2 : 
    4688            2 :             let newtline = tenant
    4689            2 :                 .get_timeline(NEW_TIMELINE_ID, true)
    4690            2 :                 .expect("Should have a local timeline");
    4691            2 : 
    4692            6 :             make_some_layers(newtline.as_ref(), Lsn(0x60), &ctx).await?;
    4693            2 : 
    4694            2 :             // so that all uploads finish & we can call harness.load() below again
    4695            2 :             tenant
    4696            2 :                 .shutdown(Default::default(), true)
    4697            2 :                 .instrument(harness.span())
    4698            2 :                 .await
    4699            2 :                 .ok()
    4700            2 :                 .unwrap();
    4701            2 :         }
    4702            2 : 
    4703            2 :         // check that both of them are initially unloaded
    4704           19 :         let (tenant, _ctx) = harness.load().await;
    4705            2 : 
    4706            2 :         // check that both child and ancestor are loaded
    4707            2 :         let _child_tline = tenant
    4708            2 :             .get_timeline(NEW_TIMELINE_ID, true)
    4709            2 :             .expect("cannot get child timeline loaded");
    4710            2 : 
    4711            2 :         let _ancestor_tline = tenant
    4712            2 :             .get_timeline(TIMELINE_ID, true)
    4713            2 :             .expect("cannot get ancestor timeline loaded");
    4714            2 : 
    4715            2 :         Ok(())
    4716            2 :     }
    4717              : 
    4718            2 :     #[tokio::test]
    4719            2 :     async fn delta_layer_dumping() -> anyhow::Result<()> {
    4720            2 :         use storage_layer::AsLayerDesc;
    4721            2 :         let (tenant, ctx) = TenantHarness::create("test_layer_dumping")?.load().await;
    4722            2 :         let tline = tenant
    4723            2 :             .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
    4724            7 :             .await?;
    4725            6 :         make_some_layers(tline.as_ref(), Lsn(0x20), &ctx).await?;
    4726            2 : 
    4727            2 :         let layer_map = tline.layers.read().await;
    4728            2 :         let level0_deltas = layer_map
    4729            2 :             .layer_map()
    4730            2 :             .get_level0_deltas()?
    4731            2 :             .into_iter()
    4732            4 :             .map(|desc| layer_map.get_from_desc(&desc))
    4733            2 :             .collect::<Vec<_>>();
    4734            2 : 
    4735            2 :         assert!(!level0_deltas.is_empty());
    4736            2 : 
    4737            6 :         for delta in level0_deltas {
    4738            2 :             // Ensure we are dumping a delta layer here
    4739            4 :             assert!(delta.layer_desc().is_delta);
    4740            8 :             delta.dump(true, &ctx).await.unwrap();
    4741            2 :         }
    4742            2 : 
    4743            2 :         Ok(())
    4744            2 :     }
    4745              : 
    4746            2 :     #[tokio::test]
    4747            2 :     async fn corrupt_local_metadata() -> anyhow::Result<()> {
    4748            2 :         const TEST_NAME: &str = "corrupt_metadata";
    4749            2 :         let harness = TenantHarness::create(TEST_NAME)?;
    4750            2 :         let (tenant, ctx) = harness.load().await;
    4751            2 : 
    4752            2 :         let tline = tenant
    4753            2 :             .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
    4754            7 :             .await?;
    4755            2 :         drop(tline);
    4756            2 :         // so that all uploads finish & we can call harness.try_load_local() below again
    4757            2 :         tenant
    4758            2 :             .shutdown(Default::default(), true)
    4759            2 :             .instrument(harness.span())
    4760            2 :             .await
    4761            2 :             .ok()
    4762            2 :             .unwrap();
    4763            2 :         drop(tenant);
    4764            2 : 
    4765            2 :         // Corrupt local metadata
    4766            2 :         let metadata_path = harness.timeline_path(&TIMELINE_ID).join(METADATA_FILE_NAME);
    4767            2 :         assert!(metadata_path.is_file());
    4768            2 :         let mut metadata_bytes = std::fs::read(&metadata_path)?;
    4769            2 :         assert_eq!(metadata_bytes.len(), 512);
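                      :         // Flip a single bit so the stored metadata checksum no longer matches.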
    4770            2 :         metadata_bytes[8] ^= 1;
    4771            2 :         std::fs::write(metadata_path, metadata_bytes)?;
    4772            2 : 
    4773            2 :         let err = harness.try_load_local(&ctx).await.expect_err("should fail");
    4774            2 :         // format with {:#} to get the whole error chain (every .context layer), not only the last one
    4775            2 :         let message = format!("{err:#}");
    4776            2 :         let expected = "failed to load metadata";
    4777            2 :         assert!(
    4778            2 :             message.contains(expected),
    4779            2 :             "message '{message}' expected to contain {expected}"
    4780            2 :         );
    4781            2 : 
    4782            2 :         let mut found_error_message = false;
    4783            2 :         let mut err_source = err.source();
    4784            2 :         while let Some(source) = err_source {
    4785            2 :             if source.to_string().contains("metadata checksum mismatch") {
    4786            2 :                 found_error_message = true;
    4787            2 :                 break;
    4788            2 :             }
    4789            0 :             err_source = source.source();
    4790            2 :         }
    4791            2 :         assert!(
    4792            2 :             found_error_message,
    4793            2 :             "didn't find the corrupted metadata error in {}",
    4794            2 :             message
    4795            2 :         );
    4796            2 : 
    4797            2 :         Ok(())
    4798            2 :     }
    4799              : 
    4800            2 :     #[tokio::test]
    4801            2 :     async fn test_images() -> anyhow::Result<()> {
    4802            2 :         let (tenant, ctx) = TenantHarness::create("test_images")?.load().await;
    4803            2 :         let tline = tenant
    4804            2 :             .create_test_timeline(TIMELINE_ID, Lsn(0x08), DEFAULT_PG_VERSION, &ctx)
    4805            7 :             .await?;
    4806            2 : 
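                      :         // Write the key at LSNs 0x10, 0x20, 0x30 and 0x40, flushing and compacting after each write,
                      :         // then check that reads at and between those LSNs return the matching images.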
    4807            2 :         let writer = tline.writer().await;
    4808            2 :         writer
    4809            2 :             .put(
    4810            2 :                 *TEST_KEY,
    4811            2 :                 Lsn(0x10),
    4812            2 :                 &Value::Image(TEST_IMG("foo at 0x10")),
    4813            2 :                 &ctx,
    4814            2 :             )
    4815            2 :             .await?;
    4816            2 :         writer.finish_write(Lsn(0x10));
    4817            2 :         drop(writer);
    4818            2 : 
    4819            2 :         tline.freeze_and_flush().await?;
    4820            2 :         tline
    4821            2 :             .compact(&CancellationToken::new(), EnumSet::empty(), &ctx)
    4822            2 :             .await?;
    4823            2 : 
    4824            2 :         let writer = tline.writer().await;
    4825            2 :         writer
    4826            2 :             .put(
    4827            2 :                 *TEST_KEY,
    4828            2 :                 Lsn(0x20),
    4829            2 :                 &Value::Image(TEST_IMG("foo at 0x20")),
    4830            2 :                 &ctx,
    4831            2 :             )
    4832            2 :             .await?;
    4833            2 :         writer.finish_write(Lsn(0x20));
    4834            2 :         drop(writer);
    4835            2 : 
    4836            2 :         tline.freeze_and_flush().await?;
    4837            2 :         tline
    4838            2 :             .compact(&CancellationToken::new(), EnumSet::empty(), &ctx)
    4839            2 :             .await?;
    4840            2 : 
    4841            2 :         let writer = tline.writer().await;
    4842            2 :         writer
    4843            2 :             .put(
    4844            2 :                 *TEST_KEY,
    4845            2 :                 Lsn(0x30),
    4846            2 :                 &Value::Image(TEST_IMG("foo at 0x30")),
    4847            2 :                 &ctx,
    4848            2 :             )
    4849            2 :             .await?;
    4850            2 :         writer.finish_write(Lsn(0x30));
    4851            2 :         drop(writer);
    4852            2 : 
    4853            2 :         tline.freeze_and_flush().await?;
    4854            2 :         tline
    4855            2 :             .compact(&CancellationToken::new(), EnumSet::empty(), &ctx)
    4856            2 :             .await?;
    4857            2 : 
    4858            2 :         let writer = tline.writer().await;
    4859            2 :         writer
    4860            2 :             .put(
    4861            2 :                 *TEST_KEY,
    4862            2 :                 Lsn(0x40),
    4863            2 :                 &Value::Image(TEST_IMG("foo at 0x40")),
    4864            2 :                 &ctx,
    4865            2 :             )
    4866            2 :             .await?;
    4867            2 :         writer.finish_write(Lsn(0x40));
    4868            2 :         drop(writer);
    4869            2 : 
    4870            2 :         tline.freeze_and_flush().await?;
    4871            2 :         tline
    4872            2 :             .compact(&CancellationToken::new(), EnumSet::empty(), &ctx)
    4873            2 :             .await?;
    4874            2 : 
    4875            2 :         assert_eq!(
    4876            4 :             tline.get(*TEST_KEY, Lsn(0x10), &ctx).await?,
    4877            2 :             TEST_IMG("foo at 0x10")
    4878            2 :         );
    4879            2 :         assert_eq!(
    4880            3 :             tline.get(*TEST_KEY, Lsn(0x1f), &ctx).await?,
    4881            2 :             TEST_IMG("foo at 0x10")
    4882            2 :         );
    4883            2 :         assert_eq!(
    4884            2 :             tline.get(*TEST_KEY, Lsn(0x20), &ctx).await?,
    4885            2 :             TEST_IMG("foo at 0x20")
    4886            2 :         );
    4887            2 :         assert_eq!(
    4888            4 :             tline.get(*TEST_KEY, Lsn(0x30), &ctx).await?,
    4889            2 :             TEST_IMG("foo at 0x30")
    4890            2 :         );
    4891            2 :         assert_eq!(
    4892            4 :             tline.get(*TEST_KEY, Lsn(0x40), &ctx).await?,
    4893            2 :             TEST_IMG("foo at 0x40")
    4894            2 :         );
    4895            2 : 
    4896            2 :         Ok(())
    4897            2 :     }
    4898              : 
    4899              :     //
    4900              :     // Insert 10,000 key-value pairs with increasing keys, flush, compact, GC.
    4901              :     // Repeat 50 times.
    4902              :     //
    4903            2 :     #[tokio::test]
    4904            2 :     async fn test_bulk_insert() -> anyhow::Result<()> {
    4905            2 :         let harness = TenantHarness::create("test_bulk_insert")?;
    4906            2 :         let (tenant, ctx) = harness.load().await;
    4907            2 :         let tline = tenant
    4908            2 :             .create_test_timeline(TIMELINE_ID, Lsn(0x08), DEFAULT_PG_VERSION, &ctx)
    4909            6 :             .await?;
    4910            2 : 
    4911            2 :         let mut lsn = Lsn(0x10);
    4912            2 : 
    4913            2 :         let mut keyspace = KeySpaceAccum::new();
    4914            2 : 
    4915            2 :         let mut test_key = Key::from_hex("010000000033333333444444445500000000").unwrap();
    4916            2 :         let mut blknum = 0;
    4917          102 :         for _ in 0..50 {
    4918      1000100 :             for _ in 0..10000 {
    4919      1000000 :                 test_key.field6 = blknum;
    4920      1000000 :                 let writer = tline.writer().await;
    4921      1000000 :                 writer
    4922      1000000 :                     .put(
    4923      1000000 :                         test_key,
    4924      1000000 :                         lsn,
    4925      1000000 :                         &Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))),
    4926      1000000 :                         &ctx,
    4927      1000000 :                     )
    4928        15854 :                     .await?;
    4929      1000000 :                 writer.finish_write(lsn);
    4930      1000000 :                 drop(writer);
    4931      1000000 : 
    4932      1000000 :                 keyspace.add_key(test_key);
    4933      1000000 : 
    4934      1000000 :                 lsn = Lsn(lsn.0 + 0x10);
    4935      1000000 :                 blknum += 1;
    4936            2 :             }
    4937            2 : 
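                      :             // After each batch, move the GC cutoff to the latest record LSN and run a flush/compact/GC cycle.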
    4938          100 :             let cutoff = tline.get_last_record_lsn();
    4939          100 : 
    4940          100 :             tline
    4941          100 :                 .update_gc_info(
    4942          100 :                     Vec::new(),
    4943          100 :                     cutoff,
    4944          100 :                     Duration::ZERO,
    4945          100 :                     &CancellationToken::new(),
    4946          100 :                     &ctx,
    4947          100 :                 )
    4948            2 :                 .await?;
    4949          100 :             tline.freeze_and_flush().await?;
    4950          100 :             tline
    4951          100 :                 .compact(&CancellationToken::new(), EnumSet::empty(), &ctx)
    4952        18640 :                 .await?;
    4953          100 :             tline.gc().await?;
    4954            2 :         }
    4955            2 : 
    4956            2 :         Ok(())
    4957            2 :     }
    4958              : 
    4959            2 :     #[tokio::test]
    4960            2 :     async fn test_random_updates() -> anyhow::Result<()> {
    4961            2 :         let harness = TenantHarness::create("test_random_updates")?;
    4962            2 :         let (tenant, ctx) = harness.load().await;
    4963            2 :         let tline = tenant
    4964            2 :             .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
    4965            6 :             .await?;
    4966            2 : 
    4967            2 :         const NUM_KEYS: usize = 1000;
    4968            2 : 
    4969            2 :         let mut test_key = Key::from_hex("010000000033333333444444445500000000").unwrap();
    4970            2 : 
    4971            2 :         let mut keyspace = KeySpaceAccum::new();
    4972            2 : 
    4973            2 :         // Track when each page was last modified. Used to assert that
    4974            2 :         // a read sees the latest page version.
    4975            2 :         let mut updated = [Lsn(0); NUM_KEYS];
    4976            2 : 
    4977            2 :         let mut lsn = Lsn(0x10);
    4978            2 :         #[allow(clippy::needless_range_loop)]
    4979         2002 :         for blknum in 0..NUM_KEYS {
    4980         2000 :             lsn = Lsn(lsn.0 + 0x10);
    4981         2000 :             test_key.field6 = blknum as u32;
    4982         2000 :             let writer = tline.writer().await;
    4983         2000 :             writer
    4984         2000 :                 .put(
    4985         2000 :                     test_key,
    4986         2000 :                     lsn,
    4987         2000 :                     &Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))),
    4988         2000 :                     &ctx,
    4989         2000 :                 )
    4990           33 :                 .await?;
    4991         2000 :             writer.finish_write(lsn);
    4992         2000 :             updated[blknum] = lsn;
    4993         2000 :             drop(writer);
    4994         2000 : 
    4995         2000 :             keyspace.add_key(test_key);
    4996            2 :         }
    4997            2 : 
    4998          102 :         for _ in 0..50 {
    4999       100100 :             for _ in 0..NUM_KEYS {
    5000       100000 :                 lsn = Lsn(lsn.0 + 0x10);
    5001       100000 :                 let blknum = thread_rng().gen_range(0..NUM_KEYS);
    5002       100000 :                 test_key.field6 = blknum as u32;
    5003       100000 :                 let writer = tline.writer().await;
    5004       100000 :                 writer
    5005       100000 :                     .put(
    5006       100000 :                         test_key,
    5007       100000 :                         lsn,
    5008       100000 :                         &Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))),
    5009       100000 :                         &ctx,
    5010       100000 :                     )
    5011         1604 :                     .await?;
    5012       100000 :                 writer.finish_write(lsn);
    5013       100000 :                 drop(writer);
    5014       100000 :                 updated[blknum] = lsn;
    5015            2 :             }
    5016            2 : 
    5017            2 :             // Read all the blocks
    5018       100000 :             for (blknum, last_lsn) in updated.iter().enumerate() {
    5019       100000 :                 test_key.field6 = blknum as u32;
    5020       100000 :                 assert_eq!(
    5021       100000 :                     tline.get(test_key, lsn, &ctx).await?,
    5022       100000 :                     TEST_IMG(&format!("{} at {}", blknum, last_lsn))
    5023            2 :                 );
    5024            2 :             }
    5025            2 : 
    5026            2 :             // Perform a cycle of flush, compact, and GC
    5027          100 :             let cutoff = tline.get_last_record_lsn();
    5028          100 :             tline
    5029          100 :                 .update_gc_info(
    5030          100 :                     Vec::new(),
    5031          100 :                     cutoff,
    5032          100 :                     Duration::ZERO,
    5033          100 :                     &CancellationToken::new(),
    5034          100 :                     &ctx,
    5035          100 :                 )
    5036            2 :                 .await?;
    5037          101 :             tline.freeze_and_flush().await?;
    5038          100 :             tline
    5039          100 :                 .compact(&CancellationToken::new(), EnumSet::empty(), &ctx)
    5040         2268 :                 .await?;
    5041          100 :             tline.gc().await?;
    5042            2 :         }
    5043            2 : 
    5044            2 :         Ok(())
    5045            2 :     }
    5046              : 
    5047            2 :     #[tokio::test]
    5048            2 :     async fn test_traverse_branches() -> anyhow::Result<()> {
    5049            2 :         let (tenant, ctx) = TenantHarness::create("test_traverse_branches")?
    5050            2 :             .load()
    5051            2 :             .await;
    5052            2 :         let mut tline = tenant
    5053            2 :             .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
    5054            6 :             .await?;
    5055            2 : 
    5056            2 :         const NUM_KEYS: usize = 1000;
    5057            2 : 
    5058            2 :         let mut test_key = Key::from_hex("010000000033333333444444445500000000").unwrap();
    5059            2 : 
    5060            2 :         let mut keyspace = KeySpaceAccum::new();
    5061            2 : 
    5062            2 :         // Track when each page was last modified. Used to assert that
    5063            2 :         // a read sees the latest page version.
    5064            2 :         let mut updated = [Lsn(0); NUM_KEYS];
    5065            2 : 
    5066            2 :         let mut lsn = Lsn(0x10);
    5067            2 :         #[allow(clippy::needless_range_loop)]
    5068         2002 :         for blknum in 0..NUM_KEYS {
    5069         2000 :             lsn = Lsn(lsn.0 + 0x10);
    5070         2000 :             test_key.field6 = blknum as u32;
    5071         2000 :             let writer = tline.writer().await;
    5072         2000 :             writer
    5073         2000 :                 .put(
    5074         2000 :                     test_key,
    5075         2000 :                     lsn,
    5076         2000 :                     &Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))),
    5077         2000 :                     &ctx,
    5078         2000 :                 )
    5079           33 :                 .await?;
    5080         2000 :             writer.finish_write(lsn);
    5081         2000 :             updated[blknum] = lsn;
    5082         2000 :             drop(writer);
    5083         2000 : 
    5084         2000 :             keyspace.add_key(test_key);
    5085            2 :         }
    5086            2 : 
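                      :         // Each iteration branches off the current tip, applies random updates on the new branch,
                      :         // re-reads every key, and runs a flush/compact/GC cycle.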
    5087          102 :         for _ in 0..50 {
    5088          100 :             let new_tline_id = TimelineId::generate();
    5089          100 :             tenant
    5090          100 :                 .branch_timeline_test(&tline, new_tline_id, Some(lsn), &ctx)
    5091          102 :                 .await?;
    5092          100 :             tline = tenant
    5093          100 :                 .get_timeline(new_tline_id, true)
    5094          100 :                 .expect("Should have the branched timeline");
    5095            2 : 
    5096       100100 :             for _ in 0..NUM_KEYS {
    5097       100000 :                 lsn = Lsn(lsn.0 + 0x10);
    5098       100000 :                 let blknum = thread_rng().gen_range(0..NUM_KEYS);
    5099       100000 :                 test_key.field6 = blknum as u32;
    5100       100000 :                 let writer = tline.writer().await;
    5101       100000 :                 writer
    5102       100000 :                     .put(
    5103       100000 :                         test_key,
    5104       100000 :                         lsn,
    5105       100000 :                         &Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))),
    5106       100000 :                         &ctx,
    5107       100000 :                     )
    5108         1600 :                     .await?;
    5109       100000 :                 println!("updating {} at {}", blknum, lsn);
    5110       100000 :                 writer.finish_write(lsn);
    5111       100000 :                 drop(writer);
    5112       100000 :                 updated[blknum] = lsn;
    5113            2 :             }
    5114            2 : 
    5115            2 :             // Read all the blocks
    5116       100000 :             for (blknum, last_lsn) in updated.iter().enumerate() {
    5117       100000 :                 test_key.field6 = blknum as u32;
    5118       100000 :                 assert_eq!(
    5119       100000 :                     tline.get(test_key, lsn, &ctx).await?,
    5120       100000 :                     TEST_IMG(&format!("{} at {}", blknum, last_lsn))
    5121            2 :                 );
    5122            2 :             }
    5123            2 : 
    5124            2 :             // Perform a cycle of flush, compact, and GC
    5125          100 :             let cutoff = tline.get_last_record_lsn();
    5126          100 :             tline
    5127          100 :                 .update_gc_info(
    5128          100 :                     Vec::new(),
    5129          100 :                     cutoff,
    5130          100 :                     Duration::ZERO,
    5131          100 :                     &CancellationToken::new(),
    5132          100 :                     &ctx,
    5133          100 :                 )
    5134            2 :                 .await?;
    5135          104 :             tline.freeze_and_flush().await?;
    5136          100 :             tline
    5137          100 :                 .compact(&CancellationToken::new(), EnumSet::empty(), &ctx)
    5138        13114 :                 .await?;
    5139          100 :             tline.gc().await?;
    5140            2 :         }
    5141            2 : 
    5142            2 :         Ok(())
    5143            2 :     }
    5144              : 
    5145            2 :     #[tokio::test]
    5146            2 :     async fn test_traverse_ancestors() -> anyhow::Result<()> {
    5147            2 :         let (tenant, ctx) = TenantHarness::create("test_traverse_ancestors")?
    5148            2 :             .load()
    5149            2 :             .await;
    5150            2 :         let mut tline = tenant
    5151            2 :             .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
    5152            6 :             .await?;
    5153            2 : 
    5154            2 :         const NUM_KEYS: usize = 100;
    5155            2 :         const NUM_TLINES: usize = 50;
    5156            2 : 
    5157            2 :         let mut test_key = Key::from_hex("010000000033333333444444445500000000").unwrap();
    5158            2 :         // Track page mutation lsns across different timelines.
    5159            2 :         let mut updated = [[Lsn(0); NUM_KEYS]; NUM_TLINES];
    5160            2 : 
    5161            2 :         let mut lsn = Lsn(0x10);
    5162            2 : 
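                      :         // Build a chain of NUM_TLINES branches; each branch updates a random subset of keys at increasing LSNs.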
    5163            2 :         #[allow(clippy::needless_range_loop)]
    5164          102 :         for idx in 0..NUM_TLINES {
    5165          100 :             let new_tline_id = TimelineId::generate();
    5166          100 :             tenant
    5167          100 :                 .branch_timeline_test(&tline, new_tline_id, Some(lsn), &ctx)
    5168          100 :                 .await?;
    5169          100 :             tline = tenant
    5170          100 :                 .get_timeline(new_tline_id, true)
    5171          100 :                 .expect("Should have the branched timeline");
    5172            2 : 
    5173        10100 :             for _ in 0..NUM_KEYS {
    5174        10000 :                 lsn = Lsn(lsn.0 + 0x10);
    5175        10000 :                 let blknum = thread_rng().gen_range(0..NUM_KEYS);
    5176        10000 :                 test_key.field6 = blknum as u32;
    5177        10000 :                 let writer = tline.writer().await;
    5178        10000 :                 writer
    5179        10000 :                     .put(
    5180        10000 :                         test_key,
    5181        10000 :                         lsn,
    5182        10000 :                         &Value::Image(TEST_IMG(&format!("{} {} at {}", idx, blknum, lsn))),
    5183        10000 :                         &ctx,
    5184        10000 :                     )
    5185          150 :                     .await?;
    5186        10000 :                 println!("updating [{}][{}] at {}", idx, blknum, lsn);
    5187        10000 :                 writer.finish_write(lsn);
    5188        10000 :                 drop(writer);
    5189        10000 :                 updated[idx][blknum] = lsn;
    5190            2 :             }
    5191            2 :         }
    5192            2 : 
    5193            2 :         // Read pages from leaf timeline across all ancestors.
    5194          100 :         for (idx, lsns) in updated.iter().enumerate() {
    5195        10000 :             for (blknum, lsn) in lsns.iter().enumerate() {
    5196            2 :                 // Skip empty mutations.
    5197        10000 :                 if lsn.0 == 0 {
    5198         3639 :                     continue;
    5199         6361 :                 }
    5200         6361 :                 println!("checking [{idx}][{blknum}] at {lsn}");
    5201         6361 :                 test_key.field6 = blknum as u32;
    5202         6361 :                 assert_eq!(
    5203         6361 :                     tline.get(test_key, *lsn, &ctx).await?,
    5204         6361 :                     TEST_IMG(&format!("{idx} {blknum} at {lsn}"))
    5205            2 :                 );
    5206            2 :             }
    5207            2 :         }
    5208            2 :         Ok(())
    5209            2 :     }
    5210              : 
    5211            2 :     #[tokio::test]
    5212            2 :     async fn test_write_at_initdb_lsn_takes_optimization_code_path() -> anyhow::Result<()> {
    5213            2 :         let (tenant, ctx) = TenantHarness::create("test_write_at_initdb_lsn_takes_optimization_code_path")?
    5214            2 :             .load()
    5215            2 :             .await;
    5216            2 : 
    5217            2 :         let initdb_lsn = Lsn(0x20);
    5218            2 :         let utline = tenant
    5219            2 :             .create_empty_timeline(TIMELINE_ID, initdb_lsn, DEFAULT_PG_VERSION, &ctx)
    5220            2 :             .await?;
    5221            2 :         let tline = utline.raw_timeline().unwrap();
    5222            2 : 
    5223            2 :         // Spawn flush loop now so that we can set the `expect_initdb_optimization`
    5224            2 :         tline.maybe_spawn_flush_loop();
    5225            2 : 
    5226            2 :         // Make sure the timeline has the minimum set of required keys for operation.
    5227            2 :         // The only operation you can always do on an empty timeline is to `put` new data.
    5228            2 :         // Except if you `put` at `initdb_lsn`.
    5229            2 :         // In that case, there's an optimization to directly create image layers instead of delta layers.
    5230            2 :         // It uses `repartition()`, which assumes some keys to be present.
    5231            2 :         // Let's make sure the test timeline can handle that case.
    5232            2 :         {
    5233            2 :             let mut state = tline.flush_loop_state.lock().unwrap();
    5234            2 :             assert_eq!(
    5235            2 :                 timeline::FlushLoopState::Running {
    5236            2 :                     expect_initdb_optimization: false,
    5237            2 :                     initdb_optimization_count: 0,
    5238            2 :                 },
    5239            2 :                 *state
    5240            2 :             );
    5241            2 :             *state = timeline::FlushLoopState::Running {
    5242            2 :                 expect_initdb_optimization: true,
    5243            2 :                 initdb_optimization_count: 0,
    5244            2 :             };
    5245            2 :         }
    5246            2 : 
    5247            2 :         // Make writes at the initdb_lsn. When we flush it below, it should be handled by the optimization.
    5248            2 :         // As explained above, the optimization requires some keys to be present.
    5249            2 :         // As per `create_empty_timeline` documentation, use init_empty to set them.
    5250            2 :         // This is what `create_test_timeline` does, by the way.
    5251            2 :         let mut modification = tline.begin_modification(initdb_lsn);
    5252            2 :         modification
    5253            2 :             .init_empty_test_timeline()
    5254            2 :             .context("init_empty_test_timeline")?;
    5255            2 :         modification
    5256            2 :             .commit(&ctx)
    5257            2 :             .await
    5258            2 :             .context("commit init_empty_test_timeline modification")?;
    5259            2 : 
    5260            2 :         // Do the flush. The flush code will check the expectations that we set above.
    5261            2 :         tline.freeze_and_flush().await?;
    5262            2 : 
    5263            2 :         // assert freeze_and_flush exercised the initdb optimization
    5264            2 :         {
    5265            2 :             let state = tline.flush_loop_state.lock().unwrap();
    5266            2 :             let timeline::FlushLoopState::Running {
    5267            2 :                 expect_initdb_optimization,
    5268            2 :                 initdb_optimization_count,
    5269            2 :             } = *state
    5270            2 :             else {
    5271            2 :                 panic!("unexpected state: {:?}", *state);
    5272            2 :             };
    5273            2 :             assert!(expect_initdb_optimization);
    5274            2 :             assert!(initdb_optimization_count > 0);
    5275            2 :         }
    5276            2 :         Ok(())
    5277            2 :     }
    5278              : 
    5279            2 :     #[tokio::test]
    5280            2 :     async fn test_uninit_mark_crash() -> anyhow::Result<()> {
    5281            2 :         let name = "test_uninit_mark_crash";
    5282            2 :         let harness = TenantHarness::create(name)?;
    5283            2 :         {
    5284            2 :             let (tenant, ctx) = harness.load().await;
    5285            2 :             let tline = tenant
    5286            2 :                 .create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)
    5287            2 :                 .await?;
    5288            2 :             // Keeps uninit mark in place
    5289            2 :             let raw_tline = tline.raw_timeline().unwrap();
    5290            2 :             raw_tline
    5291            2 :                 .shutdown()
    5292            2 :                 .instrument(info_span!("test_shutdown", tenant_id=%raw_tline.tenant_shard_id, shard_id=%raw_tline.tenant_shard_id.shard_slug(), timeline_id=%TIMELINE_ID))
    5293            2 :                 .await;
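                      :             // Leak the timeline handle so its cleanup never runs and the uninit mark file stays on disk,
                      :             // simulating a crash in the middle of timeline creation.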
    5294            2 :             std::mem::forget(tline);
    5295            2 :         }
    5296            2 : 
    5297            2 :         let (tenant, _) = harness.load().await;
    5298            2 :         match tenant.get_timeline(TIMELINE_ID, false) {
    5299            2 :             Ok(_) => panic!("timeline should've been removed during load"),
    5300            2 :             Err(e) => {
    5301            2 :                 assert_eq!(
    5302            2 :                     e,
    5303            2 :                     GetTimelineError::NotFound {
    5304            2 :                         tenant_id: tenant.tenant_shard_id,
    5305            2 :                         timeline_id: TIMELINE_ID,
    5306            2 :                     }
    5307            2 :                 )
    5308            2 :             }
    5309            2 :         }
    5310            2 : 
    5311            2 :         assert!(!harness
    5312            2 :             .conf
    5313            2 :             .timeline_path(&tenant.tenant_shard_id, &TIMELINE_ID)
    5314            2 :             .exists());
    5315            2 : 
    5316            2 :         assert!(!harness
    5317            2 :             .conf
    5318            2 :             .timeline_uninit_mark_file_path(tenant.tenant_shard_id, TIMELINE_ID)
    5319            2 :             .exists());
    5320            2 : 
    5321            2 :         Ok(())
    5322            2 :     }
    5323              : }
        

Generated by: LCOV version 2.1-beta