LCOV - code coverage report
Current view: top level - pageserver/src - tenant.rs (source / functions) Coverage Total Hit
Test: c639aa5f7ab62b43d647b10f40d15a15686ce8a9.info Lines: 88.1 % 3473 3061
Test Date: 2024-02-12 20:26:03 Functions: 74.2 % 395 293

            Line data    Source code
       1              : //!
       2              : //! Timeline repository implementation that keeps old data in files on disk, and
       3              : //! the recent changes in memory. See tenant/*_layer.rs files.
       4              : //! The functions here are responsible for locating the correct layer for the
       5              : //! get/put call, walking back the timeline branching history as needed.
       6              : //!
       7              : //! The files are stored in the .neon/tenants/<tenant_id>/timelines/<timeline_id>
       8              : //! directory. See docs/pageserver-storage.md for how the files are managed.
       9              : //! In addition to the layer files, there is a metadata file in the same
      10              : //! directory that contains information about the timeline, in particular its
      11              : //! parent timeline, and the last LSN that has been written to disk.
      12              : //!
      13              : 
      14              : use anyhow::{bail, Context};
      15              : use camino::Utf8Path;
      16              : use camino::Utf8PathBuf;
      17              : use enumset::EnumSet;
      18              : use futures::stream::FuturesUnordered;
      19              : use futures::FutureExt;
      20              : use futures::StreamExt;
      21              : use pageserver_api::models;
      22              : use pageserver_api::models::TimelineState;
      23              : use pageserver_api::models::WalRedoManagerStatus;
      24              : use pageserver_api::shard::ShardIdentity;
      25              : use pageserver_api::shard::TenantShardId;
      26              : use remote_storage::DownloadError;
      27              : use remote_storage::GenericRemoteStorage;
      28              : use std::fmt;
      29              : use storage_broker::BrokerClientChannel;
      30              : use tokio::io::BufReader;
      31              : use tokio::runtime::Handle;
      32              : use tokio::sync::watch;
      33              : use tokio::task::JoinSet;
      34              : use tokio_util::sync::CancellationToken;
      35              : use tracing::*;
      36              : use utils::backoff;
      37              : use utils::completion;
      38              : use utils::crashsafe::path_with_suffix_extension;
      39              : use utils::failpoint_support;
      40              : use utils::fs_ext;
      41              : use utils::sync::gate::Gate;
      42              : use utils::sync::gate::GateGuard;
      43              : use utils::timeout::timeout_cancellable;
      44              : use utils::timeout::TimeoutCancellableError;
      45              : 
      46              : use self::config::AttachedLocationConfig;
      47              : use self::config::AttachmentMode;
      48              : use self::config::LocationConf;
      49              : use self::config::TenantConf;
      50              : use self::delete::DeleteTenantFlow;
      51              : use self::metadata::LoadMetadataError;
      52              : use self::metadata::TimelineMetadata;
      53              : use self::mgr::GetActiveTenantError;
      54              : use self::mgr::GetTenantError;
      55              : use self::mgr::TenantsMap;
      56              : use self::remote_timeline_client::upload::upload_index_part;
      57              : use self::remote_timeline_client::RemoteTimelineClient;
      58              : use self::timeline::uninit::TimelineExclusionError;
      59              : use self::timeline::uninit::TimelineUninitMark;
      60              : use self::timeline::uninit::UninitializedTimeline;
      61              : use self::timeline::EvictionTaskTenantState;
      62              : use self::timeline::TimelineResources;
      63              : use self::timeline::WaitLsnError;
      64              : use crate::config::PageServerConf;
      65              : use crate::context::{DownloadBehavior, RequestContext};
      66              : use crate::deletion_queue::DeletionQueueClient;
      67              : use crate::deletion_queue::DeletionQueueError;
      68              : use crate::import_datadir;
      69              : use crate::is_uninit_mark;
      70              : use crate::metrics::TENANT;
      71              : use crate::metrics::{
      72              :     remove_tenant_metrics, BROKEN_TENANTS_SET, TENANT_STATE_METRIC, TENANT_SYNTHETIC_SIZE_METRIC,
      73              : };
      74              : use crate::repository::GcResult;
      75              : use crate::task_mgr;
      76              : use crate::task_mgr::TaskKind;
      77              : use crate::tenant::config::LocationMode;
      78              : use crate::tenant::config::TenantConfOpt;
      79              : use crate::tenant::metadata::load_metadata;
      80              : pub use crate::tenant::remote_timeline_client::index::IndexPart;
      81              : use crate::tenant::remote_timeline_client::remote_initdb_archive_path;
      82              : use crate::tenant::remote_timeline_client::MaybeDeletedIndexPart;
      83              : use crate::tenant::remote_timeline_client::INITDB_PATH;
      84              : use crate::tenant::storage_layer::DeltaLayer;
      85              : use crate::tenant::storage_layer::ImageLayer;
      86              : use crate::InitializationOrder;
      87              : use std::cmp::min;
      88              : use std::collections::hash_map::Entry;
      89              : use std::collections::BTreeSet;
      90              : use std::collections::HashMap;
      91              : use std::collections::HashSet;
      92              : use std::fmt::Debug;
      93              : use std::fmt::Display;
      94              : use std::fs;
      95              : use std::fs::File;
      96              : use std::io;
      97              : use std::ops::Bound::Included;
      98              : use std::sync::atomic::AtomicU64;
      99              : use std::sync::atomic::Ordering;
     100              : use std::sync::Arc;
     101              : use std::sync::{Mutex, RwLock};
     102              : use std::time::{Duration, Instant};
     103              : 
     104              : use crate::span;
     105              : use crate::tenant::timeline::delete::DeleteTimelineFlow;
     106              : use crate::tenant::timeline::uninit::cleanup_timeline_directory;
     107              : use crate::virtual_file::VirtualFile;
     108              : use crate::walredo::PostgresRedoManager;
     109              : use crate::TEMP_FILE_SUFFIX;
     110              : use once_cell::sync::Lazy;
     111              : pub use pageserver_api::models::TenantState;
     112              : use tokio::sync::Semaphore;
     113              : 
     114          378 : static INIT_DB_SEMAPHORE: Lazy<Semaphore> = Lazy::new(|| Semaphore::new(8));
     115              : use toml_edit;
     116              : use utils::{
     117              :     crashsafe,
     118              :     generation::Generation,
     119              :     id::TimelineId,
     120              :     lsn::{Lsn, RecordLsn},
     121              : };
     122              : 
     123              : /// Declare a failpoint that can use the `pause` failpoint action.
     124              : /// We don't want to block the executor thread, hence, spawn_blocking + await.
     125              : macro_rules! pausable_failpoint {
     126              :     ($name:literal) => {
     127              :         if cfg!(feature = "testing") {
     128              :             tokio::task::spawn_blocking({
     129              :                 let current = tracing::Span::current();
     130              :                 move || {
     131              :                     let _entered = current.entered();
     132              :                     tracing::info!("at failpoint {}", $name);
     133              :                     fail::fail_point!($name);
     134              :                 }
     135              :             })
     136              :             .await
     137              :             .expect("spawn_blocking");
     138              :         }
     139              :     };
     140              :     ($name:literal, $cond:expr) => {
     141              :         if cfg!(feature = "testing") {
     142              :             if $cond {
     143              :                 pausable_failpoint!($name)
     144              :             }
     145              :         }
     146              :     };
     147              : }
     148              : 
     149              : pub mod blob_io;
     150              : pub mod block_io;
     151              : 
     152              : pub mod disk_btree;
     153              : pub(crate) mod ephemeral_file;
     154              : pub mod layer_map;
     155              : 
     156              : pub mod metadata;
     157              : mod par_fsync;
     158              : pub mod remote_timeline_client;
     159              : pub mod storage_layer;
     160              : 
     161              : pub mod config;
     162              : pub mod delete;
     163              : pub mod mgr;
     164              : pub mod secondary;
     165              : pub mod tasks;
     166              : pub mod upload_queue;
     167              : 
     168              : pub(crate) mod timeline;
     169              : 
     170              : pub mod size;
     171              : 
     172              : pub(crate) use crate::span::debug_assert_current_span_has_tenant_and_timeline_id;
     173              : pub(crate) use timeline::{LogicalSizeCalculationCause, PageReconstructError, Timeline};
     174              : 
     175              : // re-export for use in remote_timeline_client.rs
     176              : pub use crate::tenant::metadata::save_metadata;
     177              : 
     178              : // re-export for use in walreceiver
     179              : pub use crate::tenant::timeline::WalReceiverInfo;
     180              : 
     181              : /// The "tenants" part of `tenants/<tenant>/timelines...`
     182              : pub const TENANTS_SEGMENT_NAME: &str = "tenants";
     183              : 
     184              : /// Parts of the `.neon/tenants/<tenant_id>/timelines/<timeline_id>` directory prefix.
     185              : pub const TIMELINES_SEGMENT_NAME: &str = "timelines";
     186              : 
     187              : pub const TENANT_DELETED_MARKER_FILE_NAME: &str = "deleted";
     188              : 
     189              : /// References to shared objects that are passed into each tenant, such
     190              : /// as the shared remote storage client and process initialization state.
     191          881 : #[derive(Clone)]
     192              : pub struct TenantSharedResources {
     193              :     pub broker_client: storage_broker::BrokerClientChannel,
     194              :     pub remote_storage: Option<GenericRemoteStorage>,
     195              :     pub deletion_queue_client: DeletionQueueClient,
     196              : }
     197              : 
     198              : /// A [`Tenant`] is really an _attached_ tenant.  The configuration
     199              : /// for an attached tenant is a subset of the [`LocationConf`], represented
     200              : /// in this struct.
     201              : pub(super) struct AttachedTenantConf {
     202              :     tenant_conf: TenantConfOpt,
     203              :     location: AttachedLocationConfig,
     204              : }
     205              : 
     206              : impl AttachedTenantConf {
     207         1004 :     fn try_from(location_conf: LocationConf) -> anyhow::Result<Self> {
     208         1004 :         match &location_conf.mode {
     209         1004 :             LocationMode::Attached(attach_conf) => Ok(Self {
     210         1004 :                 tenant_conf: location_conf.tenant_conf,
     211         1004 :                 location: *attach_conf,
     212         1004 :             }),
     213              :             LocationMode::Secondary(_) => {
     214            0 :                 anyhow::bail!("Attempted to construct AttachedTenantConf from a LocationConf in secondary mode")
     215              :             }
     216              :         }
     217         1004 :     }
     218              : }
     219              : struct TimelinePreload {
     220              :     timeline_id: TimelineId,
     221              :     client: RemoteTimelineClient,
     222              :     index_part: Result<MaybeDeletedIndexPart, DownloadError>,
     223              : }
     224              : 
     225              : pub(crate) struct TenantPreload {
     226              :     deleting: bool,
     227              :     timelines: HashMap<TimelineId, TimelinePreload>,
     228              : }
     229              : 
     230              : /// When we spawn a tenant, there is a special mode for tenant creation that
     231              : /// avoids trying to read anything from remote storage.
     232              : pub(crate) enum SpawnMode {
     233              :     Normal,
     234              :     Create,
     235              : }
     236              : 
     237              : ///
     238              : /// Tenant consists of multiple timelines. Keep them in a hash table.
     239              : ///
     240              : pub struct Tenant {
     241              :     // Global pageserver config parameters
     242              :     pub conf: &'static PageServerConf,
     243              : 
     244              :     /// The value creation timestamp, used to measure activation delay, see:
     245              :     /// <https://github.com/neondatabase/neon/issues/4025>
     246              :     constructed_at: Instant,
     247              : 
     248              :     state: watch::Sender<TenantState>,
     249              : 
     250              :     // Overridden tenant-specific config parameters.
     251              :     // We keep TenantConfOpt sturct here to preserve the information
     252              :     // about parameters that are not set.
     253              :     // This is necessary to allow global config updates.
     254              :     tenant_conf: Arc<RwLock<AttachedTenantConf>>,
     255              : 
     256              :     tenant_shard_id: TenantShardId,
     257              : 
     258              :     // The detailed sharding information, beyond the number/count in tenant_shard_id
     259              :     shard_identity: ShardIdentity,
     260              : 
     261              :     /// The remote storage generation, used to protect S3 objects from split-brain.
     262              :     /// Does not change over the lifetime of the [`Tenant`] object.
     263              :     ///
     264              :     /// This duplicates the generation stored in LocationConf, but that structure is mutable:
     265              :     /// this copy enforces the invariant that generatio doesn't change during a Tenant's lifetime.
     266              :     generation: Generation,
     267              : 
     268              :     timelines: Mutex<HashMap<TimelineId, Arc<Timeline>>>,
     269              : 
     270              :     /// During timeline creation, we first insert the TimelineId to the
     271              :     /// creating map, then `timelines`, then remove it from the creating map.
     272              :     /// **Lock order**: if acquring both, acquire`timelines` before `timelines_creating`
     273              :     timelines_creating: std::sync::Mutex<HashSet<TimelineId>>,
     274              : 
     275              :     // This mutex prevents creation of new timelines during GC.
     276              :     // Adding yet another mutex (in addition to `timelines`) is needed because holding
     277              :     // `timelines` mutex during all GC iteration
     278              :     // may block for a long time `get_timeline`, `get_timelines_state`,... and other operations
     279              :     // with timelines, which in turn may cause dropping replication connection, expiration of wait_for_lsn
     280              :     // timeout...
     281              :     gc_cs: tokio::sync::Mutex<()>,
     282              :     walredo_mgr: Option<Arc<WalRedoManager>>,
     283              : 
     284              :     // provides access to timeline data sitting in the remote storage
     285              :     pub(crate) remote_storage: Option<GenericRemoteStorage>,
     286              : 
     287              :     // Access to global deletion queue for when this tenant wants to schedule a deletion
     288              :     deletion_queue_client: DeletionQueueClient,
     289              : 
     290              :     /// Cached logical sizes updated updated on each [`Tenant::gather_size_inputs`].
     291              :     cached_logical_sizes: tokio::sync::Mutex<HashMap<(TimelineId, Lsn), u64>>,
     292              :     cached_synthetic_tenant_size: Arc<AtomicU64>,
     293              : 
     294              :     eviction_task_tenant_state: tokio::sync::Mutex<EvictionTaskTenantState>,
     295              : 
     296              :     /// If the tenant is in Activating state, notify this to encourage it
     297              :     /// to proceed to Active as soon as possible, rather than waiting for lazy
     298              :     /// background warmup.
     299              :     pub(crate) activate_now_sem: tokio::sync::Semaphore,
     300              : 
     301              :     pub(crate) delete_progress: Arc<tokio::sync::Mutex<DeleteTenantFlow>>,
     302              : 
     303              :     // Cancellation token fires when we have entered shutdown().  This is a parent of
     304              :     // Timelines' cancellation token.
     305              :     pub(crate) cancel: CancellationToken,
     306              : 
     307              :     // Users of the Tenant such as the page service must take this Gate to avoid
     308              :     // trying to use a Tenant which is shutting down.
     309              :     pub(crate) gate: Gate,
     310              : }
     311              : 
     312              : impl std::fmt::Debug for Tenant {
     313            2 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     314            2 :         write!(f, "{} ({})", self.tenant_shard_id, self.current_state())
     315            2 :     }
     316              : }
     317              : 
     318              : pub(crate) enum WalRedoManager {
     319              :     Prod(PostgresRedoManager),
     320              :     #[cfg(test)]
     321              :     Test(harness::TestRedoManager),
     322              : }
     323              : 
     324              : impl From<PostgresRedoManager> for WalRedoManager {
     325          884 :     fn from(mgr: PostgresRedoManager) -> Self {
     326          884 :         Self::Prod(mgr)
     327          884 :     }
     328              : }
     329              : 
     330              : #[cfg(test)]
     331              : impl From<harness::TestRedoManager> for WalRedoManager {
     332           84 :     fn from(mgr: harness::TestRedoManager) -> Self {
     333           84 :         Self::Test(mgr)
     334           84 :     }
     335              : }
     336              : 
     337              : impl WalRedoManager {
     338            0 :     pub(crate) fn maybe_quiesce(&self, idle_timeout: Duration) {
     339            0 :         match self {
     340          774 :             Self::Prod(mgr) => mgr.maybe_quiesce(idle_timeout),
     341          774 :             #[cfg(test)]
     342          774 :             Self::Test(_) => {
     343            0 :                 // Not applicable to test redo manager
     344            0 :             }
     345          774 :         }
     346          774 :     }
     347              : 
     348              :     /// # Cancel-Safety
     349              :     ///
     350              :     /// This method is cancellation-safe.
     351      2346627 :     pub async fn request_redo(
     352      2346627 :         &self,
     353      2346627 :         key: crate::repository::Key,
     354      2346627 :         lsn: Lsn,
     355      2346627 :         base_img: Option<(Lsn, bytes::Bytes)>,
     356      2346627 :         records: Vec<(Lsn, crate::walrecord::NeonWalRecord)>,
     357      2346627 :         pg_version: u32,
     358      2346627 :     ) -> anyhow::Result<bytes::Bytes> {
     359            0 :         match self {
     360      2346627 :             Self::Prod(mgr) => {
     361            0 :                 mgr.request_redo(key, lsn, base_img, records, pg_version)
     362            0 :                     .await
     363              :             }
     364              :             #[cfg(test)]
     365            0 :             Self::Test(mgr) => {
     366            0 :                 mgr.request_redo(key, lsn, base_img, records, pg_version)
     367            0 :                     .await
     368              :             }
     369              :         }
     370      2346627 :     }
     371              : 
     372            0 :     pub(crate) fn status(&self) -> Option<WalRedoManagerStatus> {
     373            0 :         match self {
     374          479 :             WalRedoManager::Prod(m) => m.status(),
     375          479 :             #[cfg(test)]
     376          479 :             WalRedoManager::Test(_) => None,
     377          479 :         }
     378          479 :     }
     379              : }
     380              : 
     381          106 : #[derive(Debug, thiserror::Error, PartialEq, Eq)]
     382              : pub enum GetTimelineError {
     383              :     #[error("Timeline {tenant_id}/{timeline_id} is not active, state: {state:?}")]
     384              :     NotActive {
     385              :         tenant_id: TenantShardId,
     386              :         timeline_id: TimelineId,
     387              :         state: TimelineState,
     388              :     },
     389              :     #[error("Timeline {tenant_id}/{timeline_id} was not found")]
     390              :     NotFound {
     391              :         tenant_id: TenantShardId,
     392              :         timeline_id: TimelineId,
     393              :     },
     394              : }
     395              : 
     396            0 : #[derive(Debug, thiserror::Error)]
     397              : pub enum LoadLocalTimelineError {
     398              :     #[error("FailedToLoad")]
     399              :     Load(#[source] anyhow::Error),
     400              :     #[error("FailedToResumeDeletion")]
     401              :     ResumeDeletion(#[source] anyhow::Error),
     402              : }
     403              : 
     404          139 : #[derive(thiserror::Error)]
     405              : pub enum DeleteTimelineError {
     406              :     #[error("NotFound")]
     407              :     NotFound,
     408              : 
     409              :     #[error("HasChildren")]
     410              :     HasChildren(Vec<TimelineId>),
     411              : 
     412              :     #[error("Timeline deletion is already in progress")]
     413              :     AlreadyInProgress(Arc<tokio::sync::Mutex<DeleteTimelineFlow>>),
     414              : 
     415              :     #[error(transparent)]
     416              :     Other(#[from] anyhow::Error),
     417              : }
     418              : 
     419              : impl Debug for DeleteTimelineError {
     420            0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     421            0 :         match self {
     422            0 :             Self::NotFound => write!(f, "NotFound"),
     423            0 :             Self::HasChildren(c) => f.debug_tuple("HasChildren").field(c).finish(),
     424            0 :             Self::AlreadyInProgress(_) => f.debug_tuple("AlreadyInProgress").finish(),
     425            0 :             Self::Other(e) => f.debug_tuple("Other").field(e).finish(),
     426              :         }
     427            0 :     }
     428              : }
     429              : 
     430              : pub enum SetStoppingError {
     431              :     AlreadyStopping(completion::Barrier),
     432              :     Broken,
     433              : }
     434              : 
     435              : impl Debug for SetStoppingError {
     436            0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     437            0 :         match self {
     438            0 :             Self::AlreadyStopping(_) => f.debug_tuple("AlreadyStopping").finish(),
     439            0 :             Self::Broken => write!(f, "Broken"),
     440              :         }
     441            0 :     }
     442              : }
     443              : 
     444            6 : #[derive(thiserror::Error, Debug)]
     445              : pub enum CreateTimelineError {
     446              :     #[error("creation of timeline with the given ID is in progress")]
     447              :     AlreadyCreating,
     448              :     #[error("timeline already exists with different parameters")]
     449              :     Conflict,
     450              :     #[error(transparent)]
     451              :     AncestorLsn(anyhow::Error),
     452              :     #[error("ancestor timeline is not active")]
     453              :     AncestorNotActive,
     454              :     #[error("tenant shutting down")]
     455              :     ShuttingDown,
     456              :     #[error(transparent)]
     457              :     Other(#[from] anyhow::Error),
     458              : }
     459              : 
     460            0 : #[derive(thiserror::Error, Debug)]
     461              : enum InitdbError {
     462              :     Other(anyhow::Error),
     463              :     Cancelled,
     464              :     Spawn(std::io::Result<()>),
     465              :     Failed(std::process::ExitStatus, Vec<u8>),
     466              : }
     467              : 
     468              : impl fmt::Display for InitdbError {
     469            0 :     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
     470            0 :         match self {
     471            0 :             InitdbError::Cancelled => write!(f, "Operation was cancelled"),
     472            0 :             InitdbError::Spawn(e) => write!(f, "Spawn error: {:?}", e),
     473            0 :             InitdbError::Failed(status, stderr) => write!(
     474            0 :                 f,
     475            0 :                 "Command failed with status {:?}: {}",
     476            0 :                 status,
     477            0 :                 String::from_utf8_lossy(stderr)
     478            0 :             ),
     479            0 :             InitdbError::Other(e) => write!(f, "Error: {:?}", e),
     480              :         }
     481            0 :     }
     482              : }
     483              : 
     484              : impl From<std::io::Error> for InitdbError {
     485            0 :     fn from(error: std::io::Error) -> Self {
     486            0 :         InitdbError::Spawn(Err(error))
     487            0 :     }
     488              : }
     489              : 
     490              : struct TenantDirectoryScan {
     491              :     sorted_timelines_to_load: Vec<(TimelineId, TimelineMetadata)>,
     492              :     timelines_to_resume_deletion: Vec<(TimelineId, Option<TimelineMetadata>)>,
     493              : }
     494              : 
     495              : enum CreateTimelineCause {
     496              :     Load,
     497              :     Delete,
     498              : }
     499              : 
     500              : impl Tenant {
     501              :     /// Yet another helper for timeline initialization.
     502              :     ///
     503              :     /// - Initializes the Timeline struct and inserts it into the tenant's hash map
     504              :     /// - Scans the local timeline directory for layer files and builds the layer map
     505              :     /// - Downloads remote index file and adds remote files to the layer map
     506              :     /// - Schedules remote upload tasks for any files that are present locally but missing from remote storage.
     507              :     ///
     508              :     /// If the operation fails, the timeline is left in the tenant's hash map in Broken state. On success,
     509              :     /// it is marked as Active.
     510              :     #[allow(clippy::too_many_arguments)]
     511          433 :     async fn timeline_init_and_sync(
     512          433 :         &self,
     513          433 :         timeline_id: TimelineId,
     514          433 :         resources: TimelineResources,
     515          433 :         index_part: Option<IndexPart>,
     516          433 :         metadata: TimelineMetadata,
     517          433 :         ancestor: Option<Arc<Timeline>>,
     518          433 :         _ctx: &RequestContext,
     519          433 :     ) -> anyhow::Result<()> {
     520          433 :         let tenant_id = self.tenant_shard_id;
     521              : 
     522          433 :         let timeline = self.create_timeline_struct(
     523          433 :             timeline_id,
     524          433 :             &metadata,
     525          433 :             ancestor.clone(),
     526          433 :             resources,
     527          433 :             CreateTimelineCause::Load,
     528          433 :         )?;
     529          433 :         let disk_consistent_lsn = timeline.get_disk_consistent_lsn();
     530          433 :         anyhow::ensure!(
     531          433 :             disk_consistent_lsn.is_valid(),
     532            0 :             "Timeline {tenant_id}/{timeline_id} has invalid disk_consistent_lsn"
     533              :         );
     534          433 :         assert_eq!(
     535          433 :             disk_consistent_lsn,
     536          433 :             metadata.disk_consistent_lsn(),
     537            0 :             "these are used interchangeably"
     538              :         );
     539              : 
     540          433 :         if let Some(index_part) = index_part.as_ref() {
     541          433 :             timeline
     542          433 :                 .remote_client
     543          433 :                 .as_ref()
     544          433 :                 .unwrap()
     545          433 :                 .init_upload_queue(index_part)?;
     546            0 :         } else if self.remote_storage.is_some() {
     547              :             // No data on the remote storage, but we have local metadata file. We can end up
     548              :             // here with timeline_create being interrupted before finishing index part upload.
     549              :             // By doing what we do here, the index part upload is retried.
     550              :             // If control plane retries timeline creation in the meantime, the mgmt API handler
     551              :             // for timeline creation will coalesce on the upload we queue here.
     552            0 :             let rtc = timeline.remote_client.as_ref().unwrap();
     553            0 :             rtc.init_upload_queue_for_empty_remote(&metadata)?;
     554            0 :             rtc.schedule_index_upload_for_metadata_update(&metadata)?;
     555            0 :         }
     556              : 
     557          433 :         timeline
     558          433 :             .load_layer_map(disk_consistent_lsn, index_part)
     559          433 :             .await
     560          433 :             .with_context(|| {
     561            0 :                 format!("Failed to load layermap for timeline {tenant_id}/{timeline_id}")
     562          433 :             })?;
     563              : 
     564              :         {
     565              :             // avoiding holding it across awaits
     566          433 :             let mut timelines_accessor = self.timelines.lock().unwrap();
     567          433 :             match timelines_accessor.entry(timeline_id) {
     568              :                 Entry::Occupied(_) => {
     569              :                     // The uninit mark file acts as a lock that prevents another task from
     570              :                     // initializing the timeline at the same time.
     571            0 :                     unreachable!(
     572            0 :                         "Timeline {tenant_id}/{timeline_id} already exists in the tenant map"
     573            0 :                     );
     574              :                 }
     575          433 :                 Entry::Vacant(v) => {
     576          433 :                     v.insert(Arc::clone(&timeline));
     577          433 :                     timeline.maybe_spawn_flush_loop();
     578          433 :                 }
     579          433 :             }
     580          433 :         };
     581          433 : 
     582          433 :         // Sanity check: a timeline should have some content.
     583          433 :         anyhow::ensure!(
     584          433 :             ancestor.is_some()
     585          380 :                 || timeline
     586          380 :                     .layers
     587          380 :                     .read()
     588            0 :                     .await
     589          380 :                     .layer_map()
     590          380 :                     .iter_historic_layers()
     591          380 :                     .next()
     592          380 :                     .is_some(),
     593            0 :             "Timeline has no ancestor and no layer files"
     594              :         );
     595              : 
     596          433 :         Ok(())
     597          433 :     }
     598              : 
     599              :     /// Attach a tenant that's available in cloud storage.
     600              :     ///
     601              :     /// This returns quickly, after just creating the in-memory object
     602              :     /// Tenant struct and launching a background task to download
     603              :     /// the remote index files.  On return, the tenant is most likely still in
     604              :     /// Attaching state, and it will become Active once the background task
     605              :     /// finishes. You can use wait_until_active() to wait for the task to
     606              :     /// complete.
     607              :     ///
     608              :     #[allow(clippy::too_many_arguments)]
     609          884 :     pub(crate) fn spawn(
     610          884 :         conf: &'static PageServerConf,
     611          884 :         tenant_shard_id: TenantShardId,
     612          884 :         resources: TenantSharedResources,
     613          884 :         attached_conf: AttachedTenantConf,
     614          884 :         shard_identity: ShardIdentity,
     615          884 :         init_order: Option<InitializationOrder>,
     616          884 :         tenants: &'static std::sync::RwLock<TenantsMap>,
     617          884 :         mode: SpawnMode,
     618          884 :         ctx: &RequestContext,
     619          884 :     ) -> anyhow::Result<Arc<Tenant>> {
     620          884 :         let wal_redo_manager = Arc::new(WalRedoManager::from(PostgresRedoManager::new(
     621          884 :             conf,
     622          884 :             tenant_shard_id,
     623          884 :         )));
     624          884 : 
     625          884 :         let TenantSharedResources {
     626          884 :             broker_client,
     627          884 :             remote_storage,
     628          884 :             deletion_queue_client,
     629          884 :         } = resources;
     630          884 : 
     631          884 :         let attach_mode = attached_conf.location.attach_mode;
     632          884 :         let generation = attached_conf.location.generation;
     633          884 : 
     634          884 :         let tenant = Arc::new(Tenant::new(
     635          884 :             TenantState::Attaching,
     636          884 :             conf,
     637          884 :             attached_conf,
     638          884 :             shard_identity,
     639          884 :             Some(wal_redo_manager),
     640          884 :             tenant_shard_id,
     641          884 :             remote_storage.clone(),
     642          884 :             deletion_queue_client,
     643          884 :         ));
     644          884 : 
     645          884 :         // The attach task will carry a GateGuard, so that shutdown() reliably waits for it to drop out if
     646          884 :         // we shut down while attaching.
     647          884 :         let attach_gate_guard = tenant
     648          884 :             .gate
     649          884 :             .enter()
     650          884 :             .expect("We just created the Tenant: nothing else can have shut it down yet");
     651          884 : 
     652          884 :         // Do all the hard work in the background
     653          884 :         let tenant_clone = Arc::clone(&tenant);
     654          884 :         let ctx = ctx.detached_child(TaskKind::Attach, DownloadBehavior::Warn);
     655          884 :         task_mgr::spawn(
     656          884 :             &tokio::runtime::Handle::current(),
     657          884 :             TaskKind::Attach,
     658          884 :             Some(tenant_shard_id),
     659          884 :             None,
     660          884 :             "attach tenant",
     661              :             false,
     662          884 :             async move {
     663          884 : 
     664          884 :                 info!(
     665          884 :                     ?attach_mode,
     666          884 :                     "Attaching tenant"
     667          884 :                 );
     668              : 
     669          884 :                 let _gate_guard = attach_gate_guard;
     670          884 : 
     671          884 :                 // Is this tenant being spawned as part of process startup?
     672          884 :                 let starting_up = init_order.is_some();
     673          872 :                 scopeguard::defer! {
     674          872 :                     if starting_up {
     675          205 :                         TENANT.startup_complete.inc();
     676          667 :                     }
     677              :                 }
     678              : 
     679              :                 // Ideally we should use Tenant::set_broken_no_wait, but it is not supposed to be used when tenant is in loading state.
     680          884 :                 let make_broken =
     681            7 :                     |t: &Tenant, err: anyhow::Error| {
     682            7 :                         error!("attach failed, setting tenant state to Broken: {err:?}");
     683            7 :                         t.state.send_modify(|state| {
     684            7 :                             // The Stopping case is for when we have passed control on to DeleteTenantFlow:
     685            7 :                             // if it errors, we will call make_broken when tenant is already in Stopping.
     686            7 :                             assert!(
     687            7 :                             matches!(*state, TenantState::Attaching | TenantState::Stopping { .. }),
     688            0 :                             "the attach task owns the tenant state until activation is complete"
     689              :                         );
     690              : 
     691            7 :                             *state = TenantState::broken_from_reason(err.to_string());
     692            7 :                         });
     693            7 :                     };
     694              : 
     695          884 :                 let mut init_order = init_order;
     696          884 :                 // take the completion because initial tenant loading will complete when all of
     697          884 :                 // these tasks complete.
     698          884 :                 let _completion = init_order
     699          884 :                     .as_mut()
     700          884 :                     .and_then(|x| x.initial_tenant_load.take());
     701          884 :                 let remote_load_completion = init_order
     702          884 :                     .as_mut()
     703          884 :                     .and_then(|x| x.initial_tenant_load_remote.take());
     704              : 
     705              :                 enum AttachType<'a> {
     706              :                     // During pageserver startup, we are attaching this tenant lazily in the background
     707              :                     Warmup(tokio::sync::SemaphorePermit<'a>),
     708              :                     // During pageserver startup, we are attaching this tenant as soon as we can,
     709              :                     // because a client tried to access it.
     710              :                     OnDemand,
     711              :                     // During normal operations after startup, we are attaching a tenant.
     712              :                     Normal,
     713              :                 }
     714              : 
     715              :                 // Before doing any I/O, wait for either or:
     716              :                 // - A client to attempt to access to this tenant (on-demand loading)
     717              :                 // - A permit to become available in the warmup semaphore (background warmup)
     718              :                 //
     719              :                 // Some-ness of init_order is how we know if we're attaching during startup or later
     720              :                 // in process lifetime.
     721          884 :                 let attach_type = if init_order.is_some() {
     722          217 :                     tokio::select!(
     723              :                         _ = tenant_clone.activate_now_sem.acquire() => {
     724            3 :                             tracing::info!("Activating tenant (on-demand)");
     725              :                             AttachType::OnDemand
     726              :                         },
     727          213 :                         permit_result = conf.concurrent_tenant_warmup.inner().acquire() => {
     728              :                             match permit_result {
     729              :                                 Ok(p) => {
     730          213 :                                     tracing::info!("Activating tenant (warmup)");
     731              :                                     AttachType::Warmup(p)
     732              :                                 }
     733              :                                 Err(_) => {
     734              :                                     // This is unexpected: the warmup semaphore should stay alive
     735              :                                     // for the lifetime of init_order.  Log a warning and proceed.
     736            0 :                                     tracing::warn!("warmup_limit semaphore unexpectedly closed");
     737              :                                     AttachType::Normal
     738              :                                 }
     739              :                             }
     740              : 
     741              :                         }
     742              :                         _ = tenant_clone.cancel.cancelled() => {
     743              :                             // This is safe, but should be pretty rare: it is interesting if a tenant
     744              :                             // stayed in Activating for such a long time that shutdown found it in
     745              :                             // that state.
     746            1 :                             tracing::info!(state=%tenant_clone.current_state(), "Tenant shut down before activation");
     747              :                             // Make the tenant broken so that set_stopping will not hang waiting for it to leave
     748              :                             // the Attaching state.  This is an over-reaction (nothing really broke, the tenant is
     749              :                             // just shutting down), but ensures progress.
     750              :                             make_broken(&tenant_clone, anyhow::anyhow!("Shut down while Attaching"));
     751              :                             return Ok(());
     752              :                         },
     753              :                     )
     754              :                 } else {
     755          667 :                     AttachType::Normal
     756              :                 };
     757              : 
     758          883 :                 let preload = match (&mode, &remote_storage) {
     759              :                     (SpawnMode::Create, _) => {
     760           15 :                         None
     761              :                     },
     762          868 :                     (SpawnMode::Normal, Some(remote_storage)) => {
     763          868 :                         let _preload_timer = TENANT.preload.start_timer();
     764          868 :                         let res = tenant_clone
     765          868 :                             .preload(remote_storage, task_mgr::shutdown_token())
     766         2618 :                             .await;
     767          868 :                         match res {
     768          862 :                             Ok(p) => Some(p),
     769            6 :                             Err(e) => {
     770            6 :                                 make_broken(&tenant_clone, anyhow::anyhow!(e));
     771            6 :                                 return Ok(());
     772              :                             }
     773              :                         }
     774              :                     }
     775              :                     (SpawnMode::Normal, None) => {
     776            0 :                         let _preload_timer = TENANT.preload.start_timer();
     777            0 :                         None
     778              :                     }
     779              :                 };
     780              : 
     781              :                 // Remote preload is complete.
     782          877 :                 drop(remote_load_completion);
     783              : 
     784          877 :                 let pending_deletion = {
     785          877 :                     match DeleteTenantFlow::should_resume_deletion(
     786          877 :                         conf,
     787          877 :                         preload.as_ref().map(|p| p.deleting).unwrap_or(false),
     788          877 :                         &tenant_clone,
     789          877 :                     )
     790            0 :                     .await
     791              :                     {
     792          877 :                         Ok(should_resume_deletion) => should_resume_deletion,
     793            0 :                         Err(err) => {
     794            0 :                             make_broken(&tenant_clone, anyhow::anyhow!(err));
     795            0 :                             return Ok(());
     796              :                         }
     797              :                     }
     798              :                 };
     799              : 
     800          877 :                 info!("pending_deletion {}", pending_deletion.is_some());
     801              : 
     802          877 :                 if let Some(deletion) = pending_deletion {
     803              :                     // as we are no longer loading, signal completion by dropping
     804              :                     // the completion while we resume deletion
     805           21 :                     drop(_completion);
     806           21 :                     let background_jobs_can_start =
     807           21 :                         init_order.as_ref().map(|x| &x.background_jobs_can_start);
     808           21 :                     if let Some(background) = background_jobs_can_start {
     809           20 :                         info!("waiting for backgound jobs barrier");
     810           20 :                         background.clone().wait().await;
     811           20 :                         info!("ready for backgound jobs barrier");
     812            1 :                     }
     813              : 
     814           21 :                     let deleted = DeleteTenantFlow::resume_from_attach(
     815           21 :                         deletion,
     816           21 :                         &tenant_clone,
     817           21 :                         preload,
     818           21 :                         tenants,
     819           21 :                         &ctx,
     820           21 :                     )
     821         1003 :                     .await;
     822              : 
     823           21 :                     if let Err(e) = deleted {
     824            0 :                         make_broken(&tenant_clone, anyhow::anyhow!(e));
     825           21 :                     }
     826              : 
     827           21 :                     return Ok(());
     828          856 :                 }
     829              : 
     830              :                 // We will time the duration of the attach phase unless this is a creation (attach will do no work)
     831          856 :                 let attached = {
     832          856 :                     let _attach_timer = match mode {
     833           15 :                         SpawnMode::Create => None,
     834          841 :                         SpawnMode::Normal => {Some(TENANT.attach.start_timer())}
     835              :                     };
     836          856 :                     tenant_clone.attach(preload, mode, &ctx).await
     837              :                 };
     838              : 
     839          856 :                 match attached {
     840              :                     Ok(()) => {
     841          856 :                         info!("attach finished, activating");
     842          856 :                         tenant_clone.activate(broker_client, None, &ctx);
     843              :                     }
     844            0 :                     Err(e) => {
     845            0 :                         make_broken(&tenant_clone, anyhow::anyhow!(e));
     846            0 :                     }
     847              :                 }
     848              : 
     849              :                 // If we are doing an opportunistic warmup attachment at startup, initialize
     850              :                 // logical size at the same time.  This is better than starting a bunch of idle tenants
     851              :                 // with cold caches and then coming back later to initialize their logical sizes.
     852              :                 //
     853              :                 // It also prevents the warmup proccess competing with the concurrency limit on
     854              :                 // logical size calculations: if logical size calculation semaphore is saturated,
     855              :                 // then warmup will wait for that before proceeding to the next tenant.
     856          856 :                 if let AttachType::Warmup(_permit) = attach_type {
     857          245 :                     let mut futs: FuturesUnordered<_> = tenant_clone.timelines.lock().unwrap().values().cloned().map(|t| t.await_initial_logical_size()).collect();
     858          193 :                     tracing::info!("Waiting for initial logical sizes while warming up...");
     859          426 :                     while futs.next().await.is_some() {}
     860          181 :                     tracing::info!("Warm-up complete");
     861          663 :                 }
     862              : 
     863          844 :                 Ok(())
     864          872 :             }
     865          884 :             .instrument(tracing::info_span!(parent: None, "attach", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), gen=?generation)),
     866              :         );
     867          884 :         Ok(tenant)
     868          884 :     }
     869              : 
     870         1744 :     #[instrument(skip_all)]
     871              :     pub(crate) async fn preload(
     872              :         self: &Arc<Tenant>,
     873              :         remote_storage: &GenericRemoteStorage,
     874              :         cancel: CancellationToken,
     875              :     ) -> anyhow::Result<TenantPreload> {
     876              :         span::debug_assert_current_span_has_tenant_id();
     877              :         // Get list of remote timelines
     878              :         // download index files for every tenant timeline
     879          872 :         info!("listing remote timelines");
     880              :         let (remote_timeline_ids, other_keys) = remote_timeline_client::list_remote_timelines(
     881              :             remote_storage,
     882              :             self.tenant_shard_id,
     883              :             cancel.clone(),
     884              :         )
     885              :         .await?;
     886              : 
     887              :         let deleting = other_keys.contains(TENANT_DELETED_MARKER_FILE_NAME);
     888          866 :         info!(
     889          866 :             "found {} timelines, deleting={}",
     890          866 :             remote_timeline_ids.len(),
     891          866 :             deleting
     892          866 :         );
     893              : 
     894              :         for k in other_keys {
     895              :             if k != TENANT_DELETED_MARKER_FILE_NAME {
     896            0 :                 warn!("Unexpected non timeline key {k}");
     897              :             }
     898              :         }
     899              : 
     900              :         Ok(TenantPreload {
     901              :             deleting,
     902              :             timelines: self
     903              :                 .load_timeline_metadata(remote_timeline_ids, remote_storage, cancel)
     904              :                 .await?,
     905              :         })
     906              :     }
     907              : 
     908              :     ///
     909              :     /// Background task that downloads all data for a tenant and brings it to Active state.
     910              :     ///
     911              :     /// No background tasks are started as part of this routine.
     912              :     ///
     913          881 :     async fn attach(
     914          881 :         self: &Arc<Tenant>,
     915          881 :         preload: Option<TenantPreload>,
     916          881 :         mode: SpawnMode,
     917          881 :         ctx: &RequestContext,
     918          881 :     ) -> anyhow::Result<()> {
     919          881 :         span::debug_assert_current_span_has_tenant_id();
     920              : 
     921            3 :         failpoint_support::sleep_millis_async!("before-attaching-tenant");
     922              : 
     923          881 :         let preload = match (preload, mode) {
     924          866 :             (Some(p), _) => p,
     925           15 :             (None, SpawnMode::Create) => TenantPreload {
     926           15 :                 deleting: false,
     927           15 :                 timelines: HashMap::new(),
     928           15 :             },
     929              :             (None, SpawnMode::Normal) => {
     930              :                 // Deprecated dev mode: load from local disk state instead of remote storage
     931              :                 // https://github.com/neondatabase/neon/issues/5624
     932            0 :                 return self.load_local(ctx).await;
     933              :             }
     934              :         };
     935              : 
     936          881 :         let mut timelines_to_resume_deletions = vec![];
     937          881 : 
     938          881 :         let mut remote_index_and_client = HashMap::new();
     939          881 :         let mut timeline_ancestors = HashMap::new();
     940          881 :         let mut existent_timelines = HashSet::new();
     941         1329 :         for (timeline_id, preload) in preload.timelines {
     942          445 :             let index_part = match preload.index_part {
     943          445 :                 Ok(i) => {
     944            0 :                     debug!("remote index part exists for timeline {timeline_id}");
     945              :                     // We found index_part on the remote, this is the standard case.
     946          445 :                     existent_timelines.insert(timeline_id);
     947          445 :                     i
     948              :                 }
     949              :                 Err(DownloadError::NotFound) => {
     950              :                     // There is no index_part on the remote. We only get here
     951              :                     // if there is some prefix for the timeline in the remote storage.
     952              :                     // This can e.g. be the initdb.tar.zst archive, maybe a
     953              :                     // remnant from a prior incomplete creation or deletion attempt.
     954              :                     // Delete the local directory as the deciding criterion for a
     955              :                     // timeline's existence is presence of index_part.
     956            3 :                     info!(%timeline_id, "index_part not found on remote");
     957            3 :                     continue;
     958              :                 }
     959            0 :                 Err(e) => {
     960              :                     // Some (possibly ephemeral) error happened during index_part download.
     961              :                     // Pretend the timeline exists to not delete the timeline directory,
     962              :                     // as it might be a temporary issue and we don't want to re-download
     963              :                     // everything after it resolves.
     964            0 :                     warn!(%timeline_id, "Failed to load index_part from remote storage, failed creation? ({e})");
     965              : 
     966            0 :                     existent_timelines.insert(timeline_id);
     967            0 :                     continue;
     968              :                 }
     969              :             };
     970          445 :             match index_part {
     971          433 :                 MaybeDeletedIndexPart::IndexPart(index_part) => {
     972          433 :                     timeline_ancestors.insert(timeline_id, index_part.metadata.clone());
     973          433 :                     remote_index_and_client.insert(timeline_id, (index_part, preload.client));
     974          433 :                 }
     975           12 :                 MaybeDeletedIndexPart::Deleted(index_part) => {
     976           12 :                     info!(
     977           12 :                         "timeline {} is deleted, picking to resume deletion",
     978           12 :                         timeline_id
     979           12 :                     );
     980           12 :                     timelines_to_resume_deletions.push((timeline_id, index_part, preload.client));
     981              :                 }
     982              :             }
     983              :         }
     984              : 
     985              :         // For every timeline, download the metadata file, scan the local directory,
     986              :         // and build a layer map that contains an entry for each remote and local
     987              :         // layer file.
     988          881 :         let sorted_timelines = tree_sort_timelines(timeline_ancestors, |m| m.ancestor_timeline())?;
     989         1314 :         for (timeline_id, remote_metadata) in sorted_timelines {
     990          433 :             let (index_part, remote_client) = remote_index_and_client
     991          433 :                 .remove(&timeline_id)
     992          433 :                 .expect("just put it in above");
     993          433 : 
     994          433 :             // TODO again handle early failure
     995          433 :             self.load_remote_timeline(
     996          433 :                 timeline_id,
     997          433 :                 index_part,
     998          433 :                 remote_metadata,
     999          433 :                 TimelineResources {
    1000          433 :                     remote_client: Some(remote_client),
    1001          433 :                     deletion_queue_client: self.deletion_queue_client.clone(),
    1002          433 :                 },
    1003          433 :                 ctx,
    1004          433 :             )
    1005          863 :             .await
    1006          433 :             .with_context(|| {
    1007            0 :                 format!(
    1008            0 :                     "failed to load remote timeline {} for tenant {}",
    1009            0 :                     timeline_id, self.tenant_shard_id
    1010            0 :                 )
    1011          433 :             })?;
    1012              :         }
    1013              : 
    1014              :         // Walk through deleted timelines, resume deletion
    1015          893 :         for (timeline_id, index_part, remote_timeline_client) in timelines_to_resume_deletions {
    1016           12 :             remote_timeline_client
    1017           12 :                 .init_upload_queue_stopped_to_continue_deletion(&index_part)
    1018           12 :                 .context("init queue stopped")
    1019           12 :                 .map_err(LoadLocalTimelineError::ResumeDeletion)?;
    1020              : 
    1021           12 :             DeleteTimelineFlow::resume_deletion(
    1022           12 :                 Arc::clone(self),
    1023           12 :                 timeline_id,
    1024           12 :                 &index_part.metadata,
    1025           12 :                 Some(remote_timeline_client),
    1026           12 :                 self.deletion_queue_client.clone(),
    1027           12 :             )
    1028           12 :             .instrument(tracing::info_span!("timeline_delete", %timeline_id))
    1029            0 :             .await
    1030           12 :             .context("resume_deletion")
    1031           12 :             .map_err(LoadLocalTimelineError::ResumeDeletion)?;
    1032              :         }
    1033              : 
    1034              :         // The local filesystem contents are a cache of what's in the remote IndexPart;
    1035              :         // IndexPart is the source of truth.
    1036          881 :         self.clean_up_timelines(&existent_timelines)?;
    1037              : 
    1038          881 :         fail::fail_point!("attach-before-activate", |_| {
    1039            0 :             anyhow::bail!("attach-before-activate");
    1040          881 :         });
    1041            3 :         failpoint_support::sleep_millis_async!("attach-before-activate-sleep", &self.cancel);
    1042              : 
    1043          881 :         info!("Done");
    1044              : 
    1045          881 :         Ok(())
    1046          881 :     }
    1047              : 
    1048              :     /// Check for any local timeline directories that are temporary, or do not correspond to a
    1049              :     /// timeline that still exists: this can happen if we crashed during a deletion/creation, or
    1050              :     /// if a timeline was deleted while the tenant was attached to a different pageserver.
    1051          881 :     fn clean_up_timelines(&self, existent_timelines: &HashSet<TimelineId>) -> anyhow::Result<()> {
    1052          881 :         let timelines_dir = self.conf.timelines_path(&self.tenant_shard_id);
    1053              : 
    1054          881 :         let entries = match timelines_dir.read_dir_utf8() {
    1055          879 :             Ok(d) => d,
    1056            2 :             Err(e) => {
    1057            2 :                 if e.kind() == std::io::ErrorKind::NotFound {
    1058            2 :                     return Ok(());
    1059              :                 } else {
    1060            0 :                     return Err(e).context("list timelines directory for tenant");
    1061              :                 }
    1062              :             }
    1063              :         };
    1064              : 
    1065         1332 :         for entry in entries {
    1066          453 :             let entry = entry.context("read timeline dir entry")?;
    1067          453 :             let entry_path = entry.path();
    1068              : 
    1069          453 :             let purge = if crate::is_temporary(entry_path)
    1070              :                 // TODO: uninit_mark isn't needed any more, since uninitialized timelines are already
    1071              :                 // covered by the check that the timeline must exist in remote storage.
    1072          450 :                 || is_uninit_mark(entry_path)
    1073          448 :                 || crate::is_delete_mark(entry_path)
    1074              :             {
    1075            5 :                 true
    1076              :             } else {
    1077          448 :                 match TimelineId::try_from(entry_path.file_name()) {
    1078          448 :                     Ok(i) => {
    1079          448 :                         // Purge if the timeline ID does not exist in remote storage: remote storage is the authority.
    1080          448 :                         !existent_timelines.contains(&i)
    1081              :                     }
    1082            0 :                     Err(e) => {
    1083            0 :                         tracing::warn!(
    1084            0 :                             "Unparseable directory in timelines directory: {entry_path}, ignoring ({e})"
    1085            0 :                         );
    1086              :                         // Do not purge junk: if we don't recognize it, be cautious and leave it for a human.
    1087            0 :                         false
    1088              :                     }
    1089              :                 }
    1090              :             };
    1091              : 
    1092          453 :             if purge {
    1093           10 :                 tracing::info!("Purging stale timeline dentry {entry_path}");
    1094           10 :                 if let Err(e) = match entry.file_type() {
    1095           10 :                     Ok(t) => if t.is_dir() {
    1096            7 :                         std::fs::remove_dir_all(entry_path)
    1097              :                     } else {
    1098            3 :                         std::fs::remove_file(entry_path)
    1099              :                     }
    1100           10 :                     .or_else(fs_ext::ignore_not_found),
    1101            0 :                     Err(e) => Err(e),
    1102              :                 } {
    1103            0 :                     tracing::warn!("Failed to purge stale timeline dentry {entry_path}: {e}");
    1104           10 :                 }
    1105          443 :             }
    1106              :         }
    1107              : 
    1108          879 :         Ok(())
    1109          881 :     }
    1110              : 
    1111              :     /// Get sum of all remote timelines sizes
    1112              :     ///
    1113              :     /// This function relies on the index_part instead of listing the remote storage
    1114           19 :     pub fn remote_size(&self) -> u64 {
    1115           19 :         let mut size = 0;
    1116              : 
    1117           19 :         for timeline in self.list_timelines() {
    1118           12 :             if let Some(remote_client) = &timeline.remote_client {
    1119           12 :                 size += remote_client.get_remote_physical_size();
    1120           12 :             }
    1121              :         }
    1122              : 
    1123           19 :         size
    1124           19 :     }
    1125              : 
    1126          866 :     #[instrument(skip_all, fields(timeline_id=%timeline_id))]
    1127              :     async fn load_remote_timeline(
    1128              :         &self,
    1129              :         timeline_id: TimelineId,
    1130              :         index_part: IndexPart,
    1131              :         remote_metadata: TimelineMetadata,
    1132              :         resources: TimelineResources,
    1133              :         ctx: &RequestContext,
    1134              :     ) -> anyhow::Result<()> {
    1135              :         span::debug_assert_current_span_has_tenant_id();
    1136              : 
    1137          433 :         info!("downloading index file for timeline {}", timeline_id);
    1138              :         tokio::fs::create_dir_all(self.conf.timeline_path(&self.tenant_shard_id, &timeline_id))
    1139              :             .await
    1140              :             .context("Failed to create new timeline directory")?;
    1141              : 
    1142              :         let ancestor = if let Some(ancestor_id) = remote_metadata.ancestor_timeline() {
    1143              :             let timelines = self.timelines.lock().unwrap();
    1144              :             Some(Arc::clone(timelines.get(&ancestor_id).ok_or_else(
    1145            0 :                 || {
    1146            0 :                     anyhow::anyhow!(
    1147            0 :                         "cannot find ancestor timeline {ancestor_id} for timeline {timeline_id}"
    1148            0 :                     )
    1149            0 :                 },
    1150              :             )?))
    1151              :         } else {
    1152              :             None
    1153              :         };
    1154              : 
    1155              :         // timeline loading after attach expects to find metadata file for each metadata
    1156              :         save_metadata(
    1157              :             self.conf,
    1158              :             &self.tenant_shard_id,
    1159              :             &timeline_id,
    1160              :             &remote_metadata,
    1161              :         )
    1162              :         .await
    1163              :         .context("save_metadata")
    1164              :         .map_err(LoadLocalTimelineError::Load)?;
    1165              : 
    1166              :         self.timeline_init_and_sync(
    1167              :             timeline_id,
    1168              :             resources,
    1169              :             Some(index_part),
    1170              :             remote_metadata,
    1171              :             ancestor,
    1172              :             ctx,
    1173              :         )
    1174              :         .await
    1175              :     }
    1176              : 
    1177              :     /// Create a placeholder Tenant object for a broken tenant
    1178            0 :     pub fn create_broken_tenant(
    1179            0 :         conf: &'static PageServerConf,
    1180            0 :         tenant_shard_id: TenantShardId,
    1181            0 :         reason: String,
    1182            0 :     ) -> Arc<Tenant> {
    1183            0 :         Arc::new(Tenant::new(
    1184            0 :             TenantState::Broken {
    1185            0 :                 reason,
    1186            0 :                 backtrace: String::new(),
    1187            0 :             },
    1188            0 :             conf,
    1189            0 :             AttachedTenantConf::try_from(LocationConf::default()).unwrap(),
    1190            0 :             // Shard identity isn't meaningful for a broken tenant: it's just a placeholder
    1191            0 :             // to occupy the slot for this TenantShardId.
    1192            0 :             ShardIdentity::broken(tenant_shard_id.shard_number, tenant_shard_id.shard_count),
    1193            0 :             None,
    1194            0 :             tenant_shard_id,
    1195            0 :             None,
    1196            0 :             DeletionQueueClient::broken(),
    1197            0 :         ))
    1198            0 :     }
    1199              : 
    1200           80 :     fn scan_and_sort_timelines_dir(self: Arc<Tenant>) -> anyhow::Result<TenantDirectoryScan> {
    1201           80 :         let mut timelines_to_load: HashMap<TimelineId, TimelineMetadata> = HashMap::new();
    1202           80 :         // Note timelines_to_resume_deletion needs to be separate because it can be not sortable
    1203           80 :         // from the point of `tree_sort_timelines`. I e some parents can be missing because deletion
    1204           80 :         // completed in non topological order (for example because parent has smaller number of layer files in it)
    1205           80 :         let mut timelines_to_resume_deletion: Vec<(TimelineId, Option<TimelineMetadata>)> = vec![];
    1206           80 : 
    1207           80 :         let timelines_dir = self.conf.timelines_path(&self.tenant_shard_id);
    1208              : 
    1209           80 :         for entry in timelines_dir
    1210           80 :             .read_dir_utf8()
    1211           80 :             .context("list timelines directory for tenant")?
    1212              :         {
    1213            6 :             let entry = entry.context("read timeline dir entry")?;
    1214            6 :             let timeline_dir = entry.path();
    1215            6 : 
    1216            6 :             if crate::is_temporary(timeline_dir) {
    1217            0 :                 info!("Found temporary timeline directory, removing: {timeline_dir}");
    1218            0 :                 if let Err(e) = std::fs::remove_dir_all(timeline_dir) {
    1219            0 :                     error!("Failed to remove temporary directory '{timeline_dir}': {e:?}");
    1220            0 :                 }
    1221            6 :             } else if is_uninit_mark(timeline_dir) {
    1222            2 :                 if !timeline_dir.exists() {
    1223            2 :                     warn!("Timeline dir entry become invalid: {timeline_dir}");
    1224            2 :                     continue;
    1225            0 :                 }
    1226            0 : 
    1227            0 :                 let timeline_uninit_mark_file = &timeline_dir;
    1228            0 :                 info!(
    1229            0 :                     "Found an uninit mark file {timeline_uninit_mark_file}, removing the timeline and its uninit mark",
    1230            0 :                 );
    1231            0 :                 let timeline_id =
    1232            0 :                     TimelineId::try_from(timeline_uninit_mark_file.file_stem())
    1233            0 :                         .with_context(|| {
    1234            0 :                             format!(
    1235            0 :                                 "Could not parse timeline id out of the timeline uninit mark name {timeline_uninit_mark_file}",
    1236            0 :                             )
    1237            0 :                         })?;
    1238            0 :                 let timeline_dir = self.conf.timeline_path(&self.tenant_shard_id, &timeline_id);
    1239            0 :                 if let Err(e) =
    1240            0 :                     remove_timeline_and_uninit_mark(&timeline_dir, timeline_uninit_mark_file)
    1241              :                 {
    1242            0 :                     error!("Failed to clean up uninit marked timeline: {e:?}");
    1243            0 :                 }
    1244            4 :             } else if crate::is_delete_mark(timeline_dir) {
    1245              :                 // If metadata exists, load as usual, continue deletion
    1246            0 :                 let timeline_id = TimelineId::try_from(timeline_dir.file_stem())
    1247            0 :                     .with_context(|| {
    1248            0 :                         format!(
    1249            0 :                             "Could not parse timeline id out of the timeline uninit mark name {timeline_dir}",
    1250            0 :                         )
    1251            0 :                     })?;
    1252              : 
    1253            0 :                 info!("Found deletion mark for timeline {}", timeline_id);
    1254              : 
    1255            0 :                 match load_metadata(self.conf, &self.tenant_shard_id, &timeline_id) {
    1256            0 :                     Ok(metadata) => {
    1257            0 :                         timelines_to_resume_deletion.push((timeline_id, Some(metadata)))
    1258              :                     }
    1259            0 :                     Err(e) => match &e {
    1260            0 :                         LoadMetadataError::Read(r) => {
    1261            0 :                             if r.kind() != io::ErrorKind::NotFound {
    1262            0 :                                 return Err(anyhow::anyhow!(e)).with_context(|| {
    1263            0 :                                     format!("Failed to load metadata for timeline_id {timeline_id}")
    1264            0 :                                 });
    1265            0 :                             }
    1266            0 : 
    1267            0 :                             // If metadata doesnt exist it means that we've crashed without
    1268            0 :                             // completing cleanup_remaining_timeline_fs_traces in DeleteTimelineFlow.
    1269            0 :                             // So save timeline_id for later call to `DeleteTimelineFlow::cleanup_remaining_timeline_fs_traces`.
    1270            0 :                             // We cant do it here because the method is async so we'd need block_on
    1271            0 :                             // and here we're in spawn_blocking. cleanup_remaining_timeline_fs_traces uses fs operations
    1272            0 :                             // so that basically results in a cycle:
    1273            0 :                             // spawn_blocking
    1274            0 :                             // - block_on
    1275            0 :                             //   - spawn_blocking
    1276            0 :                             // which can lead to running out of threads in blocing pool.
    1277            0 :                             timelines_to_resume_deletion.push((timeline_id, None));
    1278              :                         }
    1279              :                         _ => {
    1280            0 :                             return Err(anyhow::anyhow!(e)).with_context(|| {
    1281            0 :                                 format!("Failed to load metadata for timeline_id {timeline_id}")
    1282            0 :                             })
    1283              :                         }
    1284              :                     },
    1285              :                 }
    1286              :             } else {
    1287            4 :                 if !timeline_dir.exists() {
    1288            0 :                     warn!("Timeline dir entry become invalid: {timeline_dir}");
    1289            0 :                     continue;
    1290            4 :                 }
    1291            4 :                 let timeline_id = TimelineId::try_from(timeline_dir.file_name())
    1292            4 :                     .with_context(|| {
    1293            0 :                         format!(
    1294            0 :                             "Could not parse timeline id out of the timeline dir name {timeline_dir}",
    1295            0 :                         )
    1296            4 :                     })?;
    1297            4 :                 let timeline_uninit_mark_file = self
    1298            4 :                     .conf
    1299            4 :                     .timeline_uninit_mark_file_path(self.tenant_shard_id, timeline_id);
    1300            4 :                 if timeline_uninit_mark_file.exists() {
    1301            2 :                     info!(
    1302            2 :                         %timeline_id,
    1303            2 :                         "Found an uninit mark file, removing the timeline and its uninit mark",
    1304            2 :                     );
    1305            0 :                     if let Err(e) =
    1306            2 :                         remove_timeline_and_uninit_mark(timeline_dir, &timeline_uninit_mark_file)
    1307              :                     {
    1308            0 :                         error!("Failed to clean up uninit marked timeline: {e:?}");
    1309            2 :                     }
    1310            2 :                     continue;
    1311            2 :                 }
    1312            2 : 
    1313            2 :                 let timeline_delete_mark_file = self
    1314            2 :                     .conf
    1315            2 :                     .timeline_delete_mark_file_path(self.tenant_shard_id, timeline_id);
    1316            2 :                 if timeline_delete_mark_file.exists() {
    1317              :                     // Cleanup should be done in `is_delete_mark` branch above
    1318            0 :                     continue;
    1319            2 :                 }
    1320            2 : 
    1321            2 :                 let file_name = entry.file_name();
    1322            2 :                 if let Ok(timeline_id) = file_name.parse::<TimelineId>() {
    1323            2 :                     let metadata = load_metadata(self.conf, &self.tenant_shard_id, &timeline_id)
    1324            2 :                         .context("failed to load metadata")?;
    1325            0 :                     timelines_to_load.insert(timeline_id, metadata);
    1326              :                 } else {
    1327              :                     // A file or directory that doesn't look like a timeline ID
    1328            0 :                     warn!("unexpected file or directory in timelines directory: {file_name}");
    1329              :                 }
    1330              :             }
    1331              :         }
    1332              : 
    1333              :         // Sort the array of timeline IDs into tree-order, so that parent comes before
    1334              :         // all its children.
    1335           78 :         tree_sort_timelines(timelines_to_load, |m| m.ancestor_timeline()).map(|sorted_timelines| {
    1336           78 :             TenantDirectoryScan {
    1337           78 :                 sorted_timelines_to_load: sorted_timelines,
    1338           78 :                 timelines_to_resume_deletion,
    1339           78 :             }
    1340           78 :         })
    1341           80 :     }
    1342              : 
    1343          866 :     async fn load_timeline_metadata(
    1344          866 :         self: &Arc<Tenant>,
    1345          866 :         timeline_ids: HashSet<TimelineId>,
    1346          866 :         remote_storage: &GenericRemoteStorage,
    1347          866 :         cancel: CancellationToken,
    1348          866 :     ) -> anyhow::Result<HashMap<TimelineId, TimelinePreload>> {
    1349          866 :         let mut part_downloads = JoinSet::new();
    1350         1314 :         for timeline_id in timeline_ids {
    1351          448 :             let client = RemoteTimelineClient::new(
    1352          448 :                 remote_storage.clone(),
    1353          448 :                 self.deletion_queue_client.clone(),
    1354          448 :                 self.conf,
    1355          448 :                 self.tenant_shard_id,
    1356          448 :                 timeline_id,
    1357          448 :                 self.generation,
    1358          448 :             );
    1359          448 :             let cancel_clone = cancel.clone();
    1360          448 :             part_downloads.spawn(
    1361          448 :                 async move {
    1362          448 :                     debug!("starting index part download");
    1363              : 
    1364         2942 :                     let index_part = client.download_index_file(&cancel_clone).await;
    1365              : 
    1366          448 :                     debug!("finished index part download");
    1367              : 
    1368          448 :                     Result::<_, anyhow::Error>::Ok(TimelinePreload {
    1369          448 :                         client,
    1370          448 :                         timeline_id,
    1371          448 :                         index_part,
    1372          448 :                     })
    1373          448 :                 }
    1374          448 :                 .map(move |res| {
    1375          448 :                     res.with_context(|| format!("download index part for timeline {timeline_id}"))
    1376          448 :                 })
    1377          448 :                 .instrument(info_span!("download_index_part", %timeline_id)),
    1378              :             );
    1379              :         }
    1380              : 
    1381          866 :         let mut timeline_preloads: HashMap<TimelineId, TimelinePreload> = HashMap::new();
    1382              : 
    1383         1314 :         loop {
    1384         1734 :             tokio::select!(
    1385         1314 :                 next = part_downloads.join_next() => {
    1386              :                     match next {
    1387              :                         Some(result) => {
    1388              :                             let preload_result = result.context("join preload task")?;
    1389              :                             let preload = preload_result?;
    1390              :                             timeline_preloads.insert(preload.timeline_id, preload);
    1391              :                         },
    1392              :                         None => {
    1393              :                             break;
    1394              :                         }
    1395              :                     }
    1396              :                 },
    1397              :                 _ = cancel.cancelled() => {
    1398              :                     anyhow::bail!("Cancelled while waiting for remote index download")
    1399              :                 }
    1400         1314 :             )
    1401         1314 :         }
    1402              : 
    1403          866 :         Ok(timeline_preloads)
    1404          866 :     }
    1405              : 
    1406              :     ///
    1407              :     /// Background task to load in-memory data structures for this tenant, from
    1408              :     /// files on disk. Used at pageserver startup.
    1409              :     ///
    1410              :     /// No background tasks are started as part of this routine.
    1411           80 :     async fn load_local(self: &Arc<Tenant>, ctx: &RequestContext) -> anyhow::Result<()> {
    1412           80 :         span::debug_assert_current_span_has_tenant_id();
    1413              : 
    1414            0 :         debug!("loading tenant task");
    1415              : 
    1416              :         // Load in-memory state to reflect the local files on disk
    1417              :         //
    1418              :         // Scan the directory, peek into the metadata file of each timeline, and
    1419              :         // collect a list of timelines and their ancestors.
    1420           80 :         let span = info_span!("blocking");
    1421           80 :         let cloned = Arc::clone(self);
    1422              : 
    1423           80 :         let scan = tokio::task::spawn_blocking(move || {
    1424           80 :             let _g = span.entered();
    1425           80 :             cloned.scan_and_sort_timelines_dir()
    1426           80 :         })
    1427           79 :         .await
    1428           80 :         .context("load spawn_blocking")
    1429           80 :         .and_then(|res| res)?;
    1430              : 
    1431              :         // FIXME original collect_timeline_files contained one more check:
    1432              :         //    1. "Timeline has no ancestor and no layer files"
    1433              : 
    1434              :         // Process loadable timelines first
    1435           78 :         for (timeline_id, local_metadata) in scan.sorted_timelines_to_load {
    1436            0 :             if let Err(e) = self
    1437            0 :                 .load_local_timeline(timeline_id, local_metadata, ctx, false)
    1438            0 :                 .await
    1439              :             {
    1440            0 :                 match e {
    1441            0 :                     LoadLocalTimelineError::Load(source) => {
    1442            0 :                         return Err(anyhow::anyhow!(source)).with_context(|| {
    1443            0 :                             format!("Failed to load local timeline: {timeline_id}")
    1444            0 :                         })
    1445              :                     }
    1446            0 :                     LoadLocalTimelineError::ResumeDeletion(source) => {
    1447              :                         // Make sure resumed deletion wont fail loading for entire tenant.
    1448            0 :                         error!("Failed to resume timeline deletion: {source:#}")
    1449              :                     }
    1450              :                 }
    1451            0 :             }
    1452              :         }
    1453              : 
    1454              :         // Resume deletion ones with deleted_mark
    1455           78 :         for (timeline_id, maybe_local_metadata) in scan.timelines_to_resume_deletion {
    1456            0 :             match maybe_local_metadata {
    1457              :                 None => {
    1458              :                     // See comment in `scan_and_sort_timelines_dir`.
    1459            0 :                     if let Err(e) =
    1460            0 :                         DeleteTimelineFlow::cleanup_remaining_timeline_fs_traces(self, timeline_id)
    1461            0 :                             .await
    1462              :                     {
    1463            0 :                         warn!(
    1464            0 :                             "cannot clean up deleted timeline dir timeline_id: {} error: {:#}",
    1465            0 :                             timeline_id, e
    1466            0 :                         );
    1467            0 :                     }
    1468              :                 }
    1469            0 :                 Some(local_metadata) => {
    1470            0 :                     if let Err(e) = self
    1471            0 :                         .load_local_timeline(timeline_id, local_metadata, ctx, true)
    1472            0 :                         .await
    1473              :                     {
    1474            0 :                         match e {
    1475            0 :                             LoadLocalTimelineError::Load(source) => {
    1476            0 :                                 // We tried to load deleted timeline, this is a bug.
    1477            0 :                                 return Err(anyhow::anyhow!(source).context(
    1478            0 :                                     format!("This is a bug. We tried to load deleted timeline which is wrong and loading failed. Timeline: {timeline_id}")
    1479            0 :                                 ));
    1480              :                             }
    1481            0 :                             LoadLocalTimelineError::ResumeDeletion(source) => {
    1482              :                                 // Make sure resumed deletion wont fail loading for entire tenant.
    1483            0 :                                 error!("Failed to resume timeline deletion: {source:#}")
    1484              :                             }
    1485              :                         }
    1486            0 :                     }
    1487              :                 }
    1488              :             }
    1489              :         }
    1490              : 
    1491            0 :         trace!("Done");
    1492              : 
    1493           78 :         Ok(())
    1494           80 :     }
    1495              : 
    1496              :     /// Subroutine of `load_tenant`, to load an individual timeline
    1497              :     ///
    1498              :     /// NB: The parent is assumed to be already loaded!
    1499            0 :     #[instrument(skip(self, local_metadata, ctx))]
    1500              :     async fn load_local_timeline(
    1501              :         self: &Arc<Self>,
    1502              :         timeline_id: TimelineId,
    1503              :         local_metadata: TimelineMetadata,
    1504              :         ctx: &RequestContext,
    1505              :         found_delete_mark: bool,
    1506              :     ) -> Result<(), LoadLocalTimelineError> {
    1507              :         span::debug_assert_current_span_has_tenant_id();
    1508              : 
    1509              :         let resources = self.build_timeline_resources(timeline_id);
    1510              : 
    1511              :         if found_delete_mark {
    1512              :             // There is no remote client, we found local metadata.
    1513              :             // Continue cleaning up local disk.
    1514              :             DeleteTimelineFlow::resume_deletion(
    1515              :                 Arc::clone(self),
    1516              :                 timeline_id,
    1517              :                 &local_metadata,
    1518              :                 None,
    1519              :                 self.deletion_queue_client.clone(),
    1520              :             )
    1521              :             .await
    1522              :             .context("resume deletion")
    1523              :             .map_err(LoadLocalTimelineError::ResumeDeletion)?;
    1524              :             return Ok(());
    1525              :         }
    1526              : 
    1527              :         let ancestor = if let Some(ancestor_timeline_id) = local_metadata.ancestor_timeline() {
    1528              :             let ancestor_timeline = self.get_timeline(ancestor_timeline_id, false)
    1529            0 :                 .with_context(|| anyhow::anyhow!("cannot find ancestor timeline {ancestor_timeline_id} for timeline {timeline_id}"))
    1530              :                 .map_err(LoadLocalTimelineError::Load)?;
    1531              :             Some(ancestor_timeline)
    1532              :         } else {
    1533              :             None
    1534              :         };
    1535              : 
    1536              :         self.timeline_init_and_sync(timeline_id, resources, None, local_metadata, ancestor, ctx)
    1537              :             .await
    1538              :             .map_err(LoadLocalTimelineError::Load)
    1539              :     }
    1540              : 
    1541          678 :     pub(crate) fn tenant_shard_id(&self) -> TenantShardId {
    1542          678 :         self.tenant_shard_id
    1543          678 :     }
    1544              : 
    1545              :     /// Get Timeline handle for given Neon timeline ID.
    1546              :     /// This function is idempotent. It doesn't change internal state in any way.
    1547        17743 :     pub fn get_timeline(
    1548        17743 :         &self,
    1549        17743 :         timeline_id: TimelineId,
    1550        17743 :         active_only: bool,
    1551        17743 :     ) -> Result<Arc<Timeline>, GetTimelineError> {
    1552        17743 :         let timelines_accessor = self.timelines.lock().unwrap();
    1553        17743 :         let timeline = timelines_accessor
    1554        17743 :             .get(&timeline_id)
    1555        17743 :             .ok_or(GetTimelineError::NotFound {
    1556        17743 :                 tenant_id: self.tenant_shard_id,
    1557        17743 :                 timeline_id,
    1558        17743 :             })?;
    1559              : 
    1560        17690 :         if active_only && !timeline.is_active() {
    1561            6 :             Err(GetTimelineError::NotActive {
    1562            6 :                 tenant_id: self.tenant_shard_id,
    1563            6 :                 timeline_id,
    1564            6 :                 state: timeline.current_state(),
    1565            6 :             })
    1566              :         } else {
    1567        17684 :             Ok(Arc::clone(timeline))
    1568              :         }
    1569        17743 :     }
    1570              : 
    1571              :     /// Lists timelines the tenant contains.
    1572              :     /// Up to tenant's implementation to omit certain timelines that ar not considered ready for use.
    1573          763 :     pub fn list_timelines(&self) -> Vec<Arc<Timeline>> {
    1574          763 :         self.timelines
    1575          763 :             .lock()
    1576          763 :             .unwrap()
    1577          763 :             .values()
    1578          763 :             .map(Arc::clone)
    1579          763 :             .collect()
    1580          763 :     }
    1581              : 
    1582          479 :     pub fn list_timeline_ids(&self) -> Vec<TimelineId> {
    1583          479 :         self.timelines.lock().unwrap().keys().cloned().collect()
    1584          479 :     }
    1585              : 
    1586              :     /// This is used to create the initial 'main' timeline during bootstrapping,
    1587              :     /// or when importing a new base backup. The caller is expected to load an
    1588              :     /// initial image of the datadir to the new timeline after this.
    1589              :     ///
    1590              :     /// Until that happens, the on-disk state is invalid (disk_consistent_lsn=Lsn(0))
    1591              :     /// and the timeline will fail to load at a restart.
    1592              :     ///
    1593              :     /// That's why we add an uninit mark file, and wrap it together witht the Timeline
    1594              :     /// in-memory object into UninitializedTimeline.
    1595              :     /// Once the caller is done setting up the timeline, they should call
    1596              :     /// `UninitializedTimeline::initialize_with_lock` to remove the uninit mark.
    1597              :     ///
    1598              :     /// For tests, use `DatadirModification::init_empty_test_timeline` + `commit` to setup the
    1599              :     /// minimum amount of keys required to get a writable timeline.
    1600              :     /// (Without it, `put` might fail due to `repartition` failing.)
    1601           86 :     pub(crate) async fn create_empty_timeline(
    1602           86 :         &self,
    1603           86 :         new_timeline_id: TimelineId,
    1604           86 :         initdb_lsn: Lsn,
    1605           86 :         pg_version: u32,
    1606           86 :         _ctx: &RequestContext,
    1607           86 :     ) -> anyhow::Result<UninitializedTimeline> {
    1608           86 :         anyhow::ensure!(
    1609           86 :             self.is_active(),
    1610            0 :             "Cannot create empty timelines on inactive tenant"
    1611              :         );
    1612              : 
    1613           86 :         let timeline_uninit_mark = self.create_timeline_uninit_mark(new_timeline_id)?;
    1614           84 :         let new_metadata = TimelineMetadata::new(
    1615           84 :             // Initialize disk_consistent LSN to 0, The caller must import some data to
    1616           84 :             // make it valid, before calling finish_creation()
    1617           84 :             Lsn(0),
    1618           84 :             None,
    1619           84 :             None,
    1620           84 :             Lsn(0),
    1621           84 :             initdb_lsn,
    1622           84 :             initdb_lsn,
    1623           84 :             pg_version,
    1624           84 :         );
    1625           84 :         self.prepare_new_timeline(
    1626           84 :             new_timeline_id,
    1627           84 :             &new_metadata,
    1628           84 :             timeline_uninit_mark,
    1629           84 :             initdb_lsn,
    1630           84 :             None,
    1631           84 :         )
    1632          108 :         .await
    1633           86 :     }
    1634              : 
    1635              :     /// Helper for unit tests to create an empty timeline.
    1636              :     ///
    1637              :     /// The timeline is has state value `Active` but its background loops are not running.
    1638              :     // This makes the various functions which anyhow::ensure! for Active state work in tests.
    1639              :     // Our current tests don't need the background loops.
    1640              :     #[cfg(test)]
    1641           68 :     pub async fn create_test_timeline(
    1642           68 :         &self,
    1643           68 :         new_timeline_id: TimelineId,
    1644           68 :         initdb_lsn: Lsn,
    1645           68 :         pg_version: u32,
    1646           68 :         ctx: &RequestContext,
    1647           68 :     ) -> anyhow::Result<Arc<Timeline>> {
    1648           68 :         let uninit_tl = self
    1649           68 :             .create_empty_timeline(new_timeline_id, initdb_lsn, pg_version, ctx)
    1650          102 :             .await?;
    1651           68 :         let tline = uninit_tl.raw_timeline().expect("we just created it");
    1652           68 :         assert_eq!(tline.get_last_record_lsn(), Lsn(0));
    1653              : 
    1654              :         // Setup minimum keys required for the timeline to be usable.
    1655           68 :         let mut modification = tline.begin_modification(initdb_lsn);
    1656           68 :         modification
    1657           68 :             .init_empty_test_timeline()
    1658           68 :             .context("init_empty_test_timeline")?;
    1659           68 :         modification
    1660           68 :             .commit(ctx)
    1661           34 :             .await
    1662           68 :             .context("commit init_empty_test_timeline modification")?;
    1663              : 
    1664              :         // Flush to disk so that uninit_tl's check for valid disk_consistent_lsn passes.
    1665           68 :         tline.maybe_spawn_flush_loop();
    1666           68 :         tline.freeze_and_flush().await.context("freeze_and_flush")?;
    1667              : 
    1668              :         // Make sure the freeze_and_flush reaches remote storage.
    1669           68 :         tline
    1670           68 :             .remote_client
    1671           68 :             .as_ref()
    1672           68 :             .unwrap()
    1673           68 :             .wait_completion()
    1674           30 :             .await
    1675           68 :             .unwrap();
    1676              : 
    1677           68 :         let tl = uninit_tl.finish_creation()?;
    1678              :         // The non-test code would call tl.activate() here.
    1679           68 :         tl.set_state(TimelineState::Active);
    1680           68 :         Ok(tl)
    1681           68 :     }
    1682              : 
    1683              :     /// Create a new timeline.
    1684              :     ///
    1685              :     /// Returns the new timeline ID and reference to its Timeline object.
    1686              :     ///
    1687              :     /// If the caller specified the timeline ID to use (`new_timeline_id`), and timeline with
    1688              :     /// the same timeline ID already exists, returns CreateTimelineError::AlreadyExists.
    1689              :     #[allow(clippy::too_many_arguments)]
    1690          894 :     pub(crate) async fn create_timeline(
    1691          894 :         &self,
    1692          894 :         new_timeline_id: TimelineId,
    1693          894 :         ancestor_timeline_id: Option<TimelineId>,
    1694          894 :         mut ancestor_start_lsn: Option<Lsn>,
    1695          894 :         pg_version: u32,
    1696          894 :         load_existing_initdb: Option<TimelineId>,
    1697          894 :         broker_client: storage_broker::BrokerClientChannel,
    1698          894 :         ctx: &RequestContext,
    1699          894 :     ) -> Result<Arc<Timeline>, CreateTimelineError> {
    1700          894 :         if !self.is_active() {
    1701            0 :             if matches!(self.current_state(), TenantState::Stopping { .. }) {
    1702            0 :                 return Err(CreateTimelineError::ShuttingDown);
    1703              :             } else {
    1704            0 :                 return Err(CreateTimelineError::Other(anyhow::anyhow!(
    1705            0 :                     "Cannot create timelines on inactive tenant"
    1706            0 :                 )));
    1707              :             }
    1708          894 :         }
    1709              : 
    1710          894 :         let _gate = self
    1711          894 :             .gate
    1712          894 :             .enter()
    1713          894 :             .map_err(|_| CreateTimelineError::ShuttingDown)?;
    1714              : 
    1715              :         // Get exclusive access to the timeline ID: this ensures that it does not already exist,
    1716              :         // and that no other creation attempts will be allowed in while we are working.  The
    1717              :         // uninit_mark is a guard.
    1718          894 :         let uninit_mark = match self.create_timeline_uninit_mark(new_timeline_id) {
    1719          866 :             Ok(m) => m,
    1720              :             Err(TimelineExclusionError::AlreadyCreating) => {
    1721              :                 // Creation is in progress, we cannot create it again, and we cannot
    1722              :                 // check if this request matches the existing one, so caller must try
    1723              :                 // again later.
    1724            0 :                 return Err(CreateTimelineError::AlreadyCreating);
    1725              :             }
    1726            0 :             Err(TimelineExclusionError::Other(e)) => {
    1727            0 :                 return Err(CreateTimelineError::Other(e));
    1728              :             }
    1729           28 :             Err(TimelineExclusionError::AlreadyExists(existing)) => {
    1730            0 :                 debug!("timeline {new_timeline_id} already exists");
    1731              : 
    1732              :                 // Idempotency: creating the same timeline twice is not an error, unless
    1733              :                 // the second creation has different parameters.
    1734           28 :                 if existing.get_ancestor_timeline_id() != ancestor_timeline_id
    1735           28 :                     || existing.pg_version != pg_version
    1736           28 :                     || (ancestor_start_lsn.is_some()
    1737            0 :                         && ancestor_start_lsn != Some(existing.get_ancestor_lsn()))
    1738              :                 {
    1739            0 :                     return Err(CreateTimelineError::Conflict);
    1740           28 :                 }
    1741              : 
    1742           28 :                 if let Some(remote_client) = existing.remote_client.as_ref() {
    1743              :                     // Wait for uploads to complete, so that when we return Ok, the timeline
    1744              :                     // is known to be durable on remote storage. Just like we do at the end of
    1745              :                     // this function, after we have created the timeline ourselves.
    1746              :                     //
    1747              :                     // We only really care that the initial version of `index_part.json` has
    1748              :                     // been uploaded. That's enough to remember that the timeline
    1749              :                     // exists. However, there is no function to wait specifically for that so
    1750              :                     // we just wait for all in-progress uploads to finish.
    1751           28 :                     remote_client
    1752           28 :                         .wait_completion()
    1753            0 :                         .await
    1754           28 :                         .context("wait for timeline uploads to complete")?;
    1755            0 :                 }
    1756              : 
    1757           28 :                 return Ok(existing);
    1758              :             }
    1759              :         };
    1760              : 
    1761          866 :         let loaded_timeline = match ancestor_timeline_id {
    1762          268 :             Some(ancestor_timeline_id) => {
    1763          268 :                 let ancestor_timeline = self
    1764          268 :                     .get_timeline(ancestor_timeline_id, false)
    1765          268 :                     .context("Cannot branch off the timeline that's not present in pageserver")?;
    1766              : 
    1767              :                 // instead of waiting around, just deny the request because ancestor is not yet
    1768              :                 // ready for other purposes either.
    1769          268 :                 if !ancestor_timeline.is_active() {
    1770            6 :                     return Err(CreateTimelineError::AncestorNotActive);
    1771          262 :                 }
    1772              : 
    1773          262 :                 if let Some(lsn) = ancestor_start_lsn.as_mut() {
    1774           33 :                     *lsn = lsn.align();
    1775           33 : 
    1776           33 :                     let ancestor_ancestor_lsn = ancestor_timeline.get_ancestor_lsn();
    1777           33 :                     if ancestor_ancestor_lsn > *lsn {
    1778              :                         // can we safely just branch from the ancestor instead?
    1779            2 :                         return Err(CreateTimelineError::AncestorLsn(anyhow::anyhow!(
    1780            2 :                             "invalid start lsn {} for ancestor timeline {}: less than timeline ancestor lsn {}",
    1781            2 :                             lsn,
    1782            2 :                             ancestor_timeline_id,
    1783            2 :                             ancestor_ancestor_lsn,
    1784            2 :                         )));
    1785           31 :                     }
    1786           31 : 
    1787           31 :                     // Wait for the WAL to arrive and be processed on the parent branch up
    1788           31 :                     // to the requested branch point. The repository code itself doesn't
    1789           31 :                     // require it, but if we start to receive WAL on the new timeline,
    1790           31 :                     // decoding the new WAL might need to look up previous pages, relation
    1791           31 :                     // sizes etc. and that would get confused if the previous page versions
    1792           31 :                     // are not in the repository yet.
    1793           31 :                     ancestor_timeline
    1794           31 :                         .wait_lsn(*lsn, ctx)
    1795            7 :                         .await
    1796           31 :                         .map_err(|e| match e {
    1797            0 :                             e @ (WaitLsnError::Timeout(_) | WaitLsnError::BadState) => {
    1798            0 :                                 CreateTimelineError::AncestorLsn(anyhow::anyhow!(e))
    1799              :                             }
    1800            0 :                             WaitLsnError::Shutdown => CreateTimelineError::ShuttingDown,
    1801           31 :                         })?;
    1802          229 :                 }
    1803              : 
    1804          260 :                 self.branch_timeline(
    1805          260 :                     &ancestor_timeline,
    1806          260 :                     new_timeline_id,
    1807          260 :                     ancestor_start_lsn,
    1808          260 :                     uninit_mark,
    1809          260 :                     ctx,
    1810          260 :                 )
    1811            6 :                 .await?
    1812              :             }
    1813              :             None => {
    1814          598 :                 self.bootstrap_timeline(
    1815          598 :                     new_timeline_id,
    1816          598 :                     pg_version,
    1817          598 :                     load_existing_initdb,
    1818          598 :                     uninit_mark,
    1819          598 :                     ctx,
    1820          598 :                 )
    1821      7682544 :                 .await?
    1822              :             }
    1823              :         };
    1824              : 
    1825              :         // At this point we have dropped our guard on [`Self::timelines_creating`], and
    1826              :         // the timeline is visible in [`Self::timelines`], but it is _not_ durable yet.  We must
    1827              :         // not send a success to the caller until it is.  The same applies to handling retries,
    1828              :         // see the handling of [`TimelineExclusionError::AlreadyExists`] above.
    1829          844 :         if let Some(remote_client) = loaded_timeline.remote_client.as_ref() {
    1830          844 :             let kind = ancestor_timeline_id
    1831          844 :                 .map(|_| "branched")
    1832          844 :                 .unwrap_or("bootstrapped");
    1833          844 :             remote_client.wait_completion().await.with_context(|| {
    1834            0 :                 format!("wait for {} timeline initial uploads to complete", kind)
    1835          840 :             })?;
    1836            0 :         }
    1837              : 
    1838          840 :         loaded_timeline.activate(broker_client, None, ctx);
    1839          840 : 
    1840          840 :         Ok(loaded_timeline)
    1841          888 :     }
    1842              : 
    1843           64 :     pub(crate) async fn delete_timeline(
    1844           64 :         self: Arc<Self>,
    1845           64 :         timeline_id: TimelineId,
    1846           64 :     ) -> Result<(), DeleteTimelineError> {
    1847          370 :         DeleteTimelineFlow::run(&self, timeline_id, false).await?;
    1848              : 
    1849           53 :         Ok(())
    1850           64 :     }
    1851              : 
    1852              :     /// perform one garbage collection iteration, removing old data files from disk.
    1853              :     /// this function is periodically called by gc task.
    1854              :     /// also it can be explicitly requested through page server api 'do_gc' command.
    1855              :     ///
    1856              :     /// `target_timeline_id` specifies the timeline to GC, or None for all.
    1857              :     ///
    1858              :     /// The `horizon` an `pitr` parameters determine how much WAL history needs to be retained.
    1859              :     /// Also known as the retention period, or the GC cutoff point. `horizon` specifies
    1860              :     /// the amount of history, as LSN difference from current latest LSN on each timeline.
    1861              :     /// `pitr` specifies the same as a time difference from the current time. The effective
    1862              :     /// GC cutoff point is determined conservatively by either `horizon` and `pitr`, whichever
    1863              :     /// requires more history to be retained.
    1864              :     //
    1865          405 :     pub async fn gc_iteration(
    1866          405 :         &self,
    1867          405 :         target_timeline_id: Option<TimelineId>,
    1868          405 :         horizon: u64,
    1869          405 :         pitr: Duration,
    1870          405 :         cancel: &CancellationToken,
    1871          405 :         ctx: &RequestContext,
    1872          405 :     ) -> anyhow::Result<GcResult> {
    1873          405 :         // Don't start doing work during shutdown
    1874          405 :         if let TenantState::Stopping { .. } = self.current_state() {
    1875            0 :             return Ok(GcResult::default());
    1876          405 :         }
    1877          405 : 
    1878          405 :         // there is a global allowed_error for this
    1879          405 :         anyhow::ensure!(
    1880          405 :             self.is_active(),
    1881            0 :             "Cannot run GC iteration on inactive tenant"
    1882              :         );
    1883              : 
    1884              :         {
    1885          405 :             let conf = self.tenant_conf.read().unwrap();
    1886          405 : 
    1887          405 :             if !conf.location.may_delete_layers_hint() {
    1888            1 :                 info!("Skipping GC in location state {:?}", conf.location);
    1889            1 :                 return Ok(GcResult::default());
    1890          404 :             }
    1891          404 :         }
    1892          404 : 
    1893          404 :         self.gc_iteration_internal(target_timeline_id, horizon, pitr, cancel, ctx)
    1894       265132 :             .await
    1895          403 :     }
    1896              : 
    1897              :     /// Perform one compaction iteration.
    1898              :     /// This function is periodically called by compactor task.
    1899              :     /// Also it can be explicitly requested per timeline through page server
    1900              :     /// api's 'compact' command.
    1901          396 :     async fn compaction_iteration(
    1902          396 :         &self,
    1903          396 :         cancel: &CancellationToken,
    1904          396 :         ctx: &RequestContext,
    1905          396 :     ) -> anyhow::Result<(), timeline::CompactionError> {
    1906          396 :         // Don't start doing work during shutdown, or when broken, we do not need those in the logs
    1907          396 :         if !self.is_active() {
    1908            1 :             return Ok(());
    1909          395 :         }
    1910          395 : 
    1911          395 :         {
    1912          395 :             let conf = self.tenant_conf.read().unwrap();
    1913          395 :             if !conf.location.may_delete_layers_hint() || !conf.location.may_upload_layers_hint() {
    1914            8 :                 info!("Skipping compaction in location state {:?}", conf.location);
    1915            8 :                 return Ok(());
    1916          387 :             }
    1917          387 :         }
    1918          387 : 
    1919          387 :         // Scan through the hashmap and collect a list of all the timelines,
    1920          387 :         // while holding the lock. Then drop the lock and actually perform the
    1921          387 :         // compactions.  We don't want to block everything else while the
    1922          387 :         // compaction runs.
    1923          387 :         let timelines_to_compact = {
    1924          387 :             let timelines = self.timelines.lock().unwrap();
    1925          387 :             let timelines_to_compact = timelines
    1926          387 :                 .iter()
    1927          551 :                 .filter_map(|(timeline_id, timeline)| {
    1928          551 :                     if timeline.is_active() {
    1929          548 :                         Some((*timeline_id, timeline.clone()))
    1930              :                     } else {
    1931            3 :                         None
    1932              :                     }
    1933          551 :                 })
    1934          387 :                 .collect::<Vec<_>>();
    1935          387 :             drop(timelines);
    1936          387 :             timelines_to_compact
    1937              :         };
    1938              : 
    1939          934 :         for (timeline_id, timeline) in &timelines_to_compact {
    1940          548 :             timeline
    1941          548 :                 .compact(cancel, EnumSet::empty(), ctx)
    1942          548 :                 .instrument(info_span!("compact_timeline", %timeline_id))
    1943        97251 :                 .await?;
    1944              :         }
    1945              : 
    1946          386 :         Ok(())
    1947          395 :     }
    1948              : 
    1949        34474 :     pub fn current_state(&self) -> TenantState {
    1950        34474 :         self.state.borrow().clone()
    1951        34474 :     }
    1952              : 
    1953         2725 :     pub fn is_active(&self) -> bool {
    1954         2725 :         self.current_state() == TenantState::Active
    1955         2725 :     }
    1956              : 
    1957          739 :     pub fn generation(&self) -> Generation {
    1958          739 :         self.generation
    1959          739 :     }
    1960              : 
    1961          479 :     pub(crate) fn wal_redo_manager_status(&self) -> Option<WalRedoManagerStatus> {
    1962          479 :         self.walredo_mgr.as_ref().and_then(|mgr| mgr.status())
    1963          479 :     }
    1964              : 
    1965              :     /// Changes tenant status to active, unless shutdown was already requested.
    1966              :     ///
    1967              :     /// `background_jobs_can_start` is an optional barrier set to a value during pageserver startup
    1968              :     /// to delay background jobs. Background jobs can be started right away when None is given.
    1969          856 :     fn activate(
    1970          856 :         self: &Arc<Self>,
    1971          856 :         broker_client: BrokerClientChannel,
    1972          856 :         background_jobs_can_start: Option<&completion::Barrier>,
    1973          856 :         ctx: &RequestContext,
    1974          856 :     ) {
    1975          856 :         span::debug_assert_current_span_has_tenant_id();
    1976          856 : 
    1977          856 :         let mut activating = false;
    1978          856 :         self.state.send_modify(|current_state| {
    1979          856 :             use pageserver_api::models::ActivatingFrom;
    1980          856 :             match &*current_state {
    1981              :                 TenantState::Activating(_) | TenantState::Active | TenantState::Broken { .. } | TenantState::Stopping { .. } => {
    1982            0 :                     panic!("caller is responsible for calling activate() only on Loading / Attaching tenants, got {state:?}", state = current_state);
    1983              :                 }
    1984            0 :                 TenantState::Loading => {
    1985            0 :                     *current_state = TenantState::Activating(ActivatingFrom::Loading);
    1986            0 :                 }
    1987          856 :                 TenantState::Attaching => {
    1988          856 :                     *current_state = TenantState::Activating(ActivatingFrom::Attaching);
    1989          856 :                 }
    1990              :             }
    1991          856 :             debug!(tenant_id = %self.tenant_shard_id.tenant_id, shard_id = %self.tenant_shard_id.shard_slug(), "Activating tenant");
    1992          856 :             activating = true;
    1993          856 :             // Continue outside the closure. We need to grab timelines.lock()
    1994          856 :             // and we plan to turn it into a tokio::sync::Mutex in a future patch.
    1995          856 :         });
    1996          856 : 
    1997          856 :         if activating {
    1998          856 :             let timelines_accessor = self.timelines.lock().unwrap();
    1999          856 :             let timelines_to_activate = timelines_accessor
    2000          856 :                 .values()
    2001          856 :                 .filter(|timeline| !(timeline.is_broken() || timeline.is_stopping()));
    2002          856 : 
    2003          856 :             // Spawn gc and compaction loops. The loops will shut themselves
    2004          856 :             // down when they notice that the tenant is inactive.
    2005          856 :             tasks::start_background_loops(self, background_jobs_can_start);
    2006          856 : 
    2007          856 :             let mut activated_timelines = 0;
    2008              : 
    2009         1263 :             for timeline in timelines_to_activate {
    2010          407 :                 timeline.activate(broker_client.clone(), background_jobs_can_start, ctx);
    2011          407 :                 activated_timelines += 1;
    2012          407 :             }
    2013              : 
    2014          856 :             self.state.send_modify(move |current_state| {
    2015          856 :                 assert!(
    2016          856 :                     matches!(current_state, TenantState::Activating(_)),
    2017            0 :                     "set_stopping and set_broken wait for us to leave Activating state",
    2018              :                 );
    2019          856 :                 *current_state = TenantState::Active;
    2020          856 : 
    2021          856 :                 let elapsed = self.constructed_at.elapsed();
    2022          856 :                 let total_timelines = timelines_accessor.len();
    2023          856 : 
    2024          856 :                 // log a lot of stuff, because some tenants sometimes suffer from user-visible
    2025          856 :                 // times to activate. see https://github.com/neondatabase/neon/issues/4025
    2026          856 :                 info!(
    2027          856 :                     since_creation_millis = elapsed.as_millis(),
    2028          856 :                     tenant_id = %self.tenant_shard_id.tenant_id,
    2029          856 :                     shard_id = %self.tenant_shard_id.shard_slug(),
    2030          856 :                     activated_timelines,
    2031          856 :                     total_timelines,
    2032          856 :                     post_state = <&'static str>::from(&*current_state),
    2033          856 :                     "activation attempt finished"
    2034          856 :                 );
    2035              : 
    2036          856 :                 TENANT.activation.observe(elapsed.as_secs_f64());
    2037          856 :             });
    2038            0 :         }
    2039          856 :     }
    2040              : 
    2041              :     /// Shutdown the tenant and join all of the spawned tasks.
    2042              :     ///
    2043              :     /// The method caters for all use-cases:
    2044              :     /// - pageserver shutdown (freeze_and_flush == true)
    2045              :     /// - detach + ignore (freeze_and_flush == false)
    2046              :     ///
    2047              :     /// This will attempt to shutdown even if tenant is broken.
    2048              :     ///
    2049              :     /// `shutdown_progress` is a [`completion::Barrier`] for the shutdown initiated by this call.
    2050              :     /// If the tenant is already shutting down, we return a clone of the first shutdown call's
    2051              :     /// `Barrier` as an `Err`. This not-first caller can use the returned barrier to join with
    2052              :     /// the ongoing shutdown.
    2053          464 :     async fn shutdown(
    2054          464 :         &self,
    2055          464 :         shutdown_progress: completion::Barrier,
    2056          464 :         freeze_and_flush: bool,
    2057          464 :     ) -> Result<(), completion::Barrier> {
    2058          464 :         span::debug_assert_current_span_has_tenant_id();
    2059          464 : 
    2060          464 :         // Set tenant (and its timlines) to Stoppping state.
    2061          464 :         //
    2062          464 :         // Since we can only transition into Stopping state after activation is complete,
    2063          464 :         // run it in a JoinSet so all tenants have a chance to stop before we get SIGKILLed.
    2064          464 :         //
    2065          464 :         // Transitioning tenants to Stopping state has a couple of non-obvious side effects:
    2066          464 :         // 1. Lock out any new requests to the tenants.
    2067          464 :         // 2. Signal cancellation to WAL receivers (we wait on it below).
    2068          464 :         // 3. Signal cancellation for other tenant background loops.
    2069          464 :         // 4. ???
    2070          464 :         //
    2071          464 :         // The waiting for the cancellation is not done uniformly.
    2072          464 :         // We certainly wait for WAL receivers to shut down.
    2073          464 :         // That is necessary so that no new data comes in before the freeze_and_flush.
    2074          464 :         // But the tenant background loops are joined-on in our caller.
    2075          464 :         // It's mesed up.
    2076          464 :         // we just ignore the failure to stop
    2077          464 : 
    2078          464 :         // If we're still attaching, fire the cancellation token early to drop out: this
    2079          464 :         // will prevent us flushing, but ensures timely shutdown if some I/O during attach
    2080          464 :         // is very slow.
    2081          464 :         if matches!(self.current_state(), TenantState::Attaching) {
    2082           14 :             self.cancel.cancel();
    2083          450 :         }
    2084              : 
    2085          464 :         match self.set_stopping(shutdown_progress, false, false).await {
    2086          407 :             Ok(()) => {}
    2087           57 :             Err(SetStoppingError::Broken) => {
    2088           57 :                 // assume that this is acceptable
    2089           57 :             }
    2090            0 :             Err(SetStoppingError::AlreadyStopping(other)) => {
    2091              :                 // give caller the option to wait for this this shutdown
    2092            0 :                 info!("Tenant::shutdown: AlreadyStopping");
    2093            0 :                 return Err(other);
    2094              :             }
    2095              :         };
    2096              : 
    2097          464 :         let mut js = tokio::task::JoinSet::new();
    2098          464 :         {
    2099          464 :             let timelines = self.timelines.lock().unwrap();
    2100          590 :             timelines.values().for_each(|timeline| {
    2101          590 :                 let timeline = Arc::clone(timeline);
    2102          590 :                 let timeline_id = timeline.timeline_id;
    2103              : 
    2104          590 :                 let span =
    2105          590 :                     tracing::info_span!("timeline_shutdown", %timeline_id, ?freeze_and_flush);
    2106          590 :                 js.spawn(async move {
    2107          590 :                     if freeze_and_flush {
    2108          888 :                         timeline.flush_and_shutdown().instrument(span).await
    2109              :                     } else {
    2110          697 :                         timeline.shutdown().instrument(span).await
    2111              :                     }
    2112          590 :                 });
    2113          590 :             })
    2114              :         };
    2115              :         // test_long_timeline_create_then_tenant_delete is leaning on this message
    2116          464 :         tracing::info!("Waiting for timelines...");
    2117         1054 :         while let Some(res) = js.join_next().await {
    2118            0 :             match res {
    2119          590 :                 Ok(()) => {}
    2120            0 :                 Err(je) if je.is_cancelled() => unreachable!("no cancelling used"),
    2121            0 :                 Err(je) if je.is_panic() => { /* logged already */ }
    2122            0 :                 Err(je) => warn!("unexpected JoinError: {je:?}"),
    2123              :             }
    2124              :         }
    2125              : 
    2126              :         // We cancel the Tenant's cancellation token _after_ the timelines have all shut down.  This permits
    2127              :         // them to continue to do work during their shutdown methods, e.g. flushing data.
    2128            0 :         tracing::debug!("Cancelling CancellationToken");
    2129          464 :         self.cancel.cancel();
    2130              : 
    2131              :         // shutdown all tenant and timeline tasks: gc, compaction, page service
    2132              :         // No new tasks will be started for this tenant because it's in `Stopping` state.
    2133              :         //
    2134              :         // this will additionally shutdown and await all timeline tasks.
    2135            0 :         tracing::debug!("Waiting for tasks...");
    2136          464 :         task_mgr::shutdown_tasks(None, Some(self.tenant_shard_id), None).await;
    2137              : 
    2138              :         // Wait for any in-flight operations to complete
    2139          464 :         self.gate.close().await;
    2140              : 
    2141          464 :         Ok(())
    2142          464 :     }
    2143              : 
    2144              :     /// Change tenant status to Stopping, to mark that it is being shut down.
    2145              :     ///
    2146              :     /// This function waits for the tenant to become active if it isn't already, before transitioning it into Stopping state.
    2147              :     ///
    2148              :     /// This function is not cancel-safe!
    2149              :     ///
    2150              :     /// `allow_transition_from_loading` is needed for the special case of loading task deleting the tenant.
    2151              :     /// `allow_transition_from_attaching` is needed for the special case of attaching deleted tenant.
    2152          485 :     async fn set_stopping(
    2153          485 :         &self,
    2154          485 :         progress: completion::Barrier,
    2155          485 :         allow_transition_from_loading: bool,
    2156          485 :         allow_transition_from_attaching: bool,
    2157          485 :     ) -> Result<(), SetStoppingError> {
    2158          485 :         let mut rx = self.state.subscribe();
    2159          485 : 
    2160          485 :         // cannot stop before we're done activating, so wait out until we're done activating
    2161          508 :         rx.wait_for(|state| match state {
    2162           21 :             TenantState::Attaching if allow_transition_from_attaching => true,
    2163              :             TenantState::Activating(_) | TenantState::Attaching => {
    2164           23 :                 info!(
    2165           23 :                     "waiting for {} to turn Active|Broken|Stopping",
    2166           23 :                     <&'static str>::from(state)
    2167           23 :                 );
    2168           23 :                 false
    2169              :             }
    2170            0 :             TenantState::Loading => allow_transition_from_loading,
    2171          464 :             TenantState::Active | TenantState::Broken { .. } | TenantState::Stopping { .. } => true,
    2172          508 :         })
    2173           23 :         .await
    2174          485 :         .expect("cannot drop self.state while on a &self method");
    2175          485 : 
    2176          485 :         // we now know we're done activating, let's see whether this task is the winner to transition into Stopping
    2177          485 :         let mut err = None;
    2178          485 :         let stopping = self.state.send_if_modified(|current_state| match current_state {
    2179              :             TenantState::Activating(_) => {
    2180            0 :                 unreachable!("1we ensured above that we're done with activation, and, there is no re-activation")
    2181              :             }
    2182              :             TenantState::Attaching => {
    2183           21 :                 if !allow_transition_from_attaching {
    2184            0 :                     unreachable!("2we ensured above that we're done with activation, and, there is no re-activation")
    2185           21 :                 };
    2186           21 :                 *current_state = TenantState::Stopping { progress };
    2187           21 :                 true
    2188              :             }
    2189              :             TenantState::Loading => {
    2190            0 :                 if !allow_transition_from_loading {
    2191            0 :                     unreachable!("3we ensured above that we're done with activation, and, there is no re-activation")
    2192            0 :                 };
    2193            0 :                 *current_state = TenantState::Stopping { progress };
    2194            0 :                 true
    2195              :             }
    2196              :             TenantState::Active => {
    2197              :                 // FIXME: due to time-of-check vs time-of-use issues, it can happen that new timelines
    2198              :                 // are created after the transition to Stopping. That's harmless, as the Timelines
    2199              :                 // won't be accessible to anyone afterwards, because the Tenant is in Stopping state.
    2200          407 :                 *current_state = TenantState::Stopping { progress };
    2201          407 :                 // Continue stopping outside the closure. We need to grab timelines.lock()
    2202          407 :                 // and we plan to turn it into a tokio::sync::Mutex in a future patch.
    2203          407 :                 true
    2204              :             }
    2205           57 :             TenantState::Broken { reason, .. } => {
    2206           57 :                 info!(
    2207           57 :                     "Cannot set tenant to Stopping state, it is in Broken state due to: {reason}"
    2208           57 :                 );
    2209           57 :                 err = Some(SetStoppingError::Broken);
    2210           57 :                 false
    2211              :             }
    2212            0 :             TenantState::Stopping { progress } => {
    2213            0 :                 info!("Tenant is already in Stopping state");
    2214            0 :                 err = Some(SetStoppingError::AlreadyStopping(progress.clone()));
    2215            0 :                 false
    2216              :             }
    2217          485 :         });
    2218          485 :         match (stopping, err) {
    2219          428 :             (true, None) => {} // continue
    2220           57 :             (false, Some(err)) => return Err(err),
    2221            0 :             (true, Some(_)) => unreachable!(
    2222            0 :                 "send_if_modified closure must error out if not transitioning to Stopping"
    2223            0 :             ),
    2224            0 :             (false, None) => unreachable!(
    2225            0 :                 "send_if_modified closure must return true if transitioning to Stopping"
    2226            0 :             ),
    2227              :         }
    2228              : 
    2229          428 :         let timelines_accessor = self.timelines.lock().unwrap();
    2230          428 :         let not_broken_timelines = timelines_accessor
    2231          428 :             .values()
    2232          530 :             .filter(|timeline| !timeline.is_broken());
    2233          949 :         for timeline in not_broken_timelines {
    2234          521 :             timeline.set_state(TimelineState::Stopping);
    2235          521 :         }
    2236          428 :         Ok(())
    2237          485 :     }
    2238              : 
    2239              :     /// Method for tenant::mgr to transition us into Broken state in case of a late failure in
    2240              :     /// `remove_tenant_from_memory`
    2241              :     ///
    2242              :     /// This function waits for the tenant to become active if it isn't already, before transitioning it into Stopping state.
    2243              :     ///
    2244              :     /// In tests, we also use this to set tenants to Broken state on purpose.
    2245           51 :     pub(crate) async fn set_broken(&self, reason: String) {
    2246           51 :         let mut rx = self.state.subscribe();
    2247           51 : 
    2248           51 :         // The load & attach routines own the tenant state until it has reached `Active`.
    2249           51 :         // So, wait until it's done.
    2250           51 :         rx.wait_for(|state| match state {
    2251              :             TenantState::Activating(_) | TenantState::Loading | TenantState::Attaching => {
    2252            0 :                 info!(
    2253            0 :                     "waiting for {} to turn Active|Broken|Stopping",
    2254            0 :                     <&'static str>::from(state)
    2255            0 :                 );
    2256            0 :                 false
    2257              :             }
    2258           51 :             TenantState::Active | TenantState::Broken { .. } | TenantState::Stopping { .. } => true,
    2259           51 :         })
    2260            0 :         .await
    2261           51 :         .expect("cannot drop self.state while on a &self method");
    2262           51 : 
    2263           51 :         // we now know we're done activating, let's see whether this task is the winner to transition into Broken
    2264           51 :         self.set_broken_no_wait(reason)
    2265           51 :     }
    2266              : 
    2267           51 :     pub(crate) fn set_broken_no_wait(&self, reason: impl Display) {
    2268           51 :         let reason = reason.to_string();
    2269           51 :         self.state.send_modify(|current_state| {
    2270           51 :             match *current_state {
    2271              :                 TenantState::Activating(_) | TenantState::Loading | TenantState::Attaching => {
    2272            0 :                     unreachable!("we ensured above that we're done with activation, and, there is no re-activation")
    2273              :                 }
    2274              :                 TenantState::Active => {
    2275            2 :                     if cfg!(feature = "testing") {
    2276            2 :                         warn!("Changing Active tenant to Broken state, reason: {}", reason);
    2277            2 :                         *current_state = TenantState::broken_from_reason(reason);
    2278              :                     } else {
    2279            0 :                         unreachable!("not allowed to call set_broken on Active tenants in non-testing builds")
    2280              :                     }
    2281              :                 }
    2282              :                 TenantState::Broken { .. } => {
    2283            0 :                     warn!("Tenant is already in Broken state");
    2284              :                 }
    2285              :                 // This is the only "expected" path, any other path is a bug.
    2286              :                 TenantState::Stopping { .. } => {
    2287           49 :                     warn!(
    2288           49 :                         "Marking Stopping tenant as Broken state, reason: {}",
    2289           49 :                         reason
    2290           49 :                     );
    2291           49 :                     *current_state = TenantState::broken_from_reason(reason);
    2292              :                 }
    2293              :            }
    2294           51 :         });
    2295           51 :     }
    2296              : 
    2297          401 :     pub fn subscribe_for_state_updates(&self) -> watch::Receiver<TenantState> {
    2298          401 :         self.state.subscribe()
    2299          401 :     }
    2300              : 
    2301              :     /// The activate_now semaphore is initialized with zero units.  As soon as
    2302              :     /// we add a unit, waiters will be able to acquire a unit and proceed.
    2303          614 :     pub(crate) fn activate_now(&self) {
    2304          614 :         self.activate_now_sem.add_permits(1);
    2305          614 :     }
    2306              : 
    2307         1221 :     pub(crate) async fn wait_to_become_active(
    2308         1221 :         &self,
    2309         1221 :         timeout: Duration,
    2310         1221 :     ) -> Result<(), GetActiveTenantError> {
    2311         1221 :         let mut receiver = self.state.subscribe();
    2312         1697 :         loop {
    2313         1697 :             let current_state = receiver.borrow_and_update().clone();
    2314         1697 :             match current_state {
    2315              :                 TenantState::Loading | TenantState::Attaching | TenantState::Activating(_) => {
    2316              :                     // in these states, there's a chance that we can reach ::Active
    2317          483 :                     self.activate_now();
    2318          617 :                     match timeout_cancellable(timeout, &self.cancel, receiver.changed()).await {
    2319          476 :                         Ok(r) => {
    2320          476 :                             r.map_err(
    2321          476 :                             |_e: tokio::sync::watch::error::RecvError|
    2322              :                                 // Tenant existed but was dropped: report it as non-existent
    2323          476 :                                 GetActiveTenantError::NotFound(GetTenantError::NotFound(self.tenant_shard_id.tenant_id))
    2324          476 :                         )?
    2325              :                         }
    2326              :                         Err(TimeoutCancellableError::Cancelled) => {
    2327            7 :                             return Err(GetActiveTenantError::Cancelled);
    2328              :                         }
    2329              :                         Err(TimeoutCancellableError::Timeout) => {
    2330            0 :                             return Err(GetActiveTenantError::WaitForActiveTimeout {
    2331            0 :                                 latest_state: Some(self.current_state()),
    2332            0 :                                 wait_time: timeout,
    2333            0 :                             });
    2334              :                         }
    2335              :                     }
    2336              :                 }
    2337              :                 TenantState::Active { .. } => {
    2338         1212 :                     return Ok(());
    2339              :                 }
    2340              :                 TenantState::Broken { .. } | TenantState::Stopping { .. } => {
    2341              :                     // There's no chance the tenant can transition back into ::Active
    2342            2 :                     return Err(GetActiveTenantError::WillNotBecomeActive(current_state));
    2343              :                 }
    2344              :             }
    2345              :         }
    2346         1221 :     }
    2347              : 
    2348           75 :     pub(crate) fn get_attach_mode(&self) -> AttachmentMode {
    2349           75 :         self.tenant_conf.read().unwrap().location.attach_mode
    2350           75 :     }
    2351              : 
    2352              :     /// For API access: generate a LocationConfig equivalent to the one that would be used to
    2353              :     /// create a Tenant in the same state.  Do not use this in hot paths: it's for relatively
    2354              :     /// rare external API calls, like a reconciliation at startup.
    2355            5 :     pub(crate) fn get_location_conf(&self) -> models::LocationConfig {
    2356            5 :         let conf = self.tenant_conf.read().unwrap();
    2357              : 
    2358            5 :         let location_config_mode = match conf.location.attach_mode {
    2359            5 :             AttachmentMode::Single => models::LocationConfigMode::AttachedSingle,
    2360            0 :             AttachmentMode::Multi => models::LocationConfigMode::AttachedMulti,
    2361            0 :             AttachmentMode::Stale => models::LocationConfigMode::AttachedStale,
    2362              :         };
    2363              : 
    2364              :         // We have a pageserver TenantConf, we need the API-facing TenantConfig.
    2365            5 :         let tenant_config: models::TenantConfig = conf.tenant_conf.into();
    2366            5 : 
    2367            5 :         models::LocationConfig {
    2368            5 :             mode: location_config_mode,
    2369            5 :             generation: self.generation.into(),
    2370            5 :             secondary_conf: None,
    2371            5 :             shard_number: self.shard_identity.number.0,
    2372            5 :             shard_count: self.shard_identity.count.0,
    2373            5 :             shard_stripe_size: self.shard_identity.stripe_size.0,
    2374            5 :             tenant_conf: tenant_config,
    2375            5 :         }
    2376            5 :     }
    2377              : 
    2378           72 :     pub(crate) fn get_tenant_shard_id(&self) -> &TenantShardId {
    2379           72 :         &self.tenant_shard_id
    2380           72 :     }
    2381              : 
    2382           12 :     pub(crate) fn get_generation(&self) -> Generation {
    2383           12 :         self.generation
    2384           12 :     }
    2385              : 
    2386              :     /// This function partially shuts down the tenant (it shuts down the Timelines) and is fallible,
    2387              :     /// and can leave the tenant in a bad state if it fails.  The caller is responsible for
    2388              :     /// resetting this tenant to a valid state if we fail.
    2389            5 :     pub(crate) async fn split_prepare(
    2390            5 :         &self,
    2391            5 :         child_shards: &Vec<TenantShardId>,
    2392            5 :     ) -> anyhow::Result<()> {
    2393            5 :         let timelines = self.timelines.lock().unwrap().clone();
    2394            5 :         for timeline in timelines.values() {
    2395            5 :             let Some(tl_client) = &timeline.remote_client else {
    2396            0 :                 anyhow::bail!("Remote storage is mandatory");
    2397              :             };
    2398              : 
    2399            5 :             let Some(remote_storage) = &self.remote_storage else {
    2400            0 :                 anyhow::bail!("Remote storage is mandatory");
    2401              :             };
    2402              : 
    2403              :             // We do not block timeline creation/deletion during splits inside the pageserver: it is up to higher levels
    2404              :             // to ensure that they do not start a split if currently in the process of doing these.
    2405              : 
    2406              :             // Upload an index from the parent: this is partly to provide freshness for the
    2407              :             // child tenants that will copy it, and partly for general ease-of-debugging: there will
    2408              :             // always be a parent shard index in the same generation as we wrote the child shard index.
    2409            5 :             tl_client.schedule_index_upload_for_file_changes()?;
    2410            5 :             tl_client.wait_completion().await?;
    2411              : 
    2412              :             // Shut down the timeline's remote client: this means that the indices we write
    2413              :             // for child shards will not be invalidated by the parent shard deleting layers.
    2414            5 :             tl_client.shutdown().await?;
    2415              : 
    2416              :             // Download methods can still be used after shutdown, as they don't flow through the remote client's
    2417              :             // queue.  In principal the RemoteTimelineClient could provide this without downloading it, but this
    2418              :             // operation is rare, so it's simpler to just download it (and robustly guarantees that the index
    2419              :             // we use here really is the remotely persistent one).
    2420            5 :             let result = tl_client
    2421            5 :                 .download_index_file(&self.cancel)
    2422            5 :                 .instrument(info_span!("download_index_file", tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(), timeline_id=%timeline.timeline_id))
    2423            8 :                 .await?;
    2424            5 :             let index_part = match result {
    2425              :                 MaybeDeletedIndexPart::Deleted(_) => {
    2426            0 :                     anyhow::bail!("Timeline deletion happened concurrently with split")
    2427              :                 }
    2428            5 :                 MaybeDeletedIndexPart::IndexPart(p) => p,
    2429              :             };
    2430              : 
    2431           15 :             for child_shard in child_shards {
    2432           10 :                 upload_index_part(
    2433           10 :                     remote_storage,
    2434           10 :                     child_shard,
    2435           10 :                     &timeline.timeline_id,
    2436           10 :                     self.generation,
    2437           10 :                     &index_part,
    2438           10 :                     &self.cancel,
    2439           10 :                 )
    2440           26 :                 .await?;
    2441              :             }
    2442              :         }
    2443              : 
    2444            5 :         Ok(())
    2445            5 :     }
    2446              : }
    2447              : 
    2448              : /// Given a Vec of timelines and their ancestors (timeline_id, ancestor_id),
    2449              : /// perform a topological sort, so that the parent of each timeline comes
    2450              : /// before the children.
    2451              : /// E extracts the ancestor from T
    2452              : /// This allows for T to be different. It can be TimelineMetadata, can be Timeline itself, etc.
    2453         1066 : fn tree_sort_timelines<T, E>(
    2454         1066 :     timelines: HashMap<TimelineId, T>,
    2455         1066 :     extractor: E,
    2456         1066 : ) -> anyhow::Result<Vec<(TimelineId, T)>>
    2457         1066 : where
    2458         1066 :     E: Fn(&T) -> Option<TimelineId>,
    2459         1066 : {
    2460         1066 :     let mut result = Vec::with_capacity(timelines.len());
    2461         1066 : 
    2462         1066 :     let mut now = Vec::with_capacity(timelines.len());
    2463         1066 :     // (ancestor, children)
    2464         1066 :     let mut later: HashMap<TimelineId, Vec<(TimelineId, T)>> =
    2465         1066 :         HashMap::with_capacity(timelines.len());
    2466              : 
    2467         1661 :     for (timeline_id, value) in timelines {
    2468          595 :         if let Some(ancestor_id) = extractor(&value) {
    2469           57 :             let children = later.entry(ancestor_id).or_default();
    2470           57 :             children.push((timeline_id, value));
    2471          538 :         } else {
    2472          538 :             now.push((timeline_id, value));
    2473          538 :         }
    2474              :     }
    2475              : 
    2476         1661 :     while let Some((timeline_id, metadata)) = now.pop() {
    2477          595 :         result.push((timeline_id, metadata));
    2478              :         // All children of this can be loaded now
    2479          595 :         if let Some(mut children) = later.remove(&timeline_id) {
    2480           46 :             now.append(&mut children);
    2481          549 :         }
    2482              :     }
    2483              : 
    2484              :     // All timelines should be visited now. Unless there were timelines with missing ancestors.
    2485         1066 :     if !later.is_empty() {
    2486            0 :         for (missing_id, orphan_ids) in later {
    2487            0 :             for (orphan_id, _) in orphan_ids {
    2488            0 :                 error!("could not load timeline {orphan_id} because its ancestor timeline {missing_id} could not be loaded");
    2489              :             }
    2490              :         }
    2491            0 :         bail!("could not load tenant because some timelines are missing ancestors");
    2492         1066 :     }
    2493         1066 : 
    2494         1066 :     Ok(result)
    2495         1066 : }
    2496              : 
    2497              : impl Tenant {
    2498           90 :     pub fn tenant_specific_overrides(&self) -> TenantConfOpt {
    2499           90 :         self.tenant_conf.read().unwrap().tenant_conf
    2500           90 :     }
    2501              : 
    2502           45 :     pub fn effective_config(&self) -> TenantConf {
    2503           45 :         self.tenant_specific_overrides()
    2504           45 :             .merge(self.conf.default_tenant_conf)
    2505           45 :     }
    2506              : 
    2507            5 :     pub fn get_checkpoint_distance(&self) -> u64 {
    2508            5 :         let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
    2509            5 :         tenant_conf
    2510            5 :             .checkpoint_distance
    2511            5 :             .unwrap_or(self.conf.default_tenant_conf.checkpoint_distance)
    2512            5 :     }
    2513              : 
    2514            5 :     pub fn get_checkpoint_timeout(&self) -> Duration {
    2515            5 :         let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
    2516            5 :         tenant_conf
    2517            5 :             .checkpoint_timeout
    2518            5 :             .unwrap_or(self.conf.default_tenant_conf.checkpoint_timeout)
    2519            5 :     }
    2520              : 
    2521            5 :     pub fn get_compaction_target_size(&self) -> u64 {
    2522            5 :         let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
    2523            5 :         tenant_conf
    2524            5 :             .compaction_target_size
    2525            5 :             .unwrap_or(self.conf.default_tenant_conf.compaction_target_size)
    2526            5 :     }
    2527              : 
    2528         1220 :     pub fn get_compaction_period(&self) -> Duration {
    2529         1220 :         let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
    2530         1220 :         tenant_conf
    2531         1220 :             .compaction_period
    2532         1220 :             .unwrap_or(self.conf.default_tenant_conf.compaction_period)
    2533         1220 :     }
    2534              : 
    2535            5 :     pub fn get_compaction_threshold(&self) -> usize {
    2536            5 :         let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
    2537            5 :         tenant_conf
    2538            5 :             .compaction_threshold
    2539            5 :             .unwrap_or(self.conf.default_tenant_conf.compaction_threshold)
    2540            5 :     }
    2541              : 
    2542          501 :     pub fn get_gc_horizon(&self) -> u64 {
    2543          501 :         let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
    2544          501 :         tenant_conf
    2545          501 :             .gc_horizon
    2546          501 :             .unwrap_or(self.conf.default_tenant_conf.gc_horizon)
    2547          501 :     }
    2548              : 
    2549         1081 :     pub fn get_gc_period(&self) -> Duration {
    2550         1081 :         let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
    2551         1081 :         tenant_conf
    2552         1081 :             .gc_period
    2553         1081 :             .unwrap_or(self.conf.default_tenant_conf.gc_period)
    2554         1081 :     }
    2555              : 
    2556            5 :     pub fn get_image_creation_threshold(&self) -> usize {
    2557            5 :         let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
    2558            5 :         tenant_conf
    2559            5 :             .image_creation_threshold
    2560            5 :             .unwrap_or(self.conf.default_tenant_conf.image_creation_threshold)
    2561            5 :     }
    2562              : 
    2563          485 :     pub fn get_pitr_interval(&self) -> Duration {
    2564          485 :         let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
    2565          485 :         tenant_conf
    2566          485 :             .pitr_interval
    2567          485 :             .unwrap_or(self.conf.default_tenant_conf.pitr_interval)
    2568          485 :     }
    2569              : 
    2570         9847 :     pub fn get_trace_read_requests(&self) -> bool {
    2571         9847 :         let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
    2572         9847 :         tenant_conf
    2573         9847 :             .trace_read_requests
    2574         9847 :             .unwrap_or(self.conf.default_tenant_conf.trace_read_requests)
    2575         9847 :     }
    2576              : 
    2577           37 :     pub fn get_min_resident_size_override(&self) -> Option<u64> {
    2578           37 :         let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
    2579           37 :         tenant_conf
    2580           37 :             .min_resident_size_override
    2581           37 :             .or(self.conf.default_tenant_conf.min_resident_size_override)
    2582           37 :     }
    2583              : 
    2584          928 :     pub fn get_heatmap_period(&self) -> Option<Duration> {
    2585          928 :         let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
    2586          928 :         let heatmap_period = tenant_conf
    2587          928 :             .heatmap_period
    2588          928 :             .unwrap_or(self.conf.default_tenant_conf.heatmap_period);
    2589          928 :         if heatmap_period.is_zero() {
    2590          928 :             None
    2591              :         } else {
    2592            0 :             Some(heatmap_period)
    2593              :         }
    2594          928 :     }
    2595              : 
    2596           30 :     pub fn set_new_tenant_config(&self, new_tenant_conf: TenantConfOpt) {
    2597           30 :         self.tenant_conf.write().unwrap().tenant_conf = new_tenant_conf;
    2598           30 :         // Don't hold self.timelines.lock() during the notifies.
    2599           30 :         // There's no risk of deadlock right now, but there could be if we consolidate
    2600           30 :         // mutexes in struct Timeline in the future.
    2601           30 :         let timelines = self.list_timelines();
    2602           62 :         for timeline in timelines {
    2603           32 :             timeline.tenant_conf_updated();
    2604           32 :         }
    2605           30 :     }
    2606              : 
    2607           36 :     pub(crate) fn set_new_location_config(&self, new_conf: AttachedTenantConf) {
    2608           36 :         *self.tenant_conf.write().unwrap() = new_conf;
    2609           36 :         // Don't hold self.timelines.lock() during the notifies.
    2610           36 :         // There's no risk of deadlock right now, but there could be if we consolidate
    2611           36 :         // mutexes in struct Timeline in the future.
    2612           36 :         let timelines = self.list_timelines();
    2613           65 :         for timeline in timelines {
    2614           29 :             timeline.tenant_conf_updated();
    2615           29 :         }
    2616           36 :     }
    2617              : 
    2618              :     /// Helper function to create a new Timeline struct.
    2619              :     ///
    2620              :     /// The returned Timeline is in Loading state. The caller is responsible for
    2621              :     /// initializing any on-disk state, and for inserting the Timeline to the 'timelines'
    2622              :     /// map.
    2623              :     ///
    2624              :     /// `validate_ancestor == false` is used when a timeline is created for deletion
    2625              :     /// and we might not have the ancestor present anymore which is fine for to be
    2626              :     /// deleted timelines.
    2627         1590 :     fn create_timeline_struct(
    2628         1590 :         &self,
    2629         1590 :         new_timeline_id: TimelineId,
    2630         1590 :         new_metadata: &TimelineMetadata,
    2631         1590 :         ancestor: Option<Arc<Timeline>>,
    2632         1590 :         resources: TimelineResources,
    2633         1590 :         cause: CreateTimelineCause,
    2634         1590 :     ) -> anyhow::Result<Arc<Timeline>> {
    2635         1590 :         let state = match cause {
    2636              :             CreateTimelineCause::Load => {
    2637         1578 :                 let ancestor_id = new_metadata.ancestor_timeline();
    2638         1578 :                 anyhow::ensure!(
    2639         1578 :                     ancestor_id == ancestor.as_ref().map(|t| t.timeline_id),
    2640            0 :                     "Timeline's {new_timeline_id} ancestor {ancestor_id:?} was not found"
    2641              :                 );
    2642         1578 :                 TimelineState::Loading
    2643              :             }
    2644           12 :             CreateTimelineCause::Delete => TimelineState::Stopping,
    2645              :         };
    2646              : 
    2647         1590 :         let pg_version = new_metadata.pg_version();
    2648         1590 : 
    2649         1590 :         let timeline = Timeline::new(
    2650         1590 :             self.conf,
    2651         1590 :             Arc::clone(&self.tenant_conf),
    2652         1590 :             new_metadata,
    2653         1590 :             ancestor,
    2654         1590 :             new_timeline_id,
    2655         1590 :             self.tenant_shard_id,
    2656         1590 :             self.generation,
    2657         1590 :             self.shard_identity,
    2658         1590 :             self.walredo_mgr.as_ref().map(Arc::clone),
    2659         1590 :             resources,
    2660         1590 :             pg_version,
    2661         1590 :             state,
    2662         1590 :             self.cancel.child_token(),
    2663         1590 :         );
    2664         1590 : 
    2665         1590 :         Ok(timeline)
    2666         1590 :     }
    2667              : 
    2668              :     // Allow too_many_arguments because a constructor's argument list naturally grows with the
    2669              :     // number of attributes in the struct: breaking these out into a builder wouldn't be helpful.
    2670              :     #[allow(clippy::too_many_arguments)]
    2671          968 :     fn new(
    2672          968 :         state: TenantState,
    2673          968 :         conf: &'static PageServerConf,
    2674          968 :         attached_conf: AttachedTenantConf,
    2675          968 :         shard_identity: ShardIdentity,
    2676          968 :         walredo_mgr: Option<Arc<WalRedoManager>>,
    2677          968 :         tenant_shard_id: TenantShardId,
    2678          968 :         remote_storage: Option<GenericRemoteStorage>,
    2679          968 :         deletion_queue_client: DeletionQueueClient,
    2680          968 :     ) -> Tenant {
    2681          968 :         let (state, mut rx) = watch::channel(state);
    2682          968 : 
    2683          968 :         tokio::spawn(async move {
    2684          967 :             // reflect tenant state in metrics:
    2685          967 :             // - global per tenant state: TENANT_STATE_METRIC
    2686          967 :             // - "set" of broken tenants: BROKEN_TENANTS_SET
    2687          967 :             //
    2688          967 :             // set of broken tenants should not have zero counts so that it remains accessible for
    2689          967 :             // alerting.
    2690          967 : 
    2691          967 :             let tid = tenant_shard_id.to_string();
    2692          967 :             let shard_id = tenant_shard_id.shard_slug().to_string();
    2693          967 :             let set_key = &[tid.as_str(), shard_id.as_str()][..];
    2694          967 : 
    2695         2658 :             fn inspect_state(state: &TenantState) -> ([&'static str; 1], bool) {
    2696         2658 :                 ([state.into()], matches!(state, TenantState::Broken { .. }))
    2697         2658 :             }
    2698          967 : 
    2699          967 :             let mut tuple = inspect_state(&rx.borrow_and_update());
    2700          967 : 
    2701          967 :             let is_broken = tuple.1;
    2702          967 :             let mut counted_broken = if is_broken {
    2703              :                 // add the id to the set right away, there should not be any updates on the channel
    2704              :                 // after before tenant is removed, if ever
    2705            0 :                 BROKEN_TENANTS_SET.with_label_values(set_key).set(1);
    2706            0 :                 true
    2707              :             } else {
    2708          967 :                 false
    2709              :             };
    2710              : 
    2711         2658 :             loop {
    2712         2658 :                 let labels = &tuple.0;
    2713         2658 :                 let current = TENANT_STATE_METRIC.with_label_values(labels);
    2714         2658 :                 current.inc();
    2715         2658 : 
    2716         2658 :                 if rx.changed().await.is_err() {
    2717              :                     // tenant has been dropped
    2718          239 :                     current.dec();
    2719          239 :                     drop(BROKEN_TENANTS_SET.remove_label_values(set_key));
    2720          239 :                     break;
    2721         1691 :                 }
    2722         1691 : 
    2723         1691 :                 current.dec();
    2724         1691 :                 tuple = inspect_state(&rx.borrow_and_update());
    2725         1691 : 
    2726         1691 :                 let is_broken = tuple.1;
    2727         1691 :                 if is_broken && !counted_broken {
    2728           58 :                     counted_broken = true;
    2729           58 :                     // insert the tenant_id (back) into the set while avoiding needless counter
    2730           58 :                     // access
    2731           58 :                     BROKEN_TENANTS_SET.with_label_values(set_key).set(1);
    2732         1633 :                 }
    2733              :             }
    2734          968 :         });
    2735          968 : 
    2736          968 :         Tenant {
    2737          968 :             tenant_shard_id,
    2738          968 :             shard_identity,
    2739          968 :             generation: attached_conf.location.generation,
    2740          968 :             conf,
    2741          968 :             // using now here is good enough approximation to catch tenants with really long
    2742          968 :             // activation times.
    2743          968 :             constructed_at: Instant::now(),
    2744          968 :             tenant_conf: Arc::new(RwLock::new(attached_conf)),
    2745          968 :             timelines: Mutex::new(HashMap::new()),
    2746          968 :             timelines_creating: Mutex::new(HashSet::new()),
    2747          968 :             gc_cs: tokio::sync::Mutex::new(()),
    2748          968 :             walredo_mgr,
    2749          968 :             remote_storage,
    2750          968 :             deletion_queue_client,
    2751          968 :             state,
    2752          968 :             cached_logical_sizes: tokio::sync::Mutex::new(HashMap::new()),
    2753          968 :             cached_synthetic_tenant_size: Arc::new(AtomicU64::new(0)),
    2754          968 :             eviction_task_tenant_state: tokio::sync::Mutex::new(EvictionTaskTenantState::default()),
    2755          968 :             activate_now_sem: tokio::sync::Semaphore::new(0),
    2756          968 :             delete_progress: Arc::new(tokio::sync::Mutex::new(DeleteTenantFlow::default())),
    2757          968 :             cancel: CancellationToken::default(),
    2758          968 :             gate: Gate::default(),
    2759          968 :         }
    2760          968 :     }
    2761              : 
    2762              :     /// Locate and load config
    2763          245 :     pub(super) fn load_tenant_config(
    2764          245 :         conf: &'static PageServerConf,
    2765          245 :         tenant_shard_id: &TenantShardId,
    2766          245 :     ) -> anyhow::Result<LocationConf> {
    2767          245 :         let legacy_config_path = conf.tenant_config_path(tenant_shard_id);
    2768          245 :         let config_path = conf.tenant_location_config_path(tenant_shard_id);
    2769          245 : 
    2770          245 :         if config_path.exists() {
    2771              :             // New-style config takes precedence
    2772          241 :             let deserialized = Self::read_config(&config_path)?;
    2773          241 :             Ok(toml_edit::de::from_document::<LocationConf>(deserialized)?)
    2774            4 :         } else if legacy_config_path.exists() {
    2775              :             // Upgrade path: found an old-style configuration only
    2776            0 :             let deserialized = Self::read_config(&legacy_config_path)?;
    2777              : 
    2778            0 :             let mut tenant_conf = TenantConfOpt::default();
    2779            0 :             for (key, item) in deserialized.iter() {
    2780            0 :                 match key {
    2781            0 :                     "tenant_config" => {
    2782            0 :                         tenant_conf = TenantConfOpt::try_from(item.to_owned()).context(format!("Failed to parse config from file '{legacy_config_path}' as pageserver config"))?;
    2783              :                     }
    2784            0 :                     _ => bail!(
    2785            0 :                         "config file {legacy_config_path} has unrecognized pageserver option '{key}'"
    2786            0 :                     ),
    2787              :                 }
    2788              :             }
    2789              : 
    2790              :             // Legacy configs are implicitly in attached state, and do not support sharding
    2791            0 :             Ok(LocationConf::attached_single(
    2792            0 :                 tenant_conf,
    2793            0 :                 Generation::none(),
    2794            0 :                 &models::ShardParameters::default(),
    2795            0 :             ))
    2796              :         } else {
    2797              :             // FIXME If the config file is not found, assume that we're attaching
    2798              :             // a detached tenant and config is passed via attach command.
    2799              :             // https://github.com/neondatabase/neon/issues/1555
    2800              :             // OR: we're loading after incomplete deletion that managed to remove config.
    2801            4 :             info!(
    2802            4 :                 "tenant config not found in {} or {}",
    2803            4 :                 config_path, legacy_config_path
    2804            4 :             );
    2805            4 :             Ok(LocationConf::default())
    2806              :         }
    2807          245 :     }
    2808              : 
    2809          241 :     fn read_config(path: &Utf8Path) -> anyhow::Result<toml_edit::Document> {
    2810          241 :         info!("loading tenant configuration from {path}");
    2811              : 
    2812              :         // load and parse file
    2813          241 :         let config = fs::read_to_string(path)
    2814          241 :             .with_context(|| format!("Failed to load config from path '{path}'"))?;
    2815              : 
    2816          241 :         config
    2817          241 :             .parse::<toml_edit::Document>()
    2818          241 :             .with_context(|| format!("Failed to parse config from file '{path}' as toml file"))
    2819          241 :     }
    2820              : 
    2821         1966 :     #[tracing::instrument(skip_all, fields(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug()))]
    2822              :     pub(super) async fn persist_tenant_config(
    2823              :         conf: &'static PageServerConf,
    2824              :         tenant_shard_id: &TenantShardId,
    2825              :         location_conf: &LocationConf,
    2826              :     ) -> anyhow::Result<()> {
    2827              :         let legacy_config_path = conf.tenant_config_path(tenant_shard_id);
    2828              :         let config_path = conf.tenant_location_config_path(tenant_shard_id);
    2829              : 
    2830              :         Self::persist_tenant_config_at(
    2831              :             tenant_shard_id,
    2832              :             &config_path,
    2833              :             &legacy_config_path,
    2834              :             location_conf,
    2835              :         )
    2836              :         .await
    2837              :     }
    2838              : 
    2839         1966 :     #[tracing::instrument(skip_all, fields(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug()))]
    2840              :     pub(super) async fn persist_tenant_config_at(
    2841              :         tenant_shard_id: &TenantShardId,
    2842              :         config_path: &Utf8Path,
    2843              :         legacy_config_path: &Utf8Path,
    2844              :         location_conf: &LocationConf,
    2845              :     ) -> anyhow::Result<()> {
    2846              :         // Forward compat: write out an old-style configuration that old versions can read, in case we roll back
    2847              :         Self::persist_tenant_config_legacy(
    2848              :             tenant_shard_id,
    2849              :             legacy_config_path,
    2850              :             &location_conf.tenant_conf,
    2851              :         )
    2852              :         .await?;
    2853              : 
    2854              :         if let LocationMode::Attached(attach_conf) = &location_conf.mode {
    2855              :             // Once we use LocationMode, generations are mandatory.  If we aren't using generations,
    2856              :             // then drop out after writing legacy-style config.
    2857              :             if attach_conf.generation.is_none() {
    2858            0 :                 tracing::debug!("Running without generations, not writing new-style LocationConf");
    2859              :                 return Ok(());
    2860              :             }
    2861              :         }
    2862              : 
    2863            0 :         debug!("persisting tenantconf to {config_path}");
    2864              : 
    2865              :         let mut conf_content = r#"# This file contains a specific per-tenant's config.
    2866              : #  It is read in case of pageserver restart.
    2867              : "#
    2868              :         .to_string();
    2869              : 
    2870            1 :         fail::fail_point!("tenant-config-before-write", |_| {
    2871            1 :             anyhow::bail!("tenant-config-before-write");
    2872            1 :         });
    2873              : 
    2874              :         // Convert the config to a toml file.
    2875              :         conf_content += &toml_edit::ser::to_string_pretty(&location_conf)?;
    2876              : 
    2877              :         let temp_path = path_with_suffix_extension(config_path, TEMP_FILE_SUFFIX);
    2878              : 
    2879              :         let tenant_shard_id = *tenant_shard_id;
    2880              :         let config_path = config_path.to_owned();
    2881          982 :         tokio::task::spawn_blocking(move || {
    2882          982 :             Handle::current().block_on(async move {
    2883          982 :                 let conf_content = conf_content.as_bytes();
    2884          982 :                 VirtualFile::crashsafe_overwrite(&config_path, &temp_path, conf_content)
    2885            0 :                     .await
    2886          982 :                     .with_context(|| {
    2887            0 :                         format!("write tenant {tenant_shard_id} config to {config_path}")
    2888          982 :                     })
    2889          982 :             })
    2890          982 :         })
    2891              :         .await??;
    2892              : 
    2893              :         Ok(())
    2894              :     }
    2895              : 
    2896         1966 :     #[tracing::instrument(skip_all, fields(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug()))]
    2897              :     async fn persist_tenant_config_legacy(
    2898              :         tenant_shard_id: &TenantShardId,
    2899              :         target_config_path: &Utf8Path,
    2900              :         tenant_conf: &TenantConfOpt,
    2901              :     ) -> anyhow::Result<()> {
    2902            0 :         debug!("persisting tenantconf to {target_config_path}");
    2903              : 
    2904              :         let mut conf_content = r#"# This file contains a specific per-tenant's config.
    2905              : #  It is read in case of pageserver restart.
    2906              : 
    2907              : [tenant_config]
    2908              : "#
    2909              :         .to_string();
    2910              : 
    2911              :         // Convert the config to a toml file.
    2912              :         conf_content += &toml_edit::ser::to_string(&tenant_conf)?;
    2913              : 
    2914              :         let temp_path = path_with_suffix_extension(target_config_path, TEMP_FILE_SUFFIX);
    2915              : 
    2916              :         let tenant_shard_id = *tenant_shard_id;
    2917              :         let target_config_path = target_config_path.to_owned();
    2918          983 :         tokio::task::spawn_blocking(move || {
    2919          983 :             Handle::current().block_on(async move {
    2920          983 :                 let conf_content = conf_content.as_bytes();
    2921          983 :                 VirtualFile::crashsafe_overwrite(&target_config_path, &temp_path, conf_content)
    2922            0 :                     .await
    2923          983 :                     .with_context(|| {
    2924            0 :                         format!("write tenant {tenant_shard_id} config to {target_config_path}")
    2925          983 :                     })
    2926          983 :             })
    2927          983 :         })
    2928              :         .await??;
    2929              :         Ok(())
    2930              :     }
    2931              : 
    2932              :     //
    2933              :     // How garbage collection works:
    2934              :     //
    2935              :     //                    +--bar------------->
    2936              :     //                   /
    2937              :     //             +----+-----foo---------------->
    2938              :     //            /
    2939              :     // ----main--+-------------------------->
    2940              :     //                \
    2941              :     //                 +-----baz-------->
    2942              :     //
    2943              :     //
    2944              :     // 1. Grab 'gc_cs' mutex to prevent new timelines from being created while Timeline's
    2945              :     //    `gc_infos` are being refreshed
    2946              :     // 2. Scan collected timelines, and on each timeline, make note of the
    2947              :     //    all the points where other timelines have been branched off.
    2948              :     //    We will refrain from removing page versions at those LSNs.
    2949              :     // 3. For each timeline, scan all layer files on the timeline.
    2950              :     //    Remove all files for which a newer file exists and which
    2951              :     //    don't cover any branch point LSNs.
    2952              :     //
    2953              :     // TODO:
    2954              :     // - if a relation has a non-incremental persistent layer on a child branch, then we
    2955              :     //   don't need to keep that in the parent anymore. But currently
    2956              :     //   we do.
    2957          404 :     async fn gc_iteration_internal(
    2958          404 :         &self,
    2959          404 :         target_timeline_id: Option<TimelineId>,
    2960          404 :         horizon: u64,
    2961          404 :         pitr: Duration,
    2962          404 :         cancel: &CancellationToken,
    2963          404 :         ctx: &RequestContext,
    2964          404 :     ) -> anyhow::Result<GcResult> {
    2965          404 :         let mut totals: GcResult = Default::default();
    2966          404 :         let now = Instant::now();
    2967              : 
    2968          404 :         let gc_timelines = match self
    2969          404 :             .refresh_gc_info_internal(target_timeline_id, horizon, pitr, cancel, ctx)
    2970       264899 :             .await
    2971              :         {
    2972          401 :             Ok(result) => result,
    2973            1 :             Err(e) => {
    2974            0 :                 if let Some(PageReconstructError::Cancelled) =
    2975            1 :                     e.downcast_ref::<PageReconstructError>()
    2976              :                 {
    2977              :                     // Handle cancellation
    2978            0 :                     totals.elapsed = now.elapsed();
    2979            0 :                     return Ok(totals);
    2980              :                 } else {
    2981              :                     // Propagate other errors
    2982            1 :                     return Err(e);
    2983              :                 }
    2984              :             }
    2985              :         };
    2986              : 
    2987            3 :         failpoint_support::sleep_millis_async!("gc_iteration_internal_after_getting_gc_timelines");
    2988              : 
    2989              :         // If there is nothing to GC, we don't want any messages in the INFO log.
    2990          401 :         if !gc_timelines.is_empty() {
    2991          390 :             info!("{} timelines need GC", gc_timelines.len());
    2992              :         } else {
    2993            0 :             debug!("{} timelines need GC", gc_timelines.len());
    2994              :         }
    2995              : 
    2996              :         // Perform GC for each timeline.
    2997              :         //
    2998              :         // Note that we don't hold the `Tenant::gc_cs` lock here because we don't want to delay the
    2999              :         // branch creation task, which requires the GC lock. A GC iteration can run concurrently
    3000              :         // with branch creation.
    3001              :         //
    3002              :         // See comments in [`Tenant::branch_timeline`] for more information about why branch
    3003              :         // creation task can run concurrently with timeline's GC iteration.
    3004          897 :         for timeline in gc_timelines {
    3005          496 :             if task_mgr::is_shutdown_requested() || cancel.is_cancelled() {
    3006              :                 // We were requested to shut down. Stop and return with the progress we
    3007              :                 // made.
    3008            0 :                 break;
    3009          496 :             }
    3010          496 :             let result = timeline.gc().await?;
    3011          496 :             totals += result;
    3012              :         }
    3013              : 
    3014          401 :         totals.elapsed = now.elapsed();
    3015          401 :         Ok(totals)
    3016          402 :     }
    3017              : 
    3018              :     /// Refreshes the Timeline::gc_info for all timelines, returning the
    3019              :     /// vector of timelines which have [`Timeline::get_last_record_lsn`] past
    3020              :     /// [`Tenant::get_gc_horizon`].
    3021              :     ///
    3022              :     /// This is usually executed as part of periodic gc, but can now be triggered more often.
    3023           83 :     pub async fn refresh_gc_info(
    3024           83 :         &self,
    3025           83 :         cancel: &CancellationToken,
    3026           83 :         ctx: &RequestContext,
    3027           83 :     ) -> anyhow::Result<Vec<Arc<Timeline>>> {
    3028           83 :         // since this method can now be called at different rates than the configured gc loop, it
    3029           83 :         // might be that these configuration values get applied faster than what it was previously,
    3030           83 :         // since these were only read from the gc task.
    3031           83 :         let horizon = self.get_gc_horizon();
    3032           83 :         let pitr = self.get_pitr_interval();
    3033           83 : 
    3034           83 :         // refresh all timelines
    3035           83 :         let target_timeline_id = None;
    3036           83 : 
    3037           83 :         self.refresh_gc_info_internal(target_timeline_id, horizon, pitr, cancel, ctx)
    3038           20 :             .await
    3039           83 :     }
    3040              : 
    3041          487 :     async fn refresh_gc_info_internal(
    3042          487 :         &self,
    3043          487 :         target_timeline_id: Option<TimelineId>,
    3044          487 :         horizon: u64,
    3045          487 :         pitr: Duration,
    3046          487 :         cancel: &CancellationToken,
    3047          487 :         ctx: &RequestContext,
    3048          487 :     ) -> anyhow::Result<Vec<Arc<Timeline>>> {
    3049              :         // grab mutex to prevent new timelines from being created here.
    3050          487 :         let gc_cs = self.gc_cs.lock().await;
    3051              : 
    3052              :         // Scan all timelines. For each timeline, remember the timeline ID and
    3053              :         // the branch point where it was created.
    3054          486 :         let (all_branchpoints, timeline_ids): (BTreeSet<(TimelineId, Lsn)>, _) = {
    3055          487 :             let timelines = self.timelines.lock().unwrap();
    3056          487 :             let mut all_branchpoints = BTreeSet::new();
    3057          486 :             let timeline_ids = {
    3058          487 :                 if let Some(target_timeline_id) = target_timeline_id.as_ref() {
    3059          381 :                     if timelines.get(target_timeline_id).is_none() {
    3060            1 :                         bail!("gc target timeline does not exist")
    3061          380 :                     }
    3062          106 :                 };
    3063              : 
    3064          486 :                 timelines
    3065          486 :                     .iter()
    3066         1001 :                     .map(|(timeline_id, timeline_entry)| {
    3067          520 :                         if let Some(ancestor_timeline_id) =
    3068         1001 :                             &timeline_entry.get_ancestor_timeline_id()
    3069              :                         {
    3070              :                             // If target_timeline is specified, we only need to know branchpoints of its children
    3071          520 :                             if let Some(timeline_id) = target_timeline_id {
    3072          351 :                                 if ancestor_timeline_id == &timeline_id {
    3073            9 :                                     all_branchpoints.insert((
    3074            9 :                                         *ancestor_timeline_id,
    3075            9 :                                         timeline_entry.get_ancestor_lsn(),
    3076            9 :                                     ));
    3077          342 :                                 }
    3078              :                             }
    3079              :                             // Collect branchpoints for all timelines
    3080          169 :                             else {
    3081          169 :                                 all_branchpoints.insert((
    3082          169 :                                     *ancestor_timeline_id,
    3083          169 :                                     timeline_entry.get_ancestor_lsn(),
    3084          169 :                                 ));
    3085          169 :                             }
    3086          481 :                         }
    3087              : 
    3088         1001 :                         *timeline_id
    3089         1001 :                     })
    3090          486 :                     .collect::<Vec<_>>()
    3091          486 :             };
    3092          486 :             (all_branchpoints, timeline_ids)
    3093          486 :         };
    3094          486 : 
    3095          486 :         // Ok, we now know all the branch points.
    3096          486 :         // Update the GC information for each timeline.
    3097          486 :         let mut gc_timelines = Vec::with_capacity(timeline_ids.len());
    3098         1474 :         for timeline_id in timeline_ids {
    3099              :             // Timeline is known to be local and loaded.
    3100          990 :             let timeline = self
    3101          990 :                 .get_timeline(timeline_id, false)
    3102          990 :                 .with_context(|| format!("Timeline {timeline_id} was not found"))?;
    3103              : 
    3104              :             // If target_timeline is specified, ignore all other timelines
    3105          990 :             if let Some(target_timeline_id) = target_timeline_id {
    3106          731 :                 if timeline_id != target_timeline_id {
    3107          351 :                     continue;
    3108          380 :                 }
    3109          259 :             }
    3110              : 
    3111          639 :             if let Some(cutoff) = timeline.get_last_record_lsn().checked_sub(horizon) {
    3112          567 :                 let branchpoints: Vec<Lsn> = all_branchpoints
    3113          567 :                     .range((
    3114          567 :                         Included((timeline_id, Lsn(0))),
    3115          567 :                         Included((timeline_id, Lsn(u64::MAX))),
    3116          567 :                     ))
    3117          567 :                     .map(|&x| x.1)
    3118          567 :                     .collect();
    3119          567 :                 timeline
    3120          567 :                     .update_gc_info(branchpoints, cutoff, pitr, cancel, ctx)
    3121       264919 :                     .await?;
    3122              : 
    3123          565 :                 gc_timelines.push(timeline);
    3124           72 :             }
    3125              :         }
    3126          484 :         drop(gc_cs);
    3127          484 :         Ok(gc_timelines)
    3128          485 :     }
    3129              : 
    3130              :     /// A substitute for `branch_timeline` for use in unit tests.
    3131              :     /// The returned timeline will have state value `Active` to make various `anyhow::ensure!()`
    3132              :     /// calls pass, but, we do not actually call `.activate()` under the hood. So, none of the
    3133              :     /// timeline background tasks are launched, except the flush loop.
    3134              :     #[cfg(test)]
    3135          214 :     async fn branch_timeline_test(
    3136          214 :         &self,
    3137          214 :         src_timeline: &Arc<Timeline>,
    3138          214 :         dst_id: TimelineId,
    3139          214 :         start_lsn: Option<Lsn>,
    3140          214 :         ctx: &RequestContext,
    3141          214 :     ) -> Result<Arc<Timeline>, CreateTimelineError> {
    3142          214 :         let uninit_mark = self.create_timeline_uninit_mark(dst_id).unwrap();
    3143          214 :         let tl = self
    3144          214 :             .branch_timeline_impl(src_timeline, dst_id, start_lsn, uninit_mark, ctx)
    3145          220 :             .await?;
    3146          210 :         tl.set_state(TimelineState::Active);
    3147          210 :         Ok(tl)
    3148          214 :     }
    3149              : 
    3150              :     /// Branch an existing timeline.
    3151              :     ///
    3152              :     /// The caller is responsible for activating the returned timeline.
    3153          260 :     async fn branch_timeline(
    3154          260 :         &self,
    3155          260 :         src_timeline: &Arc<Timeline>,
    3156          260 :         dst_id: TimelineId,
    3157          260 :         start_lsn: Option<Lsn>,
    3158          260 :         timeline_uninit_mark: TimelineUninitMark<'_>,
    3159          260 :         ctx: &RequestContext,
    3160          260 :     ) -> Result<Arc<Timeline>, CreateTimelineError> {
    3161          260 :         self.branch_timeline_impl(src_timeline, dst_id, start_lsn, timeline_uninit_mark, ctx)
    3162            6 :             .await
    3163          260 :     }
    3164              : 
    3165          474 :     async fn branch_timeline_impl(
    3166          474 :         &self,
    3167          474 :         src_timeline: &Arc<Timeline>,
    3168          474 :         dst_id: TimelineId,
    3169          474 :         start_lsn: Option<Lsn>,
    3170          474 :         timeline_uninit_mark: TimelineUninitMark<'_>,
    3171          474 :         _ctx: &RequestContext,
    3172          474 :     ) -> Result<Arc<Timeline>, CreateTimelineError> {
    3173          474 :         let src_id = src_timeline.timeline_id;
    3174              : 
    3175              :         // We will validate our ancestor LSN in this function.  Acquire the GC lock so that
    3176              :         // this check cannot race with GC, and the ancestor LSN is guaranteed to remain
    3177              :         // valid while we are creating the branch.
    3178          474 :         let _gc_cs = self.gc_cs.lock().await;
    3179              : 
    3180              :         // If no start LSN is specified, we branch the new timeline from the source timeline's last record LSN
    3181          474 :         let start_lsn = start_lsn.unwrap_or_else(|| {
    3182          229 :             let lsn = src_timeline.get_last_record_lsn();
    3183          229 :             info!("branching timeline {dst_id} from timeline {src_id} at last record LSN: {lsn}");
    3184          229 :             lsn
    3185          474 :         });
    3186          474 : 
    3187          474 :         // Ensure that `start_lsn` is valid, i.e. the LSN is within the PITR
    3188          474 :         // horizon on the source timeline
    3189          474 :         //
    3190          474 :         // We check it against both the planned GC cutoff stored in 'gc_info',
    3191          474 :         // and the 'latest_gc_cutoff' of the last GC that was performed.  The
    3192          474 :         // planned GC cutoff in 'gc_info' is normally larger than
    3193          474 :         // 'latest_gc_cutoff_lsn', but beware of corner cases like if you just
    3194          474 :         // changed the GC settings for the tenant to make the PITR window
    3195          474 :         // larger, but some of the data was already removed by an earlier GC
    3196          474 :         // iteration.
    3197          474 : 
    3198          474 :         // check against last actual 'latest_gc_cutoff' first
    3199          474 :         let latest_gc_cutoff_lsn = src_timeline.get_latest_gc_cutoff_lsn();
    3200          474 :         src_timeline
    3201          474 :             .check_lsn_is_in_scope(start_lsn, &latest_gc_cutoff_lsn)
    3202          474 :             .context(format!(
    3203          474 :                 "invalid branch start lsn: less than latest GC cutoff {}",
    3204          474 :                 *latest_gc_cutoff_lsn,
    3205          474 :             ))
    3206          474 :             .map_err(CreateTimelineError::AncestorLsn)?;
    3207              : 
    3208              :         // and then the planned GC cutoff
    3209              :         {
    3210          464 :             let gc_info = src_timeline.gc_info.read().unwrap();
    3211          464 :             let cutoff = min(gc_info.pitr_cutoff, gc_info.horizon_cutoff);
    3212          464 :             if start_lsn < cutoff {
    3213            0 :                 return Err(CreateTimelineError::AncestorLsn(anyhow::anyhow!(
    3214            0 :                     "invalid branch start lsn: less than planned GC cutoff {cutoff}"
    3215            0 :                 )));
    3216          464 :             }
    3217          464 :         }
    3218          464 : 
    3219          464 :         //
    3220          464 :         // The branch point is valid, and we are still holding the 'gc_cs' lock
    3221          464 :         // so that GC cannot advance the GC cutoff until we are finished.
    3222          464 :         // Proceed with the branch creation.
    3223          464 :         //
    3224          464 : 
    3225          464 :         // Determine prev-LSN for the new timeline. We can only determine it if
    3226          464 :         // the timeline was branched at the current end of the source timeline.
    3227          464 :         let RecordLsn {
    3228          464 :             last: src_last,
    3229          464 :             prev: src_prev,
    3230          464 :         } = src_timeline.get_last_record_rlsn();
    3231          464 :         let dst_prev = if src_last == start_lsn {
    3232          445 :             Some(src_prev)
    3233              :         } else {
    3234           19 :             None
    3235              :         };
    3236              : 
    3237              :         // Create the metadata file, noting the ancestor of the new timeline.
    3238              :         // There is initially no data in it, but all the read-calls know to look
    3239              :         // into the ancestor.
    3240          464 :         let metadata = TimelineMetadata::new(
    3241          464 :             start_lsn,
    3242          464 :             dst_prev,
    3243          464 :             Some(src_id),
    3244          464 :             start_lsn,
    3245          464 :             *src_timeline.latest_gc_cutoff_lsn.read(), // FIXME: should we hold onto this guard longer?
    3246          464 :             src_timeline.initdb_lsn,
    3247          464 :             src_timeline.pg_version,
    3248          464 :         );
    3249              : 
    3250          464 :         let uninitialized_timeline = self
    3251          464 :             .prepare_new_timeline(
    3252          464 :                 dst_id,
    3253          464 :                 &metadata,
    3254          464 :                 timeline_uninit_mark,
    3255          464 :                 start_lsn + 1,
    3256          464 :                 Some(Arc::clone(src_timeline)),
    3257          464 :             )
    3258          219 :             .await?;
    3259              : 
    3260          464 :         let new_timeline = uninitialized_timeline.finish_creation()?;
    3261              : 
    3262              :         // Root timeline gets its layers during creation and uploads them along with the metadata.
    3263              :         // A branch timeline though, when created, can get no writes for some time, hence won't get any layers created.
    3264              :         // We still need to upload its metadata eagerly: if other nodes `attach` the tenant and miss this timeline, their GC
    3265              :         // could get incorrect information and remove more layers, than needed.
    3266              :         // See also https://github.com/neondatabase/neon/issues/3865
    3267          464 :         if let Some(remote_client) = new_timeline.remote_client.as_ref() {
    3268          464 :             remote_client
    3269          464 :                 .schedule_index_upload_for_metadata_update(&metadata)
    3270          464 :                 .context("branch initial metadata upload")?;
    3271            0 :         }
    3272              : 
    3273          464 :         Ok(new_timeline)
    3274          474 :     }
    3275              : 
    3276              :     /// For unit tests, make this visible so that other modules can directly create timelines
    3277              :     #[cfg(test)]
    3278            4 :     #[tracing::instrument(fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(), %timeline_id))]
    3279              :     pub(crate) async fn bootstrap_timeline_test(
    3280              :         &self,
    3281              :         timeline_id: TimelineId,
    3282              :         pg_version: u32,
    3283              :         load_existing_initdb: Option<TimelineId>,
    3284              :         ctx: &RequestContext,
    3285              :     ) -> anyhow::Result<Arc<Timeline>> {
    3286              :         let uninit_mark = self.create_timeline_uninit_mark(timeline_id).unwrap();
    3287              :         self.bootstrap_timeline(
    3288              :             timeline_id,
    3289              :             pg_version,
    3290              :             load_existing_initdb,
    3291              :             uninit_mark,
    3292              :             ctx,
    3293              :         )
    3294              :         .await
    3295              :     }
    3296              : 
    3297          567 :     async fn upload_initdb(
    3298          567 :         &self,
    3299          567 :         timelines_path: &Utf8PathBuf,
    3300          567 :         pgdata_path: &Utf8PathBuf,
    3301          567 :         timeline_id: &TimelineId,
    3302          567 :     ) -> anyhow::Result<()> {
    3303          567 :         let Some(storage) = &self.remote_storage else {
    3304              :             // No remote storage?  No upload.
    3305            0 :             return Ok(());
    3306              :         };
    3307              : 
    3308          567 :         let temp_path = timelines_path.join(format!(
    3309          567 :             "{INITDB_PATH}.upload-{timeline_id}.{TEMP_FILE_SUFFIX}"
    3310          567 :         ));
    3311              : 
    3312          566 :         scopeguard::defer! {
    3313          566 :             if let Err(e) = fs::remove_file(&temp_path) {
    3314            0 :                 error!("Failed to remove temporary initdb archive '{temp_path}': {e}");
    3315          566 :             }
    3316              :         }
    3317              : 
    3318          566 :         let (pgdata_zstd, tar_zst_size) =
    3319      4663483 :             import_datadir::create_tar_zst(pgdata_path, &temp_path).await?;
    3320              : 
    3321          566 :         pausable_failpoint!("before-initdb-upload");
    3322              : 
    3323          566 :         backoff::retry(
    3324          632 :             || async {
    3325          632 :                 self::remote_timeline_client::upload_initdb_dir(
    3326          632 :                     storage,
    3327          632 :                     &self.tenant_shard_id.tenant_id,
    3328          632 :                     timeline_id,
    3329          632 :                     pgdata_zstd.try_clone().await?,
    3330          632 :                     tar_zst_size,
    3331          632 :                     &self.cancel,
    3332              :                 )
    3333        27688 :                 .await
    3334         1264 :             },
    3335          566 :             |_| false,
    3336          566 :             3,
    3337          566 :             u32::MAX,
    3338          566 :             "persist_initdb_tar_zst",
    3339          566 :             &self.cancel,
    3340          566 :         )
    3341        28314 :         .await
    3342          566 :         .ok_or_else(|| anyhow::anyhow!("Cancelled"))
    3343          566 :         .and_then(|x| x)
    3344          566 :     }
    3345              : 
    3346              :     /// - run initdb to init temporary instance and get bootstrap data
    3347              :     /// - after initialization completes, tar up the temp dir and upload it to S3.
    3348              :     ///
    3349              :     /// The caller is responsible for activating the returned timeline.
    3350          600 :     async fn bootstrap_timeline(
    3351          600 :         &self,
    3352          600 :         timeline_id: TimelineId,
    3353          600 :         pg_version: u32,
    3354          600 :         load_existing_initdb: Option<TimelineId>,
    3355          600 :         timeline_uninit_mark: TimelineUninitMark<'_>,
    3356          600 :         ctx: &RequestContext,
    3357          600 :     ) -> anyhow::Result<Arc<Timeline>> {
    3358          600 :         // create a `tenant/{tenant_id}/timelines/basebackup-{timeline_id}.{TEMP_FILE_SUFFIX}/`
    3359          600 :         // temporary directory for basebackup files for the given timeline.
    3360          600 : 
    3361          600 :         let timelines_path = self.conf.timelines_path(&self.tenant_shard_id);
    3362          600 :         let pgdata_path = path_with_suffix_extension(
    3363          600 :             timelines_path.join(format!("basebackup-{timeline_id}")),
    3364          600 :             TEMP_FILE_SUFFIX,
    3365          600 :         );
    3366          600 : 
    3367          600 :         // an uninit mark was placed before, nothing else can access this timeline files
    3368          600 :         // current initdb was not run yet, so remove whatever was left from the previous runs
    3369          600 :         if pgdata_path.exists() {
    3370            0 :             fs::remove_dir_all(&pgdata_path).with_context(|| {
    3371            0 :                 format!("Failed to remove already existing initdb directory: {pgdata_path}")
    3372            0 :             })?;
    3373          600 :         }
    3374              :         // this new directory is very temporary, set to remove it immediately after bootstrap, we don't need it
    3375          598 :         scopeguard::defer! {
    3376          598 :             if let Err(e) = fs::remove_dir_all(&pgdata_path) {
    3377              :                 // this is unlikely, but we will remove the directory on pageserver restart or another bootstrap call
    3378            0 :                 error!("Failed to remove temporary initdb directory '{pgdata_path}': {e}");
    3379          598 :             }
    3380              :         }
    3381          600 :         if let Some(existing_initdb_timeline_id) = load_existing_initdb {
    3382            4 :             let Some(storage) = &self.remote_storage else {
    3383            0 :                 bail!("no storage configured but load_existing_initdb set to {existing_initdb_timeline_id}");
    3384              :             };
    3385            4 :             if existing_initdb_timeline_id != timeline_id {
    3386            0 :                 let source_path = &remote_initdb_archive_path(
    3387            0 :                     &self.tenant_shard_id.tenant_id,
    3388            0 :                     &existing_initdb_timeline_id,
    3389            0 :                 );
    3390            0 :                 let dest_path =
    3391            0 :                     &remote_initdb_archive_path(&self.tenant_shard_id.tenant_id, &timeline_id);
    3392            0 :                 storage
    3393            0 :                     .copy_object(source_path, dest_path)
    3394            0 :                     .await
    3395            0 :                     .context("copy initdb tar")?;
    3396            4 :             }
    3397            4 :             let (initdb_tar_zst_path, initdb_tar_zst) =
    3398            4 :                 self::remote_timeline_client::download_initdb_tar_zst(
    3399            4 :                     self.conf,
    3400            4 :                     storage,
    3401            4 :                     &self.tenant_shard_id,
    3402            4 :                     &existing_initdb_timeline_id,
    3403            4 :                     &self.cancel,
    3404            4 :                 )
    3405         1656 :                 .await
    3406            4 :                 .context("download initdb tar")?;
    3407              : 
    3408            4 :             scopeguard::defer! {
    3409            4 :                 if let Err(e) = fs::remove_file(&initdb_tar_zst_path) {
    3410            0 :                     error!("Failed to remove temporary initdb archive '{initdb_tar_zst_path}': {e}");
    3411            4 :                 }
    3412              :             }
    3413              : 
    3414            4 :             let buf_read =
    3415            4 :                 BufReader::with_capacity(remote_timeline_client::BUFFER_SIZE, initdb_tar_zst);
    3416            4 :             import_datadir::extract_tar_zst(&pgdata_path, buf_read)
    3417        20343 :                 .await
    3418            4 :                 .context("extract initdb tar")?;
    3419              :         } else {
    3420              :             // Init temporarily repo to get bootstrap data, this creates a directory in the `pgdata_path` path
    3421         1159 :             run_initdb(self.conf, &pgdata_path, pg_version, &self.cancel).await?;
    3422              : 
    3423              :             // Upload the created data dir to S3
    3424          595 :             if self.tenant_shard_id().is_zero() {
    3425          567 :                 self.upload_initdb(&timelines_path, &pgdata_path, &timeline_id)
    3426      4692354 :                     .await?;
    3427           28 :             }
    3428              :         }
    3429          597 :         let pgdata_lsn = import_datadir::get_lsn_from_controlfile(&pgdata_path)?.align();
    3430          597 : 
    3431          597 :         // Import the contents of the data directory at the initial checkpoint
    3432          597 :         // LSN, and any WAL after that.
    3433          597 :         // Initdb lsn will be equal to last_record_lsn which will be set after import.
    3434          597 :         // Because we know it upfront avoid having an option or dummy zero value by passing it to the metadata.
    3435          597 :         let new_metadata = TimelineMetadata::new(
    3436          597 :             Lsn(0),
    3437          597 :             None,
    3438          597 :             None,
    3439          597 :             Lsn(0),
    3440          597 :             pgdata_lsn,
    3441          597 :             pgdata_lsn,
    3442          597 :             pg_version,
    3443          597 :         );
    3444          597 :         let raw_timeline = self
    3445          597 :             .prepare_new_timeline(
    3446          597 :                 timeline_id,
    3447          597 :                 &new_metadata,
    3448          597 :                 timeline_uninit_mark,
    3449          597 :                 pgdata_lsn,
    3450          597 :                 None,
    3451          597 :             )
    3452            3 :             .await?;
    3453              : 
    3454          596 :         let tenant_shard_id = raw_timeline.owning_tenant.tenant_shard_id;
    3455          596 :         let unfinished_timeline = raw_timeline.raw_timeline()?;
    3456              : 
    3457          596 :         import_datadir::import_timeline_from_postgres_datadir(
    3458          596 :             unfinished_timeline,
    3459          596 :             &pgdata_path,
    3460          596 :             pgdata_lsn,
    3461          596 :             ctx,
    3462          596 :         )
    3463      2986969 :         .await
    3464          596 :         .with_context(|| {
    3465            0 :             format!("Failed to import pgdatadir for timeline {tenant_shard_id}/{timeline_id}")
    3466          596 :         })?;
    3467              : 
    3468              :         // Flush the new layer files to disk, before we make the timeline as available to
    3469              :         // the outside world.
    3470              :         //
    3471              :         // Flush loop needs to be spawned in order to be able to flush.
    3472          596 :         unfinished_timeline.maybe_spawn_flush_loop();
    3473          596 : 
    3474          596 :         fail::fail_point!("before-checkpoint-new-timeline", |_| {
    3475            2 :             anyhow::bail!("failpoint before-checkpoint-new-timeline");
    3476          596 :         });
    3477              : 
    3478          594 :         unfinished_timeline
    3479          594 :             .freeze_and_flush()
    3480          593 :             .await
    3481          593 :             .with_context(|| {
    3482            0 :                 format!(
    3483            0 :                     "Failed to flush after pgdatadir import for timeline {tenant_shard_id}/{timeline_id}"
    3484            0 :                 )
    3485          593 :             })?;
    3486              : 
    3487              :         // All done!
    3488          593 :         let timeline = raw_timeline.finish_creation()?;
    3489              : 
    3490          592 :         Ok(timeline)
    3491          598 :     }
    3492              : 
    3493              :     /// Call this before constructing a timeline, to build its required structures
    3494         1145 :     fn build_timeline_resources(&self, timeline_id: TimelineId) -> TimelineResources {
    3495         1145 :         let remote_client = if let Some(remote_storage) = self.remote_storage.as_ref() {
    3496         1145 :             let remote_client = RemoteTimelineClient::new(
    3497         1145 :                 remote_storage.clone(),
    3498         1145 :                 self.deletion_queue_client.clone(),
    3499         1145 :                 self.conf,
    3500         1145 :                 self.tenant_shard_id,
    3501         1145 :                 timeline_id,
    3502         1145 :                 self.generation,
    3503         1145 :             );
    3504         1145 :             Some(remote_client)
    3505              :         } else {
    3506            0 :             None
    3507              :         };
    3508              : 
    3509         1145 :         TimelineResources {
    3510         1145 :             remote_client,
    3511         1145 :             deletion_queue_client: self.deletion_queue_client.clone(),
    3512         1145 :         }
    3513         1145 :     }
    3514              : 
    3515              :     /// Creates intermediate timeline structure and its files.
    3516              :     ///
    3517              :     /// An empty layer map is initialized, and new data and WAL can be imported starting
    3518              :     /// at 'disk_consistent_lsn'. After any initial data has been imported, call
    3519              :     /// `finish_creation` to insert the Timeline into the timelines map and to remove the
    3520              :     /// uninit mark file.
    3521         1145 :     async fn prepare_new_timeline<'a>(
    3522         1145 :         &'a self,
    3523         1145 :         new_timeline_id: TimelineId,
    3524         1145 :         new_metadata: &TimelineMetadata,
    3525         1145 :         uninit_mark: TimelineUninitMark<'a>,
    3526         1145 :         start_lsn: Lsn,
    3527         1145 :         ancestor: Option<Arc<Timeline>>,
    3528         1145 :     ) -> anyhow::Result<UninitializedTimeline> {
    3529         1145 :         let tenant_shard_id = self.tenant_shard_id;
    3530         1145 : 
    3531         1145 :         let resources = self.build_timeline_resources(new_timeline_id);
    3532         1145 :         if let Some(remote_client) = &resources.remote_client {
    3533         1145 :             remote_client.init_upload_queue_for_empty_remote(new_metadata)?;
    3534            0 :         }
    3535              : 
    3536         1145 :         let timeline_struct = self
    3537         1145 :             .create_timeline_struct(
    3538         1145 :                 new_timeline_id,
    3539         1145 :                 new_metadata,
    3540         1145 :                 ancestor,
    3541         1145 :                 resources,
    3542         1145 :                 CreateTimelineCause::Load,
    3543         1145 :             )
    3544         1145 :             .context("Failed to create timeline data structure")?;
    3545              : 
    3546         1145 :         timeline_struct.init_empty_layer_map(start_lsn);
    3547              : 
    3548         1145 :         if let Err(e) = self
    3549         1145 :             .create_timeline_files(&uninit_mark.timeline_path, &new_timeline_id, new_metadata)
    3550          330 :             .await
    3551              :         {
    3552            1 :             error!("Failed to create initial files for timeline {tenant_shard_id}/{new_timeline_id}, cleaning up: {e:?}");
    3553            1 :             cleanup_timeline_directory(uninit_mark);
    3554            1 :             return Err(e);
    3555         1144 :         }
    3556              : 
    3557            0 :         debug!(
    3558            0 :             "Successfully created initial files for timeline {tenant_shard_id}/{new_timeline_id}"
    3559            0 :         );
    3560              : 
    3561         1144 :         Ok(UninitializedTimeline::new(
    3562         1144 :             self,
    3563         1144 :             new_timeline_id,
    3564         1144 :             Some((timeline_struct, uninit_mark)),
    3565         1144 :         ))
    3566         1145 :     }
    3567              : 
    3568         1145 :     async fn create_timeline_files(
    3569         1145 :         &self,
    3570         1145 :         timeline_path: &Utf8Path,
    3571         1145 :         new_timeline_id: &TimelineId,
    3572         1145 :         new_metadata: &TimelineMetadata,
    3573         1145 :     ) -> anyhow::Result<()> {
    3574         1145 :         crashsafe::create_dir(timeline_path).context("Failed to create timeline directory")?;
    3575              : 
    3576         1145 :         fail::fail_point!("after-timeline-uninit-mark-creation", |_| {
    3577            1 :             anyhow::bail!("failpoint after-timeline-uninit-mark-creation");
    3578         1145 :         });
    3579              : 
    3580         1144 :         save_metadata(
    3581         1144 :             self.conf,
    3582         1144 :             &self.tenant_shard_id,
    3583         1144 :             new_timeline_id,
    3584         1144 :             new_metadata,
    3585         1144 :         )
    3586          330 :         .await
    3587         1144 :         .context("Failed to create timeline metadata")?;
    3588         1144 :         Ok(())
    3589         1145 :     }
    3590              : 
    3591              :     /// Attempts to create an uninit mark file for the timeline initialization.
    3592              :     /// Bails, if the timeline is already loaded into the memory (i.e. initialized before), or the uninit mark file already exists.
    3593              :     ///
    3594              :     /// This way, we need to hold the timelines lock only for small amount of time during the mark check/creation per timeline init.
    3595         1196 :     fn create_timeline_uninit_mark(
    3596         1196 :         &self,
    3597         1196 :         timeline_id: TimelineId,
    3598         1196 :     ) -> Result<TimelineUninitMark, TimelineExclusionError> {
    3599         1196 :         let tenant_shard_id = self.tenant_shard_id;
    3600         1196 : 
    3601         1196 :         let uninit_mark_path = self
    3602         1196 :             .conf
    3603         1196 :             .timeline_uninit_mark_file_path(tenant_shard_id, timeline_id);
    3604         1196 :         let timeline_path = self.conf.timeline_path(&tenant_shard_id, &timeline_id);
    3605              : 
    3606         1196 :         let uninit_mark = TimelineUninitMark::new(
    3607         1196 :             self,
    3608         1196 :             timeline_id,
    3609         1196 :             uninit_mark_path.clone(),
    3610         1196 :             timeline_path.clone(),
    3611         1196 :         )?;
    3612              : 
    3613              :         // At this stage, we have got exclusive access to in-memory state for this timeline ID
    3614              :         // for creation.
    3615              :         // A timeline directory should never exist on disk already:
    3616              :         // - a previous failed creation would have cleaned up after itself
    3617              :         // - a pageserver restart would clean up timeline directories that don't have valid remote state
    3618              :         //
    3619              :         // Therefore it is an unexpected internal error to encounter a timeline directory already existing here,
    3620              :         // this error may indicate a bug in cleanup on failed creations.
    3621         1166 :         if timeline_path.exists() {
    3622            0 :             return Err(TimelineExclusionError::Other(anyhow::anyhow!(
    3623            0 :                 "Timeline directory already exists! This is a bug."
    3624            0 :             )));
    3625         1166 :         }
    3626         1166 : 
    3627         1166 :         // Create the on-disk uninit mark _after_ the in-memory acquisition of the tenant ID: guarantees
    3628         1166 :         // that during process runtime, colliding creations will be caught in-memory without getting
    3629         1166 :         // as far as failing to write a file.
    3630         1166 :         fs::OpenOptions::new()
    3631         1166 :             .write(true)
    3632         1166 :             .create_new(true)
    3633         1166 :             .open(&uninit_mark_path)
    3634         1166 :             .context("Failed to create uninit mark file")
    3635         1166 :             .and_then(|_| {
    3636         1166 :                 crashsafe::fsync_file_and_parent(&uninit_mark_path)
    3637         1166 :                     .context("Failed to fsync uninit mark file")
    3638         1166 :             })
    3639         1166 :             .with_context(|| {
    3640            0 :                 format!("Failed to crate uninit mark for timeline {tenant_shard_id}/{timeline_id}")
    3641         1166 :             })?;
    3642              : 
    3643         1166 :         Ok(uninit_mark)
    3644         1196 :     }
    3645              : 
    3646              :     /// Gathers inputs from all of the timelines to produce a sizing model input.
    3647              :     ///
    3648              :     /// Future is cancellation safe. Only one calculation can be running at once per tenant.
    3649          154 :     #[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()))]
    3650              :     pub async fn gather_size_inputs(
    3651              :         &self,
    3652              :         // `max_retention_period` overrides the cutoff that is used to calculate the size
    3653              :         // (only if it is shorter than the real cutoff).
    3654              :         max_retention_period: Option<u64>,
    3655              :         cause: LogicalSizeCalculationCause,
    3656              :         cancel: &CancellationToken,
    3657              :         ctx: &RequestContext,
    3658              :     ) -> anyhow::Result<size::ModelInputs> {
    3659              :         let logical_sizes_at_once = self
    3660              :             .conf
    3661              :             .concurrent_tenant_size_logical_size_queries
    3662              :             .inner();
    3663              : 
    3664              :         // TODO: Having a single mutex block concurrent reads is not great for performance.
    3665              :         //
    3666              :         // But the only case where we need to run multiple of these at once is when we
    3667              :         // request a size for a tenant manually via API, while another background calculation
    3668              :         // is in progress (which is not a common case).
    3669              :         //
    3670              :         // See more for on the issue #2748 condenced out of the initial PR review.
    3671              :         let mut shared_cache = self.cached_logical_sizes.lock().await;
    3672              : 
    3673              :         size::gather_inputs(
    3674              :             self,
    3675              :             logical_sizes_at_once,
    3676              :             max_retention_period,
    3677              :             &mut shared_cache,
    3678              :             cause,
    3679              :             cancel,
    3680              :             ctx,
    3681              :         )
    3682              :         .await
    3683              :     }
    3684              : 
    3685              :     /// Calculate synthetic tenant size and cache the result.
    3686              :     /// This is periodically called by background worker.
    3687              :     /// result is cached in tenant struct
    3688           44 :     #[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()))]
    3689              :     pub async fn calculate_synthetic_size(
    3690              :         &self,
    3691              :         cause: LogicalSizeCalculationCause,
    3692              :         cancel: &CancellationToken,
    3693              :         ctx: &RequestContext,
    3694              :     ) -> anyhow::Result<u64> {
    3695              :         let inputs = self.gather_size_inputs(None, cause, cancel, ctx).await?;
    3696              : 
    3697              :         let size = inputs.calculate()?;
    3698              : 
    3699              :         self.set_cached_synthetic_size(size);
    3700              : 
    3701              :         Ok(size)
    3702              :     }
    3703              : 
    3704              :     /// Cache given synthetic size and update the metric value
    3705           21 :     pub fn set_cached_synthetic_size(&self, size: u64) {
    3706           21 :         self.cached_synthetic_tenant_size
    3707           21 :             .store(size, Ordering::Relaxed);
    3708              : 
    3709              :         // Only shard zero should be calculating synthetic sizes
    3710           21 :         debug_assert!(self.shard_identity.is_zero());
    3711              : 
    3712           21 :         TENANT_SYNTHETIC_SIZE_METRIC
    3713           21 :             .get_metric_with_label_values(&[&self.tenant_shard_id.tenant_id.to_string()])
    3714           21 :             .unwrap()
    3715           21 :             .set(size);
    3716           21 :     }
    3717              : 
    3718           19 :     pub fn cached_synthetic_size(&self) -> u64 {
    3719           19 :         self.cached_synthetic_tenant_size.load(Ordering::Relaxed)
    3720           19 :     }
    3721              : 
    3722              :     /// Flush any in-progress layers, schedule uploads, and wait for uploads to complete.
    3723              :     ///
    3724              :     /// This function can take a long time: callers should wrap it in a timeout if calling
    3725              :     /// from an external API handler.
    3726              :     ///
    3727              :     /// Cancel-safety: cancelling this function may leave I/O running, but such I/O is
    3728              :     /// still bounded by tenant/timeline shutdown.
    3729           10 :     #[tracing::instrument(skip_all)]
    3730              :     pub(crate) async fn flush_remote(&self) -> anyhow::Result<()> {
    3731              :         let timelines = self.timelines.lock().unwrap().clone();
    3732              : 
    3733            5 :         async fn flush_timeline(_gate: GateGuard, timeline: Arc<Timeline>) -> anyhow::Result<()> {
    3734            5 :             tracing::info!(timeline_id=%timeline.timeline_id, "Flushing...");
    3735            5 :             timeline.freeze_and_flush().await?;
    3736            5 :             tracing::info!(timeline_id=%timeline.timeline_id, "Waiting for uploads...");
    3737            5 :             if let Some(client) = &timeline.remote_client {
    3738            5 :                 client.wait_completion().await?;
    3739            0 :             }
    3740              : 
    3741            5 :             Ok(())
    3742            5 :         }
    3743              : 
    3744              :         // We do not use a JoinSet for these tasks, because we don't want them to be
    3745              :         // aborted when this function's future is cancelled: they should stay alive
    3746              :         // holding their GateGuard until they complete, to ensure their I/Os complete
    3747              :         // before Timeline shutdown completes.
    3748              :         let mut results = FuturesUnordered::new();
    3749              : 
    3750              :         for (_timeline_id, timeline) in timelines {
    3751              :             // Run each timeline's flush in a task holding the timeline's gate: this
    3752              :             // means that if this function's future is cancelled, the Timeline shutdown
    3753              :             // will still wait for any I/O in here to complete.
    3754              :             let gate = match timeline.gate.enter() {
    3755              :                 Ok(g) => g,
    3756              :                 Err(_) => continue,
    3757              :             };
    3758           10 :             let jh = tokio::task::spawn(async move { flush_timeline(gate, timeline).await });
    3759              :             results.push(jh);
    3760              :         }
    3761              : 
    3762              :         while let Some(r) = results.next().await {
    3763              :             if let Err(e) = r {
    3764              :                 if !e.is_cancelled() && !e.is_panic() {
    3765            0 :                     tracing::error!("unexpected join error: {e:?}");
    3766              :                 }
    3767              :             }
    3768              :         }
    3769              : 
    3770              :         // The flushes we did above were just writes, but the Tenant might have had
    3771              :         // pending deletions as well from recent compaction/gc: we want to flush those
    3772              :         // as well.  This requires flushing the global delete queue.  This is cheap
    3773              :         // because it's typically a no-op.
    3774              :         match self.deletion_queue_client.flush_execute().await {
    3775              :             Ok(_) => {}
    3776              :             Err(DeletionQueueError::ShuttingDown) => {}
    3777              :         }
    3778              : 
    3779              :         Ok(())
    3780              :     }
    3781              : 
    3782            5 :     pub(crate) fn get_tenant_conf(&self) -> TenantConfOpt {
    3783            5 :         self.tenant_conf.read().unwrap().tenant_conf
    3784            5 :     }
    3785              : }
    3786              : 
    3787            2 : fn remove_timeline_and_uninit_mark(
    3788            2 :     timeline_dir: &Utf8Path,
    3789            2 :     uninit_mark: &Utf8Path,
    3790            2 : ) -> anyhow::Result<()> {
    3791            2 :     fs::remove_dir_all(timeline_dir)
    3792            2 :         .or_else(|e| {
    3793            0 :             if e.kind() == std::io::ErrorKind::NotFound {
    3794              :                 // we can leave the uninit mark without a timeline dir,
    3795              :                 // just remove the mark then
    3796            0 :                 Ok(())
    3797              :             } else {
    3798            0 :                 Err(e)
    3799              :             }
    3800            2 :         })
    3801            2 :         .with_context(|| {
    3802            0 :             format!("Failed to remove unit marked timeline directory {timeline_dir}")
    3803            2 :         })?;
    3804            2 :     fs::remove_file(uninit_mark)
    3805            2 :         .with_context(|| format!("Failed to remove timeline uninit mark file {uninit_mark}"))?;
    3806              : 
    3807            2 :     Ok(())
    3808            2 : }
    3809              : 
    3810              : /// Create the cluster temporarily in 'initdbpath' directory inside the repository
    3811              : /// to get bootstrap data for timeline initialization.
    3812          596 : async fn run_initdb(
    3813          596 :     conf: &'static PageServerConf,
    3814          596 :     initdb_target_dir: &Utf8Path,
    3815          596 :     pg_version: u32,
    3816          596 :     cancel: &CancellationToken,
    3817          596 : ) -> Result<(), InitdbError> {
    3818          596 :     let initdb_bin_path = conf
    3819          596 :         .pg_bin_dir(pg_version)
    3820          596 :         .map_err(InitdbError::Other)?
    3821          596 :         .join("initdb");
    3822          596 :     let initdb_lib_dir = conf.pg_lib_dir(pg_version).map_err(InitdbError::Other)?;
    3823          596 :     info!(
    3824          596 :         "running {} in {}, libdir: {}",
    3825          596 :         initdb_bin_path, initdb_target_dir, initdb_lib_dir,
    3826          596 :     );
    3827              : 
    3828          596 :     let _permit = INIT_DB_SEMAPHORE.acquire().await;
    3829              : 
    3830          596 :     let initdb_command = tokio::process::Command::new(&initdb_bin_path)
    3831          596 :         .args(["-D", initdb_target_dir.as_ref()])
    3832          596 :         .args(["-U", &conf.superuser])
    3833          596 :         .args(["-E", "utf8"])
    3834          596 :         .arg("--no-instructions")
    3835          596 :         .arg("--no-sync")
    3836          596 :         .env_clear()
    3837          596 :         .env("LD_LIBRARY_PATH", &initdb_lib_dir)
    3838          596 :         .env("DYLD_LIBRARY_PATH", &initdb_lib_dir)
    3839          596 :         .stdin(std::process::Stdio::null())
    3840          596 :         // stdout invocation produces the same output every time, we don't need it
    3841          596 :         .stdout(std::process::Stdio::null())
    3842          596 :         // we would be interested in the stderr output, if there was any
    3843          596 :         .stderr(std::process::Stdio::piped())
    3844          596 :         .spawn()?;
    3845              : 
    3846              :     // Ideally we'd select here with the cancellation token, but the problem is that
    3847              :     // we can't safely terminate initdb: it launches processes of its own, and killing
    3848              :     // initdb doesn't kill them. After we return from this function, we want the target
    3849              :     // directory to be able to be cleaned up.
    3850              :     // See https://github.com/neondatabase/neon/issues/6385
    3851         1159 :     let initdb_output = initdb_command.wait_with_output().await?;
    3852          596 :     if !initdb_output.status.success() {
    3853            0 :         return Err(InitdbError::Failed(
    3854            0 :             initdb_output.status,
    3855            0 :             initdb_output.stderr,
    3856            0 :         ));
    3857          596 :     }
    3858          596 : 
    3859          596 :     // This isn't true cancellation support, see above. Still return an error to
    3860          596 :     // excercise the cancellation code path.
    3861          596 :     if cancel.is_cancelled() {
    3862            1 :         return Err(InitdbError::Cancelled);
    3863          595 :     }
    3864          595 : 
    3865          595 :     Ok(())
    3866          596 : }
    3867              : 
    3868              : impl Drop for Tenant {
    3869          316 :     fn drop(&mut self) {
    3870          316 :         remove_tenant_metrics(&self.tenant_shard_id);
    3871          316 :     }
    3872              : }
    3873              : /// Dump contents of a layer file to stdout.
    3874            0 : pub async fn dump_layerfile_from_path(
    3875            0 :     path: &Utf8Path,
    3876            0 :     verbose: bool,
    3877            0 :     ctx: &RequestContext,
    3878            0 : ) -> anyhow::Result<()> {
    3879              :     use std::os::unix::fs::FileExt;
    3880              : 
    3881              :     // All layer files start with a two-byte "magic" value, to identify the kind of
    3882              :     // file.
    3883            0 :     let file = File::open(path)?;
    3884            0 :     let mut header_buf = [0u8; 2];
    3885            0 :     file.read_exact_at(&mut header_buf, 0)?;
    3886              : 
    3887            0 :     match u16::from_be_bytes(header_buf) {
    3888              :         crate::IMAGE_FILE_MAGIC => {
    3889            0 :             ImageLayer::new_for_path(path, file)?
    3890            0 :                 .dump(verbose, ctx)
    3891            0 :                 .await?
    3892              :         }
    3893              :         crate::DELTA_FILE_MAGIC => {
    3894            0 :             DeltaLayer::new_for_path(path, file)?
    3895            0 :                 .dump(verbose, ctx)
    3896            0 :                 .await?
    3897              :         }
    3898            0 :         magic => bail!("unrecognized magic identifier: {:?}", magic),
    3899              :     }
    3900              : 
    3901            0 :     Ok(())
    3902            0 : }
    3903              : 
    3904              : #[cfg(test)]
    3905              : pub(crate) mod harness {
    3906              :     use bytes::{Bytes, BytesMut};
    3907              :     use camino::Utf8PathBuf;
    3908              :     use once_cell::sync::OnceCell;
    3909              :     use pageserver_api::models::ShardParameters;
    3910              :     use pageserver_api::shard::ShardIndex;
    3911              :     use std::fs;
    3912              :     use std::sync::Arc;
    3913              :     use utils::logging;
    3914              :     use utils::lsn::Lsn;
    3915              : 
    3916              :     use crate::deletion_queue::mock::MockDeletionQueue;
    3917              :     use crate::{
    3918              :         config::PageServerConf, repository::Key, tenant::Tenant, walrecord::NeonWalRecord,
    3919              :     };
    3920              : 
    3921              :     use super::*;
    3922              :     use crate::tenant::config::{TenantConf, TenantConfOpt};
    3923              :     use hex_literal::hex;
    3924              :     use utils::id::{TenantId, TimelineId};
    3925              : 
    3926              :     pub const TIMELINE_ID: TimelineId =
    3927              :         TimelineId::from_array(hex!("11223344556677881122334455667788"));
    3928              :     pub const NEW_TIMELINE_ID: TimelineId =
    3929              :         TimelineId::from_array(hex!("AA223344556677881122334455667788"));
    3930              : 
    3931              :     /// Convenience function to create a page image with given string as the only content
    3932              :     #[allow(non_snake_case)]
    3933      1708217 :     pub fn TEST_IMG(s: &str) -> Bytes {
    3934      1708217 :         let mut buf = BytesMut::new();
    3935      1708217 :         buf.extend_from_slice(s.as_bytes());
    3936      1708217 :         buf.resize(64, 0);
    3937      1708217 : 
    3938      1708217 :         buf.freeze()
    3939      1708217 :     }
    3940              : 
    3941              :     impl From<TenantConf> for TenantConfOpt {
    3942           84 :         fn from(tenant_conf: TenantConf) -> Self {
    3943           84 :             Self {
    3944           84 :                 checkpoint_distance: Some(tenant_conf.checkpoint_distance),
    3945           84 :                 checkpoint_timeout: Some(tenant_conf.checkpoint_timeout),
    3946           84 :                 compaction_target_size: Some(tenant_conf.compaction_target_size),
    3947           84 :                 compaction_period: Some(tenant_conf.compaction_period),
    3948           84 :                 compaction_threshold: Some(tenant_conf.compaction_threshold),
    3949           84 :                 gc_horizon: Some(tenant_conf.gc_horizon),
    3950           84 :                 gc_period: Some(tenant_conf.gc_period),
    3951           84 :                 image_creation_threshold: Some(tenant_conf.image_creation_threshold),
    3952           84 :                 pitr_interval: Some(tenant_conf.pitr_interval),
    3953           84 :                 walreceiver_connect_timeout: Some(tenant_conf.walreceiver_connect_timeout),
    3954           84 :                 lagging_wal_timeout: Some(tenant_conf.lagging_wal_timeout),
    3955           84 :                 max_lsn_wal_lag: Some(tenant_conf.max_lsn_wal_lag),
    3956           84 :                 trace_read_requests: Some(tenant_conf.trace_read_requests),
    3957           84 :                 eviction_policy: Some(tenant_conf.eviction_policy),
    3958           84 :                 min_resident_size_override: tenant_conf.min_resident_size_override,
    3959           84 :                 evictions_low_residence_duration_metric_threshold: Some(
    3960           84 :                     tenant_conf.evictions_low_residence_duration_metric_threshold,
    3961           84 :                 ),
    3962           84 :                 gc_feedback: Some(tenant_conf.gc_feedback),
    3963           84 :                 heatmap_period: Some(tenant_conf.heatmap_period),
    3964           84 :                 lazy_slru_download: Some(tenant_conf.lazy_slru_download),
    3965           84 :             }
    3966           84 :         }
    3967              :     }
    3968              : 
    3969              :     #[cfg(test)]
    3970           84 :     #[derive(Debug)]
    3971              :     enum LoadMode {
    3972              :         Local,
    3973              :         Remote,
    3974              :     }
    3975              : 
    3976              :     pub struct TenantHarness {
    3977              :         pub conf: &'static PageServerConf,
    3978              :         pub tenant_conf: TenantConf,
    3979              :         pub tenant_shard_id: TenantShardId,
    3980              :         pub generation: Generation,
    3981              :         pub shard: ShardIndex,
    3982              :         pub remote_storage: GenericRemoteStorage,
    3983              :         pub remote_fs_dir: Utf8PathBuf,
    3984              :         pub deletion_queue: MockDeletionQueue,
    3985              :     }
    3986              : 
    3987              :     static LOG_HANDLE: OnceCell<()> = OnceCell::new();
    3988              : 
    3989           88 :     pub(crate) fn setup_logging() {
    3990           88 :         LOG_HANDLE.get_or_init(|| {
    3991           88 :             logging::init(
    3992           88 :                 logging::LogFormat::Test,
    3993           88 :                 // enable it in case the tests exercise code paths that use
    3994           88 :                 // debug_assert_current_span_has_tenant_and_timeline_id
    3995           88 :                 logging::TracingErrorLayerEnablement::EnableWithRustLogFilter,
    3996           88 :                 logging::Output::Stdout,
    3997           88 :             )
    3998           88 :             .expect("Failed to init test logging")
    3999           88 :         });
    4000           88 :     }
    4001              : 
    4002              :     impl TenantHarness {
    4003           82 :         pub fn create(test_name: &'static str) -> anyhow::Result<Self> {
    4004           82 :             setup_logging();
    4005           82 : 
    4006           82 :             let repo_dir = PageServerConf::test_repo_dir(test_name);
    4007           82 :             let _ = fs::remove_dir_all(&repo_dir);
    4008           82 :             fs::create_dir_all(&repo_dir)?;
    4009              : 
    4010           82 :             let conf = PageServerConf::dummy_conf(repo_dir);
    4011           82 :             // Make a static copy of the config. This can never be free'd, but that's
    4012           82 :             // OK in a test.
    4013           82 :             let conf: &'static PageServerConf = Box::leak(Box::new(conf));
    4014           82 : 
    4015           82 :             // Disable automatic GC and compaction to make the unit tests more deterministic.
    4016           82 :             // The tests perform them manually if needed.
    4017           82 :             let tenant_conf = TenantConf {
    4018           82 :                 gc_period: Duration::ZERO,
    4019           82 :                 compaction_period: Duration::ZERO,
    4020           82 :                 ..TenantConf::default()
    4021           82 :             };
    4022           82 : 
    4023           82 :             let tenant_id = TenantId::generate();
    4024           82 :             let tenant_shard_id = TenantShardId::unsharded(tenant_id);
    4025           82 :             fs::create_dir_all(conf.tenant_path(&tenant_shard_id))?;
    4026           82 :             fs::create_dir_all(conf.timelines_path(&tenant_shard_id))?;
    4027              : 
    4028              :             use remote_storage::{RemoteStorageConfig, RemoteStorageKind};
    4029           82 :             let remote_fs_dir = conf.workdir.join("localfs");
    4030           82 :             std::fs::create_dir_all(&remote_fs_dir).unwrap();
    4031           82 :             let config = RemoteStorageConfig {
    4032           82 :                 storage: RemoteStorageKind::LocalFs(remote_fs_dir.clone()),
    4033           82 :             };
    4034           82 :             let remote_storage = GenericRemoteStorage::from_config(&config).unwrap();
    4035           82 :             let deletion_queue = MockDeletionQueue::new(Some(remote_storage.clone()));
    4036           82 : 
    4037           82 :             Ok(Self {
    4038           82 :                 conf,
    4039           82 :                 tenant_conf,
    4040           82 :                 tenant_shard_id,
    4041           82 :                 generation: Generation::new(0xdeadbeef),
    4042           82 :                 shard: ShardIndex::unsharded(),
    4043           82 :                 remote_storage,
    4044           82 :                 remote_fs_dir,
    4045           82 :                 deletion_queue,
    4046           82 :             })
    4047           82 :         }
    4048              : 
    4049           10 :         pub fn span(&self) -> tracing::Span {
    4050           10 :             info_span!("TenantHarness", tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug())
    4051           10 :         }
    4052              : 
    4053           80 :         pub(crate) async fn load(&self) -> (Arc<Tenant>, RequestContext) {
    4054           80 :             let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
    4055           80 :             (
    4056           80 :                 self.try_load(&ctx)
    4057          103 :                     .await
    4058           80 :                     .expect("failed to load test tenant"),
    4059           80 :                 ctx,
    4060           80 :             )
    4061           80 :         }
    4062              : 
    4063              :         /// For tests that specifically want to exercise the local load path, which does
    4064              :         /// not use remote storage.
    4065            2 :         pub(crate) async fn try_load_local(
    4066            2 :             &self,
    4067            2 :             ctx: &RequestContext,
    4068            2 :         ) -> anyhow::Result<Arc<Tenant>> {
    4069            2 :             self.do_try_load(ctx, LoadMode::Local).await
    4070            2 :         }
    4071              : 
    4072              :         /// The 'load' in this function is either a local load or a normal attachment,
    4073           82 :         pub(crate) async fn try_load(&self, ctx: &RequestContext) -> anyhow::Result<Arc<Tenant>> {
    4074              :             // If we have nothing in remote storage, must use load_local instead of attach: attach
    4075              :             // will error out if there are no timelines.
    4076              :             //
    4077              :             // See https://github.com/neondatabase/neon/issues/5456 for how we will eliminate
    4078              :             // this weird state of a Tenant which exists but doesn't have any timelines.
    4079           82 :             let mode = match self.remote_empty() {
    4080           78 :                 true => LoadMode::Local,
    4081            4 :                 false => LoadMode::Remote,
    4082              :             };
    4083              : 
    4084          105 :             self.do_try_load(ctx, mode).await
    4085           82 :         }
    4086              : 
    4087          168 :         #[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(), ?mode))]
    4088              :         async fn do_try_load(
    4089              :             &self,
    4090              :             ctx: &RequestContext,
    4091              :             mode: LoadMode,
    4092              :         ) -> anyhow::Result<Arc<Tenant>> {
    4093              :             let walredo_mgr = Arc::new(WalRedoManager::from(TestRedoManager));
    4094              : 
    4095              :             let tenant = Arc::new(Tenant::new(
    4096              :                 TenantState::Loading,
    4097              :                 self.conf,
    4098              :                 AttachedTenantConf::try_from(LocationConf::attached_single(
    4099              :                     TenantConfOpt::from(self.tenant_conf),
    4100              :                     self.generation,
    4101              :                     &ShardParameters::default(),
    4102              :                 ))
    4103              :                 .unwrap(),
    4104              :                 // This is a legacy/test code path: sharding isn't supported here.
    4105              :                 ShardIdentity::unsharded(),
    4106              :                 Some(walredo_mgr),
    4107              :                 self.tenant_shard_id,
    4108              :                 Some(self.remote_storage.clone()),
    4109              :                 self.deletion_queue.new_client(),
    4110              :             ));
    4111              : 
    4112              :             match mode {
    4113              :                 LoadMode::Local => {
    4114              :                     tenant.load_local(ctx).await?;
    4115              :                 }
    4116              :                 LoadMode::Remote => {
    4117              :                     let preload = tenant
    4118              :                         .preload(&self.remote_storage, CancellationToken::new())
    4119              :                         .await?;
    4120              :                     tenant.attach(Some(preload), SpawnMode::Normal, ctx).await?;
    4121              :                 }
    4122              :             }
    4123              : 
    4124              :             tenant.state.send_replace(TenantState::Active);
    4125              :             for timeline in tenant.timelines.lock().unwrap().values() {
    4126              :                 timeline.set_state(TimelineState::Active);
    4127              :             }
    4128              :             Ok(tenant)
    4129              :         }
    4130              : 
    4131           82 :         fn remote_empty(&self) -> bool {
    4132           82 :             let tenant_path = self.conf.tenant_path(&self.tenant_shard_id);
    4133           82 :             let remote_tenant_dir = self
    4134           82 :                 .remote_fs_dir
    4135           82 :                 .join(tenant_path.strip_prefix(&self.conf.workdir).unwrap());
    4136           82 :             if std::fs::metadata(&remote_tenant_dir).is_err() {
    4137           78 :                 return true;
    4138            4 :             }
    4139            4 : 
    4140            4 :             match std::fs::read_dir(remote_tenant_dir)
    4141            4 :                 .unwrap()
    4142            4 :                 .flatten()
    4143            4 :                 .next()
    4144              :             {
    4145            4 :                 Some(entry) => {
    4146            4 :                     tracing::debug!(
    4147            0 :                         "remote_empty: not empty, found file {}",
    4148            0 :                         entry.file_name().to_string_lossy(),
    4149            0 :                     );
    4150            4 :                     false
    4151              :                 }
    4152            0 :                 None => true,
    4153              :             }
    4154           82 :         }
    4155              : 
    4156            6 :         pub fn timeline_path(&self, timeline_id: &TimelineId) -> Utf8PathBuf {
    4157            6 :             self.conf.timeline_path(&self.tenant_shard_id, timeline_id)
    4158            6 :         }
    4159              :     }
    4160              : 
    4161              :     // Mock WAL redo manager that doesn't do much
    4162              :     pub(crate) struct TestRedoManager;
    4163              : 
    4164              :     impl TestRedoManager {
    4165              :         /// # Cancel-Safety
    4166              :         ///
    4167              :         /// This method is cancellation-safe.
    4168            0 :         pub async fn request_redo(
    4169            0 :             &self,
    4170            0 :             key: Key,
    4171            0 :             lsn: Lsn,
    4172            0 :             base_img: Option<(Lsn, Bytes)>,
    4173            0 :             records: Vec<(Lsn, NeonWalRecord)>,
    4174            0 :             _pg_version: u32,
    4175            0 :         ) -> anyhow::Result<Bytes> {
    4176            0 :             let s = format!(
    4177            0 :                 "redo for {} to get to {}, with {} and {} records",
    4178            0 :                 key,
    4179            0 :                 lsn,
    4180            0 :                 if base_img.is_some() {
    4181            0 :                     "base image"
    4182              :                 } else {
    4183            0 :                     "no base image"
    4184              :                 },
    4185            0 :                 records.len()
    4186            0 :             );
    4187            0 :             println!("{s}");
    4188            0 : 
    4189            0 :             Ok(TEST_IMG(&s))
    4190            0 :         }
    4191              :     }
    4192              : }
    4193              : 
    4194              : #[cfg(test)]
    4195              : mod tests {
    4196              :     use super::*;
    4197              :     use crate::keyspace::KeySpaceAccum;
    4198              :     use crate::repository::{Key, Value};
    4199              :     use crate::tenant::harness::*;
    4200              :     use crate::DEFAULT_PG_VERSION;
    4201              :     use crate::METADATA_FILE_NAME;
    4202              :     use bytes::BytesMut;
    4203              :     use hex_literal::hex;
    4204              :     use once_cell::sync::Lazy;
    4205              :     use rand::{thread_rng, Rng};
    4206              :     use tokio_util::sync::CancellationToken;
    4207              : 
    4208              :     static TEST_KEY: Lazy<Key> =
    4209           18 :         Lazy::new(|| Key::from_slice(&hex!("010000000033333333444444445500000001")));
    4210              : 
    4211            2 :     #[tokio::test]
    4212            2 :     async fn test_basic() -> anyhow::Result<()> {
    4213            2 :         let (tenant, ctx) = TenantHarness::create("test_basic")?.load().await;
    4214            2 :         let tline = tenant
    4215            2 :             .create_test_timeline(TIMELINE_ID, Lsn(0x08), DEFAULT_PG_VERSION, &ctx)
    4216            6 :             .await?;
    4217            2 : 
    4218            2 :         let writer = tline.writer().await;
    4219            2 :         writer
    4220            2 :             .put(
    4221            2 :                 *TEST_KEY,
    4222            2 :                 Lsn(0x10),
    4223            2 :                 &Value::Image(TEST_IMG("foo at 0x10")),
    4224            2 :                 &ctx,
    4225            2 :             )
    4226            2 :             .await?;
    4227            2 :         writer.finish_write(Lsn(0x10));
    4228            2 :         drop(writer);
    4229            2 : 
    4230            2 :         let writer = tline.writer().await;
    4231            2 :         writer
    4232            2 :             .put(
    4233            2 :                 *TEST_KEY,
    4234            2 :                 Lsn(0x20),
    4235            2 :                 &Value::Image(TEST_IMG("foo at 0x20")),
    4236            2 :                 &ctx,
    4237            2 :             )
    4238            2 :             .await?;
    4239            2 :         writer.finish_write(Lsn(0x20));
    4240            2 :         drop(writer);
    4241            2 : 
    4242            2 :         assert_eq!(
    4243            2 :             tline.get(*TEST_KEY, Lsn(0x10), &ctx).await?,
    4244            2 :             TEST_IMG("foo at 0x10")
    4245            2 :         );
    4246            2 :         assert_eq!(
    4247            2 :             tline.get(*TEST_KEY, Lsn(0x1f), &ctx).await?,
    4248            2 :             TEST_IMG("foo at 0x10")
    4249            2 :         );
    4250            2 :         assert_eq!(
    4251            2 :             tline.get(*TEST_KEY, Lsn(0x20), &ctx).await?,
    4252            2 :             TEST_IMG("foo at 0x20")
    4253            2 :         );
    4254            2 : 
    4255            2 :         Ok(())
    4256            2 :     }
    4257              : 
    4258            2 :     #[tokio::test]
    4259            2 :     async fn no_duplicate_timelines() -> anyhow::Result<()> {
    4260            2 :         let (tenant, ctx) = TenantHarness::create("no_duplicate_timelines")?
    4261            2 :             .load()
    4262            2 :             .await;
    4263            2 :         let _ = tenant
    4264            2 :             .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
    4265            7 :             .await?;
    4266            2 : 
    4267            2 :         match tenant
    4268            2 :             .create_empty_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
    4269            2 :             .await
    4270            2 :         {
    4271            2 :             Ok(_) => panic!("duplicate timeline creation should fail"),
    4272            2 :             Err(e) => assert_eq!(e.to_string(), "Already exists".to_string()),
    4273            2 :         }
    4274            2 : 
    4275            2 :         Ok(())
    4276            2 :     }
    4277              : 
    4278              :     /// Convenience function to create a page image with given string as the only content
    4279           10 :     pub fn test_value(s: &str) -> Value {
    4280           10 :         let mut buf = BytesMut::new();
    4281           10 :         buf.extend_from_slice(s.as_bytes());
    4282           10 :         Value::Image(buf.freeze())
    4283           10 :     }
    4284              : 
    4285              :     ///
    4286              :     /// Test branch creation
    4287              :     ///
    4288            2 :     #[tokio::test]
    4289            2 :     async fn test_branch() -> anyhow::Result<()> {
    4290            2 :         use std::str::from_utf8;
    4291            2 : 
    4292            2 :         let (tenant, ctx) = TenantHarness::create("test_branch")?.load().await;
    4293            2 :         let tline = tenant
    4294            2 :             .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
    4295            7 :             .await?;
    4296            2 :         let writer = tline.writer().await;
    4297            2 : 
    4298            2 :         #[allow(non_snake_case)]
    4299            2 :         let TEST_KEY_A: Key = Key::from_hex("110000000033333333444444445500000001").unwrap();
    4300            2 :         #[allow(non_snake_case)]
    4301            2 :         let TEST_KEY_B: Key = Key::from_hex("110000000033333333444444445500000002").unwrap();
    4302            2 : 
    4303            2 :         // Insert a value on the timeline
    4304            2 :         writer
    4305            2 :             .put(TEST_KEY_A, Lsn(0x20), &test_value("foo at 0x20"), &ctx)
    4306            2 :             .await?;
    4307            2 :         writer
    4308            2 :             .put(TEST_KEY_B, Lsn(0x20), &test_value("foobar at 0x20"), &ctx)
    4309            2 :             .await?;
    4310            2 :         writer.finish_write(Lsn(0x20));
    4311            2 : 
    4312            2 :         writer
    4313            2 :             .put(TEST_KEY_A, Lsn(0x30), &test_value("foo at 0x30"), &ctx)
    4314            2 :             .await?;
    4315            2 :         writer.finish_write(Lsn(0x30));
    4316            2 :         writer
    4317            2 :             .put(TEST_KEY_A, Lsn(0x40), &test_value("foo at 0x40"), &ctx)
    4318            2 :             .await?;
    4319            2 :         writer.finish_write(Lsn(0x40));
    4320            2 : 
    4321            2 :         //assert_current_logical_size(&tline, Lsn(0x40));
    4322            2 : 
    4323            2 :         // Branch the history, modify relation differently on the new timeline
    4324            2 :         tenant
    4325            2 :             .branch_timeline_test(&tline, NEW_TIMELINE_ID, Some(Lsn(0x30)), &ctx)
    4326            2 :             .await?;
    4327            2 :         let newtline = tenant
    4328            2 :             .get_timeline(NEW_TIMELINE_ID, true)
    4329            2 :             .expect("Should have a local timeline");
    4330            2 :         let new_writer = newtline.writer().await;
    4331            2 :         new_writer
    4332            2 :             .put(TEST_KEY_A, Lsn(0x40), &test_value("bar at 0x40"), &ctx)
    4333            2 :             .await?;
    4334            2 :         new_writer.finish_write(Lsn(0x40));
    4335            2 : 
    4336            2 :         // Check page contents on both branches
    4337            2 :         assert_eq!(
    4338            2 :             from_utf8(&tline.get(TEST_KEY_A, Lsn(0x40), &ctx).await?)?,
    4339            2 :             "foo at 0x40"
    4340            2 :         );
    4341            2 :         assert_eq!(
    4342            2 :             from_utf8(&newtline.get(TEST_KEY_A, Lsn(0x40), &ctx).await?)?,
    4343            2 :             "bar at 0x40"
    4344            2 :         );
    4345            2 :         assert_eq!(
    4346            2 :             from_utf8(&newtline.get(TEST_KEY_B, Lsn(0x40), &ctx).await?)?,
    4347            2 :             "foobar at 0x20"
    4348            2 :         );
    4349            2 : 
    4350            2 :         //assert_current_logical_size(&tline, Lsn(0x40));
    4351            2 : 
    4352            2 :         Ok(())
    4353            2 :     }
    4354              : 
    4355           20 :     async fn make_some_layers(
    4356           20 :         tline: &Timeline,
    4357           20 :         start_lsn: Lsn,
    4358           20 :         ctx: &RequestContext,
    4359           20 :     ) -> anyhow::Result<()> {
    4360           20 :         let mut lsn = start_lsn;
    4361              :         #[allow(non_snake_case)]
    4362              :         {
    4363           20 :             let writer = tline.writer().await;
    4364              :             // Create a relation on the timeline
    4365           20 :             writer
    4366           20 :                 .put(
    4367           20 :                     *TEST_KEY,
    4368           20 :                     lsn,
    4369           20 :                     &Value::Image(TEST_IMG(&format!("foo at {}", lsn))),
    4370           20 :                     ctx,
    4371           20 :                 )
    4372           10 :                 .await?;
    4373           20 :             writer.finish_write(lsn);
    4374           20 :             lsn += 0x10;
    4375           20 :             writer
    4376           20 :                 .put(
    4377           20 :                     *TEST_KEY,
    4378           20 :                     lsn,
    4379           20 :                     &Value::Image(TEST_IMG(&format!("foo at {}", lsn))),
    4380           20 :                     ctx,
    4381           20 :                 )
    4382            0 :                 .await?;
    4383           20 :             writer.finish_write(lsn);
    4384           20 :             lsn += 0x10;
    4385           20 :         }
    4386           20 :         tline.freeze_and_flush().await?;
    4387              :         {
    4388           20 :             let writer = tline.writer().await;
    4389           20 :             writer
    4390           20 :                 .put(
    4391           20 :                     *TEST_KEY,
    4392           20 :                     lsn,
    4393           20 :                     &Value::Image(TEST_IMG(&format!("foo at {}", lsn))),
    4394           20 :                     ctx,
    4395           20 :                 )
    4396           10 :                 .await?;
    4397           20 :             writer.finish_write(lsn);
    4398           20 :             lsn += 0x10;
    4399           20 :             writer
    4400           20 :                 .put(
    4401           20 :                     *TEST_KEY,
    4402           20 :                     lsn,
    4403           20 :                     &Value::Image(TEST_IMG(&format!("foo at {}", lsn))),
    4404           20 :                     ctx,
    4405           20 :                 )
    4406            0 :                 .await?;
    4407           20 :             writer.finish_write(lsn);
    4408           20 :         }
    4409           20 :         tline.freeze_and_flush().await
    4410           20 :     }
    4411              : 
    4412            2 :     #[tokio::test]
    4413            2 :     async fn test_prohibit_branch_creation_on_garbage_collected_data() -> anyhow::Result<()> {
    4414            2 :         let (tenant, ctx) =
    4415            2 :             TenantHarness::create("test_prohibit_branch_creation_on_garbage_collected_data")?
    4416            2 :                 .load()
    4417            2 :                 .await;
    4418            2 :         let tline = tenant
    4419            2 :             .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
    4420            8 :             .await?;
    4421            6 :         make_some_layers(tline.as_ref(), Lsn(0x20), &ctx).await?;
    4422            2 : 
    4423            2 :         // this removes layers before lsn 40 (50 minus 10), so there are two remaining layers, image and delta for 31-50
    4424            2 :         // FIXME: this doesn't actually remove any layer currently, given how the flushing
    4425            2 :         // and compaction works. But it does set the 'cutoff' point so that the cross check
    4426            2 :         // below should fail.
    4427            2 :         tenant
    4428            2 :             .gc_iteration(
    4429            2 :                 Some(TIMELINE_ID),
    4430            2 :                 0x10,
    4431            2 :                 Duration::ZERO,
    4432            2 :                 &CancellationToken::new(),
    4433            2 :                 &ctx,
    4434            2 :             )
    4435            2 :             .await?;
    4436            2 : 
    4437            2 :         // try to branch at lsn 25, should fail because we already garbage collected the data
    4438            2 :         match tenant
    4439            2 :             .branch_timeline_test(&tline, NEW_TIMELINE_ID, Some(Lsn(0x25)), &ctx)
    4440            2 :             .await
    4441            2 :         {
    4442            2 :             Ok(_) => panic!("branching should have failed"),
    4443            2 :             Err(err) => {
    4444            2 :                 let CreateTimelineError::AncestorLsn(err) = err else {
    4445            2 :                     panic!("wrong error type")
    4446            2 :                 };
    4447            2 :                 assert!(err.to_string().contains("invalid branch start lsn"));
    4448            2 :                 assert!(err
    4449            2 :                     .source()
    4450            2 :                     .unwrap()
    4451            2 :                     .to_string()
    4452            2 :                     .contains("we might've already garbage collected needed data"))
    4453            2 :             }
    4454            2 :         }
    4455            2 : 
    4456            2 :         Ok(())
    4457            2 :     }
    4458              : 
    4459            2 :     #[tokio::test]
    4460            2 :     async fn test_prohibit_branch_creation_on_pre_initdb_lsn() -> anyhow::Result<()> {
    4461            2 :         let (tenant, ctx) =
    4462            2 :             TenantHarness::create("test_prohibit_branch_creation_on_pre_initdb_lsn")?
    4463            2 :                 .load()
    4464            2 :                 .await;
    4465            2 : 
    4466            2 :         let tline = tenant
    4467            2 :             .create_test_timeline(TIMELINE_ID, Lsn(0x50), DEFAULT_PG_VERSION, &ctx)
    4468            8 :             .await?;
    4469            2 :         // try to branch at lsn 0x25, should fail because initdb lsn is 0x50
    4470            2 :         match tenant
    4471            2 :             .branch_timeline_test(&tline, NEW_TIMELINE_ID, Some(Lsn(0x25)), &ctx)
    4472            2 :             .await
    4473            2 :         {
    4474            2 :             Ok(_) => panic!("branching should have failed"),
    4475            2 :             Err(err) => {
    4476            2 :                 let CreateTimelineError::AncestorLsn(err) = err else {
    4477            2 :                     panic!("wrong error type");
    4478            2 :                 };
    4479            2 :                 assert!(&err.to_string().contains("invalid branch start lsn"));
    4480            2 :                 assert!(&err
    4481            2 :                     .source()
    4482            2 :                     .unwrap()
    4483            2 :                     .to_string()
    4484            2 :                     .contains("is earlier than latest GC horizon"));
    4485            2 :             }
    4486            2 :         }
    4487            2 : 
    4488            2 :         Ok(())
    4489            2 :     }
    4490              : 
    4491              :     /*
    4492              :     // FIXME: This currently fails to error out. Calling GC doesn't currently
    4493              :     // remove the old value, we'd need to work a little harder
    4494              :     #[tokio::test]
    4495              :     async fn test_prohibit_get_for_garbage_collected_data() -> anyhow::Result<()> {
    4496              :         let repo =
    4497              :             RepoHarness::create("test_prohibit_get_for_garbage_collected_data")?
    4498              :             .load();
    4499              : 
    4500              :         let tline = repo.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION)?;
    4501              :         make_some_layers(tline.as_ref(), Lsn(0x20), &ctx).await?;
    4502              : 
    4503              :         repo.gc_iteration(Some(TIMELINE_ID), 0x10, Duration::ZERO)?;
    4504              :         let latest_gc_cutoff_lsn = tline.get_latest_gc_cutoff_lsn();
    4505              :         assert!(*latest_gc_cutoff_lsn > Lsn(0x25));
    4506              :         match tline.get(*TEST_KEY, Lsn(0x25)) {
    4507              :             Ok(_) => panic!("request for page should have failed"),
    4508              :             Err(err) => assert!(err.to_string().contains("not found at")),
    4509              :         }
    4510              :         Ok(())
    4511              :     }
    4512              :      */
    4513              : 
    4514            2 :     #[tokio::test]
    4515            2 :     async fn test_get_branchpoints_from_an_inactive_timeline() -> anyhow::Result<()> {
    4516            2 :         let (tenant, ctx) =
    4517            2 :             TenantHarness::create("test_get_branchpoints_from_an_inactive_timeline")?
    4518            2 :                 .load()
    4519            2 :                 .await;
    4520            2 :         let tline = tenant
    4521            2 :             .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
    4522            7 :             .await?;
    4523            6 :         make_some_layers(tline.as_ref(), Lsn(0x20), &ctx).await?;
    4524            2 : 
    4525            2 :         tenant
    4526            2 :             .branch_timeline_test(&tline, NEW_TIMELINE_ID, Some(Lsn(0x40)), &ctx)
    4527            2 :             .await?;
    4528            2 :         let newtline = tenant
    4529            2 :             .get_timeline(NEW_TIMELINE_ID, true)
    4530            2 :             .expect("Should have a local timeline");
    4531            2 : 
    4532            6 :         make_some_layers(newtline.as_ref(), Lsn(0x60), &ctx).await?;
    4533            2 : 
    4534            2 :         tline.set_broken("test".to_owned());
    4535            2 : 
    4536            2 :         tenant
    4537            2 :             .gc_iteration(
    4538            2 :                 Some(TIMELINE_ID),
    4539            2 :                 0x10,
    4540            2 :                 Duration::ZERO,
    4541            2 :                 &CancellationToken::new(),
    4542            2 :                 &ctx,
    4543            2 :             )
    4544            2 :             .await?;
    4545            2 : 
    4546            2 :         // The branchpoints should contain all timelines, even ones marked
    4547            2 :         // as Broken.
    4548            2 :         {
    4549            2 :             let branchpoints = &tline.gc_info.read().unwrap().retain_lsns;
    4550            2 :             assert_eq!(branchpoints.len(), 1);
    4551            2 :             assert_eq!(branchpoints[0], Lsn(0x40));
    4552            2 :         }
    4553            2 : 
    4554            2 :         // You can read the key from the child branch even though the parent is
    4555            2 :         // Broken, as long as you don't need to access data from the parent.
    4556            2 :         assert_eq!(
    4557            4 :             newtline.get(*TEST_KEY, Lsn(0x70), &ctx).await?,
    4558            2 :             TEST_IMG(&format!("foo at {}", Lsn(0x70)))
    4559            2 :         );
    4560            2 : 
    4561            2 :         // This needs to traverse to the parent, and fails.
    4562            2 :         let err = newtline.get(*TEST_KEY, Lsn(0x50), &ctx).await.unwrap_err();
    4563            2 :         assert!(err
    4564            2 :             .to_string()
    4565            2 :             .contains("will not become active. Current state: Broken"));
    4566            2 : 
    4567            2 :         Ok(())
    4568            2 :     }
    4569              : 
    4570            2 :     #[tokio::test]
    4571            2 :     async fn test_retain_data_in_parent_which_is_needed_for_child() -> anyhow::Result<()> {
    4572            2 :         let (tenant, ctx) =
    4573            2 :             TenantHarness::create("test_retain_data_in_parent_which_is_needed_for_child")?
    4574            2 :                 .load()
    4575            2 :                 .await;
    4576            2 :         let tline = tenant
    4577            2 :             .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
    4578            7 :             .await?;
    4579            6 :         make_some_layers(tline.as_ref(), Lsn(0x20), &ctx).await?;
    4580            2 : 
    4581            2 :         tenant
    4582            2 :             .branch_timeline_test(&tline, NEW_TIMELINE_ID, Some(Lsn(0x40)), &ctx)
    4583            2 :             .await?;
    4584            2 :         let newtline = tenant
    4585            2 :             .get_timeline(NEW_TIMELINE_ID, true)
    4586            2 :             .expect("Should have a local timeline");
    4587            2 :         // this removes layers before lsn 40 (50 minus 10), so there are two remaining layers, image and delta for 31-50
    4588            2 :         tenant
    4589            2 :             .gc_iteration(
    4590            2 :                 Some(TIMELINE_ID),
    4591            2 :                 0x10,
    4592            2 :                 Duration::ZERO,
    4593            2 :                 &CancellationToken::new(),
    4594            2 :                 &ctx,
    4595            2 :             )
    4596            2 :             .await?;
    4597            4 :         assert!(newtline.get(*TEST_KEY, Lsn(0x25), &ctx).await.is_ok());
    4598            2 : 
    4599            2 :         Ok(())
    4600            2 :     }
    4601            2 :     #[tokio::test]
    4602            2 :     async fn test_parent_keeps_data_forever_after_branching() -> anyhow::Result<()> {
    4603            2 :         let (tenant, ctx) =
    4604            2 :             TenantHarness::create("test_parent_keeps_data_forever_after_branching")?
    4605            2 :                 .load()
    4606            2 :                 .await;
    4607            2 :         let tline = tenant
    4608            2 :             .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
    4609            8 :             .await?;
    4610            6 :         make_some_layers(tline.as_ref(), Lsn(0x20), &ctx).await?;
    4611            2 : 
    4612            2 :         tenant
    4613            2 :             .branch_timeline_test(&tline, NEW_TIMELINE_ID, Some(Lsn(0x40)), &ctx)
    4614            2 :             .await?;
    4615            2 :         let newtline = tenant
    4616            2 :             .get_timeline(NEW_TIMELINE_ID, true)
    4617            2 :             .expect("Should have a local timeline");
    4618            2 : 
    4619            6 :         make_some_layers(newtline.as_ref(), Lsn(0x60), &ctx).await?;
    4620            2 : 
    4621            2 :         // run gc on parent
    4622            2 :         tenant
    4623            2 :             .gc_iteration(
    4624            2 :                 Some(TIMELINE_ID),
    4625            2 :                 0x10,
    4626            2 :                 Duration::ZERO,
    4627            2 :                 &CancellationToken::new(),
    4628            2 :                 &ctx,
    4629            2 :             )
    4630            2 :             .await?;
    4631            2 : 
    4632            2 :         // Check that the data is still accessible on the branch.
    4633            2 :         assert_eq!(
    4634            7 :             newtline.get(*TEST_KEY, Lsn(0x50), &ctx).await?,
    4635            2 :             TEST_IMG(&format!("foo at {}", Lsn(0x40)))
    4636            2 :         );
    4637            2 : 
    4638            2 :         Ok(())
    4639            2 :     }
    4640              : 
    4641            2 :     #[tokio::test]
    4642            2 :     async fn timeline_load() -> anyhow::Result<()> {
    4643            2 :         const TEST_NAME: &str = "timeline_load";
    4644            2 :         let harness = TenantHarness::create(TEST_NAME)?;
    4645            2 :         {
    4646            2 :             let (tenant, ctx) = harness.load().await;
    4647            2 :             let tline = tenant
    4648            2 :                 .create_test_timeline(TIMELINE_ID, Lsn(0x7000), DEFAULT_PG_VERSION, &ctx)
    4649            8 :                 .await?;
    4650            6 :             make_some_layers(tline.as_ref(), Lsn(0x8000), &ctx).await?;
    4651            2 :             // so that all uploads finish & we can call harness.load() below again
    4652            2 :             tenant
    4653            2 :                 .shutdown(Default::default(), true)
    4654            2 :                 .instrument(harness.span())
    4655            2 :                 .await
    4656            2 :                 .ok()
    4657            2 :                 .unwrap();
    4658            2 :         }
    4659            2 : 
    4660           11 :         let (tenant, _ctx) = harness.load().await;
    4661            2 :         tenant
    4662            2 :             .get_timeline(TIMELINE_ID, true)
    4663            2 :             .expect("cannot load timeline");
    4664            2 : 
    4665            2 :         Ok(())
    4666            2 :     }
    4667              : 
    4668            2 :     #[tokio::test]
    4669            2 :     async fn timeline_load_with_ancestor() -> anyhow::Result<()> {
    4670            2 :         const TEST_NAME: &str = "timeline_load_with_ancestor";
    4671            2 :         let harness = TenantHarness::create(TEST_NAME)?;
    4672            2 :         // create two timelines
    4673            2 :         {
    4674            2 :             let (tenant, ctx) = harness.load().await;
    4675            2 :             let tline = tenant
    4676            2 :                 .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
    4677            8 :                 .await?;
    4678            2 : 
    4679            6 :             make_some_layers(tline.as_ref(), Lsn(0x20), &ctx).await?;
    4680            2 : 
    4681            2 :             let child_tline = tenant
    4682            2 :                 .branch_timeline_test(&tline, NEW_TIMELINE_ID, Some(Lsn(0x40)), &ctx)
    4683            2 :                 .await?;
    4684            2 :             child_tline.set_state(TimelineState::Active);
    4685            2 : 
    4686            2 :             let newtline = tenant
    4687            2 :                 .get_timeline(NEW_TIMELINE_ID, true)
    4688            2 :                 .expect("Should have a local timeline");
    4689            2 : 
    4690            6 :             make_some_layers(newtline.as_ref(), Lsn(0x60), &ctx).await?;
    4691            2 : 
    4692            2 :             // so that all uploads finish & we can call harness.load() below again
    4693            2 :             tenant
    4694            2 :                 .shutdown(Default::default(), true)
    4695            2 :                 .instrument(harness.span())
    4696            4 :                 .await
    4697            2 :                 .ok()
    4698            2 :                 .unwrap();
    4699            2 :         }
    4700            2 : 
    4701            2 :         // check that both of them are initially unloaded
    4702           17 :         let (tenant, _ctx) = harness.load().await;
    4703            2 : 
    4704            2 :         // check that both, child and ancestor are loaded
    4705            2 :         let _child_tline = tenant
    4706            2 :             .get_timeline(NEW_TIMELINE_ID, true)
    4707            2 :             .expect("cannot get child timeline loaded");
    4708            2 : 
    4709            2 :         let _ancestor_tline = tenant
    4710            2 :             .get_timeline(TIMELINE_ID, true)
    4711            2 :             .expect("cannot get ancestor timeline loaded");
    4712            2 : 
    4713            2 :         Ok(())
    4714            2 :     }
    4715              : 
    4716            2 :     #[tokio::test]
    4717            2 :     async fn delta_layer_dumping() -> anyhow::Result<()> {
    4718            2 :         use storage_layer::AsLayerDesc;
    4719            2 :         let (tenant, ctx) = TenantHarness::create("test_layer_dumping")?.load().await;
    4720            2 :         let tline = tenant
    4721            2 :             .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
    4722            6 :             .await?;
    4723            6 :         make_some_layers(tline.as_ref(), Lsn(0x20), &ctx).await?;
    4724            2 : 
    4725            2 :         let layer_map = tline.layers.read().await;
    4726            2 :         let level0_deltas = layer_map
    4727            2 :             .layer_map()
    4728            2 :             .get_level0_deltas()?
    4729            2 :             .into_iter()
    4730            4 :             .map(|desc| layer_map.get_from_desc(&desc))
    4731            2 :             .collect::<Vec<_>>();
    4732            2 : 
    4733            2 :         assert!(!level0_deltas.is_empty());
    4734            2 : 
    4735            6 :         for delta in level0_deltas {
    4736            2 :             // Ensure we are dumping a delta layer here
    4737            4 :             assert!(delta.layer_desc().is_delta);
    4738            8 :             delta.dump(true, &ctx).await.unwrap();
    4739            2 :         }
    4740            2 : 
    4741            2 :         Ok(())
    4742            2 :     }
    4743              : 
    4744            2 :     #[tokio::test]
    4745            2 :     async fn corrupt_local_metadata() -> anyhow::Result<()> {
    4746            2 :         const TEST_NAME: &str = "corrupt_metadata";
    4747            2 :         let harness = TenantHarness::create(TEST_NAME)?;
    4748            2 :         let (tenant, ctx) = harness.load().await;
    4749            2 : 
    4750            2 :         let tline = tenant
    4751            2 :             .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
    4752            7 :             .await?;
    4753            2 :         drop(tline);
    4754            2 :         // so that all uploads finish & we can call harness.try_load() below again
    4755            2 :         tenant
    4756            2 :             .shutdown(Default::default(), true)
    4757            2 :             .instrument(harness.span())
    4758            2 :             .await
    4759            2 :             .ok()
    4760            2 :             .unwrap();
    4761            2 :         drop(tenant);
    4762            2 : 
    4763            2 :         // Corrupt local metadata
    4764            2 :         let metadata_path = harness.timeline_path(&TIMELINE_ID).join(METADATA_FILE_NAME);
    4765            2 :         assert!(metadata_path.is_file());
    4766            2 :         let mut metadata_bytes = std::fs::read(&metadata_path)?;
    4767            2 :         assert_eq!(metadata_bytes.len(), 512);
    4768            2 :         metadata_bytes[8] ^= 1;
    4769            2 :         std::fs::write(metadata_path, metadata_bytes)?;
    4770            2 : 
    4771            2 :         let err = harness.try_load_local(&ctx).await.expect_err("should fail");
    4772            2 :         // get all the stack with all .context, not only the last one
    4773            2 :         let message = format!("{err:#}");
    4774            2 :         let expected = "failed to load metadata";
    4775            2 :         assert!(
    4776            2 :             message.contains(expected),
    4777            2 :             "message '{message}' expected to contain {expected}"
    4778            2 :         );
    4779            2 : 
    4780            2 :         let mut found_error_message = false;
    4781            2 :         let mut err_source = err.source();
    4782            2 :         while let Some(source) = err_source {
    4783            2 :             if source.to_string().contains("metadata checksum mismatch") {
    4784            2 :                 found_error_message = true;
    4785            2 :                 break;
    4786            2 :             }
    4787            0 :             err_source = source.source();
    4788            2 :         }
    4789            2 :         assert!(
    4790            2 :             found_error_message,
    4791            2 :             "didn't find the corrupted metadata error in {}",
    4792            2 :             message
    4793            2 :         );
    4794            2 : 
    4795            2 :         Ok(())
    4796            2 :     }
    4797              : 
    4798            2 :     #[tokio::test]
    4799            2 :     async fn test_images() -> anyhow::Result<()> {
    4800            2 :         let (tenant, ctx) = TenantHarness::create("test_images")?.load().await;
    4801            2 :         let tline = tenant
    4802            2 :             .create_test_timeline(TIMELINE_ID, Lsn(0x08), DEFAULT_PG_VERSION, &ctx)
    4803            7 :             .await?;
    4804            2 : 
    4805            2 :         let writer = tline.writer().await;
    4806            2 :         writer
    4807            2 :             .put(
    4808            2 :                 *TEST_KEY,
    4809            2 :                 Lsn(0x10),
    4810            2 :                 &Value::Image(TEST_IMG("foo at 0x10")),
    4811            2 :                 &ctx,
    4812            2 :             )
    4813            2 :             .await?;
    4814            2 :         writer.finish_write(Lsn(0x10));
    4815            2 :         drop(writer);
    4816            2 : 
    4817            2 :         tline.freeze_and_flush().await?;
    4818            2 :         tline
    4819            2 :             .compact(&CancellationToken::new(), EnumSet::empty(), &ctx)
    4820            2 :             .await?;
    4821            2 : 
    4822            2 :         let writer = tline.writer().await;
    4823            2 :         writer
    4824            2 :             .put(
    4825            2 :                 *TEST_KEY,
    4826            2 :                 Lsn(0x20),
    4827            2 :                 &Value::Image(TEST_IMG("foo at 0x20")),
    4828            2 :                 &ctx,
    4829            2 :             )
    4830            2 :             .await?;
    4831            2 :         writer.finish_write(Lsn(0x20));
    4832            2 :         drop(writer);
    4833            2 : 
    4834            2 :         tline.freeze_and_flush().await?;
    4835            2 :         tline
    4836            2 :             .compact(&CancellationToken::new(), EnumSet::empty(), &ctx)
    4837            2 :             .await?;
    4838            2 : 
    4839            2 :         let writer = tline.writer().await;
    4840            2 :         writer
    4841            2 :             .put(
    4842            2 :                 *TEST_KEY,
    4843            2 :                 Lsn(0x30),
    4844            2 :                 &Value::Image(TEST_IMG("foo at 0x30")),
    4845            2 :                 &ctx,
    4846            2 :             )
    4847            2 :             .await?;
    4848            2 :         writer.finish_write(Lsn(0x30));
    4849            2 :         drop(writer);
    4850            2 : 
    4851            2 :         tline.freeze_and_flush().await?;
    4852            2 :         tline
    4853            2 :             .compact(&CancellationToken::new(), EnumSet::empty(), &ctx)
    4854            2 :             .await?;
    4855            2 : 
    4856            2 :         let writer = tline.writer().await;
    4857            2 :         writer
    4858            2 :             .put(
    4859            2 :                 *TEST_KEY,
    4860            2 :                 Lsn(0x40),
    4861            2 :                 &Value::Image(TEST_IMG("foo at 0x40")),
    4862            2 :                 &ctx,
    4863            2 :             )
    4864            2 :             .await?;
    4865            2 :         writer.finish_write(Lsn(0x40));
    4866            2 :         drop(writer);
    4867            2 : 
    4868            2 :         tline.freeze_and_flush().await?;
    4869            2 :         tline
    4870            2 :             .compact(&CancellationToken::new(), EnumSet::empty(), &ctx)
    4871            2 :             .await?;
    4872            2 : 
    4873            2 :         assert_eq!(
    4874            4 :             tline.get(*TEST_KEY, Lsn(0x10), &ctx).await?,
    4875            2 :             TEST_IMG("foo at 0x10")
    4876            2 :         );
    4877            2 :         assert_eq!(
    4878            3 :             tline.get(*TEST_KEY, Lsn(0x1f), &ctx).await?,
    4879            2 :             TEST_IMG("foo at 0x10")
    4880            2 :         );
    4881            2 :         assert_eq!(
    4882            2 :             tline.get(*TEST_KEY, Lsn(0x20), &ctx).await?,
    4883            2 :             TEST_IMG("foo at 0x20")
    4884            2 :         );
    4885            2 :         assert_eq!(
    4886            4 :             tline.get(*TEST_KEY, Lsn(0x30), &ctx).await?,
    4887            2 :             TEST_IMG("foo at 0x30")
    4888            2 :         );
    4889            2 :         assert_eq!(
    4890            4 :             tline.get(*TEST_KEY, Lsn(0x40), &ctx).await?,
    4891            2 :             TEST_IMG("foo at 0x40")
    4892            2 :         );
    4893            2 : 
    4894            2 :         Ok(())
    4895            2 :     }
    4896              : 
    4897              :     //
    4898              :     // Insert 1000 key-value pairs with increasing keys, flush, compact, GC.
    4899              :     // Repeat 50 times.
    4900              :     //
    4901            2 :     #[tokio::test]
    4902            2 :     async fn test_bulk_insert() -> anyhow::Result<()> {
    4903            2 :         let harness = TenantHarness::create("test_bulk_insert")?;
    4904            2 :         let (tenant, ctx) = harness.load().await;
    4905            2 :         let tline = tenant
    4906            2 :             .create_test_timeline(TIMELINE_ID, Lsn(0x08), DEFAULT_PG_VERSION, &ctx)
    4907            7 :             .await?;
    4908            2 : 
    4909            2 :         let mut lsn = Lsn(0x10);
    4910            2 : 
    4911            2 :         let mut keyspace = KeySpaceAccum::new();
    4912            2 : 
    4913            2 :         let mut test_key = Key::from_hex("010000000033333333444444445500000000").unwrap();
    4914            2 :         let mut blknum = 0;
    4915          102 :         for _ in 0..50 {
    4916      1000100 :             for _ in 0..10000 {
    4917      1000000 :                 test_key.field6 = blknum;
    4918      1000000 :                 let writer = tline.writer().await;
    4919      1000000 :                 writer
    4920      1000000 :                     .put(
    4921      1000000 :                         test_key,
    4922      1000000 :                         lsn,
    4923      1000000 :                         &Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))),
    4924      1000000 :                         &ctx,
    4925      1000000 :                     )
    4926        15854 :                     .await?;
    4927      1000000 :                 writer.finish_write(lsn);
    4928      1000000 :                 drop(writer);
    4929      1000000 : 
    4930      1000000 :                 keyspace.add_key(test_key);
    4931      1000000 : 
    4932      1000000 :                 lsn = Lsn(lsn.0 + 0x10);
    4933      1000000 :                 blknum += 1;
    4934            2 :             }
    4935            2 : 
    4936          100 :             let cutoff = tline.get_last_record_lsn();
    4937          100 : 
    4938          100 :             tline
    4939          100 :                 .update_gc_info(
    4940          100 :                     Vec::new(),
    4941          100 :                     cutoff,
    4942          100 :                     Duration::ZERO,
    4943          100 :                     &CancellationToken::new(),
    4944          100 :                     &ctx,
    4945          100 :                 )
    4946            2 :                 .await?;
    4947          100 :             tline.freeze_and_flush().await?;
    4948          100 :             tline
    4949          100 :                 .compact(&CancellationToken::new(), EnumSet::empty(), &ctx)
    4950        18640 :                 .await?;
    4951          100 :             tline.gc().await?;
    4952            2 :         }
    4953            2 : 
    4954            2 :         Ok(())
    4955            2 :     }
    4956              : 
    4957            2 :     #[tokio::test]
    4958            2 :     async fn test_random_updates() -> anyhow::Result<()> {
    4959            2 :         let harness = TenantHarness::create("test_random_updates")?;
    4960            2 :         let (tenant, ctx) = harness.load().await;
    4961            2 :         let tline = tenant
    4962            2 :             .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
    4963            6 :             .await?;
    4964            2 : 
    4965            2 :         const NUM_KEYS: usize = 1000;
    4966            2 : 
    4967            2 :         let mut test_key = Key::from_hex("010000000033333333444444445500000000").unwrap();
    4968            2 : 
    4969            2 :         let mut keyspace = KeySpaceAccum::new();
    4970            2 : 
    4971            2 :         // Track when each page was last modified. Used to assert that
    4972            2 :         // a read sees the latest page version.
    4973            2 :         let mut updated = [Lsn(0); NUM_KEYS];
    4974            2 : 
    4975            2 :         let mut lsn = Lsn(0x10);
    4976            2 :         #[allow(clippy::needless_range_loop)]
    4977         2002 :         for blknum in 0..NUM_KEYS {
    4978         2000 :             lsn = Lsn(lsn.0 + 0x10);
    4979         2000 :             test_key.field6 = blknum as u32;
    4980         2000 :             let writer = tline.writer().await;
    4981         2000 :             writer
    4982         2000 :                 .put(
    4983         2000 :                     test_key,
    4984         2000 :                     lsn,
    4985         2000 :                     &Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))),
    4986         2000 :                     &ctx,
    4987         2000 :                 )
    4988           33 :                 .await?;
    4989         2000 :             writer.finish_write(lsn);
    4990         2000 :             updated[blknum] = lsn;
    4991         2000 :             drop(writer);
    4992         2000 : 
    4993         2000 :             keyspace.add_key(test_key);
    4994            2 :         }
    4995            2 : 
    4996          102 :         for _ in 0..50 {
    4997       100100 :             for _ in 0..NUM_KEYS {
    4998       100000 :                 lsn = Lsn(lsn.0 + 0x10);
    4999       100000 :                 let blknum = thread_rng().gen_range(0..NUM_KEYS);
    5000       100000 :                 test_key.field6 = blknum as u32;
    5001       100000 :                 let writer = tline.writer().await;
    5002       100000 :                 writer
    5003       100000 :                     .put(
    5004       100000 :                         test_key,
    5005       100000 :                         lsn,
    5006       100000 :                         &Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))),
    5007       100000 :                         &ctx,
    5008       100000 :                     )
    5009         1604 :                     .await?;
    5010       100000 :                 writer.finish_write(lsn);
    5011       100000 :                 drop(writer);
    5012       100000 :                 updated[blknum] = lsn;
    5013            2 :             }
    5014            2 : 
    5015            2 :             // Read all the blocks
    5016       100000 :             for (blknum, last_lsn) in updated.iter().enumerate() {
    5017       100000 :                 test_key.field6 = blknum as u32;
    5018       100000 :                 assert_eq!(
    5019       100000 :                     tline.get(test_key, lsn, &ctx).await?,
    5020       100000 :                     TEST_IMG(&format!("{} at {}", blknum, last_lsn))
    5021            2 :                 );
    5022            2 :             }
    5023            2 : 
    5024            2 :             // Perform a cycle of flush, compact, and GC
    5025          100 :             let cutoff = tline.get_last_record_lsn();
    5026          100 :             tline
    5027          100 :                 .update_gc_info(
    5028          100 :                     Vec::new(),
    5029          100 :                     cutoff,
    5030          100 :                     Duration::ZERO,
    5031          100 :                     &CancellationToken::new(),
    5032          100 :                     &ctx,
    5033          100 :                 )
    5034            2 :                 .await?;
    5035          102 :             tline.freeze_and_flush().await?;
    5036          100 :             tline
    5037          100 :                 .compact(&CancellationToken::new(), EnumSet::empty(), &ctx)
    5038         2276 :                 .await?;
    5039          100 :             tline.gc().await?;
    5040            2 :         }
    5041            2 : 
    5042            2 :         Ok(())
    5043            2 :     }
    5044              : 
    5045            2 :     #[tokio::test]
    5046            2 :     async fn test_traverse_branches() -> anyhow::Result<()> {
    5047            2 :         let (tenant, ctx) = TenantHarness::create("test_traverse_branches")?
    5048            2 :             .load()
    5049            2 :             .await;
    5050            2 :         let mut tline = tenant
    5051            2 :             .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
    5052            8 :             .await?;
    5053            2 : 
    5054            2 :         const NUM_KEYS: usize = 1000;
    5055            2 : 
    5056            2 :         let mut test_key = Key::from_hex("010000000033333333444444445500000000").unwrap();
    5057            2 : 
    5058            2 :         let mut keyspace = KeySpaceAccum::new();
    5059            2 : 
    5060            2 :         // Track when each page was last modified. Used to assert that
    5061            2 :         // a read sees the latest page version.
    5062            2 :         let mut updated = [Lsn(0); NUM_KEYS];
    5063            2 : 
    5064            2 :         let mut lsn = Lsn(0x10);
    5065            2 :         #[allow(clippy::needless_range_loop)]
    5066         2002 :         for blknum in 0..NUM_KEYS {
    5067         2000 :             lsn = Lsn(lsn.0 + 0x10);
    5068         2000 :             test_key.field6 = blknum as u32;
    5069         2000 :             let writer = tline.writer().await;
    5070         2000 :             writer
    5071         2000 :                 .put(
    5072         2000 :                     test_key,
    5073         2000 :                     lsn,
    5074         2000 :                     &Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))),
    5075         2000 :                     &ctx,
    5076         2000 :                 )
    5077           33 :                 .await?;
    5078         2000 :             writer.finish_write(lsn);
    5079         2000 :             updated[blknum] = lsn;
    5080         2000 :             drop(writer);
    5081         2000 : 
    5082         2000 :             keyspace.add_key(test_key);
    5083            2 :         }
    5084            2 : 
    5085          102 :         for _ in 0..50 {
    5086          100 :             let new_tline_id = TimelineId::generate();
    5087          100 :             tenant
    5088          100 :                 .branch_timeline_test(&tline, new_tline_id, Some(lsn), &ctx)
    5089          104 :                 .await?;
    5090          100 :             tline = tenant
    5091          100 :                 .get_timeline(new_tline_id, true)
    5092          100 :                 .expect("Should have the branched timeline");
    5093            2 : 
    5094       100100 :             for _ in 0..NUM_KEYS {
    5095       100000 :                 lsn = Lsn(lsn.0 + 0x10);
    5096       100000 :                 let blknum = thread_rng().gen_range(0..NUM_KEYS);
    5097       100000 :                 test_key.field6 = blknum as u32;
    5098       100000 :                 let writer = tline.writer().await;
    5099       100000 :                 writer
    5100       100000 :                     .put(
    5101       100000 :                         test_key,
    5102       100000 :                         lsn,
    5103       100000 :                         &Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))),
    5104       100000 :                         &ctx,
    5105       100000 :                     )
    5106         1641 :                     .await?;
    5107       100000 :                 println!("updating {} at {}", blknum, lsn);
    5108       100000 :                 writer.finish_write(lsn);
    5109       100000 :                 drop(writer);
    5110       100000 :                 updated[blknum] = lsn;
    5111            2 :             }
    5112            2 : 
    5113            2 :             // Read all the blocks
    5114       100000 :             for (blknum, last_lsn) in updated.iter().enumerate() {
    5115       100000 :                 test_key.field6 = blknum as u32;
    5116       100000 :                 assert_eq!(
    5117       100000 :                     tline.get(test_key, lsn, &ctx).await?,
    5118       100000 :                     TEST_IMG(&format!("{} at {}", blknum, last_lsn))
    5119            2 :                 );
    5120            2 :             }
    5121            2 : 
    5122            2 :             // Perform a cycle of flush, compact, and GC
    5123          100 :             let cutoff = tline.get_last_record_lsn();
    5124          100 :             tline
    5125          100 :                 .update_gc_info(
    5126          100 :                     Vec::new(),
    5127          100 :                     cutoff,
    5128          100 :                     Duration::ZERO,
    5129          100 :                     &CancellationToken::new(),
    5130          100 :                     &ctx,
    5131          100 :                 )
    5132            2 :                 .await?;
    5133          103 :             tline.freeze_and_flush().await?;
    5134          100 :             tline
    5135          100 :                 .compact(&CancellationToken::new(), EnumSet::empty(), &ctx)
    5136        13249 :                 .await?;
    5137          100 :             tline.gc().await?;
    5138            2 :         }
    5139            2 : 
    5140            2 :         Ok(())
    5141            2 :     }
    5142              : 
    5143            2 :     #[tokio::test]
    5144            2 :     async fn test_traverse_ancestors() -> anyhow::Result<()> {
    5145            2 :         let (tenant, ctx) = TenantHarness::create("test_traverse_ancestors")?
    5146            2 :             .load()
    5147            2 :             .await;
    5148            2 :         let mut tline = tenant
    5149            2 :             .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
    5150            7 :             .await?;
    5151            2 : 
    5152            2 :         const NUM_KEYS: usize = 100;
    5153            2 :         const NUM_TLINES: usize = 50;
    5154            2 : 
    5155            2 :         let mut test_key = Key::from_hex("010000000033333333444444445500000000").unwrap();
    5156            2 :         // Track page mutation lsns across different timelines.
    5157            2 :         let mut updated = [[Lsn(0); NUM_KEYS]; NUM_TLINES];
    5158            2 : 
    5159            2 :         let mut lsn = Lsn(0x10);
    5160            2 : 
    5161            2 :         #[allow(clippy::needless_range_loop)]
    5162          102 :         for idx in 0..NUM_TLINES {
    5163          100 :             let new_tline_id = TimelineId::generate();
    5164          100 :             tenant
    5165          100 :                 .branch_timeline_test(&tline, new_tline_id, Some(lsn), &ctx)
    5166          106 :                 .await?;
    5167          100 :             tline = tenant
    5168          100 :                 .get_timeline(new_tline_id, true)
    5169          100 :                 .expect("Should have the branched timeline");
    5170            2 : 
    5171        10100 :             for _ in 0..NUM_KEYS {
    5172        10000 :                 lsn = Lsn(lsn.0 + 0x10);
    5173        10000 :                 let blknum = thread_rng().gen_range(0..NUM_KEYS);
    5174        10000 :                 test_key.field6 = blknum as u32;
    5175        10000 :                 let writer = tline.writer().await;
    5176        10000 :                 writer
    5177        10000 :                     .put(
    5178        10000 :                         test_key,
    5179        10000 :                         lsn,
    5180        10000 :                         &Value::Image(TEST_IMG(&format!("{} {} at {}", idx, blknum, lsn))),
    5181        10000 :                         &ctx,
    5182        10000 :                     )
    5183          174 :                     .await?;
    5184        10000 :                 println!("updating [{}][{}] at {}", idx, blknum, lsn);
    5185        10000 :                 writer.finish_write(lsn);
    5186        10000 :                 drop(writer);
    5187        10000 :                 updated[idx][blknum] = lsn;
    5188            2 :             }
    5189            2 :         }
    5190            2 : 
    5191            2 :         // Read pages from leaf timeline across all ancestors.
    5192          100 :         for (idx, lsns) in updated.iter().enumerate() {
    5193        10000 :             for (blknum, lsn) in lsns.iter().enumerate() {
    5194            2 :                 // Skip empty mutations.
    5195        10000 :                 if lsn.0 == 0 {
    5196         3683 :                     continue;
    5197         6317 :                 }
    5198         6317 :                 println!("checking [{idx}][{blknum}] at {lsn}");
    5199         6317 :                 test_key.field6 = blknum as u32;
    5200         6317 :                 assert_eq!(
    5201         6317 :                     tline.get(test_key, *lsn, &ctx).await?,
    5202         6317 :                     TEST_IMG(&format!("{idx} {blknum} at {lsn}"))
    5203            2 :                 );
    5204            2 :             }
    5205            2 :         }
    5206            2 :         Ok(())
    5207            2 :     }
    5208              : 
    5209            2 :     #[tokio::test]
    5210            2 :     async fn test_write_at_initdb_lsn_takes_optimization_code_path() -> anyhow::Result<()> {
    5211            2 :         let (tenant, ctx) = TenantHarness::create("test_empty_test_timeline_is_usable")?
    5212            2 :             .load()
    5213            2 :             .await;
    5214            2 : 
    5215            2 :         let initdb_lsn = Lsn(0x20);
    5216            2 :         let utline = tenant
    5217            2 :             .create_empty_timeline(TIMELINE_ID, initdb_lsn, DEFAULT_PG_VERSION, &ctx)
    5218            3 :             .await?;
    5219            2 :         let tline = utline.raw_timeline().unwrap();
    5220            2 : 
    5221            2 :         // Spawn flush loop now so that we can set the `expect_initdb_optimization`
    5222            2 :         tline.maybe_spawn_flush_loop();
    5223            2 : 
    5224            2 :         // Make sure the timeline has the minimum set of required keys for operation.
    5225            2 :         // The only operation you can always do on an empty timeline is to `put` new data.
    5226            2 :         // Except if you `put` at `initdb_lsn`.
    5227            2 :         // In that case, there's an optimization to directly create image layers instead of delta layers.
    5228            2 :         // It uses `repartition()`, which assumes some keys to be present.
    5229            2 :         // Let's make sure the test timeline can handle that case.
    5230            2 :         {
    5231            2 :             let mut state = tline.flush_loop_state.lock().unwrap();
    5232            2 :             assert_eq!(
    5233            2 :                 timeline::FlushLoopState::Running {
    5234            2 :                     expect_initdb_optimization: false,
    5235            2 :                     initdb_optimization_count: 0,
    5236            2 :                 },
    5237            2 :                 *state
    5238            2 :             );
    5239            2 :             *state = timeline::FlushLoopState::Running {
    5240            2 :                 expect_initdb_optimization: true,
    5241            2 :                 initdb_optimization_count: 0,
    5242            2 :             };
    5243            2 :         }
    5244            2 : 
    5245            2 :         // Make writes at the initdb_lsn. When we flush it below, it should be handled by the optimization.
    5246            2 :         // As explained above, the optimization requires some keys to be present.
    5247            2 :         // As per `create_empty_timeline` documentation, use init_empty to set them.
    5248            2 :         // This is what `create_test_timeline` does, by the way.
    5249            2 :         let mut modification = tline.begin_modification(initdb_lsn);
    5250            2 :         modification
    5251            2 :             .init_empty_test_timeline()
    5252            2 :             .context("init_empty_test_timeline")?;
    5253            2 :         modification
    5254            2 :             .commit(&ctx)
    5255            2 :             .await
    5256            2 :             .context("commit init_empty_test_timeline modification")?;
    5257            2 : 
    5258            2 :         // Do the flush. The flush code will check the expectations that we set above.
    5259            2 :         tline.freeze_and_flush().await?;
    5260            2 : 
    5261            2 :         // assert freeze_and_flush exercised the initdb optimization
    5262            2 :         {
    5263            2 :             let state = tline.flush_loop_state.lock().unwrap();
    5264            2 :             let timeline::FlushLoopState::Running {
    5265            2 :                 expect_initdb_optimization,
    5266            2 :                 initdb_optimization_count,
    5267            2 :             } = *state
    5268            2 :             else {
    5269            2 :                 panic!("unexpected state: {:?}", *state);
    5270            2 :             };
    5271            2 :             assert!(expect_initdb_optimization);
    5272            2 :             assert!(initdb_optimization_count > 0);
    5273            2 :         }
    5274            2 :         Ok(())
    5275            2 :     }
    5276              : 
    5277            2 :     #[tokio::test]
    5278            2 :     async fn test_uninit_mark_crash() -> anyhow::Result<()> {
    5279            2 :         let name = "test_uninit_mark_crash";
    5280            2 :         let harness = TenantHarness::create(name)?;
    5281            2 :         {
    5282            2 :             let (tenant, ctx) = harness.load().await;
    5283            2 :             let tline = tenant
    5284            2 :                 .create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)
    5285            3 :                 .await?;
    5286            2 :             // Keeps uninit mark in place
    5287            2 :             let raw_tline = tline.raw_timeline().unwrap();
    5288            2 :             raw_tline
    5289            2 :                 .shutdown()
    5290            2 :                 .instrument(info_span!("test_shutdown", tenant_id=%raw_tline.tenant_shard_id, shard_id=%raw_tline.tenant_shard_id.shard_slug(), timeline_id=%TIMELINE_ID))
    5291            2 :                 .await;
    5292            2 :             std::mem::forget(tline);
    5293            2 :         }
    5294            2 : 
    5295            2 :         let (tenant, _) = harness.load().await;
    5296            2 :         match tenant.get_timeline(TIMELINE_ID, false) {
    5297            2 :             Ok(_) => panic!("timeline should've been removed during load"),
    5298            2 :             Err(e) => {
    5299            2 :                 assert_eq!(
    5300            2 :                     e,
    5301            2 :                     GetTimelineError::NotFound {
    5302            2 :                         tenant_id: tenant.tenant_shard_id,
    5303            2 :                         timeline_id: TIMELINE_ID,
    5304            2 :                     }
    5305            2 :                 )
    5306            2 :             }
    5307            2 :         }
    5308            2 : 
    5309            2 :         assert!(!harness
    5310            2 :             .conf
    5311            2 :             .timeline_path(&tenant.tenant_shard_id, &TIMELINE_ID)
    5312            2 :             .exists());
    5313            2 : 
    5314            2 :         assert!(!harness
    5315            2 :             .conf
    5316            2 :             .timeline_uninit_mark_file_path(tenant.tenant_shard_id, TIMELINE_ID)
    5317            2 :             .exists());
    5318            2 : 
    5319            2 :         Ok(())
    5320            2 :     }
    5321              : }
        

Generated by: LCOV version 2.1-beta