LCOV - code coverage report
Current view: top level - pageserver/src - tenant.rs (source / functions) Coverage Total Hit
Test: 190869232aac3a234374e5bb62582e91cf5f5818.info Lines: 64.0 % 3315 2121
Test Date: 2024-02-23 13:21:27 Functions: 38.7 % 357 138

            Line data    Source code
       1              : //!
       2              : //! Timeline repository implementation that keeps old data in files on disk, and
       3              : //! the recent changes in memory. See tenant/*_layer.rs files.
       4              : //! The functions here are responsible for locating the correct layer for the
       5              : //! get/put call, walking back the timeline branching history as needed.
       6              : //!
       7              : //! The files are stored in the .neon/tenants/<tenant_id>/timelines/<timeline_id>
       8              : //! directory. See docs/pageserver-storage.md for how the files are managed.
       9              : //! In addition to the layer files, there is a metadata file in the same
      10              : //! directory that contains information about the timeline, in particular its
      11              : //! parent timeline, and the last LSN that has been written to disk.
      12              : //!
      13              : 
      14              : use anyhow::{bail, Context};
      15              : use camino::Utf8Path;
      16              : use camino::Utf8PathBuf;
      17              : use enumset::EnumSet;
      18              : use futures::stream::FuturesUnordered;
      19              : use futures::FutureExt;
      20              : use futures::StreamExt;
      21              : use pageserver_api::models;
      22              : use pageserver_api::models::TimelineState;
      23              : use pageserver_api::models::WalRedoManagerStatus;
      24              : use pageserver_api::shard::ShardIdentity;
      25              : use pageserver_api::shard::TenantShardId;
      26              : use remote_storage::DownloadError;
      27              : use remote_storage::GenericRemoteStorage;
      28              : use remote_storage::TimeoutOrCancel;
      29              : use std::fmt;
      30              : use storage_broker::BrokerClientChannel;
      31              : use tokio::io::BufReader;
      32              : use tokio::runtime::Handle;
      33              : use tokio::sync::watch;
      34              : use tokio::task::JoinSet;
      35              : use tokio_util::sync::CancellationToken;
      36              : use tracing::*;
      37              : use utils::backoff;
      38              : use utils::completion;
      39              : use utils::crashsafe::path_with_suffix_extension;
      40              : use utils::failpoint_support;
      41              : use utils::fs_ext;
      42              : use utils::sync::gate::Gate;
      43              : use utils::sync::gate::GateGuard;
      44              : use utils::timeout::timeout_cancellable;
      45              : use utils::timeout::TimeoutCancellableError;
      46              : 
      47              : use self::config::AttachedLocationConfig;
      48              : use self::config::AttachmentMode;
      49              : use self::config::LocationConf;
      50              : use self::config::TenantConf;
      51              : use self::delete::DeleteTenantFlow;
      52              : use self::metadata::TimelineMetadata;
      53              : use self::mgr::GetActiveTenantError;
      54              : use self::mgr::GetTenantError;
      55              : use self::mgr::TenantsMap;
      56              : use self::remote_timeline_client::upload::upload_index_part;
      57              : use self::remote_timeline_client::RemoteTimelineClient;
      58              : use self::timeline::uninit::TimelineExclusionError;
      59              : use self::timeline::uninit::TimelineUninitMark;
      60              : use self::timeline::uninit::UninitializedTimeline;
      61              : use self::timeline::EvictionTaskTenantState;
      62              : use self::timeline::TimelineResources;
      63              : use self::timeline::WaitLsnError;
      64              : use crate::config::PageServerConf;
      65              : use crate::context::{DownloadBehavior, RequestContext};
      66              : use crate::deletion_queue::DeletionQueueClient;
      67              : use crate::deletion_queue::DeletionQueueError;
      68              : use crate::import_datadir;
      69              : use crate::is_uninit_mark;
      70              : use crate::metrics::TENANT;
      71              : use crate::metrics::{
      72              :     remove_tenant_metrics, BROKEN_TENANTS_SET, TENANT_STATE_METRIC, TENANT_SYNTHETIC_SIZE_METRIC,
      73              : };
      74              : use crate::repository::GcResult;
      75              : use crate::task_mgr;
      76              : use crate::task_mgr::TaskKind;
      77              : use crate::tenant::config::LocationMode;
      78              : use crate::tenant::config::TenantConfOpt;
      79              : pub use crate::tenant::remote_timeline_client::index::IndexPart;
      80              : use crate::tenant::remote_timeline_client::remote_initdb_archive_path;
      81              : use crate::tenant::remote_timeline_client::MaybeDeletedIndexPart;
      82              : use crate::tenant::remote_timeline_client::INITDB_PATH;
      83              : use crate::tenant::storage_layer::DeltaLayer;
      84              : use crate::tenant::storage_layer::ImageLayer;
      85              : use crate::InitializationOrder;
      86              : use std::cmp::min;
      87              : use std::collections::hash_map::Entry;
      88              : use std::collections::BTreeSet;
      89              : use std::collections::HashMap;
      90              : use std::collections::HashSet;
      91              : use std::fmt::Debug;
      92              : use std::fmt::Display;
      93              : use std::fs;
      94              : use std::fs::File;
      95              : use std::ops::Bound::Included;
      96              : use std::sync::atomic::AtomicU64;
      97              : use std::sync::atomic::Ordering;
      98              : use std::sync::Arc;
      99              : use std::sync::{Mutex, RwLock};
     100              : use std::time::{Duration, Instant};
     101              : 
     102              : use crate::span;
     103              : use crate::tenant::timeline::delete::DeleteTimelineFlow;
     104              : use crate::tenant::timeline::uninit::cleanup_timeline_directory;
     105              : use crate::virtual_file::VirtualFile;
     106              : use crate::walredo::PostgresRedoManager;
     107              : use crate::TEMP_FILE_SUFFIX;
     108              : use once_cell::sync::Lazy;
     109              : pub use pageserver_api::models::TenantState;
     110              : use tokio::sync::Semaphore;
     111              : 
     112            0 : static INIT_DB_SEMAPHORE: Lazy<Semaphore> = Lazy::new(|| Semaphore::new(8));
     113              : use toml_edit;
     114              : use utils::{
     115              :     crashsafe,
     116              :     generation::Generation,
     117              :     id::TimelineId,
     118              :     lsn::{Lsn, RecordLsn},
     119              : };
     120              : 
     121              : /// Declare a failpoint that can use the `pause` failpoint action.
     122              : /// We don't want to block the executor thread, hence, spawn_blocking + await.
     123              : macro_rules! pausable_failpoint {
     124              :     ($name:literal) => {
     125              :         if cfg!(feature = "testing") {
     126              :             tokio::task::spawn_blocking({
     127              :                 let current = tracing::Span::current();
     128              :                 move || {
     129              :                     let _entered = current.entered();
     130              :                     tracing::info!("at failpoint {}", $name);
     131              :                     fail::fail_point!($name);
     132              :                 }
     133              :             })
     134              :             .await
     135              :             .expect("spawn_blocking");
     136              :         }
     137              :     };
     138              :     ($name:literal, $cond:expr) => {
     139              :         if cfg!(feature = "testing") {
     140              :             if $cond {
     141              :                 pausable_failpoint!($name)
     142              :             }
     143              :         }
     144              :     };
     145              : }
     146              : 
     147              : pub mod blob_io;
     148              : pub mod block_io;
     149              : 
     150              : pub mod disk_btree;
     151              : pub(crate) mod ephemeral_file;
     152              : pub mod layer_map;
     153              : 
     154              : pub mod metadata;
     155              : mod par_fsync;
     156              : pub mod remote_timeline_client;
     157              : pub mod storage_layer;
     158              : 
     159              : pub mod config;
     160              : pub mod delete;
     161              : pub mod mgr;
     162              : pub mod secondary;
     163              : pub mod tasks;
     164              : pub mod upload_queue;
     165              : 
     166              : pub(crate) mod timeline;
     167              : 
     168              : pub mod size;
     169              : 
     170              : pub(crate) mod throttle;
     171              : 
     172              : pub(crate) use crate::span::debug_assert_current_span_has_tenant_and_timeline_id;
     173              : pub(crate) use timeline::{LogicalSizeCalculationCause, PageReconstructError, Timeline};
     174              : 
     175              : // re-export for use in remote_timeline_client.rs
     176              : pub use crate::tenant::metadata::save_metadata;
     177              : 
     178              : // re-export for use in walreceiver
     179              : pub use crate::tenant::timeline::WalReceiverInfo;
     180              : 
     181              : /// The "tenants" part of `tenants/<tenant>/timelines...`
     182              : pub const TENANTS_SEGMENT_NAME: &str = "tenants";
     183              : 
     184              : /// Parts of the `.neon/tenants/<tenant_id>/timelines/<timeline_id>` directory prefix.
     185              : pub const TIMELINES_SEGMENT_NAME: &str = "timelines";
     186              : 
     187              : pub const TENANT_DELETED_MARKER_FILE_NAME: &str = "deleted";
     188              : 
     189              : /// References to shared objects that are passed into each tenant, such
     190              : /// as the shared remote storage client and process initialization state.
     191            0 : #[derive(Clone)]
     192              : pub struct TenantSharedResources {
     193              :     pub broker_client: storage_broker::BrokerClientChannel,
     194              :     pub remote_storage: Option<GenericRemoteStorage>,
     195              :     pub deletion_queue_client: DeletionQueueClient,
     196              : }
     197              : 
     198              : /// A [`Tenant`] is really an _attached_ tenant.  The configuration
     199              : /// for an attached tenant is a subset of the [`LocationConf`], represented
     200              : /// in this struct.
     201              : pub(super) struct AttachedTenantConf {
     202              :     tenant_conf: TenantConfOpt,
     203              :     location: AttachedLocationConfig,
     204              : }
     205              : 
     206              : impl AttachedTenantConf {
     207           84 :     fn try_from(location_conf: LocationConf) -> anyhow::Result<Self> {
     208           84 :         match &location_conf.mode {
     209           84 :             LocationMode::Attached(attach_conf) => Ok(Self {
     210           84 :                 tenant_conf: location_conf.tenant_conf,
     211           84 :                 location: *attach_conf,
     212           84 :             }),
     213              :             LocationMode::Secondary(_) => {
     214            0 :                 anyhow::bail!("Attempted to construct AttachedTenantConf from a LocationConf in secondary mode")
     215              :             }
     216              :         }
     217           84 :     }
     218              : }
     219              : struct TimelinePreload {
     220              :     timeline_id: TimelineId,
     221              :     client: RemoteTimelineClient,
     222              :     index_part: Result<MaybeDeletedIndexPart, DownloadError>,
     223              : }
     224              : 
     225              : pub(crate) struct TenantPreload {
     226              :     deleting: bool,
     227              :     timelines: HashMap<TimelineId, TimelinePreload>,
     228              : }
     229              : 
     230              : /// When we spawn a tenant, there is a special mode for tenant creation that
     231              : /// avoids trying to read anything from remote storage.
     232              : pub(crate) enum SpawnMode {
     233              :     Normal,
     234              :     Create,
     235              : }
     236              : 
     237              : ///
     238              : /// Tenant consists of multiple timelines. Keep them in a hash table.
     239              : ///
     240              : pub struct Tenant {
     241              :     // Global pageserver config parameters
     242              :     pub conf: &'static PageServerConf,
     243              : 
     244              :     /// The value creation timestamp, used to measure activation delay, see:
     245              :     /// <https://github.com/neondatabase/neon/issues/4025>
     246              :     constructed_at: Instant,
     247              : 
     248              :     state: watch::Sender<TenantState>,
     249              : 
     250              :     // Overridden tenant-specific config parameters.
     251              :     // We keep TenantConfOpt sturct here to preserve the information
     252              :     // about parameters that are not set.
     253              :     // This is necessary to allow global config updates.
     254              :     tenant_conf: Arc<RwLock<AttachedTenantConf>>,
     255              : 
     256              :     tenant_shard_id: TenantShardId,
     257              : 
     258              :     // The detailed sharding information, beyond the number/count in tenant_shard_id
     259              :     shard_identity: ShardIdentity,
     260              : 
     261              :     /// The remote storage generation, used to protect S3 objects from split-brain.
     262              :     /// Does not change over the lifetime of the [`Tenant`] object.
     263              :     ///
     264              :     /// This duplicates the generation stored in LocationConf, but that structure is mutable:
     265              :     /// this copy enforces the invariant that generatio doesn't change during a Tenant's lifetime.
     266              :     generation: Generation,
     267              : 
     268              :     timelines: Mutex<HashMap<TimelineId, Arc<Timeline>>>,
     269              : 
     270              :     /// During timeline creation, we first insert the TimelineId to the
     271              :     /// creating map, then `timelines`, then remove it from the creating map.
     272              :     /// **Lock order**: if acquring both, acquire`timelines` before `timelines_creating`
     273              :     timelines_creating: std::sync::Mutex<HashSet<TimelineId>>,
     274              : 
     275              :     // This mutex prevents creation of new timelines during GC.
     276              :     // Adding yet another mutex (in addition to `timelines`) is needed because holding
     277              :     // `timelines` mutex during all GC iteration
     278              :     // may block for a long time `get_timeline`, `get_timelines_state`,... and other operations
     279              :     // with timelines, which in turn may cause dropping replication connection, expiration of wait_for_lsn
     280              :     // timeout...
     281              :     gc_cs: tokio::sync::Mutex<()>,
     282              :     walredo_mgr: Option<Arc<WalRedoManager>>,
     283              : 
     284              :     // provides access to timeline data sitting in the remote storage
     285              :     pub(crate) remote_storage: Option<GenericRemoteStorage>,
     286              : 
     287              :     // Access to global deletion queue for when this tenant wants to schedule a deletion
     288              :     deletion_queue_client: DeletionQueueClient,
     289              : 
     290              :     /// Cached logical sizes updated updated on each [`Tenant::gather_size_inputs`].
     291              :     cached_logical_sizes: tokio::sync::Mutex<HashMap<(TimelineId, Lsn), u64>>,
     292              :     cached_synthetic_tenant_size: Arc<AtomicU64>,
     293              : 
     294              :     eviction_task_tenant_state: tokio::sync::Mutex<EvictionTaskTenantState>,
     295              : 
     296              :     /// If the tenant is in Activating state, notify this to encourage it
     297              :     /// to proceed to Active as soon as possible, rather than waiting for lazy
     298              :     /// background warmup.
     299              :     pub(crate) activate_now_sem: tokio::sync::Semaphore,
     300              : 
     301              :     pub(crate) delete_progress: Arc<tokio::sync::Mutex<DeleteTenantFlow>>,
     302              : 
     303              :     // Cancellation token fires when we have entered shutdown().  This is a parent of
     304              :     // Timelines' cancellation token.
     305              :     pub(crate) cancel: CancellationToken,
     306              : 
     307              :     // Users of the Tenant such as the page service must take this Gate to avoid
     308              :     // trying to use a Tenant which is shutting down.
     309              :     pub(crate) gate: Gate,
     310              : 
     311              :     /// Throttle applied at the top of [`Timeline::get`].
     312              :     /// All [`Tenant::timelines`] of a given [`Tenant`] instance share the same [`throttle::Throttle`] instance.
     313              :     pub(crate) timeline_get_throttle:
     314              :         Arc<throttle::Throttle<&'static crate::metrics::tenant_throttling::TimelineGet>>,
     315              : }
     316              : 
     317              : impl std::fmt::Debug for Tenant {
     318            0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     319            0 :         write!(f, "{} ({})", self.tenant_shard_id, self.current_state())
     320            0 :     }
     321              : }
     322              : 
     323              : pub(crate) enum WalRedoManager {
     324              :     Prod(PostgresRedoManager),
     325              :     #[cfg(test)]
     326              :     Test(harness::TestRedoManager),
     327              : }
     328              : 
     329              : impl From<PostgresRedoManager> for WalRedoManager {
     330            0 :     fn from(mgr: PostgresRedoManager) -> Self {
     331            0 :         Self::Prod(mgr)
     332            0 :     }
     333              : }
     334              : 
     335              : #[cfg(test)]
     336              : impl From<harness::TestRedoManager> for WalRedoManager {
     337           84 :     fn from(mgr: harness::TestRedoManager) -> Self {
     338           84 :         Self::Test(mgr)
     339           84 :     }
     340              : }
     341              : 
     342              : impl WalRedoManager {
     343            0 :     pub(crate) fn maybe_quiesce(&self, idle_timeout: Duration) {
     344            0 :         match self {
     345            0 :             Self::Prod(mgr) => mgr.maybe_quiesce(idle_timeout),
     346            0 :             #[cfg(test)]
     347            0 :             Self::Test(_) => {
     348            0 :                 // Not applicable to test redo manager
     349            0 :             }
     350            0 :         }
     351            0 :     }
     352              : 
     353              :     /// # Cancel-Safety
     354              :     ///
     355              :     /// This method is cancellation-safe.
     356            8 :     pub async fn request_redo(
     357            8 :         &self,
     358            8 :         key: crate::repository::Key,
     359            8 :         lsn: Lsn,
     360            8 :         base_img: Option<(Lsn, bytes::Bytes)>,
     361            8 :         records: Vec<(Lsn, crate::walrecord::NeonWalRecord)>,
     362            8 :         pg_version: u32,
     363            8 :     ) -> anyhow::Result<bytes::Bytes> {
     364            8 :         match self {
     365            0 :             Self::Prod(mgr) => {
     366            0 :                 mgr.request_redo(key, lsn, base_img, records, pg_version)
     367            0 :                     .await
     368              :             }
     369              :             #[cfg(test)]
     370            8 :             Self::Test(mgr) => {
     371            8 :                 mgr.request_redo(key, lsn, base_img, records, pg_version)
     372            0 :                     .await
     373              :             }
     374              :         }
     375            8 :     }
     376              : 
     377            0 :     pub(crate) fn status(&self) -> Option<WalRedoManagerStatus> {
     378            0 :         match self {
     379            0 :             WalRedoManager::Prod(m) => m.status(),
     380            0 :             #[cfg(test)]
     381            0 :             WalRedoManager::Test(_) => None,
     382            0 :         }
     383            0 :     }
     384              : }
     385              : 
     386            2 : #[derive(Debug, thiserror::Error, PartialEq, Eq)]
     387              : pub enum GetTimelineError {
     388              :     #[error("Timeline {tenant_id}/{timeline_id} is not active, state: {state:?}")]
     389              :     NotActive {
     390              :         tenant_id: TenantShardId,
     391              :         timeline_id: TimelineId,
     392              :         state: TimelineState,
     393              :     },
     394              :     #[error("Timeline {tenant_id}/{timeline_id} was not found")]
     395              :     NotFound {
     396              :         tenant_id: TenantShardId,
     397              :         timeline_id: TimelineId,
     398              :     },
     399              : }
     400              : 
     401            0 : #[derive(Debug, thiserror::Error)]
     402              : pub enum LoadLocalTimelineError {
     403              :     #[error("FailedToLoad")]
     404              :     Load(#[source] anyhow::Error),
     405              :     #[error("FailedToResumeDeletion")]
     406              :     ResumeDeletion(#[source] anyhow::Error),
     407              : }
     408              : 
     409            0 : #[derive(thiserror::Error)]
     410              : pub enum DeleteTimelineError {
     411              :     #[error("NotFound")]
     412              :     NotFound,
     413              : 
     414              :     #[error("HasChildren")]
     415              :     HasChildren(Vec<TimelineId>),
     416              : 
     417              :     #[error("Timeline deletion is already in progress")]
     418              :     AlreadyInProgress(Arc<tokio::sync::Mutex<DeleteTimelineFlow>>),
     419              : 
     420              :     #[error(transparent)]
     421              :     Other(#[from] anyhow::Error),
     422              : }
     423              : 
     424              : impl Debug for DeleteTimelineError {
     425            0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     426            0 :         match self {
     427            0 :             Self::NotFound => write!(f, "NotFound"),
     428            0 :             Self::HasChildren(c) => f.debug_tuple("HasChildren").field(c).finish(),
     429            0 :             Self::AlreadyInProgress(_) => f.debug_tuple("AlreadyInProgress").finish(),
     430            0 :             Self::Other(e) => f.debug_tuple("Other").field(e).finish(),
     431              :         }
     432            0 :     }
     433              : }
     434              : 
     435              : pub enum SetStoppingError {
     436              :     AlreadyStopping(completion::Barrier),
     437              :     Broken,
     438              : }
     439              : 
     440              : impl Debug for SetStoppingError {
     441            0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     442            0 :         match self {
     443            0 :             Self::AlreadyStopping(_) => f.debug_tuple("AlreadyStopping").finish(),
     444            0 :             Self::Broken => write!(f, "Broken"),
     445              :         }
     446            0 :     }
     447              : }
     448              : 
     449            0 : #[derive(thiserror::Error, Debug)]
     450              : pub enum CreateTimelineError {
     451              :     #[error("creation of timeline with the given ID is in progress")]
     452              :     AlreadyCreating,
     453              :     #[error("timeline already exists with different parameters")]
     454              :     Conflict,
     455              :     #[error(transparent)]
     456              :     AncestorLsn(anyhow::Error),
     457              :     #[error("ancestor timeline is not active")]
     458              :     AncestorNotActive,
     459              :     #[error("tenant shutting down")]
     460              :     ShuttingDown,
     461              :     #[error(transparent)]
     462              :     Other(#[from] anyhow::Error),
     463              : }
     464              : 
     465            0 : #[derive(thiserror::Error, Debug)]
     466              : enum InitdbError {
     467              :     Other(anyhow::Error),
     468              :     Cancelled,
     469              :     Spawn(std::io::Result<()>),
     470              :     Failed(std::process::ExitStatus, Vec<u8>),
     471              : }
     472              : 
     473              : impl fmt::Display for InitdbError {
     474            0 :     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
     475            0 :         match self {
     476            0 :             InitdbError::Cancelled => write!(f, "Operation was cancelled"),
     477            0 :             InitdbError::Spawn(e) => write!(f, "Spawn error: {:?}", e),
     478            0 :             InitdbError::Failed(status, stderr) => write!(
     479            0 :                 f,
     480            0 :                 "Command failed with status {:?}: {}",
     481            0 :                 status,
     482            0 :                 String::from_utf8_lossy(stderr)
     483            0 :             ),
     484            0 :             InitdbError::Other(e) => write!(f, "Error: {:?}", e),
     485              :         }
     486            0 :     }
     487              : }
     488              : 
     489              : impl From<std::io::Error> for InitdbError {
     490            0 :     fn from(error: std::io::Error) -> Self {
     491            0 :         InitdbError::Spawn(Err(error))
     492            0 :     }
     493              : }
     494              : 
     495              : enum CreateTimelineCause {
     496              :     Load,
     497              :     Delete,
     498              : }
     499              : 
     500              : impl Tenant {
     501              :     /// Yet another helper for timeline initialization.
     502              :     ///
     503              :     /// - Initializes the Timeline struct and inserts it into the tenant's hash map
     504              :     /// - Scans the local timeline directory for layer files and builds the layer map
     505              :     /// - Downloads remote index file and adds remote files to the layer map
     506              :     /// - Schedules remote upload tasks for any files that are present locally but missing from remote storage.
     507              :     ///
     508              :     /// If the operation fails, the timeline is left in the tenant's hash map in Broken state. On success,
     509              :     /// it is marked as Active.
     510              :     #[allow(clippy::too_many_arguments)]
     511            6 :     async fn timeline_init_and_sync(
     512            6 :         &self,
     513            6 :         timeline_id: TimelineId,
     514            6 :         resources: TimelineResources,
     515            6 :         index_part: Option<IndexPart>,
     516            6 :         metadata: TimelineMetadata,
     517            6 :         ancestor: Option<Arc<Timeline>>,
     518            6 :         _ctx: &RequestContext,
     519            6 :     ) -> anyhow::Result<()> {
     520            6 :         let tenant_id = self.tenant_shard_id;
     521              : 
     522            6 :         let timeline = self.create_timeline_struct(
     523            6 :             timeline_id,
     524            6 :             &metadata,
     525            6 :             ancestor.clone(),
     526            6 :             resources,
     527            6 :             CreateTimelineCause::Load,
     528            6 :         )?;
     529            6 :         let disk_consistent_lsn = timeline.get_disk_consistent_lsn();
     530            6 :         anyhow::ensure!(
     531            6 :             disk_consistent_lsn.is_valid(),
     532            0 :             "Timeline {tenant_id}/{timeline_id} has invalid disk_consistent_lsn"
     533              :         );
     534            6 :         assert_eq!(
     535            6 :             disk_consistent_lsn,
     536            6 :             metadata.disk_consistent_lsn(),
     537            0 :             "these are used interchangeably"
     538              :         );
     539              : 
     540            6 :         if let Some(index_part) = index_part.as_ref() {
     541            6 :             timeline
     542            6 :                 .remote_client
     543            6 :                 .as_ref()
     544            6 :                 .unwrap()
     545            6 :                 .init_upload_queue(index_part)?;
     546            0 :         } else if self.remote_storage.is_some() {
     547              :             // No data on the remote storage, but we have local metadata file. We can end up
     548              :             // here with timeline_create being interrupted before finishing index part upload.
     549              :             // By doing what we do here, the index part upload is retried.
     550              :             // If control plane retries timeline creation in the meantime, the mgmt API handler
     551              :             // for timeline creation will coalesce on the upload we queue here.
     552            0 :             let rtc = timeline.remote_client.as_ref().unwrap();
     553            0 :             rtc.init_upload_queue_for_empty_remote(&metadata)?;
     554            0 :             rtc.schedule_index_upload_for_metadata_update(&metadata)?;
     555            0 :         }
     556              : 
     557            6 :         timeline
     558            6 :             .load_layer_map(disk_consistent_lsn, index_part)
     559            6 :             .await
     560            6 :             .with_context(|| {
     561            0 :                 format!("Failed to load layermap for timeline {tenant_id}/{timeline_id}")
     562            6 :             })?;
     563              : 
     564              :         {
     565              :             // avoiding holding it across awaits
     566            6 :             let mut timelines_accessor = self.timelines.lock().unwrap();
     567            6 :             match timelines_accessor.entry(timeline_id) {
     568              :                 Entry::Occupied(_) => {
     569              :                     // The uninit mark file acts as a lock that prevents another task from
     570              :                     // initializing the timeline at the same time.
     571            0 :                     unreachable!(
     572            0 :                         "Timeline {tenant_id}/{timeline_id} already exists in the tenant map"
     573            0 :                     );
     574              :                 }
     575            6 :                 Entry::Vacant(v) => {
     576            6 :                     v.insert(Arc::clone(&timeline));
     577            6 :                     timeline.maybe_spawn_flush_loop();
     578            6 :                 }
     579            6 :             }
     580            6 :         };
     581            6 : 
     582            6 :         // Sanity check: a timeline should have some content.
     583            6 :         anyhow::ensure!(
     584            6 :             ancestor.is_some()
     585            4 :                 || timeline
     586            4 :                     .layers
     587            4 :                     .read()
     588            0 :                     .await
     589            4 :                     .layer_map()
     590            4 :                     .iter_historic_layers()
     591            4 :                     .next()
     592            4 :                     .is_some(),
     593            0 :             "Timeline has no ancestor and no layer files"
     594              :         );
     595              : 
     596            6 :         Ok(())
     597            6 :     }
     598              : 
     599              :     /// Attach a tenant that's available in cloud storage.
     600              :     ///
     601              :     /// This returns quickly, after just creating the in-memory object
     602              :     /// Tenant struct and launching a background task to download
     603              :     /// the remote index files.  On return, the tenant is most likely still in
     604              :     /// Attaching state, and it will become Active once the background task
     605              :     /// finishes. You can use wait_until_active() to wait for the task to
     606              :     /// complete.
     607              :     ///
     608              :     #[allow(clippy::too_many_arguments)]
     609            0 :     pub(crate) fn spawn(
     610            0 :         conf: &'static PageServerConf,
     611            0 :         tenant_shard_id: TenantShardId,
     612            0 :         resources: TenantSharedResources,
     613            0 :         attached_conf: AttachedTenantConf,
     614            0 :         shard_identity: ShardIdentity,
     615            0 :         init_order: Option<InitializationOrder>,
     616            0 :         tenants: &'static std::sync::RwLock<TenantsMap>,
     617            0 :         mode: SpawnMode,
     618            0 :         ctx: &RequestContext,
     619            0 :     ) -> anyhow::Result<Arc<Tenant>> {
     620            0 :         let wal_redo_manager = Arc::new(WalRedoManager::from(PostgresRedoManager::new(
     621            0 :             conf,
     622            0 :             tenant_shard_id,
     623            0 :         )));
     624            0 : 
     625            0 :         let TenantSharedResources {
     626            0 :             broker_client,
     627            0 :             remote_storage,
     628            0 :             deletion_queue_client,
     629            0 :         } = resources;
     630            0 : 
     631            0 :         let attach_mode = attached_conf.location.attach_mode;
     632            0 :         let generation = attached_conf.location.generation;
     633            0 : 
     634            0 :         let tenant = Arc::new(Tenant::new(
     635            0 :             TenantState::Attaching,
     636            0 :             conf,
     637            0 :             attached_conf,
     638            0 :             shard_identity,
     639            0 :             Some(wal_redo_manager),
     640            0 :             tenant_shard_id,
     641            0 :             remote_storage.clone(),
     642            0 :             deletion_queue_client,
     643            0 :         ));
     644            0 : 
     645            0 :         // The attach task will carry a GateGuard, so that shutdown() reliably waits for it to drop out if
     646            0 :         // we shut down while attaching.
     647            0 :         let attach_gate_guard = tenant
     648            0 :             .gate
     649            0 :             .enter()
     650            0 :             .expect("We just created the Tenant: nothing else can have shut it down yet");
     651            0 : 
     652            0 :         // Do all the hard work in the background
     653            0 :         let tenant_clone = Arc::clone(&tenant);
     654            0 :         let ctx = ctx.detached_child(TaskKind::Attach, DownloadBehavior::Warn);
     655            0 :         task_mgr::spawn(
     656            0 :             &tokio::runtime::Handle::current(),
     657            0 :             TaskKind::Attach,
     658            0 :             Some(tenant_shard_id),
     659            0 :             None,
     660            0 :             "attach tenant",
     661              :             false,
     662            0 :             async move {
     663            0 : 
     664            0 :                 info!(
     665            0 :                     ?attach_mode,
     666            0 :                     "Attaching tenant"
     667            0 :                 );
     668              : 
     669            0 :                 let _gate_guard = attach_gate_guard;
     670            0 : 
     671            0 :                 // Is this tenant being spawned as part of process startup?
     672            0 :                 let starting_up = init_order.is_some();
     673            0 :                 scopeguard::defer! {
     674            0 :                     if starting_up {
     675            0 :                         TENANT.startup_complete.inc();
     676            0 :                     }
     677              :                 }
     678              : 
     679              :                 // Ideally we should use Tenant::set_broken_no_wait, but it is not supposed to be used when tenant is in loading state.
     680            0 :                 let make_broken =
     681            0 :                     |t: &Tenant, err: anyhow::Error| {
     682            0 :                         error!("attach failed, setting tenant state to Broken: {err:?}");
     683            0 :                         t.state.send_modify(|state| {
     684            0 :                             // The Stopping case is for when we have passed control on to DeleteTenantFlow:
     685            0 :                             // if it errors, we will call make_broken when tenant is already in Stopping.
     686            0 :                             assert!(
     687            0 :                             matches!(*state, TenantState::Attaching | TenantState::Stopping { .. }),
     688            0 :                             "the attach task owns the tenant state until activation is complete"
     689              :                         );
     690              : 
     691            0 :                             *state = TenantState::broken_from_reason(err.to_string());
     692            0 :                         });
     693            0 :                     };
     694              : 
     695            0 :                 let mut init_order = init_order;
     696            0 :                 // take the completion because initial tenant loading will complete when all of
     697            0 :                 // these tasks complete.
     698            0 :                 let _completion = init_order
     699            0 :                     .as_mut()
     700            0 :                     .and_then(|x| x.initial_tenant_load.take());
     701            0 :                 let remote_load_completion = init_order
     702            0 :                     .as_mut()
     703            0 :                     .and_then(|x| x.initial_tenant_load_remote.take());
     704              : 
     705              :                 enum AttachType<'a> {
     706              :                     // During pageserver startup, we are attaching this tenant lazily in the background
     707              :                     Warmup(tokio::sync::SemaphorePermit<'a>),
     708              :                     // During pageserver startup, we are attaching this tenant as soon as we can,
     709              :                     // because a client tried to access it.
     710              :                     OnDemand,
     711              :                     // During normal operations after startup, we are attaching a tenant.
     712              :                     Normal,
     713              :                 }
     714              : 
     715              :                 // Before doing any I/O, wait for either or:
     716              :                 // - A client to attempt to access to this tenant (on-demand loading)
     717              :                 // - A permit to become available in the warmup semaphore (background warmup)
     718              :                 //
     719              :                 // Some-ness of init_order is how we know if we're attaching during startup or later
     720              :                 // in process lifetime.
     721            0 :                 let attach_type = if init_order.is_some() {
     722            0 :                     tokio::select!(
     723              :                         _ = tenant_clone.activate_now_sem.acquire() => {
     724            0 :                             tracing::info!("Activating tenant (on-demand)");
     725              :                             AttachType::OnDemand
     726              :                         },
     727            0 :                         permit_result = conf.concurrent_tenant_warmup.inner().acquire() => {
     728              :                             match permit_result {
     729              :                                 Ok(p) => {
     730            0 :                                     tracing::info!("Activating tenant (warmup)");
     731              :                                     AttachType::Warmup(p)
     732              :                                 }
     733              :                                 Err(_) => {
     734              :                                     // This is unexpected: the warmup semaphore should stay alive
     735              :                                     // for the lifetime of init_order.  Log a warning and proceed.
     736            0 :                                     tracing::warn!("warmup_limit semaphore unexpectedly closed");
     737              :                                     AttachType::Normal
     738              :                                 }
     739              :                             }
     740              : 
     741              :                         }
     742              :                         _ = tenant_clone.cancel.cancelled() => {
     743              :                             // This is safe, but should be pretty rare: it is interesting if a tenant
     744              :                             // stayed in Activating for such a long time that shutdown found it in
     745              :                             // that state.
     746            0 :                             tracing::info!(state=%tenant_clone.current_state(), "Tenant shut down before activation");
     747              :                             // Make the tenant broken so that set_stopping will not hang waiting for it to leave
     748              :                             // the Attaching state.  This is an over-reaction (nothing really broke, the tenant is
     749              :                             // just shutting down), but ensures progress.
     750              :                             make_broken(&tenant_clone, anyhow::anyhow!("Shut down while Attaching"));
     751              :                             return Ok(());
     752              :                         },
     753              :                     )
     754              :                 } else {
     755            0 :                     AttachType::Normal
     756              :                 };
     757              : 
     758            0 :                 let preload = match (&mode, &remote_storage) {
     759              :                     (SpawnMode::Create, _) => {
     760            0 :                         None
     761              :                     },
     762            0 :                     (SpawnMode::Normal, Some(remote_storage)) => {
     763            0 :                         let _preload_timer = TENANT.preload.start_timer();
     764            0 :                         let res = tenant_clone
     765            0 :                             .preload(remote_storage, task_mgr::shutdown_token())
     766            0 :                             .await;
     767            0 :                         match res {
     768            0 :                             Ok(p) => Some(p),
     769            0 :                             Err(e) => {
     770            0 :                                 make_broken(&tenant_clone, anyhow::anyhow!(e));
     771            0 :                                 return Ok(());
     772              :                             }
     773              :                         }
     774              :                     }
     775              :                     (SpawnMode::Normal, None) => {
     776            0 :                         let _preload_timer = TENANT.preload.start_timer();
     777            0 :                         None
     778              :                     }
     779              :                 };
     780              : 
     781              :                 // Remote preload is complete.
     782            0 :                 drop(remote_load_completion);
     783              : 
     784            0 :                 let pending_deletion = {
     785            0 :                     match DeleteTenantFlow::should_resume_deletion(
     786            0 :                         conf,
     787            0 :                         preload.as_ref().map(|p| p.deleting).unwrap_or(false),
     788            0 :                         &tenant_clone,
     789            0 :                     )
     790            0 :                     .await
     791              :                     {
     792            0 :                         Ok(should_resume_deletion) => should_resume_deletion,
     793            0 :                         Err(err) => {
     794            0 :                             make_broken(&tenant_clone, anyhow::anyhow!(err));
     795            0 :                             return Ok(());
     796              :                         }
     797              :                     }
     798              :                 };
     799              : 
     800            0 :                 info!("pending_deletion {}", pending_deletion.is_some());
     801              : 
     802            0 :                 if let Some(deletion) = pending_deletion {
     803              :                     // as we are no longer loading, signal completion by dropping
     804              :                     // the completion while we resume deletion
     805            0 :                     drop(_completion);
     806            0 :                     let background_jobs_can_start =
     807            0 :                         init_order.as_ref().map(|x| &x.background_jobs_can_start);
     808            0 :                     if let Some(background) = background_jobs_can_start {
     809            0 :                         info!("waiting for backgound jobs barrier");
     810            0 :                         background.clone().wait().await;
     811            0 :                         info!("ready for backgound jobs barrier");
     812            0 :                     }
     813              : 
     814            0 :                     let deleted = DeleteTenantFlow::resume_from_attach(
     815            0 :                         deletion,
     816            0 :                         &tenant_clone,
     817            0 :                         preload,
     818            0 :                         tenants,
     819            0 :                         &ctx,
     820            0 :                     )
     821            0 :                     .await;
     822              : 
     823            0 :                     if let Err(e) = deleted {
     824            0 :                         make_broken(&tenant_clone, anyhow::anyhow!(e));
     825            0 :                     }
     826              : 
     827            0 :                     return Ok(());
     828            0 :                 }
     829              : 
     830              :                 // We will time the duration of the attach phase unless this is a creation (attach will do no work)
     831            0 :                 let attached = {
     832            0 :                     let _attach_timer = match mode {
     833            0 :                         SpawnMode::Create => None,
     834            0 :                         SpawnMode::Normal => {Some(TENANT.attach.start_timer())}
     835              :                     };
     836            0 :                     tenant_clone.attach(preload, mode, &ctx).await
     837              :                 };
     838              : 
     839            0 :                 match attached {
     840              :                     Ok(()) => {
     841            0 :                         info!("attach finished, activating");
     842            0 :                         tenant_clone.activate(broker_client, None, &ctx);
     843              :                     }
     844            0 :                     Err(e) => {
     845            0 :                         make_broken(&tenant_clone, anyhow::anyhow!(e));
     846            0 :                     }
     847              :                 }
     848              : 
     849              :                 // If we are doing an opportunistic warmup attachment at startup, initialize
     850              :                 // logical size at the same time.  This is better than starting a bunch of idle tenants
     851              :                 // with cold caches and then coming back later to initialize their logical sizes.
     852              :                 //
     853              :                 // It also prevents the warmup proccess competing with the concurrency limit on
     854              :                 // logical size calculations: if logical size calculation semaphore is saturated,
     855              :                 // then warmup will wait for that before proceeding to the next tenant.
     856            0 :                 if let AttachType::Warmup(_permit) = attach_type {
     857            0 :                     let mut futs: FuturesUnordered<_> = tenant_clone.timelines.lock().unwrap().values().cloned().map(|t| t.await_initial_logical_size()).collect();
     858            0 :                     tracing::info!("Waiting for initial logical sizes while warming up...");
     859            0 :                     while futs.next().await.is_some() {}
     860            0 :                     tracing::info!("Warm-up complete");
     861            0 :                 }
     862              : 
     863            0 :                 Ok(())
     864            0 :             }
     865            0 :             .instrument(tracing::info_span!(parent: None, "attach", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), gen=?generation)),
     866              :         );
     867            0 :         Ok(tenant)
     868            0 :     }
     869              : 
     870          168 :     #[instrument(skip_all)]
     871              :     pub(crate) async fn preload(
     872              :         self: &Arc<Tenant>,
     873              :         remote_storage: &GenericRemoteStorage,
     874              :         cancel: CancellationToken,
     875              :     ) -> anyhow::Result<TenantPreload> {
     876              :         span::debug_assert_current_span_has_tenant_id();
     877              :         // Get list of remote timelines
     878              :         // download index files for every tenant timeline
     879           84 :         info!("listing remote timelines");
     880              :         let (remote_timeline_ids, other_keys) = remote_timeline_client::list_remote_timelines(
     881              :             remote_storage,
     882              :             self.tenant_shard_id,
     883              :             cancel.clone(),
     884              :         )
     885              :         .await?;
     886              : 
     887              :         let deleting = other_keys.contains(TENANT_DELETED_MARKER_FILE_NAME);
     888           84 :         info!(
     889           84 :             "found {} timelines, deleting={}",
     890           84 :             remote_timeline_ids.len(),
     891           84 :             deleting
     892           84 :         );
     893              : 
     894              :         for k in other_keys {
     895              :             if k != TENANT_DELETED_MARKER_FILE_NAME {
     896            0 :                 warn!("Unexpected non timeline key {k}");
     897              :             }
     898              :         }
     899              : 
     900              :         Ok(TenantPreload {
     901              :             deleting,
     902              :             timelines: self
     903              :                 .load_timeline_metadata(remote_timeline_ids, remote_storage, cancel)
     904              :                 .await?,
     905              :         })
     906              :     }
     907              : 
     908              :     ///
     909              :     /// Background task that downloads all data for a tenant and brings it to Active state.
     910              :     ///
     911              :     /// No background tasks are started as part of this routine.
     912              :     ///
     913           84 :     async fn attach(
     914           84 :         self: &Arc<Tenant>,
     915           84 :         preload: Option<TenantPreload>,
     916           84 :         mode: SpawnMode,
     917           84 :         ctx: &RequestContext,
     918           84 :     ) -> anyhow::Result<()> {
     919           84 :         span::debug_assert_current_span_has_tenant_id();
     920              : 
     921            0 :         failpoint_support::sleep_millis_async!("before-attaching-tenant");
     922              : 
     923           84 :         let preload = match (preload, mode) {
     924           84 :             (Some(p), _) => p,
     925            0 :             (None, SpawnMode::Create) => TenantPreload {
     926            0 :                 deleting: false,
     927            0 :                 timelines: HashMap::new(),
     928            0 :             },
     929              :             (None, SpawnMode::Normal) => {
     930            0 :                 anyhow::bail!("local-only deployment is no longer supported, https://github.com/neondatabase/neon/issues/5624");
     931              :             }
     932              :         };
     933              : 
     934           84 :         let mut timelines_to_resume_deletions = vec![];
     935           84 : 
     936           84 :         let mut remote_index_and_client = HashMap::new();
     937           84 :         let mut timeline_ancestors = HashMap::new();
     938           84 :         let mut existent_timelines = HashSet::new();
     939           90 :         for (timeline_id, preload) in preload.timelines {
     940            6 :             let index_part = match preload.index_part {
     941            6 :                 Ok(i) => {
     942            0 :                     debug!("remote index part exists for timeline {timeline_id}");
     943              :                     // We found index_part on the remote, this is the standard case.
     944            6 :                     existent_timelines.insert(timeline_id);
     945            6 :                     i
     946              :                 }
     947              :                 Err(DownloadError::NotFound) => {
     948              :                     // There is no index_part on the remote. We only get here
     949              :                     // if there is some prefix for the timeline in the remote storage.
     950              :                     // This can e.g. be the initdb.tar.zst archive, maybe a
     951              :                     // remnant from a prior incomplete creation or deletion attempt.
     952              :                     // Delete the local directory as the deciding criterion for a
     953              :                     // timeline's existence is presence of index_part.
     954            0 :                     info!(%timeline_id, "index_part not found on remote");
     955            0 :                     continue;
     956              :                 }
     957            0 :                 Err(e) => {
     958              :                     // Some (possibly ephemeral) error happened during index_part download.
     959              :                     // Pretend the timeline exists to not delete the timeline directory,
     960              :                     // as it might be a temporary issue and we don't want to re-download
     961              :                     // everything after it resolves.
     962            0 :                     warn!(%timeline_id, "Failed to load index_part from remote storage, failed creation? ({e})");
     963              : 
     964            0 :                     existent_timelines.insert(timeline_id);
     965            0 :                     continue;
     966              :                 }
     967              :             };
     968            6 :             match index_part {
     969            6 :                 MaybeDeletedIndexPart::IndexPart(index_part) => {
     970            6 :                     timeline_ancestors.insert(timeline_id, index_part.metadata.clone());
     971            6 :                     remote_index_and_client.insert(timeline_id, (index_part, preload.client));
     972            6 :                 }
     973            0 :                 MaybeDeletedIndexPart::Deleted(index_part) => {
     974            0 :                     info!(
     975            0 :                         "timeline {} is deleted, picking to resume deletion",
     976            0 :                         timeline_id
     977            0 :                     );
     978            0 :                     timelines_to_resume_deletions.push((timeline_id, index_part, preload.client));
     979              :                 }
     980              :             }
     981              :         }
     982              : 
     983              :         // For every timeline, download the metadata file, scan the local directory,
     984              :         // and build a layer map that contains an entry for each remote and local
     985              :         // layer file.
     986           84 :         let sorted_timelines = tree_sort_timelines(timeline_ancestors, |m| m.ancestor_timeline())?;
     987           90 :         for (timeline_id, remote_metadata) in sorted_timelines {
     988            6 :             let (index_part, remote_client) = remote_index_and_client
     989            6 :                 .remove(&timeline_id)
     990            6 :                 .expect("just put it in above");
     991            6 : 
     992            6 :             // TODO again handle early failure
     993            6 :             self.load_remote_timeline(
     994            6 :                 timeline_id,
     995            6 :                 index_part,
     996            6 :                 remote_metadata,
     997            6 :                 TimelineResources {
     998            6 :                     remote_client: Some(remote_client),
     999            6 :                     deletion_queue_client: self.deletion_queue_client.clone(),
    1000            6 :                     timeline_get_throttle: self.timeline_get_throttle.clone(),
    1001            6 :                 },
    1002            6 :                 ctx,
    1003            6 :             )
    1004           17 :             .await
    1005            6 :             .with_context(|| {
    1006            0 :                 format!(
    1007            0 :                     "failed to load remote timeline {} for tenant {}",
    1008            0 :                     timeline_id, self.tenant_shard_id
    1009            0 :                 )
    1010            6 :             })?;
    1011              :         }
    1012              : 
    1013              :         // Walk through deleted timelines, resume deletion
    1014           84 :         for (timeline_id, index_part, remote_timeline_client) in timelines_to_resume_deletions {
    1015            0 :             remote_timeline_client
    1016            0 :                 .init_upload_queue_stopped_to_continue_deletion(&index_part)
    1017            0 :                 .context("init queue stopped")
    1018            0 :                 .map_err(LoadLocalTimelineError::ResumeDeletion)?;
    1019              : 
    1020            0 :             DeleteTimelineFlow::resume_deletion(
    1021            0 :                 Arc::clone(self),
    1022            0 :                 timeline_id,
    1023            0 :                 &index_part.metadata,
    1024            0 :                 Some(remote_timeline_client),
    1025            0 :                 self.deletion_queue_client.clone(),
    1026            0 :             )
    1027            0 :             .instrument(tracing::info_span!("timeline_delete", %timeline_id))
    1028            0 :             .await
    1029            0 :             .context("resume_deletion")
    1030            0 :             .map_err(LoadLocalTimelineError::ResumeDeletion)?;
    1031              :         }
    1032              : 
    1033              :         // The local filesystem contents are a cache of what's in the remote IndexPart;
    1034              :         // IndexPart is the source of truth.
    1035           84 :         self.clean_up_timelines(&existent_timelines)?;
    1036              : 
    1037           84 :         fail::fail_point!("attach-before-activate", |_| {
    1038            0 :             anyhow::bail!("attach-before-activate");
    1039           84 :         });
    1040            0 :         failpoint_support::sleep_millis_async!("attach-before-activate-sleep", &self.cancel);
    1041              : 
    1042           84 :         info!("Done");
    1043              : 
    1044           84 :         Ok(())
    1045           84 :     }
    1046              : 
    1047              :     /// Check for any local timeline directories that are temporary, or do not correspond to a
    1048              :     /// timeline that still exists: this can happen if we crashed during a deletion/creation, or
    1049              :     /// if a timeline was deleted while the tenant was attached to a different pageserver.
    1050           84 :     fn clean_up_timelines(&self, existent_timelines: &HashSet<TimelineId>) -> anyhow::Result<()> {
    1051           84 :         let timelines_dir = self.conf.timelines_path(&self.tenant_shard_id);
    1052              : 
    1053           84 :         let entries = match timelines_dir.read_dir_utf8() {
    1054           84 :             Ok(d) => d,
    1055            0 :             Err(e) => {
    1056            0 :                 if e.kind() == std::io::ErrorKind::NotFound {
    1057            0 :                     return Ok(());
    1058              :                 } else {
    1059            0 :                     return Err(e).context("list timelines directory for tenant");
    1060              :                 }
    1061              :             }
    1062              :         };
    1063              : 
    1064           94 :         for entry in entries {
    1065           10 :             let entry = entry.context("read timeline dir entry")?;
    1066           10 :             let entry_path = entry.path();
    1067              : 
    1068           10 :             let purge = if crate::is_temporary(entry_path)
    1069              :                 // TODO: uninit_mark isn't needed any more, since uninitialized timelines are already
    1070              :                 // covered by the check that the timeline must exist in remote storage.
    1071           10 :                 || is_uninit_mark(entry_path)
    1072            8 :                 || crate::is_delete_mark(entry_path)
    1073              :             {
    1074            2 :                 true
    1075              :             } else {
    1076            8 :                 match TimelineId::try_from(entry_path.file_name()) {
    1077            8 :                     Ok(i) => {
    1078            8 :                         // Purge if the timeline ID does not exist in remote storage: remote storage is the authority.
    1079            8 :                         !existent_timelines.contains(&i)
    1080              :                     }
    1081            0 :                     Err(e) => {
    1082            0 :                         tracing::warn!(
    1083            0 :                             "Unparseable directory in timelines directory: {entry_path}, ignoring ({e})"
    1084            0 :                         );
    1085              :                         // Do not purge junk: if we don't recognize it, be cautious and leave it for a human.
    1086            0 :                         false
    1087              :                     }
    1088              :                 }
    1089              :             };
    1090              : 
    1091           10 :             if purge {
    1092            4 :                 tracing::info!("Purging stale timeline dentry {entry_path}");
    1093            4 :                 if let Err(e) = match entry.file_type() {
    1094            4 :                     Ok(t) => if t.is_dir() {
    1095            2 :                         std::fs::remove_dir_all(entry_path)
    1096              :                     } else {
    1097            2 :                         std::fs::remove_file(entry_path)
    1098              :                     }
    1099            4 :                     .or_else(fs_ext::ignore_not_found),
    1100            0 :                     Err(e) => Err(e),
    1101              :                 } {
    1102            0 :                     tracing::warn!("Failed to purge stale timeline dentry {entry_path}: {e}");
    1103            4 :                 }
    1104            6 :             }
    1105              :         }
    1106              : 
    1107           84 :         Ok(())
    1108           84 :     }
    1109              : 
    1110              :     /// Get sum of all remote timelines sizes
    1111              :     ///
    1112              :     /// This function relies on the index_part instead of listing the remote storage
    1113            0 :     pub fn remote_size(&self) -> u64 {
    1114            0 :         let mut size = 0;
    1115              : 
    1116            0 :         for timeline in self.list_timelines() {
    1117            0 :             if let Some(remote_client) = &timeline.remote_client {
    1118            0 :                 size += remote_client.get_remote_physical_size();
    1119            0 :             }
    1120              :         }
    1121              : 
    1122            0 :         size
    1123            0 :     }
    1124              : 
    1125           12 :     #[instrument(skip_all, fields(timeline_id=%timeline_id))]
    1126              :     async fn load_remote_timeline(
    1127              :         &self,
    1128              :         timeline_id: TimelineId,
    1129              :         index_part: IndexPart,
    1130              :         remote_metadata: TimelineMetadata,
    1131              :         resources: TimelineResources,
    1132              :         ctx: &RequestContext,
    1133              :     ) -> anyhow::Result<()> {
    1134              :         span::debug_assert_current_span_has_tenant_id();
    1135              : 
    1136            6 :         info!("downloading index file for timeline {}", timeline_id);
    1137              :         tokio::fs::create_dir_all(self.conf.timeline_path(&self.tenant_shard_id, &timeline_id))
    1138              :             .await
    1139              :             .context("Failed to create new timeline directory")?;
    1140              : 
    1141              :         let ancestor = if let Some(ancestor_id) = remote_metadata.ancestor_timeline() {
    1142              :             let timelines = self.timelines.lock().unwrap();
    1143              :             Some(Arc::clone(timelines.get(&ancestor_id).ok_or_else(
    1144            0 :                 || {
    1145            0 :                     anyhow::anyhow!(
    1146            0 :                         "cannot find ancestor timeline {ancestor_id} for timeline {timeline_id}"
    1147            0 :                     )
    1148            0 :                 },
    1149              :             )?))
    1150              :         } else {
    1151              :             None
    1152              :         };
    1153              : 
    1154              :         // timeline loading after attach expects to find metadata file for each metadata
    1155              :         save_metadata(
    1156              :             self.conf,
    1157              :             &self.tenant_shard_id,
    1158              :             &timeline_id,
    1159              :             &remote_metadata,
    1160              :         )
    1161              :         .await
    1162              :         .context("save_metadata")
    1163              :         .map_err(LoadLocalTimelineError::Load)?;
    1164              : 
    1165              :         self.timeline_init_and_sync(
    1166              :             timeline_id,
    1167              :             resources,
    1168              :             Some(index_part),
    1169              :             remote_metadata,
    1170              :             ancestor,
    1171              :             ctx,
    1172              :         )
    1173              :         .await
    1174              :     }
    1175              : 
    1176              :     /// Create a placeholder Tenant object for a broken tenant
    1177            0 :     pub fn create_broken_tenant(
    1178            0 :         conf: &'static PageServerConf,
    1179            0 :         tenant_shard_id: TenantShardId,
    1180            0 :         reason: String,
    1181            0 :     ) -> Arc<Tenant> {
    1182            0 :         Arc::new(Tenant::new(
    1183            0 :             TenantState::Broken {
    1184            0 :                 reason,
    1185            0 :                 backtrace: String::new(),
    1186            0 :             },
    1187            0 :             conf,
    1188            0 :             AttachedTenantConf::try_from(LocationConf::default()).unwrap(),
    1189            0 :             // Shard identity isn't meaningful for a broken tenant: it's just a placeholder
    1190            0 :             // to occupy the slot for this TenantShardId.
    1191            0 :             ShardIdentity::broken(tenant_shard_id.shard_number, tenant_shard_id.shard_count),
    1192            0 :             None,
    1193            0 :             tenant_shard_id,
    1194            0 :             None,
    1195            0 :             DeletionQueueClient::broken(),
    1196            0 :         ))
    1197            0 :     }
    1198              : 
    1199           84 :     async fn load_timeline_metadata(
    1200           84 :         self: &Arc<Tenant>,
    1201           84 :         timeline_ids: HashSet<TimelineId>,
    1202           84 :         remote_storage: &GenericRemoteStorage,
    1203           84 :         cancel: CancellationToken,
    1204           84 :     ) -> anyhow::Result<HashMap<TimelineId, TimelinePreload>> {
    1205           84 :         let mut part_downloads = JoinSet::new();
    1206           90 :         for timeline_id in timeline_ids {
    1207            6 :             let client = RemoteTimelineClient::new(
    1208            6 :                 remote_storage.clone(),
    1209            6 :                 self.deletion_queue_client.clone(),
    1210            6 :                 self.conf,
    1211            6 :                 self.tenant_shard_id,
    1212            6 :                 timeline_id,
    1213            6 :                 self.generation,
    1214            6 :             );
    1215            6 :             let cancel_clone = cancel.clone();
    1216            6 :             part_downloads.spawn(
    1217            6 :                 async move {
    1218            6 :                     debug!("starting index part download");
    1219              : 
    1220           12 :                     let index_part = client.download_index_file(&cancel_clone).await;
    1221              : 
    1222            6 :                     debug!("finished index part download");
    1223              : 
    1224            6 :                     Result::<_, anyhow::Error>::Ok(TimelinePreload {
    1225            6 :                         client,
    1226            6 :                         timeline_id,
    1227            6 :                         index_part,
    1228            6 :                     })
    1229            6 :                 }
    1230            6 :                 .map(move |res| {
    1231            6 :                     res.with_context(|| format!("download index part for timeline {timeline_id}"))
    1232            6 :                 })
    1233            6 :                 .instrument(info_span!("download_index_part", %timeline_id)),
    1234              :             );
    1235              :         }
    1236              : 
    1237           84 :         let mut timeline_preloads: HashMap<TimelineId, TimelinePreload> = HashMap::new();
    1238              : 
    1239           90 :         loop {
    1240           94 :             tokio::select!(
    1241           90 :                 next = part_downloads.join_next() => {
    1242              :                     match next {
    1243              :                         Some(result) => {
    1244              :                             let preload_result = result.context("join preload task")?;
    1245              :                             let preload = preload_result?;
    1246              :                             timeline_preloads.insert(preload.timeline_id, preload);
    1247              :                         },
    1248              :                         None => {
    1249              :                             break;
    1250              :                         }
    1251              :                     }
    1252              :                 },
    1253              :                 _ = cancel.cancelled() => {
    1254              :                     anyhow::bail!("Cancelled while waiting for remote index download")
    1255              :                 }
    1256           90 :             )
    1257           90 :         }
    1258              : 
    1259           84 :         Ok(timeline_preloads)
    1260           84 :     }
    1261              : 
    1262            4 :     pub(crate) fn tenant_shard_id(&self) -> TenantShardId {
    1263            4 :         self.tenant_shard_id
    1264            4 :     }
    1265              : 
    1266              :     /// Get Timeline handle for given Neon timeline ID.
    1267              :     /// This function is idempotent. It doesn't change internal state in any way.
    1268          232 :     pub fn get_timeline(
    1269          232 :         &self,
    1270          232 :         timeline_id: TimelineId,
    1271          232 :         active_only: bool,
    1272          232 :     ) -> Result<Arc<Timeline>, GetTimelineError> {
    1273          232 :         let timelines_accessor = self.timelines.lock().unwrap();
    1274          232 :         let timeline = timelines_accessor
    1275          232 :             .get(&timeline_id)
    1276          232 :             .ok_or(GetTimelineError::NotFound {
    1277          232 :                 tenant_id: self.tenant_shard_id,
    1278          232 :                 timeline_id,
    1279          232 :             })?;
    1280              : 
    1281          230 :         if active_only && !timeline.is_active() {
    1282            0 :             Err(GetTimelineError::NotActive {
    1283            0 :                 tenant_id: self.tenant_shard_id,
    1284            0 :                 timeline_id,
    1285            0 :                 state: timeline.current_state(),
    1286            0 :             })
    1287              :         } else {
    1288          230 :             Ok(Arc::clone(timeline))
    1289              :         }
    1290          232 :     }
    1291              : 
    1292              :     /// Lists timelines the tenant contains.
    1293              :     /// Up to tenant's implementation to omit certain timelines that ar not considered ready for use.
    1294            0 :     pub fn list_timelines(&self) -> Vec<Arc<Timeline>> {
    1295            0 :         self.timelines
    1296            0 :             .lock()
    1297            0 :             .unwrap()
    1298            0 :             .values()
    1299            0 :             .map(Arc::clone)
    1300            0 :             .collect()
    1301            0 :     }
    1302              : 
    1303            0 :     pub fn list_timeline_ids(&self) -> Vec<TimelineId> {
    1304            0 :         self.timelines.lock().unwrap().keys().cloned().collect()
    1305            0 :     }
    1306              : 
    1307              :     /// This is used to create the initial 'main' timeline during bootstrapping,
    1308              :     /// or when importing a new base backup. The caller is expected to load an
    1309              :     /// initial image of the datadir to the new timeline after this.
    1310              :     ///
    1311              :     /// Until that happens, the on-disk state is invalid (disk_consistent_lsn=Lsn(0))
    1312              :     /// and the timeline will fail to load at a restart.
    1313              :     ///
    1314              :     /// That's why we add an uninit mark file, and wrap it together witht the Timeline
    1315              :     /// in-memory object into UninitializedTimeline.
    1316              :     /// Once the caller is done setting up the timeline, they should call
    1317              :     /// `UninitializedTimeline::initialize_with_lock` to remove the uninit mark.
    1318              :     ///
    1319              :     /// For tests, use `DatadirModification::init_empty_test_timeline` + `commit` to setup the
    1320              :     /// minimum amount of keys required to get a writable timeline.
    1321              :     /// (Without it, `put` might fail due to `repartition` failing.)
    1322           76 :     pub(crate) async fn create_empty_timeline(
    1323           76 :         &self,
    1324           76 :         new_timeline_id: TimelineId,
    1325           76 :         initdb_lsn: Lsn,
    1326           76 :         pg_version: u32,
    1327           76 :         _ctx: &RequestContext,
    1328           76 :     ) -> anyhow::Result<UninitializedTimeline> {
    1329           76 :         anyhow::ensure!(
    1330           76 :             self.is_active(),
    1331            0 :             "Cannot create empty timelines on inactive tenant"
    1332              :         );
    1333              : 
    1334           76 :         let timeline_uninit_mark = self.create_timeline_uninit_mark(new_timeline_id)?;
    1335           74 :         let new_metadata = TimelineMetadata::new(
    1336           74 :             // Initialize disk_consistent LSN to 0, The caller must import some data to
    1337           74 :             // make it valid, before calling finish_creation()
    1338           74 :             Lsn(0),
    1339           74 :             None,
    1340           74 :             None,
    1341           74 :             Lsn(0),
    1342           74 :             initdb_lsn,
    1343           74 :             initdb_lsn,
    1344           74 :             pg_version,
    1345           74 :         );
    1346           74 :         self.prepare_new_timeline(
    1347           74 :             new_timeline_id,
    1348           74 :             &new_metadata,
    1349           74 :             timeline_uninit_mark,
    1350           74 :             initdb_lsn,
    1351           74 :             None,
    1352           74 :         )
    1353          111 :         .await
    1354           76 :     }
    1355              : 
    1356              :     /// Helper for unit tests to create an empty timeline.
    1357              :     ///
    1358              :     /// The timeline is has state value `Active` but its background loops are not running.
    1359              :     // This makes the various functions which anyhow::ensure! for Active state work in tests.
    1360              :     // Our current tests don't need the background loops.
    1361              :     #[cfg(test)]
    1362           68 :     pub async fn create_test_timeline(
    1363           68 :         &self,
    1364           68 :         new_timeline_id: TimelineId,
    1365           68 :         initdb_lsn: Lsn,
    1366           68 :         pg_version: u32,
    1367           68 :         ctx: &RequestContext,
    1368           68 :     ) -> anyhow::Result<Arc<Timeline>> {
    1369           68 :         let uninit_tl = self
    1370           68 :             .create_empty_timeline(new_timeline_id, initdb_lsn, pg_version, ctx)
    1371          102 :             .await?;
    1372           68 :         let tline = uninit_tl.raw_timeline().expect("we just created it");
    1373           68 :         assert_eq!(tline.get_last_record_lsn(), Lsn(0));
    1374              : 
    1375              :         // Setup minimum keys required for the timeline to be usable.
    1376           68 :         let mut modification = tline.begin_modification(initdb_lsn);
    1377           68 :         modification
    1378           68 :             .init_empty_test_timeline()
    1379           68 :             .context("init_empty_test_timeline")?;
    1380           68 :         modification
    1381           68 :             .commit(ctx)
    1382           34 :             .await
    1383           68 :             .context("commit init_empty_test_timeline modification")?;
    1384              : 
    1385              :         // Flush to disk so that uninit_tl's check for valid disk_consistent_lsn passes.
    1386           68 :         tline.maybe_spawn_flush_loop();
    1387           68 :         tline.freeze_and_flush().await.context("freeze_and_flush")?;
    1388              : 
    1389              :         // Make sure the freeze_and_flush reaches remote storage.
    1390           68 :         tline
    1391           68 :             .remote_client
    1392           68 :             .as_ref()
    1393           68 :             .unwrap()
    1394           68 :             .wait_completion()
    1395           23 :             .await
    1396           68 :             .unwrap();
    1397              : 
    1398           68 :         let tl = uninit_tl.finish_creation()?;
    1399              :         // The non-test code would call tl.activate() here.
    1400           68 :         tl.set_state(TimelineState::Active);
    1401           68 :         Ok(tl)
    1402           68 :     }
    1403              : 
    1404              :     /// Create a new timeline.
    1405              :     ///
    1406              :     /// Returns the new timeline ID and reference to its Timeline object.
    1407              :     ///
    1408              :     /// If the caller specified the timeline ID to use (`new_timeline_id`), and timeline with
    1409              :     /// the same timeline ID already exists, returns CreateTimelineError::AlreadyExists.
    1410              :     #[allow(clippy::too_many_arguments)]
    1411            0 :     pub(crate) async fn create_timeline(
    1412            0 :         &self,
    1413            0 :         new_timeline_id: TimelineId,
    1414            0 :         ancestor_timeline_id: Option<TimelineId>,
    1415            0 :         mut ancestor_start_lsn: Option<Lsn>,
    1416            0 :         pg_version: u32,
    1417            0 :         load_existing_initdb: Option<TimelineId>,
    1418            0 :         broker_client: storage_broker::BrokerClientChannel,
    1419            0 :         ctx: &RequestContext,
    1420            0 :     ) -> Result<Arc<Timeline>, CreateTimelineError> {
    1421            0 :         if !self.is_active() {
    1422            0 :             if matches!(self.current_state(), TenantState::Stopping { .. }) {
    1423            0 :                 return Err(CreateTimelineError::ShuttingDown);
    1424              :             } else {
    1425            0 :                 return Err(CreateTimelineError::Other(anyhow::anyhow!(
    1426            0 :                     "Cannot create timelines on inactive tenant"
    1427            0 :                 )));
    1428              :             }
    1429            0 :         }
    1430              : 
    1431            0 :         let _gate = self
    1432            0 :             .gate
    1433            0 :             .enter()
    1434            0 :             .map_err(|_| CreateTimelineError::ShuttingDown)?;
    1435              : 
    1436              :         // Get exclusive access to the timeline ID: this ensures that it does not already exist,
    1437              :         // and that no other creation attempts will be allowed in while we are working.  The
    1438              :         // uninit_mark is a guard.
    1439            0 :         let uninit_mark = match self.create_timeline_uninit_mark(new_timeline_id) {
    1440            0 :             Ok(m) => m,
    1441              :             Err(TimelineExclusionError::AlreadyCreating) => {
    1442              :                 // Creation is in progress, we cannot create it again, and we cannot
    1443              :                 // check if this request matches the existing one, so caller must try
    1444              :                 // again later.
    1445            0 :                 return Err(CreateTimelineError::AlreadyCreating);
    1446              :             }
    1447            0 :             Err(TimelineExclusionError::Other(e)) => {
    1448            0 :                 return Err(CreateTimelineError::Other(e));
    1449              :             }
    1450            0 :             Err(TimelineExclusionError::AlreadyExists(existing)) => {
    1451            0 :                 debug!("timeline {new_timeline_id} already exists");
    1452              : 
    1453              :                 // Idempotency: creating the same timeline twice is not an error, unless
    1454              :                 // the second creation has different parameters.
    1455            0 :                 if existing.get_ancestor_timeline_id() != ancestor_timeline_id
    1456            0 :                     || existing.pg_version != pg_version
    1457            0 :                     || (ancestor_start_lsn.is_some()
    1458            0 :                         && ancestor_start_lsn != Some(existing.get_ancestor_lsn()))
    1459              :                 {
    1460            0 :                     return Err(CreateTimelineError::Conflict);
    1461            0 :                 }
    1462              : 
    1463            0 :                 if let Some(remote_client) = existing.remote_client.as_ref() {
    1464              :                     // Wait for uploads to complete, so that when we return Ok, the timeline
    1465              :                     // is known to be durable on remote storage. Just like we do at the end of
    1466              :                     // this function, after we have created the timeline ourselves.
    1467              :                     //
    1468              :                     // We only really care that the initial version of `index_part.json` has
    1469              :                     // been uploaded. That's enough to remember that the timeline
    1470              :                     // exists. However, there is no function to wait specifically for that so
    1471              :                     // we just wait for all in-progress uploads to finish.
    1472            0 :                     remote_client
    1473            0 :                         .wait_completion()
    1474            0 :                         .await
    1475            0 :                         .context("wait for timeline uploads to complete")?;
    1476            0 :                 }
    1477              : 
    1478            0 :                 return Ok(existing);
    1479              :             }
    1480              :         };
    1481              : 
    1482            0 :         let loaded_timeline = match ancestor_timeline_id {
    1483            0 :             Some(ancestor_timeline_id) => {
    1484            0 :                 let ancestor_timeline = self
    1485            0 :                     .get_timeline(ancestor_timeline_id, false)
    1486            0 :                     .context("Cannot branch off the timeline that's not present in pageserver")?;
    1487              : 
    1488              :                 // instead of waiting around, just deny the request because ancestor is not yet
    1489              :                 // ready for other purposes either.
    1490            0 :                 if !ancestor_timeline.is_active() {
    1491            0 :                     return Err(CreateTimelineError::AncestorNotActive);
    1492            0 :                 }
    1493              : 
    1494            0 :                 if let Some(lsn) = ancestor_start_lsn.as_mut() {
    1495            0 :                     *lsn = lsn.align();
    1496            0 : 
    1497            0 :                     let ancestor_ancestor_lsn = ancestor_timeline.get_ancestor_lsn();
    1498            0 :                     if ancestor_ancestor_lsn > *lsn {
    1499              :                         // can we safely just branch from the ancestor instead?
    1500            0 :                         return Err(CreateTimelineError::AncestorLsn(anyhow::anyhow!(
    1501            0 :                             "invalid start lsn {} for ancestor timeline {}: less than timeline ancestor lsn {}",
    1502            0 :                             lsn,
    1503            0 :                             ancestor_timeline_id,
    1504            0 :                             ancestor_ancestor_lsn,
    1505            0 :                         )));
    1506            0 :                     }
    1507            0 : 
    1508            0 :                     // Wait for the WAL to arrive and be processed on the parent branch up
    1509            0 :                     // to the requested branch point. The repository code itself doesn't
    1510            0 :                     // require it, but if we start to receive WAL on the new timeline,
    1511            0 :                     // decoding the new WAL might need to look up previous pages, relation
    1512            0 :                     // sizes etc. and that would get confused if the previous page versions
    1513            0 :                     // are not in the repository yet.
    1514            0 :                     ancestor_timeline
    1515            0 :                         .wait_lsn(*lsn, ctx)
    1516            0 :                         .await
    1517            0 :                         .map_err(|e| match e {
    1518            0 :                             e @ (WaitLsnError::Timeout(_) | WaitLsnError::BadState) => {
    1519            0 :                                 CreateTimelineError::AncestorLsn(anyhow::anyhow!(e))
    1520              :                             }
    1521            0 :                             WaitLsnError::Shutdown => CreateTimelineError::ShuttingDown,
    1522            0 :                         })?;
    1523            0 :                 }
    1524              : 
    1525            0 :                 self.branch_timeline(
    1526            0 :                     &ancestor_timeline,
    1527            0 :                     new_timeline_id,
    1528            0 :                     ancestor_start_lsn,
    1529            0 :                     uninit_mark,
    1530            0 :                     ctx,
    1531            0 :                 )
    1532            0 :                 .await?
    1533              :             }
    1534              :             None => {
    1535            0 :                 self.bootstrap_timeline(
    1536            0 :                     new_timeline_id,
    1537            0 :                     pg_version,
    1538            0 :                     load_existing_initdb,
    1539            0 :                     uninit_mark,
    1540            0 :                     ctx,
    1541            0 :                 )
    1542            0 :                 .await?
    1543              :             }
    1544              :         };
    1545              : 
    1546              :         // At this point we have dropped our guard on [`Self::timelines_creating`], and
    1547              :         // the timeline is visible in [`Self::timelines`], but it is _not_ durable yet.  We must
    1548              :         // not send a success to the caller until it is.  The same applies to handling retries,
    1549              :         // see the handling of [`TimelineExclusionError::AlreadyExists`] above.
    1550            0 :         if let Some(remote_client) = loaded_timeline.remote_client.as_ref() {
    1551            0 :             let kind = ancestor_timeline_id
    1552            0 :                 .map(|_| "branched")
    1553            0 :                 .unwrap_or("bootstrapped");
    1554            0 :             remote_client.wait_completion().await.with_context(|| {
    1555            0 :                 format!("wait for {} timeline initial uploads to complete", kind)
    1556            0 :             })?;
    1557            0 :         }
    1558              : 
    1559            0 :         loaded_timeline.activate(broker_client, None, ctx);
    1560            0 : 
    1561            0 :         Ok(loaded_timeline)
    1562            0 :     }
    1563              : 
    1564            0 :     pub(crate) async fn delete_timeline(
    1565            0 :         self: Arc<Self>,
    1566            0 :         timeline_id: TimelineId,
    1567            0 :     ) -> Result<(), DeleteTimelineError> {
    1568            0 :         DeleteTimelineFlow::run(&self, timeline_id, false).await?;
    1569              : 
    1570            0 :         Ok(())
    1571            0 :     }
    1572              : 
    1573              :     /// perform one garbage collection iteration, removing old data files from disk.
    1574              :     /// this function is periodically called by gc task.
    1575              :     /// also it can be explicitly requested through page server api 'do_gc' command.
    1576              :     ///
    1577              :     /// `target_timeline_id` specifies the timeline to GC, or None for all.
    1578              :     ///
    1579              :     /// The `horizon` an `pitr` parameters determine how much WAL history needs to be retained.
    1580              :     /// Also known as the retention period, or the GC cutoff point. `horizon` specifies
    1581              :     /// the amount of history, as LSN difference from current latest LSN on each timeline.
    1582              :     /// `pitr` specifies the same as a time difference from the current time. The effective
    1583              :     /// GC cutoff point is determined conservatively by either `horizon` and `pitr`, whichever
    1584              :     /// requires more history to be retained.
    1585              :     //
    1586            8 :     pub async fn gc_iteration(
    1587            8 :         &self,
    1588            8 :         target_timeline_id: Option<TimelineId>,
    1589            8 :         horizon: u64,
    1590            8 :         pitr: Duration,
    1591            8 :         cancel: &CancellationToken,
    1592            8 :         ctx: &RequestContext,
    1593            8 :     ) -> anyhow::Result<GcResult> {
    1594            8 :         // Don't start doing work during shutdown
    1595            8 :         if let TenantState::Stopping { .. } = self.current_state() {
    1596            0 :             return Ok(GcResult::default());
    1597            8 :         }
    1598            8 : 
    1599            8 :         // there is a global allowed_error for this
    1600            8 :         anyhow::ensure!(
    1601            8 :             self.is_active(),
    1602            0 :             "Cannot run GC iteration on inactive tenant"
    1603              :         );
    1604              : 
    1605              :         {
    1606            8 :             let conf = self.tenant_conf.read().unwrap();
    1607            8 : 
    1608            8 :             if !conf.location.may_delete_layers_hint() {
    1609            0 :                 info!("Skipping GC in location state {:?}", conf.location);
    1610            0 :                 return Ok(GcResult::default());
    1611            8 :             }
    1612            8 :         }
    1613            8 : 
    1614            8 :         self.gc_iteration_internal(target_timeline_id, horizon, pitr, cancel, ctx)
    1615            0 :             .await
    1616            8 :     }
    1617              : 
    1618              :     /// Perform one compaction iteration.
    1619              :     /// This function is periodically called by compactor task.
    1620              :     /// Also it can be explicitly requested per timeline through page server
    1621              :     /// api's 'compact' command.
    1622            0 :     async fn compaction_iteration(
    1623            0 :         &self,
    1624            0 :         cancel: &CancellationToken,
    1625            0 :         ctx: &RequestContext,
    1626            0 :     ) -> anyhow::Result<(), timeline::CompactionError> {
    1627            0 :         // Don't start doing work during shutdown, or when broken, we do not need those in the logs
    1628            0 :         if !self.is_active() {
    1629            0 :             return Ok(());
    1630            0 :         }
    1631            0 : 
    1632            0 :         {
    1633            0 :             let conf = self.tenant_conf.read().unwrap();
    1634            0 :             if !conf.location.may_delete_layers_hint() || !conf.location.may_upload_layers_hint() {
    1635            0 :                 info!("Skipping compaction in location state {:?}", conf.location);
    1636            0 :                 return Ok(());
    1637            0 :             }
    1638            0 :         }
    1639            0 : 
    1640            0 :         // Scan through the hashmap and collect a list of all the timelines,
    1641            0 :         // while holding the lock. Then drop the lock and actually perform the
    1642            0 :         // compactions.  We don't want to block everything else while the
    1643            0 :         // compaction runs.
    1644            0 :         let timelines_to_compact = {
    1645            0 :             let timelines = self.timelines.lock().unwrap();
    1646            0 :             let timelines_to_compact = timelines
    1647            0 :                 .iter()
    1648            0 :                 .filter_map(|(timeline_id, timeline)| {
    1649            0 :                     if timeline.is_active() {
    1650            0 :                         Some((*timeline_id, timeline.clone()))
    1651              :                     } else {
    1652            0 :                         None
    1653              :                     }
    1654            0 :                 })
    1655            0 :                 .collect::<Vec<_>>();
    1656            0 :             drop(timelines);
    1657            0 :             timelines_to_compact
    1658              :         };
    1659              : 
    1660            0 :         for (timeline_id, timeline) in &timelines_to_compact {
    1661            0 :             timeline
    1662            0 :                 .compact(cancel, EnumSet::empty(), ctx)
    1663            0 :                 .instrument(info_span!("compact_timeline", %timeline_id))
    1664            0 :                 .await?;
    1665              :         }
    1666              : 
    1667            0 :         Ok(())
    1668            0 :     }
    1669              : 
    1670           98 :     pub fn current_state(&self) -> TenantState {
    1671           98 :         self.state.borrow().clone()
    1672           98 :     }
    1673              : 
    1674           84 :     pub fn is_active(&self) -> bool {
    1675           84 :         self.current_state() == TenantState::Active
    1676           84 :     }
    1677              : 
    1678            0 :     pub fn generation(&self) -> Generation {
    1679            0 :         self.generation
    1680            0 :     }
    1681              : 
    1682            0 :     pub(crate) fn wal_redo_manager_status(&self) -> Option<WalRedoManagerStatus> {
    1683            0 :         self.walredo_mgr.as_ref().and_then(|mgr| mgr.status())
    1684            0 :     }
    1685              : 
    1686              :     /// Changes tenant status to active, unless shutdown was already requested.
    1687              :     ///
    1688              :     /// `background_jobs_can_start` is an optional barrier set to a value during pageserver startup
    1689              :     /// to delay background jobs. Background jobs can be started right away when None is given.
    1690            0 :     fn activate(
    1691            0 :         self: &Arc<Self>,
    1692            0 :         broker_client: BrokerClientChannel,
    1693            0 :         background_jobs_can_start: Option<&completion::Barrier>,
    1694            0 :         ctx: &RequestContext,
    1695            0 :     ) {
    1696            0 :         span::debug_assert_current_span_has_tenant_id();
    1697            0 : 
    1698            0 :         let mut activating = false;
    1699            0 :         self.state.send_modify(|current_state| {
    1700            0 :             use pageserver_api::models::ActivatingFrom;
    1701            0 :             match &*current_state {
    1702              :                 TenantState::Activating(_) | TenantState::Active | TenantState::Broken { .. } | TenantState::Stopping { .. } => {
    1703            0 :                     panic!("caller is responsible for calling activate() only on Loading / Attaching tenants, got {state:?}", state = current_state);
    1704              :                 }
    1705            0 :                 TenantState::Loading => {
    1706            0 :                     *current_state = TenantState::Activating(ActivatingFrom::Loading);
    1707            0 :                 }
    1708            0 :                 TenantState::Attaching => {
    1709            0 :                     *current_state = TenantState::Activating(ActivatingFrom::Attaching);
    1710            0 :                 }
    1711              :             }
    1712            0 :             debug!(tenant_id = %self.tenant_shard_id.tenant_id, shard_id = %self.tenant_shard_id.shard_slug(), "Activating tenant");
    1713            0 :             activating = true;
    1714            0 :             // Continue outside the closure. We need to grab timelines.lock()
    1715            0 :             // and we plan to turn it into a tokio::sync::Mutex in a future patch.
    1716            0 :         });
    1717            0 : 
    1718            0 :         if activating {
    1719            0 :             let timelines_accessor = self.timelines.lock().unwrap();
    1720            0 :             let timelines_to_activate = timelines_accessor
    1721            0 :                 .values()
    1722            0 :                 .filter(|timeline| !(timeline.is_broken() || timeline.is_stopping()));
    1723            0 : 
    1724            0 :             // Spawn gc and compaction loops. The loops will shut themselves
    1725            0 :             // down when they notice that the tenant is inactive.
    1726            0 :             tasks::start_background_loops(self, background_jobs_can_start);
    1727            0 : 
    1728            0 :             let mut activated_timelines = 0;
    1729              : 
    1730            0 :             for timeline in timelines_to_activate {
    1731            0 :                 timeline.activate(broker_client.clone(), background_jobs_can_start, ctx);
    1732            0 :                 activated_timelines += 1;
    1733            0 :             }
    1734              : 
    1735            0 :             self.state.send_modify(move |current_state| {
    1736            0 :                 assert!(
    1737            0 :                     matches!(current_state, TenantState::Activating(_)),
    1738            0 :                     "set_stopping and set_broken wait for us to leave Activating state",
    1739              :                 );
    1740            0 :                 *current_state = TenantState::Active;
    1741            0 : 
    1742            0 :                 let elapsed = self.constructed_at.elapsed();
    1743            0 :                 let total_timelines = timelines_accessor.len();
    1744            0 : 
    1745            0 :                 // log a lot of stuff, because some tenants sometimes suffer from user-visible
    1746            0 :                 // times to activate. see https://github.com/neondatabase/neon/issues/4025
    1747            0 :                 info!(
    1748            0 :                     since_creation_millis = elapsed.as_millis(),
    1749            0 :                     tenant_id = %self.tenant_shard_id.tenant_id,
    1750            0 :                     shard_id = %self.tenant_shard_id.shard_slug(),
    1751            0 :                     activated_timelines,
    1752            0 :                     total_timelines,
    1753            0 :                     post_state = <&'static str>::from(&*current_state),
    1754            0 :                     "activation attempt finished"
    1755            0 :                 );
    1756              : 
    1757            0 :                 TENANT.activation.observe(elapsed.as_secs_f64());
    1758            0 :             });
    1759            0 :         }
    1760            0 :     }
    1761              : 
    1762              :     /// Shutdown the tenant and join all of the spawned tasks.
    1763              :     ///
    1764              :     /// The method caters for all use-cases:
    1765              :     /// - pageserver shutdown (freeze_and_flush == true)
    1766              :     /// - detach + ignore (freeze_and_flush == false)
    1767              :     ///
    1768              :     /// This will attempt to shutdown even if tenant is broken.
    1769              :     ///
    1770              :     /// `shutdown_progress` is a [`completion::Barrier`] for the shutdown initiated by this call.
    1771              :     /// If the tenant is already shutting down, we return a clone of the first shutdown call's
    1772              :     /// `Barrier` as an `Err`. This not-first caller can use the returned barrier to join with
    1773              :     /// the ongoing shutdown.
    1774            6 :     async fn shutdown(
    1775            6 :         &self,
    1776            6 :         shutdown_progress: completion::Barrier,
    1777            6 :         freeze_and_flush: bool,
    1778            6 :     ) -> Result<(), completion::Barrier> {
    1779            6 :         span::debug_assert_current_span_has_tenant_id();
    1780            6 : 
    1781            6 :         // Set tenant (and its timlines) to Stoppping state.
    1782            6 :         //
    1783            6 :         // Since we can only transition into Stopping state after activation is complete,
    1784            6 :         // run it in a JoinSet so all tenants have a chance to stop before we get SIGKILLed.
    1785            6 :         //
    1786            6 :         // Transitioning tenants to Stopping state has a couple of non-obvious side effects:
    1787            6 :         // 1. Lock out any new requests to the tenants.
    1788            6 :         // 2. Signal cancellation to WAL receivers (we wait on it below).
    1789            6 :         // 3. Signal cancellation for other tenant background loops.
    1790            6 :         // 4. ???
    1791            6 :         //
    1792            6 :         // The waiting for the cancellation is not done uniformly.
    1793            6 :         // We certainly wait for WAL receivers to shut down.
    1794            6 :         // That is necessary so that no new data comes in before the freeze_and_flush.
    1795            6 :         // But the tenant background loops are joined-on in our caller.
    1796            6 :         // It's mesed up.
    1797            6 :         // we just ignore the failure to stop
    1798            6 : 
    1799            6 :         // If we're still attaching, fire the cancellation token early to drop out: this
    1800            6 :         // will prevent us flushing, but ensures timely shutdown if some I/O during attach
    1801            6 :         // is very slow.
    1802            6 :         if matches!(self.current_state(), TenantState::Attaching) {
    1803            0 :             self.cancel.cancel();
    1804            6 :         }
    1805              : 
    1806            6 :         match self.set_stopping(shutdown_progress, false, false).await {
    1807            6 :             Ok(()) => {}
    1808            0 :             Err(SetStoppingError::Broken) => {
    1809            0 :                 // assume that this is acceptable
    1810            0 :             }
    1811            0 :             Err(SetStoppingError::AlreadyStopping(other)) => {
    1812              :                 // give caller the option to wait for this this shutdown
    1813            0 :                 info!("Tenant::shutdown: AlreadyStopping");
    1814            0 :                 return Err(other);
    1815              :             }
    1816              :         };
    1817              : 
    1818            6 :         let mut js = tokio::task::JoinSet::new();
    1819            6 :         {
    1820            6 :             let timelines = self.timelines.lock().unwrap();
    1821            6 :             timelines.values().for_each(|timeline| {
    1822            6 :                 let timeline = Arc::clone(timeline);
    1823            6 :                 let timeline_id = timeline.timeline_id;
    1824              : 
    1825            6 :                 let span =
    1826            6 :                     tracing::info_span!("timeline_shutdown", %timeline_id, ?freeze_and_flush);
    1827            6 :                 js.spawn(async move {
    1828            6 :                     if freeze_and_flush {
    1829           13 :                         timeline.flush_and_shutdown().instrument(span).await
    1830              :                     } else {
    1831            0 :                         timeline.shutdown().instrument(span).await
    1832              :                     }
    1833            6 :                 });
    1834            6 :             })
    1835              :         };
    1836              :         // test_long_timeline_create_then_tenant_delete is leaning on this message
    1837            6 :         tracing::info!("Waiting for timelines...");
    1838           12 :         while let Some(res) = js.join_next().await {
    1839            0 :             match res {
    1840            6 :                 Ok(()) => {}
    1841            0 :                 Err(je) if je.is_cancelled() => unreachable!("no cancelling used"),
    1842            0 :                 Err(je) if je.is_panic() => { /* logged already */ }
    1843            0 :                 Err(je) => warn!("unexpected JoinError: {je:?}"),
    1844              :             }
    1845              :         }
    1846              : 
    1847              :         // We cancel the Tenant's cancellation token _after_ the timelines have all shut down.  This permits
    1848              :         // them to continue to do work during their shutdown methods, e.g. flushing data.
    1849            0 :         tracing::debug!("Cancelling CancellationToken");
    1850            6 :         self.cancel.cancel();
    1851              : 
    1852              :         // shutdown all tenant and timeline tasks: gc, compaction, page service
    1853              :         // No new tasks will be started for this tenant because it's in `Stopping` state.
    1854              :         //
    1855              :         // this will additionally shutdown and await all timeline tasks.
    1856            0 :         tracing::debug!("Waiting for tasks...");
    1857            6 :         task_mgr::shutdown_tasks(None, Some(self.tenant_shard_id), None).await;
    1858              : 
    1859              :         // Wait for any in-flight operations to complete
    1860            6 :         self.gate.close().await;
    1861              : 
    1862            6 :         Ok(())
    1863            6 :     }
    1864              : 
    1865              :     /// Change tenant status to Stopping, to mark that it is being shut down.
    1866              :     ///
    1867              :     /// This function waits for the tenant to become active if it isn't already, before transitioning it into Stopping state.
    1868              :     ///
    1869              :     /// This function is not cancel-safe!
    1870              :     ///
    1871              :     /// `allow_transition_from_loading` is needed for the special case of loading task deleting the tenant.
    1872              :     /// `allow_transition_from_attaching` is needed for the special case of attaching deleted tenant.
    1873            6 :     async fn set_stopping(
    1874            6 :         &self,
    1875            6 :         progress: completion::Barrier,
    1876            6 :         allow_transition_from_loading: bool,
    1877            6 :         allow_transition_from_attaching: bool,
    1878            6 :     ) -> Result<(), SetStoppingError> {
    1879            6 :         let mut rx = self.state.subscribe();
    1880            6 : 
    1881            6 :         // cannot stop before we're done activating, so wait out until we're done activating
    1882            6 :         rx.wait_for(|state| match state {
    1883            0 :             TenantState::Attaching if allow_transition_from_attaching => true,
    1884              :             TenantState::Activating(_) | TenantState::Attaching => {
    1885            0 :                 info!(
    1886            0 :                     "waiting for {} to turn Active|Broken|Stopping",
    1887            0 :                     <&'static str>::from(state)
    1888            0 :                 );
    1889            0 :                 false
    1890              :             }
    1891            0 :             TenantState::Loading => allow_transition_from_loading,
    1892            6 :             TenantState::Active | TenantState::Broken { .. } | TenantState::Stopping { .. } => true,
    1893            6 :         })
    1894            0 :         .await
    1895            6 :         .expect("cannot drop self.state while on a &self method");
    1896            6 : 
    1897            6 :         // we now know we're done activating, let's see whether this task is the winner to transition into Stopping
    1898            6 :         let mut err = None;
    1899            6 :         let stopping = self.state.send_if_modified(|current_state| match current_state {
    1900              :             TenantState::Activating(_) => {
    1901            0 :                 unreachable!("1we ensured above that we're done with activation, and, there is no re-activation")
    1902              :             }
    1903              :             TenantState::Attaching => {
    1904            0 :                 if !allow_transition_from_attaching {
    1905            0 :                     unreachable!("2we ensured above that we're done with activation, and, there is no re-activation")
    1906            0 :                 };
    1907            0 :                 *current_state = TenantState::Stopping { progress };
    1908            0 :                 true
    1909              :             }
    1910              :             TenantState::Loading => {
    1911            0 :                 if !allow_transition_from_loading {
    1912            0 :                     unreachable!("3we ensured above that we're done with activation, and, there is no re-activation")
    1913            0 :                 };
    1914            0 :                 *current_state = TenantState::Stopping { progress };
    1915            0 :                 true
    1916              :             }
    1917              :             TenantState::Active => {
    1918              :                 // FIXME: due to time-of-check vs time-of-use issues, it can happen that new timelines
    1919              :                 // are created after the transition to Stopping. That's harmless, as the Timelines
    1920              :                 // won't be accessible to anyone afterwards, because the Tenant is in Stopping state.
    1921            6 :                 *current_state = TenantState::Stopping { progress };
    1922            6 :                 // Continue stopping outside the closure. We need to grab timelines.lock()
    1923            6 :                 // and we plan to turn it into a tokio::sync::Mutex in a future patch.
    1924            6 :                 true
    1925              :             }
    1926            0 :             TenantState::Broken { reason, .. } => {
    1927            0 :                 info!(
    1928            0 :                     "Cannot set tenant to Stopping state, it is in Broken state due to: {reason}"
    1929            0 :                 );
    1930            0 :                 err = Some(SetStoppingError::Broken);
    1931            0 :                 false
    1932              :             }
    1933            0 :             TenantState::Stopping { progress } => {
    1934            0 :                 info!("Tenant is already in Stopping state");
    1935            0 :                 err = Some(SetStoppingError::AlreadyStopping(progress.clone()));
    1936            0 :                 false
    1937              :             }
    1938            6 :         });
    1939            6 :         match (stopping, err) {
    1940            6 :             (true, None) => {} // continue
    1941            0 :             (false, Some(err)) => return Err(err),
    1942            0 :             (true, Some(_)) => unreachable!(
    1943            0 :                 "send_if_modified closure must error out if not transitioning to Stopping"
    1944            0 :             ),
    1945            0 :             (false, None) => unreachable!(
    1946            0 :                 "send_if_modified closure must return true if transitioning to Stopping"
    1947            0 :             ),
    1948              :         }
    1949              : 
    1950            6 :         let timelines_accessor = self.timelines.lock().unwrap();
    1951            6 :         let not_broken_timelines = timelines_accessor
    1952            6 :             .values()
    1953            6 :             .filter(|timeline| !timeline.is_broken());
    1954           12 :         for timeline in not_broken_timelines {
    1955            6 :             timeline.set_state(TimelineState::Stopping);
    1956            6 :         }
    1957            6 :         Ok(())
    1958            6 :     }
    1959              : 
    1960              :     /// Method for tenant::mgr to transition us into Broken state in case of a late failure in
    1961              :     /// `remove_tenant_from_memory`
    1962              :     ///
    1963              :     /// This function waits for the tenant to become active if it isn't already, before transitioning it into Stopping state.
    1964              :     ///
    1965              :     /// In tests, we also use this to set tenants to Broken state on purpose.
    1966            0 :     pub(crate) async fn set_broken(&self, reason: String) {
    1967            0 :         let mut rx = self.state.subscribe();
    1968            0 : 
    1969            0 :         // The load & attach routines own the tenant state until it has reached `Active`.
    1970            0 :         // So, wait until it's done.
    1971            0 :         rx.wait_for(|state| match state {
    1972              :             TenantState::Activating(_) | TenantState::Loading | TenantState::Attaching => {
    1973            0 :                 info!(
    1974            0 :                     "waiting for {} to turn Active|Broken|Stopping",
    1975            0 :                     <&'static str>::from(state)
    1976            0 :                 );
    1977            0 :                 false
    1978              :             }
    1979            0 :             TenantState::Active | TenantState::Broken { .. } | TenantState::Stopping { .. } => true,
    1980            0 :         })
    1981            0 :         .await
    1982            0 :         .expect("cannot drop self.state while on a &self method");
    1983            0 : 
    1984            0 :         // we now know we're done activating, let's see whether this task is the winner to transition into Broken
    1985            0 :         self.set_broken_no_wait(reason)
    1986            0 :     }
    1987              : 
    1988            0 :     pub(crate) fn set_broken_no_wait(&self, reason: impl Display) {
    1989            0 :         let reason = reason.to_string();
    1990            0 :         self.state.send_modify(|current_state| {
    1991            0 :             match *current_state {
    1992              :                 TenantState::Activating(_) | TenantState::Loading | TenantState::Attaching => {
    1993            0 :                     unreachable!("we ensured above that we're done with activation, and, there is no re-activation")
    1994              :                 }
    1995              :                 TenantState::Active => {
    1996            0 :                     if cfg!(feature = "testing") {
    1997            0 :                         warn!("Changing Active tenant to Broken state, reason: {}", reason);
    1998            0 :                         *current_state = TenantState::broken_from_reason(reason);
    1999              :                     } else {
    2000            0 :                         unreachable!("not allowed to call set_broken on Active tenants in non-testing builds")
    2001              :                     }
    2002              :                 }
    2003              :                 TenantState::Broken { .. } => {
    2004            0 :                     warn!("Tenant is already in Broken state");
    2005              :                 }
    2006              :                 // This is the only "expected" path, any other path is a bug.
    2007              :                 TenantState::Stopping { .. } => {
    2008            0 :                     warn!(
    2009            0 :                         "Marking Stopping tenant as Broken state, reason: {}",
    2010            0 :                         reason
    2011            0 :                     );
    2012            0 :                     *current_state = TenantState::broken_from_reason(reason);
    2013              :                 }
    2014              :            }
    2015            0 :         });
    2016            0 :     }
    2017              : 
    2018            0 :     pub fn subscribe_for_state_updates(&self) -> watch::Receiver<TenantState> {
    2019            0 :         self.state.subscribe()
    2020            0 :     }
    2021              : 
    2022              :     /// The activate_now semaphore is initialized with zero units.  As soon as
    2023              :     /// we add a unit, waiters will be able to acquire a unit and proceed.
    2024            0 :     pub(crate) fn activate_now(&self) {
    2025            0 :         self.activate_now_sem.add_permits(1);
    2026            0 :     }
    2027              : 
    2028            0 :     pub(crate) async fn wait_to_become_active(
    2029            0 :         &self,
    2030            0 :         timeout: Duration,
    2031            0 :     ) -> Result<(), GetActiveTenantError> {
    2032            0 :         let mut receiver = self.state.subscribe();
    2033            0 :         loop {
    2034            0 :             let current_state = receiver.borrow_and_update().clone();
    2035            0 :             match current_state {
    2036              :                 TenantState::Loading | TenantState::Attaching | TenantState::Activating(_) => {
    2037              :                     // in these states, there's a chance that we can reach ::Active
    2038            0 :                     self.activate_now();
    2039            0 :                     match timeout_cancellable(timeout, &self.cancel, receiver.changed()).await {
    2040            0 :                         Ok(r) => {
    2041            0 :                             r.map_err(
    2042            0 :                             |_e: tokio::sync::watch::error::RecvError|
    2043              :                                 // Tenant existed but was dropped: report it as non-existent
    2044            0 :                                 GetActiveTenantError::NotFound(GetTenantError::NotFound(self.tenant_shard_id.tenant_id))
    2045            0 :                         )?
    2046              :                         }
    2047              :                         Err(TimeoutCancellableError::Cancelled) => {
    2048            0 :                             return Err(GetActiveTenantError::Cancelled);
    2049              :                         }
    2050              :                         Err(TimeoutCancellableError::Timeout) => {
    2051            0 :                             return Err(GetActiveTenantError::WaitForActiveTimeout {
    2052            0 :                                 latest_state: Some(self.current_state()),
    2053            0 :                                 wait_time: timeout,
    2054            0 :                             });
    2055              :                         }
    2056              :                     }
    2057              :                 }
    2058              :                 TenantState::Active { .. } => {
    2059            0 :                     return Ok(());
    2060              :                 }
    2061              :                 TenantState::Broken { .. } | TenantState::Stopping { .. } => {
    2062              :                     // There's no chance the tenant can transition back into ::Active
    2063            0 :                     return Err(GetActiveTenantError::WillNotBecomeActive(current_state));
    2064              :                 }
    2065              :             }
    2066              :         }
    2067            0 :     }
    2068              : 
    2069            0 :     pub(crate) fn get_attach_mode(&self) -> AttachmentMode {
    2070            0 :         self.tenant_conf.read().unwrap().location.attach_mode
    2071            0 :     }
    2072              : 
    2073              :     /// For API access: generate a LocationConfig equivalent to the one that would be used to
    2074              :     /// create a Tenant in the same state.  Do not use this in hot paths: it's for relatively
    2075              :     /// rare external API calls, like a reconciliation at startup.
    2076            0 :     pub(crate) fn get_location_conf(&self) -> models::LocationConfig {
    2077            0 :         let conf = self.tenant_conf.read().unwrap();
    2078              : 
    2079            0 :         let location_config_mode = match conf.location.attach_mode {
    2080            0 :             AttachmentMode::Single => models::LocationConfigMode::AttachedSingle,
    2081            0 :             AttachmentMode::Multi => models::LocationConfigMode::AttachedMulti,
    2082            0 :             AttachmentMode::Stale => models::LocationConfigMode::AttachedStale,
    2083              :         };
    2084              : 
    2085              :         // We have a pageserver TenantConf, we need the API-facing TenantConfig.
    2086            0 :         let tenant_config: models::TenantConfig = conf.tenant_conf.clone().into();
    2087            0 : 
    2088            0 :         models::LocationConfig {
    2089            0 :             mode: location_config_mode,
    2090            0 :             generation: self.generation.into(),
    2091            0 :             secondary_conf: None,
    2092            0 :             shard_number: self.shard_identity.number.0,
    2093            0 :             shard_count: self.shard_identity.count.literal(),
    2094            0 :             shard_stripe_size: self.shard_identity.stripe_size.0,
    2095            0 :             tenant_conf: tenant_config,
    2096            0 :         }
    2097            0 :     }
    2098              : 
    2099            0 :     pub(crate) fn get_tenant_shard_id(&self) -> &TenantShardId {
    2100            0 :         &self.tenant_shard_id
    2101            0 :     }
    2102              : 
    2103            0 :     pub(crate) fn get_generation(&self) -> Generation {
    2104            0 :         self.generation
    2105            0 :     }
    2106              : 
    2107              :     /// This function partially shuts down the tenant (it shuts down the Timelines) and is fallible,
    2108              :     /// and can leave the tenant in a bad state if it fails.  The caller is responsible for
    2109              :     /// resetting this tenant to a valid state if we fail.
    2110            0 :     pub(crate) async fn split_prepare(
    2111            0 :         &self,
    2112            0 :         child_shards: &Vec<TenantShardId>,
    2113            0 :     ) -> anyhow::Result<()> {
    2114            0 :         let timelines = self.timelines.lock().unwrap().clone();
    2115            0 :         for timeline in timelines.values() {
    2116            0 :             let Some(tl_client) = &timeline.remote_client else {
    2117            0 :                 anyhow::bail!("Remote storage is mandatory");
    2118              :             };
    2119              : 
    2120            0 :             let Some(remote_storage) = &self.remote_storage else {
    2121            0 :                 anyhow::bail!("Remote storage is mandatory");
    2122              :             };
    2123              : 
    2124              :             // We do not block timeline creation/deletion during splits inside the pageserver: it is up to higher levels
    2125              :             // to ensure that they do not start a split if currently in the process of doing these.
    2126              : 
    2127              :             // Upload an index from the parent: this is partly to provide freshness for the
    2128              :             // child tenants that will copy it, and partly for general ease-of-debugging: there will
    2129              :             // always be a parent shard index in the same generation as we wrote the child shard index.
    2130            0 :             tl_client.schedule_index_upload_for_file_changes()?;
    2131            0 :             tl_client.wait_completion().await?;
    2132              : 
    2133              :             // Shut down the timeline's remote client: this means that the indices we write
    2134              :             // for child shards will not be invalidated by the parent shard deleting layers.
    2135            0 :             tl_client.shutdown().await?;
    2136              : 
    2137              :             // Download methods can still be used after shutdown, as they don't flow through the remote client's
    2138              :             // queue.  In principal the RemoteTimelineClient could provide this without downloading it, but this
    2139              :             // operation is rare, so it's simpler to just download it (and robustly guarantees that the index
    2140              :             // we use here really is the remotely persistent one).
    2141            0 :             let result = tl_client
    2142            0 :                 .download_index_file(&self.cancel)
    2143            0 :                 .instrument(info_span!("download_index_file", tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(), timeline_id=%timeline.timeline_id))
    2144            0 :                 .await?;
    2145            0 :             let index_part = match result {
    2146              :                 MaybeDeletedIndexPart::Deleted(_) => {
    2147            0 :                     anyhow::bail!("Timeline deletion happened concurrently with split")
    2148              :                 }
    2149            0 :                 MaybeDeletedIndexPart::IndexPart(p) => p,
    2150              :             };
    2151              : 
    2152            0 :             for child_shard in child_shards {
    2153            0 :                 upload_index_part(
    2154            0 :                     remote_storage,
    2155            0 :                     child_shard,
    2156            0 :                     &timeline.timeline_id,
    2157            0 :                     self.generation,
    2158            0 :                     &index_part,
    2159            0 :                     &self.cancel,
    2160            0 :                 )
    2161            0 :                 .await?;
    2162              :             }
    2163              :         }
    2164              : 
    2165            0 :         Ok(())
    2166            0 :     }
    2167              : }
    2168              : 
    2169              : /// Given a Vec of timelines and their ancestors (timeline_id, ancestor_id),
    2170              : /// perform a topological sort, so that the parent of each timeline comes
    2171              : /// before the children.
    2172              : /// E extracts the ancestor from T
    2173              : /// This allows for T to be different. It can be TimelineMetadata, can be Timeline itself, etc.
    2174           84 : fn tree_sort_timelines<T, E>(
    2175           84 :     timelines: HashMap<TimelineId, T>,
    2176           84 :     extractor: E,
    2177           84 : ) -> anyhow::Result<Vec<(TimelineId, T)>>
    2178           84 : where
    2179           84 :     E: Fn(&T) -> Option<TimelineId>,
    2180           84 : {
    2181           84 :     let mut result = Vec::with_capacity(timelines.len());
    2182           84 : 
    2183           84 :     let mut now = Vec::with_capacity(timelines.len());
    2184           84 :     // (ancestor, children)
    2185           84 :     let mut later: HashMap<TimelineId, Vec<(TimelineId, T)>> =
    2186           84 :         HashMap::with_capacity(timelines.len());
    2187              : 
    2188           90 :     for (timeline_id, value) in timelines {
    2189            6 :         if let Some(ancestor_id) = extractor(&value) {
    2190            2 :             let children = later.entry(ancestor_id).or_default();
    2191            2 :             children.push((timeline_id, value));
    2192            4 :         } else {
    2193            4 :             now.push((timeline_id, value));
    2194            4 :         }
    2195              :     }
    2196              : 
    2197           90 :     while let Some((timeline_id, metadata)) = now.pop() {
    2198            6 :         result.push((timeline_id, metadata));
    2199              :         // All children of this can be loaded now
    2200            6 :         if let Some(mut children) = later.remove(&timeline_id) {
    2201            2 :             now.append(&mut children);
    2202            4 :         }
    2203              :     }
    2204              : 
    2205              :     // All timelines should be visited now. Unless there were timelines with missing ancestors.
    2206           84 :     if !later.is_empty() {
    2207            0 :         for (missing_id, orphan_ids) in later {
    2208            0 :             for (orphan_id, _) in orphan_ids {
    2209            0 :                 error!("could not load timeline {orphan_id} because its ancestor timeline {missing_id} could not be loaded");
    2210              :             }
    2211              :         }
    2212            0 :         bail!("could not load tenant because some timelines are missing ancestors");
    2213           84 :     }
    2214           84 : 
    2215           84 :     Ok(result)
    2216           84 : }
    2217              : 
    2218              : impl Tenant {
    2219            0 :     pub fn tenant_specific_overrides(&self) -> TenantConfOpt {
    2220            0 :         self.tenant_conf.read().unwrap().tenant_conf.clone()
    2221            0 :     }
    2222              : 
    2223            0 :     pub fn effective_config(&self) -> TenantConf {
    2224            0 :         self.tenant_specific_overrides()
    2225            0 :             .merge(self.conf.default_tenant_conf.clone())
    2226            0 :     }
    2227              : 
    2228            0 :     pub fn get_checkpoint_distance(&self) -> u64 {
    2229            0 :         let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf.clone();
    2230            0 :         tenant_conf
    2231            0 :             .checkpoint_distance
    2232            0 :             .unwrap_or(self.conf.default_tenant_conf.checkpoint_distance)
    2233            0 :     }
    2234              : 
    2235            0 :     pub fn get_checkpoint_timeout(&self) -> Duration {
    2236            0 :         let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf.clone();
    2237            0 :         tenant_conf
    2238            0 :             .checkpoint_timeout
    2239            0 :             .unwrap_or(self.conf.default_tenant_conf.checkpoint_timeout)
    2240            0 :     }
    2241              : 
    2242            0 :     pub fn get_compaction_target_size(&self) -> u64 {
    2243            0 :         let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf.clone();
    2244            0 :         tenant_conf
    2245            0 :             .compaction_target_size
    2246            0 :             .unwrap_or(self.conf.default_tenant_conf.compaction_target_size)
    2247            0 :     }
    2248              : 
    2249            0 :     pub fn get_compaction_period(&self) -> Duration {
    2250            0 :         let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf.clone();
    2251            0 :         tenant_conf
    2252            0 :             .compaction_period
    2253            0 :             .unwrap_or(self.conf.default_tenant_conf.compaction_period)
    2254            0 :     }
    2255              : 
    2256            0 :     pub fn get_compaction_threshold(&self) -> usize {
    2257            0 :         let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf.clone();
    2258            0 :         tenant_conf
    2259            0 :             .compaction_threshold
    2260            0 :             .unwrap_or(self.conf.default_tenant_conf.compaction_threshold)
    2261            0 :     }
    2262              : 
    2263            0 :     pub fn get_gc_horizon(&self) -> u64 {
    2264            0 :         let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf.clone();
    2265            0 :         tenant_conf
    2266            0 :             .gc_horizon
    2267            0 :             .unwrap_or(self.conf.default_tenant_conf.gc_horizon)
    2268            0 :     }
    2269              : 
    2270            0 :     pub fn get_gc_period(&self) -> Duration {
    2271            0 :         let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf.clone();
    2272            0 :         tenant_conf
    2273            0 :             .gc_period
    2274            0 :             .unwrap_or(self.conf.default_tenant_conf.gc_period)
    2275            0 :     }
    2276              : 
    2277            0 :     pub fn get_image_creation_threshold(&self) -> usize {
    2278            0 :         let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf.clone();
    2279            0 :         tenant_conf
    2280            0 :             .image_creation_threshold
    2281            0 :             .unwrap_or(self.conf.default_tenant_conf.image_creation_threshold)
    2282            0 :     }
    2283              : 
    2284            0 :     pub fn get_pitr_interval(&self) -> Duration {
    2285            0 :         let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf.clone();
    2286            0 :         tenant_conf
    2287            0 :             .pitr_interval
    2288            0 :             .unwrap_or(self.conf.default_tenant_conf.pitr_interval)
    2289            0 :     }
    2290              : 
    2291            0 :     pub fn get_trace_read_requests(&self) -> bool {
    2292            0 :         let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf.clone();
    2293            0 :         tenant_conf
    2294            0 :             .trace_read_requests
    2295            0 :             .unwrap_or(self.conf.default_tenant_conf.trace_read_requests)
    2296            0 :     }
    2297              : 
    2298            0 :     pub fn get_min_resident_size_override(&self) -> Option<u64> {
    2299            0 :         let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf.clone();
    2300            0 :         tenant_conf
    2301            0 :             .min_resident_size_override
    2302            0 :             .or(self.conf.default_tenant_conf.min_resident_size_override)
    2303            0 :     }
    2304              : 
    2305            0 :     pub fn get_heatmap_period(&self) -> Option<Duration> {
    2306            0 :         let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf.clone();
    2307            0 :         let heatmap_period = tenant_conf
    2308            0 :             .heatmap_period
    2309            0 :             .unwrap_or(self.conf.default_tenant_conf.heatmap_period);
    2310            0 :         if heatmap_period.is_zero() {
    2311            0 :             None
    2312              :         } else {
    2313            0 :             Some(heatmap_period)
    2314              :         }
    2315            0 :     }
    2316              : 
    2317            0 :     pub fn set_new_tenant_config(&self, new_tenant_conf: TenantConfOpt) {
    2318            0 :         self.tenant_conf.write().unwrap().tenant_conf = new_tenant_conf;
    2319            0 :         self.tenant_conf_updated();
    2320            0 :         // Don't hold self.timelines.lock() during the notifies.
    2321            0 :         // There's no risk of deadlock right now, but there could be if we consolidate
    2322            0 :         // mutexes in struct Timeline in the future.
    2323            0 :         let timelines = self.list_timelines();
    2324            0 :         for timeline in timelines {
    2325            0 :             timeline.tenant_conf_updated();
    2326            0 :         }
    2327            0 :     }
    2328              : 
    2329            0 :     pub(crate) fn set_new_location_config(&self, new_conf: AttachedTenantConf) {
    2330            0 :         *self.tenant_conf.write().unwrap() = new_conf;
    2331            0 :         self.tenant_conf_updated();
    2332            0 :         // Don't hold self.timelines.lock() during the notifies.
    2333            0 :         // There's no risk of deadlock right now, but there could be if we consolidate
    2334            0 :         // mutexes in struct Timeline in the future.
    2335            0 :         let timelines = self.list_timelines();
    2336            0 :         for timeline in timelines {
    2337            0 :             timeline.tenant_conf_updated();
    2338            0 :         }
    2339            0 :     }
    2340              : 
    2341           84 :     fn get_timeline_get_throttle_config(
    2342           84 :         psconf: &'static PageServerConf,
    2343           84 :         overrides: &TenantConfOpt,
    2344           84 :     ) -> throttle::Config {
    2345           84 :         overrides
    2346           84 :             .timeline_get_throttle
    2347           84 :             .clone()
    2348           84 :             .unwrap_or(psconf.default_tenant_conf.timeline_get_throttle.clone())
    2349           84 :     }
    2350              : 
    2351            0 :     pub(crate) fn tenant_conf_updated(&self) {
    2352            0 :         let conf = {
    2353            0 :             let guard = self.tenant_conf.read().unwrap();
    2354            0 :             Self::get_timeline_get_throttle_config(self.conf, &guard.tenant_conf)
    2355            0 :         };
    2356            0 :         self.timeline_get_throttle.reconfigure(conf)
    2357            0 :     }
    2358              : 
    2359              :     /// Helper function to create a new Timeline struct.
    2360              :     ///
    2361              :     /// The returned Timeline is in Loading state. The caller is responsible for
    2362              :     /// initializing any on-disk state, and for inserting the Timeline to the 'timelines'
    2363              :     /// map.
    2364              :     ///
    2365              :     /// `validate_ancestor == false` is used when a timeline is created for deletion
    2366              :     /// and we might not have the ancestor present anymore which is fine for to be
    2367              :     /// deleted timelines.
    2368          292 :     fn create_timeline_struct(
    2369          292 :         &self,
    2370          292 :         new_timeline_id: TimelineId,
    2371          292 :         new_metadata: &TimelineMetadata,
    2372          292 :         ancestor: Option<Arc<Timeline>>,
    2373          292 :         resources: TimelineResources,
    2374          292 :         cause: CreateTimelineCause,
    2375          292 :     ) -> anyhow::Result<Arc<Timeline>> {
    2376          292 :         let state = match cause {
    2377              :             CreateTimelineCause::Load => {
    2378          292 :                 let ancestor_id = new_metadata.ancestor_timeline();
    2379          292 :                 anyhow::ensure!(
    2380          292 :                     ancestor_id == ancestor.as_ref().map(|t| t.timeline_id),
    2381            0 :                     "Timeline's {new_timeline_id} ancestor {ancestor_id:?} was not found"
    2382              :                 );
    2383          292 :                 TimelineState::Loading
    2384              :             }
    2385            0 :             CreateTimelineCause::Delete => TimelineState::Stopping,
    2386              :         };
    2387              : 
    2388          292 :         let pg_version = new_metadata.pg_version();
    2389          292 : 
    2390          292 :         let timeline = Timeline::new(
    2391          292 :             self.conf,
    2392          292 :             Arc::clone(&self.tenant_conf),
    2393          292 :             new_metadata,
    2394          292 :             ancestor,
    2395          292 :             new_timeline_id,
    2396          292 :             self.tenant_shard_id,
    2397          292 :             self.generation,
    2398          292 :             self.shard_identity,
    2399          292 :             self.walredo_mgr.as_ref().map(Arc::clone),
    2400          292 :             resources,
    2401          292 :             pg_version,
    2402          292 :             state,
    2403          292 :             self.cancel.child_token(),
    2404          292 :         );
    2405          292 : 
    2406          292 :         Ok(timeline)
    2407          292 :     }
    2408              : 
    2409              :     // Allow too_many_arguments because a constructor's argument list naturally grows with the
    2410              :     // number of attributes in the struct: breaking these out into a builder wouldn't be helpful.
    2411              :     #[allow(clippy::too_many_arguments)]
    2412           84 :     fn new(
    2413           84 :         state: TenantState,
    2414           84 :         conf: &'static PageServerConf,
    2415           84 :         attached_conf: AttachedTenantConf,
    2416           84 :         shard_identity: ShardIdentity,
    2417           84 :         walredo_mgr: Option<Arc<WalRedoManager>>,
    2418           84 :         tenant_shard_id: TenantShardId,
    2419           84 :         remote_storage: Option<GenericRemoteStorage>,
    2420           84 :         deletion_queue_client: DeletionQueueClient,
    2421           84 :     ) -> Tenant {
    2422           84 :         let (state, mut rx) = watch::channel(state);
    2423           84 : 
    2424           84 :         tokio::spawn(async move {
    2425           80 :             // reflect tenant state in metrics:
    2426           80 :             // - global per tenant state: TENANT_STATE_METRIC
    2427           80 :             // - "set" of broken tenants: BROKEN_TENANTS_SET
    2428           80 :             //
    2429           80 :             // set of broken tenants should not have zero counts so that it remains accessible for
    2430           80 :             // alerting.
    2431           80 : 
    2432           80 :             let tid = tenant_shard_id.to_string();
    2433           80 :             let shard_id = tenant_shard_id.shard_slug().to_string();
    2434           80 :             let set_key = &[tid.as_str(), shard_id.as_str()][..];
    2435           80 : 
    2436           86 :             fn inspect_state(state: &TenantState) -> ([&'static str; 1], bool) {
    2437           86 :                 ([state.into()], matches!(state, TenantState::Broken { .. }))
    2438           86 :             }
    2439           80 : 
    2440           80 :             let mut tuple = inspect_state(&rx.borrow_and_update());
    2441           80 : 
    2442           80 :             let is_broken = tuple.1;
    2443           80 :             let mut counted_broken = if is_broken {
    2444              :                 // add the id to the set right away, there should not be any updates on the channel
    2445              :                 // after before tenant is removed, if ever
    2446            0 :                 BROKEN_TENANTS_SET.with_label_values(set_key).set(1);
    2447            0 :                 true
    2448              :             } else {
    2449           80 :                 false
    2450              :             };
    2451              : 
    2452           86 :             loop {
    2453           86 :                 let labels = &tuple.0;
    2454           86 :                 let current = TENANT_STATE_METRIC.with_label_values(labels);
    2455           86 :                 current.inc();
    2456           86 : 
    2457           86 :                 if rx.changed().await.is_err() {
    2458              :                     // tenant has been dropped
    2459            4 :                     current.dec();
    2460            4 :                     drop(BROKEN_TENANTS_SET.remove_label_values(set_key));
    2461            4 :                     break;
    2462            6 :                 }
    2463            6 : 
    2464            6 :                 current.dec();
    2465            6 :                 tuple = inspect_state(&rx.borrow_and_update());
    2466            6 : 
    2467            6 :                 let is_broken = tuple.1;
    2468            6 :                 if is_broken && !counted_broken {
    2469            0 :                     counted_broken = true;
    2470            0 :                     // insert the tenant_id (back) into the set while avoiding needless counter
    2471            0 :                     // access
    2472            0 :                     BROKEN_TENANTS_SET.with_label_values(set_key).set(1);
    2473            6 :                 }
    2474              :             }
    2475           84 :         });
    2476           84 : 
    2477           84 :         Tenant {
    2478           84 :             tenant_shard_id,
    2479           84 :             shard_identity,
    2480           84 :             generation: attached_conf.location.generation,
    2481           84 :             conf,
    2482           84 :             // using now here is good enough approximation to catch tenants with really long
    2483           84 :             // activation times.
    2484           84 :             constructed_at: Instant::now(),
    2485           84 :             timelines: Mutex::new(HashMap::new()),
    2486           84 :             timelines_creating: Mutex::new(HashSet::new()),
    2487           84 :             gc_cs: tokio::sync::Mutex::new(()),
    2488           84 :             walredo_mgr,
    2489           84 :             remote_storage,
    2490           84 :             deletion_queue_client,
    2491           84 :             state,
    2492           84 :             cached_logical_sizes: tokio::sync::Mutex::new(HashMap::new()),
    2493           84 :             cached_synthetic_tenant_size: Arc::new(AtomicU64::new(0)),
    2494           84 :             eviction_task_tenant_state: tokio::sync::Mutex::new(EvictionTaskTenantState::default()),
    2495           84 :             activate_now_sem: tokio::sync::Semaphore::new(0),
    2496           84 :             delete_progress: Arc::new(tokio::sync::Mutex::new(DeleteTenantFlow::default())),
    2497           84 :             cancel: CancellationToken::default(),
    2498           84 :             gate: Gate::default(),
    2499           84 :             timeline_get_throttle: Arc::new(throttle::Throttle::new(
    2500           84 :                 Tenant::get_timeline_get_throttle_config(conf, &attached_conf.tenant_conf),
    2501           84 :                 &crate::metrics::tenant_throttling::TIMELINE_GET,
    2502           84 :             )),
    2503           84 :             tenant_conf: Arc::new(RwLock::new(attached_conf)),
    2504           84 :         }
    2505           84 :     }
    2506              : 
    2507              :     /// Locate and load config
    2508            0 :     pub(super) fn load_tenant_config(
    2509            0 :         conf: &'static PageServerConf,
    2510            0 :         tenant_shard_id: &TenantShardId,
    2511            0 :     ) -> anyhow::Result<LocationConf> {
    2512            0 :         let legacy_config_path = conf.tenant_config_path(tenant_shard_id);
    2513            0 :         let config_path = conf.tenant_location_config_path(tenant_shard_id);
    2514            0 : 
    2515            0 :         if config_path.exists() {
    2516              :             // New-style config takes precedence
    2517            0 :             let deserialized = Self::read_config(&config_path)?;
    2518            0 :             Ok(toml_edit::de::from_document::<LocationConf>(deserialized)?)
    2519            0 :         } else if legacy_config_path.exists() {
    2520              :             // Upgrade path: found an old-style configuration only
    2521            0 :             let deserialized = Self::read_config(&legacy_config_path)?;
    2522              : 
    2523            0 :             let mut tenant_conf = TenantConfOpt::default();
    2524            0 :             for (key, item) in deserialized.iter() {
    2525            0 :                 match key {
    2526            0 :                     "tenant_config" => {
    2527            0 :                         tenant_conf = TenantConfOpt::try_from(item.to_owned()).context(format!("Failed to parse config from file '{legacy_config_path}' as pageserver config"))?;
    2528              :                     }
    2529            0 :                     _ => bail!(
    2530            0 :                         "config file {legacy_config_path} has unrecognized pageserver option '{key}'"
    2531            0 :                     ),
    2532              :                 }
    2533              :             }
    2534              : 
    2535              :             // Legacy configs are implicitly in attached state, and do not support sharding
    2536            0 :             Ok(LocationConf::attached_single(
    2537            0 :                 tenant_conf,
    2538            0 :                 Generation::none(),
    2539            0 :                 &models::ShardParameters::default(),
    2540            0 :             ))
    2541              :         } else {
    2542              :             // FIXME If the config file is not found, assume that we're attaching
    2543              :             // a detached tenant and config is passed via attach command.
    2544              :             // https://github.com/neondatabase/neon/issues/1555
    2545              :             // OR: we're loading after incomplete deletion that managed to remove config.
    2546            0 :             info!(
    2547            0 :                 "tenant config not found in {} or {}",
    2548            0 :                 config_path, legacy_config_path
    2549            0 :             );
    2550            0 :             Ok(LocationConf::default())
    2551              :         }
    2552            0 :     }
    2553              : 
    2554            0 :     fn read_config(path: &Utf8Path) -> anyhow::Result<toml_edit::Document> {
    2555            0 :         info!("loading tenant configuration from {path}");
    2556              : 
    2557              :         // load and parse file
    2558            0 :         let config = fs::read_to_string(path)
    2559            0 :             .with_context(|| format!("Failed to load config from path '{path}'"))?;
    2560              : 
    2561            0 :         config
    2562            0 :             .parse::<toml_edit::Document>()
    2563            0 :             .with_context(|| format!("Failed to parse config from file '{path}' as toml file"))
    2564            0 :     }
    2565              : 
    2566            0 :     #[tracing::instrument(skip_all, fields(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug()))]
    2567              :     pub(super) async fn persist_tenant_config(
    2568              :         conf: &'static PageServerConf,
    2569              :         tenant_shard_id: &TenantShardId,
    2570              :         location_conf: &LocationConf,
    2571              :     ) -> anyhow::Result<()> {
    2572              :         let legacy_config_path = conf.tenant_config_path(tenant_shard_id);
    2573              :         let config_path = conf.tenant_location_config_path(tenant_shard_id);
    2574              : 
    2575              :         Self::persist_tenant_config_at(
    2576              :             tenant_shard_id,
    2577              :             &config_path,
    2578              :             &legacy_config_path,
    2579              :             location_conf,
    2580              :         )
    2581              :         .await
    2582              :     }
    2583              : 
    2584            0 :     #[tracing::instrument(skip_all, fields(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug()))]
    2585              :     pub(super) async fn persist_tenant_config_at(
    2586              :         tenant_shard_id: &TenantShardId,
    2587              :         config_path: &Utf8Path,
    2588              :         legacy_config_path: &Utf8Path,
    2589              :         location_conf: &LocationConf,
    2590              :     ) -> anyhow::Result<()> {
    2591              :         // Forward compat: write out an old-style configuration that old versions can read, in case we roll back
    2592              :         Self::persist_tenant_config_legacy(
    2593              :             tenant_shard_id,
    2594              :             legacy_config_path,
    2595              :             &location_conf.tenant_conf,
    2596              :         )
    2597              :         .await?;
    2598              : 
    2599              :         if let LocationMode::Attached(attach_conf) = &location_conf.mode {
    2600              :             // Once we use LocationMode, generations are mandatory.  If we aren't using generations,
    2601              :             // then drop out after writing legacy-style config.
    2602              :             if attach_conf.generation.is_none() {
    2603            0 :                 tracing::debug!("Running without generations, not writing new-style LocationConf");
    2604              :                 return Ok(());
    2605              :             }
    2606              :         }
    2607              : 
    2608            0 :         debug!("persisting tenantconf to {config_path}");
    2609              : 
    2610              :         let mut conf_content = r#"# This file contains a specific per-tenant's config.
    2611              : #  It is read in case of pageserver restart.
    2612              : "#
    2613              :         .to_string();
    2614              : 
    2615            0 :         fail::fail_point!("tenant-config-before-write", |_| {
    2616            0 :             anyhow::bail!("tenant-config-before-write");
    2617            0 :         });
    2618              : 
    2619              :         // Convert the config to a toml file.
    2620              :         conf_content += &toml_edit::ser::to_string_pretty(&location_conf)?;
    2621              : 
    2622              :         let temp_path = path_with_suffix_extension(config_path, TEMP_FILE_SUFFIX);
    2623              : 
    2624              :         let tenant_shard_id = *tenant_shard_id;
    2625              :         let config_path = config_path.to_owned();
    2626            0 :         tokio::task::spawn_blocking(move || {
    2627            0 :             Handle::current().block_on(async move {
    2628            0 :                 let conf_content = conf_content.into_bytes();
    2629            0 :                 VirtualFile::crashsafe_overwrite(&config_path, &temp_path, conf_content)
    2630            0 :                     .await
    2631            0 :                     .with_context(|| {
    2632            0 :                         format!("write tenant {tenant_shard_id} config to {config_path}")
    2633            0 :                     })
    2634            0 :             })
    2635            0 :         })
    2636              :         .await??;
    2637              : 
    2638              :         Ok(())
    2639              :     }
    2640              : 
    2641            0 :     #[tracing::instrument(skip_all, fields(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug()))]
    2642              :     async fn persist_tenant_config_legacy(
    2643              :         tenant_shard_id: &TenantShardId,
    2644              :         target_config_path: &Utf8Path,
    2645              :         tenant_conf: &TenantConfOpt,
    2646              :     ) -> anyhow::Result<()> {
    2647            0 :         debug!("persisting tenantconf to {target_config_path}");
    2648              : 
    2649              :         let mut conf_content = r#"# This file contains a specific per-tenant's config.
    2650              : #  It is read in case of pageserver restart.
    2651              : 
    2652              : [tenant_config]
    2653              : "#
    2654              :         .to_string();
    2655              : 
    2656              :         // Convert the config to a toml file.
    2657              :         conf_content += &toml_edit::ser::to_string(&tenant_conf)?;
    2658              : 
    2659              :         let temp_path = path_with_suffix_extension(target_config_path, TEMP_FILE_SUFFIX);
    2660              : 
    2661              :         let tenant_shard_id = *tenant_shard_id;
    2662              :         let target_config_path = target_config_path.to_owned();
    2663            0 :         tokio::task::spawn_blocking(move || {
    2664            0 :             Handle::current().block_on(async move {
    2665            0 :                 let conf_content = conf_content.into_bytes();
    2666            0 :                 VirtualFile::crashsafe_overwrite(&target_config_path, &temp_path, conf_content)
    2667            0 :                     .await
    2668            0 :                     .with_context(|| {
    2669            0 :                         format!("write tenant {tenant_shard_id} config to {target_config_path}")
    2670            0 :                     })
    2671            0 :             })
    2672            0 :         })
    2673              :         .await??;
    2674              :         Ok(())
    2675              :     }
    2676              : 
    2677              :     //
    2678              :     // How garbage collection works:
    2679              :     //
    2680              :     //                    +--bar------------->
    2681              :     //                   /
    2682              :     //             +----+-----foo---------------->
    2683              :     //            /
    2684              :     // ----main--+-------------------------->
    2685              :     //                \
    2686              :     //                 +-----baz-------->
    2687              :     //
    2688              :     //
    2689              :     // 1. Grab 'gc_cs' mutex to prevent new timelines from being created while Timeline's
    2690              :     //    `gc_infos` are being refreshed
    2691              :     // 2. Scan collected timelines, and on each timeline, make note of the
    2692              :     //    all the points where other timelines have been branched off.
    2693              :     //    We will refrain from removing page versions at those LSNs.
    2694              :     // 3. For each timeline, scan all layer files on the timeline.
    2695              :     //    Remove all files for which a newer file exists and which
    2696              :     //    don't cover any branch point LSNs.
    2697              :     //
    2698              :     // TODO:
    2699              :     // - if a relation has a non-incremental persistent layer on a child branch, then we
    2700              :     //   don't need to keep that in the parent anymore. But currently
    2701              :     //   we do.
    2702            8 :     async fn gc_iteration_internal(
    2703            8 :         &self,
    2704            8 :         target_timeline_id: Option<TimelineId>,
    2705            8 :         horizon: u64,
    2706            8 :         pitr: Duration,
    2707            8 :         cancel: &CancellationToken,
    2708            8 :         ctx: &RequestContext,
    2709            8 :     ) -> anyhow::Result<GcResult> {
    2710            8 :         let mut totals: GcResult = Default::default();
    2711            8 :         let now = Instant::now();
    2712              : 
    2713            8 :         let gc_timelines = match self
    2714            8 :             .refresh_gc_info_internal(target_timeline_id, horizon, pitr, cancel, ctx)
    2715            0 :             .await
    2716              :         {
    2717            8 :             Ok(result) => result,
    2718            0 :             Err(e) => {
    2719            0 :                 if let Some(PageReconstructError::Cancelled) =
    2720            0 :                     e.downcast_ref::<PageReconstructError>()
    2721              :                 {
    2722              :                     // Handle cancellation
    2723            0 :                     totals.elapsed = now.elapsed();
    2724            0 :                     return Ok(totals);
    2725              :                 } else {
    2726              :                     // Propagate other errors
    2727            0 :                     return Err(e);
    2728              :                 }
    2729              :             }
    2730              :         };
    2731              : 
    2732            0 :         failpoint_support::sleep_millis_async!("gc_iteration_internal_after_getting_gc_timelines");
    2733              : 
    2734              :         // If there is nothing to GC, we don't want any messages in the INFO log.
    2735            8 :         if !gc_timelines.is_empty() {
    2736            8 :             info!("{} timelines need GC", gc_timelines.len());
    2737              :         } else {
    2738            0 :             debug!("{} timelines need GC", gc_timelines.len());
    2739              :         }
    2740              : 
    2741              :         // Perform GC for each timeline.
    2742              :         //
    2743              :         // Note that we don't hold the `Tenant::gc_cs` lock here because we don't want to delay the
    2744              :         // branch creation task, which requires the GC lock. A GC iteration can run concurrently
    2745              :         // with branch creation.
    2746              :         //
    2747              :         // See comments in [`Tenant::branch_timeline`] for more information about why branch
    2748              :         // creation task can run concurrently with timeline's GC iteration.
    2749           16 :         for timeline in gc_timelines {
    2750            8 :             if task_mgr::is_shutdown_requested() || cancel.is_cancelled() {
    2751              :                 // We were requested to shut down. Stop and return with the progress we
    2752              :                 // made.
    2753            0 :                 break;
    2754            8 :             }
    2755            8 :             let result = timeline.gc().await?;
    2756            8 :             totals += result;
    2757              :         }
    2758              : 
    2759            8 :         totals.elapsed = now.elapsed();
    2760            8 :         Ok(totals)
    2761            8 :     }
    2762              : 
    2763              :     /// Refreshes the Timeline::gc_info for all timelines, returning the
    2764              :     /// vector of timelines which have [`Timeline::get_last_record_lsn`] past
    2765              :     /// [`Tenant::get_gc_horizon`].
    2766              :     ///
    2767              :     /// This is usually executed as part of periodic gc, but can now be triggered more often.
    2768            0 :     pub async fn refresh_gc_info(
    2769            0 :         &self,
    2770            0 :         cancel: &CancellationToken,
    2771            0 :         ctx: &RequestContext,
    2772            0 :     ) -> anyhow::Result<Vec<Arc<Timeline>>> {
    2773            0 :         // since this method can now be called at different rates than the configured gc loop, it
    2774            0 :         // might be that these configuration values get applied faster than what it was previously,
    2775            0 :         // since these were only read from the gc task.
    2776            0 :         let horizon = self.get_gc_horizon();
    2777            0 :         let pitr = self.get_pitr_interval();
    2778            0 : 
    2779            0 :         // refresh all timelines
    2780            0 :         let target_timeline_id = None;
    2781            0 : 
    2782            0 :         self.refresh_gc_info_internal(target_timeline_id, horizon, pitr, cancel, ctx)
    2783            0 :             .await
    2784            0 :     }
    2785              : 
    2786            8 :     async fn refresh_gc_info_internal(
    2787            8 :         &self,
    2788            8 :         target_timeline_id: Option<TimelineId>,
    2789            8 :         horizon: u64,
    2790            8 :         pitr: Duration,
    2791            8 :         cancel: &CancellationToken,
    2792            8 :         ctx: &RequestContext,
    2793            8 :     ) -> anyhow::Result<Vec<Arc<Timeline>>> {
    2794              :         // grab mutex to prevent new timelines from being created here.
    2795            8 :         let gc_cs = self.gc_cs.lock().await;
    2796              : 
    2797              :         // Scan all timelines. For each timeline, remember the timeline ID and
    2798              :         // the branch point where it was created.
    2799            8 :         let (all_branchpoints, timeline_ids): (BTreeSet<(TimelineId, Lsn)>, _) = {
    2800            8 :             let timelines = self.timelines.lock().unwrap();
    2801            8 :             let mut all_branchpoints = BTreeSet::new();
    2802            8 :             let timeline_ids = {
    2803            8 :                 if let Some(target_timeline_id) = target_timeline_id.as_ref() {
    2804            8 :                     if timelines.get(target_timeline_id).is_none() {
    2805            0 :                         bail!("gc target timeline does not exist")
    2806            8 :                     }
    2807            0 :                 };
    2808              : 
    2809            8 :                 timelines
    2810            8 :                     .iter()
    2811           14 :                     .map(|(timeline_id, timeline_entry)| {
    2812            6 :                         if let Some(ancestor_timeline_id) =
    2813           14 :                             &timeline_entry.get_ancestor_timeline_id()
    2814              :                         {
    2815              :                             // If target_timeline is specified, we only need to know branchpoints of its children
    2816            6 :                             if let Some(timeline_id) = target_timeline_id {
    2817            6 :                                 if ancestor_timeline_id == &timeline_id {
    2818            6 :                                     all_branchpoints.insert((
    2819            6 :                                         *ancestor_timeline_id,
    2820            6 :                                         timeline_entry.get_ancestor_lsn(),
    2821            6 :                                     ));
    2822            6 :                                 }
    2823              :                             }
    2824              :                             // Collect branchpoints for all timelines
    2825            0 :                             else {
    2826            0 :                                 all_branchpoints.insert((
    2827            0 :                                     *ancestor_timeline_id,
    2828            0 :                                     timeline_entry.get_ancestor_lsn(),
    2829            0 :                                 ));
    2830            0 :                             }
    2831            8 :                         }
    2832              : 
    2833           14 :                         *timeline_id
    2834           14 :                     })
    2835            8 :                     .collect::<Vec<_>>()
    2836            8 :             };
    2837            8 :             (all_branchpoints, timeline_ids)
    2838            8 :         };
    2839            8 : 
    2840            8 :         // Ok, we now know all the branch points.
    2841            8 :         // Update the GC information for each timeline.
    2842            8 :         let mut gc_timelines = Vec::with_capacity(timeline_ids.len());
    2843           22 :         for timeline_id in timeline_ids {
    2844              :             // Timeline is known to be local and loaded.
    2845           14 :             let timeline = self
    2846           14 :                 .get_timeline(timeline_id, false)
    2847           14 :                 .with_context(|| format!("Timeline {timeline_id} was not found"))?;
    2848              : 
    2849              :             // If target_timeline is specified, ignore all other timelines
    2850           14 :             if let Some(target_timeline_id) = target_timeline_id {
    2851           14 :                 if timeline_id != target_timeline_id {
    2852            6 :                     continue;
    2853            8 :                 }
    2854            0 :             }
    2855              : 
    2856            8 :             if let Some(cutoff) = timeline.get_last_record_lsn().checked_sub(horizon) {
    2857            8 :                 let branchpoints: Vec<Lsn> = all_branchpoints
    2858            8 :                     .range((
    2859            8 :                         Included((timeline_id, Lsn(0))),
    2860            8 :                         Included((timeline_id, Lsn(u64::MAX))),
    2861            8 :                     ))
    2862            8 :                     .map(|&x| x.1)
    2863            8 :                     .collect();
    2864            8 :                 timeline
    2865            8 :                     .update_gc_info(branchpoints, cutoff, pitr, cancel, ctx)
    2866            0 :                     .await?;
    2867              : 
    2868            8 :                 gc_timelines.push(timeline);
    2869            0 :             }
    2870              :         }
    2871            8 :         drop(gc_cs);
    2872            8 :         Ok(gc_timelines)
    2873            8 :     }
    2874              : 
    2875              :     /// A substitute for `branch_timeline` for use in unit tests.
    2876              :     /// The returned timeline will have state value `Active` to make various `anyhow::ensure!()`
    2877              :     /// calls pass, but, we do not actually call `.activate()` under the hood. So, none of the
    2878              :     /// timeline background tasks are launched, except the flush loop.
    2879              :     #[cfg(test)]
    2880          214 :     async fn branch_timeline_test(
    2881          214 :         &self,
    2882          214 :         src_timeline: &Arc<Timeline>,
    2883          214 :         dst_id: TimelineId,
    2884          214 :         start_lsn: Option<Lsn>,
    2885          214 :         ctx: &RequestContext,
    2886          214 :     ) -> Result<Arc<Timeline>, CreateTimelineError> {
    2887          214 :         let uninit_mark = self.create_timeline_uninit_mark(dst_id).unwrap();
    2888          214 :         let tl = self
    2889          214 :             .branch_timeline_impl(src_timeline, dst_id, start_lsn, uninit_mark, ctx)
    2890          215 :             .await?;
    2891          210 :         tl.set_state(TimelineState::Active);
    2892          210 :         Ok(tl)
    2893          214 :     }
    2894              : 
    2895              :     /// Branch an existing timeline.
    2896              :     ///
    2897              :     /// The caller is responsible for activating the returned timeline.
    2898            0 :     async fn branch_timeline(
    2899            0 :         &self,
    2900            0 :         src_timeline: &Arc<Timeline>,
    2901            0 :         dst_id: TimelineId,
    2902            0 :         start_lsn: Option<Lsn>,
    2903            0 :         timeline_uninit_mark: TimelineUninitMark<'_>,
    2904            0 :         ctx: &RequestContext,
    2905            0 :     ) -> Result<Arc<Timeline>, CreateTimelineError> {
    2906            0 :         self.branch_timeline_impl(src_timeline, dst_id, start_lsn, timeline_uninit_mark, ctx)
    2907            0 :             .await
    2908            0 :     }
    2909              : 
    2910          214 :     async fn branch_timeline_impl(
    2911          214 :         &self,
    2912          214 :         src_timeline: &Arc<Timeline>,
    2913          214 :         dst_id: TimelineId,
    2914          214 :         start_lsn: Option<Lsn>,
    2915          214 :         timeline_uninit_mark: TimelineUninitMark<'_>,
    2916          214 :         _ctx: &RequestContext,
    2917          214 :     ) -> Result<Arc<Timeline>, CreateTimelineError> {
    2918          214 :         let src_id = src_timeline.timeline_id;
    2919              : 
    2920              :         // We will validate our ancestor LSN in this function.  Acquire the GC lock so that
    2921              :         // this check cannot race with GC, and the ancestor LSN is guaranteed to remain
    2922              :         // valid while we are creating the branch.
    2923          214 :         let _gc_cs = self.gc_cs.lock().await;
    2924              : 
    2925              :         // If no start LSN is specified, we branch the new timeline from the source timeline's last record LSN
    2926          214 :         let start_lsn = start_lsn.unwrap_or_else(|| {
    2927            0 :             let lsn = src_timeline.get_last_record_lsn();
    2928            0 :             info!("branching timeline {dst_id} from timeline {src_id} at last record LSN: {lsn}");
    2929            0 :             lsn
    2930          214 :         });
    2931          214 : 
    2932          214 :         // Ensure that `start_lsn` is valid, i.e. the LSN is within the PITR
    2933          214 :         // horizon on the source timeline
    2934          214 :         //
    2935          214 :         // We check it against both the planned GC cutoff stored in 'gc_info',
    2936          214 :         // and the 'latest_gc_cutoff' of the last GC that was performed.  The
    2937          214 :         // planned GC cutoff in 'gc_info' is normally larger than
    2938          214 :         // 'latest_gc_cutoff_lsn', but beware of corner cases like if you just
    2939          214 :         // changed the GC settings for the tenant to make the PITR window
    2940          214 :         // larger, but some of the data was already removed by an earlier GC
    2941          214 :         // iteration.
    2942          214 : 
    2943          214 :         // check against last actual 'latest_gc_cutoff' first
    2944          214 :         let latest_gc_cutoff_lsn = src_timeline.get_latest_gc_cutoff_lsn();
    2945          214 :         src_timeline
    2946          214 :             .check_lsn_is_in_scope(start_lsn, &latest_gc_cutoff_lsn)
    2947          214 :             .context(format!(
    2948          214 :                 "invalid branch start lsn: less than latest GC cutoff {}",
    2949          214 :                 *latest_gc_cutoff_lsn,
    2950          214 :             ))
    2951          214 :             .map_err(CreateTimelineError::AncestorLsn)?;
    2952              : 
    2953              :         // and then the planned GC cutoff
    2954              :         {
    2955          210 :             let gc_info = src_timeline.gc_info.read().unwrap();
    2956          210 :             let cutoff = min(gc_info.pitr_cutoff, gc_info.horizon_cutoff);
    2957          210 :             if start_lsn < cutoff {
    2958            0 :                 return Err(CreateTimelineError::AncestorLsn(anyhow::anyhow!(
    2959            0 :                     "invalid branch start lsn: less than planned GC cutoff {cutoff}"
    2960            0 :                 )));
    2961          210 :             }
    2962          210 :         }
    2963          210 : 
    2964          210 :         //
    2965          210 :         // The branch point is valid, and we are still holding the 'gc_cs' lock
    2966          210 :         // so that GC cannot advance the GC cutoff until we are finished.
    2967          210 :         // Proceed with the branch creation.
    2968          210 :         //
    2969          210 : 
    2970          210 :         // Determine prev-LSN for the new timeline. We can only determine it if
    2971          210 :         // the timeline was branched at the current end of the source timeline.
    2972          210 :         let RecordLsn {
    2973          210 :             last: src_last,
    2974          210 :             prev: src_prev,
    2975          210 :         } = src_timeline.get_last_record_rlsn();
    2976          210 :         let dst_prev = if src_last == start_lsn {
    2977          200 :             Some(src_prev)
    2978              :         } else {
    2979           10 :             None
    2980              :         };
    2981              : 
    2982              :         // Create the metadata file, noting the ancestor of the new timeline.
    2983              :         // There is initially no data in it, but all the read-calls know to look
    2984              :         // into the ancestor.
    2985          210 :         let metadata = TimelineMetadata::new(
    2986          210 :             start_lsn,
    2987          210 :             dst_prev,
    2988          210 :             Some(src_id),
    2989          210 :             start_lsn,
    2990          210 :             *src_timeline.latest_gc_cutoff_lsn.read(), // FIXME: should we hold onto this guard longer?
    2991          210 :             src_timeline.initdb_lsn,
    2992          210 :             src_timeline.pg_version,
    2993          210 :         );
    2994              : 
    2995          210 :         let uninitialized_timeline = self
    2996          210 :             .prepare_new_timeline(
    2997          210 :                 dst_id,
    2998          210 :                 &metadata,
    2999          210 :                 timeline_uninit_mark,
    3000          210 :                 start_lsn + 1,
    3001          210 :                 Some(Arc::clone(src_timeline)),
    3002          210 :             )
    3003          215 :             .await?;
    3004              : 
    3005          210 :         let new_timeline = uninitialized_timeline.finish_creation()?;
    3006              : 
    3007              :         // Root timeline gets its layers during creation and uploads them along with the metadata.
    3008              :         // A branch timeline though, when created, can get no writes for some time, hence won't get any layers created.
    3009              :         // We still need to upload its metadata eagerly: if other nodes `attach` the tenant and miss this timeline, their GC
    3010              :         // could get incorrect information and remove more layers, than needed.
    3011              :         // See also https://github.com/neondatabase/neon/issues/3865
    3012          210 :         if let Some(remote_client) = new_timeline.remote_client.as_ref() {
    3013          210 :             remote_client
    3014          210 :                 .schedule_index_upload_for_metadata_update(&metadata)
    3015          210 :                 .context("branch initial metadata upload")?;
    3016            0 :         }
    3017              : 
    3018          210 :         Ok(new_timeline)
    3019          214 :     }
    3020              : 
    3021              :     /// For unit tests, make this visible so that other modules can directly create timelines
    3022              :     #[cfg(test)]
    3023            4 :     #[tracing::instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(), %timeline_id))]
    3024              :     pub(crate) async fn bootstrap_timeline_test(
    3025              :         &self,
    3026              :         timeline_id: TimelineId,
    3027              :         pg_version: u32,
    3028              :         load_existing_initdb: Option<TimelineId>,
    3029              :         ctx: &RequestContext,
    3030              :     ) -> anyhow::Result<Arc<Timeline>> {
    3031              :         let uninit_mark = self.create_timeline_uninit_mark(timeline_id).unwrap();
    3032              :         self.bootstrap_timeline(
    3033              :             timeline_id,
    3034              :             pg_version,
    3035              :             load_existing_initdb,
    3036              :             uninit_mark,
    3037              :             ctx,
    3038              :         )
    3039              :         .await
    3040              :     }
    3041              : 
    3042            0 :     async fn upload_initdb(
    3043            0 :         &self,
    3044            0 :         timelines_path: &Utf8PathBuf,
    3045            0 :         pgdata_path: &Utf8PathBuf,
    3046            0 :         timeline_id: &TimelineId,
    3047            0 :     ) -> anyhow::Result<()> {
    3048            0 :         let Some(storage) = &self.remote_storage else {
    3049              :             // No remote storage?  No upload.
    3050            0 :             return Ok(());
    3051              :         };
    3052              : 
    3053            0 :         let temp_path = timelines_path.join(format!(
    3054            0 :             "{INITDB_PATH}.upload-{timeline_id}.{TEMP_FILE_SUFFIX}"
    3055            0 :         ));
    3056              : 
    3057            0 :         scopeguard::defer! {
    3058            0 :             if let Err(e) = fs::remove_file(&temp_path) {
    3059            0 :                 error!("Failed to remove temporary initdb archive '{temp_path}': {e}");
    3060            0 :             }
    3061              :         }
    3062              : 
    3063            0 :         let (pgdata_zstd, tar_zst_size) =
    3064            0 :             import_datadir::create_tar_zst(pgdata_path, &temp_path).await?;
    3065              : 
    3066            0 :         pausable_failpoint!("before-initdb-upload");
    3067              : 
    3068            0 :         backoff::retry(
    3069            0 :             || async {
    3070            0 :                 self::remote_timeline_client::upload_initdb_dir(
    3071            0 :                     storage,
    3072            0 :                     &self.tenant_shard_id.tenant_id,
    3073            0 :                     timeline_id,
    3074            0 :                     pgdata_zstd.try_clone().await?,
    3075            0 :                     tar_zst_size,
    3076            0 :                     &self.cancel,
    3077              :                 )
    3078            0 :                 .await
    3079            0 :             },
    3080            0 :             |_| false,
    3081            0 :             3,
    3082            0 :             u32::MAX,
    3083            0 :             "persist_initdb_tar_zst",
    3084            0 :             &self.cancel,
    3085            0 :         )
    3086            0 :         .await
    3087            0 :         .ok_or_else(|| anyhow::Error::new(TimeoutOrCancel::Cancel))
    3088            0 :         .and_then(|x| x)
    3089            0 :     }
    3090              : 
    3091              :     /// - run initdb to init temporary instance and get bootstrap data
    3092              :     /// - after initialization completes, tar up the temp dir and upload it to S3.
    3093              :     ///
    3094              :     /// The caller is responsible for activating the returned timeline.
    3095            2 :     async fn bootstrap_timeline(
    3096            2 :         &self,
    3097            2 :         timeline_id: TimelineId,
    3098            2 :         pg_version: u32,
    3099            2 :         load_existing_initdb: Option<TimelineId>,
    3100            2 :         timeline_uninit_mark: TimelineUninitMark<'_>,
    3101            2 :         ctx: &RequestContext,
    3102            2 :     ) -> anyhow::Result<Arc<Timeline>> {
    3103            2 :         // create a `tenant/{tenant_id}/timelines/basebackup-{timeline_id}.{TEMP_FILE_SUFFIX}/`
    3104            2 :         // temporary directory for basebackup files for the given timeline.
    3105            2 : 
    3106            2 :         let timelines_path = self.conf.timelines_path(&self.tenant_shard_id);
    3107            2 :         let pgdata_path = path_with_suffix_extension(
    3108            2 :             timelines_path.join(format!("basebackup-{timeline_id}")),
    3109            2 :             TEMP_FILE_SUFFIX,
    3110            2 :         );
    3111            2 : 
    3112            2 :         // an uninit mark was placed before, nothing else can access this timeline files
    3113            2 :         // current initdb was not run yet, so remove whatever was left from the previous runs
    3114            2 :         if pgdata_path.exists() {
    3115            0 :             fs::remove_dir_all(&pgdata_path).with_context(|| {
    3116            0 :                 format!("Failed to remove already existing initdb directory: {pgdata_path}")
    3117            0 :             })?;
    3118            2 :         }
    3119              :         // this new directory is very temporary, set to remove it immediately after bootstrap, we don't need it
    3120            2 :         scopeguard::defer! {
    3121            2 :             if let Err(e) = fs::remove_dir_all(&pgdata_path) {
    3122              :                 // this is unlikely, but we will remove the directory on pageserver restart or another bootstrap call
    3123            0 :                 error!("Failed to remove temporary initdb directory '{pgdata_path}': {e}");
    3124            2 :             }
    3125              :         }
    3126            2 :         if let Some(existing_initdb_timeline_id) = load_existing_initdb {
    3127            2 :             let Some(storage) = &self.remote_storage else {
    3128            0 :                 bail!("no storage configured but load_existing_initdb set to {existing_initdb_timeline_id}");
    3129              :             };
    3130            2 :             if existing_initdb_timeline_id != timeline_id {
    3131            0 :                 let source_path = &remote_initdb_archive_path(
    3132            0 :                     &self.tenant_shard_id.tenant_id,
    3133            0 :                     &existing_initdb_timeline_id,
    3134            0 :                 );
    3135            0 :                 let dest_path =
    3136            0 :                     &remote_initdb_archive_path(&self.tenant_shard_id.tenant_id, &timeline_id);
    3137            0 : 
    3138            0 :                 // if this fails, it will get retried by retried control plane requests
    3139            0 :                 storage
    3140            0 :                     .copy_object(source_path, dest_path, &self.cancel)
    3141            0 :                     .await
    3142            0 :                     .context("copy initdb tar")?;
    3143            2 :             }
    3144            2 :             let (initdb_tar_zst_path, initdb_tar_zst) =
    3145            2 :                 self::remote_timeline_client::download_initdb_tar_zst(
    3146            2 :                     self.conf,
    3147            2 :                     storage,
    3148            2 :                     &self.tenant_shard_id,
    3149            2 :                     &existing_initdb_timeline_id,
    3150            2 :                     &self.cancel,
    3151            2 :                 )
    3152          710 :                 .await
    3153            2 :                 .context("download initdb tar")?;
    3154              : 
    3155            2 :             scopeguard::defer! {
    3156            2 :                 if let Err(e) = fs::remove_file(&initdb_tar_zst_path) {
    3157            0 :                     error!("Failed to remove temporary initdb archive '{initdb_tar_zst_path}': {e}");
    3158            2 :                 }
    3159              :             }
    3160              : 
    3161            2 :             let buf_read =
    3162            2 :                 BufReader::with_capacity(remote_timeline_client::BUFFER_SIZE, initdb_tar_zst);
    3163            2 :             import_datadir::extract_tar_zst(&pgdata_path, buf_read)
    3164        10962 :                 .await
    3165            2 :                 .context("extract initdb tar")?;
    3166              :         } else {
    3167              :             // Init temporarily repo to get bootstrap data, this creates a directory in the `pgdata_path` path
    3168            0 :             run_initdb(self.conf, &pgdata_path, pg_version, &self.cancel).await?;
    3169              : 
    3170              :             // Upload the created data dir to S3
    3171            0 :             if self.tenant_shard_id().is_zero() {
    3172            0 :                 self.upload_initdb(&timelines_path, &pgdata_path, &timeline_id)
    3173            0 :                     .await?;
    3174            0 :             }
    3175              :         }
    3176            2 :         let pgdata_lsn = import_datadir::get_lsn_from_controlfile(&pgdata_path)?.align();
    3177            2 : 
    3178            2 :         // Import the contents of the data directory at the initial checkpoint
    3179            2 :         // LSN, and any WAL after that.
    3180            2 :         // Initdb lsn will be equal to last_record_lsn which will be set after import.
    3181            2 :         // Because we know it upfront avoid having an option or dummy zero value by passing it to the metadata.
    3182            2 :         let new_metadata = TimelineMetadata::new(
    3183            2 :             Lsn(0),
    3184            2 :             None,
    3185            2 :             None,
    3186            2 :             Lsn(0),
    3187            2 :             pgdata_lsn,
    3188            2 :             pgdata_lsn,
    3189            2 :             pg_version,
    3190            2 :         );
    3191            2 :         let raw_timeline = self
    3192            2 :             .prepare_new_timeline(
    3193            2 :                 timeline_id,
    3194            2 :                 &new_metadata,
    3195            2 :                 timeline_uninit_mark,
    3196            2 :                 pgdata_lsn,
    3197            2 :                 None,
    3198            2 :             )
    3199            3 :             .await?;
    3200              : 
    3201            2 :         let tenant_shard_id = raw_timeline.owning_tenant.tenant_shard_id;
    3202            2 :         let unfinished_timeline = raw_timeline.raw_timeline()?;
    3203              : 
    3204            2 :         import_datadir::import_timeline_from_postgres_datadir(
    3205            2 :             unfinished_timeline,
    3206            2 :             &pgdata_path,
    3207            2 :             pgdata_lsn,
    3208            2 :             ctx,
    3209            2 :         )
    3210         9422 :         .await
    3211            2 :         .with_context(|| {
    3212            0 :             format!("Failed to import pgdatadir for timeline {tenant_shard_id}/{timeline_id}")
    3213            2 :         })?;
    3214              : 
    3215              :         // Flush the new layer files to disk, before we make the timeline as available to
    3216              :         // the outside world.
    3217              :         //
    3218              :         // Flush loop needs to be spawned in order to be able to flush.
    3219            2 :         unfinished_timeline.maybe_spawn_flush_loop();
    3220            2 : 
    3221            2 :         fail::fail_point!("before-checkpoint-new-timeline", |_| {
    3222            0 :             anyhow::bail!("failpoint before-checkpoint-new-timeline");
    3223            2 :         });
    3224              : 
    3225            2 :         unfinished_timeline
    3226            2 :             .freeze_and_flush()
    3227            2 :             .await
    3228            2 :             .with_context(|| {
    3229            0 :                 format!(
    3230            0 :                     "Failed to flush after pgdatadir import for timeline {tenant_shard_id}/{timeline_id}"
    3231            0 :                 )
    3232            2 :             })?;
    3233              : 
    3234              :         // All done!
    3235            2 :         let timeline = raw_timeline.finish_creation()?;
    3236              : 
    3237            2 :         Ok(timeline)
    3238            2 :     }
    3239              : 
    3240              :     /// Call this before constructing a timeline, to build its required structures
    3241          286 :     fn build_timeline_resources(&self, timeline_id: TimelineId) -> TimelineResources {
    3242          286 :         let remote_client = if let Some(remote_storage) = self.remote_storage.as_ref() {
    3243          286 :             let remote_client = RemoteTimelineClient::new(
    3244          286 :                 remote_storage.clone(),
    3245          286 :                 self.deletion_queue_client.clone(),
    3246          286 :                 self.conf,
    3247          286 :                 self.tenant_shard_id,
    3248          286 :                 timeline_id,
    3249          286 :                 self.generation,
    3250          286 :             );
    3251          286 :             Some(remote_client)
    3252              :         } else {
    3253            0 :             None
    3254              :         };
    3255              : 
    3256          286 :         TimelineResources {
    3257          286 :             remote_client,
    3258          286 :             deletion_queue_client: self.deletion_queue_client.clone(),
    3259          286 :             timeline_get_throttle: self.timeline_get_throttle.clone(),
    3260          286 :         }
    3261          286 :     }
    3262              : 
    3263              :     /// Creates intermediate timeline structure and its files.
    3264              :     ///
    3265              :     /// An empty layer map is initialized, and new data and WAL can be imported starting
    3266              :     /// at 'disk_consistent_lsn'. After any initial data has been imported, call
    3267              :     /// `finish_creation` to insert the Timeline into the timelines map and to remove the
    3268              :     /// uninit mark file.
    3269          286 :     async fn prepare_new_timeline<'a>(
    3270          286 :         &'a self,
    3271          286 :         new_timeline_id: TimelineId,
    3272          286 :         new_metadata: &TimelineMetadata,
    3273          286 :         uninit_mark: TimelineUninitMark<'a>,
    3274          286 :         start_lsn: Lsn,
    3275          286 :         ancestor: Option<Arc<Timeline>>,
    3276          286 :     ) -> anyhow::Result<UninitializedTimeline> {
    3277          286 :         let tenant_shard_id = self.tenant_shard_id;
    3278          286 : 
    3279          286 :         let resources = self.build_timeline_resources(new_timeline_id);
    3280          286 :         if let Some(remote_client) = &resources.remote_client {
    3281          286 :             remote_client.init_upload_queue_for_empty_remote(new_metadata)?;
    3282            0 :         }
    3283              : 
    3284          286 :         let timeline_struct = self
    3285          286 :             .create_timeline_struct(
    3286          286 :                 new_timeline_id,
    3287          286 :                 new_metadata,
    3288          286 :                 ancestor,
    3289          286 :                 resources,
    3290          286 :                 CreateTimelineCause::Load,
    3291          286 :             )
    3292          286 :             .context("Failed to create timeline data structure")?;
    3293              : 
    3294          286 :         timeline_struct.init_empty_layer_map(start_lsn);
    3295              : 
    3296          286 :         if let Err(e) = self
    3297          286 :             .create_timeline_files(&uninit_mark.timeline_path, &new_timeline_id, new_metadata)
    3298          329 :             .await
    3299              :         {
    3300            0 :             error!("Failed to create initial files for timeline {tenant_shard_id}/{new_timeline_id}, cleaning up: {e:?}");
    3301            0 :             cleanup_timeline_directory(uninit_mark);
    3302            0 :             return Err(e);
    3303          286 :         }
    3304              : 
    3305            0 :         debug!(
    3306            0 :             "Successfully created initial files for timeline {tenant_shard_id}/{new_timeline_id}"
    3307            0 :         );
    3308              : 
    3309          286 :         Ok(UninitializedTimeline::new(
    3310          286 :             self,
    3311          286 :             new_timeline_id,
    3312          286 :             Some((timeline_struct, uninit_mark)),
    3313          286 :         ))
    3314          286 :     }
    3315              : 
    3316          286 :     async fn create_timeline_files(
    3317          286 :         &self,
    3318          286 :         timeline_path: &Utf8Path,
    3319          286 :         new_timeline_id: &TimelineId,
    3320          286 :         new_metadata: &TimelineMetadata,
    3321          286 :     ) -> anyhow::Result<()> {
    3322          286 :         crashsafe::create_dir(timeline_path).context("Failed to create timeline directory")?;
    3323              : 
    3324          286 :         fail::fail_point!("after-timeline-uninit-mark-creation", |_| {
    3325            0 :             anyhow::bail!("failpoint after-timeline-uninit-mark-creation");
    3326          286 :         });
    3327              : 
    3328          286 :         save_metadata(
    3329          286 :             self.conf,
    3330          286 :             &self.tenant_shard_id,
    3331          286 :             new_timeline_id,
    3332          286 :             new_metadata,
    3333          286 :         )
    3334          329 :         .await
    3335          286 :         .context("Failed to create timeline metadata")?;
    3336          286 :         Ok(())
    3337          286 :     }
    3338              : 
    3339              :     /// Attempts to create an uninit mark file for the timeline initialization.
    3340              :     /// Bails, if the timeline is already loaded into the memory (i.e. initialized before), or the uninit mark file already exists.
    3341              :     ///
    3342              :     /// This way, we need to hold the timelines lock only for small amount of time during the mark check/creation per timeline init.
    3343          292 :     fn create_timeline_uninit_mark(
    3344          292 :         &self,
    3345          292 :         timeline_id: TimelineId,
    3346          292 :     ) -> Result<TimelineUninitMark, TimelineExclusionError> {
    3347          292 :         let tenant_shard_id = self.tenant_shard_id;
    3348          292 : 
    3349          292 :         let uninit_mark_path = self
    3350          292 :             .conf
    3351          292 :             .timeline_uninit_mark_file_path(tenant_shard_id, timeline_id);
    3352          292 :         let timeline_path = self.conf.timeline_path(&tenant_shard_id, &timeline_id);
    3353              : 
    3354          292 :         let uninit_mark = TimelineUninitMark::new(
    3355          292 :             self,
    3356          292 :             timeline_id,
    3357          292 :             uninit_mark_path.clone(),
    3358          292 :             timeline_path.clone(),
    3359          292 :         )?;
    3360              : 
    3361              :         // At this stage, we have got exclusive access to in-memory state for this timeline ID
    3362              :         // for creation.
    3363              :         // A timeline directory should never exist on disk already:
    3364              :         // - a previous failed creation would have cleaned up after itself
    3365              :         // - a pageserver restart would clean up timeline directories that don't have valid remote state
    3366              :         //
    3367              :         // Therefore it is an unexpected internal error to encounter a timeline directory already existing here,
    3368              :         // this error may indicate a bug in cleanup on failed creations.
    3369          290 :         if timeline_path.exists() {
    3370            0 :             return Err(TimelineExclusionError::Other(anyhow::anyhow!(
    3371            0 :                 "Timeline directory already exists! This is a bug."
    3372            0 :             )));
    3373          290 :         }
    3374          290 : 
    3375          290 :         // Create the on-disk uninit mark _after_ the in-memory acquisition of the tenant ID: guarantees
    3376          290 :         // that during process runtime, colliding creations will be caught in-memory without getting
    3377          290 :         // as far as failing to write a file.
    3378          290 :         fs::OpenOptions::new()
    3379          290 :             .write(true)
    3380          290 :             .create_new(true)
    3381          290 :             .open(&uninit_mark_path)
    3382          290 :             .context("Failed to create uninit mark file")
    3383          290 :             .and_then(|_| {
    3384          290 :                 crashsafe::fsync_file_and_parent(&uninit_mark_path)
    3385          290 :                     .context("Failed to fsync uninit mark file")
    3386          290 :             })
    3387          290 :             .with_context(|| {
    3388            0 :                 format!("Failed to crate uninit mark for timeline {tenant_shard_id}/{timeline_id}")
    3389          290 :             })?;
    3390              : 
    3391          290 :         Ok(uninit_mark)
    3392          292 :     }
    3393              : 
    3394              :     /// Gathers inputs from all of the timelines to produce a sizing model input.
    3395              :     ///
    3396              :     /// Future is cancellation safe. Only one calculation can be running at once per tenant.
    3397            0 :     #[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()))]
    3398              :     pub async fn gather_size_inputs(
    3399              :         &self,
    3400              :         // `max_retention_period` overrides the cutoff that is used to calculate the size
    3401              :         // (only if it is shorter than the real cutoff).
    3402              :         max_retention_period: Option<u64>,
    3403              :         cause: LogicalSizeCalculationCause,
    3404              :         cancel: &CancellationToken,
    3405              :         ctx: &RequestContext,
    3406              :     ) -> anyhow::Result<size::ModelInputs> {
    3407              :         let logical_sizes_at_once = self
    3408              :             .conf
    3409              :             .concurrent_tenant_size_logical_size_queries
    3410              :             .inner();
    3411              : 
    3412              :         // TODO: Having a single mutex block concurrent reads is not great for performance.
    3413              :         //
    3414              :         // But the only case where we need to run multiple of these at once is when we
    3415              :         // request a size for a tenant manually via API, while another background calculation
    3416              :         // is in progress (which is not a common case).
    3417              :         //
    3418              :         // See more for on the issue #2748 condenced out of the initial PR review.
    3419              :         let mut shared_cache = self.cached_logical_sizes.lock().await;
    3420              : 
    3421              :         size::gather_inputs(
    3422              :             self,
    3423              :             logical_sizes_at_once,
    3424              :             max_retention_period,
    3425              :             &mut shared_cache,
    3426              :             cause,
    3427              :             cancel,
    3428              :             ctx,
    3429              :         )
    3430              :         .await
    3431              :     }
    3432              : 
    3433              :     /// Calculate synthetic tenant size and cache the result.
    3434              :     /// This is periodically called by background worker.
    3435              :     /// result is cached in tenant struct
    3436            0 :     #[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()))]
    3437              :     pub async fn calculate_synthetic_size(
    3438              :         &self,
    3439              :         cause: LogicalSizeCalculationCause,
    3440              :         cancel: &CancellationToken,
    3441              :         ctx: &RequestContext,
    3442              :     ) -> anyhow::Result<u64> {
    3443              :         let inputs = self.gather_size_inputs(None, cause, cancel, ctx).await?;
    3444              : 
    3445              :         let size = inputs.calculate()?;
    3446              : 
    3447              :         self.set_cached_synthetic_size(size);
    3448              : 
    3449              :         Ok(size)
    3450              :     }
    3451              : 
    3452              :     /// Cache given synthetic size and update the metric value
    3453            0 :     pub fn set_cached_synthetic_size(&self, size: u64) {
    3454            0 :         self.cached_synthetic_tenant_size
    3455            0 :             .store(size, Ordering::Relaxed);
    3456              : 
    3457              :         // Only shard zero should be calculating synthetic sizes
    3458            0 :         debug_assert!(self.shard_identity.is_zero());
    3459              : 
    3460            0 :         TENANT_SYNTHETIC_SIZE_METRIC
    3461            0 :             .get_metric_with_label_values(&[&self.tenant_shard_id.tenant_id.to_string()])
    3462            0 :             .unwrap()
    3463            0 :             .set(size);
    3464            0 :     }
    3465              : 
    3466            0 :     pub fn cached_synthetic_size(&self) -> u64 {
    3467            0 :         self.cached_synthetic_tenant_size.load(Ordering::Relaxed)
    3468            0 :     }
    3469              : 
    3470              :     /// Flush any in-progress layers, schedule uploads, and wait for uploads to complete.
    3471              :     ///
    3472              :     /// This function can take a long time: callers should wrap it in a timeout if calling
    3473              :     /// from an external API handler.
    3474              :     ///
    3475              :     /// Cancel-safety: cancelling this function may leave I/O running, but such I/O is
    3476              :     /// still bounded by tenant/timeline shutdown.
    3477            0 :     #[tracing::instrument(skip_all)]
    3478              :     pub(crate) async fn flush_remote(&self) -> anyhow::Result<()> {
    3479              :         let timelines = self.timelines.lock().unwrap().clone();
    3480              : 
    3481            0 :         async fn flush_timeline(_gate: GateGuard, timeline: Arc<Timeline>) -> anyhow::Result<()> {
    3482            0 :             tracing::info!(timeline_id=%timeline.timeline_id, "Flushing...");
    3483            0 :             timeline.freeze_and_flush().await?;
    3484            0 :             tracing::info!(timeline_id=%timeline.timeline_id, "Waiting for uploads...");
    3485            0 :             if let Some(client) = &timeline.remote_client {
    3486            0 :                 client.wait_completion().await?;
    3487            0 :             }
    3488              : 
    3489            0 :             Ok(())
    3490            0 :         }
    3491              : 
    3492              :         // We do not use a JoinSet for these tasks, because we don't want them to be
    3493              :         // aborted when this function's future is cancelled: they should stay alive
    3494              :         // holding their GateGuard until they complete, to ensure their I/Os complete
    3495              :         // before Timeline shutdown completes.
    3496              :         let mut results = FuturesUnordered::new();
    3497              : 
    3498              :         for (_timeline_id, timeline) in timelines {
    3499              :             // Run each timeline's flush in a task holding the timeline's gate: this
    3500              :             // means that if this function's future is cancelled, the Timeline shutdown
    3501              :             // will still wait for any I/O in here to complete.
    3502              :             let gate = match timeline.gate.enter() {
    3503              :                 Ok(g) => g,
    3504              :                 Err(_) => continue,
    3505              :             };
    3506            0 :             let jh = tokio::task::spawn(async move { flush_timeline(gate, timeline).await });
    3507              :             results.push(jh);
    3508              :         }
    3509              : 
    3510              :         while let Some(r) = results.next().await {
    3511              :             if let Err(e) = r {
    3512              :                 if !e.is_cancelled() && !e.is_panic() {
    3513            0 :                     tracing::error!("unexpected join error: {e:?}");
    3514              :                 }
    3515              :             }
    3516              :         }
    3517              : 
    3518              :         // The flushes we did above were just writes, but the Tenant might have had
    3519              :         // pending deletions as well from recent compaction/gc: we want to flush those
    3520              :         // as well.  This requires flushing the global delete queue.  This is cheap
    3521              :         // because it's typically a no-op.
    3522              :         match self.deletion_queue_client.flush_execute().await {
    3523              :             Ok(_) => {}
    3524              :             Err(DeletionQueueError::ShuttingDown) => {}
    3525              :         }
    3526              : 
    3527              :         Ok(())
    3528              :     }
    3529              : 
    3530            0 :     pub(crate) fn get_tenant_conf(&self) -> TenantConfOpt {
    3531            0 :         self.tenant_conf.read().unwrap().tenant_conf.clone()
    3532            0 :     }
    3533              : }
    3534              : 
    3535              : /// Create the cluster temporarily in 'initdbpath' directory inside the repository
    3536              : /// to get bootstrap data for timeline initialization.
    3537            0 : async fn run_initdb(
    3538            0 :     conf: &'static PageServerConf,
    3539            0 :     initdb_target_dir: &Utf8Path,
    3540            0 :     pg_version: u32,
    3541            0 :     cancel: &CancellationToken,
    3542            0 : ) -> Result<(), InitdbError> {
    3543            0 :     let initdb_bin_path = conf
    3544            0 :         .pg_bin_dir(pg_version)
    3545            0 :         .map_err(InitdbError::Other)?
    3546            0 :         .join("initdb");
    3547            0 :     let initdb_lib_dir = conf.pg_lib_dir(pg_version).map_err(InitdbError::Other)?;
    3548            0 :     info!(
    3549            0 :         "running {} in {}, libdir: {}",
    3550            0 :         initdb_bin_path, initdb_target_dir, initdb_lib_dir,
    3551            0 :     );
    3552              : 
    3553            0 :     let _permit = INIT_DB_SEMAPHORE.acquire().await;
    3554              : 
    3555            0 :     let initdb_command = tokio::process::Command::new(&initdb_bin_path)
    3556            0 :         .args(["-D", initdb_target_dir.as_ref()])
    3557            0 :         .args(["-U", &conf.superuser])
    3558            0 :         .args(["-E", "utf8"])
    3559            0 :         .arg("--no-instructions")
    3560            0 :         .arg("--no-sync")
    3561            0 :         .env_clear()
    3562            0 :         .env("LD_LIBRARY_PATH", &initdb_lib_dir)
    3563            0 :         .env("DYLD_LIBRARY_PATH", &initdb_lib_dir)
    3564            0 :         .stdin(std::process::Stdio::null())
    3565            0 :         // stdout invocation produces the same output every time, we don't need it
    3566            0 :         .stdout(std::process::Stdio::null())
    3567            0 :         // we would be interested in the stderr output, if there was any
    3568            0 :         .stderr(std::process::Stdio::piped())
    3569            0 :         .spawn()?;
    3570              : 
    3571              :     // Ideally we'd select here with the cancellation token, but the problem is that
    3572              :     // we can't safely terminate initdb: it launches processes of its own, and killing
    3573              :     // initdb doesn't kill them. After we return from this function, we want the target
    3574              :     // directory to be able to be cleaned up.
    3575              :     // See https://github.com/neondatabase/neon/issues/6385
    3576            0 :     let initdb_output = initdb_command.wait_with_output().await?;
    3577            0 :     if !initdb_output.status.success() {
    3578            0 :         return Err(InitdbError::Failed(
    3579            0 :             initdb_output.status,
    3580            0 :             initdb_output.stderr,
    3581            0 :         ));
    3582            0 :     }
    3583            0 : 
    3584            0 :     // This isn't true cancellation support, see above. Still return an error to
    3585            0 :     // excercise the cancellation code path.
    3586            0 :     if cancel.is_cancelled() {
    3587            0 :         return Err(InitdbError::Cancelled);
    3588            0 :     }
    3589            0 : 
    3590            0 :     Ok(())
    3591            0 : }
    3592              : 
    3593              : impl Drop for Tenant {
    3594           84 :     fn drop(&mut self) {
    3595           84 :         remove_tenant_metrics(&self.tenant_shard_id);
    3596           84 :     }
    3597              : }
    3598              : /// Dump contents of a layer file to stdout.
    3599            0 : pub async fn dump_layerfile_from_path(
    3600            0 :     path: &Utf8Path,
    3601            0 :     verbose: bool,
    3602            0 :     ctx: &RequestContext,
    3603            0 : ) -> anyhow::Result<()> {
    3604              :     use std::os::unix::fs::FileExt;
    3605              : 
    3606              :     // All layer files start with a two-byte "magic" value, to identify the kind of
    3607              :     // file.
    3608            0 :     let file = File::open(path)?;
    3609            0 :     let mut header_buf = [0u8; 2];
    3610            0 :     file.read_exact_at(&mut header_buf, 0)?;
    3611              : 
    3612            0 :     match u16::from_be_bytes(header_buf) {
    3613              :         crate::IMAGE_FILE_MAGIC => {
    3614            0 :             ImageLayer::new_for_path(path, file)?
    3615            0 :                 .dump(verbose, ctx)
    3616            0 :                 .await?
    3617              :         }
    3618              :         crate::DELTA_FILE_MAGIC => {
    3619            0 :             DeltaLayer::new_for_path(path, file)?
    3620            0 :                 .dump(verbose, ctx)
    3621            0 :                 .await?
    3622              :         }
    3623            0 :         magic => bail!("unrecognized magic identifier: {:?}", magic),
    3624              :     }
    3625              : 
    3626            0 :     Ok(())
    3627            0 : }
    3628              : 
    3629              : #[cfg(test)]
    3630              : pub(crate) mod harness {
    3631              :     use bytes::{Bytes, BytesMut};
    3632              :     use camino::Utf8PathBuf;
    3633              :     use once_cell::sync::OnceCell;
    3634              :     use pageserver_api::models::ShardParameters;
    3635              :     use pageserver_api::shard::ShardIndex;
    3636              :     use std::fs;
    3637              :     use std::sync::Arc;
    3638              :     use utils::logging;
    3639              :     use utils::lsn::Lsn;
    3640              : 
    3641              :     use crate::deletion_queue::mock::MockDeletionQueue;
    3642              :     use crate::walredo::apply_neon;
    3643              :     use crate::{
    3644              :         config::PageServerConf, repository::Key, tenant::Tenant, walrecord::NeonWalRecord,
    3645              :     };
    3646              : 
    3647              :     use super::*;
    3648              :     use crate::tenant::config::{TenantConf, TenantConfOpt};
    3649              :     use hex_literal::hex;
    3650              :     use utils::id::{TenantId, TimelineId};
    3651              : 
    3652              :     pub const TIMELINE_ID: TimelineId =
    3653              :         TimelineId::from_array(hex!("11223344556677881122334455667788"));
    3654              :     pub const NEW_TIMELINE_ID: TimelineId =
    3655              :         TimelineId::from_array(hex!("AA223344556677881122334455667788"));
    3656              : 
    3657              :     /// Convenience function to create a page image with given string as the only content
    3658      2708270 :     pub fn test_img(s: &str) -> Bytes {
    3659      2708270 :         let mut buf = BytesMut::new();
    3660      2708270 :         buf.extend_from_slice(s.as_bytes());
    3661      2708270 :         buf.resize(64, 0);
    3662      2708270 : 
    3663      2708270 :         buf.freeze()
    3664      2708270 :     }
    3665              : 
    3666              :     impl From<TenantConf> for TenantConfOpt {
    3667           84 :         fn from(tenant_conf: TenantConf) -> Self {
    3668           84 :             Self {
    3669           84 :                 checkpoint_distance: Some(tenant_conf.checkpoint_distance),
    3670           84 :                 checkpoint_timeout: Some(tenant_conf.checkpoint_timeout),
    3671           84 :                 compaction_target_size: Some(tenant_conf.compaction_target_size),
    3672           84 :                 compaction_period: Some(tenant_conf.compaction_period),
    3673           84 :                 compaction_threshold: Some(tenant_conf.compaction_threshold),
    3674           84 :                 gc_horizon: Some(tenant_conf.gc_horizon),
    3675           84 :                 gc_period: Some(tenant_conf.gc_period),
    3676           84 :                 image_creation_threshold: Some(tenant_conf.image_creation_threshold),
    3677           84 :                 pitr_interval: Some(tenant_conf.pitr_interval),
    3678           84 :                 walreceiver_connect_timeout: Some(tenant_conf.walreceiver_connect_timeout),
    3679           84 :                 lagging_wal_timeout: Some(tenant_conf.lagging_wal_timeout),
    3680           84 :                 max_lsn_wal_lag: Some(tenant_conf.max_lsn_wal_lag),
    3681           84 :                 trace_read_requests: Some(tenant_conf.trace_read_requests),
    3682           84 :                 eviction_policy: Some(tenant_conf.eviction_policy),
    3683           84 :                 min_resident_size_override: tenant_conf.min_resident_size_override,
    3684           84 :                 evictions_low_residence_duration_metric_threshold: Some(
    3685           84 :                     tenant_conf.evictions_low_residence_duration_metric_threshold,
    3686           84 :                 ),
    3687           84 :                 gc_feedback: Some(tenant_conf.gc_feedback),
    3688           84 :                 heatmap_period: Some(tenant_conf.heatmap_period),
    3689           84 :                 lazy_slru_download: Some(tenant_conf.lazy_slru_download),
    3690           84 :                 timeline_get_throttle: Some(tenant_conf.timeline_get_throttle),
    3691           84 :             }
    3692           84 :         }
    3693              :     }
    3694              : 
    3695              :     pub struct TenantHarness {
    3696              :         pub conf: &'static PageServerConf,
    3697              :         pub tenant_conf: TenantConf,
    3698              :         pub tenant_shard_id: TenantShardId,
    3699              :         pub generation: Generation,
    3700              :         pub shard: ShardIndex,
    3701              :         pub remote_storage: GenericRemoteStorage,
    3702              :         pub remote_fs_dir: Utf8PathBuf,
    3703              :         pub deletion_queue: MockDeletionQueue,
    3704              :     }
    3705              : 
    3706              :     static LOG_HANDLE: OnceCell<()> = OnceCell::new();
    3707              : 
    3708           90 :     pub(crate) fn setup_logging() {
    3709           90 :         LOG_HANDLE.get_or_init(|| {
    3710           90 :             logging::init(
    3711           90 :                 logging::LogFormat::Test,
    3712           90 :                 // enable it in case the tests exercise code paths that use
    3713           90 :                 // debug_assert_current_span_has_tenant_and_timeline_id
    3714           90 :                 logging::TracingErrorLayerEnablement::EnableWithRustLogFilter,
    3715           90 :                 logging::Output::Stdout,
    3716           90 :             )
    3717           90 :             .expect("Failed to init test logging")
    3718           90 :         });
    3719           90 :     }
    3720              : 
    3721              :     impl TenantHarness {
    3722           84 :         pub fn create(test_name: &'static str) -> anyhow::Result<Self> {
    3723           84 :             setup_logging();
    3724           84 : 
    3725           84 :             let repo_dir = PageServerConf::test_repo_dir(test_name);
    3726           84 :             let _ = fs::remove_dir_all(&repo_dir);
    3727           84 :             fs::create_dir_all(&repo_dir)?;
    3728              : 
    3729           84 :             let conf = PageServerConf::dummy_conf(repo_dir);
    3730           84 :             // Make a static copy of the config. This can never be free'd, but that's
    3731           84 :             // OK in a test.
    3732           84 :             let conf: &'static PageServerConf = Box::leak(Box::new(conf));
    3733           84 : 
    3734           84 :             // Disable automatic GC and compaction to make the unit tests more deterministic.
    3735           84 :             // The tests perform them manually if needed.
    3736           84 :             let tenant_conf = TenantConf {
    3737           84 :                 gc_period: Duration::ZERO,
    3738           84 :                 compaction_period: Duration::ZERO,
    3739           84 :                 ..TenantConf::default()
    3740           84 :             };
    3741           84 : 
    3742           84 :             let tenant_id = TenantId::generate();
    3743           84 :             let tenant_shard_id = TenantShardId::unsharded(tenant_id);
    3744           84 :             fs::create_dir_all(conf.tenant_path(&tenant_shard_id))?;
    3745           84 :             fs::create_dir_all(conf.timelines_path(&tenant_shard_id))?;
    3746              : 
    3747              :             use remote_storage::{RemoteStorageConfig, RemoteStorageKind};
    3748           84 :             let remote_fs_dir = conf.workdir.join("localfs");
    3749           84 :             std::fs::create_dir_all(&remote_fs_dir).unwrap();
    3750           84 :             let config = RemoteStorageConfig {
    3751           84 :                 storage: RemoteStorageKind::LocalFs(remote_fs_dir.clone()),
    3752           84 :                 timeout: RemoteStorageConfig::DEFAULT_TIMEOUT,
    3753           84 :             };
    3754           84 :             let remote_storage = GenericRemoteStorage::from_config(&config).unwrap();
    3755           84 :             let deletion_queue = MockDeletionQueue::new(Some(remote_storage.clone()));
    3756           84 : 
    3757           84 :             Ok(Self {
    3758           84 :                 conf,
    3759           84 :                 tenant_conf,
    3760           84 :                 tenant_shard_id,
    3761           84 :                 generation: Generation::new(0xdeadbeef),
    3762           84 :                 shard: ShardIndex::unsharded(),
    3763           84 :                 remote_storage,
    3764           84 :                 remote_fs_dir,
    3765           84 :                 deletion_queue,
    3766           84 :             })
    3767           84 :         }
    3768              : 
    3769            8 :         pub fn span(&self) -> tracing::Span {
    3770            8 :             info_span!("TenantHarness", tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug())
    3771            8 :         }
    3772              : 
    3773           82 :         pub(crate) async fn load(&self) -> (Arc<Tenant>, RequestContext) {
    3774           82 :             let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
    3775           82 :             (
    3776           82 :                 self.do_try_load(&ctx)
    3777           28 :                     .await
    3778           82 :                     .expect("failed to load test tenant"),
    3779           82 :                 ctx,
    3780           82 :             )
    3781           82 :         }
    3782              : 
    3783          168 :         #[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()))]
    3784              :         pub(crate) async fn do_try_load(
    3785              :             &self,
    3786              :             ctx: &RequestContext,
    3787              :         ) -> anyhow::Result<Arc<Tenant>> {
    3788              :             let walredo_mgr = Arc::new(WalRedoManager::from(TestRedoManager));
    3789              : 
    3790              :             let tenant = Arc::new(Tenant::new(
    3791              :                 TenantState::Loading,
    3792              :                 self.conf,
    3793              :                 AttachedTenantConf::try_from(LocationConf::attached_single(
    3794              :                     TenantConfOpt::from(self.tenant_conf.clone()),
    3795              :                     self.generation,
    3796              :                     &ShardParameters::default(),
    3797              :                 ))
    3798              :                 .unwrap(),
    3799              :                 // This is a legacy/test code path: sharding isn't supported here.
    3800              :                 ShardIdentity::unsharded(),
    3801              :                 Some(walredo_mgr),
    3802              :                 self.tenant_shard_id,
    3803              :                 Some(self.remote_storage.clone()),
    3804              :                 self.deletion_queue.new_client(),
    3805              :             ));
    3806              : 
    3807              :             let preload = tenant
    3808              :                 .preload(&self.remote_storage, CancellationToken::new())
    3809              :                 .await?;
    3810              :             tenant.attach(Some(preload), SpawnMode::Normal, ctx).await?;
    3811              : 
    3812              :             tenant.state.send_replace(TenantState::Active);
    3813              :             for timeline in tenant.timelines.lock().unwrap().values() {
    3814              :                 timeline.set_state(TimelineState::Active);
    3815              :             }
    3816              :             Ok(tenant)
    3817              :         }
    3818              : 
    3819            4 :         pub fn timeline_path(&self, timeline_id: &TimelineId) -> Utf8PathBuf {
    3820            4 :             self.conf.timeline_path(&self.tenant_shard_id, timeline_id)
    3821            4 :         }
    3822              :     }
    3823              : 
    3824              :     // Mock WAL redo manager that doesn't do much
    3825              :     pub(crate) struct TestRedoManager;
    3826              : 
    3827              :     impl TestRedoManager {
    3828              :         /// # Cancel-Safety
    3829              :         ///
    3830              :         /// This method is cancellation-safe.
    3831            8 :         pub async fn request_redo(
    3832            8 :             &self,
    3833            8 :             key: Key,
    3834            8 :             lsn: Lsn,
    3835            8 :             base_img: Option<(Lsn, Bytes)>,
    3836            8 :             records: Vec<(Lsn, NeonWalRecord)>,
    3837            8 :             _pg_version: u32,
    3838            8 :         ) -> anyhow::Result<Bytes> {
    3839           12 :             let records_neon = records.iter().all(|r| apply_neon::can_apply_in_neon(&r.1));
    3840            8 :             if records_neon {
    3841              :                 // For Neon wal records, we can decode without spawning postgres, so do so.
    3842            8 :                 let base_img = base_img.expect("Neon WAL redo requires base image").1;
    3843            8 :                 let mut page = BytesMut::new();
    3844            8 :                 page.extend_from_slice(&base_img);
    3845           20 :                 for (_record_lsn, record) in records {
    3846           12 :                     apply_neon::apply_in_neon(&record, key, &mut page)?;
    3847              :                 }
    3848            8 :                 Ok(page.freeze())
    3849              :             } else {
    3850              :                 // We never spawn a postgres walredo process in unit tests: just log what we might have done.
    3851            0 :                 let s = format!(
    3852            0 :                     "redo for {} to get to {}, with {} and {} records",
    3853            0 :                     key,
    3854            0 :                     lsn,
    3855            0 :                     if base_img.is_some() {
    3856            0 :                         "base image"
    3857              :                     } else {
    3858            0 :                         "no base image"
    3859              :                     },
    3860            0 :                     records.len()
    3861            0 :                 );
    3862            0 :                 println!("{s}");
    3863            0 : 
    3864            0 :                 Ok(test_img(&s))
    3865              :             }
    3866            8 :         }
    3867              :     }
    3868              : }
    3869              : 
    3870              : #[cfg(test)]
    3871              : mod tests {
    3872              :     use super::*;
    3873              :     use crate::keyspace::KeySpaceAccum;
    3874              :     use crate::repository::{Key, Value};
    3875              :     use crate::tenant::harness::*;
    3876              :     use crate::DEFAULT_PG_VERSION;
    3877              :     use bytes::BytesMut;
    3878              :     use hex_literal::hex;
    3879              :     use once_cell::sync::Lazy;
    3880              :     use pageserver_api::keyspace::KeySpace;
    3881              :     use rand::{thread_rng, Rng};
    3882              :     use tokio_util::sync::CancellationToken;
    3883              : 
    3884              :     static TEST_KEY: Lazy<Key> =
    3885           18 :         Lazy::new(|| Key::from_slice(&hex!("010000000033333333444444445500000001")));
    3886              : 
    3887            2 :     #[tokio::test]
    3888            2 :     async fn test_basic() -> anyhow::Result<()> {
    3889            2 :         let (tenant, ctx) = TenantHarness::create("test_basic")?.load().await;
    3890            2 :         let tline = tenant
    3891            2 :             .create_test_timeline(TIMELINE_ID, Lsn(0x08), DEFAULT_PG_VERSION, &ctx)
    3892            7 :             .await?;
    3893            2 : 
    3894            2 :         let mut writer = tline.writer().await;
    3895            2 :         writer
    3896            2 :             .put(
    3897            2 :                 *TEST_KEY,
    3898            2 :                 Lsn(0x10),
    3899            2 :                 &Value::Image(test_img("foo at 0x10")),
    3900            2 :                 &ctx,
    3901            2 :             )
    3902            2 :             .await?;
    3903            2 :         writer.finish_write(Lsn(0x10));
    3904            2 :         drop(writer);
    3905            2 : 
    3906            2 :         let mut writer = tline.writer().await;
    3907            2 :         writer
    3908            2 :             .put(
    3909            2 :                 *TEST_KEY,
    3910            2 :                 Lsn(0x20),
    3911            2 :                 &Value::Image(test_img("foo at 0x20")),
    3912            2 :                 &ctx,
    3913            2 :             )
    3914            2 :             .await?;
    3915            2 :         writer.finish_write(Lsn(0x20));
    3916            2 :         drop(writer);
    3917            2 : 
    3918            2 :         assert_eq!(
    3919            2 :             tline.get(*TEST_KEY, Lsn(0x10), &ctx).await?,
    3920            2 :             test_img("foo at 0x10")
    3921            2 :         );
    3922            2 :         assert_eq!(
    3923            2 :             tline.get(*TEST_KEY, Lsn(0x1f), &ctx).await?,
    3924            2 :             test_img("foo at 0x10")
    3925            2 :         );
    3926            2 :         assert_eq!(
    3927            2 :             tline.get(*TEST_KEY, Lsn(0x20), &ctx).await?,
    3928            2 :             test_img("foo at 0x20")
    3929            2 :         );
    3930            2 : 
    3931            2 :         Ok(())
    3932            2 :     }
    3933              : 
    3934            2 :     #[tokio::test]
    3935            2 :     async fn no_duplicate_timelines() -> anyhow::Result<()> {
    3936            2 :         let (tenant, ctx) = TenantHarness::create("no_duplicate_timelines")?
    3937            2 :             .load()
    3938            2 :             .await;
    3939            2 :         let _ = tenant
    3940            2 :             .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
    3941            6 :             .await?;
    3942            2 : 
    3943            2 :         match tenant
    3944            2 :             .create_empty_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
    3945            2 :             .await
    3946            2 :         {
    3947            2 :             Ok(_) => panic!("duplicate timeline creation should fail"),
    3948            2 :             Err(e) => assert_eq!(e.to_string(), "Already exists".to_string()),
    3949            2 :         }
    3950            2 : 
    3951            2 :         Ok(())
    3952            2 :     }
    3953              : 
    3954              :     /// Convenience function to create a page image with given string as the only content
    3955           10 :     pub fn test_value(s: &str) -> Value {
    3956           10 :         let mut buf = BytesMut::new();
    3957           10 :         buf.extend_from_slice(s.as_bytes());
    3958           10 :         Value::Image(buf.freeze())
    3959           10 :     }
    3960              : 
    3961              :     ///
    3962              :     /// Test branch creation
    3963              :     ///
    3964            2 :     #[tokio::test]
    3965            2 :     async fn test_branch() -> anyhow::Result<()> {
    3966            2 :         use std::str::from_utf8;
    3967            2 : 
    3968            2 :         let (tenant, ctx) = TenantHarness::create("test_branch")?.load().await;
    3969            2 :         let tline = tenant
    3970            2 :             .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
    3971            8 :             .await?;
    3972            2 :         let mut writer = tline.writer().await;
    3973            2 : 
    3974            2 :         #[allow(non_snake_case)]
    3975            2 :         let TEST_KEY_A: Key = Key::from_hex("110000000033333333444444445500000001").unwrap();
    3976            2 :         #[allow(non_snake_case)]
    3977            2 :         let TEST_KEY_B: Key = Key::from_hex("110000000033333333444444445500000002").unwrap();
    3978            2 : 
    3979            2 :         // Insert a value on the timeline
    3980            2 :         writer
    3981            2 :             .put(TEST_KEY_A, Lsn(0x20), &test_value("foo at 0x20"), &ctx)
    3982            2 :             .await?;
    3983            2 :         writer
    3984            2 :             .put(TEST_KEY_B, Lsn(0x20), &test_value("foobar at 0x20"), &ctx)
    3985            2 :             .await?;
    3986            2 :         writer.finish_write(Lsn(0x20));
    3987            2 : 
    3988            2 :         writer
    3989            2 :             .put(TEST_KEY_A, Lsn(0x30), &test_value("foo at 0x30"), &ctx)
    3990            2 :             .await?;
    3991            2 :         writer.finish_write(Lsn(0x30));
    3992            2 :         writer
    3993            2 :             .put(TEST_KEY_A, Lsn(0x40), &test_value("foo at 0x40"), &ctx)
    3994            2 :             .await?;
    3995            2 :         writer.finish_write(Lsn(0x40));
    3996            2 : 
    3997            2 :         //assert_current_logical_size(&tline, Lsn(0x40));
    3998            2 : 
    3999            2 :         // Branch the history, modify relation differently on the new timeline
    4000            2 :         tenant
    4001            2 :             .branch_timeline_test(&tline, NEW_TIMELINE_ID, Some(Lsn(0x30)), &ctx)
    4002            2 :             .await?;
    4003            2 :         let newtline = tenant
    4004            2 :             .get_timeline(NEW_TIMELINE_ID, true)
    4005            2 :             .expect("Should have a local timeline");
    4006            2 :         let mut new_writer = newtline.writer().await;
    4007            2 :         new_writer
    4008            2 :             .put(TEST_KEY_A, Lsn(0x40), &test_value("bar at 0x40"), &ctx)
    4009            2 :             .await?;
    4010            2 :         new_writer.finish_write(Lsn(0x40));
    4011            2 : 
    4012            2 :         // Check page contents on both branches
    4013            2 :         assert_eq!(
    4014            2 :             from_utf8(&tline.get(TEST_KEY_A, Lsn(0x40), &ctx).await?)?,
    4015            2 :             "foo at 0x40"
    4016            2 :         );
    4017            2 :         assert_eq!(
    4018            2 :             from_utf8(&newtline.get(TEST_KEY_A, Lsn(0x40), &ctx).await?)?,
    4019            2 :             "bar at 0x40"
    4020            2 :         );
    4021            2 :         assert_eq!(
    4022            2 :             from_utf8(&newtline.get(TEST_KEY_B, Lsn(0x40), &ctx).await?)?,
    4023            2 :             "foobar at 0x20"
    4024            2 :         );
    4025            2 : 
    4026            2 :         //assert_current_logical_size(&tline, Lsn(0x40));
    4027            2 : 
    4028            2 :         Ok(())
    4029            2 :     }
    4030              : 
    4031           20 :     async fn make_some_layers(
    4032           20 :         tline: &Timeline,
    4033           20 :         start_lsn: Lsn,
    4034           20 :         ctx: &RequestContext,
    4035           20 :     ) -> anyhow::Result<()> {
    4036           20 :         let mut lsn = start_lsn;
    4037              :         {
    4038           20 :             let mut writer = tline.writer().await;
    4039              :             // Create a relation on the timeline
    4040           20 :             writer
    4041           20 :                 .put(
    4042           20 :                     *TEST_KEY,
    4043           20 :                     lsn,
    4044           20 :                     &Value::Image(test_img(&format!("foo at {}", lsn))),
    4045           20 :                     ctx,
    4046           20 :                 )
    4047           10 :                 .await?;
    4048           20 :             writer.finish_write(lsn);
    4049           20 :             lsn += 0x10;
    4050           20 :             writer
    4051           20 :                 .put(
    4052           20 :                     *TEST_KEY,
    4053           20 :                     lsn,
    4054           20 :                     &Value::Image(test_img(&format!("foo at {}", lsn))),
    4055           20 :                     ctx,
    4056           20 :                 )
    4057            0 :                 .await?;
    4058           20 :             writer.finish_write(lsn);
    4059           20 :             lsn += 0x10;
    4060           20 :         }
    4061           20 :         tline.freeze_and_flush().await?;
    4062              :         {
    4063           20 :             let mut writer = tline.writer().await;
    4064           20 :             writer
    4065           20 :                 .put(
    4066           20 :                     *TEST_KEY,
    4067           20 :                     lsn,
    4068           20 :                     &Value::Image(test_img(&format!("foo at {}", lsn))),
    4069           20 :                     ctx,
    4070           20 :                 )
    4071           10 :                 .await?;
    4072           20 :             writer.finish_write(lsn);
    4073           20 :             lsn += 0x10;
    4074           20 :             writer
    4075           20 :                 .put(
    4076           20 :                     *TEST_KEY,
    4077           20 :                     lsn,
    4078           20 :                     &Value::Image(test_img(&format!("foo at {}", lsn))),
    4079           20 :                     ctx,
    4080           20 :                 )
    4081            0 :                 .await?;
    4082           20 :             writer.finish_write(lsn);
    4083           20 :         }
    4084           20 :         tline.freeze_and_flush().await
    4085           20 :     }
    4086              : 
    4087            2 :     #[tokio::test]
    4088            2 :     async fn test_prohibit_branch_creation_on_garbage_collected_data() -> anyhow::Result<()> {
    4089            2 :         let (tenant, ctx) =
    4090            2 :             TenantHarness::create("test_prohibit_branch_creation_on_garbage_collected_data")?
    4091            2 :                 .load()
    4092            2 :                 .await;
    4093            2 :         let tline = tenant
    4094            2 :             .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
    4095            7 :             .await?;
    4096            6 :         make_some_layers(tline.as_ref(), Lsn(0x20), &ctx).await?;
    4097            2 : 
    4098            2 :         // this removes layers before lsn 40 (50 minus 10), so there are two remaining layers, image and delta for 31-50
    4099            2 :         // FIXME: this doesn't actually remove any layer currently, given how the flushing
    4100            2 :         // and compaction works. But it does set the 'cutoff' point so that the cross check
    4101            2 :         // below should fail.
    4102            2 :         tenant
    4103            2 :             .gc_iteration(
    4104            2 :                 Some(TIMELINE_ID),
    4105            2 :                 0x10,
    4106            2 :                 Duration::ZERO,
    4107            2 :                 &CancellationToken::new(),
    4108            2 :                 &ctx,
    4109            2 :             )
    4110            2 :             .await?;
    4111            2 : 
    4112            2 :         // try to branch at lsn 25, should fail because we already garbage collected the data
    4113            2 :         match tenant
    4114            2 :             .branch_timeline_test(&tline, NEW_TIMELINE_ID, Some(Lsn(0x25)), &ctx)
    4115            2 :             .await
    4116            2 :         {
    4117            2 :             Ok(_) => panic!("branching should have failed"),
    4118            2 :             Err(err) => {
    4119            2 :                 let CreateTimelineError::AncestorLsn(err) = err else {
    4120            2 :                     panic!("wrong error type")
    4121            2 :                 };
    4122            2 :                 assert!(err.to_string().contains("invalid branch start lsn"));
    4123            2 :                 assert!(err
    4124            2 :                     .source()
    4125            2 :                     .unwrap()
    4126            2 :                     .to_string()
    4127            2 :                     .contains("we might've already garbage collected needed data"))
    4128            2 :             }
    4129            2 :         }
    4130            2 : 
    4131            2 :         Ok(())
    4132            2 :     }
    4133              : 
    4134            2 :     #[tokio::test]
    4135            2 :     async fn test_prohibit_branch_creation_on_pre_initdb_lsn() -> anyhow::Result<()> {
    4136            2 :         let (tenant, ctx) =
    4137            2 :             TenantHarness::create("test_prohibit_branch_creation_on_pre_initdb_lsn")?
    4138            2 :                 .load()
    4139            2 :                 .await;
    4140            2 : 
    4141            2 :         let tline = tenant
    4142            2 :             .create_test_timeline(TIMELINE_ID, Lsn(0x50), DEFAULT_PG_VERSION, &ctx)
    4143            7 :             .await?;
    4144            2 :         // try to branch at lsn 0x25, should fail because initdb lsn is 0x50
    4145            2 :         match tenant
    4146            2 :             .branch_timeline_test(&tline, NEW_TIMELINE_ID, Some(Lsn(0x25)), &ctx)
    4147            2 :             .await
    4148            2 :         {
    4149            2 :             Ok(_) => panic!("branching should have failed"),
    4150            2 :             Err(err) => {
    4151            2 :                 let CreateTimelineError::AncestorLsn(err) = err else {
    4152            2 :                     panic!("wrong error type");
    4153            2 :                 };
    4154            2 :                 assert!(&err.to_string().contains("invalid branch start lsn"));
    4155            2 :                 assert!(&err
    4156            2 :                     .source()
    4157            2 :                     .unwrap()
    4158            2 :                     .to_string()
    4159            2 :                     .contains("is earlier than latest GC horizon"));
    4160            2 :             }
    4161            2 :         }
    4162            2 : 
    4163            2 :         Ok(())
    4164            2 :     }
    4165              : 
    4166              :     /*
    4167              :     // FIXME: This currently fails to error out. Calling GC doesn't currently
    4168              :     // remove the old value, we'd need to work a little harder
    4169              :     #[tokio::test]
    4170              :     async fn test_prohibit_get_for_garbage_collected_data() -> anyhow::Result<()> {
    4171              :         let repo =
    4172              :             RepoHarness::create("test_prohibit_get_for_garbage_collected_data")?
    4173              :             .load();
    4174              : 
    4175              :         let tline = repo.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION)?;
    4176              :         make_some_layers(tline.as_ref(), Lsn(0x20), &ctx).await?;
    4177              : 
    4178              :         repo.gc_iteration(Some(TIMELINE_ID), 0x10, Duration::ZERO)?;
    4179              :         let latest_gc_cutoff_lsn = tline.get_latest_gc_cutoff_lsn();
    4180              :         assert!(*latest_gc_cutoff_lsn > Lsn(0x25));
    4181              :         match tline.get(*TEST_KEY, Lsn(0x25)) {
    4182              :             Ok(_) => panic!("request for page should have failed"),
    4183              :             Err(err) => assert!(err.to_string().contains("not found at")),
    4184              :         }
    4185              :         Ok(())
    4186              :     }
    4187              :      */
    4188              : 
    4189            2 :     #[tokio::test]
    4190            2 :     async fn test_get_branchpoints_from_an_inactive_timeline() -> anyhow::Result<()> {
    4191            2 :         let (tenant, ctx) =
    4192            2 :             TenantHarness::create("test_get_branchpoints_from_an_inactive_timeline")?
    4193            2 :                 .load()
    4194            2 :                 .await;
    4195            2 :         let tline = tenant
    4196            2 :             .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
    4197            7 :             .await?;
    4198            6 :         make_some_layers(tline.as_ref(), Lsn(0x20), &ctx).await?;
    4199            2 : 
    4200            2 :         tenant
    4201            2 :             .branch_timeline_test(&tline, NEW_TIMELINE_ID, Some(Lsn(0x40)), &ctx)
    4202            2 :             .await?;
    4203            2 :         let newtline = tenant
    4204            2 :             .get_timeline(NEW_TIMELINE_ID, true)
    4205            2 :             .expect("Should have a local timeline");
    4206            2 : 
    4207            6 :         make_some_layers(newtline.as_ref(), Lsn(0x60), &ctx).await?;
    4208            2 : 
    4209            2 :         tline.set_broken("test".to_owned());
    4210            2 : 
    4211            2 :         tenant
    4212            2 :             .gc_iteration(
    4213            2 :                 Some(TIMELINE_ID),
    4214            2 :                 0x10,
    4215            2 :                 Duration::ZERO,
    4216            2 :                 &CancellationToken::new(),
    4217            2 :                 &ctx,
    4218            2 :             )
    4219            2 :             .await?;
    4220            2 : 
    4221            2 :         // The branchpoints should contain all timelines, even ones marked
    4222            2 :         // as Broken.
    4223            2 :         {
    4224            2 :             let branchpoints = &tline.gc_info.read().unwrap().retain_lsns;
    4225            2 :             assert_eq!(branchpoints.len(), 1);
    4226            2 :             assert_eq!(branchpoints[0], Lsn(0x40));
    4227            2 :         }
    4228            2 : 
    4229            2 :         // You can read the key from the child branch even though the parent is
    4230            2 :         // Broken, as long as you don't need to access data from the parent.
    4231            2 :         assert_eq!(
    4232            4 :             newtline.get(*TEST_KEY, Lsn(0x70), &ctx).await?,
    4233            2 :             test_img(&format!("foo at {}", Lsn(0x70)))
    4234            2 :         );
    4235            2 : 
    4236            2 :         // This needs to traverse to the parent, and fails.
    4237            2 :         let err = newtline.get(*TEST_KEY, Lsn(0x50), &ctx).await.unwrap_err();
    4238            2 :         assert!(err
    4239            2 :             .to_string()
    4240            2 :             .contains("will not become active. Current state: Broken"));
    4241            2 : 
    4242            2 :         Ok(())
    4243            2 :     }
    4244              : 
    4245            2 :     #[tokio::test]
    4246            2 :     async fn test_retain_data_in_parent_which_is_needed_for_child() -> anyhow::Result<()> {
    4247            2 :         let (tenant, ctx) =
    4248            2 :             TenantHarness::create("test_retain_data_in_parent_which_is_needed_for_child")?
    4249            2 :                 .load()
    4250            2 :                 .await;
    4251            2 :         let tline = tenant
    4252            2 :             .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
    4253            6 :             .await?;
    4254            6 :         make_some_layers(tline.as_ref(), Lsn(0x20), &ctx).await?;
    4255            2 : 
    4256            2 :         tenant
    4257            2 :             .branch_timeline_test(&tline, NEW_TIMELINE_ID, Some(Lsn(0x40)), &ctx)
    4258            2 :             .await?;
    4259            2 :         let newtline = tenant
    4260            2 :             .get_timeline(NEW_TIMELINE_ID, true)
    4261            2 :             .expect("Should have a local timeline");
    4262            2 :         // this removes layers before lsn 40 (50 minus 10), so there are two remaining layers, image and delta for 31-50
    4263            2 :         tenant
    4264            2 :             .gc_iteration(
    4265            2 :                 Some(TIMELINE_ID),
    4266            2 :                 0x10,
    4267            2 :                 Duration::ZERO,
    4268            2 :                 &CancellationToken::new(),
    4269            2 :                 &ctx,
    4270            2 :             )
    4271            2 :             .await?;
    4272            4 :         assert!(newtline.get(*TEST_KEY, Lsn(0x25), &ctx).await.is_ok());
    4273            2 : 
    4274            2 :         Ok(())
    4275            2 :     }
    4276            2 :     #[tokio::test]
    4277            2 :     async fn test_parent_keeps_data_forever_after_branching() -> anyhow::Result<()> {
    4278            2 :         let (tenant, ctx) =
    4279            2 :             TenantHarness::create("test_parent_keeps_data_forever_after_branching")?
    4280            2 :                 .load()
    4281            2 :                 .await;
    4282            2 :         let tline = tenant
    4283            2 :             .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
    4284            7 :             .await?;
    4285            6 :         make_some_layers(tline.as_ref(), Lsn(0x20), &ctx).await?;
    4286            2 : 
    4287            2 :         tenant
    4288            2 :             .branch_timeline_test(&tline, NEW_TIMELINE_ID, Some(Lsn(0x40)), &ctx)
    4289            2 :             .await?;
    4290            2 :         let newtline = tenant
    4291            2 :             .get_timeline(NEW_TIMELINE_ID, true)
    4292            2 :             .expect("Should have a local timeline");
    4293            2 : 
    4294            6 :         make_some_layers(newtline.as_ref(), Lsn(0x60), &ctx).await?;
    4295            2 : 
    4296            2 :         // run gc on parent
    4297            2 :         tenant
    4298            2 :             .gc_iteration(
    4299            2 :                 Some(TIMELINE_ID),
    4300            2 :                 0x10,
    4301            2 :                 Duration::ZERO,
    4302            2 :                 &CancellationToken::new(),
    4303            2 :                 &ctx,
    4304            2 :             )
    4305            2 :             .await?;
    4306            2 : 
    4307            2 :         // Check that the data is still accessible on the branch.
    4308            2 :         assert_eq!(
    4309            7 :             newtline.get(*TEST_KEY, Lsn(0x50), &ctx).await?,
    4310            2 :             test_img(&format!("foo at {}", Lsn(0x40)))
    4311            2 :         );
    4312            2 : 
    4313            2 :         Ok(())
    4314            2 :     }
    4315              : 
    4316            2 :     #[tokio::test]
    4317            2 :     async fn timeline_load() -> anyhow::Result<()> {
    4318            2 :         const TEST_NAME: &str = "timeline_load";
    4319            2 :         let harness = TenantHarness::create(TEST_NAME)?;
    4320            2 :         {
    4321            2 :             let (tenant, ctx) = harness.load().await;
    4322            2 :             let tline = tenant
    4323            2 :                 .create_test_timeline(TIMELINE_ID, Lsn(0x7000), DEFAULT_PG_VERSION, &ctx)
    4324            7 :                 .await?;
    4325            6 :             make_some_layers(tline.as_ref(), Lsn(0x8000), &ctx).await?;
    4326            2 :             // so that all uploads finish & we can call harness.load() below again
    4327            2 :             tenant
    4328            2 :                 .shutdown(Default::default(), true)
    4329            2 :                 .instrument(harness.span())
    4330            2 :                 .await
    4331            2 :                 .ok()
    4332            2 :                 .unwrap();
    4333            2 :         }
    4334            2 : 
    4335           12 :         let (tenant, _ctx) = harness.load().await;
    4336            2 :         tenant
    4337            2 :             .get_timeline(TIMELINE_ID, true)
    4338            2 :             .expect("cannot load timeline");
    4339            2 : 
    4340            2 :         Ok(())
    4341            2 :     }
    4342              : 
    4343            2 :     #[tokio::test]
    4344            2 :     async fn timeline_load_with_ancestor() -> anyhow::Result<()> {
    4345            2 :         const TEST_NAME: &str = "timeline_load_with_ancestor";
    4346            2 :         let harness = TenantHarness::create(TEST_NAME)?;
    4347            2 :         // create two timelines
    4348            2 :         {
    4349            2 :             let (tenant, ctx) = harness.load().await;
    4350            2 :             let tline = tenant
    4351            2 :                 .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
    4352            6 :                 .await?;
    4353            2 : 
    4354            6 :             make_some_layers(tline.as_ref(), Lsn(0x20), &ctx).await?;
    4355            2 : 
    4356            2 :             let child_tline = tenant
    4357            2 :                 .branch_timeline_test(&tline, NEW_TIMELINE_ID, Some(Lsn(0x40)), &ctx)
    4358            2 :                 .await?;
    4359            2 :             child_tline.set_state(TimelineState::Active);
    4360            2 : 
    4361            2 :             let newtline = tenant
    4362            2 :                 .get_timeline(NEW_TIMELINE_ID, true)
    4363            2 :                 .expect("Should have a local timeline");
    4364            2 : 
    4365            6 :             make_some_layers(newtline.as_ref(), Lsn(0x60), &ctx).await?;
    4366            2 : 
    4367            2 :             // so that all uploads finish & we can call harness.load() below again
    4368            2 :             tenant
    4369            2 :                 .shutdown(Default::default(), true)
    4370            2 :                 .instrument(harness.span())
    4371            3 :                 .await
    4372            2 :                 .ok()
    4373            2 :                 .unwrap();
    4374            2 :         }
    4375            2 : 
    4376            2 :         // check that both of them are initially unloaded
    4377           16 :         let (tenant, _ctx) = harness.load().await;
    4378            2 : 
    4379            2 :         // check that both, child and ancestor are loaded
    4380            2 :         let _child_tline = tenant
    4381            2 :             .get_timeline(NEW_TIMELINE_ID, true)
    4382            2 :             .expect("cannot get child timeline loaded");
    4383            2 : 
    4384            2 :         let _ancestor_tline = tenant
    4385            2 :             .get_timeline(TIMELINE_ID, true)
    4386            2 :             .expect("cannot get ancestor timeline loaded");
    4387            2 : 
    4388            2 :         Ok(())
    4389            2 :     }
    4390              : 
    4391            2 :     #[tokio::test]
    4392            2 :     async fn delta_layer_dumping() -> anyhow::Result<()> {
    4393            2 :         use storage_layer::AsLayerDesc;
    4394            2 :         let (tenant, ctx) = TenantHarness::create("test_layer_dumping")?.load().await;
    4395            2 :         let tline = tenant
    4396            2 :             .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
    4397            8 :             .await?;
    4398            6 :         make_some_layers(tline.as_ref(), Lsn(0x20), &ctx).await?;
    4399            2 : 
    4400            2 :         let layer_map = tline.layers.read().await;
    4401            2 :         let level0_deltas = layer_map
    4402            2 :             .layer_map()
    4403            2 :             .get_level0_deltas()?
    4404            2 :             .into_iter()
    4405            4 :             .map(|desc| layer_map.get_from_desc(&desc))
    4406            2 :             .collect::<Vec<_>>();
    4407            2 : 
    4408            2 :         assert!(!level0_deltas.is_empty());
    4409            2 : 
    4410            6 :         for delta in level0_deltas {
    4411            2 :             // Ensure we are dumping a delta layer here
    4412            4 :             assert!(delta.layer_desc().is_delta);
    4413            8 :             delta.dump(true, &ctx).await.unwrap();
    4414            2 :         }
    4415            2 : 
    4416            2 :         Ok(())
    4417            2 :     }
    4418              : 
    4419            2 :     #[tokio::test]
    4420            2 :     async fn test_images() -> anyhow::Result<()> {
    4421            2 :         let (tenant, ctx) = TenantHarness::create("test_images")?.load().await;
    4422            2 :         let tline = tenant
    4423            2 :             .create_test_timeline(TIMELINE_ID, Lsn(0x08), DEFAULT_PG_VERSION, &ctx)
    4424            8 :             .await?;
    4425            2 : 
    4426            2 :         let mut writer = tline.writer().await;
    4427            2 :         writer
    4428            2 :             .put(
    4429            2 :                 *TEST_KEY,
    4430            2 :                 Lsn(0x10),
    4431            2 :                 &Value::Image(test_img("foo at 0x10")),
    4432            2 :                 &ctx,
    4433            2 :             )
    4434            2 :             .await?;
    4435            2 :         writer.finish_write(Lsn(0x10));
    4436            2 :         drop(writer);
    4437            2 : 
    4438            2 :         tline.freeze_and_flush().await?;
    4439            2 :         tline
    4440            2 :             .compact(&CancellationToken::new(), EnumSet::empty(), &ctx)
    4441            2 :             .await?;
    4442            2 : 
    4443            2 :         let mut writer = tline.writer().await;
    4444            2 :         writer
    4445            2 :             .put(
    4446            2 :                 *TEST_KEY,
    4447            2 :                 Lsn(0x20),
    4448            2 :                 &Value::Image(test_img("foo at 0x20")),
    4449            2 :                 &ctx,
    4450            2 :             )
    4451            2 :             .await?;
    4452            2 :         writer.finish_write(Lsn(0x20));
    4453            2 :         drop(writer);
    4454            2 : 
    4455            2 :         tline.freeze_and_flush().await?;
    4456            2 :         tline
    4457            2 :             .compact(&CancellationToken::new(), EnumSet::empty(), &ctx)
    4458            2 :             .await?;
    4459            2 : 
    4460            2 :         let mut writer = tline.writer().await;
    4461            2 :         writer
    4462            2 :             .put(
    4463            2 :                 *TEST_KEY,
    4464            2 :                 Lsn(0x30),
    4465            2 :                 &Value::Image(test_img("foo at 0x30")),
    4466            2 :                 &ctx,
    4467            2 :             )
    4468            2 :             .await?;
    4469            2 :         writer.finish_write(Lsn(0x30));
    4470            2 :         drop(writer);
    4471            2 : 
    4472            2 :         tline.freeze_and_flush().await?;
    4473            2 :         tline
    4474            2 :             .compact(&CancellationToken::new(), EnumSet::empty(), &ctx)
    4475            2 :             .await?;
    4476            2 : 
    4477            2 :         let mut writer = tline.writer().await;
    4478            2 :         writer
    4479            2 :             .put(
    4480            2 :                 *TEST_KEY,
    4481            2 :                 Lsn(0x40),
    4482            2 :                 &Value::Image(test_img("foo at 0x40")),
    4483            2 :                 &ctx,
    4484            2 :             )
    4485            2 :             .await?;
    4486            2 :         writer.finish_write(Lsn(0x40));
    4487            2 :         drop(writer);
    4488            2 : 
    4489            2 :         tline.freeze_and_flush().await?;
    4490            2 :         tline
    4491            2 :             .compact(&CancellationToken::new(), EnumSet::empty(), &ctx)
    4492            2 :             .await?;
    4493            2 : 
    4494            2 :         assert_eq!(
    4495            4 :             tline.get(*TEST_KEY, Lsn(0x10), &ctx).await?,
    4496            2 :             test_img("foo at 0x10")
    4497            2 :         );
    4498            2 :         assert_eq!(
    4499            3 :             tline.get(*TEST_KEY, Lsn(0x1f), &ctx).await?,
    4500            2 :             test_img("foo at 0x10")
    4501            2 :         );
    4502            2 :         assert_eq!(
    4503            2 :             tline.get(*TEST_KEY, Lsn(0x20), &ctx).await?,
    4504            2 :             test_img("foo at 0x20")
    4505            2 :         );
    4506            2 :         assert_eq!(
    4507            4 :             tline.get(*TEST_KEY, Lsn(0x30), &ctx).await?,
    4508            2 :             test_img("foo at 0x30")
    4509            2 :         );
    4510            2 :         assert_eq!(
    4511            4 :             tline.get(*TEST_KEY, Lsn(0x40), &ctx).await?,
    4512            2 :             test_img("foo at 0x40")
    4513            2 :         );
    4514            2 : 
    4515            2 :         Ok(())
    4516            2 :     }
    4517              : 
    4518            4 :     async fn bulk_insert_compact_gc(
    4519            4 :         timeline: Arc<Timeline>,
    4520            4 :         ctx: &RequestContext,
    4521            4 :         mut lsn: Lsn,
    4522            4 :         repeat: usize,
    4523            4 :         key_count: usize,
    4524            4 :     ) -> anyhow::Result<()> {
    4525            4 :         let mut test_key = Key::from_hex("010000000033333333444444445500000000").unwrap();
    4526            4 :         let mut blknum = 0;
    4527            4 : 
    4528            4 :         // Enforce that key range is monotonously increasing
    4529            4 :         let mut keyspace = KeySpaceAccum::new();
    4530            4 : 
    4531            4 :         for _ in 0..repeat {
    4532          200 :             for _ in 0..key_count {
    4533      2000000 :                 test_key.field6 = blknum;
    4534      2000000 :                 let mut writer = timeline.writer().await;
    4535      2000000 :                 writer
    4536      2000000 :                     .put(
    4537      2000000 :                         test_key,
    4538      2000000 :                         lsn,
    4539      2000000 :                         &Value::Image(test_img(&format!("{} at {}", blknum, lsn))),
    4540      2000000 :                         ctx,
    4541      2000000 :                     )
    4542        47594 :                     .await?;
    4543      2000000 :                 writer.finish_write(lsn);
    4544      2000000 :                 drop(writer);
    4545      2000000 : 
    4546      2000000 :                 keyspace.add_key(test_key);
    4547      2000000 : 
    4548      2000000 :                 lsn = Lsn(lsn.0 + 0x10);
    4549      2000000 :                 blknum += 1;
    4550              :             }
    4551              : 
    4552          200 :             let cutoff = timeline.get_last_record_lsn();
    4553          200 : 
    4554          200 :             timeline
    4555          200 :                 .update_gc_info(
    4556          200 :                     Vec::new(),
    4557          200 :                     cutoff,
    4558          200 :                     Duration::ZERO,
    4559          200 :                     &CancellationToken::new(),
    4560          200 :                     ctx,
    4561          200 :                 )
    4562            0 :                 .await?;
    4563          200 :             timeline.freeze_and_flush().await?;
    4564          200 :             timeline
    4565          200 :                 .compact(&CancellationToken::new(), EnumSet::empty(), ctx)
    4566        37280 :                 .await?;
    4567          200 :             timeline.gc().await?;
    4568              :         }
    4569              : 
    4570            4 :         Ok(())
    4571            4 :     }
    4572              : 
    4573              :     //
    4574              :     // Insert 1000 key-value pairs with increasing keys, flush, compact, GC.
    4575              :     // Repeat 50 times.
    4576              :     //
    4577            2 :     #[tokio::test]
    4578            2 :     async fn test_bulk_insert() -> anyhow::Result<()> {
    4579            2 :         let harness = TenantHarness::create("test_bulk_insert")?;
    4580            2 :         let (tenant, ctx) = harness.load().await;
    4581            2 :         let tline = tenant
    4582            2 :             .create_test_timeline(TIMELINE_ID, Lsn(0x08), DEFAULT_PG_VERSION, &ctx)
    4583            7 :             .await?;
    4584            2 : 
    4585            2 :         let lsn = Lsn(0x10);
    4586        50190 :         bulk_insert_compact_gc(tline.clone(), &ctx, lsn, 50, 10000).await?;
    4587            2 : 
    4588            2 :         Ok(())
    4589            2 :     }
    4590              : 
    4591              :     // Test the vectored get real implementation against a simple sequential implementation.
    4592              :     //
    4593              :     // The test generates a keyspace by repeatedly flushing the in-memory layer and compacting.
    4594              :     // Projected to 2D the key space looks like below. Lsn grows upwards on the Y axis and keys
    4595              :     // grow to the right on the X axis.
    4596              :     //                       [Delta]
    4597              :     //                 [Delta]
    4598              :     //           [Delta]
    4599              :     //    [Delta]
    4600              :     // ------------ Image ---------------
    4601              :     //
    4602              :     // After layer generation we pick the ranges to query as follows:
    4603              :     // 1. The beginning of each delta layer
    4604              :     // 2. At the seam between two adjacent delta layers
    4605              :     //
    4606              :     // There's one major downside to this test: delta layers only contains images,
    4607              :     // so the search can stop at the first delta layer and doesn't traverse any deeper.
    4608            2 :     #[tokio::test]
    4609            2 :     async fn test_get_vectored() -> anyhow::Result<()> {
    4610            2 :         let harness = TenantHarness::create("test_get_vectored")?;
    4611            2 :         let (tenant, ctx) = harness.load().await;
    4612            2 :         let tline = tenant
    4613            2 :             .create_test_timeline(TIMELINE_ID, Lsn(0x08), DEFAULT_PG_VERSION, &ctx)
    4614            7 :             .await?;
    4615            2 : 
    4616            2 :         let lsn = Lsn(0x10);
    4617        50190 :         bulk_insert_compact_gc(tline.clone(), &ctx, lsn, 50, 10000).await?;
    4618            2 : 
    4619            2 :         let guard = tline.layers.read().await;
    4620            2 :         guard.layer_map().dump(true, &ctx).await?;
    4621            2 : 
    4622            2 :         let mut reads = Vec::new();
    4623            2 :         let mut prev = None;
    4624           12 :         guard.layer_map().iter_historic_layers().for_each(|desc| {
    4625           12 :             if !desc.is_delta() {
    4626            2 :                 prev = Some(desc.clone());
    4627            2 :                 return;
    4628           10 :             }
    4629           10 : 
    4630           10 :             let start = desc.key_range.start;
    4631           10 :             let end = desc
    4632           10 :                 .key_range
    4633           10 :                 .start
    4634           10 :                 .add(Timeline::MAX_GET_VECTORED_KEYS.try_into().unwrap());
    4635           10 :             reads.push(KeySpace {
    4636           10 :                 ranges: vec![start..end],
    4637           10 :             });
    4638            2 : 
    4639           10 :             if let Some(prev) = &prev {
    4640           10 :                 if !prev.is_delta() {
    4641           10 :                     return;
    4642            2 :                 }
    4643            0 : 
    4644            0 :                 let first_range = Key {
    4645            0 :                     field6: prev.key_range.end.field6 - 4,
    4646            0 :                     ..prev.key_range.end
    4647            0 :                 }..prev.key_range.end;
    4648            0 : 
    4649            0 :                 let second_range = desc.key_range.start..Key {
    4650            0 :                     field6: desc.key_range.start.field6 + 4,
    4651            0 :                     ..desc.key_range.start
    4652            0 :                 };
    4653            0 : 
    4654            0 :                 reads.push(KeySpace {
    4655            0 :                     ranges: vec![first_range, second_range],
    4656            0 :                 });
    4657            2 :             };
    4658            2 : 
    4659            2 :             prev = Some(desc.clone());
    4660           12 :         });
    4661            2 : 
    4662            2 :         drop(guard);
    4663            2 : 
    4664            2 :         // Pick a big LSN such that we query over all the changes.
    4665            2 :         // Technically, u64::MAX - 1 is the largest LSN supported by the read path,
    4666            2 :         // but there seems to be a bug on the non-vectored search path which surfaces
    4667            2 :         // in that case.
    4668            2 :         let reads_lsn = Lsn(u64::MAX - 1000);
    4669            2 : 
    4670           12 :         for read in reads {
    4671           10 :             info!("Doing vectored read on {:?}", read);
    4672            2 : 
    4673           28 :             let vectored_res = tline.get_vectored_impl(read.clone(), reads_lsn, &ctx).await;
    4674           10 :             tline
    4675           10 :                 .validate_get_vectored_impl(&vectored_res, read, reads_lsn, &ctx)
    4676           20 :                 .await;
    4677            2 :         }
    4678            2 : 
    4679            2 :         Ok(())
    4680            2 :     }
    4681              : 
    4682            2 :     #[tokio::test]
    4683            2 :     async fn test_random_updates() -> anyhow::Result<()> {
    4684            2 :         let harness = TenantHarness::create("test_random_updates")?;
    4685            2 :         let (tenant, ctx) = harness.load().await;
    4686            2 :         let tline = tenant
    4687            2 :             .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
    4688            7 :             .await?;
    4689            2 : 
    4690            2 :         const NUM_KEYS: usize = 1000;
    4691            2 : 
    4692            2 :         let mut test_key = Key::from_hex("010000000033333333444444445500000000").unwrap();
    4693            2 : 
    4694            2 :         let mut keyspace = KeySpaceAccum::new();
    4695            2 : 
    4696            2 :         // Track when each page was last modified. Used to assert that
    4697            2 :         // a read sees the latest page version.
    4698            2 :         let mut updated = [Lsn(0); NUM_KEYS];
    4699            2 : 
    4700            2 :         let mut lsn = Lsn(0x10);
    4701            2 :         #[allow(clippy::needless_range_loop)]
    4702         2002 :         for blknum in 0..NUM_KEYS {
    4703         2000 :             lsn = Lsn(lsn.0 + 0x10);
    4704         2000 :             test_key.field6 = blknum as u32;
    4705         2000 :             let mut writer = tline.writer().await;
    4706         2000 :             writer
    4707         2000 :                 .put(
    4708         2000 :                     test_key,
    4709         2000 :                     lsn,
    4710         2000 :                     &Value::Image(test_img(&format!("{} at {}", blknum, lsn))),
    4711         2000 :                     &ctx,
    4712         2000 :                 )
    4713           46 :                 .await?;
    4714         2000 :             writer.finish_write(lsn);
    4715         2000 :             updated[blknum] = lsn;
    4716         2000 :             drop(writer);
    4717         2000 : 
    4718         2000 :             keyspace.add_key(test_key);
    4719            2 :         }
    4720            2 : 
    4721          102 :         for _ in 0..50 {
    4722       100100 :             for _ in 0..NUM_KEYS {
    4723       100000 :                 lsn = Lsn(lsn.0 + 0x10);
    4724       100000 :                 let blknum = thread_rng().gen_range(0..NUM_KEYS);
    4725       100000 :                 test_key.field6 = blknum as u32;
    4726       100000 :                 let mut writer = tline.writer().await;
    4727       100000 :                 writer
    4728       100000 :                     .put(
    4729       100000 :                         test_key,
    4730       100000 :                         lsn,
    4731       100000 :                         &Value::Image(test_img(&format!("{} at {}", blknum, lsn))),
    4732       100000 :                         &ctx,
    4733       100000 :                     )
    4734         2493 :                     .await?;
    4735       100000 :                 writer.finish_write(lsn);
    4736       100000 :                 drop(writer);
    4737       100000 :                 updated[blknum] = lsn;
    4738            2 :             }
    4739            2 : 
    4740            2 :             // Read all the blocks
    4741       100000 :             for (blknum, last_lsn) in updated.iter().enumerate() {
    4742       100000 :                 test_key.field6 = blknum as u32;
    4743       100000 :                 assert_eq!(
    4744       100000 :                     tline.get(test_key, lsn, &ctx).await?,
    4745       100000 :                     test_img(&format!("{} at {}", blknum, last_lsn))
    4746            2 :                 );
    4747            2 :             }
    4748            2 : 
    4749            2 :             // Perform a cycle of flush, compact, and GC
    4750          100 :             let cutoff = tline.get_last_record_lsn();
    4751          100 :             tline
    4752          100 :                 .update_gc_info(
    4753          100 :                     Vec::new(),
    4754          100 :                     cutoff,
    4755          100 :                     Duration::ZERO,
    4756          100 :                     &CancellationToken::new(),
    4757          100 :                     &ctx,
    4758          100 :                 )
    4759            2 :                 .await?;
    4760          101 :             tline.freeze_and_flush().await?;
    4761          100 :             tline
    4762          100 :                 .compact(&CancellationToken::new(), EnumSet::empty(), &ctx)
    4763         2271 :                 .await?;
    4764          100 :             tline.gc().await?;
    4765            2 :         }
    4766            2 : 
    4767            2 :         Ok(())
    4768            2 :     }
    4769              : 
    4770            2 :     #[tokio::test]
    4771            2 :     async fn test_traverse_branches() -> anyhow::Result<()> {
    4772            2 :         let (tenant, ctx) = TenantHarness::create("test_traverse_branches")?
    4773            2 :             .load()
    4774            2 :             .await;
    4775            2 :         let mut tline = tenant
    4776            2 :             .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
    4777            6 :             .await?;
    4778            2 : 
    4779            2 :         const NUM_KEYS: usize = 1000;
    4780            2 : 
    4781            2 :         let mut test_key = Key::from_hex("010000000033333333444444445500000000").unwrap();
    4782            2 : 
    4783            2 :         let mut keyspace = KeySpaceAccum::new();
    4784            2 : 
    4785            2 :         // Track when each page was last modified. Used to assert that
    4786            2 :         // a read sees the latest page version.
    4787            2 :         let mut updated = [Lsn(0); NUM_KEYS];
    4788            2 : 
    4789            2 :         let mut lsn = Lsn(0x10);
    4790            2 :         #[allow(clippy::needless_range_loop)]
    4791         2002 :         for blknum in 0..NUM_KEYS {
    4792         2000 :             lsn = Lsn(lsn.0 + 0x10);
    4793         2000 :             test_key.field6 = blknum as u32;
    4794         2000 :             let mut writer = tline.writer().await;
    4795         2000 :             writer
    4796         2000 :                 .put(
    4797         2000 :                     test_key,
    4798         2000 :                     lsn,
    4799         2000 :                     &Value::Image(test_img(&format!("{} at {}", blknum, lsn))),
    4800         2000 :                     &ctx,
    4801         2000 :                 )
    4802           46 :                 .await?;
    4803         2000 :             writer.finish_write(lsn);
    4804         2000 :             updated[blknum] = lsn;
    4805         2000 :             drop(writer);
    4806         2000 : 
    4807         2000 :             keyspace.add_key(test_key);
    4808            2 :         }
    4809            2 : 
    4810          102 :         for _ in 0..50 {
    4811          100 :             let new_tline_id = TimelineId::generate();
    4812          100 :             tenant
    4813          100 :                 .branch_timeline_test(&tline, new_tline_id, Some(lsn), &ctx)
    4814          103 :                 .await?;
    4815          100 :             tline = tenant
    4816          100 :                 .get_timeline(new_tline_id, true)
    4817          100 :                 .expect("Should have the branched timeline");
    4818            2 : 
    4819       100100 :             for _ in 0..NUM_KEYS {
    4820       100000 :                 lsn = Lsn(lsn.0 + 0x10);
    4821       100000 :                 let blknum = thread_rng().gen_range(0..NUM_KEYS);
    4822       100000 :                 test_key.field6 = blknum as u32;
    4823       100000 :                 let mut writer = tline.writer().await;
    4824       100000 :                 writer
    4825       100000 :                     .put(
    4826       100000 :                         test_key,
    4827       100000 :                         lsn,
    4828       100000 :                         &Value::Image(test_img(&format!("{} at {}", blknum, lsn))),
    4829       100000 :                         &ctx,
    4830       100000 :                     )
    4831         2433 :                     .await?;
    4832       100000 :                 println!("updating {} at {}", blknum, lsn);
    4833       100000 :                 writer.finish_write(lsn);
    4834       100000 :                 drop(writer);
    4835       100000 :                 updated[blknum] = lsn;
    4836            2 :             }
    4837            2 : 
    4838            2 :             // Read all the blocks
    4839       100000 :             for (blknum, last_lsn) in updated.iter().enumerate() {
    4840       100000 :                 test_key.field6 = blknum as u32;
    4841       100000 :                 assert_eq!(
    4842       100000 :                     tline.get(test_key, lsn, &ctx).await?,
    4843       100000 :                     test_img(&format!("{} at {}", blknum, last_lsn))
    4844            2 :                 );
    4845            2 :             }
    4846            2 : 
    4847            2 :             // Perform a cycle of flush, compact, and GC
    4848          100 :             let cutoff = tline.get_last_record_lsn();
    4849          100 :             tline
    4850          100 :                 .update_gc_info(
    4851          100 :                     Vec::new(),
    4852          100 :                     cutoff,
    4853          100 :                     Duration::ZERO,
    4854          100 :                     &CancellationToken::new(),
    4855          100 :                     &ctx,
    4856          100 :                 )
    4857            2 :                 .await?;
    4858          101 :             tline.freeze_and_flush().await?;
    4859          100 :             tline
    4860          100 :                 .compact(&CancellationToken::new(), EnumSet::empty(), &ctx)
    4861        13252 :                 .await?;
    4862          100 :             tline.gc().await?;
    4863            2 :         }
    4864            2 : 
    4865            2 :         Ok(())
    4866            2 :     }
    4867              : 
    4868            2 :     #[tokio::test]
    4869            2 :     async fn test_traverse_ancestors() -> anyhow::Result<()> {
    4870            2 :         let (tenant, ctx) = TenantHarness::create("test_traverse_ancestors")?
    4871            2 :             .load()
    4872            2 :             .await;
    4873            2 :         let mut tline = tenant
    4874            2 :             .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
    4875            7 :             .await?;
    4876            2 : 
    4877            2 :         const NUM_KEYS: usize = 100;
    4878            2 :         const NUM_TLINES: usize = 50;
    4879            2 : 
    4880            2 :         let mut test_key = Key::from_hex("010000000033333333444444445500000000").unwrap();
    4881            2 :         // Track page mutation lsns across different timelines.
    4882            2 :         let mut updated = [[Lsn(0); NUM_KEYS]; NUM_TLINES];
    4883            2 : 
    4884            2 :         let mut lsn = Lsn(0x10);
    4885            2 : 
    4886            2 :         #[allow(clippy::needless_range_loop)]
    4887          102 :         for idx in 0..NUM_TLINES {
    4888          100 :             let new_tline_id = TimelineId::generate();
    4889          100 :             tenant
    4890          100 :                 .branch_timeline_test(&tline, new_tline_id, Some(lsn), &ctx)
    4891          102 :                 .await?;
    4892          100 :             tline = tenant
    4893          100 :                 .get_timeline(new_tline_id, true)
    4894          100 :                 .expect("Should have the branched timeline");
    4895            2 : 
    4896        10100 :             for _ in 0..NUM_KEYS {
    4897        10000 :                 lsn = Lsn(lsn.0 + 0x10);
    4898        10000 :                 let blknum = thread_rng().gen_range(0..NUM_KEYS);
    4899        10000 :                 test_key.field6 = blknum as u32;
    4900        10000 :                 let mut writer = tline.writer().await;
    4901        10000 :                 writer
    4902        10000 :                     .put(
    4903        10000 :                         test_key,
    4904        10000 :                         lsn,
    4905        10000 :                         &Value::Image(test_img(&format!("{} {} at {}", idx, blknum, lsn))),
    4906        10000 :                         &ctx,
    4907        10000 :                     )
    4908          319 :                     .await?;
    4909        10000 :                 println!("updating [{}][{}] at {}", idx, blknum, lsn);
    4910        10000 :                 writer.finish_write(lsn);
    4911        10000 :                 drop(writer);
    4912        10000 :                 updated[idx][blknum] = lsn;
    4913            2 :             }
    4914            2 :         }
    4915            2 : 
    4916            2 :         // Read pages from leaf timeline across all ancestors.
    4917          100 :         for (idx, lsns) in updated.iter().enumerate() {
    4918        10000 :             for (blknum, lsn) in lsns.iter().enumerate() {
    4919            2 :                 // Skip empty mutations.
    4920        10000 :                 if lsn.0 == 0 {
    4921         3630 :                     continue;
    4922         6370 :                 }
    4923         6370 :                 println!("checking [{idx}][{blknum}] at {lsn}");
    4924         6370 :                 test_key.field6 = blknum as u32;
    4925         6370 :                 assert_eq!(
    4926         6370 :                     tline.get(test_key, *lsn, &ctx).await?,
    4927         6370 :                     test_img(&format!("{idx} {blknum} at {lsn}"))
    4928            2 :                 );
    4929            2 :             }
    4930            2 :         }
    4931            2 :         Ok(())
    4932            2 :     }
    4933              : 
    4934            2 :     #[tokio::test]
    4935            2 :     async fn test_write_at_initdb_lsn_takes_optimization_code_path() -> anyhow::Result<()> {
    4936            2 :         let (tenant, ctx) = TenantHarness::create("test_empty_test_timeline_is_usable")?
    4937            2 :             .load()
    4938            2 :             .await;
    4939            2 : 
    4940            2 :         let initdb_lsn = Lsn(0x20);
    4941            2 :         let utline = tenant
    4942            2 :             .create_empty_timeline(TIMELINE_ID, initdb_lsn, DEFAULT_PG_VERSION, &ctx)
    4943            3 :             .await?;
    4944            2 :         let tline = utline.raw_timeline().unwrap();
    4945            2 : 
    4946            2 :         // Spawn flush loop now so that we can set the `expect_initdb_optimization`
    4947            2 :         tline.maybe_spawn_flush_loop();
    4948            2 : 
    4949            2 :         // Make sure the timeline has the minimum set of required keys for operation.
    4950            2 :         // The only operation you can always do on an empty timeline is to `put` new data.
    4951            2 :         // Except if you `put` at `initdb_lsn`.
    4952            2 :         // In that case, there's an optimization to directly create image layers instead of delta layers.
    4953            2 :         // It uses `repartition()`, which assumes some keys to be present.
    4954            2 :         // Let's make sure the test timeline can handle that case.
    4955            2 :         {
    4956            2 :             let mut state = tline.flush_loop_state.lock().unwrap();
    4957            2 :             assert_eq!(
    4958            2 :                 timeline::FlushLoopState::Running {
    4959            2 :                     expect_initdb_optimization: false,
    4960            2 :                     initdb_optimization_count: 0,
    4961            2 :                 },
    4962            2 :                 *state
    4963            2 :             );
    4964            2 :             *state = timeline::FlushLoopState::Running {
    4965            2 :                 expect_initdb_optimization: true,
    4966            2 :                 initdb_optimization_count: 0,
    4967            2 :             };
    4968            2 :         }
    4969            2 : 
    4970            2 :         // Make writes at the initdb_lsn. When we flush it below, it should be handled by the optimization.
    4971            2 :         // As explained above, the optimization requires some keys to be present.
    4972            2 :         // As per `create_empty_timeline` documentation, use init_empty to set them.
    4973            2 :         // This is what `create_test_timeline` does, by the way.
    4974            2 :         let mut modification = tline.begin_modification(initdb_lsn);
    4975            2 :         modification
    4976            2 :             .init_empty_test_timeline()
    4977            2 :             .context("init_empty_test_timeline")?;
    4978            2 :         modification
    4979            2 :             .commit(&ctx)
    4980            2 :             .await
    4981            2 :             .context("commit init_empty_test_timeline modification")?;
    4982            2 : 
    4983            2 :         // Do the flush. The flush code will check the expectations that we set above.
    4984            2 :         tline.freeze_and_flush().await?;
    4985            2 : 
    4986            2 :         // assert freeze_and_flush exercised the initdb optimization
    4987            2 :         {
    4988            2 :             let state = tline.flush_loop_state.lock().unwrap();
    4989            2 :             let timeline::FlushLoopState::Running {
    4990            2 :                 expect_initdb_optimization,
    4991            2 :                 initdb_optimization_count,
    4992            2 :             } = *state
    4993            2 :             else {
    4994            2 :                 panic!("unexpected state: {:?}", *state);
    4995            2 :             };
    4996            2 :             assert!(expect_initdb_optimization);
    4997            2 :             assert!(initdb_optimization_count > 0);
    4998            2 :         }
    4999            2 :         Ok(())
    5000            2 :     }
    5001              : 
    5002            2 :     #[tokio::test]
    5003            2 :     async fn test_uninit_mark_crash() -> anyhow::Result<()> {
    5004            2 :         let name = "test_uninit_mark_crash";
    5005            2 :         let harness = TenantHarness::create(name)?;
    5006            2 :         {
    5007            2 :             let (tenant, ctx) = harness.load().await;
    5008            2 :             let tline = tenant
    5009            2 :                 .create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)
    5010            3 :                 .await?;
    5011            2 :             // Keeps uninit mark in place
    5012            2 :             let raw_tline = tline.raw_timeline().unwrap();
    5013            2 :             raw_tline
    5014            2 :                 .shutdown()
    5015            2 :                 .instrument(info_span!("test_shutdown", tenant_id=%raw_tline.tenant_shard_id, shard_id=%raw_tline.tenant_shard_id.shard_slug(), timeline_id=%TIMELINE_ID))
    5016            2 :                 .await;
    5017            2 :             std::mem::forget(tline);
    5018            2 :         }
    5019            2 : 
    5020            2 :         let (tenant, _) = harness.load().await;
    5021            2 :         match tenant.get_timeline(TIMELINE_ID, false) {
    5022            2 :             Ok(_) => panic!("timeline should've been removed during load"),
    5023            2 :             Err(e) => {
    5024            2 :                 assert_eq!(
    5025            2 :                     e,
    5026            2 :                     GetTimelineError::NotFound {
    5027            2 :                         tenant_id: tenant.tenant_shard_id,
    5028            2 :                         timeline_id: TIMELINE_ID,
    5029            2 :                     }
    5030            2 :                 )
    5031            2 :             }
    5032            2 :         }
    5033            2 : 
    5034            2 :         assert!(!harness
    5035            2 :             .conf
    5036            2 :             .timeline_path(&tenant.tenant_shard_id, &TIMELINE_ID)
    5037            2 :             .exists());
    5038            2 : 
    5039            2 :         assert!(!harness
    5040            2 :             .conf
    5041            2 :             .timeline_uninit_mark_file_path(tenant.tenant_shard_id, TIMELINE_ID)
    5042            2 :             .exists());
    5043            2 : 
    5044            2 :         Ok(())
    5045            2 :     }
    5046              : }
        

Generated by: LCOV version 2.1-beta