LCOV - differential code coverage report
Current view:  top level - pageserver/src - tenant.rs (source / functions)
Current:       cd44433dd675caa99df17a61b18949c8387e2242.info
Current Date:  2024-01-09 02:06:09
Baseline:      66c52a629a0f4a503e193045e0df4c77139e344b.info
Baseline Date: 2024-01-08 15:34:46

               Coverage    Total     Hit    LBC    UBC    GBC    CBC
Lines:           85.7 %     3333    2857     16    460     11   2846
Functions:       69.8 %      387     270      3    114      3    267

           TLA  Line data    Source code
       1                 : //!
       2                 : //! Timeline repository implementation that keeps old data in files on disk, and
       3                 : //! the recent changes in memory. See tenant/*_layer.rs files.
       4                 : //! The functions here are responsible for locating the correct layer for the
       5                 : //! get/put call, walking back the timeline branching history as needed.
       6                 : //!
       7                 : //! The files are stored in the .neon/tenants/<tenant_id>/timelines/<timeline_id>
       8                 : //! directory. See docs/pageserver-storage.md for how the files are managed.
       9                 : //! In addition to the layer files, there is a metadata file in the same
      10                 : //! directory that contains information about the timeline, in particular its
      11                 : //! parent timeline, and the last LSN that has been written to disk.
      12                 : //!
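                          : //!
                          : //! For illustration only (the IDs and file names below are placeholders,
                          : //! not taken from a real deployment), a timeline directory looks roughly
                          : //! like this:
                          : //!
                          : //!   .neon/tenants/<tenant_id>/timelines/<timeline_id>/
                          : //!       <layer files>      -- image/delta layers, see tenant/*_layer.rs
                          : //!       <metadata file>    -- parent timeline, last LSN written to disk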
      13                 : 
      14                 : use anyhow::{bail, Context};
      15                 : use camino::{Utf8Path, Utf8PathBuf};
      16                 : use enumset::EnumSet;
      17                 : use futures::stream::FuturesUnordered;
      18                 : use futures::FutureExt;
      19                 : use futures::StreamExt;
      20                 : use pageserver_api::models::TimelineState;
      21                 : use pageserver_api::shard::ShardIdentity;
      22                 : use pageserver_api::shard::TenantShardId;
      23                 : use remote_storage::DownloadError;
      24                 : use remote_storage::GenericRemoteStorage;
      25                 : use std::fmt;
      26                 : use storage_broker::BrokerClientChannel;
      27                 : use tokio::io::BufReader;
      28                 : use tokio::runtime::Handle;
      29                 : use tokio::sync::watch;
      30                 : use tokio::task::JoinSet;
      31                 : use tokio_util::sync::CancellationToken;
      32                 : use tracing::*;
      33                 : use utils::backoff;
      34                 : use utils::completion;
      35                 : use utils::crashsafe::path_with_suffix_extension;
      36                 : use utils::failpoint_support;
      37                 : use utils::fs_ext;
      38                 : use utils::sync::gate::Gate;
      39                 : use utils::sync::gate::GateGuard;
      40                 : use utils::timeout::timeout_cancellable;
      41                 : use utils::timeout::TimeoutCancellableError;
      42                 : 
      43                 : use self::config::AttachedLocationConfig;
      44                 : use self::config::AttachmentMode;
      45                 : use self::config::LocationConf;
      46                 : use self::config::TenantConf;
      47                 : use self::delete::DeleteTenantFlow;
      48                 : use self::metadata::LoadMetadataError;
      49                 : use self::metadata::TimelineMetadata;
      50                 : use self::mgr::GetActiveTenantError;
      51                 : use self::mgr::GetTenantError;
      52                 : use self::mgr::TenantsMap;
      53                 : use self::remote_timeline_client::RemoteTimelineClient;
      54                 : use self::timeline::uninit::TimelineExclusionError;
      55                 : use self::timeline::uninit::TimelineUninitMark;
      56                 : use self::timeline::uninit::UninitializedTimeline;
      57                 : use self::timeline::EvictionTaskTenantState;
      58                 : use self::timeline::TimelineResources;
      59                 : use self::timeline::WaitLsnError;
      60                 : use crate::config::PageServerConf;
      61                 : use crate::context::{DownloadBehavior, RequestContext};
      62                 : use crate::deletion_queue::DeletionQueueClient;
      63                 : use crate::deletion_queue::DeletionQueueError;
      64                 : use crate::import_datadir;
      65                 : use crate::is_uninit_mark;
      66                 : use crate::metrics::TENANT;
      67                 : use crate::metrics::{remove_tenant_metrics, TENANT_STATE_METRIC, TENANT_SYNTHETIC_SIZE_METRIC};
      68                 : use crate::repository::GcResult;
      69                 : use crate::task_mgr;
      70                 : use crate::task_mgr::TaskKind;
      71                 : use crate::tenant::config::LocationMode;
      72                 : use crate::tenant::config::TenantConfOpt;
      73                 : use crate::tenant::metadata::load_metadata;
      74                 : pub use crate::tenant::remote_timeline_client::index::IndexPart;
      75                 : use crate::tenant::remote_timeline_client::MaybeDeletedIndexPart;
      76                 : use crate::tenant::remote_timeline_client::INITDB_PATH;
      77                 : use crate::tenant::storage_layer::DeltaLayer;
      78                 : use crate::tenant::storage_layer::ImageLayer;
      79                 : use crate::InitializationOrder;
      80                 : use std::cmp::min;
      81                 : use std::collections::hash_map::Entry;
      82                 : use std::collections::BTreeSet;
      83                 : use std::collections::HashMap;
      84                 : use std::collections::HashSet;
      85                 : use std::fmt::Debug;
      86                 : use std::fmt::Display;
      87                 : use std::fs;
      88                 : use std::fs::File;
      89                 : use std::io;
      90                 : use std::ops::Bound::Included;
      91                 : use std::process::Stdio;
      92                 : use std::sync::atomic::AtomicU64;
      93                 : use std::sync::atomic::Ordering;
      94                 : use std::sync::Arc;
      95                 : use std::sync::{Mutex, RwLock};
      96                 : use std::time::{Duration, Instant};
      97                 : 
      98                 : use crate::tenant::timeline::delete::DeleteTimelineFlow;
      99                 : use crate::tenant::timeline::uninit::cleanup_timeline_directory;
     100                 : use crate::virtual_file::VirtualFile;
     101                 : use crate::walredo::PostgresRedoManager;
     102                 : use crate::TEMP_FILE_SUFFIX;
     103                 : use once_cell::sync::Lazy;
     104                 : pub use pageserver_api::models::TenantState;
     105                 : use tokio::sync::Semaphore;
     106                 : 
     107 CBC         337 : static INIT_DB_SEMAPHORE: Lazy<Semaphore> = Lazy::new(|| Semaphore::new(8));
     108                 : use toml_edit;
     109                 : use utils::{
     110                 :     crashsafe,
     111                 :     generation::Generation,
     112                 :     id::{TenantId, TimelineId},
     113                 :     lsn::{Lsn, RecordLsn},
     114                 : };
     115                 : 
     116                 : /// Declare a failpoint that can use the `pause` failpoint action.
     117                 : /// We don't want to block the executor thread, hence, spawn_blocking + await.
     118                 : macro_rules! pausable_failpoint {
     119                 :     ($name:literal) => {
     120                 :         if cfg!(feature = "testing") {
     121                 :             tokio::task::spawn_blocking({
     122                 :                 let current = tracing::Span::current();
     123                 :                 move || {
     124                 :                     let _entered = current.entered();
     125                 :                     tracing::info!("at failpoint {}", $name);
     126                 :                     fail::fail_point!($name);
     127                 :                 }
     128                 :             })
     129                 :             .await
     130                 :             .expect("spawn_blocking");
     131                 :         }
     132                 :     };
     133                 : }
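                          : 
                          : // Illustrative usage sketch (the failpoint name below is hypothetical, not
                          : // one used elsewhere in this file): the expansion contains an `.await`, so
                          : // the macro must be invoked from an async context, with a string literal
                          : // naming the failpoint:
                          : //
                          : //     pausable_failpoint!("my-example-failpoint");
                          : //
                          : // Under the "testing" feature this evaluates the named fail::fail_point on
                          : // a blocking thread, so a `pause` action does not stall the async executor.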
     134                 : 
     135                 : pub mod blob_io;
     136                 : pub mod block_io;
     137                 : 
     138                 : pub mod disk_btree;
     139                 : pub(crate) mod ephemeral_file;
     140                 : pub mod layer_map;
     141                 : mod span;
     142                 : 
     143                 : pub mod metadata;
     144                 : mod par_fsync;
     145                 : pub mod remote_timeline_client;
     146                 : pub mod storage_layer;
     147                 : 
     148                 : pub mod config;
     149                 : pub mod delete;
     150                 : pub mod mgr;
     151                 : pub mod secondary;
     152                 : pub mod tasks;
     153                 : pub mod upload_queue;
     154                 : 
     155                 : pub(crate) mod timeline;
     156                 : 
     157                 : pub mod size;
     158                 : 
     159                 : pub(crate) use timeline::span::debug_assert_current_span_has_tenant_and_timeline_id;
     160                 : pub(crate) use timeline::{LogicalSizeCalculationCause, PageReconstructError, Timeline};
     161                 : 
     162                 : // re-export for use in remote_timeline_client.rs
     163                 : pub use crate::tenant::metadata::save_metadata;
     164                 : 
     165                 : // re-export for use in walreceiver
     166                 : pub use crate::tenant::timeline::WalReceiverInfo;
     167                 : 
     168                 : /// The "tenants" part of `tenants/<tenant>/timelines...`
     169                 : pub const TENANTS_SEGMENT_NAME: &str = "tenants";
     170                 : 
      171                 : /// The "timelines" part of the `.neon/tenants/<tenant_id>/timelines/<timeline_id>` directory prefix.
     172                 : pub const TIMELINES_SEGMENT_NAME: &str = "timelines";
     173                 : 
     174                 : pub const TENANT_DELETED_MARKER_FILE_NAME: &str = "deleted";
     175                 : 
     176                 : /// References to shared objects that are passed into each tenant, such
     177                 : /// as the shared remote storage client and process initialization state.
     178             273 : #[derive(Clone)]
     179                 : pub struct TenantSharedResources {
     180                 :     pub broker_client: storage_broker::BrokerClientChannel,
     181                 :     pub remote_storage: Option<GenericRemoteStorage>,
     182                 :     pub deletion_queue_client: DeletionQueueClient,
     183                 : }
     184                 : 
     185                 : /// A [`Tenant`] is really an _attached_ tenant.  The configuration
     186                 : /// for an attached tenant is a subset of the [`LocationConf`], represented
     187                 : /// in this struct.
     188                 : pub(super) struct AttachedTenantConf {
     189                 :     tenant_conf: TenantConfOpt,
     190                 :     location: AttachedLocationConfig,
     191                 : }
     192                 : 
     193                 : impl AttachedTenantConf {
     194             798 :     fn try_from(location_conf: LocationConf) -> anyhow::Result<Self> {
     195             798 :         match &location_conf.mode {
     196             798 :             LocationMode::Attached(attach_conf) => Ok(Self {
     197             798 :                 tenant_conf: location_conf.tenant_conf,
     198             798 :                 location: attach_conf.clone(),
     199             798 :             }),
     200                 :             LocationMode::Secondary(_) => {
     201 UBC           0 :                 anyhow::bail!("Attempted to construct AttachedTenantConf from a LocationConf in secondary mode")
     202                 :             }
     203                 :         }
     204 CBC         798 :     }
     205                 : }
     206                 : struct TimelinePreload {
     207                 :     timeline_id: TimelineId,
     208                 :     client: RemoteTimelineClient,
     209                 :     index_part: Result<MaybeDeletedIndexPart, DownloadError>,
     210                 : }
     211                 : 
     212                 : pub(crate) struct TenantPreload {
     213                 :     deleting: bool,
     214                 :     timelines: HashMap<TimelineId, TimelinePreload>,
     215                 : }
     216                 : 
     217                 : /// When we spawn a tenant, there is a special mode for tenant creation that
     218                 : /// avoids trying to read anything from remote storage.
     219                 : pub(crate) enum SpawnMode {
     220                 :     Normal,
     221                 :     Create,
     222                 : }
     223                 : 
     224                 : ///
     225                 : /// Tenant consists of multiple timelines. Keep them in a hash table.
     226                 : ///
     227                 : pub struct Tenant {
     228                 :     // Global pageserver config parameters
     229                 :     pub conf: &'static PageServerConf,
     230                 : 
      231                 :     /// The timestamp at which this value was created, used to measure activation delay, see:
     232                 :     /// <https://github.com/neondatabase/neon/issues/4025>
     233                 :     constructed_at: Instant,
     234                 : 
     235                 :     state: watch::Sender<TenantState>,
     236                 : 
     237                 :     // Overridden tenant-specific config parameters.
      238                 :     // We keep the TenantConfOpt struct here to preserve the information
     239                 :     // about parameters that are not set.
     240                 :     // This is necessary to allow global config updates.
     241                 :     tenant_conf: Arc<RwLock<AttachedTenantConf>>,
     242                 : 
     243                 :     tenant_shard_id: TenantShardId,
     244                 : 
     245                 :     // The detailed sharding information, beyond the number/count in tenant_shard_id
     246                 :     shard_identity: ShardIdentity,
     247                 : 
     248                 :     /// The remote storage generation, used to protect S3 objects from split-brain.
     249                 :     /// Does not change over the lifetime of the [`Tenant`] object.
     250                 :     ///
     251                 :     /// This duplicates the generation stored in LocationConf, but that structure is mutable:
      252                 :     /// this copy enforces the invariant that the generation doesn't change during a Tenant's lifetime.
     253                 :     generation: Generation,
     254                 : 
     255                 :     timelines: Mutex<HashMap<TimelineId, Arc<Timeline>>>,
     256                 : 
      257                 :     /// During timeline creation, we first insert the TimelineId into the
      258                 :     /// creating map, then into `timelines`, then remove it from the creating map.
      259                 :     /// **Lock order**: if acquiring both, acquire `timelines` before `timelines_creating`.
     260                 :     timelines_creating: std::sync::Mutex<HashSet<TimelineId>>,
     261                 : 
     262                 :     // This mutex prevents creation of new timelines during GC.
      263                 :     // Adding yet another mutex (in addition to `timelines`) is needed because holding
      264                 :     // the `timelines` mutex for the whole GC iteration
      265                 :     // could block `get_timeline`, `get_timelines_state`, and other timeline operations
      266                 :     // for a long time, which in turn may cause replication connections to be dropped,
      267                 :     // wait_for_lsn timeouts to expire, and so on.
     268                 :     gc_cs: tokio::sync::Mutex<()>,
     269                 :     walredo_mgr: Arc<WalRedoManager>,
     270                 : 
     271                 :     // provides access to timeline data sitting in the remote storage
     272                 :     pub(crate) remote_storage: Option<GenericRemoteStorage>,
     273                 : 
     274                 :     // Access to global deletion queue for when this tenant wants to schedule a deletion
     275                 :     deletion_queue_client: DeletionQueueClient,
     276                 : 
      277                 :     /// Cached logical sizes, updated on each [`Tenant::gather_size_inputs`].
     278                 :     cached_logical_sizes: tokio::sync::Mutex<HashMap<(TimelineId, Lsn), u64>>,
     279                 :     cached_synthetic_tenant_size: Arc<AtomicU64>,
     280                 : 
     281                 :     eviction_task_tenant_state: tokio::sync::Mutex<EvictionTaskTenantState>,
     282                 : 
     283                 :     /// If the tenant is in Activating state, notify this to encourage it
     284                 :     /// to proceed to Active as soon as possible, rather than waiting for lazy
     285                 :     /// background warmup.
     286                 :     pub(crate) activate_now_sem: tokio::sync::Semaphore,
     287                 : 
     288                 :     pub(crate) delete_progress: Arc<tokio::sync::Mutex<DeleteTenantFlow>>,
     289                 : 
     290                 :     // Cancellation token fires when we have entered shutdown().  This is a parent of
     291                 :     // Timelines' cancellation token.
     292                 :     pub(crate) cancel: CancellationToken,
     293                 : 
     294                 :     // Users of the Tenant such as the page service must take this Gate to avoid
     295                 :     // trying to use a Tenant which is shutting down.
     296                 :     pub(crate) gate: Gate,
     297                 : }
     298                 : 
     299                 : impl std::fmt::Debug for Tenant {
     300               1 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     301               1 :         write!(f, "{} ({})", self.tenant_shard_id, self.current_state())
     302               1 :     }
     303                 : }
     304                 : 
     305                 : pub(crate) enum WalRedoManager {
     306                 :     Prod(PostgresRedoManager),
     307                 :     #[cfg(test)]
     308                 :     Test(harness::TestRedoManager),
     309                 : }
     310                 : 
     311                 : impl From<PostgresRedoManager> for WalRedoManager {
     312             736 :     fn from(mgr: PostgresRedoManager) -> Self {
     313             736 :         Self::Prod(mgr)
     314             736 :     }
     315                 : }
     316                 : 
     317                 : #[cfg(test)]
     318                 : impl From<harness::TestRedoManager> for WalRedoManager {
     319              42 :     fn from(mgr: harness::TestRedoManager) -> Self {
     320              42 :         Self::Test(mgr)
     321              42 :     }
     322                 : }
     323                 : 
     324                 : impl WalRedoManager {
     325 UBC           0 :     pub(crate) fn maybe_quiesce(&self, idle_timeout: Duration) {
     326               0 :         match self {
     327 CBC         727 :             Self::Prod(mgr) => mgr.maybe_quiesce(idle_timeout),
     328             727 :             #[cfg(test)]
     329             727 :             Self::Test(_) => {
     330 UBC           0 :                 // Not applicable to test redo manager
     331               0 :             }
     332 CBC         727 :         }
     333             727 :     }
     334                 : 
     335                 :     /// # Cancel-Safety
     336                 :     ///
     337                 :     /// This method is cancellation-safe.
     338         2060218 :     pub async fn request_redo(
     339         2060218 :         &self,
     340         2060218 :         key: crate::repository::Key,
     341         2060218 :         lsn: Lsn,
     342         2060218 :         base_img: Option<(Lsn, bytes::Bytes)>,
     343         2060218 :         records: Vec<(Lsn, crate::walrecord::NeonWalRecord)>,
     344         2060218 :         pg_version: u32,
     345         2060218 :     ) -> anyhow::Result<bytes::Bytes> {
     346 UBC           0 :         match self {
     347 CBC     2060218 :             Self::Prod(mgr) => {
     348 UBC           0 :                 mgr.request_redo(key, lsn, base_img, records, pg_version)
     349               0 :                     .await
     350                 :             }
     351                 :             #[cfg(test)]
     352               0 :             Self::Test(mgr) => {
     353               0 :                 mgr.request_redo(key, lsn, base_img, records, pg_version)
     354               0 :                     .await
     355                 :             }
     356                 :         }
     357 CBC     2060218 :     }
     358                 : }
     359                 : 
     360             100 : #[derive(Debug, thiserror::Error, PartialEq, Eq)]
     361                 : pub enum GetTimelineError {
     362                 :     #[error("Timeline {tenant_id}/{timeline_id} is not active, state: {state:?}")]
     363                 :     NotActive {
     364                 :         tenant_id: TenantId,
     365                 :         timeline_id: TimelineId,
     366                 :         state: TimelineState,
     367                 :     },
     368                 :     #[error("Timeline {tenant_id}/{timeline_id} was not found")]
     369                 :     NotFound {
     370                 :         tenant_id: TenantId,
     371                 :         timeline_id: TimelineId,
     372                 :     },
     373                 : }
     374                 : 
     375 UBC           0 : #[derive(Debug, thiserror::Error)]
     376                 : pub enum LoadLocalTimelineError {
     377                 :     #[error("FailedToLoad")]
     378                 :     Load(#[source] anyhow::Error),
     379                 :     #[error("FailedToResumeDeletion")]
     380                 :     ResumeDeletion(#[source] anyhow::Error),
     381                 : }
     382                 : 
     383 CBC         139 : #[derive(thiserror::Error)]
     384                 : pub enum DeleteTimelineError {
     385                 :     #[error("NotFound")]
     386                 :     NotFound,
     387                 : 
     388                 :     #[error("HasChildren")]
     389                 :     HasChildren(Vec<TimelineId>),
     390                 : 
     391                 :     #[error("Timeline deletion is already in progress")]
     392                 :     AlreadyInProgress(Arc<tokio::sync::Mutex<DeleteTimelineFlow>>),
     393                 : 
     394                 :     #[error(transparent)]
     395                 :     Other(#[from] anyhow::Error),
     396                 : }
     397                 : 
     398                 : impl Debug for DeleteTimelineError {
     399 UBC           0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     400               0 :         match self {
     401               0 :             Self::NotFound => write!(f, "NotFound"),
     402               0 :             Self::HasChildren(c) => f.debug_tuple("HasChildren").field(c).finish(),
     403               0 :             Self::AlreadyInProgress(_) => f.debug_tuple("AlreadyInProgress").finish(),
     404               0 :             Self::Other(e) => f.debug_tuple("Other").field(e).finish(),
     405                 :         }
     406               0 :     }
     407                 : }
     408                 : 
     409                 : pub enum SetStoppingError {
     410                 :     AlreadyStopping(completion::Barrier),
     411                 :     Broken,
     412                 : }
     413                 : 
     414                 : impl Debug for SetStoppingError {
     415               0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     416               0 :         match self {
     417               0 :             Self::AlreadyStopping(_) => f.debug_tuple("AlreadyStopping").finish(),
     418               0 :             Self::Broken => write!(f, "Broken"),
     419                 :         }
     420               0 :     }
     421                 : }
     422                 : 
     423 CBC           6 : #[derive(thiserror::Error, Debug)]
     424                 : pub enum CreateTimelineError {
     425                 :     #[error("creation of timeline with the given ID is in progress")]
     426                 :     AlreadyCreating,
     427                 :     #[error("timeline already exists with different parameters")]
     428                 :     Conflict,
     429                 :     #[error(transparent)]
     430                 :     AncestorLsn(anyhow::Error),
     431                 :     #[error("ancestor timeline is not active")]
     432                 :     AncestorNotActive,
     433                 :     #[error("tenant shutting down")]
     434                 :     ShuttingDown,
     435                 :     #[error(transparent)]
     436                 :     Other(#[from] anyhow::Error),
     437                 : }
     438                 : 
     439 UBC           0 : #[derive(thiserror::Error, Debug)]
     440                 : enum InitdbError {
     441                 :     Other(anyhow::Error),
     442                 :     Cancelled,
     443                 :     Spawn(std::io::Result<()>),
     444                 :     Failed(std::process::ExitStatus, Vec<u8>),
     445                 : }
     446                 : 
     447                 : impl fmt::Display for InitdbError {
     448               0 :     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
     449               0 :         match self {
     450               0 :             InitdbError::Cancelled => write!(f, "Operation was cancelled"),
     451               0 :             InitdbError::Spawn(e) => write!(f, "Spawn error: {:?}", e),
     452               0 :             InitdbError::Failed(status, stderr) => write!(
     453               0 :                 f,
     454               0 :                 "Command failed with status {:?}: {}",
     455               0 :                 status,
     456               0 :                 String::from_utf8_lossy(stderr)
     457               0 :             ),
     458               0 :             InitdbError::Other(e) => write!(f, "Error: {:?}", e),
     459                 :         }
     460               0 :     }
     461                 : }
     462                 : 
     463                 : impl From<std::io::Error> for InitdbError {
     464               0 :     fn from(error: std::io::Error) -> Self {
     465               0 :         InitdbError::Spawn(Err(error))
     466               0 :     }
     467                 : }
     468                 : 
     469                 : struct TenantDirectoryScan {
     470                 :     sorted_timelines_to_load: Vec<(TimelineId, TimelineMetadata)>,
     471                 :     timelines_to_resume_deletion: Vec<(TimelineId, Option<TimelineMetadata>)>,
     472                 : }
     473                 : 
     474                 : enum CreateTimelineCause {
     475                 :     Load,
     476                 :     Delete,
     477                 : }
     478                 : 
     479                 : impl Tenant {
     480                 :     /// Yet another helper for timeline initialization.
     481                 :     ///
     482                 :     /// - Initializes the Timeline struct and inserts it into the tenant's hash map
     483                 :     /// - Scans the local timeline directory for layer files and builds the layer map
     484                 :     /// - Downloads remote index file and adds remote files to the layer map
     485                 :     /// - Schedules remote upload tasks for any files that are present locally but missing from remote storage.
     486                 :     ///
     487                 :     /// If the operation fails, the timeline is left in the tenant's hash map in Broken state. On success,
     488                 :     /// it is marked as Active.
     489                 :     #[allow(clippy::too_many_arguments)]
     490 CBC         357 :     async fn timeline_init_and_sync(
     491             357 :         &self,
     492             357 :         timeline_id: TimelineId,
     493             357 :         resources: TimelineResources,
     494             357 :         index_part: Option<IndexPart>,
     495             357 :         metadata: TimelineMetadata,
     496             357 :         ancestor: Option<Arc<Timeline>>,
     497             357 :         _ctx: &RequestContext,
     498             357 :     ) -> anyhow::Result<()> {
     499             357 :         let tenant_id = self.tenant_shard_id;
     500                 : 
     501             357 :         let timeline = self.create_timeline_struct(
     502             357 :             timeline_id,
     503             357 :             &metadata,
     504             357 :             ancestor.clone(),
     505             357 :             resources,
     506             357 :             CreateTimelineCause::Load,
     507             357 :         )?;
     508             357 :         let disk_consistent_lsn = timeline.get_disk_consistent_lsn();
     509             357 :         anyhow::ensure!(
     510             357 :             disk_consistent_lsn.is_valid(),
     511 UBC           0 :             "Timeline {tenant_id}/{timeline_id} has invalid disk_consistent_lsn"
     512                 :         );
     513 CBC         357 :         assert_eq!(
     514             357 :             disk_consistent_lsn,
     515             357 :             metadata.disk_consistent_lsn(),
     516 UBC           0 :             "these are used interchangeably"
     517                 :         );
     518                 : 
     519 CBC         357 :         if let Some(index_part) = index_part.as_ref() {
     520             357 :             timeline
     521             357 :                 .remote_client
     522             357 :                 .as_ref()
     523             357 :                 .unwrap()
     524             357 :                 .init_upload_queue(index_part)?;
     525 UBC           0 :         } else if self.remote_storage.is_some() {
     526                 :             // No data on the remote storage, but we have local metadata file. We can end up
     527                 :             // here with timeline_create being interrupted before finishing index part upload.
     528                 :             // By doing what we do here, the index part upload is retried.
     529                 :             // If control plane retries timeline creation in the meantime, the mgmt API handler
     530                 :             // for timeline creation will coalesce on the upload we queue here.
     531               0 :             let rtc = timeline.remote_client.as_ref().unwrap();
     532               0 :             rtc.init_upload_queue_for_empty_remote(&metadata)?;
     533               0 :             rtc.schedule_index_upload_for_metadata_update(&metadata)?;
     534               0 :         }
     535                 : 
     536 CBC         357 :         timeline
     537             357 :             .load_layer_map(disk_consistent_lsn, index_part)
     538             351 :             .await
     539             357 :             .with_context(|| {
     540 UBC           0 :                 format!("Failed to load layermap for timeline {tenant_id}/{timeline_id}")
     541 CBC         357 :             })?;
     542                 : 
     543                 :         {
     544                 :             // avoiding holding it across awaits
     545             357 :             let mut timelines_accessor = self.timelines.lock().unwrap();
     546             357 :             match timelines_accessor.entry(timeline_id) {
     547                 :                 Entry::Occupied(_) => {
     548                 :                     // The uninit mark file acts as a lock that prevents another task from
     549                 :                     // initializing the timeline at the same time.
     550 UBC           0 :                     unreachable!(
     551               0 :                         "Timeline {tenant_id}/{timeline_id} already exists in the tenant map"
     552               0 :                     );
     553                 :                 }
     554 CBC         357 :                 Entry::Vacant(v) => {
     555             357 :                     v.insert(Arc::clone(&timeline));
     556             357 :                     timeline.maybe_spawn_flush_loop();
     557             357 :                 }
     558             357 :             }
     559             357 :         };
     560             357 : 
     561             357 :         // Sanity check: a timeline should have some content.
     562             357 :         anyhow::ensure!(
     563             357 :             ancestor.is_some()
     564             307 :                 || timeline
     565             307 :                     .layers
     566             307 :                     .read()
     567 UBC           0 :                     .await
     568 CBC         307 :                     .layer_map()
     569             307 :                     .iter_historic_layers()
     570             307 :                     .next()
     571             307 :                     .is_some(),
     572 UBC           0 :             "Timeline has no ancestor and no layer files"
     573                 :         );
     574                 : 
     575 CBC         357 :         Ok(())
     576             357 :     }
     577                 : 
     578                 :     /// Attach a tenant that's available in cloud storage.
     579                 :     ///
     580                 :     /// This returns quickly, after just creating the in-memory object
     581                 :     /// Tenant struct and launching a background task to download
     582                 :     /// the remote index files.  On return, the tenant is most likely still in
     583                 :     /// Attaching state, and it will become Active once the background task
     584                 :     /// finishes. You can use wait_until_active() to wait for the task to
     585                 :     /// complete.
     586                 :     ///
     587                 :     #[allow(clippy::too_many_arguments)]
     588             736 :     pub(crate) fn spawn(
     589             736 :         conf: &'static PageServerConf,
     590             736 :         tenant_shard_id: TenantShardId,
     591             736 :         resources: TenantSharedResources,
     592             736 :         attached_conf: AttachedTenantConf,
     593             736 :         shard_identity: ShardIdentity,
     594             736 :         init_order: Option<InitializationOrder>,
     595             736 :         tenants: &'static std::sync::RwLock<TenantsMap>,
     596             736 :         mode: SpawnMode,
     597             736 :         ctx: &RequestContext,
     598             736 :     ) -> anyhow::Result<Arc<Tenant>> {
     599             736 :         let wal_redo_manager = Arc::new(WalRedoManager::from(PostgresRedoManager::new(
     600             736 :             conf,
     601             736 :             tenant_shard_id,
     602             736 :         )));
     603             736 : 
     604             736 :         let TenantSharedResources {
     605             736 :             broker_client,
     606             736 :             remote_storage,
     607             736 :             deletion_queue_client,
     608             736 :         } = resources;
     609             736 : 
     610             736 :         let tenant = Arc::new(Tenant::new(
     611             736 :             TenantState::Attaching,
     612             736 :             conf,
     613             736 :             attached_conf,
     614             736 :             shard_identity,
     615             736 :             wal_redo_manager,
     616             736 :             tenant_shard_id,
     617             736 :             remote_storage.clone(),
     618             736 :             deletion_queue_client,
     619             736 :         ));
     620             736 : 
     621             736 :         // Do all the hard work in the background
     622             736 :         let tenant_clone = Arc::clone(&tenant);
     623             736 : 
     624             736 :         let ctx = ctx.detached_child(TaskKind::Attach, DownloadBehavior::Warn);
     625             736 :         task_mgr::spawn(
     626             736 :             &tokio::runtime::Handle::current(),
     627             736 :             TaskKind::Attach,
     628             736 :             Some(tenant_shard_id),
     629             736 :             None,
     630             736 :             "attach tenant",
     631                 :             false,
     632             736 :             async move {
     633             736 :                 // Is this tenant being spawned as part of process startup?
     634             736 :                 let starting_up = init_order.is_some();
     635             728 :                 scopeguard::defer! {
     636             728 :                     if starting_up {
     637             194 :                         TENANT.startup_complete.inc();
     638             534 :                     }
     639                 :                 }
     640                 : 
     641                 :                 // Ideally we should use Tenant::set_broken_no_wait, but it is not supposed to be used when tenant is in loading state.
     642             736 :                 let make_broken =
     643               6 :                     |t: &Tenant, err: anyhow::Error| {
     644               6 :                         error!("attach failed, setting tenant state to Broken: {err:?}");
     645               6 :                         t.state.send_modify(|state| {
     646                 :                             // The Stopping case is for when we have passed control on to DeleteTenantFlow:
     647                 :                             // if it errors, we will call make_broken when tenant is already in Stopping.
     648               6 :                             assert!(
     649               6 :                             matches!(*state, TenantState::Attaching | TenantState::Stopping { .. }),
     650 UBC           0 :                             "the attach task owns the tenant state until activation is complete"
     651                 :                         );
     652                 : 
     653 CBC           6 :                             *state = TenantState::broken_from_reason(err.to_string());
     654               6 :                         });
     655               6 :                     };
     656                 : 
     657             736 :                 let mut init_order = init_order;
     658             736 :                 // take the completion because initial tenant loading will complete when all of
     659             736 :                 // these tasks complete.
     660             736 :                 let _completion = init_order
     661             736 :                     .as_mut()
     662             736 :                     .and_then(|x| x.initial_tenant_load.take());
     663             736 :                 let remote_load_completion = init_order
     664             736 :                     .as_mut()
     665             736 :                     .and_then(|x| x.initial_tenant_load_remote.take());
     666                 : 
     667                 :                 enum AttachType<'a> {
     668                 :                     // During pageserver startup, we are attaching this tenant lazily in the background
     669                 :                     Warmup(tokio::sync::SemaphorePermit<'a>),
     670                 :                     // During pageserver startup, we are attaching this tenant as soon as we can,
     671                 :                     // because a client tried to access it.
     672                 :                     OnDemand,
     673                 :                     // During normal operations after startup, we are attaching a tenant.
     674                 :                     Normal,
     675                 :                 }
     676                 : 
     677                 :                 // Before doing any I/O, wait for either or:
     678                 :                 // - A client to attempt to access to this tenant (on-demand loading)
     679                 :                 // - A permit to become available in the warmup semaphore (background warmup)
     680                 :                 //
     681                 :                 // Some-ness of init_order is how we know if we're attaching during startup or later
     682                 :                 // in process lifetime.
     683             736 :                 let attach_type = if init_order.is_some() {
     684             202 :                     tokio::select!(
     685                 :                         _ = tenant_clone.activate_now_sem.acquire() => {
     686               3 :                             tracing::info!("Activating tenant (on-demand)");
     687                 :                             AttachType::OnDemand
     688                 :                         },
     689             199 :                         permit_result = conf.concurrent_tenant_warmup.inner().acquire() => {
     690                 :                             match permit_result {
     691                 :                                 Ok(p) => {
     692             199 :                                     tracing::info!("Activating tenant (warmup)");
     693                 :                                     AttachType::Warmup(p)
     694                 :                                 }
     695                 :                                 Err(_) => {
     696                 :                                     // This is unexpected: the warmup semaphore should stay alive
     697                 :                                     // for the lifetime of init_order.  Log a warning and proceed.
     698 UBC           0 :                                     tracing::warn!("warmup_limit semaphore unexpectedly closed");
     699                 :                                     AttachType::Normal
     700                 :                                 }
     701                 :                             }
     702                 : 
     703                 :                         }
     704                 :                         _ = tenant_clone.cancel.cancelled() => {
     705                 :                             // This is safe, but should be pretty rare: it is interesting if a tenant
     706                 :                             // stayed in Activating for such a long time that shutdown found it in
     707                 :                             // that state.
     708               0 :                             tracing::info!(state=%tenant_clone.current_state(), "Tenant shut down before activation");
     709                 :                             return Ok(());
     710                 :                         },
     711                 :                     )
     712                 :                 } else {
     713 CBC         534 :                     AttachType::Normal
     714                 :                 };
     715                 : 
     716             736 :                 let preload_timer = TENANT.preload.start_timer();
     717             736 :                 let preload = match mode {
     718                 :                     SpawnMode::Create => {
     719                 :                         // Don't count the skipped preload into the histogram of preload durations
     720             430 :                         preload_timer.stop_and_discard();
     721             430 :                         None
     722                 :                     },
     723                 :                     SpawnMode::Normal => {
     724             306 :                         match &remote_storage {
     725             306 :                             Some(remote_storage) => Some(
     726             306 :                                 match tenant_clone
     727             306 :                                     .preload(remote_storage, task_mgr::shutdown_token())
     728             306 :                                     .instrument(
     729             306 :                                         tracing::info_span!(parent: None, "attach_preload", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug()),
     730                 :                                     )
     731            1410 :                                     .await {
     732             300 :                                         Ok(p) => {
     733             300 :                                             preload_timer.observe_duration();
     734             300 :                                             p
     735                 :                                         }
     736                 :                                             ,
     737               6 :                                         Err(e) => {
     738               6 :                                             make_broken(&tenant_clone, anyhow::anyhow!(e));
     739               6 :                                                 return Ok(());
     740                 :                                         }
     741                 :                                     },
     742                 :                             ),
     743 UBC           0 :                             None => None,
     744                 :                         }
     745                 :                     }
     746                 :                 };
     747                 : 
     748                 :                 // Remote preload is complete.
     749 CBC         730 :                 drop(remote_load_completion);
     750                 : 
     751             730 :                 let pending_deletion = {
     752             730 :                     match DeleteTenantFlow::should_resume_deletion(
     753             730 :                         conf,
     754             730 :                         preload.as_ref().map(|p| p.deleting).unwrap_or(false),
     755             730 :                         &tenant_clone,
     756             730 :                     )
     757 UBC           0 :                     .await
     758                 :                     {
     759 CBC         730 :                         Ok(should_resume_deletion) => should_resume_deletion,
     760 UBC           0 :                         Err(err) => {
     761               0 :                             make_broken(&tenant_clone, anyhow::anyhow!(err));
     762               0 :                             return Ok(());
     763                 :                         }
     764                 :                     }
     765                 :                 };
     766                 : 
     767 CBC         730 :                 info!("pending_deletion {}", pending_deletion.is_some());
     768                 : 
     769             730 :                 if let Some(deletion) = pending_deletion {
     770                 :                     // as we are no longer loading, signal completion by dropping
     771                 :                     // the completion while we resume deletion
     772              21 :                     drop(_completion);
     773              21 :                     let background_jobs_can_start =
     774              21 :                         init_order.as_ref().map(|x| &x.background_jobs_can_start);
     775              21 :                     if let Some(background) = background_jobs_can_start {
      776                 :                         info!("waiting for background jobs barrier");
     777              20 :                         background.clone().wait().await;
      778                 :                         info!("ready for background jobs barrier");
     779               1 :                     }
     780                 : 
     781              21 :                     match DeleteTenantFlow::resume_from_attach(
     782              21 :                         deletion,
     783              21 :                         &tenant_clone,
     784              21 :                         preload,
     785              21 :                         tenants,
     786              21 :                         &ctx,
     787              21 :                     )
     788             763 :                     .await
     789                 :                     {
     790 UBC           0 :                         Err(err) => {
     791               0 :                             make_broken(&tenant_clone, anyhow::anyhow!(err));
     792               0 :                             return Ok(());
     793                 :                         }
     794 CBC          21 :                         Ok(()) => return Ok(()),
     795                 :                     }
     796             709 :                 }
     797                 : 
     798                 :                 // We will time the duration of the attach phase unless this is a creation (attach will do no work)
     799             709 :                 let attach_timer = match mode {
     800             430 :                     SpawnMode::Create => None,
     801             279 :                     SpawnMode::Normal => {Some(TENANT.attach.start_timer())}
     802                 :                 };
     803            1085 :                 match tenant_clone.attach(preload, &ctx).await {
     804                 :                     Ok(()) => {
     805             709 :                         info!("attach finished, activating");
     806             709 :                         if let Some(t)=  attach_timer {t.observe_duration();}
     807             709 :                         tenant_clone.activate(broker_client, None, &ctx);
     808                 :                     }
     809 UBC           0 :                     Err(e) => {
     810               0 :                         if let Some(t)=  attach_timer {t.observe_duration();}
     811               0 :                         make_broken(&tenant_clone, anyhow::anyhow!(e));
     812                 :                     }
     813                 :                 }
     814                 : 
     815                 :                 // If we are doing an opportunistic warmup attachment at startup, initialize
     816                 :                 // logical size at the same time.  This is better than starting a bunch of idle tenants
     817                 :                 // with cold caches and then coming back later to initialize their logical sizes.
     818                 :                 //
      819                 :                 // It also prevents the warmup process from competing with the concurrency limit on
      820                 :                 // logical size calculations: if the logical size calculation semaphore is saturated,
      821                 :                 // then warmup will wait for it before proceeding to the next tenant.
     822 CBC         709 :                 if let AttachType::Warmup(_permit) = attach_type {
     823             179 :                     let mut futs = FuturesUnordered::new();
     824             179 :                     let timelines: Vec<_> = tenant_clone.timelines.lock().unwrap().values().cloned().collect();
     825             410 :                     for t in timelines {
     826             231 :                         futs.push(t.await_initial_logical_size())
     827                 :                     }
     828             179 :                     tracing::info!("Waiting for initial logical sizes while warming up...");
     829             402 :                     while futs.next().await.is_some() {
     830             223 : 
     831             223 :                     }
     832             171 :                     tracing::info!("Warm-up complete");
     833             530 :                 }
     834                 : 
     835             701 :                 Ok(())
     836             728 :             }
     837             736 :             .instrument({
     838             736 :                 let span = tracing::info_span!(parent: None, "attach", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug());
     839             736 :                 span.follows_from(Span::current());
     840             736 :                 span
     841             736 :             }),
     842             736 :         );
     843             736 :         Ok(tenant)
     844             736 :     }
     845                 : 
     846             308 :     pub(crate) async fn preload(
     847             308 :         self: &Arc<Tenant>,
     848             308 :         remote_storage: &GenericRemoteStorage,
     849             308 :         cancel: CancellationToken,
     850             308 :     ) -> anyhow::Result<TenantPreload> {
     851                 :         // Get list of remote timelines
     852                 :         // download index files for every tenant timeline
     853             308 :         info!("listing remote timelines");
     854             308 :         let (remote_timeline_ids, other_keys) = remote_timeline_client::list_remote_timelines(
     855             308 :             remote_storage,
     856             308 :             self.tenant_shard_id,
     857             308 :             cancel.clone(),
     858             308 :         )
     859            1055 :         .await?;
     860                 : 
     861             302 :         let deleting = other_keys.contains(TENANT_DELETED_MARKER_FILE_NAME);
     862             302 :         info!(
     863             302 :             "found {} timelines, deleting={}",
     864             302 :             remote_timeline_ids.len(),
     865             302 :             deleting
     866             302 :         );
     867                 : 
     868             317 :         for k in other_keys {
     869              15 :             if k != TENANT_DELETED_MARKER_FILE_NAME {
     870 UBC           0 :                 warn!("Unexpected non timeline key {k}");
     871 CBC          15 :             }
     872                 :         }
     873                 : 
     874                 :         Ok(TenantPreload {
     875             302 :             deleting,
     876             302 :             timelines: self
     877             302 :                 .load_timeline_metadata(remote_timeline_ids, remote_storage, cancel)
     878             362 :                 .await?,
     879                 :         })
     880             308 :     }
     881                 : 
     882                 :     ///
     883                 :     /// Background task that downloads all data for a tenant and brings it to Active state.
     884                 :     ///
     885                 :     /// No background tasks are started as part of this routine.
     886                 :     ///
     887             732 :     async fn attach(
     888             732 :         self: &Arc<Tenant>,
     889             732 :         preload: Option<TenantPreload>,
     890             732 :         ctx: &RequestContext,
     891             732 :     ) -> anyhow::Result<()> {
     892             732 :         span::debug_assert_current_span_has_tenant_id();
     893                 : 
     894               3 :         failpoint_support::sleep_millis_async!("before-attaching-tenant");
     895                 : 
     896             732 :         let preload = match preload {
     897             302 :             Some(p) => p,
     898                 :             None => {
     899                 :                 // Deprecated dev mode: load from local disk state instead of remote storage
     900                 :                 // https://github.com/neondatabase/neon/issues/5624
     901             430 :                 return self.load_local(ctx).await;
     902                 :             }
     903                 :         };
     904                 : 
     905             302 :         let mut timelines_to_resume_deletions = vec![];
     906             302 : 
     907             302 :         let mut remote_index_and_client = HashMap::new();
     908             302 :         let mut timeline_ancestors = HashMap::new();
     909             302 :         let mut existent_timelines = HashSet::new();
     910             696 :         for (timeline_id, preload) in preload.timelines {
     911             369 :             let index_part = match preload.index_part {
     912             369 :                 Ok(i) => {
     913 UBC           0 :                     debug!("remote index part exists for timeline {timeline_id}");
     914                 :                     // We found index_part on the remote, this is the standard case.
     915 CBC         369 :                     existent_timelines.insert(timeline_id);
     916             369 :                     i
     917                 :                 }
     918                 :                 Err(DownloadError::NotFound) => {
     919                 :                     // There is no index_part on the remote. We only get here
     920                 :                     // if there is some prefix for the timeline in the remote storage.
     921                 :                     // This can e.g. be the initdb.tar.zst archive, maybe a
     922                 :                     // remnant from a prior incomplete creation or deletion attempt.
     923                 :                     // Delete the local directory, as the deciding criterion for a
     924                 :                     // timeline's existence is the presence of index_part.
     925              25 :                     info!(%timeline_id, "index_part not found on remote");
     926              25 :                     continue;
     927                 :                 }
     928 UBC           0 :                 Err(e) => {
     929                 :                     // Some (possibly ephemeral) error happened during index_part download.
     930                 :                     // Pretend the timeline exists to not delete the timeline directory,
     931                 :                     // as it might be a temporary issue and we don't want to re-download
     932                 :                     // everything after it resolves.
     933               0 :                     warn!(%timeline_id, "Failed to load index_part from remote storage, failed creation? ({e})");
     934                 : 
     935               0 :                     existent_timelines.insert(timeline_id);
     936               0 :                     continue;
     937                 :                 }
     938                 :             };
     939 CBC         369 :             match index_part {
     940             357 :                 MaybeDeletedIndexPart::IndexPart(index_part) => {
     941             357 :                     timeline_ancestors.insert(timeline_id, index_part.metadata.clone());
     942             357 :                     remote_index_and_client.insert(timeline_id, (index_part, preload.client));
     943             357 :                 }
     944              12 :                 MaybeDeletedIndexPart::Deleted(index_part) => {
     945              12 :                     info!(
     946              12 :                         "timeline {} is deleted, picking to resume deletion",
     947              12 :                         timeline_id
     948              12 :                     );
     949              12 :                     timelines_to_resume_deletions.push((timeline_id, index_part, preload.client));
     950                 :                 }
     951                 :             }
     952                 :         }
     953                 : 
     954                 :         // For every timeline, download the metadata file, scan the local directory,
     955                 :         // and build a layer map that contains an entry for each remote and local
     956                 :         // layer file.
     957             357 :         let sorted_timelines = tree_sort_timelines(timeline_ancestors, |m| m.ancestor_timeline())?;
     958             659 :         for (timeline_id, remote_metadata) in sorted_timelines {
     959             357 :             let (index_part, remote_client) = remote_index_and_client
     960             357 :                 .remove(&timeline_id)
     961             357 :                 .expect("just put it in above");
     962             357 : 
     963             357 :             // TODO again handle early failure
     964             357 :             self.load_remote_timeline(
     965             357 :                 timeline_id,
     966             357 :                 index_part,
     967             357 :                 remote_metadata,
     968             357 :                 TimelineResources {
     969             357 :                     remote_client: Some(remote_client),
     970             357 :                     deletion_queue_client: self.deletion_queue_client.clone(),
     971             357 :                 },
     972             357 :                 ctx,
     973             357 :             )
     974             696 :             .await
     975             357 :             .with_context(|| {
     976 UBC           0 :                 format!(
     977               0 :                     "failed to load remote timeline {} for tenant {}",
     978               0 :                     timeline_id, self.tenant_shard_id
     979               0 :                 )
     980 CBC         357 :             })?;
     981                 :         }
     982                 : 
     983                 :         // Walk through deleted timelines, resume deletion
     984             314 :         for (timeline_id, index_part, remote_timeline_client) in timelines_to_resume_deletions {
     985              12 :             remote_timeline_client
     986              12 :                 .init_upload_queue_stopped_to_continue_deletion(&index_part)
     987              12 :                 .context("init queue stopped")
     988              12 :                 .map_err(LoadLocalTimelineError::ResumeDeletion)?;
     989                 : 
     990              12 :             DeleteTimelineFlow::resume_deletion(
     991              12 :                 Arc::clone(self),
     992              12 :                 timeline_id,
     993              12 :                 &index_part.metadata,
     994              12 :                 Some(remote_timeline_client),
     995              12 :                 self.deletion_queue_client.clone(),
     996              12 :             )
     997 UBC           0 :             .await
     998 CBC          12 :             .context("resume_deletion")
     999              12 :             .map_err(LoadLocalTimelineError::ResumeDeletion)?;
    1000                 :         }
    1001                 : 
    1002                 :         // The local filesystem contents are a cache of what's in the remote IndexPart;
    1003                 :         // IndexPart is the source of truth.
    1004             302 :         self.clean_up_timelines(&existent_timelines)?;
    1005                 : 
    1006               9 :         failpoint_support::sleep_millis_async!("attach-before-activate");
    1007                 : 
    1008             302 :         info!("Done");
    1009                 : 
    1010             302 :         Ok(())
    1011             732 :     }
    1012                 : 
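attach loads timelines in the order produced by `tree_sort_timelines`, so every ancestor is materialized before the children that branch from it. The following is a hedged sketch of that parent-before-child ordering using plain integer IDs; it is illustrative only, not the actual `tree_sort_timelines` implementation.

    use std::collections::HashMap;

    // Emit IDs so that each entry's ancestor (if any) appears before the entry itself.
    fn sort_by_ancestor(mut pending: HashMap<u32, Option<u32>>) -> Vec<u32> {
        let mut ordered = Vec::new();
        while !pending.is_empty() {
            let ready: Vec<u32> = pending
                .iter()
                .filter(|(_, parent)| parent.map_or(true, |p| ordered.contains(&p)))
                .map(|(id, _)| *id)
                .collect();
            assert!(!ready.is_empty(), "missing ancestor or cycle");
            for id in ready {
                pending.remove(&id);
                ordered.push(id);
            }
        }
        ordered
    }

    fn main() {
        // Timeline 2 branches off 1, which branches off the root timeline 0.
        let input = HashMap::from([(2, Some(1)), (0, None), (1, Some(0))]);
        assert_eq!(sort_by_ancestor(input), vec![0, 1, 2]);
    }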
    1013                 :     /// Check for any local timeline directories that are temporary, or do not correspond to a
    1014                 :     /// timeline that still exists: this can happen if we crashed during a deletion/creation, or
    1015                 :     /// if a timeline was deleted while the tenant was attached to a different pageserver.
    1016             302 :     fn clean_up_timelines(&self, existent_timelines: &HashSet<TimelineId>) -> anyhow::Result<()> {
    1017             302 :         let timelines_dir = self.conf.timelines_path(&self.tenant_shard_id);
    1018                 : 
    1019             302 :         let entries = match timelines_dir.read_dir_utf8() {
    1020             300 :             Ok(d) => d,
    1021               2 :             Err(e) => {
    1022               2 :                 if e.kind() == std::io::ErrorKind::NotFound {
    1023               2 :                     return Ok(());
    1024                 :                 } else {
    1025 UBC           0 :                     return Err(e).context("list timelines directory for tenant");
    1026                 :                 }
    1027                 :             }
    1028                 :         };
    1029                 : 
    1030 CBC         676 :         for entry in entries {
    1031             376 :             let entry = entry.context("read timeline dir entry")?;
    1032             376 :             let entry_path = entry.path();
    1033                 : 
    1034             376 :             let purge = if crate::is_temporary(entry_path)
    1035                 :                 // TODO: uninit_mark isn't needed any more, since uninitialized timelines are already
    1036                 :                 // covered by the check that the timeline must exist in remote storage.
    1037             374 :                 || is_uninit_mark(entry_path)
    1038             372 :                 || crate::is_delete_mark(entry_path)
    1039                 :             {
    1040               4 :                 true
    1041                 :             } else {
    1042             372 :                 match TimelineId::try_from(entry_path.file_name()) {
    1043             372 :                     Ok(i) => {
    1044             372 :                         // Purge if the timeline ID does not exist in remote storage: remote storage is the authority.
    1045             372 :                         !existent_timelines.contains(&i)
    1046                 :                     }
    1047 UBC           0 :                     Err(e) => {
    1048               0 :                         tracing::warn!(
    1049               0 :                             "Unparseable directory in timelines directory: {entry_path}, ignoring ({e})"
    1050               0 :                         );
    1051                 :                         // Do not purge junk: if we don't recognize it, be cautious and leave it for a human.
    1052               0 :                         false
    1053                 :                     }
    1054                 :                 }
    1055                 :             };
    1056                 : 
    1057 CBC         376 :             if purge {
    1058               9 :                 tracing::info!("Purging stale timeline dentry {entry_path}");
    1059               9 :                 if let Err(e) = match entry.file_type() {
    1060               9 :                     Ok(t) => if t.is_dir() {
    1061               7 :                         std::fs::remove_dir_all(entry_path)
    1062                 :                     } else {
    1063               2 :                         std::fs::remove_file(entry_path)
    1064                 :                     }
    1065               9 :                     .or_else(fs_ext::ignore_not_found),
    1066 UBC           0 :                     Err(e) => Err(e),
    1067                 :                 } {
    1068 LBC         (1) :                     tracing::warn!("Failed to purge stale timeline dentry {entry_path}: {e}");
    1069 CBC           9 :                 }
    1070             367 :             }
    1071                 :         }
    1072                 : 
    1073             300 :         Ok(())
    1074             302 :     }
    1075                 : 
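clean_up_timelines treats an already-missing entry as success when purging: the `remove_dir_all`/`remove_file` result is routed through `fs_ext::ignore_not_found`. A minimal sketch of that pattern with a local helper follows; the path is hypothetical.

    use std::io;
    use std::path::Path;

    // Local stand-in for the ignore-NotFound helper: an entry that is already gone
    // (e.g. removed by an earlier, crashed cleanup) is not an error.
    fn ignore_not_found(e: io::Error) -> io::Result<()> {
        if e.kind() == io::ErrorKind::NotFound {
            Ok(())
        } else {
            Err(e)
        }
    }

    fn purge_entry(path: &Path) -> io::Result<()> {
        let res = if path.is_dir() {
            std::fs::remove_dir_all(path)
        } else {
            std::fs::remove_file(path)
        };
        res.or_else(ignore_not_found)
    }

    fn main() -> io::Result<()> {
        // Hypothetical stale entry; succeeds even if it no longer exists.
        purge_entry(Path::new("/tmp/stale-timeline-dir"))
    }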
    1076                 :     /// Get the sum of all remote timelines' sizes
    1077                 :     ///
    1078                 :     /// This function relies on the index_part instead of listing the remote storage
    1079              27 :     pub fn remote_size(&self) -> u64 {
    1080              27 :         let mut size = 0;
    1081                 : 
    1082              27 :         for timeline in self.list_timelines() {
    1083              18 :             if let Some(remote_client) = &timeline.remote_client {
    1084              18 :                 size += remote_client.get_remote_physical_size();
    1085              18 :             }
    1086                 :         }
    1087                 : 
    1088              27 :         size
    1089              27 :     }
    1090                 : 
    1091             357 :     #[instrument(skip_all, fields(timeline_id=%timeline_id))]
    1092                 :     async fn load_remote_timeline(
    1093                 :         &self,
    1094                 :         timeline_id: TimelineId,
    1095                 :         index_part: IndexPart,
    1096                 :         remote_metadata: TimelineMetadata,
    1097                 :         resources: TimelineResources,
    1098                 :         ctx: &RequestContext,
    1099                 :     ) -> anyhow::Result<()> {
    1100                 :         span::debug_assert_current_span_has_tenant_id();
    1101                 : 
    1102             357 :         info!("downloading index file for timeline {}", timeline_id);
    1103                 :         tokio::fs::create_dir_all(self.conf.timeline_path(&self.tenant_shard_id, &timeline_id))
    1104                 :             .await
    1105                 :             .context("Failed to create new timeline directory")?;
    1106                 : 
    1107                 :         let ancestor = if let Some(ancestor_id) = remote_metadata.ancestor_timeline() {
    1108                 :             let timelines = self.timelines.lock().unwrap();
    1109                 :             Some(Arc::clone(timelines.get(&ancestor_id).ok_or_else(
    1110 UBC           0 :                 || {
    1111               0 :                     anyhow::anyhow!(
    1112               0 :                         "cannot find ancestor timeline {ancestor_id} for timeline {timeline_id}"
    1113               0 :                     )
    1114               0 :                 },
    1115                 :             )?))
    1116                 :         } else {
    1117                 :             None
    1118                 :         };
    1119                 : 
    1120                 :         // Timeline loading after attach expects to find a metadata file for each timeline
    1121                 :         save_metadata(
    1122                 :             self.conf,
    1123                 :             &self.tenant_shard_id,
    1124                 :             &timeline_id,
    1125                 :             &remote_metadata,
    1126                 :         )
    1127                 :         .await
    1128                 :         .context("save_metadata")
    1129                 :         .map_err(LoadLocalTimelineError::Load)?;
    1130                 : 
    1131                 :         self.timeline_init_and_sync(
    1132                 :             timeline_id,
    1133                 :             resources,
    1134                 :             Some(index_part),
    1135                 :             remote_metadata,
    1136                 :             ancestor,
    1137                 :             ctx,
    1138                 :         )
    1139                 :         .await
    1140                 :     }
    1141                 : 
    1142                 :     /// Create a placeholder Tenant object for a broken tenant
    1143               0 :     pub fn create_broken_tenant(
    1144               0 :         conf: &'static PageServerConf,
    1145               0 :         tenant_shard_id: TenantShardId,
    1146               0 :         reason: String,
    1147               0 :     ) -> Arc<Tenant> {
    1148               0 :         let wal_redo_manager = Arc::new(WalRedoManager::from(PostgresRedoManager::new(
    1149               0 :             conf,
    1150               0 :             tenant_shard_id,
    1151               0 :         )));
    1152               0 :         Arc::new(Tenant::new(
    1153               0 :             TenantState::Broken {
    1154               0 :                 reason,
    1155               0 :                 backtrace: String::new(),
    1156               0 :             },
    1157               0 :             conf,
    1158               0 :             AttachedTenantConf::try_from(LocationConf::default()).unwrap(),
    1159               0 :             // Shard identity isn't meaningful for a broken tenant: it's just a placeholder
    1160               0 :             // to occupy the slot for this TenantShardId.
    1161               0 :             ShardIdentity::broken(tenant_shard_id.shard_number, tenant_shard_id.shard_count),
    1162               0 :             wal_redo_manager,
    1163               0 :             tenant_shard_id,
    1164               0 :             None,
    1165               0 :             DeletionQueueClient::broken(),
    1166               0 :         ))
    1167               0 :     }
    1168                 : 
    1169 CBC         470 :     fn scan_and_sort_timelines_dir(self: Arc<Tenant>) -> anyhow::Result<TenantDirectoryScan> {
    1170             470 :         let mut timelines_to_load: HashMap<TimelineId, TimelineMetadata> = HashMap::new();
    1171             470 :         // Note: timelines_to_resume_deletion needs to be kept separate because it may not be sortable
    1172             470 :         // from the point of view of `tree_sort_timelines`, i.e. some parents can be missing because deletion
    1173             470 :         // completed in non-topological order (for example, because the parent had fewer layer files in it).
    1174             470 :         let mut timelines_to_resume_deletion: Vec<(TimelineId, Option<TimelineMetadata>)> = vec![];
    1175             470 : 
    1176             470 :         let timelines_dir = self.conf.timelines_path(&self.tenant_shard_id);
    1177                 : 
    1178             470 :         for entry in timelines_dir
    1179             470 :             .read_dir_utf8()
    1180             470 :             .context("list timelines directory for tenant")?
    1181                 :         {
    1182               3 :             let entry = entry.context("read timeline dir entry")?;
    1183               3 :             let timeline_dir = entry.path();
    1184               3 : 
    1185               3 :             if crate::is_temporary(timeline_dir) {
    1186 UBC           0 :                 info!("Found temporary timeline directory, removing: {timeline_dir}");
    1187               0 :                 if let Err(e) = std::fs::remove_dir_all(timeline_dir) {
    1188               0 :                     error!("Failed to remove temporary directory '{timeline_dir}': {e:?}");
    1189               0 :                 }
    1190 CBC           3 :             } else if is_uninit_mark(timeline_dir) {
    1191               1 :                 if !timeline_dir.exists() {
    1192 GBC           1 :                     warn!("Timeline dir entry become invalid: {timeline_dir}");
    1193               1 :                     continue;
    1194 LBC         (1) :                 }
    1195             (1) : 
    1196             (1) :                 let timeline_uninit_mark_file = &timeline_dir;
    1197             (1) :                 info!(
    1198             (1) :                     "Found an uninit mark file {timeline_uninit_mark_file}, removing the timeline and its uninit mark",
    1199             (1) :                 );
    1200             (1) :                 let timeline_id =
    1201             (1) :                     TimelineId::try_from(timeline_uninit_mark_file.file_stem())
    1202             (1) :                         .with_context(|| {
    1203 UBC           0 :                             format!(
    1204               0 :                                 "Could not parse timeline id out of the timeline uninit mark name {timeline_uninit_mark_file}",
    1205               0 :                             )
    1206 LBC         (1) :                         })?;
    1207             (1) :                 let timeline_dir = self.conf.timeline_path(&self.tenant_shard_id, &timeline_id);
    1208 UBC           0 :                 if let Err(e) =
    1209 LBC         (1) :                     remove_timeline_and_uninit_mark(&timeline_dir, timeline_uninit_mark_file)
    1210                 :                 {
    1211 UBC           0 :                     error!("Failed to clean up uninit marked timeline: {e:?}");
    1212 LBC         (1) :                 }
    1213 CBC           2 :             } else if crate::is_delete_mark(timeline_dir) {
    1214                 :                 // If metadata exists, load as usual, continue deletion
    1215 UBC           0 :                 let timeline_id = TimelineId::try_from(timeline_dir.file_stem())
    1216               0 :                     .with_context(|| {
    1217               0 :                         format!(
    1218               0 :                             "Could not parse timeline id out of the timeline delete mark name {timeline_dir}",
    1219               0 :                         )
    1220               0 :                     })?;
    1221                 : 
    1222               0 :                 info!("Found deletion mark for timeline {}", timeline_id);
    1223                 : 
    1224               0 :                 match load_metadata(self.conf, &self.tenant_shard_id, &timeline_id) {
    1225               0 :                     Ok(metadata) => {
    1226               0 :                         timelines_to_resume_deletion.push((timeline_id, Some(metadata)))
    1227                 :                     }
    1228               0 :                     Err(e) => match &e {
    1229               0 :                         LoadMetadataError::Read(r) => {
    1230               0 :                             if r.kind() != io::ErrorKind::NotFound {
    1231               0 :                                 return Err(anyhow::anyhow!(e)).with_context(|| {
    1232               0 :                                     format!("Failed to load metadata for timeline_id {timeline_id}")
    1233               0 :                                 });
    1234               0 :                             }
    1235               0 : 
    1236               0 :                             // If the metadata doesn't exist, it means that we've crashed without
    1237               0 :                             // completing cleanup_remaining_timeline_fs_traces in DeleteTimelineFlow.
    1238               0 :                             // So save the timeline_id for a later call to `DeleteTimelineFlow::cleanup_remaining_timeline_fs_traces`.
    1239               0 :                             // We can't do it here because the method is async, so we'd need block_on,
    1240               0 :                             // and here we're in spawn_blocking. cleanup_remaining_timeline_fs_traces uses fs operations,
    1241               0 :                             // so that basically results in a cycle:
    1242               0 :                             // spawn_blocking
    1243               0 :                             // - block_on
    1244               0 :                             //   - spawn_blocking
    1245               0 :                             // which can lead to running out of threads in the blocking pool.
    1246               0 :                             timelines_to_resume_deletion.push((timeline_id, None));
    1247                 :                         }
    1248                 :                         _ => {
    1249               0 :                             return Err(anyhow::anyhow!(e)).with_context(|| {
    1250               0 :                                 format!("Failed to load metadata for timeline_id {timeline_id}")
    1251               0 :                             })
    1252                 :                         }
    1253                 :                     },
    1254                 :                 }
    1255                 :             } else {
    1256 CBC           2 :                 if !timeline_dir.exists() {
    1257 LBC         (1) :                     warn!("Timeline dir entry become invalid: {timeline_dir}");
    1258             (1) :                     continue;
    1259 CBC           2 :                 }
    1260               2 :                 let timeline_id = TimelineId::try_from(timeline_dir.file_name())
    1261               2 :                     .with_context(|| {
    1262 UBC           0 :                         format!(
    1263               0 :                             "Could not parse timeline id out of the timeline dir name {timeline_dir}",
    1264               0 :                         )
    1265 CBC           2 :                     })?;
    1266               2 :                 let timeline_uninit_mark_file = self
    1267               2 :                     .conf
    1268               2 :                     .timeline_uninit_mark_file_path(self.tenant_shard_id, timeline_id);
    1269               2 :                 if timeline_uninit_mark_file.exists() {
    1270 GBC           1 :                     info!(
    1271               1 :                         %timeline_id,
    1272               1 :                         "Found an uninit mark file, removing the timeline and its uninit mark",
    1273               1 :                     );
    1274 UBC           0 :                     if let Err(e) =
    1275 GBC           1 :                         remove_timeline_and_uninit_mark(timeline_dir, &timeline_uninit_mark_file)
    1276                 :                     {
    1277 UBC           0 :                         error!("Failed to clean up uninit marked timeline: {e:?}");
    1278 GBC           1 :                     }
    1279               1 :                     continue;
    1280 CBC           1 :                 }
    1281               1 : 
    1282               1 :                 let timeline_delete_mark_file = self
    1283               1 :                     .conf
    1284               1 :                     .timeline_delete_mark_file_path(self.tenant_shard_id, timeline_id);
    1285               1 :                 if timeline_delete_mark_file.exists() {
    1286                 :                     // Cleanup should be done in `is_delete_mark` branch above
    1287 UBC           0 :                     continue;
    1288 CBC           1 :                 }
    1289               1 : 
    1290               1 :                 let file_name = entry.file_name();
    1291               1 :                 if let Ok(timeline_id) = file_name.parse::<TimelineId>() {
    1292               1 :                     let metadata = load_metadata(self.conf, &self.tenant_shard_id, &timeline_id)
    1293               1 :                         .context("failed to load metadata")?;
    1294 UBC           0 :                     timelines_to_load.insert(timeline_id, metadata);
    1295                 :                 } else {
    1296                 :                     // A file or directory that doesn't look like a timeline ID
    1297               0 :                     warn!("unexpected file or directory in timelines directory: {file_name}");
    1298                 :                 }
    1299                 :             }
    1300                 :         }
    1301                 : 
    1302                 :         // Sort the array of timeline IDs into tree-order, so that parent comes before
    1303                 :         // all its children.
    1304 CBC         469 :         tree_sort_timelines(timelines_to_load, |m| m.ancestor_timeline()).map(|sorted_timelines| {
    1305             469 :             TenantDirectoryScan {
    1306             469 :                 sorted_timelines_to_load: sorted_timelines,
    1307             469 :                 timelines_to_resume_deletion,
    1308             469 :             }
    1309             469 :         })
    1310             470 :     }
    1311                 : 
    1312             302 :     async fn load_timeline_metadata(
    1313             302 :         self: &Arc<Tenant>,
    1314             302 :         timeline_ids: HashSet<TimelineId>,
    1315             302 :         remote_storage: &GenericRemoteStorage,
    1316             302 :         cancel: CancellationToken,
    1317             302 :     ) -> anyhow::Result<HashMap<TimelineId, TimelinePreload>> {
    1318             302 :         let mut part_downloads = JoinSet::new();
    1319             696 :         for timeline_id in timeline_ids {
    1320             394 :             let client = RemoteTimelineClient::new(
    1321             394 :                 remote_storage.clone(),
    1322             394 :                 self.deletion_queue_client.clone(),
    1323             394 :                 self.conf,
    1324             394 :                 self.tenant_shard_id,
    1325             394 :                 timeline_id,
    1326             394 :                 self.generation,
    1327             394 :             );
    1328             394 :             let cancel_clone = cancel.clone();
    1329             394 :             part_downloads.spawn(
    1330             394 :                 async move {
    1331             394 :                     debug!("starting index part download");
    1332                 : 
    1333            2975 :                     let index_part = client.download_index_file(cancel_clone).await;
    1334                 : 
    1335             394 :                     debug!("finished index part download");
    1336                 : 
    1337             394 :                     Result::<_, anyhow::Error>::Ok(TimelinePreload {
    1338             394 :                         client,
    1339             394 :                         timeline_id,
    1340             394 :                         index_part,
    1341             394 :                     })
    1342             394 :                 }
    1343             394 :                 .map(move |res| {
    1344             394 :                     res.with_context(|| format!("download index part for timeline {timeline_id}"))
    1345             394 :                 })
    1346             394 :                 .instrument(info_span!("download_index_part", %timeline_id)),
    1347                 :             );
    1348                 :         }
    1349                 : 
    1350             302 :         let mut timeline_preloads: HashMap<TimelineId, TimelinePreload> = HashMap::new();
    1351                 : 
    1352             696 :         loop {
    1353            1058 :             tokio::select!(
    1354             696 :                 next = part_downloads.join_next() => {
    1355                 :                     match next {
    1356                 :                         Some(result) => {
    1357                 :                             let preload_result = result.context("join preload task")?;
    1358                 :                             let preload = preload_result?;
    1359                 :                             timeline_preloads.insert(preload.timeline_id, preload);
    1360                 :                         },
    1361                 :                         None => {
    1362                 :                             break;
    1363                 :                         }
    1364                 :                     }
    1365                 :                 },
    1366                 :                 _ = cancel.cancelled() => {
    1367                 :                     anyhow::bail!("Cancelled while waiting for remote index download")
    1368                 :                 }
    1369             696 :             )
    1370             696 :         }
    1371                 : 
    1372             302 :         Ok(timeline_preloads)
    1373             302 :     }
    1374                 : 
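load_timeline_metadata fans the per-timeline index downloads out into a `JoinSet` and drains it with `tokio::select!` so a `CancellationToken` can interrupt the wait. A hedged, self-contained sketch of that shape follows; `fetch` is a hypothetical stand-in for `download_index_file`, not the real API.

    use tokio::task::JoinSet;
    use tokio_util::sync::CancellationToken;

    // Hypothetical stand-in for RemoteTimelineClient::download_index_file().
    async fn fetch(id: u32) -> anyhow::Result<String> {
        Ok(format!("index_part for timeline {id}"))
    }

    #[tokio::main]
    async fn main() -> anyhow::Result<()> {
        let cancel = CancellationToken::new();
        let mut downloads = JoinSet::new();
        for id in 0..3u32 {
            downloads.spawn(fetch(id));
        }

        loop {
            tokio::select! {
                next = downloads.join_next() => match next {
                    // Unwrap the JoinError first, then the download result itself.
                    Some(res) => println!("{}", res??),
                    // join_next() returns None once every spawned task has finished.
                    None => break,
                },
                _ = cancel.cancelled() => anyhow::bail!("cancelled while waiting for downloads"),
            }
        }
        Ok(())
    }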
    1375                 :     ///
    1376                 :     /// Background task to load in-memory data structures for this tenant, from
    1377                 :     /// files on disk. Used at pageserver startup.
    1378                 :     ///
    1379                 :     /// No background tasks are started as part of this routine.
    1380             470 :     async fn load_local(self: &Arc<Tenant>, ctx: &RequestContext) -> anyhow::Result<()> {
    1381             470 :         span::debug_assert_current_span_has_tenant_id();
    1382                 : 
    1383 UBC           0 :         debug!("loading tenant task");
    1384                 : 
    1385                 :         // Load in-memory state to reflect the local files on disk
    1386                 :         //
    1387                 :         // Scan the directory, peek into the metadata file of each timeline, and
    1388                 :         // collect a list of timelines and their ancestors.
    1389 CBC         470 :         let span = info_span!("blocking");
    1390             470 :         let cloned = Arc::clone(self);
    1391                 : 
    1392             470 :         let scan = tokio::task::spawn_blocking(move || {
    1393             470 :             let _g = span.entered();
    1394             470 :             cloned.scan_and_sort_timelines_dir()
    1395             470 :         })
    1396             470 :         .await
    1397             470 :         .context("load spawn_blocking")
    1398             470 :         .and_then(|res| res)?;
    1399                 : 
    1400                 :         // FIXME original collect_timeline_files contained one more check:
    1401                 :         //    1. "Timeline has no ancestor and no layer files"
    1402                 : 
    1403                 :         // Process loadable timelines first
    1404             469 :         for (timeline_id, local_metadata) in scan.sorted_timelines_to_load {
    1405 UBC           0 :             if let Err(e) = self
    1406               0 :                 .load_local_timeline(timeline_id, local_metadata, ctx, false)
    1407               0 :                 .await
    1408                 :             {
    1409               0 :                 match e {
    1410               0 :                     LoadLocalTimelineError::Load(source) => {
    1411               0 :                         return Err(anyhow::anyhow!(source)).with_context(|| {
    1412               0 :                             format!("Failed to load local timeline: {timeline_id}")
    1413               0 :                         })
    1414                 :                     }
    1415               0 :                     LoadLocalTimelineError::ResumeDeletion(source) => {
    1416                 :                         // Make sure resumed deletion won't fail loading for the entire tenant.
    1417               0 :                         error!("Failed to resume timeline deletion: {source:#}")
    1418                 :                     }
    1419                 :                 }
    1420               0 :             }
    1421                 :         }
    1422                 : 
    1423                 :         // Resume deletion ones with deleted_mark
    1424 CBC         469 :         for (timeline_id, maybe_local_metadata) in scan.timelines_to_resume_deletion {
    1425 UBC           0 :             match maybe_local_metadata {
    1426                 :                 None => {
    1427                 :                     // See comment in `scan_and_sort_timelines_dir`.
    1428               0 :                     if let Err(e) =
    1429               0 :                         DeleteTimelineFlow::cleanup_remaining_timeline_fs_traces(self, timeline_id)
    1430               0 :                             .await
    1431                 :                     {
    1432               0 :                         warn!(
    1433               0 :                             "cannot clean up deleted timeline dir timeline_id: {} error: {:#}",
    1434               0 :                             timeline_id, e
    1435               0 :                         );
    1436               0 :                     }
    1437                 :                 }
    1438               0 :                 Some(local_metadata) => {
    1439               0 :                     if let Err(e) = self
    1440               0 :                         .load_local_timeline(timeline_id, local_metadata, ctx, true)
    1441               0 :                         .await
    1442                 :                     {
    1443               0 :                         match e {
    1444               0 :                             LoadLocalTimelineError::Load(source) => {
    1445               0 :                                 // We tried to load deleted timeline, this is a bug.
    1446               0 :                                 return Err(anyhow::anyhow!(source).context(
    1447               0 :                                     format!("This is a bug. We tried to load a deleted timeline, which is wrong, and loading failed. Timeline: {timeline_id}")
    1448               0 :                                 ));
    1449                 :                             }
    1450               0 :                             LoadLocalTimelineError::ResumeDeletion(source) => {
    1451                 :                                 // Make sure resumed deletion won't fail loading for the entire tenant.
    1452               0 :                                 error!("Failed to resume timeline deletion: {source:#}")
    1453                 :                             }
    1454                 :                         }
    1455               0 :                     }
    1456                 :                 }
    1457                 :             }
    1458                 :         }
    1459                 : 
    1460               0 :         trace!("Done");
    1461                 : 
    1462 CBC         469 :         Ok(())
    1463             470 :     }
    1464                 : 
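load_local runs the filesystem scan on the blocking pool while keeping tracing context: the closure enters a span created on the async side. A small sketch of that spawn_blocking-with-span pattern under the same crates (tokio, tracing, anyhow); the scan body here is only a placeholder.

    use anyhow::Context;
    use tracing::{info, info_span};

    #[tokio::main]
    async fn main() -> anyhow::Result<()> {
        let span = info_span!("blocking");
        let scanned = tokio::task::spawn_blocking(move || {
            // Entering the span ties log events from this blocking closure to it.
            let _guard = span.entered();
            info!("scanning timelines directory");
            Ok::<usize, anyhow::Error>(0) // placeholder result of the real scan
        })
        .await
        .context("scan spawn_blocking")
        .and_then(|res| res)?;
        println!("scanned {scanned} timelines");
        Ok(())
    }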
    1465                 :     /// Subroutine of `load_tenant`, to load an individual timeline
    1466                 :     ///
    1467                 :     /// NB: The parent is assumed to be already loaded!
    1468 UBC           0 :     #[instrument(skip(self, local_metadata, ctx))]
    1469                 :     async fn load_local_timeline(
    1470                 :         self: &Arc<Self>,
    1471                 :         timeline_id: TimelineId,
    1472                 :         local_metadata: TimelineMetadata,
    1473                 :         ctx: &RequestContext,
    1474                 :         found_delete_mark: bool,
    1475                 :     ) -> Result<(), LoadLocalTimelineError> {
    1476                 :         span::debug_assert_current_span_has_tenant_id();
    1477                 : 
    1478                 :         let resources = self.build_timeline_resources(timeline_id);
    1479                 : 
    1480                 :         if found_delete_mark {
    1481                 :             // There is no remote client, we found local metadata.
    1482                 :             // Continue cleaning up local disk.
    1483                 :             DeleteTimelineFlow::resume_deletion(
    1484                 :                 Arc::clone(self),
    1485                 :                 timeline_id,
    1486                 :                 &local_metadata,
    1487                 :                 None,
    1488                 :                 self.deletion_queue_client.clone(),
    1489                 :             )
    1490                 :             .await
    1491                 :             .context("resume deletion")
    1492                 :             .map_err(LoadLocalTimelineError::ResumeDeletion)?;
    1493                 :             return Ok(());
    1494                 :         }
    1495                 : 
    1496                 :         let ancestor = if let Some(ancestor_timeline_id) = local_metadata.ancestor_timeline() {
    1497                 :             let ancestor_timeline = self.get_timeline(ancestor_timeline_id, false)
    1498               0 :                 .with_context(|| anyhow::anyhow!("cannot find ancestor timeline {ancestor_timeline_id} for timeline {timeline_id}"))
    1499                 :                 .map_err(LoadLocalTimelineError::Load)?;
    1500                 :             Some(ancestor_timeline)
    1501                 :         } else {
    1502                 :             None
    1503                 :         };
    1504                 : 
    1505                 :         self.timeline_init_and_sync(timeline_id, resources, None, local_metadata, ancestor, ctx)
    1506                 :             .await
    1507                 :             .map_err(LoadLocalTimelineError::Load)
    1508                 :     }
    1509                 : 
    1510 CBC         891 :     pub(crate) fn tenant_id(&self) -> TenantId {
    1511             891 :         self.tenant_shard_id.tenant_id
    1512             891 :     }
    1513                 : 
    1514               6 :     pub(crate) fn tenant_shard_id(&self) -> TenantShardId {
    1515               6 :         self.tenant_shard_id
    1516               6 :     }
    1517                 : 
    1518                 :     /// Get Timeline handle for given Neon timeline ID.
    1519                 :     /// This function is idempotent. It doesn't change internal state in any way.
    1520           14488 :     pub fn get_timeline(
    1521           14488 :         &self,
    1522           14488 :         timeline_id: TimelineId,
    1523           14488 :         active_only: bool,
    1524           14488 :     ) -> Result<Arc<Timeline>, GetTimelineError> {
    1525           14488 :         let timelines_accessor = self.timelines.lock().unwrap();
    1526           14488 :         let timeline = timelines_accessor
    1527           14488 :             .get(&timeline_id)
    1528           14488 :             .ok_or(GetTimelineError::NotFound {
    1529           14488 :                 tenant_id: self.tenant_shard_id.tenant_id,
    1530           14488 :                 timeline_id,
    1531           14488 :             })?;
    1532                 : 
    1533           14437 :         if active_only && !timeline.is_active() {
    1534               1 :             Err(GetTimelineError::NotActive {
    1535               1 :                 tenant_id: self.tenant_shard_id.tenant_id,
    1536               1 :                 timeline_id,
    1537               1 :                 state: timeline.current_state(),
    1538               1 :             })
    1539                 :         } else {
    1540           14436 :             Ok(Arc::clone(timeline))
    1541                 :         }
    1542           14488 :     }
    1543                 : 
    1544                 :     /// Lists timelines the tenant contains.
    1545                 :     /// It is up to the tenant's implementation to omit certain timelines that are not considered ready for use.
    1546             730 :     pub fn list_timelines(&self) -> Vec<Arc<Timeline>> {
    1547             730 :         self.timelines
    1548             730 :             .lock()
    1549             730 :             .unwrap()
    1550             730 :             .values()
    1551             730 :             .map(Arc::clone)
    1552             730 :             .collect()
    1553             730 :     }
    1554                 : 
    1555             466 :     pub fn list_timeline_ids(&self) -> Vec<TimelineId> {
    1556             466 :         self.timelines.lock().unwrap().keys().cloned().collect()
    1557             466 :     }
    1558                 : 
    1559                 :     /// This is used to create the initial 'main' timeline during bootstrapping,
    1560                 :     /// or when importing a new base backup. The caller is expected to load an
    1561                 :     /// initial image of the datadir into the new timeline after this.
    1562                 :     ///
    1563                 :     /// Until that happens, the on-disk state is invalid (disk_consistent_lsn=Lsn(0))
    1564                 :     /// and the timeline will fail to load at a restart.
    1565                 :     ///
    1566                 :     /// That's why we add an uninit mark file, and wrap it together with the Timeline
    1567                 :     /// in-memory object into UninitializedTimeline.
    1568                 :     /// Once the caller is done setting up the timeline, they should call
    1569                 :     /// `UninitializedTimeline::initialize_with_lock` to remove the uninit mark.
    1570                 :     ///
    1571                 :     /// For tests, use `DatadirModification::init_empty_test_timeline` + `commit` to set up the
    1572                 :     /// minimum set of keys required to get a writable timeline.
    1573                 :     /// (Without it, `put` might fail due to `repartition` failing.)
    1574              48 :     pub(crate) async fn create_empty_timeline(
    1575              48 :         &self,
    1576              48 :         new_timeline_id: TimelineId,
    1577              48 :         initdb_lsn: Lsn,
    1578              48 :         pg_version: u32,
    1579              48 :         _ctx: &RequestContext,
    1580              48 :     ) -> anyhow::Result<UninitializedTimeline> {
    1581              48 :         anyhow::ensure!(
    1582              48 :             self.is_active(),
    1583 UBC           0 :             "Cannot create empty timelines on inactive tenant"
    1584                 :         );
    1585                 : 
    1586 CBC          48 :         let timeline_uninit_mark = self.create_timeline_uninit_mark(new_timeline_id)?;
    1587              47 :         let new_metadata = TimelineMetadata::new(
    1588              47 :             // Initialize disk_consistent LSN to 0, The caller must import some data to
    1589              47 :             // make it valid, before calling finish_creation()
    1590              47 :             Lsn(0),
    1591              47 :             None,
    1592              47 :             None,
    1593              47 :             Lsn(0),
    1594              47 :             initdb_lsn,
    1595              47 :             initdb_lsn,
    1596              47 :             pg_version,
    1597              47 :         );
    1598              47 :         self.prepare_new_timeline(
    1599              47 :             new_timeline_id,
    1600              47 :             &new_metadata,
    1601              47 :             timeline_uninit_mark,
    1602              47 :             initdb_lsn,
    1603              47 :             None,
    1604              47 :         )
    1605 UBC           0 :         .await
    1606 CBC          48 :     }
    1607                 : 
    1608                 :     /// Helper for unit tests to create an empty timeline.
    1609                 :     ///
    1610                 :     /// The timeline has state `Active`, but its background loops are not running.
    1611                 :     // This makes the various functions that use `anyhow::ensure!` to require Active state work in tests.
    1612                 :     // Our current tests don't need the background loops.
    1613                 :     #[cfg(test)]
    1614              34 :     pub async fn create_test_timeline(
    1615              34 :         &self,
    1616              34 :         new_timeline_id: TimelineId,
    1617              34 :         initdb_lsn: Lsn,
    1618              34 :         pg_version: u32,
    1619              34 :         ctx: &RequestContext,
    1620              34 :     ) -> anyhow::Result<Arc<Timeline>> {
    1621              34 :         let uninit_tl = self
    1622              34 :             .create_empty_timeline(new_timeline_id, initdb_lsn, pg_version, ctx)
    1623 UBC           0 :             .await?;
    1624 CBC          34 :         let tline = uninit_tl.raw_timeline().expect("we just created it");
    1625              34 :         assert_eq!(tline.get_last_record_lsn(), Lsn(0));
    1626                 : 
    1627                 :         // Setup minimum keys required for the timeline to be usable.
    1628              34 :         let mut modification = tline.begin_modification(initdb_lsn);
    1629              34 :         modification
    1630              34 :             .init_empty_test_timeline()
    1631              34 :             .context("init_empty_test_timeline")?;
    1632              34 :         modification
    1633              34 :             .commit(ctx)
    1634 UBC           0 :             .await
    1635 CBC          34 :             .context("commit init_empty_test_timeline modification")?;
    1636                 : 
    1637                 :         // Flush to disk so that uninit_tl's check for valid disk_consistent_lsn passes.
    1638              34 :         tline.maybe_spawn_flush_loop();
    1639              34 :         tline.freeze_and_flush().await.context("freeze_and_flush")?;
    1640                 : 
    1641                 :         // Make sure the freeze_and_flush reaches remote storage.
    1642              34 :         tline
    1643              34 :             .remote_client
    1644              34 :             .as_ref()
    1645              34 :             .unwrap()
    1646              34 :             .wait_completion()
    1647               9 :             .await
    1648              34 :             .unwrap();
    1649                 : 
    1650              34 :         let tl = uninit_tl.finish_creation()?;
    1651                 :         // The non-test code would call tl.activate() here.
    1652              34 :         tl.set_state(TimelineState::Active);
    1653              34 :         Ok(tl)
    1654              34 :     }
    1655                 : 
    1656                 :     /// Create a new timeline.
    1657                 :     ///
    1658                 :     /// Returns the new timeline ID and a reference to its Timeline object.
    1659                 :     ///
    1660                 :     /// If the caller specified the timeline ID to use (`new_timeline_id`), and a timeline with
    1661                 :     /// the same timeline ID already exists, returns CreateTimelineError::AlreadyExists.
    1662                 :     #[allow(clippy::too_many_arguments)]
    1663             783 :     pub(crate) async fn create_timeline(
    1664             783 :         &self,
    1665             783 :         new_timeline_id: TimelineId,
    1666             783 :         ancestor_timeline_id: Option<TimelineId>,
    1667             783 :         mut ancestor_start_lsn: Option<Lsn>,
    1668             783 :         pg_version: u32,
    1669             783 :         load_existing_initdb: Option<TimelineId>,
    1670             783 :         broker_client: storage_broker::BrokerClientChannel,
    1671             783 :         ctx: &RequestContext,
    1672             783 :     ) -> Result<Arc<Timeline>, CreateTimelineError> {
    1673             783 :         if !self.is_active() {
    1674 UBC           0 :             return Err(CreateTimelineError::Other(anyhow::anyhow!(
    1675               0 :                 "Cannot create timelines on inactive tenant"
    1676               0 :             )));
    1677 CBC         783 :         }
    1678                 : 
    1679             783 :         let _gate = self
    1680             783 :             .gate
    1681             783 :             .enter()
    1682             783 :             .map_err(|_| CreateTimelineError::ShuttingDown)?;
    1683                 : 
    1684                 :         // Get exclusive access to the timeline ID: this ensures that it does not already exist,
    1685                 :         // and that no other creation attempts will be allowed while we are working. The
    1686                 :         // uninit_mark is a guard.
    1687             783 :         let uninit_mark = match self.create_timeline_uninit_mark(new_timeline_id) {
    1688             783 :             Ok(m) => m,
    1689                 :             Err(TimelineExclusionError::AlreadyCreating) => {
    1690                 :                 // Creation is in progress, we cannot create it again, and we cannot
    1691                 :                 // check if this request matches the existing one, so caller must try
    1692                 :                 // again later.
    1693 UBC           0 :                 return Err(CreateTimelineError::AlreadyCreating);
    1694                 :             }
    1695               0 :             Err(TimelineExclusionError::Other(e)) => {
    1696               0 :                 return Err(CreateTimelineError::Other(e));
    1697                 :             }
    1698               0 :             Err(TimelineExclusionError::AlreadyExists(existing)) => {
    1699               0 :                 debug!("timeline {new_timeline_id} already exists");
    1700                 : 
    1701                 :                 // Idempotency: creating the same timeline twice is not an error, unless
    1702                 :                 // the second creation has different parameters.
    1703               0 :                 if existing.get_ancestor_timeline_id() != ancestor_timeline_id
    1704               0 :                     || existing.pg_version != pg_version
    1705               0 :                     || (ancestor_start_lsn.is_some()
    1706               0 :                         && ancestor_start_lsn != Some(existing.get_ancestor_lsn()))
    1707                 :                 {
    1708               0 :                     return Err(CreateTimelineError::Conflict);
    1709               0 :                 }
    1710                 : 
    1711               0 :                 if let Some(remote_client) = existing.remote_client.as_ref() {
    1712                 :                     // Wait for uploads to complete, so that when we return Ok, the timeline
    1713                 :                     // is known to be durable on remote storage. Just like we do at the end of
    1714                 :                     // this function, after we have created the timeline ourselves.
    1715                 :                     //
    1716                 :                     // We only really care that the initial version of `index_part.json` has
    1717                 :                     // been uploaded. That's enough to remember that the timeline
    1718                 :                     // exists. However, there is no function to wait specifically for that so
    1719                 :                     // we just wait for all in-progress uploads to finish.
    1720               0 :                     remote_client
    1721               0 :                         .wait_completion()
    1722               0 :                         .await
    1723               0 :                         .context("wait for timeline uploads to complete")?;
    1724               0 :                 }
    1725                 : 
    1726               0 :                 return Ok(existing);
    1727                 :             }
    1728                 :         };
    1729                 : 
    1730 CBC         783 :         let loaded_timeline = match ancestor_timeline_id {
    1731             258 :             Some(ancestor_timeline_id) => {
    1732             258 :                 let ancestor_timeline = self
    1733             258 :                     .get_timeline(ancestor_timeline_id, false)
    1734             258 :                     .context("Cannot branch off the timeline that's not present in pageserver")?;
    1735                 : 
    1736                 :                 // instead of waiting around, just deny the request because the ancestor is not yet
    1737                 :                 // ready for other purposes either.
    1738             258 :                 if !ancestor_timeline.is_active() {
    1739               6 :                     return Err(CreateTimelineError::AncestorNotActive);
    1740             252 :                 }
    1741                 : 
    1742             252 :                 if let Some(lsn) = ancestor_start_lsn.as_mut() {
    1743              30 :                     *lsn = lsn.align();
    1744              30 : 
    1745              30 :                     let ancestor_ancestor_lsn = ancestor_timeline.get_ancestor_lsn();
    1746              30 :                     if ancestor_ancestor_lsn > *lsn {
    1747                 :                         // can we safely just branch from the ancestor instead?
    1748               2 :                         return Err(CreateTimelineError::AncestorLsn(anyhow::anyhow!(
    1749               2 :                             "invalid start lsn {} for ancestor timeline {}: less than timeline ancestor lsn {}",
    1750               2 :                             lsn,
    1751               2 :                             ancestor_timeline_id,
    1752               2 :                             ancestor_ancestor_lsn,
    1753               2 :                         )));
    1754              28 :                     }
    1755              28 : 
    1756              28 :                     // Wait for the WAL to arrive and be processed on the parent branch up
    1757              28 :                     // to the requested branch point. The repository code itself doesn't
    1758              28 :                     // require it, but if we start to receive WAL on the new timeline,
    1759              28 :                     // decoding the new WAL might need to look up previous pages, relation
    1760              28 :                     // sizes etc. and that would get confused if the previous page versions
    1761              28 :                     // are not in the repository yet.
    1762              28 :                     ancestor_timeline
    1763              28 :                         .wait_lsn(*lsn, ctx)
    1764              10 :                         .await
    1765              28 :                         .map_err(|e| match e {
    1766 UBC           0 :                             e @ (WaitLsnError::Timeout(_) | WaitLsnError::BadState) => {
    1767               0 :                                 CreateTimelineError::AncestorLsn(anyhow::anyhow!(e))
    1768                 :                             }
    1769               0 :                             WaitLsnError::Shutdown => CreateTimelineError::ShuttingDown,
    1770 CBC          28 :                         })?;
    1771             222 :                 }
    1772                 : 
    1773             250 :                 self.branch_timeline(
    1774             250 :                     &ancestor_timeline,
    1775             250 :                     new_timeline_id,
    1776             250 :                     ancestor_start_lsn,
    1777             250 :                     uninit_mark,
    1778             250 :                     ctx,
    1779             250 :                 )
    1780               8 :                 .await?
    1781                 :             }
    1782                 :             None => {
    1783             525 :                 self.bootstrap_timeline(
    1784             525 :                     new_timeline_id,
    1785             525 :                     pg_version,
    1786             525 :                     load_existing_initdb,
    1787             525 :                     uninit_mark,
    1788             525 :                     ctx,
    1789             525 :                 )
    1790         6921368 :                 .await?
    1791                 :             }
    1792                 :         };
    1793                 : 
    1794                 :         // At this point we have dropped our guard on [`Self::timelines_creating`], and
    1795                 :         // the timeline is visible in [`Self::timelines`], but it is _not_ durable yet.  We must
    1796                 :         // not send a success to the caller until it is.  The same applies to handling retries,
    1797                 :         // see the handling of [`TimelineExclusionError::AlreadyExists`] above.
    1798             763 :         if let Some(remote_client) = loaded_timeline.remote_client.as_ref() {
    1799             763 :             let kind = ancestor_timeline_id
    1800             763 :                 .map(|_| "branched")
    1801             763 :                 .unwrap_or("bootstrapped");
    1802             763 :             remote_client.wait_completion().await.with_context(|| {
    1803 UBC           0 :                 format!("wait for {} timeline initial uploads to complete", kind)
    1804 CBC         759 :             })?;
    1805 UBC           0 :         }
    1806                 : 
    1807 CBC         759 :         loaded_timeline.activate(broker_client, None, ctx);
    1808             759 : 
    1809             759 :         Ok(loaded_timeline)
    1810             777 :     }
    1811                 : 
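The idempotency rule in the AlreadyExists branch above boils down to a small predicate: a retried creation is accepted only if it names the same ancestor and Postgres version and, when an explicit start LSN is given, the same LSN as the existing timeline. A hedged restatement of that check (the function name and argument shape are illustrative, not part of the real code):

    // Hypothetical restatement of the conflict check used when the timeline already exists.
    fn is_idempotent_retry(
        existing_ancestor: Option<TimelineId>,
        existing_ancestor_lsn: Lsn,
        existing_pg_version: u32,
        requested_ancestor: Option<TimelineId>,
        requested_start_lsn: Option<Lsn>,
        requested_pg_version: u32,
    ) -> bool {
        existing_ancestor == requested_ancestor
            && existing_pg_version == requested_pg_version
            // A request without an explicit start LSN matches whatever the existing timeline has.
            && requested_start_lsn.map_or(true, |lsn| lsn == existing_ancestor_lsn)
    }

If the predicate fails, the retry gets CreateTimelineError::Conflict; if it holds, the existing timeline is returned after its uploads are flushed, exactly as in the branch above.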
    1812              60 :     pub(crate) async fn delete_timeline(
    1813              60 :         self: Arc<Self>,
    1814              60 :         timeline_id: TimelineId,
    1815              60 :     ) -> Result<(), DeleteTimelineError> {
    1816             346 :         DeleteTimelineFlow::run(&self, timeline_id, false).await?;
    1817                 : 
    1818              51 :         Ok(())
    1819              60 :     }
    1820                 : 
    1821                 :     /// Perform one garbage collection iteration, removing old data files from disk.
    1822                 :     /// This function is periodically called by the GC task.
    1823                 :     /// It can also be requested explicitly through the page server API's 'do_gc' command.
    1824                 :     ///
    1825                 :     /// `target_timeline_id` specifies the timeline to GC, or None for all.
    1826                 :     ///
    1827                 :     /// The `horizon` and `pitr` parameters determine how much WAL history needs to be retained.
    1828                 :     /// Also known as the retention period, or the GC cutoff point. `horizon` specifies
    1829                 :     /// the amount of history, as an LSN difference from the current latest LSN on each timeline.
    1830                 :     /// `pitr` specifies the same as a time difference from the current time. The effective
    1831                 :     /// GC cutoff point is determined conservatively by either `horizon` or `pitr`, whichever
    1832                 :     /// requires more history to be retained.
    1833                 :     //
    1834             366 :     pub async fn gc_iteration(
    1835             366 :         &self,
    1836             366 :         target_timeline_id: Option<TimelineId>,
    1837             366 :         horizon: u64,
    1838             366 :         pitr: Duration,
    1839             366 :         cancel: &CancellationToken,
    1840             366 :         ctx: &RequestContext,
    1841             366 :     ) -> anyhow::Result<GcResult> {
    1842             366 :         // Don't start doing work during shutdown
    1843             366 :         if let TenantState::Stopping { .. } = self.current_state() {
    1844 UBC           0 :             return Ok(GcResult::default());
    1845 CBC         366 :         }
    1846             366 : 
    1847             366 :         // there is a global allowed_error for this
    1848             366 :         anyhow::ensure!(
    1849             366 :             self.is_active(),
    1850 UBC           0 :             "Cannot run GC iteration on inactive tenant"
    1851                 :         );
    1852                 : 
    1853                 :         {
    1854 CBC         366 :             let conf = self.tenant_conf.read().unwrap();
    1855             366 : 
    1856             366 :             if !conf.location.may_delete_layers_hint() {
    1857 GBC           1 :                 info!("Skipping GC in location state {:?}", conf.location);
    1858               1 :                 return Ok(GcResult::default());
    1859 CBC         365 :             }
    1860             365 :         }
    1861             365 : 
    1862             365 :         self.gc_iteration_internal(target_timeline_id, horizon, pitr, cancel, ctx)
    1863          236069 :             .await
    1864             360 :     }
    1865                 : 
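To make the doc comment above concrete: keeping more history means a numerically smaller cutoff LSN, so the conservative combination of `horizon` and `pitr` is a minimum. A minimal sketch, assuming `latest_lsn` is the timeline's last LSN and `pitr_cutoff_lsn` has already been resolved from the `pitr` duration (both names are illustrative; the real computation lives in the GC internals):

    // Hypothetical sketch of the conservative cutoff choice: the smaller LSN,
    // i.e. the one that retains more history, wins.
    fn effective_gc_cutoff(latest_lsn: u64, horizon: u64, pitr_cutoff_lsn: u64) -> u64 {
        // `horizon` is an LSN distance below the latest LSN on the timeline.
        let horizon_cutoff = latest_lsn.saturating_sub(horizon);
        std::cmp::min(horizon_cutoff, pitr_cutoff_lsn)
    }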
    1866                 :     /// Perform one compaction iteration.
    1867                 :     /// This function is periodically called by the compactor task.
    1868                 :     /// It can also be requested explicitly per timeline through the page server
    1869                 :     /// API's 'compact' command.
    1870             334 :     async fn compaction_iteration(
    1871             334 :         &self,
    1872             334 :         cancel: &CancellationToken,
    1873             334 :         ctx: &RequestContext,
    1874             334 :     ) -> anyhow::Result<(), timeline::CompactionError> {
    1875             334 :         // Don't start doing work during shutdown, or when broken, we do not need those in the logs
    1876             334 :         if !self.is_active() {
    1877               2 :             return Ok(());
    1878             332 :         }
    1879             332 : 
    1880             332 :         {
    1881             332 :             let conf = self.tenant_conf.read().unwrap();
    1882             332 :             if !conf.location.may_delete_layers_hint() || !conf.location.may_upload_layers_hint() {
    1883               9 :                 info!("Skipping compaction in location state {:?}", conf.location);
    1884               9 :                 return Ok(());
    1885             323 :             }
    1886             323 :         }
    1887             323 : 
    1888             323 :         // Scan through the hashmap and collect a list of all the timelines,
    1889             323 :         // while holding the lock. Then drop the lock and actually perform the
    1890             323 :         // compactions.  We don't want to block everything else while the
    1891             323 :         // compaction runs.
    1892             323 :         let timelines_to_compact = {
    1893             323 :             let timelines = self.timelines.lock().unwrap();
    1894             323 :             let timelines_to_compact = timelines
    1895             323 :                 .iter()
    1896             530 :                 .filter_map(|(timeline_id, timeline)| {
    1897             530 :                     if timeline.is_active() {
    1898             528 :                         Some((*timeline_id, timeline.clone()))
    1899                 :                     } else {
    1900               2 :                         None
    1901                 :                     }
    1902             530 :                 })
    1903             323 :                 .collect::<Vec<_>>();
    1904             323 :             drop(timelines);
    1905             323 :             timelines_to_compact
    1906                 :         };
    1907                 : 
    1908             835 :         for (timeline_id, timeline) in &timelines_to_compact {
    1909             521 :             timeline
    1910             521 :                 .compact(cancel, EnumSet::empty(), ctx)
    1911             521 :                 .instrument(info_span!("compact_timeline", %timeline_id))
    1912           90272 :                 .await?;
    1913                 :         }
    1914                 : 
    1915             314 :         Ok(())
    1916             325 :     }
    1917                 : 
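The locking discipline used above (snapshot the timelines under the mutex, drop the guard, then run the slow compactions) is a general pattern. A standalone sketch with placeholder types, not the tenant's actual wiring:

    use std::collections::HashMap;
    use std::sync::{Arc, Mutex};

    // Clone the Arcs while holding the lock, then do the slow work lock-free.
    fn snapshot_then_work<K, V>(map: &Mutex<HashMap<K, Arc<V>>>, work: impl Fn(&V)) {
        let snapshot: Vec<Arc<V>> = map.lock().unwrap().values().cloned().collect();
        // The guard is dropped at the end of the statement above, so other users
        // of the map are not blocked while `work` runs.
        for item in snapshot {
            work(&item);
        }
    }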
    1918           20287 :     pub fn current_state(&self) -> TenantState {
    1919           20287 :         self.state.borrow().clone()
    1920           20287 :     }
    1921                 : 
    1922            2416 :     pub fn is_active(&self) -> bool {
    1923            2416 :         self.current_state() == TenantState::Active
    1924            2416 :     }
    1925                 : 
    1926                 :     /// Changes tenant status to active, unless shutdown was already requested.
    1927                 :     ///
    1928                 :     /// `background_jobs_can_start` is an optional barrier set to a value during pageserver startup
    1929                 :     /// to delay background jobs. Background jobs can be started right away when None is given.
    1930             709 :     fn activate(
    1931             709 :         self: &Arc<Self>,
    1932             709 :         broker_client: BrokerClientChannel,
    1933             709 :         background_jobs_can_start: Option<&completion::Barrier>,
    1934             709 :         ctx: &RequestContext,
    1935             709 :     ) {
    1936             709 :         span::debug_assert_current_span_has_tenant_id();
    1937             709 : 
    1938             709 :         let mut activating = false;
    1939             709 :         self.state.send_modify(|current_state| {
    1940             709 :             use pageserver_api::models::ActivatingFrom;
    1941             709 :             match &*current_state {
    1942                 :                 TenantState::Activating(_) | TenantState::Active | TenantState::Broken { .. } | TenantState::Stopping { .. } => {
    1943 UBC           0 :                     panic!("caller is responsible for calling activate() only on Loading / Attaching tenants, got {state:?}", state = current_state);
    1944                 :                 }
    1945               0 :                 TenantState::Loading => {
    1946               0 :                     *current_state = TenantState::Activating(ActivatingFrom::Loading);
    1947               0 :                 }
    1948 CBC         709 :                 TenantState::Attaching => {
    1949             709 :                     *current_state = TenantState::Activating(ActivatingFrom::Attaching);
    1950             709 :                 }
    1951                 :             }
    1952             709 :             debug!(tenant_id = %self.tenant_shard_id.tenant_id, shard_id = %self.tenant_shard_id.shard_slug(), "Activating tenant");
    1953             709 :             activating = true;
    1954             709 :             // Continue outside the closure. We need to grab timelines.lock()
    1955             709 :             // and we plan to turn it into a tokio::sync::Mutex in a future patch.
    1956             709 :         });
    1957             709 : 
    1958             709 :         if activating {
    1959             709 :             let timelines_accessor = self.timelines.lock().unwrap();
    1960             709 :             let timelines_to_activate = timelines_accessor
    1961             709 :                 .values()
    1962             709 :                 .filter(|timeline| !(timeline.is_broken() || timeline.is_stopping()));
    1963             709 : 
    1964             709 :             // Spawn gc and compaction loops. The loops will shut themselves
    1965             709 :             // down when they notice that the tenant is inactive.
    1966             709 :             tasks::start_background_loops(self, background_jobs_can_start);
    1967             709 : 
    1968             709 :             let mut activated_timelines = 0;
    1969                 : 
    1970            1043 :             for timeline in timelines_to_activate {
    1971             334 :                 timeline.activate(broker_client.clone(), background_jobs_can_start, ctx);
    1972             334 :                 activated_timelines += 1;
    1973             334 :             }
    1974                 : 
    1975             709 :             self.state.send_modify(move |current_state| {
    1976             709 :                 assert!(
    1977             709 :                     matches!(current_state, TenantState::Activating(_)),
    1978 UBC           0 :                     "set_stopping and set_broken wait for us to leave Activating state",
    1979                 :                 );
    1980 CBC         709 :                 *current_state = TenantState::Active;
    1981             709 : 
    1982             709 :                 let elapsed = self.constructed_at.elapsed();
    1983             709 :                 let total_timelines = timelines_accessor.len();
    1984             709 : 
    1985             709 :                 // log a lot of stuff, because some tenants sometimes suffer from long,
    1986             709 :                 // user-visible activation times. see https://github.com/neondatabase/neon/issues/4025
    1987             709 :                 info!(
    1988             709 :                     since_creation_millis = elapsed.as_millis(),
    1989             709 :                     tenant_id = %self.tenant_shard_id.tenant_id,
    1990             709 :                     shard_id = %self.tenant_shard_id.shard_slug(),
    1991             709 :                     activated_timelines,
    1992             709 :                     total_timelines,
    1993             709 :                     post_state = <&'static str>::from(&*current_state),
    1994             709 :                     "activation attempt finished"
    1995             709 :                 );
    1996                 : 
    1997             709 :                 TENANT.activation.observe(elapsed.as_secs_f64());
    1998             709 :             });
    1999 UBC           0 :         }
    2000 CBC         709 :     }
    2001                 : 
    2002                 :     /// Shutdown the tenant and join all of the spawned tasks.
    2003                 :     ///
    2004                 :     /// The method caters for all use-cases:
    2005                 :     /// - pageserver shutdown (freeze_and_flush == true)
    2006                 :     /// - detach + ignore (freeze_and_flush == false)
    2007                 :     ///
    2008                 :     /// This will attempt to shutdown even if tenant is broken.
    2009                 :     ///
    2010                 :     /// `shutdown_progress` is a [`completion::Barrier`] for the shutdown initiated by this call.
    2011                 :     /// If the tenant is already shutting down, we return a clone of the first shutdown call's
    2012                 :     /// `Barrier` as an `Err`. This not-first caller can use the returned barrier to join with
    2013                 :     /// the ongoing shutdown.
    2014             368 :     async fn shutdown(
    2015             368 :         &self,
    2016             368 :         shutdown_progress: completion::Barrier,
    2017             368 :         freeze_and_flush: bool,
    2018             368 :     ) -> Result<(), completion::Barrier> {
    2019             368 :         span::debug_assert_current_span_has_tenant_id();
    2020             368 : 
    2021             368 :         // Set the tenant (and its timelines) to Stopping state.
    2022             368 :         //
    2023             368 :         // Since we can only transition into Stopping state after activation is complete,
    2024             368 :         // run it in a JoinSet so all tenants have a chance to stop before we get SIGKILLed.
    2025             368 :         //
    2026             368 :         // Transitioning tenants to Stopping state has a couple of non-obvious side effects:
    2027             368 :         // 1. Lock out any new requests to the tenants.
    2028             368 :         // 2. Signal cancellation to WAL receivers (we wait on it below).
    2029             368 :         // 3. Signal cancellation for other tenant background loops.
    2030             368 :         // 4. ???
    2031             368 :         //
    2032             368 :         // The waiting for the cancellation is not done uniformly.
    2033             368 :         // We certainly wait for WAL receivers to shut down.
    2034             368 :         // That is necessary so that no new data comes in before the freeze_and_flush.
    2035             368 :         // But the tenant background loops are joined-on in our caller.
    2036             368 :         // It's messed up.
    2037             368 :         // We just ignore the failure to stop.
    2038             368 : 
    2039             368 :         match self.set_stopping(shutdown_progress, false, false).await {
    2040             312 :             Ok(()) => {}
    2041              56 :             Err(SetStoppingError::Broken) => {
    2042              56 :                 // assume that this is acceptable
    2043              56 :             }
    2044 UBC           0 :             Err(SetStoppingError::AlreadyStopping(other)) => {
    2045                 :                 // give the caller the option to wait for this shutdown
    2046               0 :                 info!("Tenant::shutdown: AlreadyStopping");
    2047               0 :                 return Err(other);
    2048                 :             }
    2049                 :         };
    2050                 : 
    2051 CBC         368 :         let mut js = tokio::task::JoinSet::new();
    2052             368 :         {
    2053             368 :             let timelines = self.timelines.lock().unwrap();
    2054             499 :             timelines.values().for_each(|timeline| {
    2055             499 :                 let timeline = Arc::clone(timeline);
    2056             499 :                 let span = Span::current();
    2057             499 :                 js.spawn(async move {
    2058             499 :                     if freeze_and_flush {
    2059             835 :                         timeline.flush_and_shutdown().instrument(span).await
    2060                 :                     } else {
    2061             623 :                         timeline.shutdown().instrument(span).await
    2062                 :                     }
    2063             499 :                 });
    2064             499 :             })
    2065                 :         };
    2066                 :         // test_long_timeline_create_then_tenant_delete is leaning on this message
    2067             368 :         tracing::info!("Waiting for timelines...");
    2068             867 :         while let Some(res) = js.join_next().await {
    2069 UBC           0 :             match res {
    2070 CBC         499 :                 Ok(()) => {}
    2071 UBC           0 :                 Err(je) if je.is_cancelled() => unreachable!("no cancelling used"),
    2072               0 :                 Err(je) if je.is_panic() => { /* logged already */ }
    2073               0 :                 Err(je) => warn!("unexpected JoinError: {je:?}"),
    2074                 :             }
    2075                 :         }
    2076                 : 
    2077                 :         // We cancel the Tenant's cancellation token _after_ the timelines have all shut down.  This permits
    2078                 :         // them to continue to do work during their shutdown methods, e.g. flushing data.
    2079               0 :         tracing::debug!("Cancelling CancellationToken");
    2080 CBC         368 :         self.cancel.cancel();
    2081                 : 
    2082                 :         // shutdown all tenant and timeline tasks: gc, compaction, page service
    2083                 :         // No new tasks will be started for this tenant because it's in `Stopping` state.
    2084                 :         //
    2085                 :         // this will additionally shutdown and await all timeline tasks.
    2086 UBC           0 :         tracing::debug!("Waiting for tasks...");
    2087 CBC         368 :         task_mgr::shutdown_tasks(None, Some(self.tenant_shard_id), None).await;
    2088                 : 
    2089                 :         // Wait for any in-flight operations to complete
    2090             368 :         self.gate.close().await;
    2091                 : 
    2092             368 :         Ok(())
    2093             368 :     }
    2094                 : 
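The shutdown fan-out above uses tokio's JoinSet: spawn one task per timeline, then drain completions in whatever order they finish, treating panics as already logged. A self-contained sketch of the same shape (the per-item work is a placeholder):

    use tokio::task::JoinSet;

    #[tokio::main]
    async fn main() {
        let mut js = JoinSet::new();
        for id in 0..3u32 {
            js.spawn(async move {
                // Placeholder for per-timeline shutdown work.
                println!("shutting down timeline {id}");
            });
        }
        // Drain results in completion order.
        while let Some(res) = js.join_next().await {
            match res {
                Ok(()) => {}
                Err(je) if je.is_panic() => eprintln!("task panicked (already logged)"),
                Err(je) => eprintln!("unexpected JoinError: {je:?}"),
            }
        }
    }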
    2095                 :     /// Change tenant status to Stopping, to mark that it is being shut down.
    2096                 :     ///
    2097                 :     /// This function waits for the tenant to become active if it isn't already, before transitioning it into Stopping state.
    2098                 :     ///
    2099                 :     /// This function is not cancel-safe!
    2100                 :     ///
    2101                 :     /// `allow_transition_from_loading` is needed for the special case of loading task deleting the tenant.
    2102                 :     /// `allow_transition_from_attaching` is needed for the special case of attaching deleted tenant.
    2103             389 :     async fn set_stopping(
    2104             389 :         &self,
    2105             389 :         progress: completion::Barrier,
    2106             389 :         allow_transition_from_loading: bool,
    2107             389 :         allow_transition_from_attaching: bool,
    2108             389 :     ) -> Result<(), SetStoppingError> {
    2109             389 :         let mut rx = self.state.subscribe();
    2110             389 : 
    2111             389 :         // cannot stop before we're done activating, so wait out until we're done activating
    2112             389 :         // cannot stop before we're done activating, so wait until we're done activating
    2113              21 :             TenantState::Attaching if allow_transition_from_attaching => true,
    2114                 :             TenantState::Activating(_) | TenantState::Attaching => {
    2115               3 :                 info!(
    2116               3 :                     "waiting for {} to turn Active|Broken|Stopping",
    2117               3 :                     <&'static str>::from(state)
    2118               3 :                 );
    2119               3 :                 false
    2120                 :             }
    2121 UBC           0 :             TenantState::Loading => allow_transition_from_loading,
    2122 CBC         368 :             TenantState::Active | TenantState::Broken { .. } | TenantState::Stopping { .. } => true,
    2123             392 :         })
    2124               3 :         .await
    2125             389 :         .expect("cannot drop self.state while on a &self method");
    2126             389 : 
    2127             389 :         // we now know we're done activating, let's see whether this task is the winner to transition into Stopping
    2128             389 :         let mut err = None;
    2129             389 :         let stopping = self.state.send_if_modified(|current_state| match current_state {
    2130                 :             TenantState::Activating(_) => {
    2131 UBC           0 :                 unreachable!("1we ensured above that we're done with activation, and, there is no re-activation")
    2132                 :             }
    2133                 :             TenantState::Attaching => {
    2134 CBC          21 :                 if !allow_transition_from_attaching {
    2135 UBC           0 :                     unreachable!("2we ensured above that we're done with activation, and, there is no re-activation")
    2136 CBC          21 :                 };
    2137              21 :                 *current_state = TenantState::Stopping { progress };
    2138              21 :                 true
    2139                 :             }
    2140                 :             TenantState::Loading => {
    2141 UBC           0 :                 if !allow_transition_from_loading {
    2142               0 :                     unreachable!("3we ensured above that we're done with activation, and, there is no re-activation")
    2143               0 :                 };
    2144               0 :                 *current_state = TenantState::Stopping { progress };
    2145               0 :                 true
    2146                 :             }
    2147                 :             TenantState::Active => {
    2148                 :                 // FIXME: due to time-of-check vs time-of-use issues, it can happen that new timelines
    2149                 :                 // are created after the transition to Stopping. That's harmless, as the Timelines
    2150                 :                 // won't be accessible to anyone afterwards, because the Tenant is in Stopping state.
    2151 CBC         312 :                 *current_state = TenantState::Stopping { progress };
    2152             312 :                 // Continue stopping outside the closure. We need to grab timelines.lock()
    2153             312 :                 // and we plan to turn it into a tokio::sync::Mutex in a future patch.
    2154             312 :                 true
    2155                 :             }
    2156              56 :             TenantState::Broken { reason, .. } => {
    2157              56 :                 info!(
    2158              56 :                     "Cannot set tenant to Stopping state, it is in Broken state due to: {reason}"
    2159              56 :                 );
    2160              56 :                 err = Some(SetStoppingError::Broken);
    2161              56 :                 false
    2162                 :             }
    2163 UBC           0 :             TenantState::Stopping { progress } => {
    2164               0 :                 info!("Tenant is already in Stopping state");
    2165               0 :                 err = Some(SetStoppingError::AlreadyStopping(progress.clone()));
    2166               0 :                 false
    2167                 :             }
    2168 CBC         389 :         });
    2169             389 :         match (stopping, err) {
    2170             333 :             (true, None) => {} // continue
    2171              56 :             (false, Some(err)) => return Err(err),
    2172 UBC           0 :             (true, Some(_)) => unreachable!(
    2173               0 :                 "send_if_modified closure must error out if not transitioning to Stopping"
    2174               0 :             ),
    2175               0 :             (false, None) => unreachable!(
    2176               0 :                 "send_if_modified closure must return true if transitioning to Stopping"
    2177               0 :             ),
    2178                 :         }
    2179                 : 
    2180 CBC         333 :         let timelines_accessor = self.timelines.lock().unwrap();
    2181             333 :         let not_broken_timelines = timelines_accessor
    2182             333 :             .values()
    2183             439 :             .filter(|timeline| !timeline.is_broken());
    2184             763 :         for timeline in not_broken_timelines {
    2185             430 :             timeline.set_state(TimelineState::Stopping);
    2186             430 :         }
    2187             333 :         Ok(())
    2188             389 :     }
    2189                 : 
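set_stopping leans on two `tokio::sync::watch` operations: `wait_for` to park until activation is over, and `send_if_modified` to make the transition atomic with respect to concurrent writers. A standalone sketch of that pairing, using an illustrative three-variant state enum rather than the real TenantState:

    use tokio::sync::watch;

    #[derive(Debug, PartialEq)]
    enum State { Activating, Active, Stopping }

    #[tokio::main]
    async fn main() {
        let (tx, mut rx) = watch::channel(State::Activating);

        // Elsewhere, activation completes.
        tx.send(State::Active).unwrap();

        // A shutdown path first waits until activation is over; the returned Ref
        // is dropped at the end of the statement, releasing the internal read lock.
        rx.wait_for(|s| *s != State::Activating).await.unwrap();

        // ...then transitions only if the current state still allows it,
        // refusing to clobber an already-stopping state.
        let stopped = tx.send_if_modified(|s| match s {
            State::Active => {
                *s = State::Stopping;
                true
            }
            _ => false,
        });
        println!("transitioned to Stopping: {stopped}");
    }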
    2190                 :     /// Method for tenant::mgr to transition us into Broken state in case of a late failure in
    2191                 :     /// `remove_tenant_from_memory`
    2192                 :     ///
    2193                 :     /// This function waits for the tenant to become active if it isn't already, before transitioning it into Broken state.
    2194                 :     ///
    2195                 :     /// In tests, we also use this to set tenants to Broken state on purpose.
    2196              51 :     pub(crate) async fn set_broken(&self, reason: String) {
    2197              51 :         let mut rx = self.state.subscribe();
    2198              51 : 
    2199              51 :         // The load & attach routines own the tenant state until it has reached `Active`.
    2200              51 :         // So, wait until it's done.
    2201              51 :         rx.wait_for(|state| match state {
    2202                 :             TenantState::Activating(_) | TenantState::Loading | TenantState::Attaching => {
    2203 UBC           0 :                 info!(
    2204               0 :                     "waiting for {} to turn Active|Broken|Stopping",
    2205               0 :                     <&'static str>::from(state)
    2206               0 :                 );
    2207               0 :                 false
    2208                 :             }
    2209 CBC          51 :             TenantState::Active | TenantState::Broken { .. } | TenantState::Stopping { .. } => true,
    2210              51 :         })
    2211 UBC           0 :         .await
    2212 CBC          51 :         .expect("cannot drop self.state while on a &self method");
    2213              51 : 
    2214              51 :         // we now know we're done activating, let's see whether this task is the winner to transition into Broken
    2215              51 :         self.set_broken_no_wait(reason)
    2216              51 :     }
    2217                 : 
    2218              51 :     pub(crate) fn set_broken_no_wait(&self, reason: impl Display) {
    2219              51 :         let reason = reason.to_string();
    2220              51 :         self.state.send_modify(|current_state| {
    2221              51 :             match *current_state {
    2222                 :                 TenantState::Activating(_) | TenantState::Loading | TenantState::Attaching => {
    2223 UBC           0 :                     unreachable!("we ensured above that we're done with activation, and, there is no re-activation")
    2224                 :                 }
    2225                 :                 TenantState::Active => {
    2226 CBC           2 :                     if cfg!(feature = "testing") {
    2227               2 :                         warn!("Changing Active tenant to Broken state, reason: {}", reason);
    2228               2 :                         *current_state = TenantState::broken_from_reason(reason);
    2229                 :                     } else {
    2230 UBC           0 :                         unreachable!("not allowed to call set_broken on Active tenants in non-testing builds")
    2231                 :                     }
    2232                 :                 }
    2233                 :                 TenantState::Broken { .. } => {
    2234               0 :                     warn!("Tenant is already in Broken state");
    2235                 :                 }
    2236                 :                 // This is the only "expected" path, any other path is a bug.
    2237                 :                 TenantState::Stopping { .. } => {
    2238 CBC          49 :                     warn!(
    2239              49 :                         "Marking Stopping tenant as Broken state, reason: {}",
    2240              49 :                         reason
    2241              49 :                     );
    2242              49 :                     *current_state = TenantState::broken_from_reason(reason);
    2243                 :                 }
    2244                 :            }
    2245              51 :         });
    2246              51 :     }
    2247                 : 
    2248             352 :     pub fn subscribe_for_state_updates(&self) -> watch::Receiver<TenantState> {
    2249             352 :         self.state.subscribe()
    2250             352 :     }
    2251                 : 
    2252                 :     /// The activate_now semaphore is initialized with zero units.  As soon as
    2253                 :     /// we add a unit, waiters will be able to acquire a unit and proceed.
    2254            1026 :     pub(crate) fn activate_now(&self) {
    2255            1026 :         self.activate_now_sem.add_permits(1);
    2256            1026 :     }
    2257                 : 
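The activate_now semaphore pattern described above can be shown in isolation: a `tokio::sync::Semaphore` created with zero permits blocks acquirers until someone adds a permit. This is a standalone illustration, not the tenant's actual wiring:

    use std::sync::Arc;
    use tokio::sync::Semaphore;

    #[tokio::main]
    async fn main() {
        // Zero initial permits: acquirers park until a permit is added.
        let activate_now = Arc::new(Semaphore::new(0));

        let waiter = {
            let sem = Arc::clone(&activate_now);
            tokio::spawn(async move {
                // Resolves only after add_permits() below.
                let _permit = sem.acquire().await.unwrap();
                println!("woken up, proceeding with activation");
            })
        };

        // A caller signals that activation should happen now.
        activate_now.add_permits(1);
        waiter.await.unwrap();
    }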
    2258            1478 :     pub(crate) async fn wait_to_become_active(
    2259            1478 :         &self,
    2260            1478 :         timeout: Duration,
    2261            1478 :     ) -> Result<(), GetActiveTenantError> {
    2262            1478 :         let mut receiver = self.state.subscribe();
    2263            2363 :         loop {
    2264            2363 :             let current_state = receiver.borrow_and_update().clone();
    2265            2363 :             match current_state {
    2266                 :                 TenantState::Loading | TenantState::Attaching | TenantState::Activating(_) => {
    2267                 :                     // in these states, there's a chance that we can reach ::Active
    2268             885 :                     self.activate_now();
    2269            1031 :                     match timeout_cancellable(timeout, &self.cancel, receiver.changed()).await {
    2270             885 :                         Ok(r) => {
    2271             885 :                             r.map_err(
    2272             885 :                             |_e: tokio::sync::watch::error::RecvError|
    2273                 :                                 // Tenant existed but was dropped: report it as non-existent
    2274             885 :                                 GetActiveTenantError::NotFound(GetTenantError::NotFound(self.tenant_shard_id.tenant_id))
    2275             885 :                         )?
    2276                 :                         }
    2277                 :                         Err(TimeoutCancellableError::Cancelled) => {
    2278 UBC           0 :                             return Err(GetActiveTenantError::Cancelled);
    2279                 :                         }
    2280                 :                         Err(TimeoutCancellableError::Timeout) => {
    2281               0 :                             return Err(GetActiveTenantError::WaitForActiveTimeout {
    2282               0 :                                 latest_state: Some(self.current_state()),
    2283               0 :                                 wait_time: timeout,
    2284               0 :                             });
    2285                 :                         }
    2286                 :                     }
    2287                 :                 }
    2288                 :                 TenantState::Active { .. } => {
    2289 CBC        1478 :                     return Ok(());
    2290                 :                 }
    2291                 :                 TenantState::Broken { .. } | TenantState::Stopping { .. } => {
    2292                 :                     // There's no chance the tenant can transition back into ::Active
    2293 UBC           0 :                     return Err(GetActiveTenantError::WillNotBecomeActive(current_state));
    2294                 :                 }
    2295                 :             }
    2296                 :         }
    2297 CBC        1478 :     }
    2298                 : 
    2299              34 :     pub(crate) fn get_attach_mode(&self) -> AttachmentMode {
    2300              34 :         self.tenant_conf
    2301              34 :             .read()
    2302              34 :             .unwrap()
    2303              34 :             .location
    2304              34 :             .attach_mode
    2305              34 :             .clone()
    2306              34 :     }
    2307                 : 
    2308              36 :     pub(crate) fn get_tenant_shard_id(&self) -> &TenantShardId {
    2309              36 :         &self.tenant_shard_id
    2310              36 :     }
    2311                 : 
    2312               6 :     pub(crate) fn get_generation(&self) -> Generation {
    2313               6 :         self.generation
    2314               6 :     }
    2315                 : }
    2316                 : 
    2317                 : /// Given a map of timelines and their ancestors (timeline_id, ancestor_id),
    2318                 : /// perform a topological sort, so that the parent of each timeline comes
    2319                 : /// before its children.
    2320                 : /// `E` extracts the ancestor from `T`.
    2321                 : /// This allows `T` to vary: it can be TimelineMetadata, Timeline itself, etc.
    2322             864 : fn tree_sort_timelines<T, E>(
    2323             864 :     timelines: HashMap<TimelineId, T>,
    2324             864 :     extractor: E,
    2325             864 : ) -> anyhow::Result<Vec<(TimelineId, T)>>
    2326             864 : where
    2327             864 :     E: Fn(&T) -> Option<TimelineId>,
    2328             864 : {
    2329             864 :     let mut result = Vec::with_capacity(timelines.len());
    2330             864 : 
    2331             864 :     let mut now = Vec::with_capacity(timelines.len());
    2332             864 :     // (ancestor, children)
    2333             864 :     let mut later: HashMap<TimelineId, Vec<(TimelineId, T)>> =
    2334             864 :         HashMap::with_capacity(timelines.len());
    2335                 : 
    2336            1368 :     for (timeline_id, value) in timelines {
    2337             504 :         if let Some(ancestor_id) = extractor(&value) {
    2338              52 :             let children = later.entry(ancestor_id).or_default();
    2339              52 :             children.push((timeline_id, value));
    2340             452 :         } else {
    2341             452 :             now.push((timeline_id, value));
    2342             452 :         }
    2343                 :     }
    2344                 : 
    2345            1368 :     while let Some((timeline_id, metadata)) = now.pop() {
    2346             504 :         result.push((timeline_id, metadata));
    2347                 :         // All children of this can be loaded now
    2348             504 :         if let Some(mut children) = later.remove(&timeline_id) {
    2349              41 :             now.append(&mut children);
    2350             463 :         }
    2351                 :     }
    2352                 : 
    2353                 :     // All timelines should be visited now. Unless there were timelines with missing ancestors.
    2354             864 :     if !later.is_empty() {
    2355 UBC           0 :         for (missing_id, orphan_ids) in later {
    2356               0 :             for (orphan_id, _) in orphan_ids {
    2357               0 :                 error!("could not load timeline {orphan_id} because its ancestor timeline {missing_id} could not be loaded");
    2358                 :             }
    2359                 :         }
    2360               0 :         bail!("could not load tenant because some timelines are missing ancestors");
    2361 CBC         864 :     }
    2362             864 : 
    2363             864 :     Ok(result)
    2364             864 : }
    2365                 : 
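For orientation, the call shape is roughly as sketched below; `load_one_timeline` is a hypothetical helper and the exact call sites live elsewhere in this file. The extractor just returns the ancestor ID, and the resulting Vec is consumed in order, so every ancestor is handled before its children:

    // Hedged usage sketch; `timelines` maps TimelineId -> TimelineMetadata.
    fn load_in_ancestor_order(
        timelines: HashMap<TimelineId, TimelineMetadata>,
    ) -> anyhow::Result<()> {
        let sorted = tree_sort_timelines(timelines, |metadata| metadata.ancestor_timeline())?;
        for (timeline_id, metadata) in sorted {
            // Parents appear before children, so each timeline can resolve its
            // ancestor when it is loaded.
            load_one_timeline(timeline_id, metadata)?;
        }
        Ok(())
    }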
    2366                 : impl Tenant {
    2367              90 :     pub fn tenant_specific_overrides(&self) -> TenantConfOpt {
    2368              90 :         self.tenant_conf.read().unwrap().tenant_conf
    2369              90 :     }
    2370                 : 
    2371              45 :     pub fn effective_config(&self) -> TenantConf {
    2372              45 :         self.tenant_specific_overrides()
    2373              45 :             .merge(self.conf.default_tenant_conf)
    2374              45 :     }
    2375                 : 
    2376               5 :     pub fn get_checkpoint_distance(&self) -> u64 {
    2377               5 :         let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
    2378               5 :         tenant_conf
    2379               5 :             .checkpoint_distance
    2380               5 :             .unwrap_or(self.conf.default_tenant_conf.checkpoint_distance)
    2381               5 :     }
    2382                 : 
    2383               5 :     pub fn get_checkpoint_timeout(&self) -> Duration {
    2384               5 :         let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
    2385               5 :         tenant_conf
    2386               5 :             .checkpoint_timeout
    2387               5 :             .unwrap_or(self.conf.default_tenant_conf.checkpoint_timeout)
    2388               5 :     }
    2389                 : 
    2390               5 :     pub fn get_compaction_target_size(&self) -> u64 {
    2391               5 :         let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
    2392               5 :         tenant_conf
    2393               5 :             .compaction_target_size
    2394               5 :             .unwrap_or(self.conf.default_tenant_conf.compaction_target_size)
    2395               5 :     }
    2396                 : 
    2397            1083 :     pub fn get_compaction_period(&self) -> Duration {
    2398            1083 :         let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
    2399            1083 :         tenant_conf
    2400            1083 :             .compaction_period
    2401            1083 :             .unwrap_or(self.conf.default_tenant_conf.compaction_period)
    2402            1083 :     }
    2403                 : 
    2404               5 :     pub fn get_compaction_threshold(&self) -> usize {
    2405               5 :         let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
    2406               5 :         tenant_conf
    2407               5 :             .compaction_threshold
    2408               5 :             .unwrap_or(self.conf.default_tenant_conf.compaction_threshold)
    2409               5 :     }
    2410                 : 
    2411             544 :     pub fn get_gc_horizon(&self) -> u64 {
    2412             544 :         let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
    2413             544 :         tenant_conf
    2414             544 :             .gc_horizon
    2415             544 :             .unwrap_or(self.conf.default_tenant_conf.gc_horizon)
    2416             544 :     }
    2417                 : 
    2418             984 :     pub fn get_gc_period(&self) -> Duration {
    2419             984 :         let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
    2420             984 :         tenant_conf
    2421             984 :             .gc_period
    2422             984 :             .unwrap_or(self.conf.default_tenant_conf.gc_period)
    2423             984 :     }
    2424                 : 
    2425               5 :     pub fn get_image_creation_threshold(&self) -> usize {
    2426               5 :         let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
    2427               5 :         tenant_conf
    2428               5 :             .image_creation_threshold
    2429               5 :             .unwrap_or(self.conf.default_tenant_conf.image_creation_threshold)
    2430               5 :     }
    2431                 : 
    2432             459 :     pub fn get_pitr_interval(&self) -> Duration {
    2433             459 :         let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
    2434             459 :         tenant_conf
    2435             459 :             .pitr_interval
    2436             459 :             .unwrap_or(self.conf.default_tenant_conf.pitr_interval)
    2437             459 :     }
    2438                 : 
    2439            6961 :     pub fn get_trace_read_requests(&self) -> bool {
    2440            6961 :         let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
    2441            6961 :         tenant_conf
    2442            6961 :             .trace_read_requests
    2443            6961 :             .unwrap_or(self.conf.default_tenant_conf.trace_read_requests)
    2444            6961 :     }
    2445                 : 
    2446              23 :     pub fn get_min_resident_size_override(&self) -> Option<u64> {
    2447              23 :         let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
    2448              23 :         tenant_conf
    2449              23 :             .min_resident_size_override
    2450              23 :             .or(self.conf.default_tenant_conf.min_resident_size_override)
    2451              23 :     }
    2452                 : 
    2453             842 :     pub fn get_heatmap_period(&self) -> Option<Duration> {
    2454             842 :         let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
    2455             842 :         let heatmap_period = tenant_conf
    2456             842 :             .heatmap_period
    2457             842 :             .unwrap_or(self.conf.default_tenant_conf.heatmap_period);
    2458             842 :         if heatmap_period.is_zero() {
    2459             842 :             None
    2460                 :         } else {
    2461 UBC           0 :             Some(heatmap_period)
    2462                 :         }
    2463 CBC         842 :     }
    2464                 : 
    2465              30 :     pub fn set_new_tenant_config(&self, new_tenant_conf: TenantConfOpt) {
    2466              30 :         self.tenant_conf.write().unwrap().tenant_conf = new_tenant_conf;
    2467              30 :         // Don't hold self.timelines.lock() during the notifies.
    2468              30 :         // There's no risk of deadlock right now, but there could be if we consolidate
    2469              30 :         // mutexes in struct Timeline in the future.
    2470              30 :         let timelines = self.list_timelines();
    2471              62 :         for timeline in timelines {
    2472              32 :             timeline.tenant_conf_updated();
    2473              32 :         }
    2474              30 :     }
    2475                 : 
    2476              20 :     pub(crate) fn set_new_location_config(&self, new_conf: AttachedTenantConf) {
    2477              20 :         *self.tenant_conf.write().unwrap() = new_conf;
    2478              20 :         // Don't hold self.timelines.lock() during the notifies.
    2479              20 :         // There's no risk of deadlock right now, but there could be if we consolidate
    2480              20 :         // mutexes in struct Timeline in the future.
    2481              20 :         let timelines = self.list_timelines();
    2482              40 :         for timeline in timelines {
    2483              20 :             timeline.tenant_conf_updated();
    2484              20 :         }
    2485              20 :     }
    2486                 : 
    2487                 :     /// Helper function to create a new Timeline struct.
    2488                 :     ///
    2489                 :     /// The returned Timeline is in Loading state. The caller is responsible for
    2490                 :     /// initializing any on-disk state, and for inserting the Timeline to the 'timelines'
    2491                 :     /// initializing any on-disk state, and for inserting the Timeline into the 'timelines'
    2492                 :     ///
    2493                 :     /// `validate_ancestor == false` is used when a timeline is created for deletion
    2494                 :     /// and we might not have the ancestor present anymore, which is fine for timelines
    2495                 :     /// that are about to be deleted.
    2496            1290 :     fn create_timeline_struct(
    2497            1290 :         &self,
    2498            1290 :         new_timeline_id: TimelineId,
    2499            1290 :         new_metadata: &TimelineMetadata,
    2500            1290 :         ancestor: Option<Arc<Timeline>>,
    2501            1290 :         resources: TimelineResources,
    2502            1290 :         cause: CreateTimelineCause,
    2503            1290 :     ) -> anyhow::Result<Arc<Timeline>> {
    2504            1290 :         let state = match cause {
    2505                 :             CreateTimelineCause::Load => {
    2506            1278 :                 let ancestor_id = new_metadata.ancestor_timeline();
    2507            1278 :                 anyhow::ensure!(
    2508            1278 :                     ancestor_id == ancestor.as_ref().map(|t| t.timeline_id),
    2509 UBC           0 :                     "Timeline's {new_timeline_id} ancestor {ancestor_id:?} was not found"
    2510                 :                 );
    2511 CBC        1278 :                 TimelineState::Loading
    2512                 :             }
    2513              12 :             CreateTimelineCause::Delete => TimelineState::Stopping,
    2514                 :         };
    2515                 : 
    2516            1290 :         let pg_version = new_metadata.pg_version();
    2517            1290 : 
    2518            1290 :         let timeline = Timeline::new(
    2519            1290 :             self.conf,
    2520            1290 :             Arc::clone(&self.tenant_conf),
    2521            1290 :             new_metadata,
    2522            1290 :             ancestor,
    2523            1290 :             new_timeline_id,
    2524            1290 :             self.tenant_shard_id,
    2525            1290 :             self.generation,
    2526            1290 :             self.shard_identity,
    2527            1290 :             Arc::clone(&self.walredo_mgr),
    2528            1290 :             resources,
    2529            1290 :             pg_version,
    2530            1290 :             state,
    2531            1290 :             self.cancel.child_token(),
    2532            1290 :         );
    2533            1290 : 
    2534            1290 :         Ok(timeline)
    2535            1290 :     }
    2536                 : 
    2537                 :     // Allow too_many_arguments because a constructor's argument list naturally grows with the
    2538                 :     // number of attributes in the struct: breaking these out into a builder wouldn't be helpful.
    2539                 :     #[allow(clippy::too_many_arguments)]
    2540             778 :     fn new(
    2541             778 :         state: TenantState,
    2542             778 :         conf: &'static PageServerConf,
    2543             778 :         attached_conf: AttachedTenantConf,
    2544             778 :         shard_identity: ShardIdentity,
    2545             778 :         walredo_mgr: Arc<WalRedoManager>,
    2546             778 :         tenant_shard_id: TenantShardId,
    2547             778 :         remote_storage: Option<GenericRemoteStorage>,
    2548             778 :         deletion_queue_client: DeletionQueueClient,
    2549             778 :     ) -> Tenant {
    2550             778 :         let (state, mut rx) = watch::channel(state);
    2551             778 : 
    2552             778 :         tokio::spawn(async move {
    2553             778 :             let tid = tenant_shard_id.to_string();
    2554             778 : 
    2555            2278 :             fn inspect_state(state: &TenantState) -> ([&'static str; 1], bool) {
    2556            2278 :                 ([state.into()], matches!(state, TenantState::Broken { .. }))
    2557            2278 :             }
    2558             778 : 
    2559             778 :             let mut tuple = inspect_state(&rx.borrow_and_update());
    2560             778 : 
    2561             778 :             let is_broken = tuple.1;
    2562             778 :             let mut counted_broken = if !is_broken {
    2563                 :                 // the tenant might be ignored and reloaded, so first remove any previous set
    2564                 :                 // element. it most likely has already been scraped, as these are manual operations
    2565                 :                 // right now. most likely we will add it back very soon.
    2566             778 :                 drop(crate::metrics::BROKEN_TENANTS_SET.remove_label_values(&[&tid]));
    2567             778 :                 false
    2568                 :             } else {
    2569                 :                 // add the id to the set right away, there should not be any updates on the channel
    2570                 :                 // after
    2571 UBC           0 :                 crate::metrics::BROKEN_TENANTS_SET
    2572               0 :                     .with_label_values(&[&tid])
    2573               0 :                     .set(1);
    2574               0 :                 true
    2575                 :             };
    2576                 : 
    2577 CBC        2278 :             loop {
    2578            2278 :                 let labels = &tuple.0;
    2579            2278 :                 let current = TENANT_STATE_METRIC.with_label_values(labels);
    2580            2278 :                 current.inc();
    2581            2278 : 
    2582            2278 :                 if rx.changed().await.is_err() {
    2583                 :                     // tenant has been dropped; decrement the counter because a tenant with that
    2584                 :                     // state is no longer in tenant map, but allow any broken set item to exist
    2585                 :                     // still.
    2586             161 :                     current.dec();
    2587             161 :                     break;
    2588            1500 :                 }
    2589            1500 : 
    2590            1500 :                 current.dec();
    2591            1500 :                 tuple = inspect_state(&rx.borrow_and_update());
    2592            1500 : 
    2593            1500 :                 let is_broken = tuple.1;
    2594            1500 :                 if is_broken && !counted_broken {
    2595              57 :                     counted_broken = true;
    2596              57 :                     // insert the tenant_id (back) into the set
    2597              57 :                     crate::metrics::BROKEN_TENANTS_SET
    2598              57 :                         .with_label_values(&[&tid])
    2599              57 :                         .inc();
    2600            1443 :                 }
    2601                 :             }
    2602             778 :         });
    2603             778 : 
    2604             778 :         Tenant {
    2605             778 :             tenant_shard_id,
    2606             778 :             shard_identity,
    2607             778 :             generation: attached_conf.location.generation,
    2608             778 :             conf,
    2609             778 :             // Using `Instant::now()` here is a good enough approximation to catch tenants
    2610             778 :             // with really long activation times.
    2611             778 :             constructed_at: Instant::now(),
    2612             778 :             tenant_conf: Arc::new(RwLock::new(attached_conf)),
    2613             778 :             timelines: Mutex::new(HashMap::new()),
    2614             778 :             timelines_creating: Mutex::new(HashSet::new()),
    2615             778 :             gc_cs: tokio::sync::Mutex::new(()),
    2616             778 :             walredo_mgr,
    2617             778 :             remote_storage,
    2618             778 :             deletion_queue_client,
    2619             778 :             state,
    2620             778 :             cached_logical_sizes: tokio::sync::Mutex::new(HashMap::new()),
    2621             778 :             cached_synthetic_tenant_size: Arc::new(AtomicU64::new(0)),
    2622             778 :             eviction_task_tenant_state: tokio::sync::Mutex::new(EvictionTaskTenantState::default()),
    2623             778 :             activate_now_sem: tokio::sync::Semaphore::new(0),
    2624             778 :             delete_progress: Arc::new(tokio::sync::Mutex::new(DeleteTenantFlow::default())),
    2625             778 :             cancel: CancellationToken::default(),
    2626             778 :             gate: Gate::new(format!("Tenant<{tenant_shard_id}>")),
    2627             778 :         }
    2628             778 :     }
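
The metrics task spawned in `Tenant::new` above follows the usual tokio `watch` pattern: read the current value with `borrow_and_update()`, then loop on `changed()` until the sender side is dropped. Below is a minimal, self-contained sketch of just that pattern, with a stand-in `State` enum and a `println!` in place of the gauge updates; it is illustrative only, not the pageserver's metrics code.

use tokio::sync::watch;

#[derive(Clone, Debug, PartialEq)]
enum State { Loading, Active, Broken }

#[tokio::main]
async fn main() {
    let (tx, mut rx) = watch::channel(State::Loading);

    let watcher = tokio::spawn(async move {
        // Note the initial value and mark it as seen.
        let mut current = rx.borrow_and_update().clone();
        loop {
            println!("state is now {current:?}"); // stand-in for incrementing a gauge
            // `changed()` resolves when a new value is sent and errors once the sender is dropped.
            if rx.changed().await.is_err() {
                break;
            }
            current = rx.borrow_and_update().clone();
        }
    });

    tx.send(State::Active).unwrap();
    // `watch` keeps only the latest value, so rapid sends may be coalesced.
    tx.send(State::Broken).unwrap();
    drop(tx); // dropping the sender ends the watcher loop, as in the task above
    watcher.await.unwrap();
}
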
    2629                 : 
    2630                 :     /// Locate and load config
    2631             224 :     pub(super) fn load_tenant_config(
    2632             224 :         conf: &'static PageServerConf,
    2633             224 :         tenant_shard_id: &TenantShardId,
    2634             224 :     ) -> anyhow::Result<LocationConf> {
    2635             224 :         let legacy_config_path = conf.tenant_config_path(tenant_shard_id);
    2636             224 :         let config_path = conf.tenant_location_config_path(tenant_shard_id);
    2637             224 : 
    2638             224 :         if config_path.exists() {
    2639                 :             // New-style config takes precedence
    2640             219 :             let deserialized = Self::read_config(&config_path)?;
    2641             219 :             Ok(toml_edit::de::from_document::<LocationConf>(deserialized)?)
    2642               5 :         } else if legacy_config_path.exists() {
    2643                 :             // Upgrade path: found an old-style configuration only
    2644               1 :             let deserialized = Self::read_config(&legacy_config_path)?;
    2645                 : 
    2646               1 :             let mut tenant_conf = TenantConfOpt::default();
    2647               1 :             for (key, item) in deserialized.iter() {
    2648               1 :                 match key {
    2649               1 :                     "tenant_config" => {
    2650               1 :                         tenant_conf = TenantConfOpt::try_from(item.to_owned()).context(format!("Failed to parse config from file '{legacy_config_path}' as pageserver config"))?;
    2651                 :                     }
    2652 UBC           0 :                     _ => bail!(
    2653               0 :                         "config file {legacy_config_path} has unrecognized pageserver option '{key}'"
    2654               0 :                     ),
    2655                 :                 }
    2656                 :             }
    2657                 : 
    2658                 :             // Legacy configs are implicitly in attached state
    2659 CBC           1 :             Ok(LocationConf::attached_single(
    2660               1 :                 tenant_conf,
    2661               1 :                 Generation::none(),
    2662               1 :             ))
    2663                 :         } else {
    2664                 :             // FIXME If the config file is not found, assume that we're attaching
    2665                 :             // a detached tenant and config is passed via attach command.
    2666                 :             // https://github.com/neondatabase/neon/issues/1555
    2667                 :             // OR: we're loading after incomplete deletion that managed to remove config.
    2668               4 :             info!(
    2669               4 :                 "tenant config not found in {} or {}",
    2670               4 :                 config_path, legacy_config_path
    2671               4 :             );
    2672               4 :             Ok(LocationConf::default())
    2673                 :         }
    2674             224 :     }
    2675                 : 
    2676             220 :     fn read_config(path: &Utf8Path) -> anyhow::Result<toml_edit::Document> {
    2677             220 :         info!("loading tenant configuration from {path}");
    2678                 : 
    2679                 :         // load and parse file
    2680             220 :         let config = fs::read_to_string(path)
    2681             220 :             .with_context(|| format!("Failed to load config from path '{path}'"))?;
    2682                 : 
    2683             220 :         config
    2684             220 :             .parse::<toml_edit::Document>()
    2685             220 :             .with_context(|| format!("Failed to parse config from file '{path}' as toml file"))
    2686             220 :     }
    2687                 : 
    2688             353 :     #[tracing::instrument(skip_all, fields(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug()))]
    2689                 :     pub(super) async fn persist_tenant_config(
    2690                 :         conf: &'static PageServerConf,
    2691                 :         tenant_shard_id: &TenantShardId,
    2692                 :         location_conf: &LocationConf,
    2693                 :     ) -> anyhow::Result<()> {
    2694                 :         let legacy_config_path = conf.tenant_config_path(tenant_shard_id);
    2695                 :         let config_path = conf.tenant_location_config_path(tenant_shard_id);
    2696                 : 
    2697                 :         Self::persist_tenant_config_at(
    2698                 :             tenant_shard_id,
    2699                 :             &config_path,
    2700                 :             &legacy_config_path,
    2701                 :             location_conf,
    2702                 :         )
    2703                 :         .await
    2704                 :     }
    2705                 : 
    2706 UBC           0 :     #[tracing::instrument(skip_all, fields(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug()))]
    2707                 :     pub(super) async fn persist_tenant_config_at(
    2708                 :         tenant_shard_id: &TenantShardId,
    2709                 :         config_path: &Utf8Path,
    2710                 :         legacy_config_path: &Utf8Path,
    2711                 :         location_conf: &LocationConf,
    2712                 :     ) -> anyhow::Result<()> {
    2713                 :         // Forward compat: write out an old-style configuration that old versions can read, in case we roll back
    2714                 :         Self::persist_tenant_config_legacy(
    2715                 :             tenant_shard_id,
    2716                 :             legacy_config_path,
    2717                 :             &location_conf.tenant_conf,
    2718                 :         )
    2719                 :         .await?;
    2720                 : 
    2721                 :         if let LocationMode::Attached(attach_conf) = &location_conf.mode {
    2722                 :             // Once we use LocationMode, generations are mandatory.  If we aren't using generations,
    2723                 :             // then drop out after writing legacy-style config.
    2724                 :             if attach_conf.generation.is_none() {
    2725               0 :                 tracing::debug!("Running without generations, not writing new-style LocationConf");
    2726                 :                 return Ok(());
    2727                 :             }
    2728                 :         }
    2729                 : 
    2730               0 :         debug!("persisting tenantconf to {config_path}");
    2731                 : 
    2732                 :         let mut conf_content = r#"# This file contains a specific per-tenant config.
    2733                 : #  It is read in case of pageserver restart.
    2734                 : "#
    2735                 :         .to_string();
    2736                 : 
    2737                 :         // Convert the config to a toml file.
    2738                 :         conf_content += &toml_edit::ser::to_string_pretty(&location_conf)?;
    2739                 : 
    2740                 :         let temp_path = path_with_suffix_extension(config_path, TEMP_FILE_SUFFIX);
    2741                 : 
    2742                 :         let tenant_shard_id = *tenant_shard_id;
    2743                 :         let config_path = config_path.to_owned();
    2744 CBC         812 :         tokio::task::spawn_blocking(move || {
    2745             812 :             Handle::current().block_on(async move {
    2746             812 :                 let conf_content = conf_content.as_bytes();
    2747             812 :                 VirtualFile::crashsafe_overwrite(&config_path, &temp_path, conf_content)
    2748 UBC           0 :                     .await
    2749 CBC         812 :                     .with_context(|| {
    2750 UBC           0 :                         format!("write tenant {tenant_shard_id} config to {config_path}")
    2751 CBC         812 :                     })
    2752             812 :             })
    2753             812 :         })
    2754                 :         .await??;
    2755                 : 
    2756                 :         Ok(())
    2757                 :     }
    2758                 : 
    2759             813 :     #[tracing::instrument(skip_all, fields(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug()))]
    2760                 :     async fn persist_tenant_config_legacy(
    2761                 :         tenant_shard_id: &TenantShardId,
    2762                 :         target_config_path: &Utf8Path,
    2763                 :         tenant_conf: &TenantConfOpt,
    2764                 :     ) -> anyhow::Result<()> {
    2765 UBC           0 :         debug!("persisting tenantconf to {target_config_path}");
    2766                 : 
    2767                 :         let mut conf_content = r#"# This file contains a specific per-tenant config.
    2768                 : #  It is read in case of pageserver restart.
    2769                 : 
    2770                 : [tenant_config]
    2771                 : "#
    2772                 :         .to_string();
    2773                 : 
    2774                 :         // Convert the config to a toml file.
    2775                 :         conf_content += &toml_edit::ser::to_string(&tenant_conf)?;
    2776                 : 
    2777                 :         let temp_path = path_with_suffix_extension(target_config_path, TEMP_FILE_SUFFIX);
    2778                 : 
    2779                 :         let tenant_shard_id = *tenant_shard_id;
    2780                 :         let target_config_path = target_config_path.to_owned();
    2781 CBC         813 :         tokio::task::spawn_blocking(move || {
    2782             813 :             Handle::current().block_on(async move {
    2783             813 :                 let conf_content = conf_content.as_bytes();
    2784             813 :                 VirtualFile::crashsafe_overwrite(&target_config_path, &temp_path, conf_content)
    2785 UBC           0 :                     .await
    2786 CBC         813 :                     .with_context(|| {
    2787 UBC           0 :                         format!("write tenant {tenant_shard_id} config to {target_config_path}")
    2788 CBC         813 :                     })
    2789             813 :             })
    2790             813 :         })
    2791                 :         .await??;
    2792                 :         Ok(())
    2793                 :     }
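
Both persist paths above use the same write-to-temporary-file-then-overwrite idea (`path_with_suffix_extension` plus `VirtualFile::crashsafe_overwrite`), so a crash mid-write can never leave a truncated config behind. Below is a rough, self-contained sketch of that pattern using only the standard library; the helper name is made up here, and `VirtualFile`'s exact fsync behaviour is not shown.

use std::fs::{self, File};
use std::io::Write;
use std::path::{Path, PathBuf};

/// Illustrative crash-safe overwrite: write the new contents to a temporary
/// sibling file, flush it, then atomically rename it over the target.
fn crashsafe_overwrite_sketch(target: &Path, contents: &[u8]) -> std::io::Result<()> {
    // Stand-in for `path_with_suffix_extension(target, TEMP_FILE_SUFFIX)`.
    let temp: PathBuf = target.with_extension("temp");
    {
        let mut f = File::create(&temp)?;
        f.write_all(contents)?;
        f.sync_all()?; // make sure the bytes are on disk before the rename
    }
    // On POSIX filesystems the rename is atomic: readers observe either the
    // old file or the complete new one, never a partially written config.
    fs::rename(&temp, target)?;
    // For full durability the parent directory should be fsynced as well;
    // that step is omitted to keep the sketch short.
    Ok(())
}
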
    2794                 : 
    2795                 :     //
    2796                 :     // How garbage collection works:
    2797                 :     //
    2798                 :     //                    +--bar------------->
    2799                 :     //                   /
    2800                 :     //             +----+-----foo---------------->
    2801                 :     //            /
    2802                 :     // ----main--+-------------------------->
    2803                 :     //                \
    2804                 :     //                 +-----baz-------->
    2805                 :     //
    2806                 :     //
    2807                 :     // 1. Grab 'gc_cs' mutex to prevent new timelines from being created while Timeline's
    2808                 :     //    `gc_infos` are being refreshed
    2809                 :     // 2. Scan the collected timelines, and on each timeline, make note of
    2810                 :     //    all the points where other timelines have been branched off.
    2811                 :     //    We will refrain from removing page versions at those LSNs.
    2812                 :     // 3. For each timeline, scan all layer files on the timeline.
    2813                 :     //    Remove all files for which a newer file exists and which
    2814                 :     //    don't cover any branch point LSNs.
    2815                 :     //
    2816                 :     // TODO:
    2817                 :     // - if a relation has a non-incremental persistent layer on a child branch, then we
    2818                 :     //   don't need to keep that in the parent anymore. But currently
    2819                 :     //   we do.
    2820             365 :     async fn gc_iteration_internal(
    2821             365 :         &self,
    2822             365 :         target_timeline_id: Option<TimelineId>,
    2823             365 :         horizon: u64,
    2824             365 :         pitr: Duration,
    2825             365 :         cancel: &CancellationToken,
    2826             365 :         ctx: &RequestContext,
    2827             365 :     ) -> anyhow::Result<GcResult> {
    2828             365 :         let mut totals: GcResult = Default::default();
    2829             365 :         let now = Instant::now();
    2830                 : 
    2831             365 :         let gc_timelines = match self
    2832             365 :             .refresh_gc_info_internal(target_timeline_id, horizon, pitr, cancel, ctx)
    2833          235890 :             .await
    2834                 :         {
    2835             363 :             Ok(result) => result,
    2836               1 :             Err(e) => {
    2837 UBC           0 :                 if let Some(PageReconstructError::Cancelled) =
    2838 CBC           1 :                     e.downcast_ref::<PageReconstructError>()
    2839                 :                 {
    2840                 :                     // Handle cancellation
    2841 UBC           0 :                     totals.elapsed = now.elapsed();
    2842               0 :                     return Ok(totals);
    2843                 :                 } else {
    2844                 :                     // Propagate other errors
    2845 CBC           1 :                     return Err(e);
    2846                 :                 }
    2847                 :             }
    2848                 :         };
    2849                 : 
    2850               3 :         failpoint_support::sleep_millis_async!("gc_iteration_internal_after_getting_gc_timelines");
    2851                 : 
    2852                 :         // If there is nothing to GC, we don't want any messages in the INFO log.
    2853             363 :         if !gc_timelines.is_empty() {
    2854             355 :             info!("{} timelines need GC", gc_timelines.len());
    2855                 :         } else {
    2856 UBC           0 :             debug!("{} timelines need GC", gc_timelines.len());
    2857                 :         }
    2858                 : 
    2859                 :         // Perform GC for each timeline.
    2860                 :         //
    2861                 :         // Note that we don't hold the `Tenant::gc_cs` lock here because we don't want to delay the
    2862                 :         // branch creation task, which requires the GC lock. A GC iteration can run concurrently
    2863                 :         // with branch creation.
    2864                 :         //
    2865                 :         // See comments in [`Tenant::branch_timeline`] for more information about why branch
    2866                 :         // creation task can run concurrently with timeline's GC iteration.
    2867 CBC         854 :         for timeline in gc_timelines {
    2868             496 :             if task_mgr::is_shutdown_requested() || cancel.is_cancelled() {
    2869                 :                 // We were requested to shut down. Stop and return with the progress we
    2870                 :                 // made.
    2871 UBC           0 :                 break;
    2872 CBC         496 :             }
    2873             496 :             let result = timeline.gc().await?;
    2874             491 :             totals += result;
    2875                 :         }
    2876                 : 
    2877             358 :         totals.elapsed = now.elapsed();
    2878             358 :         Ok(totals)
    2879             359 :     }
    2880                 : 
    2881                 :     /// Refreshes the Timeline::gc_info for all timelines, returning the
    2882                 :     /// vector of timelines which have [`Timeline::get_last_record_lsn`] past
    2883                 :     /// [`Tenant::get_gc_horizon`].
    2884                 :     ///
    2885                 :     /// This is usually executed as part of periodic gc, but can now be triggered more often.
    2886              92 :     pub async fn refresh_gc_info(
    2887              92 :         &self,
    2888              92 :         cancel: &CancellationToken,
    2889              92 :         ctx: &RequestContext,
    2890              92 :     ) -> anyhow::Result<Vec<Arc<Timeline>>> {
    2891              92 :         // Since this method can now be called at different rates than the configured gc loop,
    2892              92 :         // these configuration values may take effect sooner than before, when they were only
    2893              92 :         // read from the gc task.
    2894              92 :         let horizon = self.get_gc_horizon();
    2895              92 :         let pitr = self.get_pitr_interval();
    2896              92 : 
    2897              92 :         // refresh all timelines
    2898              92 :         let target_timeline_id = None;
    2899              92 : 
    2900              92 :         self.refresh_gc_info_internal(target_timeline_id, horizon, pitr, cancel, ctx)
    2901              20 :             .await
    2902              92 :     }
    2903                 : 
    2904             457 :     async fn refresh_gc_info_internal(
    2905             457 :         &self,
    2906             457 :         target_timeline_id: Option<TimelineId>,
    2907             457 :         horizon: u64,
    2908             457 :         pitr: Duration,
    2909             457 :         cancel: &CancellationToken,
    2910             457 :         ctx: &RequestContext,
    2911             457 :     ) -> anyhow::Result<Vec<Arc<Timeline>>> {
    2912                 :         // grab mutex to prevent new timelines from being created here.
    2913             457 :         let gc_cs = self.gc_cs.lock().await;
    2914                 : 
    2915                 :         // Scan all timelines. For each timeline, remember the timeline ID and
    2916                 :         // the branch point where it was created.
    2917             456 :         let (all_branchpoints, timeline_ids): (BTreeSet<(TimelineId, Lsn)>, _) = {
    2918             457 :             let timelines = self.timelines.lock().unwrap();
    2919             457 :             let mut all_branchpoints = BTreeSet::new();
    2920             456 :             let timeline_ids = {
    2921             457 :                 if let Some(target_timeline_id) = target_timeline_id.as_ref() {
    2922             325 :                     if timelines.get(target_timeline_id).is_none() {
    2923               1 :                         bail!("gc target timeline does not exist")
    2924             324 :                     }
    2925             132 :                 };
    2926                 : 
    2927             456 :                 timelines
    2928             456 :                     .iter()
    2929             949 :                     .map(|(timeline_id, timeline_entry)| {
    2930             500 :                         if let Some(ancestor_timeline_id) =
    2931             949 :                             &timeline_entry.get_ancestor_timeline_id()
    2932                 :                         {
    2933                 :                             // If target_timeline is specified, we only need to know branchpoints of its children
    2934             500 :                             if let Some(timeline_id) = target_timeline_id {
    2935             304 :                                 if ancestor_timeline_id == &timeline_id {
    2936               6 :                                     all_branchpoints.insert((
    2937               6 :                                         *ancestor_timeline_id,
    2938               6 :                                         timeline_entry.get_ancestor_lsn(),
    2939               6 :                                     ));
    2940             298 :                                 }
    2941                 :                             }
    2942                 :                             // Collect branchpoints for all timelines
    2943             196 :                             else {
    2944             196 :                                 all_branchpoints.insert((
    2945             196 :                                     *ancestor_timeline_id,
    2946             196 :                                     timeline_entry.get_ancestor_lsn(),
    2947             196 :                                 ));
    2948             196 :                             }
    2949             449 :                         }
    2950                 : 
    2951             949 :                         *timeline_id
    2952             949 :                     })
    2953             456 :                     .collect::<Vec<_>>()
    2954             456 :             };
    2955             456 :             (all_branchpoints, timeline_ids)
    2956             456 :         };
    2957             456 : 
    2958             456 :         // Ok, we now know all the branch points.
    2959             456 :         // Update the GC information for each timeline.
    2960             456 :         let mut gc_timelines = Vec::with_capacity(timeline_ids.len());
    2961            1399 :         for timeline_id in timeline_ids {
    2962                 :             // Timeline is known to be local and loaded.
    2963             944 :             let timeline = self
    2964             944 :                 .get_timeline(timeline_id, false)
    2965             944 :                 .with_context(|| format!("Timeline {timeline_id} was not found"))?;
    2966                 : 
    2967                 :             // If target_timeline is specified, ignore all other timelines
    2968             944 :             if let Some(target_timeline_id) = target_timeline_id {
    2969             628 :                 if timeline_id != target_timeline_id {
    2970             304 :                     continue;
    2971             324 :                 }
    2972             316 :             }
    2973                 : 
    2974             640 :             if let Some(cutoff) = timeline.get_last_record_lsn().checked_sub(horizon) {
    2975             561 :                 let branchpoints: Vec<Lsn> = all_branchpoints
    2976             561 :                     .range((
    2977             561 :                         Included((timeline_id, Lsn(0))),
    2978             561 :                         Included((timeline_id, Lsn(u64::MAX))),
    2979             561 :                     ))
    2980             561 :                     .map(|&x| x.1)
    2981             561 :                     .collect();
    2982             561 :                 timeline
    2983             561 :                     .update_gc_info(branchpoints, cutoff, pitr, cancel, ctx)
    2984          235910 :                     .await?;
    2985                 : 
    2986             560 :                 gc_timelines.push(timeline);
    2987              79 :             }
    2988                 :         }
    2989             455 :         drop(gc_cs);
    2990             455 :         Ok(gc_timelines)
    2991             456 :     }
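
Put differently, each timeline keeps everything newer than `last_record_lsn - horizon`, and additionally must keep the LSNs at which child timelines were branched off; those are exactly the inputs handed to `update_gc_info` above. A tiny self-contained sketch of that rule with plain `u64` LSNs in place of the real `Lsn` type (names are illustrative):

/// Returns `None` when the timeline is still shorter than the horizon (nothing to GC),
/// otherwise the cutoff plus the branch points that must stay readable regardless.
fn gc_inputs_sketch(
    last_record_lsn: u64,
    horizon: u64,
    child_branch_points: &[u64],
) -> Option<(u64, Vec<u64>)> {
    // Mirrors `timeline.get_last_record_lsn().checked_sub(horizon)` above.
    let cutoff = last_record_lsn.checked_sub(horizon)?;
    Some((cutoff, child_branch_points.to_vec()))
}

fn main() {
    // Timeline at LSN 1000 with a horizon of 300 and a child branched at 450:
    // the cutoff is 700, and LSN 450 must remain readable for the child.
    assert_eq!(gc_inputs_sketch(1000, 300, &[450]), Some((700, vec![450])));
    // A young timeline shorter than the horizon is skipped entirely.
    assert_eq!(gc_inputs_sketch(100, 300, &[]), None);
}
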
    2992                 : 
    2993                 :     /// A substitute for `branch_timeline` for use in unit tests.
    2994                 :     /// The returned timeline will have state value `Active` to make various `anyhow::ensure!()`
    2995                 :     /// calls pass, but we do not actually call `.activate()` under the hood. So none of the
    2996                 :     /// timeline background tasks are launched, except the flush loop.
    2997                 :     #[cfg(test)]
    2998             107 :     async fn branch_timeline_test(
    2999             107 :         &self,
    3000             107 :         src_timeline: &Arc<Timeline>,
    3001             107 :         dst_id: TimelineId,
    3002             107 :         start_lsn: Option<Lsn>,
    3003             107 :         ctx: &RequestContext,
    3004             107 :     ) -> Result<Arc<Timeline>, CreateTimelineError> {
    3005             107 :         let uninit_mark = self.create_timeline_uninit_mark(dst_id).unwrap();
    3006             107 :         let tl = self
    3007             107 :             .branch_timeline_impl(src_timeline, dst_id, start_lsn, uninit_mark, ctx)
    3008               2 :             .await?;
    3009             105 :         tl.set_state(TimelineState::Active);
    3010             105 :         Ok(tl)
    3011             107 :     }
    3012                 : 
    3013                 :     /// Branch an existing timeline.
    3014                 :     ///
    3015                 :     /// The caller is responsible for activating the returned timeline.
    3016             250 :     async fn branch_timeline(
    3017             250 :         &self,
    3018             250 :         src_timeline: &Arc<Timeline>,
    3019             250 :         dst_id: TimelineId,
    3020             250 :         start_lsn: Option<Lsn>,
    3021             250 :         timeline_uninit_mark: TimelineUninitMark<'_>,
    3022             250 :         ctx: &RequestContext,
    3023             250 :     ) -> Result<Arc<Timeline>, CreateTimelineError> {
    3024             250 :         self.branch_timeline_impl(src_timeline, dst_id, start_lsn, timeline_uninit_mark, ctx)
    3025               8 :             .await
    3026             250 :     }
    3027                 : 
    3028             357 :     async fn branch_timeline_impl(
    3029             357 :         &self,
    3030             357 :         src_timeline: &Arc<Timeline>,
    3031             357 :         dst_id: TimelineId,
    3032             357 :         start_lsn: Option<Lsn>,
    3033             357 :         timeline_uninit_mark: TimelineUninitMark<'_>,
    3034             357 :         _ctx: &RequestContext,
    3035             357 :     ) -> Result<Arc<Timeline>, CreateTimelineError> {
    3036             357 :         let src_id = src_timeline.timeline_id;
    3037                 : 
    3038                 :         // We will validate our ancestor LSN in this function.  Acquire the GC lock so that
    3039                 :         // this check cannot race with GC, and the ancestor LSN is guaranteed to remain
    3040                 :         // valid while we are creating the branch.
    3041             357 :         let _gc_cs = self.gc_cs.lock().await;
    3042                 : 
    3043                 :         // If no start LSN is specified, we branch the new timeline from the source timeline's last record LSN
    3044             357 :         let start_lsn = start_lsn.unwrap_or_else(|| {
    3045             222 :             let lsn = src_timeline.get_last_record_lsn();
    3046             222 :             info!("branching timeline {dst_id} from timeline {src_id} at last record LSN: {lsn}");
    3047             222 :             lsn
    3048             357 :         });
    3049             357 : 
    3050             357 :         // Ensure that `start_lsn` is valid, i.e. the LSN is within the PITR
    3051             357 :         // horizon on the source timeline
    3052             357 :         //
    3053             357 :         // We check it against both the planned GC cutoff stored in 'gc_info',
    3054             357 :         // and the 'latest_gc_cutoff' of the last GC that was performed.  The
    3055             357 :         // planned GC cutoff in 'gc_info' is normally larger than
    3056             357 :         // 'latest_gc_cutoff_lsn', but beware of corner cases like if you just
    3057             357 :         // changed the GC settings for the tenant to make the PITR window
    3058             357 :         // larger, but some of the data was already removed by an earlier GC
    3059             357 :         // iteration.
    3060             357 : 
    3061             357 :         // check against last actual 'latest_gc_cutoff' first
    3062             357 :         let latest_gc_cutoff_lsn = src_timeline.get_latest_gc_cutoff_lsn();
    3063             357 :         src_timeline
    3064             357 :             .check_lsn_is_in_scope(start_lsn, &latest_gc_cutoff_lsn)
    3065             357 :             .context(format!(
    3066             357 :                 "invalid branch start lsn: less than latest GC cutoff {}",
    3067             357 :                 *latest_gc_cutoff_lsn,
    3068             357 :             ))
    3069             357 :             .map_err(CreateTimelineError::AncestorLsn)?;
    3070                 : 
    3071                 :         // and then the planned GC cutoff
    3072                 :         {
    3073             349 :             let gc_info = src_timeline.gc_info.read().unwrap();
    3074             349 :             let cutoff = min(gc_info.pitr_cutoff, gc_info.horizon_cutoff);
    3075             349 :             if start_lsn < cutoff {
    3076 UBC           0 :                 return Err(CreateTimelineError::AncestorLsn(anyhow::anyhow!(
    3077               0 :                     "invalid branch start lsn: less than planned GC cutoff {cutoff}"
    3078               0 :                 )));
    3079 CBC         349 :             }
    3080             349 :         }
    3081             349 : 
    3082             349 :         //
    3083             349 :         // The branch point is valid, and we are still holding the 'gc_cs' lock
    3084             349 :         // so that GC cannot advance the GC cutoff until we are finished.
    3085             349 :         // Proceed with the branch creation.
    3086             349 :         //
    3087             349 : 
    3088             349 :         // Determine prev-LSN for the new timeline. We can only determine it if
    3089             349 :         // the timeline was branched at the current end of the source timeline.
    3090             349 :         let RecordLsn {
    3091             349 :             last: src_last,
    3092             349 :             prev: src_prev,
    3093             349 :         } = src_timeline.get_last_record_rlsn();
    3094             349 :         let dst_prev = if src_last == start_lsn {
    3095             334 :             Some(src_prev)
    3096                 :         } else {
    3097              15 :             None
    3098                 :         };
    3099                 : 
    3100                 :         // Create the metadata file, noting the ancestor of the new timeline.
    3101                 :         // There is initially no data in it, but all the read-calls know to look
    3102                 :         // into the ancestor.
    3103             349 :         let metadata = TimelineMetadata::new(
    3104             349 :             start_lsn,
    3105             349 :             dst_prev,
    3106             349 :             Some(src_id),
    3107             349 :             start_lsn,
    3108             349 :             *src_timeline.latest_gc_cutoff_lsn.read(), // FIXME: should we hold onto this guard longer?
    3109             349 :             src_timeline.initdb_lsn,
    3110             349 :             src_timeline.pg_version,
    3111             349 :         );
    3112                 : 
    3113             349 :         let uninitialized_timeline = self
    3114             349 :             .prepare_new_timeline(
    3115             349 :                 dst_id,
    3116             349 :                 &metadata,
    3117             349 :                 timeline_uninit_mark,
    3118             349 :                 start_lsn + 1,
    3119             349 :                 Some(Arc::clone(src_timeline)),
    3120             349 :             )
    3121 UBC           0 :             .await?;
    3122                 : 
    3123 CBC         349 :         let new_timeline = uninitialized_timeline.finish_creation()?;
    3124                 : 
    3125                 :         // The root timeline gets its layers during creation and uploads them along with the metadata.
    3126                 :         // A branch timeline, however, may receive no writes for some time after creation and hence has no layers yet.
    3127                 :         // We still need to upload its metadata eagerly: if other nodes `attach` the tenant and miss this timeline, their GC
    3128                 :         // could get incorrect information and remove more layers than needed.
    3129                 :         // See also https://github.com/neondatabase/neon/issues/3865
    3130             349 :         if let Some(remote_client) = new_timeline.remote_client.as_ref() {
    3131             349 :             remote_client
    3132             349 :                 .schedule_index_upload_for_metadata_update(&metadata)
    3133             349 :                 .context("branch initial metadata upload")?;
    3134 UBC           0 :         }
    3135                 : 
    3136 CBC         349 :         info!("branched timeline {dst_id} from {src_id} at {start_lsn}");
    3137                 : 
    3138             349 :         Ok(new_timeline)
    3139             357 :     }
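
The two guards above reduce to: the requested branch point must be at or above both the GC cutoff that was actually applied (`latest_gc_cutoff_lsn`) and the planned cutoff, i.e. the smaller of the PITR and horizon cutoffs. A minimal sketch of that check with plain integer LSNs (illustrative names, not the real API):

/// Returns Ok(()) if `start_lsn` is still covered by retained history.
fn validate_branch_point_sketch(
    start_lsn: u64,
    latest_gc_cutoff: u64, // cutoff used by the last GC run
    pitr_cutoff: u64,      // planned cutoff from the PITR window
    horizon_cutoff: u64,   // planned cutoff from the space horizon
) -> Result<(), String> {
    if start_lsn < latest_gc_cutoff {
        return Err(format!(
            "invalid branch start lsn: less than latest GC cutoff {latest_gc_cutoff}"
        ));
    }
    let planned = pitr_cutoff.min(horizon_cutoff);
    if start_lsn < planned {
        return Err(format!(
            "invalid branch start lsn: less than planned GC cutoff {planned}"
        ));
    }
    Ok(())
}
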
    3140                 : 
    3141                 :     /// For unit tests, make this visible so that other modules can directly create timelines
    3142                 :     #[cfg(test)]
    3143               1 :     #[tracing::instrument(fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(), %timeline_id))]
    3144                 :     pub(crate) async fn bootstrap_timeline_test(
    3145                 :         &self,
    3146                 :         timeline_id: TimelineId,
    3147                 :         pg_version: u32,
    3148                 :         load_existing_initdb: Option<TimelineId>,
    3149                 :         ctx: &RequestContext,
    3150                 :     ) -> anyhow::Result<Arc<Timeline>> {
    3151                 :         let uninit_mark = self.create_timeline_uninit_mark(timeline_id).unwrap();
    3152                 :         self.bootstrap_timeline(
    3153                 :             timeline_id,
    3154                 :             pg_version,
    3155                 :             load_existing_initdb,
    3156                 :             uninit_mark,
    3157                 :             ctx,
    3158                 :         )
    3159                 :         .await
    3160                 :     }
    3161                 : 
    3162                 :     /// - run initdb to initialize a temporary instance and get bootstrap data
    3163                 :     /// - after initialization completes, tar up the temp dir and upload it to S3.
    3164                 :     ///
    3165                 :     /// The caller is responsible for activating the returned timeline.
    3166             526 :     async fn bootstrap_timeline(
    3167             526 :         &self,
    3168             526 :         timeline_id: TimelineId,
    3169             526 :         pg_version: u32,
    3170             526 :         load_existing_initdb: Option<TimelineId>,
    3171             526 :         timeline_uninit_mark: TimelineUninitMark<'_>,
    3172             526 :         ctx: &RequestContext,
    3173             526 :     ) -> anyhow::Result<Arc<Timeline>> {
    3174             526 :         // create a `tenant/{tenant_id}/timelines/basebackup-{timeline_id}.{TEMP_FILE_SUFFIX}/`
    3175             526 :         // temporary directory for basebackup files for the given timeline.
    3176             526 : 
    3177             526 :         let timelines_path = self.conf.timelines_path(&self.tenant_shard_id);
    3178             526 :         let pgdata_path = path_with_suffix_extension(
    3179             526 :             timelines_path.join(format!("basebackup-{timeline_id}")),
    3180             526 :             TEMP_FILE_SUFFIX,
    3181             526 :         );
    3182             526 : 
    3183             526 :         // An uninit mark was placed earlier, so nothing else can access this timeline's files.
    3184             526 :         // The current initdb has not run yet, so remove whatever was left over from previous runs.
    3185             526 :         if pgdata_path.exists() {
    3186 UBC           0 :             fs::remove_dir_all(&pgdata_path).with_context(|| {
    3187               0 :                 format!("Failed to remove already existing initdb directory: {pgdata_path}")
    3188               0 :             })?;
    3189 CBC         526 :         }
    3190                 :         // This new directory is only needed during bootstrap; arrange to remove it immediately afterwards.
    3191             524 :         scopeguard::defer! {
    3192             524 :             if let Err(e) = fs::remove_dir_all(&pgdata_path) {
    3193                 :                 // this is unlikely, but we will remove the directory on pageserver restart or another bootstrap call
    3194 UBC           0 :                 error!("Failed to remove temporary initdb directory '{pgdata_path}': {e}");
    3195 CBC         524 :             }
    3196                 :         }
    3197             526 :         if let Some(existing_initdb_timeline_id) = load_existing_initdb {
    3198               3 :             let Some(storage) = &self.remote_storage else {
    3199 UBC           0 :                 bail!("no storage configured but load_existing_initdb set to {existing_initdb_timeline_id}");
    3200                 :             };
    3201 CBC           3 :             let (initdb_tar_zst_path, initdb_tar_zst) =
    3202               3 :                 self::remote_timeline_client::download_initdb_tar_zst(
    3203               3 :                     self.conf,
    3204               3 :                     storage,
    3205               3 :                     &self.tenant_shard_id,
    3206               3 :                     &existing_initdb_timeline_id,
    3207               3 :                     &self.cancel,
    3208               3 :                 )
    3209            1246 :                 .await
    3210               3 :                 .context("download initdb tar")?;
    3211               3 :             let buf_read =
    3212               3 :                 BufReader::with_capacity(remote_timeline_client::BUFFER_SIZE, initdb_tar_zst);
    3213               3 :             import_datadir::extract_tar_zst(&pgdata_path, buf_read)
    3214           16100 :                 .await
    3215               3 :                 .context("extract initdb tar")?;
    3216                 : 
    3217               3 :             tokio::fs::remove_file(&initdb_tar_zst_path)
    3218               3 :                 .await
    3219               3 :                 .or_else(|e| {
    3220 UBC           0 :                     if e.kind() == std::io::ErrorKind::NotFound {
    3221                 :                         // If something else already removed the file, ignore the error
    3222               0 :                         Ok(())
    3223                 :                     } else {
    3224               0 :                         Err(e)
    3225                 :                     }
    3226 CBC           3 :                 })
    3227               3 :                 .with_context(|| format!("tempfile removal {initdb_tar_zst_path}"))?;
    3228                 :         } else {
    3229                 :             // Initialize a temporary repo to get the bootstrap data; this creates a directory at `pgdata_path`.
    3230            6469 :             run_initdb(self.conf, &pgdata_path, pg_version, &self.cancel).await?;
    3231                 : 
    3232                 :             // Upload the created data dir to S3
    3233             522 :             if let Some(storage) = &self.remote_storage {
    3234             522 :                 let temp_path = timelines_path.join(format!(
    3235             522 :                     "{INITDB_PATH}.upload-{timeline_id}.{TEMP_FILE_SUFFIX}"
    3236             522 :                 ));
    3237                 : 
    3238             522 :                 let (pgdata_zstd, tar_zst_size) =
    3239         4270485 :                     import_datadir::create_tar_zst(&pgdata_path, &temp_path).await?;
    3240             522 :                 backoff::retry(
    3241             589 :                     || async {
    3242             589 :                         self::remote_timeline_client::upload_initdb_dir(
    3243             589 :                             storage,
    3244             589 :                             &self.tenant_shard_id.tenant_id,
    3245             589 :                             &timeline_id,
    3246             589 :                             pgdata_zstd.try_clone().await?,
    3247             589 :                             tar_zst_size,
    3248             589 :                             &self.cancel,
    3249                 :                         )
    3250           23739 :                         .await
    3251             589 :                     },
    3252             522 :                     |_| false,
    3253             522 :                     3,
    3254             522 :                     u32::MAX,
    3255             522 :                     "persist_initdb_tar_zst",
    3256             522 :                     backoff::Cancel::new(self.cancel.clone(), || anyhow::anyhow!("Cancelled")),
    3257             522 :                 )
    3258           24322 :                 .await?;
    3259                 : 
    3260             522 :                 tokio::fs::remove_file(&temp_path)
    3261             521 :                     .await
    3262             522 :                     .or_else(|e| {
    3263 UBC           0 :                         if e.kind() == std::io::ErrorKind::NotFound {
    3264                 :                             // If something else already removed the file, ignore the error
    3265               0 :                             Ok(())
    3266                 :                         } else {
    3267               0 :                             Err(e)
    3268                 :                         }
    3269 CBC         522 :                     })
    3270             522 :                     .with_context(|| format!("tempfile removal {temp_path}"))?;
    3271 UBC           0 :             }
    3272                 :         }
    3273 CBC         525 :         let pgdata_lsn = import_datadir::get_lsn_from_controlfile(&pgdata_path)?.align();
    3274             525 : 
    3275             525 :         // Import the contents of the data directory at the initial checkpoint
    3276             525 :         // LSN, and any WAL after that.
    3277             525 :         // Initdb lsn will be equal to last_record_lsn which will be set after import.
    3278             525 :         // Since we know it upfront, we pass it to the metadata directly instead of using an Option or a dummy zero value.
    3279             525 :         let new_metadata = TimelineMetadata::new(
    3280             525 :             Lsn(0),
    3281             525 :             None,
    3282             525 :             None,
    3283             525 :             Lsn(0),
    3284             525 :             pgdata_lsn,
    3285             525 :             pgdata_lsn,
    3286             525 :             pg_version,
    3287             525 :         );
    3288             525 :         let raw_timeline = self
    3289             525 :             .prepare_new_timeline(
    3290             525 :                 timeline_id,
    3291             525 :                 &new_metadata,
    3292             525 :                 timeline_uninit_mark,
    3293             525 :                 pgdata_lsn,
    3294             525 :                 None,
    3295             525 :             )
    3296               1 :             .await?;
    3297                 : 
    3298             524 :         let tenant_shard_id = raw_timeline.owning_tenant.tenant_shard_id;
    3299             524 :         let unfinished_timeline = raw_timeline.raw_timeline()?;
    3300                 : 
    3301             524 :         import_datadir::import_timeline_from_postgres_datadir(
    3302             524 :             unfinished_timeline,
    3303             524 :             &pgdata_path,
    3304             524 :             pgdata_lsn,
    3305             524 :             ctx,
    3306             524 :         )
    3307         2611938 :         .await
    3308             524 :         .with_context(|| {
    3309 UBC           0 :             format!("Failed to import pgdatadir for timeline {tenant_shard_id}/{timeline_id}")
    3310 CBC         524 :         })?;
    3311                 : 
    3312                 :         // Flush the new layer files to disk before we make the timeline available to
    3313                 :         // the outside world.
    3314                 :         //
    3315                 :         // Flush loop needs to be spawned in order to be able to flush.
    3316             524 :         unfinished_timeline.maybe_spawn_flush_loop();
    3317             524 : 
    3318             524 :         fail::fail_point!("before-checkpoint-new-timeline", |_| {
    3319               2 :             anyhow::bail!("failpoint before-checkpoint-new-timeline");
    3320             524 :         });
    3321                 : 
    3322             522 :         unfinished_timeline
    3323             522 :             .freeze_and_flush()
    3324             521 :             .await
    3325             521 :             .with_context(|| {
    3326 UBC           0 :                 format!(
    3327               0 :                     "Failed to flush after pgdatadir import for timeline {tenant_shard_id}/{timeline_id}"
    3328               0 :                 )
    3329 CBC         521 :             })?;
    3330                 : 
    3331                 :         // All done!
    3332             521 :         let timeline = raw_timeline.finish_creation()?;
    3333                 : 
    3334             520 :         info!(
    3335             520 :             "created root timeline {} timeline.lsn {}",
    3336             520 :             timeline_id,
    3337             520 :             timeline.get_last_record_lsn()
    3338             520 :         );
    3339                 : 
    3340             520 :         Ok(timeline)
    3341             524 :     }
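
The initdb tarball upload above is wrapped in `backoff::retry` with a retry budget and a cancellation hook. Below is a rough, generic sketch of what such a bounded async retry loop looks like; it is only an illustration, not the real `utils::backoff` API, and a real helper would typically use exponential backoff with jitter plus cancellation handling.

use std::future::Future;
use std::time::Duration;

/// Call `op` up to `max_attempts` times, sleeping a little longer after each
/// failure. Cancellation handling is left out to keep the sketch small.
async fn retry_bounded<T, E, F, Fut>(mut op: F, max_attempts: u32) -> Result<T, E>
where
    F: FnMut() -> Fut,
    Fut: Future<Output = Result<T, E>>,
{
    let mut attempt: u32 = 0;
    loop {
        match op().await {
            Ok(v) => return Ok(v),
            Err(e) if attempt + 1 >= max_attempts => return Err(e),
            Err(_) => {
                attempt += 1;
                // simple linear backoff for illustration
                tokio::time::sleep(Duration::from_millis(100 * u64::from(attempt))).await;
            }
        }
    }
}
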
    3342                 : 
    3343                 :     /// Call this before constructing a timeline, to build its required structures
    3344             921 :     fn build_timeline_resources(&self, timeline_id: TimelineId) -> TimelineResources {
    3345             921 :         let remote_client = if let Some(remote_storage) = self.remote_storage.as_ref() {
    3346             921 :             let remote_client = RemoteTimelineClient::new(
    3347             921 :                 remote_storage.clone(),
    3348             921 :                 self.deletion_queue_client.clone(),
    3349             921 :                 self.conf,
    3350             921 :                 self.tenant_shard_id,
    3351             921 :                 timeline_id,
    3352             921 :                 self.generation,
    3353             921 :             );
    3354             921 :             Some(remote_client)
    3355                 :         } else {
    3356 UBC           0 :             None
    3357                 :         };
    3358                 : 
    3359 CBC         921 :         TimelineResources {
    3360             921 :             remote_client,
    3361             921 :             deletion_queue_client: self.deletion_queue_client.clone(),
    3362             921 :         }
    3363             921 :     }
    3364                 : 
    3365                 :     /// Creates the intermediate timeline structure and its files.
    3366                 :     ///
    3367                 :     /// An empty layer map is initialized, and new data and WAL can be imported starting
    3368                 :     /// at 'disk_consistent_lsn'. After any initial data has been imported, call
    3369                 :     /// `finish_creation` to insert the Timeline into the timelines map and to remove the
    3370                 :     /// uninit mark file.
    3371             921 :     async fn prepare_new_timeline<'a>(
    3372             921 :         &'a self,
    3373             921 :         new_timeline_id: TimelineId,
    3374             921 :         new_metadata: &TimelineMetadata,
    3375             921 :         uninit_mark: TimelineUninitMark<'a>,
    3376             921 :         start_lsn: Lsn,
    3377             921 :         ancestor: Option<Arc<Timeline>>,
    3378             921 :     ) -> anyhow::Result<UninitializedTimeline> {
    3379             921 :         let tenant_shard_id = self.tenant_shard_id;
    3380             921 : 
    3381             921 :         let resources = self.build_timeline_resources(new_timeline_id);
    3382             921 :         if let Some(remote_client) = &resources.remote_client {
    3383             921 :             remote_client.init_upload_queue_for_empty_remote(new_metadata)?;
    3384 UBC           0 :         }
    3385                 : 
    3386 CBC         921 :         let timeline_struct = self
    3387             921 :             .create_timeline_struct(
    3388             921 :                 new_timeline_id,
    3389             921 :                 new_metadata,
    3390             921 :                 ancestor,
    3391             921 :                 resources,
    3392             921 :                 CreateTimelineCause::Load,
    3393             921 :             )
    3394             921 :             .context("Failed to create timeline data structure")?;
    3395                 : 
    3396             921 :         timeline_struct.init_empty_layer_map(start_lsn);
    3397                 : 
    3398             921 :         if let Err(e) = self
    3399             921 :             .create_timeline_files(&uninit_mark.timeline_path, &new_timeline_id, new_metadata)
    3400 UBC           0 :             .await
    3401                 :         {
    3402 CBC           1 :             error!("Failed to create initial files for timeline {tenant_shard_id}/{new_timeline_id}, cleaning up: {e:?}");
    3403               1 :             cleanup_timeline_directory(uninit_mark);
    3404               1 :             return Err(e);
    3405             920 :         }
    3406                 : 
    3407 UBC           0 :         debug!(
    3408               0 :             "Successfully created initial files for timeline {tenant_shard_id}/{new_timeline_id}"
    3409               0 :         );
    3410                 : 
    3411 CBC         920 :         Ok(UninitializedTimeline::new(
    3412             920 :             self,
    3413             920 :             new_timeline_id,
    3414             920 :             Some((timeline_struct, uninit_mark)),
    3415             920 :         ))
    3416             921 :     }
    3417                 : 
    3418             921 :     async fn create_timeline_files(
    3419             921 :         &self,
    3420             921 :         timeline_path: &Utf8Path,
    3421             921 :         new_timeline_id: &TimelineId,
    3422             921 :         new_metadata: &TimelineMetadata,
    3423             921 :     ) -> anyhow::Result<()> {
    3424             921 :         crashsafe::create_dir(timeline_path).context("Failed to create timeline directory")?;
    3425                 : 
    3426             921 :         fail::fail_point!("after-timeline-uninit-mark-creation", |_| {
    3427               1 :             anyhow::bail!("failpoint after-timeline-uninit-mark-creation");
    3428             921 :         });
    3429                 : 
    3430             920 :         save_metadata(
    3431             920 :             self.conf,
    3432             920 :             &self.tenant_shard_id,
    3433             920 :             new_timeline_id,
    3434             920 :             new_metadata,
    3435             920 :         )
    3436 UBC           0 :         .await
    3437 CBC         920 :         .context("Failed to create timeline metadata")?;
    3438             920 :         Ok(())
    3439             921 :     }
    3440                 : 
    3441                 :     /// Attempts to create an uninit mark file for the timeline initialization.
    3442                 :     /// Bails if the timeline is already loaded into memory (i.e. initialized before) or if the uninit mark file already exists.
    3443                 :     ///
    3444                 :     /// This way, we need to hold the timelines lock only for a small amount of time during the mark check/creation for each timeline init.
    3445             939 :     fn create_timeline_uninit_mark(
    3446             939 :         &self,
    3447             939 :         timeline_id: TimelineId,
    3448             939 :     ) -> Result<TimelineUninitMark, TimelineExclusionError> {
    3449             939 :         let tenant_shard_id = self.tenant_shard_id;
    3450             939 : 
    3451             939 :         let uninit_mark_path = self
    3452             939 :             .conf
    3453             939 :             .timeline_uninit_mark_file_path(tenant_shard_id, timeline_id);
    3454             939 :         let timeline_path = self.conf.timeline_path(&tenant_shard_id, &timeline_id);
    3455                 : 
    3456             939 :         let uninit_mark = TimelineUninitMark::new(
    3457             939 :             self,
    3458             939 :             timeline_id,
    3459             939 :             uninit_mark_path.clone(),
    3460             939 :             timeline_path.clone(),
    3461             939 :         )?;
    3462                 : 
    3463                 :         // At this stage, we have got exclusive access to in-memory state for this timeline ID
    3464                 :         // for creation.
    3465                 :         // A timeline directory should never exist on disk already:
    3466                 :         // - a previous failed creation would have cleaned up after itself
    3467                 :         // - a pageserver restart would clean up timeline directories that don't have valid remote state
    3468                 :         //
    3469                 :         // Therefore it is an unexpected internal error to encounter a timeline directory already existing here,
    3470                 :         // this error may indicate a bug in cleanup on failed creations.
    3471             938 :         if timeline_path.exists() {
    3472 UBC           0 :             return Err(TimelineExclusionError::Other(anyhow::anyhow!(
    3473               0 :                 "Timeline directory already exists! This is a bug."
    3474               0 :             )));
    3475 CBC         938 :         }
    3476             938 : 
    3477             938 :         // Create the on-disk uninit mark _after_ the in-memory acquisition of the tenant ID: this guarantees
    3478             938 :         // that during process runtime, colliding creations will be caught in-memory without getting
    3479             938 :         // as far as failing to write a file.
    3480             938 :         fs::OpenOptions::new()
    3481             938 :             .write(true)
    3482             938 :             .create_new(true)
    3483             938 :             .open(&uninit_mark_path)
    3484             938 :             .context("Failed to create uninit mark file")
    3485             938 :             .and_then(|_| {
    3486             938 :                 crashsafe::fsync_file_and_parent(&uninit_mark_path)
    3487             938 :                     .context("Failed to fsync uninit mark file")
    3488             938 :             })
    3489             938 :             .with_context(|| {
    3490 UBC           0 :                 format!("Failed to create uninit mark for timeline {tenant_shard_id}/{timeline_id}")
    3491 CBC         938 :             })?;
    3492                 : 
    3493             938 :         Ok(uninit_mark)
    3494             939 :     }
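                         : 
                         :     // How the mark gives exclusion: `create_new(true)` makes the open fail with
                         :     // `ErrorKind::AlreadyExists` when the mark file is already present, so the on-disk
                         :     // mark backs up the in-memory check above, and `fsync_file_and_parent` makes the
                         :     // mark durable before any timeline files are created.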
    3495                 : 
    3496                 :     /// Gathers inputs from all of the timelines to produce a sizing model input.
    3497                 :     ///
    3498                 :     /// Future is cancellation safe. Only one calculation can be running at once per tenant.
    3499              84 :     #[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()))]
    3500                 :     pub async fn gather_size_inputs(
    3501                 :         &self,
    3502                 :         // `max_retention_period` overrides the cutoff that is used to calculate the size
    3503                 :         // (only if it is shorter than the real cutoff).
    3504                 :         max_retention_period: Option<u64>,
    3505                 :         cause: LogicalSizeCalculationCause,
    3506                 :         cancel: &CancellationToken,
    3507                 :         ctx: &RequestContext,
    3508                 :     ) -> anyhow::Result<size::ModelInputs> {
    3509                 :         let logical_sizes_at_once = self
    3510                 :             .conf
    3511                 :             .concurrent_tenant_size_logical_size_queries
    3512                 :             .inner();
    3513                 : 
    3514                 :         // TODO: Having a single mutex block concurrent reads is not great for performance.
    3515                 :         //
    3516                 :         // But the only case where we need to run multiple of these at once is when we
    3517                 :         // request a size for a tenant manually via API, while another background calculation
    3518                 :         // is in progress (which is not a common case).
    3519                 :         //
    3520                 :         // See issue #2748, condensed out of the initial PR review, for more details.
    3521                 :         let mut shared_cache = self.cached_logical_sizes.lock().await;
    3522                 : 
    3523                 :         size::gather_inputs(
    3524                 :             self,
    3525                 :             logical_sizes_at_once,
    3526                 :             max_retention_period,
    3527                 :             &mut shared_cache,
    3528                 :             cause,
    3529                 :             cancel,
    3530                 :             ctx,
    3531                 :         )
    3532                 :         .await
    3533                 :     }
    3534                 : 
    3535                 :     /// Calculate synthetic tenant size and cache the result.
    3536                 :     /// This is periodically called by a background worker.
    3537                 :     /// The result is cached in the tenant struct.
    3538              29 :     #[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()))]
    3539                 :     pub async fn calculate_synthetic_size(
    3540                 :         &self,
    3541                 :         cause: LogicalSizeCalculationCause,
    3542                 :         cancel: &CancellationToken,
    3543                 :         ctx: &RequestContext,
    3544                 :     ) -> anyhow::Result<u64> {
    3545                 :         let inputs = self.gather_size_inputs(None, cause, cancel, ctx).await?;
    3546                 : 
    3547                 :         let size = inputs.calculate()?;
    3548                 : 
    3549                 :         self.set_cached_synthetic_size(size);
    3550                 : 
    3551                 :         Ok(size)
    3552                 :     }
    3553                 : 
    3554                 :     /// Cache the given synthetic size and update the metric value.
    3555              27 :     pub fn set_cached_synthetic_size(&self, size: u64) {
    3556              27 :         self.cached_synthetic_tenant_size
    3557              27 :             .store(size, Ordering::Relaxed);
    3558              27 : 
    3559              27 :         TENANT_SYNTHETIC_SIZE_METRIC
    3560              27 :             .get_metric_with_label_values(&[&self.tenant_shard_id.tenant_id.to_string()])
    3561              27 :             .unwrap()
    3562              27 :             .set(size);
    3563              27 :     }
    3564                 : 
    3565              27 :     pub fn cached_synthetic_size(&self) -> u64 {
    3566              27 :         self.cached_synthetic_tenant_size.load(Ordering::Relaxed)
    3567              27 :     }
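                         : 
                         :     // A minimal usage sketch (hypothetical caller; `cause`, `cancel` and `ctx` are
                         :     // assumed to be a `LogicalSizeCalculationCause`, a `CancellationToken` and a
                         :     // `RequestContext` owned by that caller):
                         :     //
                         :     //     let size = tenant.calculate_synthetic_size(cause, &cancel, &ctx).await?;
                         :     //     assert_eq!(size, tenant.cached_synthetic_size());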
    3568                 : 
    3569                 :     /// Flush any in-progress layers, schedule uploads, and wait for uploads to complete.
    3570                 :     ///
    3571                 :     /// This function can take a long time: callers should wrap it in a timeout if calling
    3572                 :     /// from an external API handler.
    3573                 :     ///
    3574                 :     /// Cancel-safety: cancelling this function may leave I/O running, but such I/O is
    3575                 :     /// still bounded by tenant/timeline shutdown.
    3576 UBC           0 :     #[tracing::instrument(skip_all)]
    3577                 :     pub(crate) async fn flush_remote(&self) -> anyhow::Result<()> {
    3578                 :         let timelines = self.timelines.lock().unwrap().clone();
    3579                 : 
    3580 CBC           1 :         async fn flush_timeline(_gate: GateGuard, timeline: Arc<Timeline>) -> anyhow::Result<()> {
    3581               1 :             tracing::info!(timeline_id=%timeline.timeline_id, "Flushing...");
    3582               1 :             timeline.freeze_and_flush().await?;
    3583               1 :             tracing::info!(timeline_id=%timeline.timeline_id, "Waiting for uploads...");
    3584               1 :             if let Some(client) = &timeline.remote_client {
    3585               1 :                 client.wait_completion().await?;
    3586 UBC           0 :             }
    3587                 : 
    3588 CBC           1 :             Ok(())
    3589               1 :         }
    3590                 : 
    3591                 :         // We do not use a JoinSet for these tasks, because we don't want them to be
    3592                 :         // aborted when this function's future is cancelled: they should stay alive
    3593                 :         // holding their GateGuard until they complete, to ensure their I/Os complete
    3594                 :         // before Timeline shutdown completes.
    3595                 :         let mut results = FuturesUnordered::new();
    3596                 : 
    3597                 :         for (_timeline_id, timeline) in timelines {
    3598                 :             // Run each timeline's flush in a task holding the timeline's gate: this
    3599                 :             // means that if this function's future is cancelled, the Timeline shutdown
    3600                 :             // will still wait for any I/O in here to complete.
    3601                 :             let gate = match timeline.gate.enter() {
    3602                 :                 Ok(g) => g,
    3603                 :                 Err(_) => continue,
    3604                 :             };
    3605               2 :             let jh = tokio::task::spawn(async move { flush_timeline(gate, timeline).await });
    3606                 :             results.push(jh);
    3607                 :         }
    3608                 : 
    3609                 :         while let Some(r) = results.next().await {
    3610                 :             if let Err(e) = r {
    3611                 :                 if !e.is_cancelled() && !e.is_panic() {
    3612 UBC           0 :                     tracing::error!("unexpected join error: {e:?}");
    3613                 :                 }
    3614                 :             }
    3615                 :         }
    3616                 : 
    3617                 :         // The flushes we did above were just writes, but the Tenant might also have
    3618                 :         // had pending deletions from recent compaction/gc: we want to flush those too.
    3619                 :         // This requires flushing the global delete queue, which is cheap because it's
    3620                 :         // typically a no-op.
    3621                 :         match self.deletion_queue_client.flush_execute().await {
    3622                 :             Ok(_) => {}
    3623                 :             Err(DeletionQueueError::ShuttingDown) => {}
    3624                 :         }
    3625                 : 
    3626                 :         Ok(())
    3627                 :     }
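                         : 
                         :     // For contrast, a `JoinSet`-based variant (hypothetical) would abort every
                         :     // in-flight flush as soon as the `flush_remote` future is dropped, because
                         :     // dropping a `JoinSet` aborts its tasks:
                         :     //
                         :     //     let mut set = tokio::task::JoinSet::new();
                         :     //     set.spawn(async move { flush_timeline(gate, timeline).await });
                         :     //     // dropping `set` here cancels the flushes mid-I/O
                         :     //
                         :     // which is exactly what the detached `tokio::task::spawn` + gate-guard
                         :     // combination above avoids.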
    3628                 : }
    3629                 : 
    3630 CBC           1 : fn remove_timeline_and_uninit_mark(
    3631               1 :     timeline_dir: &Utf8Path,
    3632               1 :     uninit_mark: &Utf8Path,
    3633               1 : ) -> anyhow::Result<()> {
    3634               1 :     fs::remove_dir_all(timeline_dir)
    3635               1 :         .or_else(|e| {
    3636 UBC           0 :             if e.kind() == std::io::ErrorKind::NotFound {
    3637                 :                 // we can leave the uninit mark without a timeline dir,
    3638                 :                 // just remove the mark then
    3639               0 :                 Ok(())
    3640                 :             } else {
    3641               0 :                 Err(e)
    3642                 :             }
    3643 CBC           1 :         })
    3644               1 :         .with_context(|| {
    3645 UBC           0 :             format!("Failed to remove uninit-marked timeline directory {timeline_dir}")
    3646 CBC           1 :         })?;
    3647               1 :     fs::remove_file(uninit_mark)
    3648               1 :         .with_context(|| format!("Failed to remove timeline uninit mark file {uninit_mark}"))?;
    3649                 : 
    3650               1 :     Ok(())
    3651               1 : }
    3652                 : 
    3653             462 : pub(crate) async fn create_tenant_files(
    3654             462 :     conf: &'static PageServerConf,
    3655             462 :     location_conf: &LocationConf,
    3656             462 :     tenant_shard_id: &TenantShardId,
    3657             462 : ) -> anyhow::Result<Utf8PathBuf> {
    3658             462 :     let target_tenant_directory = conf.tenant_path(tenant_shard_id);
    3659             462 :     anyhow::ensure!(
    3660             462 :         !target_tenant_directory
    3661             462 :             .try_exists()
    3662             462 :             .context("check existence of tenant directory")?,
    3663               2 :         "tenant directory already exists",
    3664                 :     );
    3665                 : 
    3666             460 :     let temporary_tenant_dir =
    3667             460 :         path_with_suffix_extension(&target_tenant_directory, TEMP_FILE_SUFFIX);
    3668 UBC           0 :     debug!("Creating temporary directory structure in {temporary_tenant_dir}");
    3669                 : 
    3670                 :     // The top-level dir may exist if we are creating it through the CLI
    3671 CBC         460 :     crashsafe::create_dir_all(&temporary_tenant_dir).with_context(|| {
    3672 UBC           0 :         format!("could not create temporary tenant directory {temporary_tenant_dir}")
    3673 CBC         460 :     })?;
    3674                 : 
    3675             460 :     let creation_result = try_create_target_tenant_dir(
    3676             460 :         conf,
    3677             460 :         location_conf,
    3678             460 :         tenant_shard_id,
    3679             460 :         &temporary_tenant_dir,
    3680             460 :         &target_tenant_directory,
    3681             460 :     )
    3682             919 :     .await;
    3683                 : 
    3684             460 :     if creation_result.is_err() {
    3685               1 :         error!(
    3686               1 :             "Failed to create directory structure for tenant {tenant_shard_id}, cleaning tmp data"
    3687               1 :         );
    3688               1 :         if let Err(e) = fs::remove_dir_all(&temporary_tenant_dir) {
    3689 UBC           0 :             error!("Failed to remove temporary tenant directory {temporary_tenant_dir:?}: {e}")
    3690 CBC           1 :         } else if let Err(e) = crashsafe::fsync(&temporary_tenant_dir) {
    3691               1 :             error!(
    3692               1 :                 "Failed to fsync removed temporary tenant directory {temporary_tenant_dir:?}: {e}"
    3693               1 :             )
    3694 UBC           0 :         }
    3695 CBC         459 :     }
    3696                 : 
    3697             460 :     creation_result?;
    3698                 : 
    3699             459 :     Ok(target_tenant_directory)
    3700             462 : }
    3701                 : 
    3702             460 : async fn try_create_target_tenant_dir(
    3703             460 :     conf: &'static PageServerConf,
    3704             460 :     location_conf: &LocationConf,
    3705             460 :     tenant_shard_id: &TenantShardId,
    3706             460 :     temporary_tenant_dir: &Utf8Path,
    3707             460 :     target_tenant_directory: &Utf8Path,
    3708             460 : ) -> Result<(), anyhow::Error> {
    3709             460 :     let temporary_tenant_timelines_dir = rebase_directory(
    3710             460 :         &conf.timelines_path(tenant_shard_id),
    3711             460 :         target_tenant_directory,
    3712             460 :         temporary_tenant_dir,
    3713             460 :     )
    3714             460 :     .with_context(|| format!("resolve tenant {tenant_shard_id} temporary timelines dir"))?;
    3715             460 :     let temporary_legacy_tenant_config_path = rebase_directory(
    3716             460 :         &conf.tenant_config_path(tenant_shard_id),
    3717             460 :         target_tenant_directory,
    3718             460 :         temporary_tenant_dir,
    3719             460 :     )
    3720             460 :     .with_context(|| format!("resolve tenant {tenant_shard_id} temporary config path"))?;
    3721             460 :     let temporary_tenant_config_path = rebase_directory(
    3722             460 :         &conf.tenant_location_config_path(tenant_shard_id),
    3723             460 :         target_tenant_directory,
    3724             460 :         temporary_tenant_dir,
    3725             460 :     )
    3726             460 :     .with_context(|| format!("resolve tenant {tenant_shard_id} temporary config path"))?;
    3727                 : 
    3728             460 :     Tenant::persist_tenant_config_at(
    3729             460 :         tenant_shard_id,
    3730             460 :         &temporary_tenant_config_path,
    3731             460 :         &temporary_legacy_tenant_config_path,
    3732             460 :         location_conf,
    3733             460 :     )
    3734             919 :     .await?;
    3735                 : 
    3736             460 :     crashsafe::create_dir(&temporary_tenant_timelines_dir).with_context(|| {
    3737 UBC           0 :         format!(
    3738               0 :             "create tenant {} temporary timelines directory {}",
    3739               0 :             tenant_shard_id, temporary_tenant_timelines_dir,
    3740               0 :         )
    3741 CBC         460 :     })?;
    3742             460 :     fail::fail_point!("tenant-creation-before-tmp-rename", |_| {
    3743               1 :         anyhow::bail!("failpoint tenant-creation-before-tmp-rename");
    3744             460 :     });
    3745                 : 
    3746                 :     // Make sure the current tenant directory entries are durable before renaming.
    3747                 :     // Without this, a crash may reorder any of the directory entry creations above.
    3748             459 :     crashsafe::fsync(temporary_tenant_dir)
    3749             459 :         .with_context(|| format!("sync temporary tenant directory {temporary_tenant_dir:?}"))?;
    3750                 : 
    3751             459 :     fs::rename(temporary_tenant_dir, target_tenant_directory).with_context(|| {
    3752 UBC           0 :         format!(
    3753               0 :             "move tenant {} temporary directory {} into the permanent one {}",
    3754               0 :             tenant_shard_id, temporary_tenant_dir, target_tenant_directory
    3755               0 :         )
    3756 CBC         459 :     })?;
    3757             459 :     let target_dir_parent = target_tenant_directory.parent().with_context(|| {
    3758 UBC           0 :         format!(
    3759               0 :             "get tenant {} dir parent for {}",
    3760               0 :             tenant_shard_id, target_tenant_directory,
    3761               0 :         )
    3762 CBC         459 :     })?;
    3763             459 :     crashsafe::fsync(target_dir_parent).with_context(|| {
    3764 UBC           0 :         format!(
    3765               0 :             "fsync renamed directory's parent {} for tenant {}",
    3766               0 :             target_dir_parent, tenant_shard_id,
    3767               0 :         )
    3768 CBC         459 :     })?;
    3769                 : 
    3770             459 :     Ok(())
    3771             460 : }
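                         : 
                         : // The crash-safety argument for the sequence above, in short: everything is first
                         : // written under the temporary directory, that directory is fsynced so its entries
                         : // cannot be reordered past the rename, the rename into the final path is the single
                         : // atomic "commit" step, and the parent directory is fsynced afterwards so the rename
                         : // itself survives a crash. A crash at any earlier point leaves behind only a
                         : // temporary directory rather than a partially created tenant.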
    3772                 : 
    3773            1380 : fn rebase_directory(
    3774            1380 :     original_path: &Utf8Path,
    3775            1380 :     base: &Utf8Path,
    3776            1380 :     new_base: &Utf8Path,
    3777            1380 : ) -> anyhow::Result<Utf8PathBuf> {
    3778            1380 :     let relative_path = original_path.strip_prefix(base).with_context(|| {
    3779 UBC           0 :         format!(
    3780               0 :             "Failed to strip base prefix '{}' off path '{}'",
    3781               0 :             base, original_path
    3782               0 :         )
    3783 CBC        1380 :     })?;
    3784            1380 :     Ok(new_base.join(relative_path))
    3785            1380 : }
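                         : 
                         : // A worked example of the rebasing (paths are hypothetical):
                         : //
                         : //     rebase_directory(
                         : //         Utf8Path::new("/data/tenants/1234/timelines"),
                         : //         Utf8Path::new("/data/tenants/1234"),
                         : //         Utf8Path::new("/data/tenants/1234.___temp"),
                         : //     )
                         : //     // -> Ok("/data/tenants/1234.___temp/timelines")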
    3786                 : 
    3787                 : /// Create the cluster temporarily in the 'initdb_target_dir' directory inside the repository
    3788                 : /// to get bootstrap data for timeline initialization.
    3789             523 : async fn run_initdb(
    3790             523 :     conf: &'static PageServerConf,
    3791             523 :     initdb_target_dir: &Utf8Path,
    3792             523 :     pg_version: u32,
    3793             523 :     cancel: &CancellationToken,
    3794             523 : ) -> Result<(), InitdbError> {
    3795             523 :     let initdb_bin_path = conf
    3796             523 :         .pg_bin_dir(pg_version)
    3797             523 :         .map_err(InitdbError::Other)?
    3798             523 :         .join("initdb");
    3799             523 :     let initdb_lib_dir = conf.pg_lib_dir(pg_version).map_err(InitdbError::Other)?;
    3800             523 :     info!(
    3801             523 :         "running {} in {}, libdir: {}",
    3802             523 :         initdb_bin_path, initdb_target_dir, initdb_lib_dir,
    3803             523 :     );
    3804                 : 
    3805             523 :     let _permit = INIT_DB_SEMAPHORE.acquire().await;
    3806                 : 
    3807             523 :     let initdb_command = tokio::process::Command::new(&initdb_bin_path)
    3808             523 :         .args(["-D", initdb_target_dir.as_ref()])
    3809             523 :         .args(["-U", &conf.superuser])
    3810             523 :         .args(["-E", "utf8"])
    3811             523 :         .arg("--no-instructions")
    3812             523 :         .arg("--no-sync")
    3813             523 :         .env_clear()
    3814             523 :         .env("LD_LIBRARY_PATH", &initdb_lib_dir)
    3815             523 :         .env("DYLD_LIBRARY_PATH", &initdb_lib_dir)
    3816             523 :         .stdout(Stdio::piped())
    3817             523 :         .stderr(Stdio::piped())
    3818             523 :         // If the `select!` below doesn't finish the `wait_with_output`,
    3819             523 :         // let the task get `wait()`ed for asynchronously by tokio.
    3820             523 :         // This means there is a slim chance we can go over the INIT_DB_SEMAPHORE.
    3821             523 :         // TODO: fix for this is non-trivial, see
    3822             523 :         // https://github.com/neondatabase/neon/pull/5921#pullrequestreview-1750858021
    3823             523 :         //
    3824             523 :         .kill_on_drop(true)
    3825             523 :         .spawn()?;
    3826                 : 
    3827             523 :     tokio::select! {
    3828             522 :         initdb_output = initdb_command.wait_with_output() => {
    3829                 :             let initdb_output = initdb_output?;
    3830                 :             if !initdb_output.status.success() {
    3831                 :                 return Err(InitdbError::Failed(initdb_output.status, initdb_output.stderr));
    3832                 :             }
    3833                 :         }
    3834                 :         _ = cancel.cancelled() => {
    3835                 :             return Err(InitdbError::Cancelled);
    3836                 :         }
    3837                 :     }
    3838                 : 
    3839             522 :     Ok(())
    3840             522 : }
    3841                 : 
    3842                 : impl Drop for Tenant {
    3843             199 :     fn drop(&mut self) {
    3844             199 :         remove_tenant_metrics(&self.tenant_shard_id.tenant_id);
    3845             199 :     }
    3846                 : }
    3847                 : /// Dump contents of a layer file to stdout.
    3848 UBC           0 : pub async fn dump_layerfile_from_path(
    3849               0 :     path: &Utf8Path,
    3850               0 :     verbose: bool,
    3851               0 :     ctx: &RequestContext,
    3852               0 : ) -> anyhow::Result<()> {
    3853                 :     use std::os::unix::fs::FileExt;
    3854                 : 
    3855                 :     // All layer files start with a two-byte "magic" value, to identify the kind of
    3856                 :     // file.
    3857               0 :     let file = File::open(path)?;
    3858               0 :     let mut header_buf = [0u8; 2];
    3859               0 :     file.read_exact_at(&mut header_buf, 0)?;
    3860                 : 
    3861               0 :     match u16::from_be_bytes(header_buf) {
    3862                 :         crate::IMAGE_FILE_MAGIC => {
    3863               0 :             ImageLayer::new_for_path(path, file)?
    3864               0 :                 .dump(verbose, ctx)
    3865               0 :                 .await?
    3866                 :         }
    3867                 :         crate::DELTA_FILE_MAGIC => {
    3868               0 :             DeltaLayer::new_for_path(path, file)?
    3869               0 :                 .dump(verbose, ctx)
    3870               0 :                 .await?
    3871                 :         }
    3872               0 :         magic => bail!("unrecognized magic identifier: {:?}", magic),
    3873                 :     }
    3874                 : 
    3875               0 :     Ok(())
    3876               0 : }
    3877                 : 
    3878                 : #[cfg(test)]
    3879                 : pub(crate) mod harness {
    3880                 :     use bytes::{Bytes, BytesMut};
    3881                 :     use once_cell::sync::OnceCell;
    3882                 :     use pageserver_api::shard::ShardIndex;
    3883                 :     use std::fs;
    3884                 :     use std::sync::Arc;
    3885                 :     use utils::logging;
    3886                 :     use utils::lsn::Lsn;
    3887                 : 
    3888                 :     use crate::deletion_queue::mock::MockDeletionQueue;
    3889                 :     use crate::{
    3890                 :         config::PageServerConf, repository::Key, tenant::Tenant, walrecord::NeonWalRecord,
    3891                 :     };
    3892                 : 
    3893                 :     use super::*;
    3894                 :     use crate::tenant::config::{TenantConf, TenantConfOpt};
    3895                 :     use hex_literal::hex;
    3896                 :     use utils::id::{TenantId, TimelineId};
    3897                 : 
    3898                 :     pub const TIMELINE_ID: TimelineId =
    3899                 :         TimelineId::from_array(hex!("11223344556677881122334455667788"));
    3900                 :     pub const NEW_TIMELINE_ID: TimelineId =
    3901                 :         TimelineId::from_array(hex!("AA223344556677881122334455667788"));
    3902                 : 
    3903                 :     /// Convenience function to create a page image with the given string as the only content
    3904                 :     #[allow(non_snake_case)]
    3905 CBC      854114 :     pub fn TEST_IMG(s: &str) -> Bytes {
    3906          854114 :         let mut buf = BytesMut::new();
    3907          854114 :         buf.extend_from_slice(s.as_bytes());
    3908          854114 :         buf.resize(64, 0);
    3909          854114 : 
    3910          854114 :         buf.freeze()
    3911          854114 :     }
    3912                 : 
    3913                 :     impl From<TenantConf> for TenantConfOpt {
    3914              42 :         fn from(tenant_conf: TenantConf) -> Self {
    3915              42 :             Self {
    3916              42 :                 checkpoint_distance: Some(tenant_conf.checkpoint_distance),
    3917              42 :                 checkpoint_timeout: Some(tenant_conf.checkpoint_timeout),
    3918              42 :                 compaction_target_size: Some(tenant_conf.compaction_target_size),
    3919              42 :                 compaction_period: Some(tenant_conf.compaction_period),
    3920              42 :                 compaction_threshold: Some(tenant_conf.compaction_threshold),
    3921              42 :                 gc_horizon: Some(tenant_conf.gc_horizon),
    3922              42 :                 gc_period: Some(tenant_conf.gc_period),
    3923              42 :                 image_creation_threshold: Some(tenant_conf.image_creation_threshold),
    3924              42 :                 pitr_interval: Some(tenant_conf.pitr_interval),
    3925              42 :                 walreceiver_connect_timeout: Some(tenant_conf.walreceiver_connect_timeout),
    3926              42 :                 lagging_wal_timeout: Some(tenant_conf.lagging_wal_timeout),
    3927              42 :                 max_lsn_wal_lag: Some(tenant_conf.max_lsn_wal_lag),
    3928              42 :                 trace_read_requests: Some(tenant_conf.trace_read_requests),
    3929              42 :                 eviction_policy: Some(tenant_conf.eviction_policy),
    3930              42 :                 min_resident_size_override: tenant_conf.min_resident_size_override,
    3931              42 :                 evictions_low_residence_duration_metric_threshold: Some(
    3932              42 :                     tenant_conf.evictions_low_residence_duration_metric_threshold,
    3933              42 :                 ),
    3934              42 :                 gc_feedback: Some(tenant_conf.gc_feedback),
    3935              42 :                 heatmap_period: Some(tenant_conf.heatmap_period),
    3936              42 :             }
    3937              42 :         }
    3938                 :     }
    3939                 : 
    3940                 :     enum LoadMode {
    3941                 :         Local,
    3942                 :         Remote,
    3943                 :     }
    3944                 : 
    3945                 :     pub struct TenantHarness {
    3946                 :         pub conf: &'static PageServerConf,
    3947                 :         pub tenant_conf: TenantConf,
    3948                 :         // TODO(sharding): remove duplicative `tenant_id` in favor of access to tenant_shard_id
    3949                 :         pub(crate) tenant_id: TenantId,
    3950                 :         pub tenant_shard_id: TenantShardId,
    3951                 :         pub generation: Generation,
    3952                 :         pub shard: ShardIndex,
    3953                 :         pub remote_storage: GenericRemoteStorage,
    3954                 :         pub remote_fs_dir: Utf8PathBuf,
    3955                 :         pub deletion_queue: MockDeletionQueue,
    3956                 :     }
    3957                 : 
    3958                 :     static LOG_HANDLE: OnceCell<()> = OnceCell::new();
    3959                 : 
    3960              44 :     pub(crate) fn setup_logging() {
    3961              44 :         LOG_HANDLE.get_or_init(|| {
    3962              44 :             logging::init(
    3963              44 :                 logging::LogFormat::Test,
    3964              44 :                 // enable it in case the tests exercise code paths that use
    3965              44 :                 // debug_assert_current_span_has_tenant_and_timeline_id
    3966              44 :                 logging::TracingErrorLayerEnablement::EnableWithRustLogFilter,
    3967              44 :                 logging::Output::Stdout,
    3968              44 :             )
    3969              44 :             .expect("Failed to init test logging")
    3970              44 :         });
    3971              44 :     }
    3972                 : 
    3973                 :     impl TenantHarness {
    3974              41 :         pub fn create(test_name: &'static str) -> anyhow::Result<Self> {
    3975              41 :             setup_logging();
    3976              41 : 
    3977              41 :             let repo_dir = PageServerConf::test_repo_dir(test_name);
    3978              41 :             let _ = fs::remove_dir_all(&repo_dir);
    3979              41 :             fs::create_dir_all(&repo_dir)?;
    3980                 : 
    3981              41 :             let conf = PageServerConf::dummy_conf(repo_dir);
    3982              41 :             // Make a static copy of the config. This can never be freed, but that's
    3983              41 :             // OK in a test.
    3984              41 :             let conf: &'static PageServerConf = Box::leak(Box::new(conf));
    3985              41 : 
    3986              41 :             // Disable automatic GC and compaction to make the unit tests more deterministic.
    3987              41 :             // The tests perform them manually if needed.
    3988              41 :             let tenant_conf = TenantConf {
    3989              41 :                 gc_period: Duration::ZERO,
    3990              41 :                 compaction_period: Duration::ZERO,
    3991              41 :                 ..TenantConf::default()
    3992              41 :             };
    3993              41 : 
    3994              41 :             let tenant_id = TenantId::generate();
    3995              41 :             let tenant_shard_id = TenantShardId::unsharded(tenant_id);
    3996              41 :             fs::create_dir_all(conf.tenant_path(&tenant_shard_id))?;
    3997              41 :             fs::create_dir_all(conf.timelines_path(&tenant_shard_id))?;
    3998                 : 
    3999                 :             use remote_storage::{RemoteStorageConfig, RemoteStorageKind};
    4000              41 :             let remote_fs_dir = conf.workdir.join("localfs");
    4001              41 :             std::fs::create_dir_all(&remote_fs_dir).unwrap();
    4002              41 :             let config = RemoteStorageConfig {
    4003              41 :                 storage: RemoteStorageKind::LocalFs(remote_fs_dir.clone()),
    4004              41 :             };
    4005              41 :             let remote_storage = GenericRemoteStorage::from_config(&config).unwrap();
    4006              41 :             let deletion_queue = MockDeletionQueue::new(Some(remote_storage.clone()));
    4007              41 : 
    4008              41 :             Ok(Self {
    4009              41 :                 conf,
    4010              41 :                 tenant_conf,
    4011              41 :                 tenant_id,
    4012              41 :                 tenant_shard_id,
    4013              41 :                 generation: Generation::new(0xdeadbeef),
    4014              41 :                 shard: ShardIndex::unsharded(),
    4015              41 :                 remote_storage,
    4016              41 :                 remote_fs_dir,
    4017              41 :                 deletion_queue,
    4018              41 :             })
    4019              41 :         }
    4020                 : 
    4021              40 :         pub async fn load(&self) -> (Arc<Tenant>, RequestContext) {
    4022              40 :             let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
    4023              40 :             (
    4024              40 :                 self.try_load(&ctx)
    4025              50 :                     .await
    4026              40 :                     .expect("failed to load test tenant"),
    4027              40 :                 ctx,
    4028              40 :             )
    4029              40 :         }
    4030                 : 
    4031              41 :         fn remote_empty(&self) -> bool {
    4032              41 :             let tenant_path = self.conf.tenant_path(&self.tenant_shard_id);
    4033              41 :             let remote_tenant_dir = self
    4034              41 :                 .remote_fs_dir
    4035              41 :                 .join(tenant_path.strip_prefix(&self.conf.workdir).unwrap());
    4036              41 :             if std::fs::metadata(&remote_tenant_dir).is_err() {
    4037              39 :                 return true;
    4038               2 :             }
    4039               2 : 
    4040               2 :             match std::fs::read_dir(remote_tenant_dir)
    4041               2 :                 .unwrap()
    4042               2 :                 .flatten()
    4043               2 :                 .next()
    4044                 :             {
    4045               2 :                 Some(entry) => {
    4046               2 :                     tracing::debug!(
    4047 UBC           0 :                         "remote_empty: not empty, found file {}",
    4048               0 :                         entry.file_name().to_string_lossy(),
    4049               0 :                     );
    4050 CBC           2 :                     false
    4051                 :                 }
    4052 UBC           0 :                 None => true,
    4053                 :             }
    4054 CBC          41 :         }
    4055                 : 
    4056              42 :         async fn do_try_load(
    4057              42 :             &self,
    4058              42 :             ctx: &RequestContext,
    4059              42 :             mode: LoadMode,
    4060              42 :         ) -> anyhow::Result<Arc<Tenant>> {
    4061              42 :             let walredo_mgr = Arc::new(WalRedoManager::from(TestRedoManager));
    4062              42 : 
    4063              42 :             let tenant = Arc::new(Tenant::new(
    4064              42 :                 TenantState::Loading,
    4065              42 :                 self.conf,
    4066              42 :                 AttachedTenantConf::try_from(LocationConf::attached_single(
    4067              42 :                     TenantConfOpt::from(self.tenant_conf),
    4068              42 :                     self.generation,
    4069              42 :                 ))
    4070              42 :                 .unwrap(),
    4071              42 :                 // This is a legacy/test code path: sharding isn't supported here.
    4072              42 :                 ShardIdentity::unsharded(),
    4073              42 :                 walredo_mgr,
    4074              42 :                 self.tenant_shard_id,
    4075              42 :                 Some(self.remote_storage.clone()),
    4076              42 :                 self.deletion_queue.new_client(),
    4077              42 :             ));
    4078              42 : 
    4079              42 :             match mode {
    4080                 :                 LoadMode::Local => {
    4081              40 :                     tenant
    4082              40 :                         .load_local(ctx)
    4083              40 :                         .instrument(info_span!("try_load", tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()))
    4084              40 :                         .await?;
    4085                 :                 }
    4086                 :                 LoadMode::Remote => {
    4087               2 :                     let preload = tenant
    4088               2 :                         .preload(&self.remote_storage, CancellationToken::new())
    4089               2 :                         .instrument(info_span!("try_load_preload", tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()))
    4090               7 :                         .await?;
    4091               2 :                     tenant
    4092               2 :                         .attach(Some(preload), ctx)
    4093               2 :                         .instrument(info_span!("try_load", tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()))
    4094               5 :                         .await?;
    4095                 :                 }
    4096                 :             }
    4097                 : 
    4098              41 :             tenant.state.send_replace(TenantState::Active);
    4099              41 :             for timeline in tenant.timelines.lock().unwrap().values() {
    4100               3 :                 timeline.set_state(TimelineState::Active);
    4101               3 :             }
    4102              41 :             Ok(tenant)
    4103              42 :         }
    4104                 : 
    4105                 :         /// For tests that specifically want to exercise the local load path, which does
    4106                 :         /// not use remote storage.
    4107               1 :         pub async fn try_load_local(&self, ctx: &RequestContext) -> anyhow::Result<Arc<Tenant>> {
    4108               1 :             self.do_try_load(ctx, LoadMode::Local).await
    4109               1 :         }
    4110                 : 
    4111                 :         /// The 'load' in this function is either a local load or a normal attachment.
    4112              41 :         pub async fn try_load(&self, ctx: &RequestContext) -> anyhow::Result<Arc<Tenant>> {
    4113                 :             // If we have nothing in remote storage, we must use load_local instead of attach: attach
    4114                 :             // will error out if there are no timelines.
    4115                 :             //
    4116                 :             // See https://github.com/neondatabase/neon/issues/5456 for how we will eliminate
    4117                 :             // this weird state of a Tenant which exists but doesn't have any timelines.
    4118              41 :             let mode = match self.remote_empty() {
    4119              39 :                 true => LoadMode::Local,
    4120               2 :                 false => LoadMode::Remote,
    4121                 :             };
    4122                 : 
    4123              51 :             self.do_try_load(ctx, mode).await
    4124              41 :         }
    4125                 : 
    4126               3 :         pub fn timeline_path(&self, timeline_id: &TimelineId) -> Utf8PathBuf {
    4127               3 :             self.conf.timeline_path(&self.tenant_shard_id, timeline_id)
    4128               3 :         }
    4129                 :     }
    4130                 : 
    4131                 :     // Mock WAL redo manager that doesn't do much
    4132                 :     pub(crate) struct TestRedoManager;
    4133                 : 
    4134                 :     impl TestRedoManager {
    4135                 :         /// # Cancel-Safety
    4136                 :         ///
    4137                 :         /// This method is cancellation-safe.
    4138 UBC           0 :         pub async fn request_redo(
    4139               0 :             &self,
    4140               0 :             key: Key,
    4141               0 :             lsn: Lsn,
    4142               0 :             base_img: Option<(Lsn, Bytes)>,
    4143               0 :             records: Vec<(Lsn, NeonWalRecord)>,
    4144               0 :             _pg_version: u32,
    4145               0 :         ) -> anyhow::Result<Bytes> {
    4146               0 :             let s = format!(
    4147               0 :                 "redo for {} to get to {}, with {} and {} records",
    4148               0 :                 key,
    4149               0 :                 lsn,
    4150               0 :                 if base_img.is_some() {
    4151               0 :                     "base image"
    4152                 :                 } else {
    4153               0 :                     "no base image"
    4154                 :                 },
    4155               0 :                 records.len()
    4156               0 :             );
    4157               0 :             println!("{s}");
    4158               0 : 
    4159               0 :             Ok(TEST_IMG(&s))
    4160               0 :         }
    4161                 :     }
    4162                 : }
    4163                 : 
    4164                 : #[cfg(test)]
    4165                 : mod tests {
    4166                 :     use super::*;
    4167                 :     use crate::keyspace::KeySpaceAccum;
    4168                 :     use crate::repository::{Key, Value};
    4169                 :     use crate::tenant::harness::*;
    4170                 :     use crate::DEFAULT_PG_VERSION;
    4171                 :     use crate::METADATA_FILE_NAME;
    4172                 :     use bytes::BytesMut;
    4173                 :     use hex_literal::hex;
    4174                 :     use once_cell::sync::Lazy;
    4175                 :     use rand::{thread_rng, Rng};
    4176                 :     use tokio_util::sync::CancellationToken;
    4177                 : 
    4178                 :     static TEST_KEY: Lazy<Key> =
    4179 CBC           9 :         Lazy::new(|| Key::from_slice(&hex!("010000000033333333444444445500000001")));
    4180                 : 
    4181               1 :     #[tokio::test]
    4182               1 :     async fn test_basic() -> anyhow::Result<()> {
    4183               1 :         let (tenant, ctx) = TenantHarness::create("test_basic")?.load().await;
    4184               1 :         let tline = tenant
    4185               1 :             .create_test_timeline(TIMELINE_ID, Lsn(0x08), DEFAULT_PG_VERSION, &ctx)
    4186               1 :             .await?;
    4187                 : 
    4188               1 :         let writer = tline.writer().await;
    4189               1 :         writer
    4190               1 :             .put(
    4191               1 :                 *TEST_KEY,
    4192               1 :                 Lsn(0x10),
    4193               1 :                 &Value::Image(TEST_IMG("foo at 0x10")),
    4194               1 :                 &ctx,
    4195               1 :             )
    4196 UBC           0 :             .await?;
    4197 CBC           1 :         writer.finish_write(Lsn(0x10));
    4198               1 :         drop(writer);
    4199                 : 
    4200               1 :         let writer = tline.writer().await;
    4201               1 :         writer
    4202               1 :             .put(
    4203               1 :                 *TEST_KEY,
    4204               1 :                 Lsn(0x20),
    4205               1 :                 &Value::Image(TEST_IMG("foo at 0x20")),
    4206               1 :                 &ctx,
    4207               1 :             )
    4208 UBC           0 :             .await?;
    4209 CBC           1 :         writer.finish_write(Lsn(0x20));
    4210               1 :         drop(writer);
    4211                 : 
    4212               1 :         assert_eq!(
    4213               1 :             tline.get(*TEST_KEY, Lsn(0x10), &ctx).await?,
    4214               1 :             TEST_IMG("foo at 0x10")
    4215                 :         );
    4216               1 :         assert_eq!(
    4217               1 :             tline.get(*TEST_KEY, Lsn(0x1f), &ctx).await?,
    4218               1 :             TEST_IMG("foo at 0x10")
    4219                 :         );
    4220               1 :         assert_eq!(
    4221               1 :             tline.get(*TEST_KEY, Lsn(0x20), &ctx).await?,
    4222               1 :             TEST_IMG("foo at 0x20")
    4223                 :         );
    4224                 : 
    4225               1 :         Ok(())
    4226                 :     }
    4227                 : 
    4228               1 :     #[tokio::test]
    4229               1 :     async fn no_duplicate_timelines() -> anyhow::Result<()> {
    4230               1 :         let (tenant, ctx) = TenantHarness::create("no_duplicate_timelines")?
    4231               1 :             .load()
    4232               1 :             .await;
    4233               1 :         let _ = tenant
    4234               1 :             .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
    4235               1 :             .await?;
    4236                 : 
    4237               1 :         match tenant
    4238               1 :             .create_empty_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
    4239 UBC           0 :             .await
    4240                 :         {
    4241               0 :             Ok(_) => panic!("duplicate timeline creation should fail"),
    4242 CBC           1 :             Err(e) => assert_eq!(e.to_string(), "Already exists".to_string()),
    4243                 :         }
    4244                 : 
    4245               1 :         Ok(())
    4246                 :     }
    4247                 : 
    4248                 :     /// Convenience function to create a page image with the given string as the only content
    4249               5 :     pub fn test_value(s: &str) -> Value {
    4250               5 :         let mut buf = BytesMut::new();
    4251               5 :         buf.extend_from_slice(s.as_bytes());
    4252               5 :         Value::Image(buf.freeze())
    4253               5 :     }
    4254                 : 
    4255                 :     ///
    4256                 :     /// Test branch creation
    4257                 :     ///
    4258               1 :     #[tokio::test]
    4259               1 :     async fn test_branch() -> anyhow::Result<()> {
    4260                 :         use std::str::from_utf8;
    4261                 : 
    4262               1 :         let (tenant, ctx) = TenantHarness::create("test_branch")?.load().await;
    4263               1 :         let tline = tenant
    4264               1 :             .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
    4265               2 :             .await?;
    4266               1 :         let writer = tline.writer().await;
    4267                 : 
    4268                 :         #[allow(non_snake_case)]
    4269               1 :         let TEST_KEY_A: Key = Key::from_hex("110000000033333333444444445500000001").unwrap();
    4270               1 :         #[allow(non_snake_case)]
    4271               1 :         let TEST_KEY_B: Key = Key::from_hex("110000000033333333444444445500000002").unwrap();
    4272               1 : 
    4273               1 :         // Insert a value on the timeline
    4274               1 :         writer
    4275               1 :             .put(TEST_KEY_A, Lsn(0x20), &test_value("foo at 0x20"), &ctx)
    4276 UBC           0 :             .await?;
    4277 CBC           1 :         writer
    4278               1 :             .put(TEST_KEY_B, Lsn(0x20), &test_value("foobar at 0x20"), &ctx)
    4279 UBC           0 :             .await?;
    4280 CBC           1 :         writer.finish_write(Lsn(0x20));
    4281               1 : 
    4282               1 :         writer
    4283               1 :             .put(TEST_KEY_A, Lsn(0x30), &test_value("foo at 0x30"), &ctx)
    4284 UBC           0 :             .await?;
    4285 CBC           1 :         writer.finish_write(Lsn(0x30));
    4286               1 :         writer
    4287               1 :             .put(TEST_KEY_A, Lsn(0x40), &test_value("foo at 0x40"), &ctx)
    4288 UBC           0 :             .await?;
    4289 CBC           1 :         writer.finish_write(Lsn(0x40));
    4290               1 : 
    4291               1 :         //assert_current_logical_size(&tline, Lsn(0x40));
    4292               1 : 
    4293               1 :         // Branch the history, modify relation differently on the new timeline
    4294               1 :         tenant
    4295               1 :             .branch_timeline_test(&tline, NEW_TIMELINE_ID, Some(Lsn(0x30)), &ctx)
    4296 UBC           0 :             .await?;
    4297 CBC           1 :         let newtline = tenant
    4298               1 :             .get_timeline(NEW_TIMELINE_ID, true)
    4299               1 :             .expect("Should have a local timeline");
    4300               1 :         let new_writer = newtline.writer().await;
    4301               1 :         new_writer
    4302               1 :             .put(TEST_KEY_A, Lsn(0x40), &test_value("bar at 0x40"), &ctx)
    4303 UBC           0 :             .await?;
    4304 CBC           1 :         new_writer.finish_write(Lsn(0x40));
    4305                 : 
    4306                 :         // Check page contents on both branches
    4307               1 :         assert_eq!(
    4308               1 :             from_utf8(&tline.get(TEST_KEY_A, Lsn(0x40), &ctx).await?)?,
    4309                 :             "foo at 0x40"
    4310                 :         );
    4311               1 :         assert_eq!(
    4312               1 :             from_utf8(&newtline.get(TEST_KEY_A, Lsn(0x40), &ctx).await?)?,
    4313                 :             "bar at 0x40"
    4314                 :         );
    4315               1 :         assert_eq!(
    4316               1 :             from_utf8(&newtline.get(TEST_KEY_B, Lsn(0x40), &ctx).await?)?,
    4317                 :             "foobar at 0x20"
    4318                 :         );
    4319                 : 
    4320                 :         //assert_current_logical_size(&tline, Lsn(0x40));
    4321                 : 
    4322               1 :         Ok(())
    4323                 :     }
    4324                 : 
    4325              10 :     async fn make_some_layers(
    4326              10 :         tline: &Timeline,
    4327              10 :         start_lsn: Lsn,
    4328              10 :         ctx: &RequestContext,
    4329              10 :     ) -> anyhow::Result<()> {
    4330              10 :         let mut lsn = start_lsn;
    4331                 :         #[allow(non_snake_case)]
    4332                 :         {
    4333              10 :             let writer = tline.writer().await;
    4334                 :             // Create a relation on the timeline
    4335              10 :             writer
    4336              10 :                 .put(
    4337              10 :                     *TEST_KEY,
    4338              10 :                     lsn,
    4339              10 :                     &Value::Image(TEST_IMG(&format!("foo at {}", lsn))),
    4340              10 :                     ctx,
    4341              10 :                 )
    4342 UBC           0 :                 .await?;
    4343 CBC          10 :             writer.finish_write(lsn);
    4344              10 :             lsn += 0x10;
    4345              10 :             writer
    4346              10 :                 .put(
    4347              10 :                     *TEST_KEY,
    4348              10 :                     lsn,
    4349              10 :                     &Value::Image(TEST_IMG(&format!("foo at {}", lsn))),
    4350              10 :                     ctx,
    4351              10 :                 )
    4352 UBC           0 :                 .await?;
    4353 CBC          10 :             writer.finish_write(lsn);
    4354              10 :             lsn += 0x10;
    4355              10 :         }
    4356              10 :         tline.freeze_and_flush().await?;
    4357                 :         {
    4358              10 :             let writer = tline.writer().await;
    4359              10 :             writer
    4360              10 :                 .put(
    4361              10 :                     *TEST_KEY,
    4362              10 :                     lsn,
    4363              10 :                     &Value::Image(TEST_IMG(&format!("foo at {}", lsn))),
    4364              10 :                     ctx,
    4365              10 :                 )
    4366 UBC           0 :                 .await?;
    4367 CBC          10 :             writer.finish_write(lsn);
    4368              10 :             lsn += 0x10;
    4369              10 :             writer
    4370              10 :                 .put(
    4371              10 :                     *TEST_KEY,
    4372              10 :                     lsn,
    4373              10 :                     &Value::Image(TEST_IMG(&format!("foo at {}", lsn))),
    4374              10 :                     ctx,
    4375              10 :                 )
    4376 UBC           0 :                 .await?;
    4377 CBC          10 :             writer.finish_write(lsn);
    4378              10 :         }
    4379              10 :         tline.freeze_and_flush().await
    4380              10 :     }
    4381                 : 
    4382               1 :     #[tokio::test]
    4383               1 :     async fn test_prohibit_branch_creation_on_garbage_collected_data() -> anyhow::Result<()> {
    4384               1 :         let (tenant, ctx) =
    4385               1 :             TenantHarness::create("test_prohibit_branch_creation_on_garbage_collected_data")?
    4386               1 :                 .load()
    4387               1 :                 .await;
    4388               1 :         let tline = tenant
    4389               1 :             .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
    4390               1 :             .await?;
    4391               2 :         make_some_layers(tline.as_ref(), Lsn(0x20), &ctx).await?;
    4392                 : 
    4393                 :         // this removes layers before lsn 0x40 (0x50 minus 0x10), so there are two remaining layers, image and delta for 0x31-0x50
    4394                 :         // FIXME: this doesn't actually remove any layer currently, given how flushing
    4395                 :         // and compaction work. But it does set the 'cutoff' point so that the cross-check
    4396                 :         // below should fail.
    4397               1 :         tenant
    4398               1 :             .gc_iteration(
    4399               1 :                 Some(TIMELINE_ID),
    4400               1 :                 0x10,
    4401               1 :                 Duration::ZERO,
    4402               1 :                 &CancellationToken::new(),
    4403               1 :                 &ctx,
    4404               1 :             )
    4405 UBC           0 :             .await?;
    4406                 : 
    4407                 :         // try to branch at lsn 0x25, should fail because we already garbage collected the data
    4408 CBC           1 :         match tenant
    4409               1 :             .branch_timeline_test(&tline, NEW_TIMELINE_ID, Some(Lsn(0x25)), &ctx)
    4410 UBC           0 :             .await
    4411                 :         {
    4412               0 :             Ok(_) => panic!("branching should have failed"),
    4413 CBC           1 :             Err(err) => {
    4414               1 :                 let CreateTimelineError::AncestorLsn(err) = err else {
    4415 UBC           0 :                     panic!("wrong error type")
    4416                 :                 };
    4417 CBC           1 :                 assert!(err.to_string().contains("invalid branch start lsn"));
    4418               1 :                 assert!(err
    4419               1 :                     .source()
    4420               1 :                     .unwrap()
    4421               1 :                     .to_string()
    4422               1 :                     .contains("we might've already garbage collected needed data"))
    4423                 :             }
    4424                 :         }
    4425                 : 
    4426               1 :         Ok(())
    4427                 :     }
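
         For reference, a minimal standalone sketch of the cutoff arithmetic the comments in the test above rely on (plain Rust, not the pageserver API; the variable names are assumptions):

             // GC keeps `horizon` bytes of WAL history behind the last record LSN.
             let last_record_lsn: u64 = 0x50;
             let horizon: u64 = 0x10;
             let cutoff = last_record_lsn - horizon; // 0x40
             // Branching at 0x25 falls below the cutoff, so the request is rejected
             // with the "invalid branch start lsn" error asserted above.
             assert!(0x25 < cutoff);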
    4428                 : 
    4429               1 :     #[tokio::test]
    4430               1 :     async fn test_prohibit_branch_creation_on_pre_initdb_lsn() -> anyhow::Result<()> {
    4431               1 :         let (tenant, ctx) =
    4432               1 :             TenantHarness::create("test_prohibit_branch_creation_on_pre_initdb_lsn")?
    4433               1 :                 .load()
    4434               1 :                 .await;
    4435                 : 
    4436               1 :         let tline = tenant
    4437               1 :             .create_test_timeline(TIMELINE_ID, Lsn(0x50), DEFAULT_PG_VERSION, &ctx)
    4438               1 :             .await?;
    4439                 :         // try to branch at lsn 0x25, should fail because initdb lsn is 0x50
    4440               1 :         match tenant
    4441               1 :             .branch_timeline_test(&tline, NEW_TIMELINE_ID, Some(Lsn(0x25)), &ctx)
    4442 UBC           0 :             .await
    4443                 :         {
    4444               0 :             Ok(_) => panic!("branching should have failed"),
    4445 CBC           1 :             Err(err) => {
    4446               1 :                 let CreateTimelineError::AncestorLsn(err) = err else {
    4447 UBC           0 :                     panic!("wrong error type");
    4448                 :                 };
    4449 CBC           1 :                 assert!(&err.to_string().contains("invalid branch start lsn"));
    4450               1 :                 assert!(&err
    4451               1 :                     .source()
    4452               1 :                     .unwrap()
    4453               1 :                     .to_string()
    4454               1 :                     .contains("is earlier than latest GC horizon"));
    4455                 :             }
    4456                 :         }
    4457                 : 
    4458               1 :         Ok(())
    4459                 :     }
    4460                 : 
    4461                 :     /*
    4462                 :     // FIXME: This currently fails to error out. Calling GC doesn't currently
    4463                 :     // remove the old value, we'd need to work a little harder
    4464                 :     #[tokio::test]
    4465                 :     async fn test_prohibit_get_for_garbage_collected_data() -> anyhow::Result<()> {
    4466                 :         let repo =
    4467                 :             RepoHarness::create("test_prohibit_get_for_garbage_collected_data")?
    4468                 :             .load();
    4469                 : 
    4470                 :         let tline = repo.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION)?;
    4471                 :         make_some_layers(tline.as_ref(), Lsn(0x20), &ctx).await?;
    4472                 : 
    4473                 :         repo.gc_iteration(Some(TIMELINE_ID), 0x10, Duration::ZERO)?;
    4474                 :         let latest_gc_cutoff_lsn = tline.get_latest_gc_cutoff_lsn();
    4475                 :         assert!(*latest_gc_cutoff_lsn > Lsn(0x25));
    4476                 :         match tline.get(*TEST_KEY, Lsn(0x25)) {
    4477                 :             Ok(_) => panic!("request for page should have failed"),
    4478                 :             Err(err) => assert!(err.to_string().contains("not found at")),
    4479                 :         }
    4480                 :         Ok(())
    4481                 :     }
    4482                 :      */
    4483                 : 
    4484               1 :     #[tokio::test]
    4485               1 :     async fn test_get_branchpoints_from_an_inactive_timeline() -> anyhow::Result<()> {
    4486               1 :         let (tenant, ctx) =
    4487               1 :             TenantHarness::create("test_get_branchpoints_from_an_inactive_timeline")?
    4488               1 :                 .load()
    4489               1 :                 .await;
    4490               1 :         let tline = tenant
    4491               1 :             .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
    4492               2 :             .await?;
    4493               2 :         make_some_layers(tline.as_ref(), Lsn(0x20), &ctx).await?;
    4494                 : 
    4495               1 :         tenant
    4496               1 :             .branch_timeline_test(&tline, NEW_TIMELINE_ID, Some(Lsn(0x40)), &ctx)
    4497 UBC           0 :             .await?;
    4498 CBC           1 :         let newtline = tenant
    4499               1 :             .get_timeline(NEW_TIMELINE_ID, true)
    4500               1 :             .expect("Should have a local timeline");
    4501               1 : 
    4502               2 :         make_some_layers(newtline.as_ref(), Lsn(0x60), &ctx).await?;
    4503                 : 
    4504               1 :         tline.set_broken("test".to_owned());
    4505               1 : 
    4506               1 :         tenant
    4507               1 :             .gc_iteration(
    4508               1 :                 Some(TIMELINE_ID),
    4509               1 :                 0x10,
    4510               1 :                 Duration::ZERO,
    4511               1 :                 &CancellationToken::new(),
    4512               1 :                 &ctx,
    4513               1 :             )
    4514 UBC           0 :             .await?;
    4515                 : 
    4516                 :         // The branchpoints should contain all timelines, even ones marked
    4517                 :         // as Broken.
    4518                 :         {
    4519 CBC           1 :             let branchpoints = &tline.gc_info.read().unwrap().retain_lsns;
    4520               1 :             assert_eq!(branchpoints.len(), 1);
    4521               1 :             assert_eq!(branchpoints[0], Lsn(0x40));
    4522                 :         }
    4523                 : 
    4524                 :         // You can read the key from the child branch even though the parent is
    4525                 :         // Broken, as long as you don't need to access data from the parent.
    4526               1 :         assert_eq!(
    4527               1 :             newtline.get(*TEST_KEY, Lsn(0x70), &ctx).await?,
    4528               1 :             TEST_IMG(&format!("foo at {}", Lsn(0x70)))
    4529                 :         );
    4530                 : 
    4531                 :         // This needs to traverse to the parent, and fails.
    4532               1 :         let err = newtline.get(*TEST_KEY, Lsn(0x50), &ctx).await.unwrap_err();
    4533               1 :         assert!(err
    4534               1 :             .to_string()
    4535               1 :             .contains("will not become active. Current state: Broken"));
    4536                 : 
    4537               1 :         Ok(())
    4538                 :     }
    4539                 : 
    4540               1 :     #[tokio::test]
    4541               1 :     async fn test_retain_data_in_parent_which_is_needed_for_child() -> anyhow::Result<()> {
    4542               1 :         let (tenant, ctx) =
    4543               1 :             TenantHarness::create("test_retain_data_in_parent_which_is_needed_for_child")?
    4544               1 :                 .load()
    4545               1 :                 .await;
    4546               1 :         let tline = tenant
    4547               1 :             .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
    4548               2 :             .await?;
    4549               2 :         make_some_layers(tline.as_ref(), Lsn(0x20), &ctx).await?;
    4550                 : 
    4551               1 :         tenant
    4552               1 :             .branch_timeline_test(&tline, NEW_TIMELINE_ID, Some(Lsn(0x40)), &ctx)
    4553 UBC           0 :             .await?;
    4554 CBC           1 :         let newtline = tenant
    4555               1 :             .get_timeline(NEW_TIMELINE_ID, true)
    4556               1 :             .expect("Should have a local timeline");
    4557               1 :         // this removes layers before lsn 0x40 (0x50 minus 0x10), so there are two remaining layers, image and delta for 0x31-0x50
    4558               1 :         tenant
    4559               1 :             .gc_iteration(
    4560               1 :                 Some(TIMELINE_ID),
    4561               1 :                 0x10,
    4562               1 :                 Duration::ZERO,
    4563               1 :                 &CancellationToken::new(),
    4564               1 :                 &ctx,
    4565               1 :             )
    4566 UBC           0 :             .await?;
    4567 CBC           1 :         assert!(newtline.get(*TEST_KEY, Lsn(0x25), &ctx).await.is_ok());
    4568                 : 
    4569               1 :         Ok(())
    4570                 :     }
    4571               1 :     #[tokio::test]
    4572               1 :     async fn test_parent_keeps_data_forever_after_branching() -> anyhow::Result<()> {
    4573               1 :         let (tenant, ctx) =
    4574               1 :             TenantHarness::create("test_parent_keeps_data_forever_after_branching")?
    4575               1 :                 .load()
    4576               1 :                 .await;
    4577               1 :         let tline = tenant
    4578               1 :             .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
    4579               1 :             .await?;
    4580               2 :         make_some_layers(tline.as_ref(), Lsn(0x20), &ctx).await?;
    4581                 : 
    4582               1 :         tenant
    4583               1 :             .branch_timeline_test(&tline, NEW_TIMELINE_ID, Some(Lsn(0x40)), &ctx)
    4584 UBC           0 :             .await?;
    4585 CBC           1 :         let newtline = tenant
    4586               1 :             .get_timeline(NEW_TIMELINE_ID, true)
    4587               1 :             .expect("Should have a local timeline");
    4588               1 : 
    4589               2 :         make_some_layers(newtline.as_ref(), Lsn(0x60), &ctx).await?;
    4590                 : 
    4591                 :         // run gc on parent
    4592               1 :         tenant
    4593               1 :             .gc_iteration(
    4594               1 :                 Some(TIMELINE_ID),
    4595               1 :                 0x10,
    4596               1 :                 Duration::ZERO,
    4597               1 :                 &CancellationToken::new(),
    4598               1 :                 &ctx,
    4599               1 :             )
    4600 UBC           0 :             .await?;
    4601                 : 
    4602                 :         // Check that the data is still accessible on the branch.
    4603 CBC           1 :         assert_eq!(
    4604               1 :             newtline.get(*TEST_KEY, Lsn(0x50), &ctx).await?,
    4605               1 :             TEST_IMG(&format!("foo at {}", Lsn(0x40)))
    4606                 :         );
    4607                 : 
    4608               1 :         Ok(())
    4609                 :     }
    4610                 : 
    4611               1 :     #[tokio::test]
    4612               1 :     async fn timeline_load() -> anyhow::Result<()> {
    4613                 :         const TEST_NAME: &str = "timeline_load";
    4614               1 :         let harness = TenantHarness::create(TEST_NAME)?;
    4615                 :         {
    4616               1 :             let (tenant, ctx) = harness.load().await;
    4617               1 :             let tline = tenant
    4618               1 :                 .create_test_timeline(TIMELINE_ID, Lsn(0x7000), DEFAULT_PG_VERSION, &ctx)
    4619               1 :                 .await?;
    4620               2 :             make_some_layers(tline.as_ref(), Lsn(0x8000), &ctx).await?;
    4621                 :             // so that all uploads finish & we can call harness.load() below again
    4622               1 :             tenant
    4623               1 :                 .shutdown(Default::default(), true)
    4624               1 :                 .instrument(info_span!("test_shutdown", tenant_id=%tenant.tenant_shard_id))
    4625               1 :                 .await
    4626               1 :                 .ok()
    4627               1 :                 .unwrap();
    4628                 :         }
    4629                 : 
    4630               3 :         let (tenant, _ctx) = harness.load().await;
    4631               1 :         tenant
    4632               1 :             .get_timeline(TIMELINE_ID, true)
    4633               1 :             .expect("cannot load timeline");
    4634               1 : 
    4635               1 :         Ok(())
    4636                 :     }
    4637                 : 
    4638               1 :     #[tokio::test]
    4639               1 :     async fn timeline_load_with_ancestor() -> anyhow::Result<()> {
    4640                 :         const TEST_NAME: &str = "timeline_load_with_ancestor";
    4641               1 :         let harness = TenantHarness::create(TEST_NAME)?;
    4642                 :         // create two timelines
    4643                 :         {
    4644               1 :             let (tenant, ctx) = harness.load().await;
    4645               1 :             let tline = tenant
    4646               1 :                 .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
    4647               2 :                 .await?;
    4648                 : 
    4649               2 :             make_some_layers(tline.as_ref(), Lsn(0x20), &ctx).await?;
    4650                 : 
    4651               1 :             let child_tline = tenant
    4652               1 :                 .branch_timeline_test(&tline, NEW_TIMELINE_ID, Some(Lsn(0x40)), &ctx)
    4653 UBC           0 :                 .await?;
    4654 CBC           1 :             child_tline.set_state(TimelineState::Active);
    4655               1 : 
    4656               1 :             let newtline = tenant
    4657               1 :                 .get_timeline(NEW_TIMELINE_ID, true)
    4658               1 :                 .expect("Should have a local timeline");
    4659               1 : 
    4660               2 :             make_some_layers(newtline.as_ref(), Lsn(0x60), &ctx).await?;
    4661                 : 
    4662                 :             // so that all uploads finish & we can call harness.load() below again
    4663               1 :             tenant
    4664               1 :                 .shutdown(Default::default(), true)
    4665               1 :                 .instrument(info_span!("test_shutdown", tenant_id=%tenant.tenant_shard_id))
    4666               2 :                 .await
    4667               1 :                 .ok()
    4668               1 :                 .unwrap();
    4669                 :         }
    4670                 : 
    4671                 :         // check that both of them are initially unloaded
    4672               9 :         let (tenant, _ctx) = harness.load().await;
    4673                 : 
    4674                 :         // check that both, child and ancestor are loaded
    4675               1 :         let _child_tline = tenant
    4676               1 :             .get_timeline(NEW_TIMELINE_ID, true)
    4677               1 :             .expect("cannot get child timeline loaded");
    4678               1 : 
    4679               1 :         let _ancestor_tline = tenant
    4680               1 :             .get_timeline(TIMELINE_ID, true)
    4681               1 :             .expect("cannot get ancestor timeline loaded");
    4682               1 : 
    4683               1 :         Ok(())
    4684                 :     }
    4685                 : 
    4686               1 :     #[tokio::test]
    4687               1 :     async fn delta_layer_dumping() -> anyhow::Result<()> {
    4688                 :         use storage_layer::AsLayerDesc;
    4689               1 :         let (tenant, ctx) = TenantHarness::create("test_layer_dumping")?.load().await;
    4690               1 :         let tline = tenant
    4691               1 :             .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
    4692               1 :             .await?;
    4693               2 :         make_some_layers(tline.as_ref(), Lsn(0x20), &ctx).await?;
    4694                 : 
    4695               1 :         let layer_map = tline.layers.read().await;
    4696               1 :         let level0_deltas = layer_map
    4697               1 :             .layer_map()
    4698               1 :             .get_level0_deltas()?
    4699               1 :             .into_iter()
    4700               2 :             .map(|desc| layer_map.get_from_desc(&desc))
    4701               1 :             .collect::<Vec<_>>();
    4702                 : 
    4703               1 :         assert!(!level0_deltas.is_empty());
    4704                 : 
    4705               3 :         for delta in level0_deltas {
    4706                 :             // Ensure we are dumping a delta layer here
    4707               2 :             assert!(delta.layer_desc().is_delta);
    4708               2 :             delta.dump(true, &ctx).await.unwrap();
    4709                 :         }
    4710                 : 
    4711               1 :         Ok(())
    4712                 :     }
    4713                 : 
    4714               1 :     #[tokio::test]
    4715               1 :     async fn corrupt_local_metadata() -> anyhow::Result<()> {
    4716                 :         const TEST_NAME: &str = "corrupt_metadata";
    4717               1 :         let harness = TenantHarness::create(TEST_NAME)?;
    4718               1 :         let (tenant, ctx) = harness.load().await;
    4719                 : 
    4720               1 :         let tline = tenant
    4721               1 :             .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
    4722               2 :             .await?;
    4723               1 :         drop(tline);
    4724               1 :         // so that all uploads finish & we can call harness.try_load() below again
    4725               1 :         tenant
    4726               1 :             .shutdown(Default::default(), true)
    4727               1 :             .instrument(info_span!("test_shutdown", tenant_id=%tenant.tenant_shard_id))
    4728               1 :             .await
    4729               1 :             .ok()
    4730               1 :             .unwrap();
    4731               1 :         drop(tenant);
    4732               1 : 
    4733               1 :         // Corrupt local metadata
    4734               1 :         let metadata_path = harness.timeline_path(&TIMELINE_ID).join(METADATA_FILE_NAME);
    4735               1 :         assert!(metadata_path.is_file());
    4736               1 :         let mut metadata_bytes = std::fs::read(&metadata_path)?;
    4737               1 :         assert_eq!(metadata_bytes.len(), 512);
    4738               1 :         metadata_bytes[8] ^= 1;
    4739               1 :         std::fs::write(metadata_path, metadata_bytes)?;
    4740                 : 
    4741               1 :         let err = harness.try_load_local(&ctx).await.expect_err("should fail");
    4742               1 :         // get all the stack with all .context, not only the last one
    4743               1 :         let message = format!("{err:#}");
    4744               1 :         let expected = "failed to load metadata";
    4745               1 :         assert!(
    4746               1 :             message.contains(expected),
    4747 UBC           0 :             "message '{message}' expected to contain {expected}"
    4748                 :         );
    4749                 : 
    4750 CBC           1 :         let mut found_error_message = false;
    4751               1 :         let mut err_source = err.source();
    4752               1 :         while let Some(source) = err_source {
    4753               1 :             if source.to_string().contains("metadata checksum mismatch") {
    4754               1 :                 found_error_message = true;
    4755               1 :                 break;
    4756 UBC           0 :             }
    4757               0 :             err_source = source.source();
    4758                 :         }
    4759 CBC           1 :         assert!(
    4760               1 :             found_error_message,
    4761 UBC           0 :             "didn't find the corrupted metadata error in {}",
    4762                 :             message
    4763                 :         );
    4764                 : 
    4765 CBC           1 :         Ok(())
    4766                 :     }
    4767                 : 
    4768               1 :     #[tokio::test]
    4769               1 :     async fn test_images() -> anyhow::Result<()> {
    4770               1 :         let (tenant, ctx) = TenantHarness::create("test_images")?.load().await;
    4771               1 :         let tline = tenant
    4772               1 :             .create_test_timeline(TIMELINE_ID, Lsn(0x08), DEFAULT_PG_VERSION, &ctx)
    4773               1 :             .await?;
    4774                 : 
    4775               1 :         let writer = tline.writer().await;
    4776               1 :         writer
    4777               1 :             .put(
    4778               1 :                 *TEST_KEY,
    4779               1 :                 Lsn(0x10),
    4780               1 :                 &Value::Image(TEST_IMG("foo at 0x10")),
    4781               1 :                 &ctx,
    4782               1 :             )
    4783 UBC           0 :             .await?;
    4784 CBC           1 :         writer.finish_write(Lsn(0x10));
    4785               1 :         drop(writer);
    4786               1 : 
    4787               1 :         tline.freeze_and_flush().await?;
    4788               1 :         tline
    4789               1 :             .compact(&CancellationToken::new(), EnumSet::empty(), &ctx)
    4790               1 :             .await?;
    4791                 : 
    4792               1 :         let writer = tline.writer().await;
    4793               1 :         writer
    4794               1 :             .put(
    4795               1 :                 *TEST_KEY,
    4796               1 :                 Lsn(0x20),
    4797               1 :                 &Value::Image(TEST_IMG("foo at 0x20")),
    4798               1 :                 &ctx,
    4799               1 :             )
    4800 UBC           0 :             .await?;
    4801 CBC           1 :         writer.finish_write(Lsn(0x20));
    4802               1 :         drop(writer);
    4803               1 : 
    4804               1 :         tline.freeze_and_flush().await?;
    4805               1 :         tline
    4806               1 :             .compact(&CancellationToken::new(), EnumSet::empty(), &ctx)
    4807               1 :             .await?;
    4808                 : 
    4809               1 :         let writer = tline.writer().await;
    4810               1 :         writer
    4811               1 :             .put(
    4812               1 :                 *TEST_KEY,
    4813               1 :                 Lsn(0x30),
    4814               1 :                 &Value::Image(TEST_IMG("foo at 0x30")),
    4815               1 :                 &ctx,
    4816               1 :             )
    4817 UBC           0 :             .await?;
    4818 CBC           1 :         writer.finish_write(Lsn(0x30));
    4819               1 :         drop(writer);
    4820               1 : 
    4821               1 :         tline.freeze_and_flush().await?;
    4822               1 :         tline
    4823               1 :             .compact(&CancellationToken::new(), EnumSet::empty(), &ctx)
    4824               1 :             .await?;
    4825                 : 
    4826               1 :         let writer = tline.writer().await;
    4827               1 :         writer
    4828               1 :             .put(
    4829               1 :                 *TEST_KEY,
    4830               1 :                 Lsn(0x40),
    4831               1 :                 &Value::Image(TEST_IMG("foo at 0x40")),
    4832               1 :                 &ctx,
    4833               1 :             )
    4834 UBC           0 :             .await?;
    4835 CBC           1 :         writer.finish_write(Lsn(0x40));
    4836               1 :         drop(writer);
    4837               1 : 
    4838               1 :         tline.freeze_and_flush().await?;
    4839               1 :         tline
    4840               1 :             .compact(&CancellationToken::new(), EnumSet::empty(), &ctx)
    4841               1 :             .await?;
    4842                 : 
    4843               1 :         assert_eq!(
    4844               1 :             tline.get(*TEST_KEY, Lsn(0x10), &ctx).await?,
    4845               1 :             TEST_IMG("foo at 0x10")
    4846                 :         );
    4847               1 :         assert_eq!(
    4848               1 :             tline.get(*TEST_KEY, Lsn(0x1f), &ctx).await?,
    4849               1 :             TEST_IMG("foo at 0x10")
    4850                 :         );
    4851               1 :         assert_eq!(
    4852               1 :             tline.get(*TEST_KEY, Lsn(0x20), &ctx).await?,
    4853               1 :             TEST_IMG("foo at 0x20")
    4854                 :         );
    4855               1 :         assert_eq!(
    4856               1 :             tline.get(*TEST_KEY, Lsn(0x30), &ctx).await?,
    4857               1 :             TEST_IMG("foo at 0x30")
    4858                 :         );
    4859               1 :         assert_eq!(
    4860               1 :             tline.get(*TEST_KEY, Lsn(0x40), &ctx).await?,
    4861               1 :             TEST_IMG("foo at 0x40")
    4862                 :         );
    4863                 : 
    4864               1 :         Ok(())
    4865                 :     }
    4866                 : 
    4867                 :     //
    4868                 :     // Insert 10000 key-value pairs with increasing keys, flush, compact, GC.
    4869                 :     // Repeat 50 times.
    4870                 :     //
    4871               1 :     #[tokio::test]
    4872               1 :     async fn test_bulk_insert() -> anyhow::Result<()> {
    4873               1 :         let harness = TenantHarness::create("test_bulk_insert")?;
    4874               1 :         let (tenant, ctx) = harness.load().await;
    4875               1 :         let tline = tenant
    4876               1 :             .create_test_timeline(TIMELINE_ID, Lsn(0x08), DEFAULT_PG_VERSION, &ctx)
    4877               2 :             .await?;
    4878                 : 
    4879               1 :         let mut lsn = Lsn(0x10);
    4880               1 : 
    4881               1 :         let mut keyspace = KeySpaceAccum::new();
    4882               1 : 
    4883               1 :         let mut test_key = Key::from_hex("010000000033333333444444445500000000").unwrap();
    4884               1 :         let mut blknum = 0;
    4885              51 :         for _ in 0..50 {
    4886          500050 :             for _ in 0..10000 {
    4887          500000 :                 test_key.field6 = blknum;
    4888          500000 :                 let writer = tline.writer().await;
    4889          500000 :                 writer
    4890          500000 :                     .put(
    4891          500000 :                         test_key,
    4892          500000 :                         lsn,
    4893          500000 :                         &Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))),
    4894          500000 :                         &ctx,
    4895          500000 :                     )
    4896            7802 :                     .await?;
    4897          500000 :                 writer.finish_write(lsn);
    4898          500000 :                 drop(writer);
    4899          500000 : 
    4900          500000 :                 keyspace.add_key(test_key);
    4901          500000 : 
    4902          500000 :                 lsn = Lsn(lsn.0 + 0x10);
    4903          500000 :                 blknum += 1;
    4904                 :             }
    4905                 : 
    4906              50 :             let cutoff = tline.get_last_record_lsn();
    4907              50 : 
    4908              50 :             tline
    4909              50 :                 .update_gc_info(
    4910              50 :                     Vec::new(),
    4911              50 :                     cutoff,
    4912              50 :                     Duration::ZERO,
    4913              50 :                     &CancellationToken::new(),
    4914              50 :                     &ctx,
    4915              50 :                 )
    4916 UBC           0 :                 .await?;
    4917 CBC          50 :             tline.freeze_and_flush().await?;
    4918              50 :             tline
    4919              50 :                 .compact(&CancellationToken::new(), EnumSet::empty(), &ctx)
    4920            7930 :                 .await?;
    4921              50 :             tline.gc().await?;
    4922                 :         }
    4923                 : 
    4924               1 :         Ok(())
    4925                 :     }
    4926                 : 
    4927               1 :     #[tokio::test]
    4928               1 :     async fn test_random_updates() -> anyhow::Result<()> {
    4929               1 :         let harness = TenantHarness::create("test_random_updates")?;
    4930               1 :         let (tenant, ctx) = harness.load().await;
    4931               1 :         let tline = tenant
    4932               1 :             .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
    4933               1 :             .await?;
    4934                 : 
    4935                 :         const NUM_KEYS: usize = 1000;
    4936                 : 
    4937               1 :         let mut test_key = Key::from_hex("010000000033333333444444445500000000").unwrap();
    4938               1 : 
    4939               1 :         let mut keyspace = KeySpaceAccum::new();
    4940               1 : 
    4941               1 :         // Track when each page was last modified. Used to assert that
    4942               1 :         // a read sees the latest page version.
    4943               1 :         let mut updated = [Lsn(0); NUM_KEYS];
    4944               1 : 
    4945               1 :         let mut lsn = Lsn(0x10);
    4946                 :         #[allow(clippy::needless_range_loop)]
    4947            1001 :         for blknum in 0..NUM_KEYS {
    4948            1000 :             lsn = Lsn(lsn.0 + 0x10);
    4949            1000 :             test_key.field6 = blknum as u32;
    4950            1000 :             let writer = tline.writer().await;
    4951            1000 :             writer
    4952            1000 :                 .put(
    4953            1000 :                     test_key,
    4954            1000 :                     lsn,
    4955            1000 :                     &Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))),
    4956            1000 :                     &ctx,
    4957            1000 :                 )
    4958              17 :                 .await?;
    4959            1000 :             writer.finish_write(lsn);
    4960            1000 :             updated[blknum] = lsn;
    4961            1000 :             drop(writer);
    4962            1000 : 
    4963            1000 :             keyspace.add_key(test_key);
    4964                 :         }
    4965                 : 
    4966              51 :         for _ in 0..50 {
    4967           50050 :             for _ in 0..NUM_KEYS {
    4968           50000 :                 lsn = Lsn(lsn.0 + 0x10);
    4969           50000 :                 let blknum = thread_rng().gen_range(0..NUM_KEYS);
    4970           50000 :                 test_key.field6 = blknum as u32;
    4971           50000 :                 let writer = tline.writer().await;
    4972           50000 :                 writer
    4973           50000 :                     .put(
    4974           50000 :                         test_key,
    4975           50000 :                         lsn,
    4976           50000 :                         &Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))),
    4977           50000 :                         &ctx,
    4978           50000 :                     )
    4979             702 :                     .await?;
    4980           50000 :                 writer.finish_write(lsn);
    4981           50000 :                 drop(writer);
    4982           50000 :                 updated[blknum] = lsn;
    4983                 :             }
    4984                 : 
    4985                 :             // Read all the blocks
    4986           50000 :             for (blknum, last_lsn) in updated.iter().enumerate() {
    4987           50000 :                 test_key.field6 = blknum as u32;
    4988           50000 :                 assert_eq!(
    4989           50000 :                     tline.get(test_key, lsn, &ctx).await?,
    4990           50000 :                     TEST_IMG(&format!("{} at {}", blknum, last_lsn))
    4991                 :                 );
    4992                 :             }
    4993                 : 
    4994                 :             // Perform a cycle of flush, compact, and GC
    4995              50 :             let cutoff = tline.get_last_record_lsn();
    4996              50 :             tline
    4997              50 :                 .update_gc_info(
    4998              50 :                     Vec::new(),
    4999              50 :                     cutoff,
    5000              50 :                     Duration::ZERO,
    5001              50 :                     &CancellationToken::new(),
    5002              50 :                     &ctx,
    5003              50 :                 )
    5004 UBC           0 :                 .await?;
    5005 CBC          54 :             tline.freeze_and_flush().await?;
    5006              50 :             tline
    5007              50 :                 .compact(&CancellationToken::new(), EnumSet::empty(), &ctx)
    5008             861 :                 .await?;
    5009              50 :             tline.gc().await?;
    5010                 :         }
    5011                 : 
    5012               1 :         Ok(())
    5013                 :     }
    5014                 : 
    5015               1 :     #[tokio::test]
    5016               1 :     async fn test_traverse_branches() -> anyhow::Result<()> {
    5017               1 :         let (tenant, ctx) = TenantHarness::create("test_traverse_branches")?
    5018               1 :             .load()
    5019               1 :             .await;
    5020               1 :         let mut tline = tenant
    5021               1 :             .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
    5022               1 :             .await?;
    5023                 : 
    5024                 :         const NUM_KEYS: usize = 1000;
    5025                 : 
    5026               1 :         let mut test_key = Key::from_hex("010000000033333333444444445500000000").unwrap();
    5027               1 : 
    5028               1 :         let mut keyspace = KeySpaceAccum::new();
    5029               1 : 
    5030               1 :         // Track when each page was last modified. Used to assert that
    5031               1 :         // a read sees the latest page version.
    5032               1 :         let mut updated = [Lsn(0); NUM_KEYS];
    5033               1 : 
    5034               1 :         let mut lsn = Lsn(0x10);
    5035                 :         #[allow(clippy::needless_range_loop)]
    5036            1001 :         for blknum in 0..NUM_KEYS {
    5037            1000 :             lsn = Lsn(lsn.0 + 0x10);
    5038            1000 :             test_key.field6 = blknum as u32;
    5039            1000 :             let writer = tline.writer().await;
    5040            1000 :             writer
    5041            1000 :                 .put(
    5042            1000 :                     test_key,
    5043            1000 :                     lsn,
    5044            1000 :                     &Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))),
    5045            1000 :                     &ctx,
    5046            1000 :                 )
    5047              17 :                 .await?;
    5048            1000 :             writer.finish_write(lsn);
    5049            1000 :             updated[blknum] = lsn;
    5050            1000 :             drop(writer);
    5051            1000 : 
    5052            1000 :             keyspace.add_key(test_key);
    5053                 :         }
    5054                 : 
    5055              51 :         for _ in 0..50 {
    5056              50 :             let new_tline_id = TimelineId::generate();
    5057              50 :             tenant
    5058              50 :                 .branch_timeline_test(&tline, new_tline_id, Some(lsn), &ctx)
    5059 UBC           0 :                 .await?;
    5060 CBC          50 :             tline = tenant
    5061              50 :                 .get_timeline(new_tline_id, true)
    5062              50 :                 .expect("Should have the branched timeline");
    5063                 : 
    5064           50050 :             for _ in 0..NUM_KEYS {
    5065           50000 :                 lsn = Lsn(lsn.0 + 0x10);
    5066           50000 :                 let blknum = thread_rng().gen_range(0..NUM_KEYS);
    5067           50000 :                 test_key.field6 = blknum as u32;
    5068           50000 :                 let writer = tline.writer().await;
    5069           50000 :                 writer
    5070           50000 :                     .put(
    5071           50000 :                         test_key,
    5072           50000 :                         lsn,
    5073           50000 :                         &Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))),
    5074           50000 :                         &ctx,
    5075           50000 :                     )
    5076             799 :                     .await?;
    5077           50000 :                 println!("updating {} at {}", blknum, lsn);
    5078           50000 :                 writer.finish_write(lsn);
    5079           50000 :                 drop(writer);
    5080           50000 :                 updated[blknum] = lsn;
    5081                 :             }
    5082                 : 
    5083                 :             // Read all the blocks
    5084           50000 :             for (blknum, last_lsn) in updated.iter().enumerate() {
    5085           50000 :                 test_key.field6 = blknum as u32;
    5086           50000 :                 assert_eq!(
    5087           50000 :                     tline.get(test_key, lsn, &ctx).await?,
    5088           50000 :                     TEST_IMG(&format!("{} at {}", blknum, last_lsn))
    5089                 :                 );
    5090                 :             }
    5091                 : 
    5092                 :             // Perform a cycle of flush, compact, and GC
    5093              50 :             let cutoff = tline.get_last_record_lsn();
    5094              50 :             tline
    5095              50 :                 .update_gc_info(
    5096              50 :                     Vec::new(),
    5097              50 :                     cutoff,
    5098              50 :                     Duration::ZERO,
    5099              50 :                     &CancellationToken::new(),
    5100              50 :                     &ctx,
    5101              50 :                 )
    5102 UBC           0 :                 .await?;
    5103 CBC          53 :             tline.freeze_and_flush().await?;
    5104              50 :             tline
    5105              50 :                 .compact(&CancellationToken::new(), EnumSet::empty(), &ctx)
    5106             192 :                 .await?;
    5107              50 :             tline.gc().await?;
    5108                 :         }
    5109                 : 
    5110               1 :         Ok(())
    5111                 :     }
    5112                 : 
    5113               1 :     #[tokio::test]
    5114               1 :     async fn test_traverse_ancestors() -> anyhow::Result<()> {
    5115               1 :         let (tenant, ctx) = TenantHarness::create("test_traverse_ancestors")?
    5116               1 :             .load()
    5117               1 :             .await;
    5118               1 :         let mut tline = tenant
    5119               1 :             .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
    5120               1 :             .await?;
    5121                 : 
    5122                 :         const NUM_KEYS: usize = 100;
    5123                 :         const NUM_TLINES: usize = 50;
    5124                 : 
    5125               1 :         let mut test_key = Key::from_hex("010000000033333333444444445500000000").unwrap();
    5126               1 :         // Track page mutation lsns across different timelines.
    5127               1 :         let mut updated = [[Lsn(0); NUM_KEYS]; NUM_TLINES];
    5128               1 : 
    5129               1 :         let mut lsn = Lsn(0x10);
    5130                 : 
    5131                 :         #[allow(clippy::needless_range_loop)]
    5132              51 :         for idx in 0..NUM_TLINES {
    5133              50 :             let new_tline_id = TimelineId::generate();
    5134              50 :             tenant
    5135              50 :                 .branch_timeline_test(&tline, new_tline_id, Some(lsn), &ctx)
    5136 UBC           0 :                 .await?;
    5137 CBC          50 :             tline = tenant
    5138              50 :                 .get_timeline(new_tline_id, true)
    5139              50 :                 .expect("Should have the branched timeline");
    5140                 : 
    5141            5050 :             for _ in 0..NUM_KEYS {
    5142            5000 :                 lsn = Lsn(lsn.0 + 0x10);
    5143            5000 :                 let blknum = thread_rng().gen_range(0..NUM_KEYS);
    5144            5000 :                 test_key.field6 = blknum as u32;
    5145            5000 :                 let writer = tline.writer().await;
    5146            5000 :                 writer
    5147            5000 :                     .put(
    5148            5000 :                         test_key,
    5149            5000 :                         lsn,
    5150            5000 :                         &Value::Image(TEST_IMG(&format!("{} {} at {}", idx, blknum, lsn))),
    5151            5000 :                         &ctx,
    5152            5000 :                     )
    5153              78 :                     .await?;
    5154            5000 :                 println!("updating [{}][{}] at {}", idx, blknum, lsn);
    5155            5000 :                 writer.finish_write(lsn);
    5156            5000 :                 drop(writer);
    5157            5000 :                 updated[idx][blknum] = lsn;
    5158                 :             }
    5159                 :         }
    5160                 : 
    5161                 :         // Read pages from leaf timeline across all ancestors.
    5162              50 :         for (idx, lsns) in updated.iter().enumerate() {
    5163            5000 :             for (blknum, lsn) in lsns.iter().enumerate() {
    5164                 :                 // Skip empty mutations.
    5165            5000 :                 if lsn.0 == 0 {
    5166            1836 :                     continue;
    5167            3164 :                 }
    5168            3164 :                 println!("checking [{idx}][{blknum}] at {lsn}");
    5169            3164 :                 test_key.field6 = blknum as u32;
    5170            3164 :                 assert_eq!(
    5171            3164 :                     tline.get(test_key, *lsn, &ctx).await?,
    5172            3164 :                     TEST_IMG(&format!("{idx} {blknum} at {lsn}"))
    5173                 :                 );
    5174                 :             }
    5175                 :         }
    5176               1 :         Ok(())
    5177                 :     }
    5178                 : 
    5179               1 :     #[tokio::test]
    5180               1 :     async fn test_write_at_initdb_lsn_takes_optimization_code_path() -> anyhow::Result<()> {
    5181               1 :         let (tenant, ctx) = TenantHarness::create("test_empty_test_timeline_is_usable")?
    5182               1 :             .load()
    5183               1 :             .await;
    5184                 : 
    5185               1 :         let initdb_lsn = Lsn(0x20);
    5186               1 :         let utline = tenant
    5187               1 :             .create_empty_timeline(TIMELINE_ID, initdb_lsn, DEFAULT_PG_VERSION, &ctx)
    5188 UBC           0 :             .await?;
    5189 CBC           1 :         let tline = utline.raw_timeline().unwrap();
    5190               1 : 
    5191               1 :         // Spawn flush loop now so that we can set the `expect_initdb_optimization`
    5192               1 :         tline.maybe_spawn_flush_loop();
    5193               1 : 
    5194               1 :         // Make sure the timeline has the minimum set of required keys for operation.
    5195               1 :         // The only operation you can always do on an empty timeline is to `put` new data.
    5196               1 :         // Except if you `put` at `initdb_lsn`.
    5197               1 :         // In that case, there's an optimization to directly create image layers instead of delta layers.
    5198               1 :         // It uses `repartition()`, which assumes some keys to be present.
    5199               1 :         // Let's make sure the test timeline can handle that case.
    5200               1 :         {
    5201               1 :             let mut state = tline.flush_loop_state.lock().unwrap();
    5202               1 :             assert_eq!(
    5203               1 :                 timeline::FlushLoopState::Running {
    5204               1 :                     expect_initdb_optimization: false,
    5205               1 :                     initdb_optimization_count: 0,
    5206               1 :                 },
    5207               1 :                 *state
    5208               1 :             );
    5209               1 :             *state = timeline::FlushLoopState::Running {
    5210               1 :                 expect_initdb_optimization: true,
    5211               1 :                 initdb_optimization_count: 0,
    5212               1 :             };
    5213               1 :         }
    5214               1 : 
    5215               1 :         // Make writes at initdb_lsn. When we flush below, they should be handled by the optimization.
    5216               1 :         // As explained above, the optimization requires some keys to be present.
    5217               1 :         // As per `create_empty_timeline` documentation, use init_empty to set them.
    5218               1 :         // This is what `create_test_timeline` does, by the way.
    5219               1 :         let mut modification = tline.begin_modification(initdb_lsn);
    5220               1 :         modification
    5221               1 :             .init_empty_test_timeline()
    5222               1 :             .context("init_empty_test_timeline")?;
    5223               1 :         modification
    5224               1 :             .commit(&ctx)
    5225 UBC           0 :             .await
    5226 CBC           1 :             .context("commit init_empty_test_timeline modification")?;
    5227                 : 
    5228                 :         // Do the flush. The flush code will check the expectations that we set above.
    5229               1 :         tline.freeze_and_flush().await?;
    5230                 : 
    5231                 :         // Assert that freeze_and_flush exercised the initdb optimization.
    5232                 :         {
    5233               1 :             let state = tline.flush_loop_state.lock().unwrap();
    5234                 :             let timeline::FlushLoopState::Running {
    5235               1 :                 expect_initdb_optimization,
    5236               1 :                 initdb_optimization_count,
    5237               1 :             } = *state
    5238                 :             else {
    5239 UBC           0 :                 panic!("unexpected state: {:?}", *state);
    5240                 :             };
    5241 CBC           1 :             assert!(expect_initdb_optimization);
    5242               1 :             assert!(initdb_optimization_count > 0);
    5243                 :         }
    5244               1 :         Ok(())
    5245                 :     }
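                         : 
                         :     // A minimal sketch of the shortcut hinted at in the test above, assuming
                         :     // `create_test_timeline` keeps the usual test-helper shape of
                         :     // (timeline_id, initdb_lsn, pg_version, ctx): it would bundle the empty-timeline
                         :     // creation and the `init_empty`-style key setup that the test performs by hand, e.g.
                         :     //
                         :     //     let tline = tenant
                         :     //         .create_test_timeline(TIMELINE_ID, initdb_lsn, DEFAULT_PG_VERSION, &ctx)
                         :     //         .await?;
                         :     //
                         :     // The test keeps the manual steps so that it can set `expect_initdb_optimization`
                         :     // on the flush loop before the first flush happens.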
    5246                 : 
    5247               1 :     #[tokio::test]
    5248               1 :     async fn test_uninit_mark_crash() -> anyhow::Result<()> {
    5249               1 :         let name = "test_uninit_mark_crash";
    5250               1 :         let harness = TenantHarness::create(name)?;
    5251                 :         {
    5252               1 :             let (tenant, ctx) = harness.load().await;
    5253               1 :             let tline = tenant
    5254               1 :                 .create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)
    5255 UBC           0 :                 .await?;
    5256                 :             // Keep the uninit mark in place (the guard is forgotten below) to simulate a crash mid-creation.
    5257 CBC           1 :             let raw_tline = tline.raw_timeline().unwrap();
    5258               1 :             raw_tline
    5259               1 :                 .shutdown()
    5260               1 :                 .instrument(info_span!("test_shutdown", tenant_id=%raw_tline.tenant_shard_id))
    5261 UBC           0 :                 .await;
    5262 CBC           1 :             std::mem::forget(tline);
    5263                 :         }
    5264                 : 
    5265               1 :         let (tenant, _) = harness.load().await;
    5266               1 :         match tenant.get_timeline(TIMELINE_ID, false) {
    5267 UBC           0 :             Ok(_) => panic!("timeline should've been removed during load"),
    5268 CBC           1 :             Err(e) => {
    5269               1 :                 assert_eq!(
    5270               1 :                     e,
    5271               1 :                     GetTimelineError::NotFound {
    5272               1 :                         tenant_id: tenant.tenant_shard_id.tenant_id,
    5273               1 :                         timeline_id: TIMELINE_ID,
    5274               1 :                     }
    5275               1 :                 )
    5276                 :             }
    5277                 :         }
    5278                 : 
    5279               1 :         assert!(!harness
    5280               1 :             .conf
    5281               1 :             .timeline_path(&tenant.tenant_shard_id, &TIMELINE_ID)
    5282               1 :             .exists());
    5283                 : 
    5284               1 :         assert!(!harness
    5285               1 :             .conf
    5286               1 :             .timeline_uninit_mark_file_path(tenant.tenant_shard_id, TIMELINE_ID)
    5287               1 :             .exists());
    5288                 : 
    5289               1 :         Ok(())
    5290                 :     }
    5291                 : }
        

Generated by: LCOV version 2.1-beta