LCOV - code coverage report
Current view: top level - pageserver/src/tenant - mgr.rs (source / functions)
Test: 9e3a1ccbd88185d44390421f76c05f0bf588f617.info
Test Date: 2025-07-29 14:19:29
Coverage: Lines 15.7 % (187 hit / 1191 total) | Functions 14.4 % (16 hit / 111 total)

            Line data    Source code
       1              : //! This module acts as a switchboard to access different repositories managed by this
       2              : //! page server.
       3              : 
       4              : use std::borrow::Cow;
       5              : use std::cmp::Ordering;
       6              : use std::collections::{BTreeMap, HashMap, HashSet};
       7              : use std::ops::Deref;
       8              : use std::sync::Arc;
       9              : use std::time::Duration;
      10              : 
      11              : use anyhow::Context;
      12              : use camino::{Utf8DirEntry, Utf8Path, Utf8PathBuf};
      13              : use futures::StreamExt;
      14              : use itertools::Itertools;
      15              : use pageserver_api::key::Key;
      16              : use pageserver_api::models::{DetachBehavior, LocationConfigMode};
      17              : use pageserver_api::shard::{
      18              :     ShardCount, ShardIdentity, ShardIndex, ShardNumber, ShardStripeSize, TenantShardId,
      19              : };
      20              : use pageserver_api::upcall_api::ReAttachResponseTenant;
      21              : use rand::Rng;
      22              : use rand::distr::Alphanumeric;
      23              : use remote_storage::TimeoutOrCancel;
      24              : use sysinfo::SystemExt;
      25              : use tokio::fs;
      26              : use tokio::task::JoinSet;
      27              : use tokio_util::sync::CancellationToken;
      28              : use tracing::*;
      29              : use utils::crashsafe::path_with_suffix_extension;
      30              : use utils::fs_ext::PathExt;
      31              : use utils::generation::Generation;
      32              : use utils::id::{TenantId, TimelineId};
      33              : use utils::{backoff, completion, crashsafe};
      34              : 
      35              : use super::remote_timeline_client::remote_tenant_path;
      36              : use super::secondary::SecondaryTenant;
      37              : use super::timeline::detach_ancestor::{self, PreparedTimelineDetach};
      38              : use super::{GlobalShutDown, TenantSharedResources};
      39              : use crate::config::PageServerConf;
      40              : use crate::context::{DownloadBehavior, RequestContext};
      41              : use crate::controller_upcall_client::{
      42              :     RetryForeverError, StorageControllerUpcallApi, StorageControllerUpcallClient,
      43              : };
      44              : use crate::deletion_queue::DeletionQueueClient;
      45              : use crate::http::routes::ACTIVE_TENANT_TIMEOUT;
      46              : use crate::metrics::{LOCAL_DATA_LOSS_SUSPECTED, TENANT, TENANT_MANAGER as METRICS};
      47              : use crate::task_mgr::{BACKGROUND_RUNTIME, TaskKind};
      48              : use crate::tenant::config::{
      49              :     AttachedLocationConfig, AttachmentMode, LocationConf, LocationMode, SecondaryLocationConfig,
      50              : };
      51              : use crate::tenant::span::debug_assert_current_span_has_tenant_id;
      52              : use crate::tenant::storage_layer::inmemory_layer;
      53              : use crate::tenant::timeline::ShutdownMode;
      54              : use crate::tenant::timeline::layer_manager::LayerManagerLockHolder;
      55              : use crate::tenant::{
      56              :     AttachedTenantConf, GcError, LoadConfigError, SpawnMode, TenantShard, TenantState,
      57              : };
      58              : use crate::virtual_file::MaybeFatalIo;
      59              : use crate::{InitializationOrder, TEMP_FILE_SUFFIX};
      60              : 
      61              : /// For a tenant that appears in TenantsMap, it may either be
       62              : /// - `Attached`: has a full Tenant object, is eligible to service
      63              : ///   reads and ingest WAL.
      64              : /// - `Secondary`: is only keeping a local cache warm.
      65              : ///
      66              : /// Secondary is a totally distinct state rather than being a mode of a `Tenant`, because
      67              : /// that way we avoid having to carefully switch a tenant's ingestion etc on and off during
      68              : /// its lifetime, and we can preserve some important safety invariants like `Tenant` always
      69              : /// having a properly acquired generation (Secondary doesn't need a generation)
      70              : #[derive(Clone)]
      71              : pub(crate) enum TenantSlot {
      72              :     Attached(Arc<TenantShard>),
      73              :     Secondary(Arc<SecondaryTenant>),
      74              :     /// In this state, other administrative operations acting on the TenantId should
      75              :     /// block, or return a retry indicator equivalent to HTTP 503.
      76              :     InProgress(utils::completion::Barrier),
      77              : }
      78              : 
      79              : impl std::fmt::Debug for TenantSlot {
      80            0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
      81            0 :         match self {
      82            0 :             Self::Attached(tenant) => write!(f, "Attached({})", tenant.current_state()),
      83            0 :             Self::Secondary(_) => write!(f, "Secondary"),
      84            0 :             Self::InProgress(_) => write!(f, "InProgress"),
      85              :         }
      86            0 :     }
      87              : }
      88              : 
      89              : impl TenantSlot {
      90              :     /// Return the `Tenant` in this slot if attached, else None
      91            0 :     fn get_attached(&self) -> Option<&Arc<TenantShard>> {
      92            0 :         match self {
      93            0 :             Self::Attached(t) => Some(t),
      94            0 :             Self::Secondary(_) => None,
      95            0 :             Self::InProgress(_) => None,
      96              :         }
      97            0 :     }
      98              : }
      99              : 
     100              : /// The tenants known to the pageserver.
     101              : /// The enum variants are used to distinguish the different states that the pageserver can be in.
     102              : pub(crate) enum TenantsMap {
     103              :     /// [`init_tenant_mgr`] is not done yet.
     104              :     Initializing,
     105              :     /// [`init_tenant_mgr`] is done, all on-disk tenants have been loaded.
     106              :     /// New tenants can be added using [`TenantManager::tenant_map_acquire_slot`].
     107              :     Open(BTreeMap<TenantShardId, TenantSlot>),
     108              :     /// The pageserver has entered shutdown mode via [`TenantManager::shutdown`].
     109              :     /// Existing tenants are still accessible, but no new tenants can be created.
     110              :     ShuttingDown(BTreeMap<TenantShardId, TenantSlot>),
     111              : }
     112              : 
     113              : /// When resolving a TenantId to a shard, we may be looking for the 0th
     114              : /// shard, or we might be looking for whichever shard holds a particular page.
     115              : #[derive(Copy, Clone)]
     116              : pub(crate) enum ShardSelector {
     117              :     /// Only return the 0th shard, if it is present.  If a non-0th shard is present,
     118              :     /// ignore it.
     119              :     Zero,
     120              :     /// Pick the shard that holds this key
     121              :     Page(Key),
     122              :     /// The shard ID is known: pick the given shard
     123              :     Known(ShardIndex),
     124              : }
     125              : 
     126              : /// A convenience for use with the re_attach ControllerUpcallClient function: rather
     127              : /// than the serializable struct, we build this enum that encapsulates
     128              : /// the invariant that attached tenants always have generations.
     129              : ///
     130              : /// This represents the subset of a LocationConfig that we receive during re-attach.
     131              : pub(crate) enum TenantStartupMode {
     132              :     Attached((AttachmentMode, Generation, ShardStripeSize)),
     133              :     Secondary,
     134              : }
     135              : 
     136              : impl TenantStartupMode {
     137              :     /// Return the generation & mode that should be used when starting
     138              :     /// this tenant.
     139              :     ///
     140              :     /// If this returns None, the re-attach struct is in an invalid state and
     141              :     /// should be ignored in the response.
     142            0 :     fn from_reattach_tenant(rart: ReAttachResponseTenant) -> Option<Self> {
     143            0 :         match (rart.mode, rart.r#gen) {
     144            0 :             (LocationConfigMode::Detached, _) => None,
     145            0 :             (LocationConfigMode::Secondary, _) => Some(Self::Secondary),
     146            0 :             (LocationConfigMode::AttachedMulti, Some(g)) => Some(Self::Attached((
     147            0 :                 AttachmentMode::Multi,
     148            0 :                 Generation::new(g),
     149            0 :                 rart.stripe_size,
     150            0 :             ))),
     151            0 :             (LocationConfigMode::AttachedSingle, Some(g)) => Some(Self::Attached((
     152            0 :                 AttachmentMode::Single,
     153            0 :                 Generation::new(g),
     154            0 :                 rart.stripe_size,
     155            0 :             ))),
     156            0 :             (LocationConfigMode::AttachedStale, Some(g)) => Some(Self::Attached((
     157            0 :                 AttachmentMode::Stale,
     158            0 :                 Generation::new(g),
     159            0 :                 rart.stripe_size,
     160            0 :             ))),
     161              :             _ => {
     162            0 :                 tracing::warn!(
     163            0 :                     "Received invalid re-attach state for tenant {}: {rart:?}",
     164              :                     rart.id
     165              :                 );
     166            0 :                 None
     167              :             }
     168              :         }
     169            0 :     }
     170              : }
     171              : 
      172              : /// Result type for resolving a TenantId to a specific shard
     173              : pub(crate) enum ShardResolveResult {
     174              :     NotFound,
     175              :     Found(Arc<TenantShard>),
      176              :     // Wait for this barrier, then query again
     177              :     InProgress(utils::completion::Barrier),
     178              : }
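
// Editor's illustration (not part of mgr.rs): a minimal sketch of the
// "wait for the barrier, then query again" loop that a caller of a shard
// resolve function is expected to run when it sees InProgress. tokio's
// Notify stands in here for utils::completion::Barrier, whose API is not
// shown in this listing; the types are simplified placeholders.

use std::sync::Arc;
use tokio::sync::Notify;

enum ResolveResult {
    NotFound,
    Found(u32),              // placeholder for Arc<TenantShard>
    InProgress(Arc<Notify>), // placeholder for completion::Barrier
}

async fn resolve_with_retry(mut lookup: impl FnMut() -> ResolveResult) -> Option<u32> {
    loop {
        match lookup() {
            ResolveResult::Found(shard) => return Some(shard),
            ResolveResult::NotFound => return None,
            ResolveResult::InProgress(barrier) => {
                // A slot mutation (attach/detach) is in flight: wait for it
                // to complete, then look the shard up again.
                barrier.notified().await;
            }
        }
    }
}
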
     179              : 
     180              : impl TenantsMap {
     181              :     /// Convenience function for typical usage, where we want to get a `Tenant` object, for
     182              :     /// working with attached tenants.  If the TenantId is in the map but in Secondary state,
     183              :     /// None is returned.
     184            0 :     pub(crate) fn get(&self, tenant_shard_id: &TenantShardId) -> Option<&Arc<TenantShard>> {
     185            0 :         match self {
     186            0 :             TenantsMap::Initializing => None,
     187            0 :             TenantsMap::Open(m) | TenantsMap::ShuttingDown(m) => {
     188            0 :                 m.get(tenant_shard_id).and_then(|slot| slot.get_attached())
     189              :             }
     190              :         }
     191            0 :     }
     192              : 
     193              :     #[cfg(all(debug_assertions, not(test)))]
     194            0 :     pub(crate) fn len(&self) -> usize {
     195            0 :         match self {
     196            0 :             TenantsMap::Initializing => 0,
     197            0 :             TenantsMap::Open(m) | TenantsMap::ShuttingDown(m) => m.len(),
     198              :         }
     199            0 :     }
     200              : }
     201              : 
     202              : /// Precursor to deletion of a tenant dir: we do a fast rename to a tmp path, and then
     203              : /// the slower actual deletion in the background.
     204              : ///
      205              : /// This is "safe" in that it won't leave behind a partially deleted directory
      206              : /// at the original path, because we rename with TEMP_FILE_SUFFIX before starting to delete
     207              : /// the contents.
     208              : ///
     209              : /// This is pageserver-specific, as it relies on future processes after a crash to check
     210              : /// for TEMP_FILE_SUFFIX when loading things.
     211            0 : async fn safe_rename_tenant_dir(path: impl AsRef<Utf8Path>) -> std::io::Result<Utf8PathBuf> {
     212            0 :     let parent = path
     213            0 :         .as_ref()
     214            0 :         .parent()
     215              :         // It is invalid to call this function with a relative path.  Tenant directories
     216              :         // should always have a parent.
     217            0 :         .ok_or(std::io::Error::new(
     218            0 :             std::io::ErrorKind::InvalidInput,
     219              :             "Path must be absolute",
     220            0 :         ))?;
     221            0 :     let rand_suffix = rand::rng()
     222            0 :         .sample_iter(&Alphanumeric)
     223            0 :         .take(8)
     224            0 :         .map(char::from)
     225            0 :         .collect::<String>()
     226            0 :         + TEMP_FILE_SUFFIX;
     227            0 :     let tmp_path = path_with_suffix_extension(&path, &rand_suffix);
     228            0 :     fs::rename(path.as_ref(), &tmp_path).await?;
     229            0 :     fs::File::open(parent)
     230            0 :         .await?
     231            0 :         .sync_all()
     232            0 :         .await
     233            0 :         .maybe_fatal_err("safe_rename_tenant_dir")?;
     234            0 :     Ok(tmp_path)
     235            0 : }
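
// Editor's illustration (not part of mgr.rs): the rename-then-fsync-parent
// pattern above in isolation. The rename is atomic, so a crash can never
// leave a half-deleted directory at the original path, and fsync-ing the
// parent directory is what persists the rename itself; actual removal of
// the temp directory can then happen lazily. The function name and the
// ".___temp" suffix are illustrative, not taken from the pageserver.

use std::path::{Path, PathBuf};
use tokio::fs;

async fn demo_safe_rename(dir: &Path) -> std::io::Result<PathBuf> {
    let parent = dir.parent().ok_or_else(|| {
        std::io::Error::new(std::io::ErrorKind::InvalidInput, "path must be absolute")
    })?;

    // Step 1: atomically move the directory out of the way.
    let mut tmp = dir.as_os_str().to_owned();
    tmp.push(".___temp");
    let tmp = PathBuf::from(tmp);
    fs::rename(dir, &tmp).await?;

    // Step 2: fsync the parent so the rename survives a crash. Deleting
    // `tmp` is now safe to defer to a background task.
    fs::File::open(parent).await?.sync_all().await?;
    Ok(tmp)
}
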
     236              : 
     237              : /// See [`Self::spawn`].
     238              : #[derive(Clone, Default)]
     239              : pub struct BackgroundPurges(tokio_util::task::TaskTracker);
     240              : 
     241              : impl BackgroundPurges {
     242              :     /// When we have moved a tenant's content to a temporary directory, we may delete it lazily in
     243              :     /// the background, and thereby avoid blocking any API requests on this deletion completing.
     244              :     ///
     245              :     /// Although we are cleaning up the tenant, this task is not meant to be bound by the lifetime of the tenant in memory.
      246              :     /// Hence the [`BackgroundPurges`] type, which keeps track of these tasks.
     247            0 :     pub fn spawn(&self, tmp_path: Utf8PathBuf) {
     248              :         // because on shutdown we close and wait, we are misusing TaskTracker a bit.
     249              :         //
     250              :         // so first acquire a token, then check if the tracker has been closed. the tracker might get closed
     251              :         // right after, but at least the shutdown will wait for what we are spawning next.
     252            0 :         let token = self.0.token();
     253              : 
     254            0 :         if self.0.is_closed() {
     255            0 :             warn!(
     256              :                 %tmp_path,
     257            0 :                 "trying to spawn background purge during shutdown, ignoring"
     258              :             );
     259            0 :             return;
     260            0 :         }
     261              : 
     262            0 :         let span = info_span!(parent: None, "background_purge", %tmp_path);
     263              : 
     264            0 :         let task = move || {
     265            0 :             let _token = token;
     266            0 :             let _entered = span.entered();
     267            0 :             if let Err(error) = std::fs::remove_dir_all(tmp_path.as_path()) {
     268              :                 // should we fatal_io_error here?
     269            0 :                 warn!(%error, "failed to purge tenant directory");
     270            0 :             }
     271            0 :         };
     272              : 
     273            0 :         BACKGROUND_RUNTIME.spawn_blocking(task);
     274            0 :     }
     275              : 
     276              :     /// When this future completes, all background purges have completed.
     277              :     /// The first poll of the future will already lock out new background purges spawned via [`Self::spawn`].
     278              :     ///
     279              :     /// Concurrent calls will coalesce.
     280              :     ///
     281              :     /// # Cancellation-Safety
     282              :     ///
     283              :     /// If this future is dropped before polled to completion, concurrent and subsequent
     284              :     /// instances of this future will continue to be correct.
     285              :     #[instrument(skip_all)]
     286              :     pub async fn shutdown(&self) {
     287              :         // forbid new tasks (can be called many times)
     288              :         self.0.close();
     289              :         self.0.wait().await;
     290              :     }
     291              : }
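
// Editor's illustration (not part of mgr.rs): the TaskTracker choreography
// above, reduced to a runnable sketch. Taking a token *before* checking
// is_closed() is what makes the spawn/shutdown race benign: even if the
// tracker is closed immediately after the check, wait() still blocks until
// the token held by the spawned task is dropped. The sleep stands in for
// std::fs::remove_dir_all and its duration is arbitrary.

use std::time::Duration;
use tokio_util::task::TaskTracker;

#[tokio::main]
async fn main() {
    let tracker = TaskTracker::new();

    // Analogue of BackgroundPurges::spawn: token first, closed-check second.
    let token = tracker.token();
    if !tracker.is_closed() {
        tokio::task::spawn_blocking(move || {
            let _token = token; // held for the lifetime of the purge work
            std::thread::sleep(Duration::from_millis(50));
        });
    }

    // Analogue of BackgroundPurges::shutdown: forbid new work, then wait
    // for everything already spawned to finish. Safe to call repeatedly.
    tracker.close();
    tracker.wait().await;
}
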
     292              : 
     293              : /// Responsible for storing and mutating the collection of all tenants
     294              : /// that this pageserver has state for.
     295              : ///
     296              : /// Every Tenant and SecondaryTenant instance lives inside the TenantManager.
     297              : ///
     298              : /// The most important role of the TenantManager is to prevent conflicts: e.g. trying to attach
     299              : /// the same tenant twice concurrently, or trying to configure the same tenant into secondary
     300              : /// and attached modes concurrently.
     301              : pub struct TenantManager {
     302              :     conf: &'static PageServerConf,
     303              :     tenants: std::sync::RwLock<TenantsMap>,
     304              :     resources: TenantSharedResources,
     305              : 
     306              :     // Long-running operations that happen outside of a [`Tenant`] lifetime should respect this token.
     307              :     // This is for edge cases like tenant deletion.  In normal cases (within a Tenant lifetime),
     308              :     // tenants have their own cancellation tokens, which we fire individually in [`Self::shutdown`], or
     309              :     // when the tenant detaches.
     310              :     cancel: CancellationToken,
     311              : 
     312              :     background_purges: BackgroundPurges,
     313              : }
     314              : 
     315            0 : fn emergency_generations(
     316            0 :     tenant_confs: &HashMap<TenantShardId, Result<LocationConf, LoadConfigError>>,
     317            0 : ) -> HashMap<TenantShardId, TenantStartupMode> {
     318            0 :     tenant_confs
     319            0 :         .iter()
     320            0 :         .filter_map(|(tid, lc)| {
     321            0 :             let lc = match lc {
     322            0 :                 Ok(lc) => lc,
     323            0 :                 Err(_) => return None,
     324              :             };
     325              :             Some((
     326            0 :                 *tid,
     327            0 :                 match &lc.mode {
     328            0 :                     LocationMode::Attached(alc) => TenantStartupMode::Attached((
     329            0 :                         alc.attach_mode,
     330            0 :                         alc.generation,
     331            0 :                         lc.shard.stripe_size,
     332            0 :                     )),
     333            0 :                     LocationMode::Secondary(_) => TenantStartupMode::Secondary,
     334              :                 },
     335              :             ))
     336            0 :         })
     337            0 :         .collect()
     338            0 : }
     339              : 
     340            0 : async fn init_load_generations(
     341            0 :     conf: &'static PageServerConf,
     342            0 :     tenant_confs: &HashMap<TenantShardId, Result<LocationConf, LoadConfigError>>,
     343            0 :     resources: &TenantSharedResources,
     344            0 :     cancel: &CancellationToken,
     345            0 : ) -> anyhow::Result<Option<HashMap<TenantShardId, TenantStartupMode>>> {
     346            0 :     let generations = if conf.control_plane_emergency_mode {
     347            0 :         error!(
     348            0 :             "Emergency mode!  Tenants will be attached unsafely using their last known generation"
     349              :         );
     350            0 :         emergency_generations(tenant_confs)
     351              :     } else {
     352            0 :         let client = StorageControllerUpcallClient::new(conf, cancel);
     353            0 :         info!("Calling {} API to re-attach tenants", client.base_url());
     354              :         // If we are configured to use the control plane API, then it is the source of truth for what tenants to load.
     355            0 :         let empty_local_disk = tenant_confs.is_empty();
     356            0 :         match client.re_attach(conf, empty_local_disk).await {
     357            0 :             Ok(tenants) => tenants
     358            0 :                 .into_iter()
     359            0 :                 .flat_map(|(id, rart)| {
     360            0 :                     TenantStartupMode::from_reattach_tenant(rart).map(|tsm| (id, tsm))
     361            0 :                 })
     362            0 :                 .collect(),
     363              :             Err(RetryForeverError::ShuttingDown) => {
     364            0 :                 anyhow::bail!("Shut down while waiting for control plane re-attach response")
     365              :             }
     366              :         }
     367              :     };
     368              : 
     369              :     // The deletion queue needs to know about the startup attachment state to decide which (if any) stored
     370              :     // deletion list entries may still be valid.  We provide that by pushing a recovery operation into
      371              :     // the queue. Sequential processing of the queue ensures that recovery is done before any new tenant deletions
     372              :     // are processed, even though we don't block on recovery completing here.
     373            0 :     let attached_tenants = generations
     374            0 :         .iter()
     375            0 :         .flat_map(|(id, start_mode)| {
     376            0 :             match start_mode {
     377            0 :                 TenantStartupMode::Attached((_mode, generation, _stripe_size)) => Some(generation),
     378            0 :                 TenantStartupMode::Secondary => None,
     379              :             }
     380            0 :             .map(|gen_| (*id, *gen_))
     381            0 :         })
     382            0 :         .collect();
     383            0 :     resources.deletion_queue_client.recover(attached_tenants)?;
     384              : 
     385            0 :     Ok(Some(generations))
     386            0 : }
     387              : 
     388              : /// Given a directory discovered in the pageserver's tenants/ directory, attempt
     389              : /// to load a tenant config from it.
     390              : ///
     391              : /// If we cleaned up something expected (like an empty dir or a temp dir), return None.
     392            0 : fn load_tenant_config(
     393            0 :     conf: &'static PageServerConf,
     394            0 :     tenant_shard_id: TenantShardId,
     395            0 :     dentry: Utf8DirEntry,
     396            0 : ) -> Option<Result<LocationConf, LoadConfigError>> {
     397            0 :     let tenant_dir_path = dentry.path().to_path_buf();
     398            0 :     if crate::is_temporary(&tenant_dir_path) {
     399            0 :         info!("Found temporary tenant directory, removing: {tenant_dir_path}");
     400              :         // No need to use safe_remove_tenant_dir_all because this is already
     401              :         // a temporary path
     402            0 :         std::fs::remove_dir_all(&tenant_dir_path).fatal_err("delete temporary tenant dir");
     403            0 :         return None;
     404            0 :     }
     405              : 
     406              :     // This case happens if we crash during attachment before writing a config into the dir
     407            0 :     let is_empty = tenant_dir_path
     408            0 :         .is_empty_dir()
     409            0 :         .fatal_err("Checking for empty tenant dir");
     410            0 :     if is_empty {
     411            0 :         info!("removing empty tenant directory {tenant_dir_path:?}");
     412            0 :         std::fs::remove_dir(&tenant_dir_path).fatal_err("delete empty tenant dir");
     413            0 :         return None;
     414            0 :     }
     415              : 
     416            0 :     Some(TenantShard::load_tenant_config(conf, &tenant_shard_id))
     417            0 : }
     418              : 
     419              : /// Initial stage of load: walk the local tenants directory, clean up any temp files,
     420              : /// and load configurations for the tenants we found.
     421              : ///
     422              : /// Do this in parallel, because we expect 10k+ tenants, so serial execution can take
     423              : /// seconds even on reasonably fast drives.
     424            0 : async fn init_load_tenant_configs(
     425            0 :     conf: &'static PageServerConf,
     426            0 : ) -> HashMap<TenantShardId, Result<LocationConf, LoadConfigError>> {
     427            0 :     let tenants_dir = conf.tenants_path();
     428              : 
     429            0 :     let dentries = tokio::task::spawn_blocking(move || -> Vec<Utf8DirEntry> {
     430            0 :         let context = format!("read tenants dir {tenants_dir}");
     431            0 :         let dir_entries = tenants_dir.read_dir_utf8().fatal_err(&context);
     432              : 
     433            0 :         dir_entries
     434            0 :             .collect::<Result<Vec<_>, std::io::Error>>()
     435            0 :             .fatal_err(&context)
     436            0 :     })
     437            0 :     .await
     438            0 :     .expect("Config load task panicked");
     439              : 
     440            0 :     let mut configs = HashMap::new();
     441              : 
     442            0 :     let mut join_set = JoinSet::new();
     443            0 :     for dentry in dentries {
     444            0 :         let tenant_shard_id = match dentry.file_name().parse::<TenantShardId>() {
     445            0 :             Ok(id) => id,
     446              :             Err(_) => {
     447            0 :                 warn!(
     448            0 :                     "Invalid tenant path (garbage in our repo directory?): '{}'",
     449            0 :                     dentry.file_name()
     450              :                 );
     451            0 :                 continue;
     452              :             }
     453              :         };
     454              : 
     455            0 :         join_set.spawn_blocking(move || {
     456            0 :             (
     457            0 :                 tenant_shard_id,
     458            0 :                 load_tenant_config(conf, tenant_shard_id, dentry),
     459            0 :             )
     460            0 :         });
     461              :     }
     462              : 
     463            0 :     while let Some(r) = join_set.join_next().await {
     464            0 :         let (tenant_shard_id, tenant_config) = r.expect("Panic in config load task");
     465            0 :         if let Some(tenant_config) = tenant_config {
     466            0 :             configs.insert(tenant_shard_id, tenant_config);
     467            0 :         }
     468              :     }
     469              : 
     470            0 :     configs
     471            0 : }
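
// Editor's illustration (not part of mgr.rs): the fan-out/fan-in shape used
// by init_load_tenant_configs, reduced to its essentials. Each entry is
// processed on the blocking thread pool via JoinSet::spawn_blocking and the
// results are drained with join_next(). The per-entry work here is a
// placeholder for load_tenant_config().

use std::collections::HashMap;
use tokio::task::JoinSet;

async fn load_all(entries: Vec<String>) -> HashMap<String, usize> {
    let mut join_set = JoinSet::new();
    for entry in entries {
        join_set.spawn_blocking(move || {
            // Stand-in for blocking filesystem work per tenant directory.
            let value = entry.len();
            (entry, value)
        });
    }

    let mut configs = HashMap::new();
    while let Some(res) = join_set.join_next().await {
        let (key, value) = res.expect("config load task panicked");
        configs.insert(key, value);
    }
    configs
}
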
     472              : 
     473              : #[derive(Debug, thiserror::Error)]
     474              : pub(crate) enum DeleteTenantError {
     475              :     #[error("Tenant map slot error {0}")]
     476              :     SlotError(#[from] TenantSlotError),
     477              : 
     478              :     #[error("Cancelled")]
     479              :     Cancelled,
     480              : 
     481              :     #[error(transparent)]
     482              :     Other(#[from] anyhow::Error),
     483              : }
     484              : 
     485              : /// Initialize repositories at `Initializing` state.
     486            0 : pub fn init(
     487            0 :     conf: &'static PageServerConf,
     488            0 :     background_purges: BackgroundPurges,
     489            0 :     resources: TenantSharedResources,
     490            0 :     cancel: CancellationToken,
     491            0 : ) -> TenantManager {
     492            0 :     TenantManager {
     493            0 :         conf,
     494            0 :         tenants: std::sync::RwLock::new(TenantsMap::Initializing),
     495            0 :         resources,
     496            0 :         cancel,
     497            0 :         background_purges,
     498            0 :     }
     499            0 : }
     500              : 
     501              : /// Transition repositories from `Initializing` state to `Open` state with locally available timelines.
     502              : /// Timelines that are only partially available locally (remote storage has more data than this pageserver)
     503              : /// are scheduled for download and added to the tenant once download is completed.
     504              : #[instrument(skip_all)]
     505              : pub async fn init_tenant_mgr(
     506              :     tenant_manager: Arc<TenantManager>,
     507              :     init_order: InitializationOrder,
     508              : ) -> anyhow::Result<()> {
     509              :     debug_assert!(matches!(
     510              :         *tenant_manager.tenants.read().unwrap(),
     511              :         TenantsMap::Initializing
     512              :     ));
     513              :     let mut tenants = BTreeMap::new();
     514              : 
     515              :     let ctx = RequestContext::todo_child(TaskKind::Startup, DownloadBehavior::Warn);
     516              : 
     517              :     let conf = tenant_manager.conf;
     518              :     let resources = &tenant_manager.resources;
     519              :     let cancel = &tenant_manager.cancel;
     520              :     let background_purges = &tenant_manager.background_purges;
     521              : 
     522              :     // Initialize dynamic limits that depend on system resources
     523              :     let system_memory =
     524              :         sysinfo::System::new_with_specifics(sysinfo::RefreshKind::new().with_memory())
     525              :             .total_memory();
     526              :     let max_ephemeral_layer_bytes =
     527              :         conf.ephemeral_bytes_per_memory_kb as u64 * (system_memory / 1024);
     528              :     tracing::info!(
     529              :         "Initialized ephemeral layer size limit to {max_ephemeral_layer_bytes}, for {system_memory} bytes of memory"
     530              :     );
     531              :     inmemory_layer::GLOBAL_RESOURCES.max_dirty_bytes.store(
     532              :         max_ephemeral_layer_bytes,
     533              :         std::sync::atomic::Ordering::Relaxed,
     534              :     );
     535              : 
     536              :     // Scan local filesystem for attached tenants
     537              :     let tenant_configs = init_load_tenant_configs(conf).await;
     538              : 
     539              :     // Determine which tenants are to be secondary or attached, and in which generation
     540              :     let tenant_modes = init_load_generations(conf, &tenant_configs, resources, cancel).await?;
     541              : 
     542              :     // Hadron local SSD check: Raise an alert if our local filesystem does not contain any tenants but the re-attach request returned tenants.
     543              :     // This can happen if the PS suffered a Kubernetes node failure resulting in loss of all local data, but recovered quickly on another node
     544              :     // so the Storage Controller has not had the time to move tenants out.
     545              :     let data_loss_suspected = if let Some(tenant_modes) = &tenant_modes {
     546              :         tenant_configs.is_empty() && !tenant_modes.is_empty()
     547              :     } else {
     548              :         false
     549              :     };
     550              :     if data_loss_suspected {
     551              :         tracing::error!(
     552              :             "Local data loss suspected: no tenants found on local filesystem, but re-attach request returned tenants"
     553              :         );
     554              :     }
     555              :     LOCAL_DATA_LOSS_SUSPECTED.set(if data_loss_suspected { 1 } else { 0 });
     556              : 
     557              :     tracing::info!(
     558              :         "Attaching {} tenants at startup, warming up {} at a time",
     559              :         tenant_configs.len(),
     560              :         conf.concurrent_tenant_warmup.initial_permits()
     561              :     );
     562              :     TENANT.startup_scheduled.inc_by(tenant_configs.len() as u64);
     563              : 
     564              :     // Accumulate futures for writing tenant configs, so that we can execute in parallel
     565              :     let mut config_write_futs = Vec::new();
     566              : 
     567              :     // Update the location configs according to the re-attach response and persist them to disk
     568              :     tracing::info!("Updating {} location configs", tenant_configs.len());
     569              :     for (tenant_shard_id, location_conf) in tenant_configs {
     570              :         let tenant_dir_path = conf.tenant_path(&tenant_shard_id);
     571              : 
     572              :         let mut location_conf = match location_conf {
     573              :             Ok(l) => l,
     574              :             Err(e) => {
     575              :                 // This should only happen in the case of a serialization bug or critical local I/O error: we cannot load this tenant
      576              :                 error!(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), "Failed to load tenant config: {e:#}");
     577              :                 continue;
     578              :             }
     579              :         };
     580              : 
     581              :         // FIXME: if we were attached, and get demoted to secondary on re-attach, we
     582              :         // don't have a place to get a config.
     583              :         // (https://github.com/neondatabase/neon/issues/5377)
     584              :         const DEFAULT_SECONDARY_CONF: SecondaryLocationConfig =
     585              :             SecondaryLocationConfig { warm: true };
     586              : 
     587              :         if let Some(tenant_modes) = &tenant_modes {
     588              :             // We have a generation map: treat it as the authority for whether
     589              :             // this tenant is really attached.
     590              :             match tenant_modes.get(&tenant_shard_id) {
     591              :                 None => {
     592              :                     info!(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), "Detaching tenant, control plane omitted it in re-attach response");
     593              : 
     594              :                     match safe_rename_tenant_dir(&tenant_dir_path).await {
     595              :                         Ok(tmp_path) => {
     596              :                             background_purges.spawn(tmp_path);
     597              :                         }
     598              :                         Err(e) => {
     599              :                             error!(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(),
     600              :                             "Failed to move detached tenant directory '{tenant_dir_path}': {e:?}");
     601              :                         }
     602              :                     };
     603              : 
     604              :                     // We deleted local content: move on to next tenant, don't try and spawn this one.
     605              :                     continue;
     606              :                 }
     607              :                 Some(TenantStartupMode::Secondary) => {
     608              :                     if !matches!(location_conf.mode, LocationMode::Secondary(_)) {
     609              :                         location_conf.mode = LocationMode::Secondary(DEFAULT_SECONDARY_CONF);
     610              :                     }
     611              :                 }
     612              :                 Some(TenantStartupMode::Attached((attach_mode, generation, stripe_size))) => {
     613              :                     let old_gen_higher = match &location_conf.mode {
     614              :                         LocationMode::Attached(AttachedLocationConfig {
     615              :                             generation: old_generation,
     616              :                             attach_mode: _attach_mode,
     617              :                         }) => {
     618              :                             if old_generation > generation {
     619              :                                 Some(old_generation)
     620              :                             } else {
     621              :                                 None
     622              :                             }
     623              :                         }
     624              :                         _ => None,
     625              :                     };
     626              :                     if let Some(old_generation) = old_gen_higher {
     627              :                         tracing::error!(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(),
     628              :                             "Control plane gave decreasing generation ({generation:?}) in re-attach response for tenant that was attached in generation {:?}, demoting to secondary",
     629              :                             old_generation
     630              :                         );
     631              : 
     632              :                         // We cannot safely attach this tenant given a bogus generation number, but let's avoid throwing away
     633              :                         // local disk content: demote to secondary rather than detaching.
     634              :                         location_conf.mode = LocationMode::Secondary(DEFAULT_SECONDARY_CONF);
     635              :                     } else {
     636              :                         location_conf.attach_in_generation(*attach_mode, *generation, *stripe_size);
     637              :                     }
     638              :                 }
     639              :             }
     640              :         } else {
     641              :             // Legacy mode: no generation information, any tenant present
     642              :             // on local disk may activate
     643              :             info!(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), "Starting tenant in legacy mode, no generation",);
     644              :         };
     645              : 
     646              :         // Presence of a generation number implies attachment: attach the tenant
     647              :         // if it wasn't already, and apply the generation number.
     648            0 :         config_write_futs.push(async move {
     649            0 :             let r =
     650            0 :                 TenantShard::persist_tenant_config(conf, &tenant_shard_id, &location_conf).await;
     651            0 :             (tenant_shard_id, location_conf, r)
     652            0 :         });
     653              :     }
     654              : 
     655              :     // Execute config writes with concurrency, to avoid bottlenecking on local FS write latency
     656              :     tracing::info!(
     657              :         "Writing {} location config files...",
     658              :         config_write_futs.len()
     659              :     );
     660              :     let config_write_results = futures::stream::iter(config_write_futs)
     661              :         .buffer_unordered(16)
     662              :         .collect::<Vec<_>>()
     663              :         .await;
     664              : 
     665              :     tracing::info!(
     666              :         "Spawning {} tenant shard locations...",
     667              :         config_write_results.len()
     668              :     );
     669              :     // For those shards that have live configurations, construct `Tenant` or `SecondaryTenant` objects and start them running
     670              :     for (tenant_shard_id, location_conf, config_write_result) in config_write_results {
      671              :         // Writing a config to local disk is foundational to starting up tenants: panic if we can't.
     672              :         config_write_result.fatal_err("write tenant shard config file");
     673              : 
     674              :         let tenant_dir_path = conf.tenant_path(&tenant_shard_id);
     675              :         let shard_identity = location_conf.shard;
     676              :         let slot = match location_conf.mode {
     677              :             LocationMode::Attached(attached_conf) => TenantSlot::Attached(
     678              :                 tenant_spawn(
     679              :                     conf,
     680              :                     tenant_shard_id,
     681              :                     &tenant_dir_path,
     682              :                     resources.clone(),
     683              :                     AttachedTenantConf::new(conf, location_conf.tenant_conf, attached_conf),
     684              :                     shard_identity,
     685              :                     Some(init_order.clone()),
     686              :                     SpawnMode::Lazy,
     687              :                     &ctx,
     688              :                 )
     689              :                 .expect("global shutdown during init_tenant_mgr cannot happen"),
     690              :             ),
     691              :             LocationMode::Secondary(secondary_conf) => {
     692              :                 info!(
     693              :                     tenant_id = %tenant_shard_id.tenant_id,
     694              :                     shard_id = %tenant_shard_id.shard_slug(),
     695              :                     "Starting secondary tenant"
     696              :                 );
     697              :                 TenantSlot::Secondary(SecondaryTenant::new(
     698              :                     tenant_shard_id,
     699              :                     shard_identity,
     700              :                     location_conf.tenant_conf,
     701              :                     &secondary_conf,
     702              :                 ))
     703              :             }
     704              :         };
     705              : 
     706              :         METRICS.slot_inserted(&slot);
     707              :         tenants.insert(tenant_shard_id, slot);
     708              :     }
     709              : 
     710              :     info!("Processed {} local tenants at startup", tenants.len());
     711              : 
     712              :     let mut tenant_map = tenant_manager.tenants.write().unwrap();
     713              :     *tenant_map = TenantsMap::Open(tenants);
     714              : 
     715              :     Ok(())
     716              : }
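
// Editor's illustration (not part of mgr.rs): the bounded-concurrency config
// write step from init_tenant_mgr, isolated. buffer_unordered(16) keeps at
// most 16 writes in flight, so startup is not serialized on per-file write
// latency while filesystem pressure stays bounded. tokio::fs::write is a
// placeholder for TenantShard::persist_tenant_config.

use futures::StreamExt;

async fn write_all_configs(paths: Vec<String>) -> Vec<(String, std::io::Result<()>)> {
    let futs = paths.into_iter().map(|path| async move {
        let result = tokio::fs::write(&path, b"config").await;
        (path, result)
    });

    // Same shape as the buffer_unordered(16) call in init_tenant_mgr above.
    futures::stream::iter(futs)
        .buffer_unordered(16)
        .collect::<Vec<_>>()
        .await
}
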
     717              : 
     718              : /// Wrapper for Tenant::spawn that checks invariants before running
     719              : #[allow(clippy::too_many_arguments)]
     720            0 : fn tenant_spawn(
     721            0 :     conf: &'static PageServerConf,
     722            0 :     tenant_shard_id: TenantShardId,
     723            0 :     tenant_path: &Utf8Path,
     724            0 :     resources: TenantSharedResources,
     725            0 :     location_conf: AttachedTenantConf,
     726            0 :     shard_identity: ShardIdentity,
     727            0 :     init_order: Option<InitializationOrder>,
     728            0 :     mode: SpawnMode,
     729            0 :     ctx: &RequestContext,
     730            0 : ) -> Result<Arc<TenantShard>, GlobalShutDown> {
     731              :     // All these conditions should have been satisfied by our caller: the tenant dir exists, is a well formed
     732              :     // path, and contains a configuration file.  Assertions that do synchronous I/O are limited to debug mode
     733              :     // to avoid impacting prod runtime performance.
     734            0 :     assert!(!crate::is_temporary(tenant_path));
     735            0 :     debug_assert!(tenant_path.is_dir());
     736            0 :     debug_assert!(
     737            0 :         conf.tenant_location_config_path(&tenant_shard_id)
     738            0 :             .try_exists()
     739            0 :             .unwrap()
     740              :     );
     741              : 
     742            0 :     TenantShard::spawn(
     743            0 :         conf,
     744            0 :         tenant_shard_id,
     745            0 :         resources,
     746            0 :         location_conf,
     747            0 :         shard_identity,
     748            0 :         init_order,
     749            0 :         mode,
     750            0 :         ctx,
     751              :     )
     752            0 : }
     753              : 
     754              : #[derive(thiserror::Error, Debug)]
     755              : pub(crate) enum UpsertLocationError {
     756              :     #[error("Bad config request: {0}")]
     757              :     BadRequest(anyhow::Error),
     758              : 
     759              :     #[error("Cannot change config in this state: {0}")]
     760              :     Unavailable(#[from] TenantMapError),
     761              : 
     762              :     #[error("Tenant is already being modified")]
     763              :     InProgress,
     764              : 
     765              :     #[error("Failed to flush: {0}")]
     766              :     Flush(anyhow::Error),
     767              : 
      768              :     /// This error variant is for unexpected situations (soft assertions) where the system is in a state it was never expected to reach.
     769              :     #[error("Internal error: {0}")]
     770              :     InternalError(anyhow::Error),
     771              : }
     772              : 
     773              : impl TenantManager {
     774              :     /// Convenience function so that anyone with a TenantManager can get at the global configuration, without
     775              :     /// having to pass it around everywhere as a separate object.
     776            0 :     pub(crate) fn get_conf(&self) -> &'static PageServerConf {
     777            0 :         self.conf
     778            0 :     }
     779              : 
     780              :     /// Gets the attached tenant from the in-memory data, erroring if it's absent, in secondary mode, or currently
     781              :     /// undergoing a state change (i.e. slot is InProgress).
     782              :     ///
     783              :     /// The return TenantShard is not guaranteed to be active: check its status after obtaing it, or
     784              :     /// use [`TenantShard::wait_to_become_active`] before using it if you will do I/O on it.
     785            0 :     pub(crate) fn get_attached_tenant_shard(
     786            0 :         &self,
     787            0 :         tenant_shard_id: TenantShardId,
     788            0 :     ) -> Result<Arc<TenantShard>, GetTenantError> {
     789            0 :         let locked = self.tenants.read().unwrap();
     790              : 
     791            0 :         let peek_slot = tenant_map_peek_slot(&locked, &tenant_shard_id, TenantSlotPeekMode::Read)?;
     792              : 
     793            0 :         match peek_slot {
     794            0 :             Some(TenantSlot::Attached(tenant)) => Ok(Arc::clone(tenant)),
     795            0 :             Some(TenantSlot::InProgress(_)) => Err(GetTenantError::NotActive(tenant_shard_id)),
     796              :             None | Some(TenantSlot::Secondary(_)) => {
     797            0 :                 Err(GetTenantError::ShardNotFound(tenant_shard_id))
     798              :             }
     799              :         }
     800            0 :     }
     801              : 
     802            0 :     pub(crate) fn get_secondary_tenant_shard(
     803            0 :         &self,
     804            0 :         tenant_shard_id: TenantShardId,
     805            0 :     ) -> Option<Arc<SecondaryTenant>> {
     806            0 :         let locked = self.tenants.read().unwrap();
     807              : 
     808            0 :         let peek_slot = tenant_map_peek_slot(&locked, &tenant_shard_id, TenantSlotPeekMode::Read)
     809            0 :             .ok()
     810            0 :             .flatten();
     811              : 
     812            0 :         match peek_slot {
     813            0 :             Some(TenantSlot::Secondary(s)) => Some(s.clone()),
     814            0 :             _ => None,
     815              :         }
     816            0 :     }
     817              : 
     818              :     /// Whether the `TenantManager` is responsible for the tenant shard
     819            0 :     pub(crate) fn manages_tenant_shard(&self, tenant_shard_id: TenantShardId) -> bool {
     820            0 :         let locked = self.tenants.read().unwrap();
     821              : 
     822            0 :         let peek_slot = tenant_map_peek_slot(&locked, &tenant_shard_id, TenantSlotPeekMode::Read)
     823            0 :             .ok()
     824            0 :             .flatten();
     825              : 
     826            0 :         peek_slot.is_some()
     827            0 :     }
     828              : 
     829              :     #[instrument(skip_all, fields(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug()))]
     830              :     pub(crate) async fn upsert_location(
     831              :         &self,
     832              :         tenant_shard_id: TenantShardId,
     833              :         new_location_config: LocationConf,
     834              :         flush: Option<Duration>,
     835              :         mut spawn_mode: SpawnMode,
     836              :         ctx: &RequestContext,
     837              :     ) -> Result<Option<Arc<TenantShard>>, UpsertLocationError> {
     838              :         debug_assert_current_span_has_tenant_id();
     839              :         info!("configuring tenant location to state {new_location_config:?}");
     840              : 
     841              :         enum FastPathModified {
     842              :             Attached(Arc<TenantShard>),
     843              :             Secondary(Arc<SecondaryTenant>),
     844              :         }
     845              : 
     846              :         // Special case fast-path for updates to existing slots: if our upsert is only updating configuration,
      847              :         // then we do not need to set the slot to InProgress; we can just call into the
      848              :         // existing tenant.
     849              :         let fast_path_taken = {
     850              :             let locked = self.tenants.read().unwrap();
     851              :             let peek_slot =
     852              :                 tenant_map_peek_slot(&locked, &tenant_shard_id, TenantSlotPeekMode::Write)?;
     853              :             match (&new_location_config.mode, peek_slot) {
     854              :                 (LocationMode::Attached(attach_conf), Some(TenantSlot::Attached(tenant))) => {
     855              :                     match attach_conf.generation.cmp(&tenant.generation) {
     856              :                         Ordering::Equal => {
     857              :                             // A transition from Attached to Attached in the same generation, we may
     858              :                             // take our fast path and just provide the updated configuration
     859              :                             // to the tenant.
     860              :                             tenant.set_new_location_config(
     861              :                                 AttachedTenantConf::try_from(
     862              :                                     self.conf,
     863              :                                     new_location_config.clone(),
     864              :                                 )
     865              :                                 .map_err(UpsertLocationError::BadRequest)?,
     866              :                             );
     867              : 
     868              :                             Some(FastPathModified::Attached(tenant.clone()))
     869              :                         }
     870              :                         Ordering::Less => {
     871              :                             return Err(UpsertLocationError::BadRequest(anyhow::anyhow!(
     872              :                                 "Generation {:?} is less than existing {:?}",
     873              :                                 attach_conf.generation,
     874              :                                 tenant.generation
     875              :                             )));
     876              :                         }
     877              :                         Ordering::Greater => {
     878              :                             // Generation advanced, fall through to general case of replacing `Tenant` object
     879              :                             None
     880              :                         }
     881              :                     }
     882              :                 }
     883              :                 (
     884              :                     LocationMode::Secondary(secondary_conf),
     885              :                     Some(TenantSlot::Secondary(secondary_tenant)),
     886              :                 ) => {
     887              :                     secondary_tenant.set_config(secondary_conf);
     888              :                     secondary_tenant.set_tenant_conf(&new_location_config.tenant_conf);
     889              :                     Some(FastPathModified::Secondary(secondary_tenant.clone()))
     890              :                 }
     891              :                 _ => {
     892              :                     // Not a fast-path configuration update; fall through to the general case
     893              :                     None
     894              :                 }
     895              :             }
     896              :         };
     897              : 
     898              :         // Fast-path continued: having dropped out of the self.tenants lock, do the async
     899              :         // phase of writing the config and/or waiting for a flush before returning.
     900              :         match fast_path_taken {
     901              :             Some(FastPathModified::Attached(tenant)) => {
     902              :                 tenant
     903              :                     .shard_identity
     904              :                     .assert_equal(new_location_config.shard);
     905              :                 TenantShard::persist_tenant_config(
     906              :                     self.conf,
     907              :                     &tenant_shard_id,
     908              :                     &new_location_config,
     909              :                 )
     910              :                 .await
     911              :                 .fatal_err("write tenant shard config");
     912              : 
     913              :                 // Transition to AttachedStale means we may well hold a valid generation
     914              :                 // still, and have been requested to go stale as part of a migration.  If
     915              :                 // the caller set `flush`, then flush to remote storage.
     916              :                 if let LocationMode::Attached(AttachedLocationConfig {
     917              :                     generation: _,
     918              :                     attach_mode: AttachmentMode::Stale,
     919              :                 }) = &new_location_config.mode
     920              :                 {
     921              :                     if let Some(flush_timeout) = flush {
     922              :                         match tokio::time::timeout(flush_timeout, tenant.flush_remote()).await {
     923              :                             Ok(Err(e)) => {
     924              :                                 return Err(UpsertLocationError::Flush(e));
     925              :                             }
     926              :                             Ok(Ok(_)) => return Ok(Some(tenant)),
     927              :                             Err(_) => {
     928              :                                 tracing::warn!(
     929              :                                     timeout_ms = flush_timeout.as_millis(),
     930              :                                     "Timed out waiting for flush to remote storage, proceeding anyway."
     931              :                                 )
     932              :                             }
     933              :                         }
     934              :                     }
     935              :                 }
     936              : 
     937              :                 return Ok(Some(tenant));
     938              :             }
     939              :             Some(FastPathModified::Secondary(secondary_tenant)) => {
     940              :                 secondary_tenant
     941              :                     .shard_identity
     942              :                     .assert_equal(new_location_config.shard);
     943              :                 TenantShard::persist_tenant_config(
     944              :                     self.conf,
     945              :                     &tenant_shard_id,
     946              :                     &new_location_config,
     947              :                 )
     948              :                 .await
     949              :                 .fatal_err("write tenant shard config");
     950              : 
     951              :                 return Ok(None);
     952              :             }
     953              :             None => {
     954              :                 // Proceed with the general case procedure, where we will shutdown & remove any existing
     955              :                 // slot contents and replace with a fresh one
     956              :             }
     957              :         };
     958              : 
     959              :         // General case for upserts to TenantsMap, excluding the case above: we will substitute an
     960              :         // InProgress value into the slot while we make whatever changes are required.  The state for
     961              :         // the tenant is inaccessible to the outside world while we are doing this, but that is sensible:
     962              :         // the state is ill-defined while we're in transition.  Transitions are async, but fast: we do
     963              :         // not do significant I/O, and shutdowns should be prompt via cancellation tokens.
     964              :         let mut slot_guard = self
     965              :             .tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::Any)
     966            0 :             .map_err(|e| match e {
     967              :                 TenantSlotError::NotFound(_) => {
     968            0 :                     unreachable!("Called with mode Any")
     969              :                 }
     970            0 :                 TenantSlotError::InProgress => UpsertLocationError::InProgress,
     971            0 :                 TenantSlotError::MapState(s) => UpsertLocationError::Unavailable(s),
     972            0 :             })?;
     973              : 
     974              :         match slot_guard.get_old_value() {
     975              :             Some(TenantSlot::Attached(tenant)) => {
     976              :                 tenant
     977              :                     .shard_identity
     978              :                     .assert_equal(new_location_config.shard);
     979              : 
     980              :                 // The case where we keep a Tenant alive was covered above in the special case
     981              :                 // for Attached->Attached transitions in the same generation.  By this point,
     982              :                 // if we see an attached tenant we know it will be discarded and should be
     983              :                 // shut down.
     984              :                 let (_guard, progress) = utils::completion::channel();
     985              : 
     986              :                 match tenant.get_attach_mode() {
     987              :                     AttachmentMode::Single | AttachmentMode::Multi => {
     988              :                         // Before we leave our state as the presumed holder of the latest generation,
     989              :                         // flush any outstanding deletions to reduce the risk of leaking objects.
     990              :                         self.resources.deletion_queue_client.flush_advisory()
     991              :                     }
     992              :                     AttachmentMode::Stale => {
     993              :                         // If we're stale there's no point in trying to flush deletions
     994              :                     }
     995              :                 };
     996              : 
     997              :                 info!("Shutting down attached tenant");
     998              :                 match tenant.shutdown(progress, ShutdownMode::Hard).await {
     999              :                     Ok(()) => {}
    1000              :                     Err(barrier) => {
    1001              :                         info!("Shutdown already in progress, waiting for it to complete");
    1002              :                         barrier.wait().await;
    1003              :                     }
    1004              :                 }
    1005              :                 slot_guard.drop_old_value().expect("We just shut it down");
    1006              : 
    1007              :                 // Edge case: we were called with SpawnMode::Create, but a Tenant already existed, so the
    1008              :                 // caller only thought they were creating a fresh one.  We must switch to
    1009              :                 // Eager mode so that when starting this Tenant we properly probe remote storage for timelines,
    1010              :                 // rather than assuming it to be empty.
    1011              :                 spawn_mode = SpawnMode::Eager;
    1012              :             }
    1013              :             Some(TenantSlot::Secondary(secondary_tenant)) => {
    1014              :                 secondary_tenant
    1015              :                     .shard_identity
    1016              :                     .assert_equal(new_location_config.shard);
    1017              : 
    1018              :                 info!("Shutting down secondary tenant");
    1019              :                 secondary_tenant.shutdown().await;
    1020              :             }
    1021              :             Some(TenantSlot::InProgress(_)) => {
    1022              :                 // This should never happen: acquire_slot should error out
    1023              :                 // if the contents of a slot were InProgress.
    1024              :                 return Err(UpsertLocationError::InternalError(anyhow::anyhow!(
    1025              :                     "Acquired an InProgress slot, this is a bug."
    1026              :                 )));
    1027              :             }
    1028              :             None => {
    1029              :                 // Slot was vacant, nothing needs shutting down.
    1030              :             }
    1031              :         }
    1032              : 
    1033              :         let tenant_path = self.conf.tenant_path(&tenant_shard_id);
    1034              :         let timelines_path = self.conf.timelines_path(&tenant_shard_id);
    1035              : 
    1036              :         // Directory structure is the same for attached and secondary modes:
    1037              :         // create it if it doesn't exist.  Timeline load/creation expects the
    1038              :         // timelines/ subdir to already exist.
    1039              :         //
    1040              :         // Does not need to be fsync'd because local storage is just a cache.
    1041              :         tokio::fs::create_dir_all(&timelines_path)
    1042              :             .await
    1043              :             .fatal_err("create timelines/ dir");
    1044              : 
    1045              :         // Before activating either secondary or attached mode, persist the
    1046              :         // configuration, so that on restart we will re-attach (or re-start
    1047              :         // secondary) on the tenant.
    1048              :         TenantShard::persist_tenant_config(self.conf, &tenant_shard_id, &new_location_config)
    1049              :             .await
    1050              :             .fatal_err("write tenant shard config");
    1051              : 
    1052              :         let new_slot = match &new_location_config.mode {
    1053              :             LocationMode::Secondary(secondary_config) => {
    1054              :                 let shard_identity = new_location_config.shard;
    1055              :                 TenantSlot::Secondary(SecondaryTenant::new(
    1056              :                     tenant_shard_id,
    1057              :                     shard_identity,
    1058              :                     new_location_config.tenant_conf,
    1059              :                     secondary_config,
    1060              :                 ))
    1061              :             }
    1062              :             LocationMode::Attached(_attach_config) => {
    1063              :                 let shard_identity = new_location_config.shard;
    1064              : 
    1065              :                 // Testing hack: if we are configured with no control plane, then drop the generation
    1066              :                 // from upserts.  This enables creating generation-less tenants even though neon_local
    1067              :                 // always uses generations when calling the location conf API.
    1068              :                 let attached_conf = AttachedTenantConf::try_from(self.conf, new_location_config)
    1069              :                     .map_err(UpsertLocationError::BadRequest)?;
    1070              : 
    1071              :                 let tenant = tenant_spawn(
    1072              :                     self.conf,
    1073              :                     tenant_shard_id,
    1074              :                     &tenant_path,
    1075              :                     self.resources.clone(),
    1076              :                     attached_conf,
    1077              :                     shard_identity,
    1078              :                     None,
    1079              :                     spawn_mode,
    1080              :                     ctx,
    1081              :                 )
    1082            0 :                 .map_err(|_: GlobalShutDown| {
    1083            0 :                     UpsertLocationError::Unavailable(TenantMapError::ShuttingDown)
    1084            0 :                 })?;
    1085              : 
    1086              :                 TenantSlot::Attached(tenant)
    1087              :             }
    1088              :         };
    1089              : 
    1090              :         let attached_tenant = if let TenantSlot::Attached(tenant) = &new_slot {
    1091              :             Some(tenant.clone())
    1092              :         } else {
    1093              :             None
    1094              :         };
    1095              : 
    1096              :         match slot_guard.upsert(new_slot) {
    1097              :             Err(TenantSlotUpsertError::InternalError(e)) => {
    1098              :                 Err(UpsertLocationError::InternalError(anyhow::anyhow!(e)))
    1099              :             }
    1100              :             Err(TenantSlotUpsertError::MapState(e)) => Err(UpsertLocationError::Unavailable(e)),
    1101              :             Err(TenantSlotUpsertError::ShuttingDown((new_slot, _completion))) => {
    1102              :                 // If we just called tenant_spawn() on a new tenant, and can't insert it into our map, then
    1103              :                 // we must not leak it: this would violate the invariant that after shutdown_all_tenants, all tenants
    1104              :                 // are shut down.
    1105              :                 //
    1106              :                 // We must shut it down inline here.
    1107              :                 match new_slot {
    1108              :                     TenantSlot::InProgress(_) => {
    1109              :                         // Unreachable because we never insert an InProgress
    1110              :                         unreachable!()
    1111              :                     }
    1112              :                     TenantSlot::Attached(tenant) => {
    1113              :                         let (_guard, progress) = utils::completion::channel();
    1114              :                         info!(
    1115              :                             "Shutting down just-spawned tenant, because tenant manager is shut down"
    1116              :                         );
    1117              :                         match tenant.shutdown(progress, ShutdownMode::Hard).await {
    1118              :                             Ok(()) => {
    1119              :                                 info!("Finished shutting down just-spawned tenant");
    1120              :                             }
    1121              :                             Err(barrier) => {
    1122              :                                 info!("Shutdown already in progress, waiting for it to complete");
    1123              :                                 barrier.wait().await;
    1124              :                             }
    1125              :                         }
    1126              :                     }
    1127              :                     TenantSlot::Secondary(secondary_tenant) => {
    1128              :                         secondary_tenant.shutdown().await;
    1129              :                     }
    1130              :                 }
    1131              : 
    1132              :                 Err(UpsertLocationError::Unavailable(
    1133              :                     TenantMapError::ShuttingDown,
    1134              :                 ))
    1135              :             }
    1136              :             Ok(()) => Ok(attached_tenant),
    1137              :         }
    1138              :     }
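
Seen from the caller's side, the whole upsert boils down to building a `LocationConf` and handing it to this method. A minimal sketch, assuming hypothetical `mgr`, `tenant_shard_id`, `shard_identity`, `tenant_conf` and `ctx` bindings already in scope; the `LocationConf` fields mirror the construction used in `do_shard_split` further down:

    // Hypothetical caller: attach one shard in Single mode.  Not an excerpt from
    // the pageserver; all bindings other than the types are assumed.
    let new_location_config = LocationConf {
        mode: LocationMode::Attached(AttachedLocationConfig {
            generation: Generation::new(1), // assumed constructor, for illustration only
            attach_mode: AttachmentMode::Single,
        }),
        shard: shard_identity,
        tenant_conf,
    };

    // A flush timeout only matters for transitions to AttachedStale (see the fast
    // path above), so pass None here.
    let attached = mgr
        .upsert_location(tenant_shard_id, new_location_config, None, SpawnMode::Eager, ctx)
        .await?;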
    1139              : 
    1140            1 :     fn tenant_map_acquire_slot(
    1141            1 :         &self,
    1142            1 :         tenant_shard_id: &TenantShardId,
    1143            1 :         mode: TenantSlotAcquireMode,
    1144            1 :     ) -> Result<SlotGuard, TenantSlotError> {
    1145              :         use TenantSlotAcquireMode::*;
    1146            1 :         METRICS.tenant_slot_writes.inc();
    1147              : 
    1148            1 :         let mut locked = self.tenants.write().unwrap();
    1149            1 :         let span = tracing::info_span!("acquire_slot", tenant_id=%tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug());
    1150            1 :         let _guard = span.enter();
    1151              : 
    1152            1 :         let m = match &mut *locked {
    1153            0 :             TenantsMap::Initializing => return Err(TenantMapError::StillInitializing.into()),
    1154            0 :             TenantsMap::ShuttingDown(_) => return Err(TenantMapError::ShuttingDown.into()),
    1155            1 :             TenantsMap::Open(m) => m,
    1156              :         };
    1157              : 
    1158              :         use std::collections::btree_map::Entry;
    1159              : 
    1160            1 :         let entry = m.entry(*tenant_shard_id);
    1161              : 
    1162            1 :         match entry {
    1163            0 :             Entry::Vacant(v) => match mode {
    1164              :                 MustExist => {
    1165            0 :                     tracing::debug!("Vacant && MustExist: return NotFound");
    1166            0 :                     Err(TenantSlotError::NotFound(*tenant_shard_id))
    1167              :                 }
    1168              :                 _ => {
    1169            0 :                     let (completion, barrier) = utils::completion::channel();
    1170            0 :                     let inserting = TenantSlot::InProgress(barrier);
    1171            0 :                     METRICS.slot_inserted(&inserting);
    1172            0 :                     v.insert(inserting);
    1173            0 :                     tracing::debug!("Vacant, inserted InProgress");
    1174            0 :                     Ok(SlotGuard::new(
    1175            0 :                         *tenant_shard_id,
    1176            0 :                         None,
    1177            0 :                         completion,
    1178            0 :                         &self.tenants,
    1179            0 :                     ))
    1180              :                 }
    1181              :             },
    1182            1 :             Entry::Occupied(mut o) => {
    1183              :                 // Apply mode-driven checks
    1184            1 :                 match (o.get(), mode) {
    1185              :                     (TenantSlot::InProgress(_), _) => {
    1186            0 :                         tracing::debug!("Occupied, failing for InProgress");
    1187            0 :                         Err(TenantSlotError::InProgress)
    1188              :                     }
    1189              :                     _ => {
    1190              :                         // Happy case: the slot was not in any state that violated our mode
    1191            1 :                         let (completion, barrier) = utils::completion::channel();
    1192            1 :                         let in_progress = TenantSlot::InProgress(barrier);
    1193            1 :                         METRICS.slot_inserted(&in_progress);
    1194            1 :                         let old_value = o.insert(in_progress);
    1195            1 :                         METRICS.slot_removed(&old_value);
    1196            1 :                         tracing::debug!("Occupied, replaced with InProgress");
    1197            1 :                         Ok(SlotGuard::new(
    1198            1 :                             *tenant_shard_id,
    1199            1 :                             Some(old_value),
    1200            1 :                             completion,
    1201            1 :                             &self.tenants,
    1202            1 :                         ))
    1203              :                     }
    1204              :                 }
    1205              :             }
    1206              :         }
    1207            1 :     }
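
Every mutating entry point in this module wraps this function in the same lifecycle: acquire a `SlotGuard` (which parks an `InProgress` marker in the map), dispose of the old value outside the lock, then `upsert` the replacement or `revert` on failure. A condensed sketch of that pattern, distilled from the call sites in `upsert_location`, `reset_tenant` and `delete_tenant` (not a literal excerpt; error handling trimmed):

    // Acquire: the slot now holds InProgress, so concurrent callers see
    // TenantSlotError::InProgress instead of racing us.
    let mut slot_guard =
        self.tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::Any)?;

    // Dispose of whatever occupied the slot, if anything.
    if let Some(TenantSlot::Attached(tenant)) = slot_guard.get_old_value() {
        let (_guard, progress) = utils::completion::channel();
        if let Err(barrier) = tenant.shutdown(progress, ShutdownMode::Hard).await {
            // A shutdown was already in progress: wait for it to complete.
            barrier.wait().await;
        }
        slot_guard.drop_old_value()?;
    }

    // Publish the new value; alternatively, revert() puts the original back, as
    // reset_tenant does when it bails out early.
    slot_guard.upsert(new_slot)?;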
    1208              : 
    1209              :     /// Resetting a tenant is equivalent to detaching it, then attaching it again with the same
    1210              :     /// LocationConf that was last used to attach it.  Optionally, the local file cache may be
    1211              :     /// dropped before re-attaching.
    1212              :     ///
    1213              :     /// This is not part of a tenant's normal lifecycle: it is used for debug/support, in situations
    1214              :     /// where an issue is identified that would go away with a restart of the tenant.
    1215              :     ///
    1216              :     /// This does not have any special "force" shutdown of a tenant: it relies on the tenant's tasks
    1217              :     /// to respect the cancellation tokens used in normal shutdown().
     1218              :     /// to respect the cancellation tokens used in a normal shutdown().
    1219              :     pub(crate) async fn reset_tenant(
    1220              :         &self,
    1221              :         tenant_shard_id: TenantShardId,
    1222              :         drop_cache: bool,
    1223              :         ctx: &RequestContext,
    1224              :     ) -> anyhow::Result<()> {
    1225              :         let mut slot_guard =
    1226              :             self.tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::Any)?;
    1227              :         let Some(old_slot) = slot_guard.get_old_value() else {
    1228              :             anyhow::bail!("Tenant not found when trying to reset");
    1229              :         };
    1230              : 
    1231              :         let Some(tenant) = old_slot.get_attached() else {
    1232              :             slot_guard.revert();
    1233              :             anyhow::bail!("Tenant is not in attached state");
    1234              :         };
    1235              : 
    1236              :         let (_guard, progress) = utils::completion::channel();
    1237              :         match tenant.shutdown(progress, ShutdownMode::Hard).await {
    1238              :             Ok(()) => {
    1239              :                 slot_guard.drop_old_value()?;
    1240              :             }
    1241              :             Err(_barrier) => {
    1242              :                 slot_guard.revert();
    1243              :                 anyhow::bail!("Cannot reset Tenant, already shutting down");
    1244              :             }
    1245              :         }
    1246              : 
    1247              :         let tenant_path = self.conf.tenant_path(&tenant_shard_id);
    1248              :         let timelines_path = self.conf.timelines_path(&tenant_shard_id);
    1249              :         let config = TenantShard::load_tenant_config(self.conf, &tenant_shard_id)?;
    1250              : 
    1251              :         if drop_cache {
    1252              :             tracing::info!("Dropping local file cache");
    1253              : 
    1254              :             match tokio::fs::read_dir(&timelines_path).await {
    1255              :                 Err(e) => {
    1256              :                     tracing::warn!("Failed to list timelines while dropping cache: {}", e);
    1257              :                 }
    1258              :                 Ok(mut entries) => {
    1259              :                     while let Some(entry) = entries.next_entry().await? {
    1260              :                         tokio::fs::remove_dir_all(entry.path()).await?;
    1261              :                     }
    1262              :                 }
    1263              :             }
    1264              :         }
    1265              : 
    1266              :         let shard_identity = config.shard;
    1267              :         let tenant = tenant_spawn(
    1268              :             self.conf,
    1269              :             tenant_shard_id,
    1270              :             &tenant_path,
    1271              :             self.resources.clone(),
    1272              :             AttachedTenantConf::try_from(self.conf, config)?,
    1273              :             shard_identity,
    1274              :             None,
    1275              :             SpawnMode::Eager,
    1276              :             ctx,
    1277              :         )?;
    1278              : 
    1279              :         slot_guard.upsert(TenantSlot::Attached(tenant))?;
    1280              : 
    1281              :         Ok(())
    1282              :     }
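
The `drop_cache` branch above only removes cached timeline data: the tenant configuration and the tenant directory itself survive, so the subsequent re-attach can load them. The same behaviour as a standalone helper, assuming the usual `<tenant dir>/timelines/<timeline_id>/` layout (the helper name and signature are illustrative, not part of this module):

    // Illustrative helper mirroring the drop_cache loop in reset_tenant: remove
    // every per-timeline directory, keep the tenant directory and its config file.
    async fn clear_local_timeline_cache(timelines_path: &Utf8Path) -> anyhow::Result<()> {
        let mut entries = tokio::fs::read_dir(timelines_path).await?;
        while let Some(entry) = entries.next_entry().await? {
            tokio::fs::remove_dir_all(entry.path()).await?;
        }
        Ok(())
    }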
    1283              : 
    1284            0 :     pub(crate) fn get_attached_active_tenant_shards(&self) -> Vec<Arc<TenantShard>> {
    1285            0 :         let locked = self.tenants.read().unwrap();
    1286            0 :         match &*locked {
    1287            0 :             TenantsMap::Initializing => Vec::new(),
    1288            0 :             TenantsMap::Open(map) | TenantsMap::ShuttingDown(map) => map
    1289            0 :                 .values()
    1290            0 :                 .filter_map(|slot| {
    1291            0 :                     slot.get_attached()
    1292            0 :                         .and_then(|t| if t.is_active() { Some(t.clone()) } else { None })
    1293            0 :                 })
    1294            0 :                 .collect(),
    1295              :         }
    1296            0 :     }
    1297              :     // Do some synchronous work for all tenant slots in Secondary state.  The provided
    1298              :     // callback should be small and fast, as it will be called inside the global
    1299              :     // TenantsMap lock.
    1300            0 :     pub(crate) fn foreach_secondary_tenants<F>(&self, mut func: F)
    1301            0 :     where
    1302            0 :         // TODO: let the callback return a hint to drop out of the loop early
    1303            0 :         F: FnMut(&TenantShardId, &Arc<SecondaryTenant>),
    1304              :     {
    1305            0 :         let locked = self.tenants.read().unwrap();
    1306              : 
    1307            0 :         let map = match &*locked {
    1308            0 :             TenantsMap::Initializing | TenantsMap::ShuttingDown(_) => return,
    1309            0 :             TenantsMap::Open(m) => m,
    1310              :         };
    1311              : 
    1312            0 :         for (tenant_id, slot) in map {
    1313            0 :             if let TenantSlot::Secondary(state) = slot {
    1314              :                 // Only expose secondary tenants that are not currently shutting down
    1315            0 :                 if !state.cancel.is_cancelled() {
    1316            0 :                     func(tenant_id, state)
    1317            0 :                 }
    1318            0 :             }
    1319              :         }
    1320            0 :     }
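
Because the callback runs while the global TenantsMap read lock is held, callers typically copy out identifiers and defer the real work until after the call returns. A hypothetical caller collecting the shard IDs of live secondary locations (`mgr` is an assumed binding):

    // Hypothetical usage: snapshot shard IDs under the lock, act on them afterwards.
    let mut secondary_ids: Vec<TenantShardId> = Vec::new();
    mgr.foreach_secondary_tenants(|tenant_shard_id, _secondary_state| {
        secondary_ids.push(*tenant_shard_id);
    });
    for tenant_shard_id in secondary_ids {
        // ... heavier per-shard work happens here, outside the TenantsMap lock ...
    }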
    1321              : 
    1322              :     /// Total list of all tenant slots: this includes attached, secondary, and InProgress.
    1323            0 :     pub(crate) fn list(&self) -> Vec<(TenantShardId, TenantSlot)> {
    1324            0 :         let locked = self.tenants.read().unwrap();
    1325            0 :         match &*locked {
    1326            0 :             TenantsMap::Initializing => Vec::new(),
    1327            0 :             TenantsMap::Open(map) | TenantsMap::ShuttingDown(map) => {
    1328            0 :                 map.iter().map(|(k, v)| (*k, v.clone())).collect()
    1329              :             }
    1330              :         }
    1331            0 :     }
    1332              : 
    1333            0 :     pub(crate) fn get(&self, tenant_shard_id: TenantShardId) -> Option<TenantSlot> {
    1334            0 :         let locked = self.tenants.read().unwrap();
    1335            0 :         match &*locked {
    1336            0 :             TenantsMap::Initializing => None,
    1337            0 :             TenantsMap::Open(map) | TenantsMap::ShuttingDown(map) => {
    1338            0 :                 map.get(&tenant_shard_id).cloned()
    1339              :             }
    1340              :         }
    1341            0 :     }
    1342              : 
    1343              :     /// If a tenant is attached, detach it.  Then remove its data from remote storage.
    1344              :     ///
    1345              :     /// A tenant is considered deleted once it is gone from remote storage.  It is the caller's
    1346              :     /// responsibility to avoid trying to attach the tenant again or use it in any way once deletion
    1347              :     /// has started: this operation is not atomic, and must be retried until it succeeds.
    1348              :     ///
    1349              :     /// As a special case, if an unsharded tenant ID is given for a sharded tenant, it will remove
    1350              :     /// all tenant shards in remote storage (removing all paths with the tenant prefix). The storage
    1351              :     /// controller uses this to purge all remote tenant data, including any stale parent shards that
    1352              :     /// may remain after splits. Ideally, this special case would be handled elsewhere. See:
    1353              :     /// <https://github.com/neondatabase/neon/pull/9394>.
    1354            0 :     pub(crate) async fn delete_tenant(
    1355            0 :         &self,
    1356            0 :         tenant_shard_id: TenantShardId,
    1357            0 :     ) -> Result<(), DeleteTenantError> {
    1358            0 :         super::span::debug_assert_current_span_has_tenant_id();
    1359              : 
    1360            0 :         async fn delete_local(
    1361            0 :             conf: &PageServerConf,
    1362            0 :             background_purges: &BackgroundPurges,
    1363            0 :             tenant_shard_id: &TenantShardId,
    1364            0 :         ) -> anyhow::Result<()> {
    1365            0 :             let local_tenant_directory = conf.tenant_path(tenant_shard_id);
    1366            0 :             let tmp_dir = safe_rename_tenant_dir(&local_tenant_directory)
    1367            0 :                 .await
    1368            0 :                 .with_context(|| {
    1369            0 :                     format!("local tenant directory {local_tenant_directory:?} rename")
    1370            0 :                 })?;
    1371            0 :             background_purges.spawn(tmp_dir);
    1372            0 :             Ok(())
    1373            0 :         }
    1374              : 
    1375            0 :         let slot_guard =
    1376            0 :             self.tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::Any)?;
    1377            0 :         match &slot_guard.old_value {
    1378            0 :             Some(TenantSlot::Attached(tenant)) => {
    1379              :                 // Legacy deletion flow: the tenant remains attached, goes to Stopping state, and
    1380              :                 // deletion will be resumed across restarts.
    1381            0 :                 let tenant = tenant.clone();
    1382            0 :                 let (_guard, progress) = utils::completion::channel();
    1383            0 :                 match tenant.shutdown(progress, ShutdownMode::Hard).await {
    1384            0 :                     Ok(()) => {}
    1385            0 :                     Err(barrier) => {
    1386            0 :                         info!("Shutdown already in progress, waiting for it to complete");
    1387            0 :                         barrier.wait().await;
    1388              :                     }
    1389              :                 }
    1390            0 :                 delete_local(self.conf, &self.background_purges, &tenant_shard_id).await?;
    1391              :             }
    1392            0 :             Some(TenantSlot::Secondary(secondary_tenant)) => {
    1393            0 :                 secondary_tenant.shutdown().await;
    1394              : 
    1395            0 :                 delete_local(self.conf, &self.background_purges, &tenant_shard_id).await?;
    1396              :             }
    1397            0 :             Some(TenantSlot::InProgress(_)) => unreachable!(),
    1398            0 :             None => {}
    1399              :         };
    1400              : 
    1401              :         // Fall through: local state for this tenant is no longer present, proceed with remote delete.
    1402              :         // - We use a retry wrapper here so that common transient S3 errors (e.g. 503, 429) do not result
    1403              :         //   in 500 responses to delete requests.
    1404              :         // - We keep the `SlotGuard` during this I/O, so that if a concurrent delete request comes in, it will
    1405              :         //   503/retry, rather than kicking off a wasteful concurrent deletion.
    1406              :         // NB: this also deletes partial prefixes, i.e. a <tenant_id> path will delete all
    1407              :         // <tenant_id>_<shard_id>/* objects. See method comment for why.
    1408            0 :         backoff::retry(
    1409            0 :             || async move {
    1410            0 :                 self.resources
    1411            0 :                     .remote_storage
    1412            0 :                     .delete_prefix(&remote_tenant_path(&tenant_shard_id), &self.cancel)
    1413            0 :                     .await
    1414            0 :             },
    1415              :             |_| false, // backoff::retry handles cancellation
    1416              :             1,
    1417              :             3,
    1418            0 :             &format!("delete_tenant[tenant_shard_id={tenant_shard_id}]"),
    1419            0 :             &self.cancel,
    1420              :         )
    1421            0 :         .await
    1422            0 :         .unwrap_or(Err(TimeoutOrCancel::Cancel.into()))
    1423            0 :         .map_err(|err| {
    1424            0 :             if TimeoutOrCancel::caused_by_cancel(&err) {
    1425            0 :                 return DeleteTenantError::Cancelled;
    1426            0 :             }
    1427            0 :             DeleteTenantError::Other(err)
    1428            0 :         })
    1429            0 :     }
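
The `backoff::retry` call at the end is the standard retry idiom in this crate: the closure is retried with backoff, `|_| false` marks no error as permanent, and a `None` return (cancelled before any success) is mapped to a cancellation error. The same shape with a placeholder operation, to make the argument positions easier to read; `do_remote_op` is hypothetical and the two small integers are simply carried over from the call above:

    // Placeholder sketch of the retry idiom used by delete_tenant.
    let outcome = backoff::retry(
        || async { do_remote_op().await }, // hypothetical fallible async operation
        |_e| false,                        // treat no error as permanent; keep retrying
        1,                                 // carried over from the call above
        3,                                 // carried over from the call above
        "describe the operation for logs",
        &self.cancel,
    )
    .await
    // None means we were cancelled before the operation ever completed.
    .unwrap_or(Err(TimeoutOrCancel::Cancel.into()));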
    1430              : 
    1431              :     #[instrument(skip_all, fields(tenant_id=%tenant.get_tenant_shard_id().tenant_id, shard_id=%tenant.get_tenant_shard_id().shard_slug(), new_shard_count=%new_shard_count.literal()))]
    1432              :     pub(crate) async fn shard_split(
    1433              :         &self,
    1434              :         tenant: Arc<TenantShard>,
    1435              :         new_shard_count: ShardCount,
    1436              :         new_stripe_size: Option<ShardStripeSize>,
    1437              :         ctx: &RequestContext,
    1438              :     ) -> anyhow::Result<Vec<TenantShardId>> {
    1439              :         let tenant_shard_id = *tenant.get_tenant_shard_id();
    1440              :         let r = self
    1441              :             .do_shard_split(tenant, new_shard_count, new_stripe_size, ctx)
    1442              :             .await;
    1443              :         if r.is_err() {
    1444              :             // Shard splitting might have left the original shard in a partially shut down state (it
    1445              :             // stops the shard's remote timeline client).  Reset it to ensure we leave things in
    1446              :             // a working state.
    1447              :             if self.get(tenant_shard_id).is_some() {
    1448              :                 tracing::warn!("Resetting after shard split failure");
    1449              :                 if let Err(e) = self.reset_tenant(tenant_shard_id, false, ctx).await {
    1450              :                     // Log this error because our return value will still be the original error, not this one.  This is
    1451              :                     // a severe error: if this happens, we might be leaving behind a tenant that is not fully functional
    1452              :                     // (e.g. has uploads disabled).  We can't do anything else: if reset fails then shutting the tenant down or
    1453              :                     // setting it broken probably won't help either.
    1454              :                     tracing::error!("Failed to reset: {e}");
    1455              :                 }
    1456              :             }
    1457              :         }
    1458              : 
    1459              :         r
    1460              :     }
    1461              : 
    1462            0 :     pub(crate) async fn do_shard_split(
    1463            0 :         &self,
    1464            0 :         tenant: Arc<TenantShard>,
    1465            0 :         new_shard_count: ShardCount,
    1466            0 :         new_stripe_size: Option<ShardStripeSize>,
    1467            0 :         ctx: &RequestContext,
    1468            0 :     ) -> anyhow::Result<Vec<TenantShardId>> {
    1469            0 :         let tenant_shard_id = *tenant.get_tenant_shard_id();
    1470              : 
    1471              :         // Validate the incoming request
    1472            0 :         if new_shard_count.count() <= tenant_shard_id.shard_count.count() {
    1473            0 :             anyhow::bail!("Requested shard count is not an increase");
    1474            0 :         }
    1475            0 :         let expansion_factor = new_shard_count.count() / tenant_shard_id.shard_count.count();
    1476            0 :         if !expansion_factor.is_power_of_two() {
    1477            0 :             anyhow::bail!("Requested split is not a power of two");
    1478            0 :         }
    1479              : 
    1480            0 :         if let Some(new_stripe_size) = new_stripe_size {
    1481            0 :             if tenant.get_shard_stripe_size() != new_stripe_size
    1482            0 :                 && tenant_shard_id.shard_count.count() > 1
    1483              :             {
    1484              :                 // This tenant already has multiple shards, it is illegal to try and change its stripe size
    1485            0 :                 anyhow::bail!(
    1486            0 :                     "Shard stripe size may not be modified once tenant has multiple shards"
    1487              :                 );
    1488            0 :             }
    1489            0 :         }
    1490              : 
    1491              :         // Plan: identify what the new child shards will be
    1492            0 :         let child_shards = tenant_shard_id.split(new_shard_count);
    1493            0 :         tracing::info!(
    1494            0 :             "Shard {} splits into: {}",
    1495            0 :             tenant_shard_id.to_index(),
    1496            0 :             child_shards
    1497            0 :                 .iter()
    1498            0 :                 .map(|id| format!("{}", id.to_index()))
    1499            0 :                 .join(",")
    1500              :         );
    1501              : 
    1502            0 :         fail::fail_point!("shard-split-pre-prepare", |_| Err(anyhow::anyhow!(
    1503            0 :             "failpoint"
    1504              :         )));
    1505              : 
    1506            0 :         let parent_shard_identity = tenant.shard_identity;
    1507            0 :         let parent_tenant_conf = tenant.get_tenant_conf();
    1508            0 :         let parent_generation = tenant.generation;
    1509              : 
    1510              :         // Phase 1: Write out child shards' remote index files, in the parent tenant's current generation
    1511            0 :         if let Err(e) = tenant.split_prepare(&child_shards).await {
    1512              :             // If [`Tenant::split_prepare`] fails, we must reload the tenant, because it might
    1513              :             // have been left in a partially-shut-down state.
    1514            0 :             tracing::warn!("Failed to prepare for split: {e}, reloading Tenant before returning");
    1515            0 :             return Err(e);
    1516            0 :         }
    1517              : 
    1518            0 :         fail::fail_point!("shard-split-post-prepare", |_| Err(anyhow::anyhow!(
    1519            0 :             "failpoint"
    1520              :         )));
    1521              : 
    1522            0 :         self.resources.deletion_queue_client.flush_advisory();
    1523              : 
    1524              :         // Phase 2: Put the parent shard to InProgress and grab a reference to the parent Tenant
    1525              :         //
    1526              :         // TODO: keeping the parent as InProgress while spawning the children causes read
    1527              :         // unavailability, as we can't acquire a timeline handle for it. The parent should be
    1528              :         // available for reads until the children are ready -- potentially until *all* subsplits
    1529              :         // across all parent shards are complete and the compute has been notified. See:
    1530              :         // <https://databricks.atlassian.net/browse/LKB-672>.
    1531            0 :         drop(tenant);
    1532            0 :         let mut parent_slot_guard =
    1533            0 :             self.tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::Any)?;
    1534            0 :         let parent = match parent_slot_guard.get_old_value() {
    1535            0 :             Some(TenantSlot::Attached(t)) => t,
    1536            0 :             Some(TenantSlot::Secondary(_)) => anyhow::bail!("Tenant location in secondary mode"),
    1537              :             Some(TenantSlot::InProgress(_)) => {
    1538              :                 // tenant_map_acquire_slot never returns InProgress; if a slot were InProgress,
    1539              :                 // it would return an error.
    1540            0 :                 unreachable!()
    1541              :             }
    1542              :             None => {
    1543              :                 // We don't actually need the parent shard to still be attached to do our work, but it's
    1544              :                 // a weird enough situation that the caller probably didn't want us to continue working
    1545              :                 // if they had detached the tenant they requested the split on.
    1546            0 :                 anyhow::bail!("Detached parent shard in the middle of split!")
    1547              :             }
    1548              :         };
    1549            0 :         fail::fail_point!("shard-split-pre-hardlink", |_| Err(anyhow::anyhow!(
    1550            0 :             "failpoint"
    1551              :         )));
    1552              :         // Optimization: hardlink layers from the parent into the children, so that they don't have to
    1553              :         // re-download & duplicate the data referenced in their initial IndexPart
    1554            0 :         self.shard_split_hardlink(parent, child_shards.clone())
    1555            0 :             .await?;
    1556            0 :         fail::fail_point!("shard-split-post-hardlink", |_| Err(anyhow::anyhow!(
    1557            0 :             "failpoint"
    1558              :         )));
    1559              : 
    1560              :         // Take a snapshot of where the parent's WAL ingest had got to: we will wait for
    1561              :         // child shards to reach this point.
    1562            0 :         let mut target_lsns = HashMap::new();
    1563            0 :         for timeline in parent.timelines.lock().unwrap().clone().values() {
    1564            0 :             target_lsns.insert(timeline.timeline_id, timeline.get_last_record_lsn());
    1565            0 :         }
    1566              : 
    1567              :         // TODO: we should have the parent shard stop its WAL ingest here; it's a waste of resources
    1568              :         // and could slow down the children trying to catch up.
    1569              : 
    1570              :         // Phase 3: Spawn the child shards
    1571            0 :         for child_shard in &child_shards {
    1572            0 :             let mut child_shard_identity = parent_shard_identity;
    1573            0 :             if let Some(new_stripe_size) = new_stripe_size {
    1574            0 :                 child_shard_identity.stripe_size = new_stripe_size;
    1575            0 :             }
    1576            0 :             child_shard_identity.count = child_shard.shard_count;
    1577            0 :             child_shard_identity.number = child_shard.shard_number;
    1578              : 
    1579            0 :             let child_location_conf = LocationConf {
    1580            0 :                 mode: LocationMode::Attached(AttachedLocationConfig {
    1581            0 :                     generation: parent_generation,
    1582            0 :                     attach_mode: AttachmentMode::Single,
    1583            0 :                 }),
    1584            0 :                 shard: child_shard_identity,
    1585            0 :                 tenant_conf: parent_tenant_conf.clone(),
    1586            0 :             };
    1587              : 
    1588            0 :             self.upsert_location(
    1589            0 :                 *child_shard,
    1590            0 :                 child_location_conf,
    1591            0 :                 None,
    1592            0 :                 SpawnMode::Eager,
    1593            0 :                 ctx,
    1594            0 :             )
    1595            0 :             .await?;
    1596              :         }
    1597              : 
    1598            0 :         fail::fail_point!("shard-split-post-child-conf", |_| Err(anyhow::anyhow!(
    1599            0 :             "failpoint"
    1600              :         )));
    1601              : 
    1602              :         // Phase 4: wait for the child shards' WAL ingest to catch up to the target LSNs
    1603            0 :         for child_shard_id in &child_shards {
    1604            0 :             let child_shard_id = *child_shard_id;
    1605            0 :             let child_shard = {
    1606            0 :                 let locked = self.tenants.read().unwrap();
    1607            0 :                 let peek_slot =
    1608            0 :                     tenant_map_peek_slot(&locked, &child_shard_id, TenantSlotPeekMode::Read)?;
    1609            0 :                 peek_slot.and_then(|s| s.get_attached()).cloned()
    1610              :             };
    1611            0 :             if let Some(t) = child_shard {
    1612              :                 // Wait for the child shard to become active: this should be very quick because it only
    1613              :                 // has to download the index_part that we just uploaded when creating it.
    1614            0 :                 if let Err(e) = t.wait_to_become_active(ACTIVE_TENANT_TIMEOUT).await {
    1615              :                     // This is not fatal: we have durably created the child shard.  It just makes the
    1616              :                     // split operation less seamless for clients, as we may detach the parent
    1617              :                     // shard before the child shards are fully ready to serve requests.
    1618            0 :                     tracing::warn!("Failed to wait for shard {child_shard_id} to activate: {e}");
    1619            0 :                     continue;
    1620            0 :                 }
    1621              : 
    1622            0 :                 let timelines = t.timelines.lock().unwrap().clone();
    1623            0 :                 for timeline in timelines.values() {
    1624            0 :                     let Some(target_lsn) = target_lsns.get(&timeline.timeline_id) else {
    1625            0 :                         continue;
    1626              :                     };
    1627              : 
    1628            0 :                     tracing::info!(
    1629            0 :                         "Waiting for child shard {}/{} to reach target lsn {}...",
    1630              :                         child_shard_id,
    1631            0 :                         timeline.timeline_id,
    1632              :                         target_lsn
    1633              :                     );
    1634              : 
    1635            0 :                     fail::fail_point!("shard-split-lsn-wait", |_| Err(anyhow::anyhow!(
    1636            0 :                         "failpoint"
    1637              :                     )));
    1638            0 :                     if let Err(e) = timeline
    1639            0 :                         .wait_lsn(
    1640            0 :                             *target_lsn,
    1641            0 :                             crate::tenant::timeline::WaitLsnWaiter::Tenant,
    1642            0 :                             crate::tenant::timeline::WaitLsnTimeout::Default,
    1643            0 :                             ctx,
    1644            0 :                         )
    1645            0 :                         .await
    1646              :                     {
    1647              :                         // Failure here might mean shutdown, in any case this part is an optimization
    1648              :                         // and we shouldn't hold up the split operation.
    1649            0 :                         tracing::warn!(
    1650            0 :                             "Failed to wait for timeline {} to reach lsn {target_lsn}: {e}",
    1651            0 :                             timeline.timeline_id
    1652              :                         );
    1653              :                     } else {
    1654            0 :                         tracing::info!(
    1655            0 :                             "Child shard {}/{} reached target lsn {}",
    1656              :                             child_shard_id,
    1657            0 :                             timeline.timeline_id,
    1658              :                             target_lsn
    1659              :                         );
    1660              :                     }
    1661              :                 }
    1662            0 :             }
    1663              :         }
    1664              : 
    1665              :         // Phase 5: Shut down the parent shard. We leave it on disk in case the split fails and we
    1666              :         // have to roll back to the parent shard, avoiding a cold start. It will be cleaned up once
    1667              :         // the storage controller commits the split, or if all else fails, on the next restart.
    1668              :         //
    1669              :         // TODO: We don't flush the ephemeral layer here, because the split is likely to succeed and
    1670              :         // catching up the parent should be reasonably quick. Consider using FreezeAndFlush instead.
    1671            0 :         let (_guard, progress) = completion::channel();
    1672            0 :         match parent.shutdown(progress, ShutdownMode::Hard).await {
    1673            0 :             Ok(()) => {}
    1674            0 :             Err(other) => {
    1675            0 :                 other.wait().await;
    1676              :             }
    1677              :         }
    1678              : 
    1679            0 :         fail::fail_point!("shard-split-pre-finish", |_| Err(anyhow::anyhow!(
    1680            0 :             "failpoint"
    1681              :         )));
    1682              : 
    1683            0 :         parent_slot_guard.drop_old_value()?;
    1684              : 
    1685              :         // Phase 6: Release the InProgress on the parent shard
    1686            0 :         drop(parent_slot_guard);
    1687              : 
    1688            0 :         utils::pausable_failpoint!("shard-split-post-finish-pause");
    1689              : 
    1690            0 :         Ok(child_shards)
    1691            0 :     }
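
The validation at the top of `do_shard_split` is easiest to check with concrete numbers: the requested count must be a strict increase, and the integer expansion factor must be a power of two. A small restatement with worked examples (the helper is illustrative, not part of this module):

    // Illustrative restatement of the shard-count validation in do_shard_split.
    fn split_request_is_valid(old_count: u8, new_count: u8) -> bool {
        if new_count <= old_count {
            return false; // not an increase
        }
        let expansion_factor = new_count / old_count;
        expansion_factor.is_power_of_two()
    }

    assert!(split_request_is_valid(1, 4));  // expansion factor 4
    assert!(split_request_is_valid(2, 8));  // expansion factor 4
    assert!(!split_request_is_valid(2, 6)); // 6 / 2 = 3, not a power of two
    assert!(!split_request_is_valid(4, 4)); // not an increase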
    1692              : 
    1693              :     /// Part of [`Self::shard_split`]: hard link parent shard layers into child shards, as an optimization
    1694              :     /// to avoid the children downloading them again.
    1695              :     ///
    1696              :     /// For each resident layer in the parent shard, we will hard link it into all of the child shards.
    1697            0 :     async fn shard_split_hardlink(
    1698            0 :         &self,
    1699            0 :         parent_shard: &TenantShard,
    1700            0 :         child_shards: Vec<TenantShardId>,
    1701            0 :     ) -> anyhow::Result<()> {
    1702            0 :         debug_assert_current_span_has_tenant_id();
    1703              : 
    1704            0 :         let parent_path = self.conf.tenant_path(parent_shard.get_tenant_shard_id());
    1705            0 :         let (parent_timelines, parent_layers) = {
    1706            0 :             let mut parent_layers = Vec::new();
    1707            0 :             let timelines = parent_shard.timelines.lock().unwrap().clone();
    1708            0 :             let parent_timelines = timelines.keys().cloned().collect::<Vec<_>>();
    1709            0 :             for timeline in timelines.values() {
    1710            0 :                 tracing::info!(timeline_id=%timeline.timeline_id, "Loading list of layers to hardlink");
    1711            0 :                 let layers = timeline
    1712            0 :                     .layers
    1713            0 :                     .read(LayerManagerLockHolder::GetLayerMapInfo)
    1714            0 :                     .await;
    1715              : 
    1716            0 :                 for layer in layers.likely_resident_layers() {
    1717            0 :                     let relative_path = layer
    1718            0 :                         .local_path()
    1719            0 :                         .strip_prefix(&parent_path)
    1720            0 :                         .context("Removing prefix from parent layer path")?;
    1721            0 :                     parent_layers.push(relative_path.to_owned());
    1722              :                 }
    1723              :             }
    1724              : 
    1725            0 :             if parent_layers.is_empty() {
    1726            0 :                 tracing::info!("Ancestor shard has no resident layer to hard link");
    1727            0 :             }
    1728              : 
    1729            0 :             (parent_timelines, parent_layers)
    1730              :         };
    1731              : 
    1732            0 :         let mut child_prefixes = Vec::new();
    1733            0 :         let mut create_dirs = Vec::new();
    1734              : 
    1735            0 :         for child in child_shards {
    1736            0 :             let child_prefix = self.conf.tenant_path(&child);
    1737            0 :             create_dirs.push(child_prefix.clone());
    1738            0 :             create_dirs.extend(
    1739            0 :                 parent_timelines
    1740            0 :                     .iter()
    1741            0 :                     .map(|t| self.conf.timeline_path(&child, t)),
    1742              :             );
    1743              : 
    1744            0 :             child_prefixes.push(child_prefix);
    1745              :         }
    1746              : 
    1747              :         // Since we will do a large number of small filesystem metadata operations, batch them into
    1748              :         // spawn_blocking calls rather than doing each one as a tokio::fs round-trip.
    1749            0 :         let span = tracing::Span::current();
    1750            0 :         let jh = tokio::task::spawn_blocking(move || -> anyhow::Result<usize> {
    1751              :             // Run this synchronous code in the same log context as the outer function that spawned it.
    1752            0 :             let _span = span.enter();
    1753              : 
    1754            0 :             tracing::info!("Creating {} directories", create_dirs.len());
    1755            0 :             for dir in &create_dirs {
    1756            0 :                 if let Err(e) = std::fs::create_dir_all(dir) {
    1757              :                     // Ignore AlreadyExists errors, drop out on all other errors
    1758            0 :                     match e.kind() {
    1759            0 :                         std::io::ErrorKind::AlreadyExists => {}
    1760              :                         _ => {
    1761            0 :                             return Err(anyhow::anyhow!(e).context(format!("Creating {dir}")));
    1762              :                         }
    1763              :                     }
    1764            0 :                 }
    1765              :             }
    1766              : 
    1767            0 :             for child_prefix in child_prefixes {
    1768            0 :                 tracing::info!(
    1769            0 :                     "Hard-linking {} parent layers into child path {}",
    1770            0 :                     parent_layers.len(),
    1771              :                     child_prefix
    1772              :                 );
    1773            0 :                 for relative_layer in &parent_layers {
    1774            0 :                     let parent_path = parent_path.join(relative_layer);
    1775            0 :                     let child_path = child_prefix.join(relative_layer);
    1776            0 :                     if let Err(e) = std::fs::hard_link(&parent_path, &child_path) {
    1777            0 :                         match e.kind() {
    1778            0 :                             std::io::ErrorKind::AlreadyExists => {}
    1779              :                             std::io::ErrorKind::NotFound => {
    1780            0 :                                 tracing::info!(
    1781            0 :                                     "Layer {} not found during hard-linking, evicted during split?",
    1782              :                                     relative_layer
    1783              :                                 );
    1784              :                             }
    1785              :                             _ => {
    1786            0 :                                 return Err(anyhow::anyhow!(e).context(format!(
    1787            0 :                                     "Hard linking {relative_layer} into {child_prefix}"
    1788            0 :                                 )));
    1789              :                             }
    1790              :                         }
    1791            0 :                     }
    1792              :                 }
    1793              :             }
    1794              : 
    1795              :             // Durability is not required for correctness, but if we crashed during split and
    1796              :             // then restarted with empty timeline dirs, it would be very inefficient to
    1797              :             // re-populate from remote storage.
    1798            0 :             tracing::info!("fsyncing {} directories", create_dirs.len());
    1799            0 :             for dir in create_dirs {
    1800            0 :                 if let Err(e) = crashsafe::fsync(&dir) {
    1801              :                     // Something removed a newly created timeline dir out from underneath us?  Extremely
    1802              :                     // unexpected, but not worth panicking over as this whole function is just an
    1803              :                     // optimization.
    1804            0 :                     tracing::warn!("Failed to fsync directory {dir}: {e}")
    1805            0 :                 }
    1806              :             }
    1807              : 
    1808            0 :             Ok(parent_layers.len())
    1809            0 :         });
    1810              : 
    1811            0 :         match jh.await {
    1812            0 :             Ok(Ok(layer_count)) => {
    1813            0 :                 tracing::info!(count = layer_count, "Hard linked layers into child shards");
    1814              :             }
    1815            0 :             Ok(Err(e)) => {
    1816              :                 // This is an optimization, so we tolerate failure.
    1817            0 :                 tracing::warn!("Error hard-linking layers, proceeding anyway: {e}")
    1818              :             }
    1819            0 :             Err(e) => {
    1820              :                 // This is something totally unexpected like a panic, so bail out.
    1821            0 :                 anyhow::bail!("Error joining hard linking task: {e}");
    1822              :             }
    1823              :         }
    1824              : 
    1825            0 :         Ok(())
    1826            0 :     }
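    // Aside (illustrative sketch, not part of the measured source): the spawn_blocking batching
    // used above is a general pattern for doing many small filesystem metadata operations without
    // paying a tokio::fs round-trip per call, while keeping the caller's tracing span. A minimal
    // sketch, assuming std, tokio, tracing and camino:
    //
    //     let span = tracing::Span::current();
    //     let dirs: Vec<camino::Utf8PathBuf> = Vec::new(); // e.g. directories to create
    //     let result = tokio::task::spawn_blocking(move || -> std::io::Result<()> {
    //         // Run the synchronous work in the same log context as the spawning task.
    //         let _entered = span.enter();
    //         for dir in &dirs {
    //             std::fs::create_dir_all(dir)?;
    //         }
    //         Ok(())
    //     })
    //     .await;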
    1827              : 
    1828              :     ///
    1829              :     /// Shut down all tenants. This runs as part of pageserver shutdown.
    1830              :     ///
    1831              :     /// NB: We leave the tenants in the map, so that they remain accessible through
    1832              :     /// the management API until we shut it down. If we removed the shut-down tenants
    1833              :     /// from the tenants map, the management API would return 404 for these tenants,
    1834              :     /// because TenantsMap::get() now returns `None`.
    1835              :     /// That could easily be misinterpreted by the control plane, the consumer of the
    1836              :     /// management API. For example, it could attach the tenant on a different pageserver.
    1837              :     /// We would then be in split-brain once this pageserver restarts.
    1838              :     #[instrument(skip_all)]
    1839              :     pub(crate) async fn shutdown(&self) {
    1840              :         self.cancel.cancel();
    1841              : 
    1842              :         self.shutdown_all_tenants0().await
    1843              :     }
    1844              : 
    1845            1 :     async fn shutdown_all_tenants0(&self) {
    1846            1 :         let mut join_set = JoinSet::new();
    1847              : 
    1848              :         #[cfg(all(debug_assertions, not(test)))]
    1849              :         {
    1850              :             // Check that our metrics properly tracked the size of the tenants map.  This is a convenient location to check,
    1851              :             // as it happens implicitly at the end of tests etc.
    1852            0 :             let m = self.tenants.read().unwrap();
    1853            0 :             debug_assert_eq!(METRICS.slots_total(), m.len() as u64);
    1854              :         }
    1855              : 
    1856              :         // Atomically, 1. create the shutdown tasks and 2. prevent creation of new tenants.
    1857            1 :         let (total_in_progress, total_attached) = {
    1858            1 :             let mut m = self.tenants.write().unwrap();
    1859            1 :             match &mut *m {
    1860              :                 TenantsMap::Initializing => {
    1861            0 :                     *m = TenantsMap::ShuttingDown(BTreeMap::default());
    1862            0 :                     info!("tenants map is empty");
    1863            0 :                     return;
    1864              :                 }
    1865            1 :                 TenantsMap::Open(tenants) => {
    1866            1 :                     let mut shutdown_state = BTreeMap::new();
    1867            1 :                     let mut total_in_progress = 0;
    1868            1 :                     let mut total_attached = 0;
    1869              : 
    1870            1 :                     for (tenant_shard_id, v) in std::mem::take(tenants).into_iter() {
    1871            1 :                         match v {
    1872            0 :                             TenantSlot::Attached(t) => {
    1873            0 :                                 shutdown_state
    1874            0 :                                     .insert(tenant_shard_id, TenantSlot::Attached(t.clone()));
    1875            0 :                                 join_set.spawn(
    1876            0 :                                     async move {
    1877            0 :                                         let res = {
    1878            0 :                                             let (_guard, shutdown_progress) = completion::channel();
    1879            0 :                                             t.shutdown(shutdown_progress, ShutdownMode::FreezeAndFlush).await
    1880              :                                         };
    1881              : 
    1882            0 :                                         if let Err(other_progress) = res {
    1883              :                                             // join the other shutdown already in progress
    1884            0 :                                             other_progress.wait().await;
    1885            0 :                                         }
    1886              : 
    1887              :                                         // we cannot afford per-tenant logging here, because if S3 is degraded, we are
    1888              :                                         // going to log too many lines
    1889            0 :                                         debug!("tenant successfully stopped");
    1890            0 :                                     }
    1891            0 :                                     .instrument(info_span!("shutdown", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug())),
    1892              :                                 );
    1893              : 
    1894            0 :                                 total_attached += 1;
    1895              :                             }
    1896            0 :                             TenantSlot::Secondary(state) => {
    1897            0 :                                 // We don't need to wait for this individually per-tenant: the
    1898            0 :                                 // downloader task will be waited on eventually, this cancel
    1899            0 :                                 // is just to encourage it to drop out if it is doing work
    1900            0 :                                 // for this tenant right now.
    1901            0 :                                 state.cancel.cancel();
    1902            0 : 
    1903            0 :                                 shutdown_state
    1904            0 :                                     .insert(tenant_shard_id, TenantSlot::Secondary(state));
    1905            0 :                             }
    1906            1 :                             TenantSlot::InProgress(notify) => {
    1907              :                                 // InProgress tenants are not visible in TenantsMap::ShuttingDown: we will
    1908              :                                 // wait for their notifications to fire in this function.
    1909            1 :                                 join_set.spawn(async move {
    1910            1 :                                     notify.wait().await;
    1911            1 :                                 });
    1912              : 
    1913            1 :                                 total_in_progress += 1;
    1914              :                             }
    1915              :                         }
    1916              :                     }
    1917            1 :                     *m = TenantsMap::ShuttingDown(shutdown_state);
    1918            1 :                     (total_in_progress, total_attached)
    1919              :                 }
    1920              :                 TenantsMap::ShuttingDown(_) => {
    1921            0 :                     error!(
    1922            0 :                         "already shutting down, this function isn't supposed to be called more than once"
    1923              :                     );
    1924            0 :                     return;
    1925              :                 }
    1926              :             }
    1927              :         };
    1928              : 
    1929            1 :         let started_at = std::time::Instant::now();
    1930              : 
    1931            1 :         info!(
    1932            0 :             "Waiting for {} InProgress tenants and {} Attached tenants to shut down",
    1933              :             total_in_progress, total_attached
    1934              :         );
    1935              : 
    1936            1 :         let total = join_set.len();
    1937            1 :         let mut panicked = 0;
    1938            1 :         let mut buffering = true;
    1939              :         const BUFFER_FOR: std::time::Duration = std::time::Duration::from_millis(500);
    1940            1 :         let mut buffered = std::pin::pin!(tokio::time::sleep(BUFFER_FOR));
    1941              : 
    1942            3 :         while !join_set.is_empty() {
    1943            2 :             tokio::select! {
    1944            2 :                 Some(joined) = join_set.join_next() => {
    1945            0 :                     match joined {
    1946            1 :                         Ok(()) => {},
    1947            0 :                         Err(join_error) if join_error.is_cancelled() => {
    1948            0 :                             unreachable!("we are not cancelling any of the tasks");
    1949              :                         }
    1950            0 :                         Err(join_error) if join_error.is_panic() => {
    1951            0 :                             // cannot really do anything, as this panic is likely a bug
    1952            0 :                             panicked += 1;
    1953            0 :                         }
    1954            0 :                         Err(join_error) => {
    1955            0 :                             warn!("unknown kind of JoinError: {join_error}");
    1956              :                         }
    1957              :                     }
    1958            1 :                     if !buffering {
    1959            1 :                         // buffer so that, every 500ms since the first completion (or since starting), we log
    1960            1 :                         // how far along we are; this is because we will get SIGKILL'd at 10s, and we
    1961            1 :                         // are not able to log *then*.
    1962            1 :                         buffering = true;
    1963            1 :                         buffered.as_mut().reset(tokio::time::Instant::now() + BUFFER_FOR);
    1964            1 :                     }
    1965              :                 },
    1966            2 :                 _ = &mut buffered, if buffering => {
    1967            1 :                     buffering = false;
    1968            1 :                     info!(remaining = join_set.len(), total, elapsed_ms = started_at.elapsed().as_millis(), "waiting for tenants to shutdown");
    1969              :                 }
    1970              :             }
    1971              :         }
    1972              : 
    1973            1 :         if panicked > 0 {
    1974            0 :             warn!(
    1975              :                 panicked,
    1976            0 :                 total, "observed panics while shutting down tenants"
    1977              :             );
    1978            1 :         }
    1979              : 
    1980              :         // caller will log how long we took
    1981            1 :     }
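    // Aside (illustrative, not part of the measured source): the buffered progress logging above
    // relies on tokio's resettable `Sleep`. Because `Sleep::reset` takes `Pin<&mut Sleep>`, the
    // timer is pinned once on the stack and re-armed after each completed shutdown task:
    //
    //     let mut buffered = std::pin::pin!(tokio::time::sleep(BUFFER_FOR));
    //     // ... later, once a task finishes and we want another 500ms of quiet before logging:
    //     buffered.as_mut().reset(tokio::time::Instant::now() + BUFFER_FOR);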
    1982              : 
    1983              :     /// Detaches a tenant, and removes its local files asynchronously.
    1984              :     ///
    1985              :     /// File removal is idempotent: even if the tenant has already been removed, this will still
    1986              :     /// remove any local files. This is used during shard splits, where we leave the parent shard's
    1987              :     /// files around in case we have to roll back the split.
    1988            0 :     pub(crate) async fn detach_tenant(
    1989            0 :         &self,
    1990            0 :         conf: &'static PageServerConf,
    1991            0 :         tenant_shard_id: TenantShardId,
    1992            0 :         deletion_queue_client: &DeletionQueueClient,
    1993            0 :     ) -> Result<(), TenantStateError> {
    1994            0 :         if let Some(tmp_path) = self
    1995            0 :             .detach_tenant0(conf, tenant_shard_id, deletion_queue_client)
    1996            0 :             .await?
    1997            0 :         {
    1998            0 :             self.background_purges.spawn(tmp_path);
    1999            0 :         }
    2000              : 
    2001            0 :         Ok(())
    2002            0 :     }
    2003              : 
    2004              :     /// Detaches a tenant. This renames the tenant directory to a temporary path and returns it,
    2005              :     /// allowing the caller to delete it asynchronously. Returns None if the dir is already removed.
    2006            0 :     async fn detach_tenant0(
    2007            0 :         &self,
    2008            0 :         conf: &'static PageServerConf,
    2009            0 :         tenant_shard_id: TenantShardId,
    2010            0 :         deletion_queue_client: &DeletionQueueClient,
    2011            0 :     ) -> Result<Option<Utf8PathBuf>, TenantStateError> {
    2012            0 :         let tenant_dir_rename_operation = |tenant_id_to_clean: TenantShardId| async move {
    2013            0 :             let local_tenant_directory = conf.tenant_path(&tenant_id_to_clean);
    2014            0 :             if !tokio::fs::try_exists(&local_tenant_directory).await? {
    2015              :                 // If the tenant directory doesn't exist, it's already cleaned up.
    2016            0 :                 return Ok(None);
    2017            0 :             }
    2018            0 :             safe_rename_tenant_dir(&local_tenant_directory)
    2019            0 :                 .await
    2020            0 :                 .with_context(|| {
    2021            0 :                     format!("local tenant directory {local_tenant_directory:?} rename")
    2022            0 :                 })
    2023            0 :                 .map(Some)
    2024            0 :         };
    2025              : 
    2026            0 :         let mut removal_result = self
    2027            0 :             .remove_tenant_from_memory(
    2028            0 :                 tenant_shard_id,
    2029            0 :                 tenant_dir_rename_operation(tenant_shard_id),
    2030            0 :             )
    2031            0 :             .await;
    2032              : 
    2033              :         // If the tenant was not found, it was likely already removed. Attempt to remove the tenant
    2034              :         // directory on disk anyway. For example, during shard splits, we shut down and remove the
    2035              :         // parent shard, but leave its directory on disk in case we have to roll back the split.
    2036              :         //
    2037              :         // TODO: it would be better to leave the parent shard attached until the split is committed.
    2038              :         // This will be needed by the gRPC page service too, such that a compute can continue to
    2039              :         // read from the parent shard until it's notified about the new child shards. See:
    2040              :         // <https://github.com/neondatabase/neon/issues/11728>.
    2041            0 :         if let Err(TenantStateError::SlotError(TenantSlotError::NotFound(_))) = removal_result {
    2042            0 :             removal_result = tenant_dir_rename_operation(tenant_shard_id)
    2043            0 :                 .await
    2044            0 :                 .map_err(TenantStateError::Other);
    2045            0 :         }
    2046              : 
    2047              :         // Flush pending deletions, so that they have a good chance of passing validation
    2048              :         // before this tenant is potentially re-attached elsewhere.
    2049            0 :         deletion_queue_client.flush_advisory();
    2050              : 
    2051            0 :         removal_result
    2052            0 :     }
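    // Aside (hedged sketch with a hypothetical helper, not this module's API): the
    // "rename, then delete in the background" idea behind the idempotent removal above
    // boils down to something like:
    //
    //     async fn rename_for_background_delete(
    //         dir: &camino::Utf8Path,
    //     ) -> std::io::Result<Option<camino::Utf8PathBuf>> {
    //         if !tokio::fs::try_exists(dir).await? {
    //             return Ok(None); // already gone: removal is idempotent
    //         }
    //         let tmp = camino::Utf8PathBuf::from(format!("{dir}.___deleted")); // hypothetical suffix
    //         tokio::fs::rename(dir, &tmp).await?; // atomic within the same filesystem
    //         Ok(Some(tmp)) // the caller deletes `tmp` asynchronously
    //     }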
    2053              : 
    2054            0 :     pub(crate) fn list_tenants(
    2055            0 :         &self,
    2056            0 :     ) -> Result<Vec<(TenantShardId, TenantState, Generation)>, TenantMapListError> {
    2057            0 :         let tenants = self.tenants.read().unwrap();
    2058            0 :         let m = match &*tenants {
    2059            0 :             TenantsMap::Initializing => return Err(TenantMapListError::Initializing),
    2060            0 :             TenantsMap::Open(m) | TenantsMap::ShuttingDown(m) => m,
    2061              :         };
    2062            0 :         Ok(m.iter()
    2063            0 :             .filter_map(|(id, tenant)| match tenant {
    2064            0 :                 TenantSlot::Attached(tenant) => {
    2065            0 :                     Some((*id, tenant.current_state(), tenant.generation()))
    2066              :                 }
    2067            0 :                 TenantSlot::Secondary(_) => None,
    2068            0 :                 TenantSlot::InProgress(_) => None,
    2069            0 :             })
    2070            0 :             .collect())
    2071            0 :     }
    2072              : 
    2073              :     /// Completes an earlier prepared timeline detach ancestor.
    2074            0 :     pub(crate) async fn complete_detaching_timeline_ancestor(
    2075            0 :         &self,
    2076            0 :         tenant_shard_id: TenantShardId,
    2077            0 :         timeline_id: TimelineId,
    2078            0 :         prepared: PreparedTimelineDetach,
    2079            0 :         behavior: DetachBehavior,
    2080            0 :         mut attempt: detach_ancestor::Attempt,
    2081            0 :         ctx: &RequestContext,
    2082            0 :     ) -> Result<HashSet<TimelineId>, detach_ancestor::Error> {
    2083              :         use detach_ancestor::Error;
    2084              : 
    2085            0 :         let slot_guard = self
    2086            0 :             .tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::MustExist)
    2087            0 :             .map_err(|e| {
    2088              :                 use TenantSlotError::*;
    2089              : 
    2090            0 :                 match e {
    2091            0 :                     MapState(TenantMapError::ShuttingDown) => Error::ShuttingDown,
    2092            0 :                     NotFound(_) | InProgress | MapState(_) => Error::DetachReparent(e.into()),
    2093              :                 }
    2094            0 :             })?;
    2095              : 
    2096            0 :         let tenant = {
    2097            0 :             let old_slot = slot_guard
    2098            0 :                 .get_old_value()
    2099            0 :                 .as_ref()
    2100            0 :                 .expect("requested MustExist");
    2101              : 
    2102            0 :             let Some(tenant) = old_slot.get_attached() else {
    2103            0 :                 return Err(Error::DetachReparent(anyhow::anyhow!(
    2104            0 :                     "Tenant is not in attached state"
    2105            0 :                 )));
    2106              :             };
    2107              : 
    2108            0 :             if !tenant.is_active() {
    2109            0 :                 return Err(Error::DetachReparent(anyhow::anyhow!(
    2110            0 :                     "Tenant is not active"
    2111            0 :                 )));
    2112            0 :             }
    2113              : 
    2114            0 :             tenant.clone()
    2115              :         };
    2116              : 
    2117            0 :         let timeline = tenant
    2118            0 :             .get_timeline(timeline_id, true)
    2119            0 :             .map_err(Error::NotFound)?;
    2120              : 
    2121            0 :         let resp = timeline
    2122            0 :             .detach_from_ancestor_and_reparent(
    2123            0 :                 &tenant,
    2124            0 :                 prepared,
    2125            0 :                 attempt.ancestor_timeline_id,
    2126            0 :                 attempt.ancestor_lsn,
    2127            0 :                 behavior,
    2128            0 :                 ctx,
    2129            0 :             )
    2130            0 :             .await?;
    2131              : 
    2132            0 :         let mut slot_guard = slot_guard;
    2133              : 
    2134            0 :         let tenant = if resp.reset_tenant_required() {
    2135            0 :             attempt.before_reset_tenant();
    2136              : 
    2137            0 :             let (_guard, progress) = utils::completion::channel();
    2138            0 :             match tenant.shutdown(progress, ShutdownMode::Reload).await {
    2139            0 :                 Ok(()) => {
    2140            0 :                     slot_guard.drop_old_value().expect("it was just shutdown");
    2141            0 :                 }
    2142            0 :                 Err(_barrier) => {
    2143            0 :                     slot_guard.revert();
    2144              :                     // This really should not happen at all, unless a shutdown that didn't acquire
    2145              :                     // the tenant slot was already in progress. Regardless, on restart the attempt tracking
    2146              :                     // will reset to retryable.
    2147            0 :                     return Err(Error::ShuttingDown);
    2148              :                 }
    2149              :             }
    2150              : 
    2151            0 :             let tenant_path = self.conf.tenant_path(&tenant_shard_id);
    2152            0 :             let config = TenantShard::load_tenant_config(self.conf, &tenant_shard_id)
    2153            0 :                 .map_err(|e| Error::DetachReparent(e.into()))?;
    2154              : 
    2155            0 :             let shard_identity = config.shard;
    2156            0 :             let tenant = tenant_spawn(
    2157            0 :                 self.conf,
    2158            0 :                 tenant_shard_id,
    2159            0 :                 &tenant_path,
    2160            0 :                 self.resources.clone(),
    2161            0 :                 AttachedTenantConf::try_from(self.conf, config).map_err(Error::DetachReparent)?,
    2162            0 :                 shard_identity,
    2163            0 :                 None,
    2164            0 :                 SpawnMode::Eager,
    2165            0 :                 ctx,
    2166              :             )
    2167            0 :             .map_err(|_| Error::ShuttingDown)?;
    2168              : 
    2169              :             {
    2170            0 :                 let mut g = tenant.ongoing_timeline_detach.lock().unwrap();
    2171            0 :                 assert!(
    2172            0 :                     g.is_none(),
    2173            0 :                     "there cannot be any new timeline detach ancestor on newly created tenant"
    2174              :                 );
    2175            0 :                 *g = Some((attempt.timeline_id, attempt.new_barrier()));
    2176              :             }
    2177              : 
    2178              :             // if we bail out here, we will not allow a new attempt, which should be fine:
    2179              :             // the pageserver should be shutting down regardless. tenant_reset would help, unless it
    2180              :             // runs into the same problem.
    2181            0 :             slot_guard
    2182            0 :                 .upsert(TenantSlot::Attached(tenant.clone()))
    2183            0 :                 .map_err(|e| match e {
    2184            0 :                     TenantSlotUpsertError::ShuttingDown(_) => Error::ShuttingDown,
    2185            0 :                     other => Error::DetachReparent(other.into()),
    2186            0 :                 })?;
    2187            0 :             tenant
    2188              :         } else {
    2189            0 :             tracing::info!("skipping tenant_reset as no changes made required it");
    2190            0 :             tenant
    2191              :         };
    2192              : 
    2193            0 :         if let Some(reparented) = resp.completed() {
    2194              :             // finally ask the restarted tenant to complete the detach
    2195              :             //
    2196              :             // rationale for 9999s: we don't really have a timetable here; if retried, the caller
    2197              :             // will get a 503.
    2198            0 :             tenant
    2199            0 :                 .wait_to_become_active(std::time::Duration::from_secs(9999))
    2200            0 :                 .await
    2201            0 :                 .map_err(|e| {
    2202              :                     use GetActiveTenantError::{Cancelled, WillNotBecomeActive};
    2203              :                     use pageserver_api::models::TenantState;
    2204            0 :                     match e {
    2205              :                         Cancelled | WillNotBecomeActive(TenantState::Stopping { .. }) => {
    2206            0 :                             Error::ShuttingDown
    2207              :                         }
    2208            0 :                         other => Error::Complete(other.into()),
    2209              :                     }
    2210            0 :                 })?;
    2211              : 
    2212            0 :             utils::pausable_failpoint!(
    2213              :                 "timeline-detach-ancestor::after_activating_before_finding-pausable"
    2214              :             );
    2215              : 
    2216            0 :             let timeline = tenant
    2217            0 :                 .get_timeline(attempt.timeline_id, true)
    2218            0 :                 .map_err(Error::NotFound)?;
    2219              : 
    2220            0 :             timeline
    2221            0 :                 .complete_detaching_timeline_ancestor(&tenant, attempt, ctx)
    2222            0 :                 .await
    2223            0 :                 .map(|()| reparented)
    2224              :         } else {
    2225              :             // at least the latest versions have now been downloaded and refreshed; be ready to
    2226              :             // retry another time.
    2227            0 :             Err(Error::FailedToReparentAll)
    2228              :         }
    2229            0 :     }
    2230              : 
    2231              :     /// A page service client sends a TenantId, and to look up the correct Tenant we must
    2232              :     /// resolve this to a fully qualified TenantShardId.
    2233              :     ///
    2234              :     /// During shard splits: we shall see parent shards in InProgress state and skip them, and
    2235              :     /// instead match on child shards which should appear in Attached state.  Very early in a shard
    2236              :     /// split, or in other cases where a shard is InProgress, we will return our own InProgress result
    2237              :     /// to instruct the caller to wait for that to finish before querying again.
    2238            0 :     pub(crate) fn resolve_attached_shard(
    2239            0 :         &self,
    2240            0 :         tenant_id: &TenantId,
    2241            0 :         selector: ShardSelector,
    2242            0 :     ) -> ShardResolveResult {
    2243            0 :         let tenants = self.tenants.read().unwrap();
    2244            0 :         let mut want_shard: Option<ShardIndex> = None;
    2245            0 :         let mut any_in_progress = None;
    2246              : 
    2247            0 :         match &*tenants {
    2248            0 :             TenantsMap::Initializing => ShardResolveResult::NotFound,
    2249            0 :             TenantsMap::Open(m) | TenantsMap::ShuttingDown(m) => {
    2250            0 :                 for slot in m.range(TenantShardId::tenant_range(*tenant_id)) {
    2251              :                     // Ignore all slots that don't contain an attached tenant
    2252            0 :                     let tenant = match &slot.1 {
    2253            0 :                         TenantSlot::Attached(t) => t,
    2254            0 :                         TenantSlot::InProgress(barrier) => {
    2255              :                             // We might still find a usable shard, but in case we don't, remember that
    2256              :                             // we saw at least one InProgress slot, so that we can distinguish this case
    2257              :                             // from a simple NotFound in our return value.
    2258            0 :                             any_in_progress = Some(barrier.clone());
    2259            0 :                             continue;
    2260              :                         }
    2261            0 :                         _ => continue,
    2262              :                     };
    2263              : 
    2264            0 :                     match selector {
    2265            0 :                         ShardSelector::Zero if slot.0.shard_number == ShardNumber(0) => {
    2266            0 :                             return ShardResolveResult::Found(tenant.clone());
    2267              :                         }
    2268            0 :                         ShardSelector::Page(key) => {
    2269              :                             // Each time we find an attached slot with a different shard count,
    2270              :                             // recompute the expected shard number: during shard splits we might
    2271              :                             // have multiple shards with the old shard count.
    2272            0 :                             if want_shard.is_none()
    2273            0 :                                 || want_shard.unwrap().shard_count != tenant.shard_identity.count
    2274            0 :                             {
    2275            0 :                                 want_shard = Some(ShardIndex {
    2276            0 :                                     shard_number: tenant.shard_identity.get_shard_number(&key),
    2277            0 :                                     shard_count: tenant.shard_identity.count,
    2278            0 :                                 });
    2279            0 :                             }
    2280              : 
    2281            0 :                             if Some(ShardIndex {
    2282            0 :                                 shard_number: tenant.shard_identity.number,
    2283            0 :                                 shard_count: tenant.shard_identity.count,
    2284            0 :                             }) == want_shard
    2285              :                             {
    2286            0 :                                 return ShardResolveResult::Found(tenant.clone());
    2287            0 :                             }
    2288              :                         }
    2289            0 :                         ShardSelector::Known(shard)
    2290            0 :                             if tenant.shard_identity.shard_index() == shard =>
    2291              :                         {
    2292            0 :                             return ShardResolveResult::Found(tenant.clone());
    2293              :                         }
    2294            0 :                         _ => continue,
    2295              :                     }
    2296              :                 }
    2297              : 
    2298              :                 // Fall through: we didn't find a slot that was in Attached state & matched our selector.  If
    2299              :                 // we found one or more InProgress slots, indicate to the caller that they should retry later.  Otherwise
    2300              :                 // this requested shard simply isn't found.
    2301            0 :                 if let Some(barrier) = any_in_progress {
    2302            0 :                     ShardResolveResult::InProgress(barrier)
    2303              :                 } else {
    2304            0 :                     ShardResolveResult::NotFound
    2305              :                 }
    2306              :             }
    2307              :         }
    2308            0 :     }
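    // Aside (hedged sketch, not part of this module): a caller that receives
    // `ShardResolveResult::InProgress` is expected to wait on the returned barrier and then
    // resolve again, roughly:
    //
    //     let tenant = loop {
    //         match tenant_manager.resolve_attached_shard(&tenant_id, selector) {
    //             ShardResolveResult::Found(tenant) => break tenant,
    //             ShardResolveResult::InProgress(barrier) => barrier.wait().await,
    //             ShardResolveResult::NotFound => return Err(anyhow::anyhow!("tenant not found")),
    //         }
    //     };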
    2309              : 
    2310              :     /// Calculate the tenant shards' contributions to this pageserver's utilization metrics.  The
    2311              :     /// returned values are:
    2312              :     ///  - the number of bytes of local disk space this pageserver's shards are requesting, i.e.
    2313              :     ///    how much space they would use if not impacted by disk usage eviction.
    2314              :     ///  - the number of tenant shards currently on this pageserver, including attached
    2315              :     ///    and secondary.
    2316              :     ///
    2317              :     /// This function is quite expensive: callers are expected to cache the result and
    2318              :     /// limit how often they call it.
    2319            0 :     pub(crate) fn calculate_utilization(&self) -> Result<(u64, u32), TenantMapListError> {
    2320            0 :         let tenants = self.tenants.read().unwrap();
    2321            0 :         let m = match &*tenants {
    2322            0 :             TenantsMap::Initializing => return Err(TenantMapListError::Initializing),
    2323            0 :             TenantsMap::Open(m) | TenantsMap::ShuttingDown(m) => m,
    2324              :         };
    2325            0 :         let shard_count = m.len();
    2326            0 :         let mut wanted_bytes = 0;
    2327              : 
    2328            0 :         for tenant_slot in m.values() {
    2329            0 :             match tenant_slot {
    2330            0 :                 TenantSlot::InProgress(_barrier) => {
    2331              :                     // While a slot is being changed, we can't know how much storage it wants.  This
    2332              :                     // means this function's output can fluctuate if a lot of changes are going on
    2333              :                     // (such as transitions from secondary to attached).
    2334              :                     //
    2335              :                     // We could wait for the barrier and retry, but it's important that the utilization
    2336              :                     // API is responsive, and the data quality impact is not very significant.
    2337            0 :                     continue;
    2338              :                 }
    2339            0 :                 TenantSlot::Attached(tenant) => {
    2340            0 :                     wanted_bytes += tenant.local_storage_wanted();
    2341            0 :                 }
    2342            0 :                 TenantSlot::Secondary(secondary) => {
    2343            0 :                     let progress = secondary.progress.lock().unwrap();
    2344            0 :                     wanted_bytes += if progress.heatmap_mtime.is_some() {
    2345              :                         // If we have heatmap info, then we will 'want' the sum
    2346              :                         // of the size of layers in the heatmap: this is how much space
    2347              :                         // we would use if not doing any eviction.
    2348            0 :                         progress.bytes_total
    2349              :                     } else {
    2350              :                         // In the absence of heatmap info, assume that the secondary location simply
    2351              :                         // needs as much space as it is currently using.
    2352            0 :                         secondary.resident_size_metric.get()
    2353              :                     }
    2354              :                 }
    2355              :             }
    2356              :         }
    2357              : 
    2358            0 :         Ok((wanted_bytes, shard_count as u32))
    2359            0 :     }
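    // Aside (illustrative caller-side sketch with hypothetical names, not part of this module):
    // since `calculate_utilization` walks every tenant slot, callers are expected to memoize the
    // result with a short TTL instead of recomputing it per request, e.g.:
    //
    //     struct CachedUtilization {
    //         computed_at: std::time::Instant,
    //         value: (u64, u32), // (wanted_bytes, shard_count)
    //     }
    //
    //     impl CachedUtilization {
    //         fn is_fresh(&self, ttl: std::time::Duration) -> bool {
    //             self.computed_at.elapsed() < ttl
    //         }
    //     }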
    2360              : 
    2361              :     #[instrument(skip_all, fields(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), %timeline_id))]
    2362              :     pub(crate) async fn immediate_gc(
    2363              :         &self,
    2364              :         tenant_shard_id: TenantShardId,
    2365              :         timeline_id: TimelineId,
    2366              :         gc_req: TimelineGcRequest,
    2367              :         cancel: CancellationToken,
    2368              :         ctx: &RequestContext,
    2369              :     ) -> Result<GcResult, ApiError> {
    2370              :         let tenant = {
    2371              :             let guard = self.tenants.read().unwrap();
    2372              :             guard
    2373              :                 .get(&tenant_shard_id)
    2374              :                 .cloned()
    2375            0 :                 .with_context(|| format!("tenant {tenant_shard_id}"))
    2376            0 :                 .map_err(|e| ApiError::NotFound(e.into()))?
    2377              :         };
    2378              : 
    2379            0 :         let gc_horizon = gc_req.gc_horizon.unwrap_or_else(|| tenant.get_gc_horizon());
    2380              :         // Use tenant's pitr setting
    2381              :         let pitr = tenant.get_pitr_interval();
    2382              : 
    2383              :         tenant.wait_to_become_active(ACTIVE_TENANT_TIMEOUT).await?;
    2384              : 
    2385              :         // Run in task_mgr to avoid race with tenant_detach operation
    2386              :         let ctx: RequestContext =
    2387              :             ctx.detached_child(TaskKind::GarbageCollector, DownloadBehavior::Download);
    2388              : 
    2389              :         let _gate_guard = tenant.gate.enter().map_err(|_| ApiError::ShuttingDown)?;
    2390              : 
    2391              :         fail::fail_point!("immediate_gc_task_pre");
    2392              : 
    2393              :         #[allow(unused_mut)]
    2394              :         let mut result = tenant
    2395              :             .gc_iteration(Some(timeline_id), gc_horizon, pitr, &cancel, &ctx)
    2396              :             .await;
    2397              :         // FIXME: `gc_iteration` can return an error for multiple reasons; we should handle it
    2398              :         // better once the types support it.
    2399              : 
    2400              :         #[cfg(feature = "testing")]
    2401              :         {
    2402              :             // we need to synchronize with drop completion for python tests without polling for
    2403              :             // log messages
    2404              :             if let Ok(result) = result.as_mut() {
    2405              :                 let mut js = tokio::task::JoinSet::new();
    2406              :                 for layer in std::mem::take(&mut result.doomed_layers) {
    2407              :                     js.spawn(layer.wait_drop());
    2408              :                 }
    2409              :                 tracing::info!(
    2410              :                     total = js.len(),
    2411              :                     "starting to wait for the gc'd layers to be dropped"
    2412              :                 );
    2413              :                 while let Some(res) = js.join_next().await {
    2414              :                     res.expect("wait_drop should not panic");
    2415              :                 }
    2416              :             }
    2417              : 
    2418              :             let timeline = tenant.get_timeline(timeline_id, false).ok();
    2419            0 :             let rtc = timeline.as_ref().map(|x| &x.remote_client);
    2420              : 
    2421              :             if let Some(rtc) = rtc {
    2422              :                 // layer drops schedule actions on remote timeline client to actually do the
    2423              :                 // deletions; don't care about the shutdown error, just exit fast
    2424              :                 drop(rtc.wait_completion().await);
    2425              :             }
    2426              :         }
    2427              : 
    2428            0 :         result.map_err(|e| match e {
    2429            0 :             GcError::TenantCancelled | GcError::TimelineCancelled => ApiError::ShuttingDown,
    2430              :             GcError::TimelineNotFound => {
    2431            0 :                 ApiError::NotFound(anyhow::anyhow!("Timeline not found").into())
    2432              :             }
    2433            0 :             other => ApiError::InternalServerError(anyhow::anyhow!(other)),
    2434            0 :         })
    2435              :     }
    2436              : 
    2437              :     /// Stops and removes the tenant from memory; bails if it is already [`TenantState::Stopping`].
    2438              :     /// Allows removing other tenant resources manually via `tenant_cleanup`.
    2439              :     /// If the cleanup fails, the tenant will stay in memory in [`TenantState::Broken`] state, and another removal attempt will be needed.
    2440            1 :     async fn remove_tenant_from_memory<V, F>(
    2441            1 :         &self,
    2442            1 :         tenant_shard_id: TenantShardId,
    2443            1 :         tenant_cleanup: F,
    2444            1 :     ) -> Result<V, TenantStateError>
    2445            1 :     where
    2446            1 :         F: std::future::Future<Output = anyhow::Result<V>>,
    2447            1 :     {
    2448            1 :         let mut slot_guard =
    2449            1 :             self.tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::MustExist)?;
    2450              : 
    2451              :         // allow pageserver shutdown to await for our completion
    2452            1 :         let (_guard, progress) = completion::channel();
    2453              : 
    2454              :         // The SlotGuard allows us to manipulate the Tenant object without fear of some
    2455              :         // concurrent API request doing something else for the same tenant ID.
    2456            1 :         let attached_tenant = match slot_guard.get_old_value() {
    2457            1 :             Some(TenantSlot::Attached(tenant)) => {
    2458              :                 // whenever we remove a tenant from memory, we don't want to flush and wait for upload
    2459            1 :                 let shutdown_mode = ShutdownMode::Hard;
    2460              : 
    2461              :                 // shutdown is sure to transition tenant to stopping, and wait for all tasks to complete, so
    2462              :                 // that we can continue safely to cleanup.
    2463            1 :                 match tenant.shutdown(progress, shutdown_mode).await {
    2464            1 :                     Ok(()) => {}
    2465            0 :                     Err(_other) => {
    2466              :                         // if pageserver shutdown or other detach/ignore is already ongoing, we don't want to
    2467              :                         // wait for it but return an error right away because these are distinct requests.
    2468            0 :                         slot_guard.revert();
    2469            0 :                         return Err(TenantStateError::IsStopping(tenant_shard_id));
    2470              :                     }
    2471              :                 }
    2472            1 :                 Some(tenant)
    2473              :             }
    2474            0 :             Some(TenantSlot::Secondary(secondary_state)) => {
    2475            0 :                 tracing::info!("Shutting down in secondary mode");
    2476            0 :                 secondary_state.shutdown().await;
    2477            0 :                 None
    2478              :             }
    2479              :             Some(TenantSlot::InProgress(_)) => {
    2480              :                 // Acquiring a slot guarantees its old value was not InProgress
    2481            0 :                 unreachable!();
    2482              :             }
    2483            0 :             None => None,
    2484              :         };
    2485              : 
    2486            1 :         match tenant_cleanup
    2487            1 :             .await
    2488            1 :             .with_context(|| format!("Failed to run cleanup for tenant {tenant_shard_id}"))
    2489              :         {
    2490            1 :             Ok(hook_value) => {
    2491              :                 // Success: drop the old TenantSlot::Attached.
    2492            1 :                 slot_guard
    2493            1 :                     .drop_old_value()
    2494            1 :                     .expect("We just called shutdown");
    2495              : 
    2496            1 :                 Ok(hook_value)
    2497              :             }
    2498            0 :             Err(e) => {
    2499              :                 // If we had a Tenant, set it to Broken and put it back in the TenantsMap
    2500            0 :                 if let Some(attached_tenant) = attached_tenant {
    2501            0 :                     attached_tenant.set_broken(e.to_string()).await;
    2502            0 :                 }
    2503              :                 // Leave the broken tenant in the map
    2504            0 :                 slot_guard.revert();
    2505              : 
    2506            0 :                 Err(TenantStateError::Other(e))
    2507              :             }
    2508              :         }
    2509            1 :     }
    2510              : }
    2511              : 
    2512              : #[derive(Debug, thiserror::Error)]
    2513              : pub(crate) enum GetTenantError {
    2514              :     /// NotFound is a TenantId rather than TenantShardId, because this error type is used from
    2515              :     /// getters that use a TenantId and a ShardSelector, not just getters that target a specific shard.
    2516              :     #[error("Tenant {0} not found")]
    2517              :     NotFound(TenantId),
    2518              : 
    2519              :     #[error("Tenant {0} not found")]
    2520              :     ShardNotFound(TenantShardId),
    2521              : 
    2522              :     #[error("Tenant {0} is not active")]
    2523              :     NotActive(TenantShardId),
    2524              : 
    2525              :     // Initializing or shutting down: cannot authoritatively say whether we have this tenant
    2526              :     #[error("Tenant map is not available: {0}")]
    2527              :     MapState(#[from] TenantMapError),
    2528              : }
    2529              : 
    2530              : #[derive(thiserror::Error, Debug)]
    2531              : pub(crate) enum GetActiveTenantError {
    2532              :     /// We may time out either while TenantSlot is InProgress, or while the Tenant
    2533              :     /// is in a non-Active state
    2534              :     #[error(
    2535              :         "Timed out waiting {wait_time:?} for tenant active state. Latest state: {latest_state:?}"
    2536              :     )]
    2537              :     WaitForActiveTimeout {
    2538              :         latest_state: Option<TenantState>,
    2539              :         wait_time: Duration,
    2540              :     },
    2541              : 
    2542              :     /// The TenantSlot is absent, or in secondary mode
    2543              :     #[error(transparent)]
    2544              :     NotFound(#[from] GetTenantError),
    2545              : 
    2546              :     /// Cancellation token fired while we were waiting
    2547              :     #[error("cancelled")]
    2548              :     Cancelled,
    2549              : 
    2550              :     /// Tenant exists, but is in a state that cannot become active (e.g. Stopping, Broken)
    2551              :     #[error("will not become active.  Current state: {0}")]
    2552              :     WillNotBecomeActive(TenantState),
    2553              : 
    2554              :     /// Broken is logically a subset of WillNotBecomeActive, but a distinct error is useful as
    2555              :     /// WillNotBecomeActive is a permitted error under some circumstances, whereas broken should
    2556              :     /// never happen.
    2557              :     #[error("Tenant is broken: {0}")]
    2558              :     Broken(String),
    2559              : 
    2560              :     #[error("reconnect to switch tenant id")]
    2561              :     SwitchedTenant,
    2562              : }
    2563              : 
    2564              : #[derive(Debug, thiserror::Error)]
    2565              : pub(crate) enum DeleteTimelineError {
    2566              :     #[error("Tenant {0}")]
    2567              :     Tenant(#[from] GetTenantError),
    2568              : 
    2569              :     #[error("Timeline {0}")]
    2570              :     Timeline(#[from] crate::tenant::DeleteTimelineError),
    2571              : }
    2572              : 
    2573              : #[derive(Debug, thiserror::Error)]
    2574              : pub(crate) enum TenantStateError {
    2575              :     #[error("Tenant {0} is stopping")]
    2576              :     IsStopping(TenantShardId),
    2577              :     #[error(transparent)]
    2578              :     SlotError(#[from] TenantSlotError),
    2579              :     #[error(transparent)]
    2580              :     SlotUpsertError(#[from] TenantSlotUpsertError),
    2581              :     #[error(transparent)]
    2582              :     Other(#[from] anyhow::Error),
    2583              : }
    2584              : 
    2585              : #[derive(Debug, thiserror::Error)]
    2586              : pub(crate) enum TenantMapListError {
    2587              :     #[error("tenant map is still initializing")]
    2588              :     Initializing,
    2589              : }
    2590              : 
    2591              : #[derive(Debug, thiserror::Error)]
    2592              : pub(crate) enum TenantMapInsertError {
    2593              :     #[error(transparent)]
    2594              :     SlotError(#[from] TenantSlotError),
    2595              :     #[error(transparent)]
    2596              :     SlotUpsertError(#[from] TenantSlotUpsertError),
    2597              :     #[error(transparent)]
    2598              :     Other(#[from] anyhow::Error),
    2599              : }
    2600              : 
    2601              : /// Superset of TenantMapError: issues that can occur when acquiring a slot
    2602              : /// for a particular tenant ID.
    2603              : #[derive(Debug, thiserror::Error)]
    2604              : pub(crate) enum TenantSlotError {
    2605              :     /// When acquiring a slot with the expectation that the tenant already exists.
    2606              :     #[error("Tenant {0} not found")]
    2607              :     NotFound(TenantShardId),
    2608              : 
    2609              :     // Tried to read a slot that is currently being mutated by another administrative
    2610              :     // operation.
    2611              :     #[error("tenant has a state change in progress, try again later")]
    2612              :     InProgress,
    2613              : 
    2614              :     #[error(transparent)]
    2615              :     MapState(#[from] TenantMapError),
    2616              : }
    2617              : 
    2618              : /// Superset of TenantMapError: issues that can occur when using a SlotGuard
    2619              : /// to insert a new value.
    2620              : #[derive(thiserror::Error)]
    2621              : pub(crate) enum TenantSlotUpsertError {
    2622              :     /// An error where the slot is in an unexpected state, indicating a code bug
    2623              :     #[error("Internal error updating Tenant")]
    2624              :     InternalError(Cow<'static, str>),
    2625              : 
    2626              :     #[error(transparent)]
    2627              :     MapState(TenantMapError),
    2628              : 
    2629              :     // If we encounter TenantManager shutdown during upsert, we must carry the Completion
    2630              :     // from the SlotGuard, so that the caller can hold it while they clean up: otherwise
    2631              :     // TenantManager shutdown might race ahead before we're done cleaning up any Tenant that
    2632              :     // was protected by the SlotGuard.
    2633              :     #[error("Shutting down")]
    2634              :     ShuttingDown((TenantSlot, utils::completion::Completion)),
    2635              : }
    2636              : 
    2637              : impl std::fmt::Debug for TenantSlotUpsertError {
    2638            0 :     fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
    2639            0 :         match self {
    2640            0 :             Self::InternalError(reason) => write!(f, "Internal Error {reason}"),
    2641            0 :             Self::MapState(map_error) => write!(f, "Tenant map state: {map_error:?}"),
    2642            0 :             Self::ShuttingDown(_completion) => write!(f, "Tenant map shutting down"),
    2643              :         }
    2644            0 :     }
    2645              : }
    2646              : 
    2647              : #[derive(Debug, thiserror::Error)]
    2648              : enum TenantSlotDropError {
    2649              :     /// It is only legal to drop a TenantSlot if its contents are fully shut down
    2650              :     #[error("Tenant was not shut down")]
    2651              :     NotShutdown,
    2652              : }
    2653              : 
    2654              : /// Errors that can happen any time we are walking the tenant map to try and acquire
    2655              : /// the TenantSlot for a particular tenant.
    2656              : #[derive(Debug, thiserror::Error)]
    2657              : pub(crate) enum TenantMapError {
    2658              :     // Tried to read while initializing
    2659              :     #[error("tenant map is still initializing")]
    2660              :     StillInitializing,
    2661              : 
    2662              :     // Tried to read while shutting down
    2663              :     #[error("tenant map is shutting down")]
    2664              :     ShuttingDown,
    2665              : }
    2666              : 
    2667              : /// Guards a particular tenant_id's content in the TenantsMap.
    2668              : ///
    2669              : /// While this structure exists, the TenantsMap will contain a [`TenantSlot::InProgress`]
    2670              : /// for this tenant, which acts as a marker for any operations targeting
    2671              : /// this tenant to retry later, or wait for the InProgress state to end.
    2672              : ///
    2673              : /// This structure enforces the important invariant that we do not have overlapping
    2674              : /// tasks that will try to use local storage for the same tenant ID: the previous
    2675              : /// contents of a slot must have been shut down before the slot can be left empty
    2676              : /// or used for something else.
    2677              : ///
    2678              : /// Holders of a SlotGuard should explicitly dispose of it, using either `upsert`
    2679              : /// to provide a new value, or `revert` to put the slot back into its initial
    2680              : /// state.  If the SlotGuard is dropped without calling either of these, then
    2681              : /// we will leave the slot empty if our `old_value` is already shut down, else
    2682              : /// we will replace the slot with `old_value` (equivalent to doing a revert).
    2683              : ///
    2684              : /// The `old_value` may be dropped before the SlotGuard is dropped, by calling
    2685              : /// `drop_old_value`.  It is an error to call this without shutting down
    2686              : /// the contents of `old_value`.
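                      : ///
                      : /// A minimal sketch of the intended calling sequence (doc-test ignored; it
                      : /// assumes the guard was acquired via the `tenant_map_acquire_slot` path
                      : /// referenced in `upsert`, `new_secondary` is a placeholder value, and the
                      : /// shutdown call's arguments and error handling are elided):
                      : ///
                      : /// ```ignore
                      : /// let slot_guard = /* tenant_map_acquire_slot(...) */;
                      : /// if let Some(TenantSlot::Attached(tenant)) = slot_guard.get_old_value() {
                      : ///     // The old contents must be fully shut down before the slot is reused.
                      : ///     tenant.shutdown(/* ... */).await;
                      : /// }
                      : /// // Either install a new value, consuming the guard ...
                      : /// slot_guard.upsert(TenantSlot::Secondary(new_secondary))?;
                      : /// // ... or, on failure, put the old value back with `slot_guard.revert()`.
                      : /// ```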
    2687              : pub(crate) struct SlotGuard<'a> {
    2688              :     tenant_shard_id: TenantShardId,
    2689              :     old_value: Option<TenantSlot>,
    2690              :     upserted: bool,
    2691              : 
    2692              :     /// [`TenantSlot::InProgress`] carries the corresponding Barrier: it will
    2693              :     /// release any waiters as soon as this SlotGuard is dropped.
    2694              :     completion: utils::completion::Completion,
    2695              : 
    2696              :     tenants: &'a std::sync::RwLock<TenantsMap>,
    2697              : }
    2698              : 
    2699              : impl<'a> SlotGuard<'a> {
    2700            1 :     fn new(
    2701            1 :         tenant_shard_id: TenantShardId,
    2702            1 :         old_value: Option<TenantSlot>,
    2703            1 :         completion: utils::completion::Completion,
    2704            1 :         tenants: &'a std::sync::RwLock<TenantsMap>,
    2705            1 :     ) -> Self {
    2706            1 :         Self {
    2707            1 :             tenant_shard_id,
    2708            1 :             old_value,
    2709            1 :             upserted: false,
    2710            1 :             completion,
    2711            1 :             tenants,
    2712            1 :         }
    2713            1 :     }
    2714              : 
    2715              :     /// Get any value that was present in the slot before we acquired ownership
    2716              :     /// of it: in state transitions, this will be the old state.
    2717              :     ///
    2718              :     // FIXME: get_ prefix
    2719              :     // FIXME: this should be .as_ref() -- unsure why no clippy
    2720            1 :     fn get_old_value(&self) -> &Option<TenantSlot> {
    2721            1 :         &self.old_value
    2722            1 :     }
    2723              : 
    2724              :     /// Emplace a new value in the slot.  This consumes the guard, and after
    2725              :     /// returning, the slot is no longer protected from concurrent changes.
    2726            0 :     fn upsert(mut self, new_value: TenantSlot) -> Result<(), TenantSlotUpsertError> {
    2727            0 :         if !self.old_value_is_shutdown() {
    2728              :             // This is a bug: callers should never try to drop an old value without
    2729              :             // shutting it down
    2730            0 :             return Err(TenantSlotUpsertError::InternalError(
    2731            0 :                 "Old TenantSlot value not shut down".into(),
    2732            0 :             ));
    2733            0 :         }
    2734              : 
    2735            0 :         let replaced: Option<TenantSlot> = {
    2736            0 :             let mut locked = self.tenants.write().unwrap();
    2737              : 
    2738            0 :             if let TenantSlot::InProgress(_) = new_value {
    2739              :                 // It is never expected to try and upsert InProgress via this path: it should
    2740              :                 // only be written via the tenant_map_acquire_slot path.  If we hit this it's a bug.
    2741            0 :                 return Err(TenantSlotUpsertError::InternalError(
    2742            0 :                     "Attempt to upsert an InProgress state".into(),
    2743            0 :                 ));
    2744            0 :             }
    2745              : 
    2746            0 :             let m = match &mut *locked {
    2747              :                 TenantsMap::Initializing => {
    2748            0 :                     return Err(TenantSlotUpsertError::MapState(
    2749            0 :                         TenantMapError::StillInitializing,
    2750            0 :                     ));
    2751              :                 }
    2752              :                 TenantsMap::ShuttingDown(_) => {
    2753            0 :                     return Err(TenantSlotUpsertError::ShuttingDown((
    2754            0 :                         new_value,
    2755            0 :                         self.completion.clone(),
    2756            0 :                     )));
    2757              :                 }
    2758            0 :                 TenantsMap::Open(m) => m,
    2759              :             };
    2760              : 
    2761            0 :             METRICS.slot_inserted(&new_value);
    2762              : 
    2763            0 :             let replaced = m.insert(self.tenant_shard_id, new_value);
    2764            0 :             self.upserted = true;
    2765            0 :             if let Some(replaced) = replaced.as_ref() {
    2766            0 :                 METRICS.slot_removed(replaced);
    2767            0 :             }
    2768              : 
    2769            0 :             replaced
    2770              :         };
    2771              : 
    2772              :         // Sanity check: on an upsert we should always be replacing an InProgress marker
    2773            0 :         match replaced {
    2774              :             Some(TenantSlot::InProgress(_)) => {
    2775              :                 // Expected case: we find our InProgress in the map: nothing should have
    2776              :                 // replaced it because the code that acquires slots will not grant another
    2777              :                 // one for the same TenantId.
    2778            0 :                 Ok(())
    2779              :             }
    2780              :             None => {
    2781            0 :                 METRICS.unexpected_errors.inc();
    2782            0 :                 error!(
    2783              :                     tenant_shard_id = %self.tenant_shard_id,
    2784            0 :                     "Missing InProgress marker during tenant upsert, this is a bug."
    2785              :                 );
    2786            0 :                 Err(TenantSlotUpsertError::InternalError(
    2787            0 :                     "Missing InProgress marker during tenant upsert".into(),
    2788            0 :                 ))
    2789              :             }
    2790            0 :             Some(slot) => {
    2791            0 :                 METRICS.unexpected_errors.inc();
    2792            0 :                 error!(tenant_shard_id=%self.tenant_shard_id, "Unexpected contents of TenantSlot during upsert, this is a bug.  Contents: {:?}", slot);
    2793            0 :                 Err(TenantSlotUpsertError::InternalError(
    2794            0 :                     "Unexpected contents of TenantSlot".into(),
    2795            0 :                 ))
    2796              :             }
    2797              :         }
    2798            0 :     }
    2799              : 
    2800              :     /// Replace the InProgress slot with whatever was in the guard when we started
    2801            0 :     fn revert(mut self) {
    2802            0 :         if let Some(value) = self.old_value.take() {
    2803            0 :             match self.upsert(value) {
    2804            0 :                 Err(TenantSlotUpsertError::InternalError(_)) => {
    2805            0 :                     // We already logged the error, nothing else we can do.
    2806            0 :                 }
    2807              :                 Err(
    2808              :                     TenantSlotUpsertError::MapState(_) | TenantSlotUpsertError::ShuttingDown(_),
    2809            0 :                 ) => {
    2810            0 :                     // If the map is shutting down, we need not replace anything
    2811            0 :                 }
    2812            0 :                 Ok(()) => {}
    2813              :             }
    2814            0 :         }
    2815            0 :     }
    2816              : 
    2817              :     /// We may never drop our old value until it is cleanly shut down: otherwise we might leave
    2818              :     /// rogue background tasks that would write to the local tenant directory that this guard
    2819              :     /// is responsible for protecting
    2820            1 :     fn old_value_is_shutdown(&self) -> bool {
    2821            1 :         match self.old_value.as_ref() {
    2822            1 :             Some(TenantSlot::Attached(tenant)) => tenant.gate.close_complete(),
    2823            0 :             Some(TenantSlot::Secondary(secondary_tenant)) => secondary_tenant.gate.close_complete(),
    2824              :             Some(TenantSlot::InProgress(_)) => {
    2825              :                 // A SlotGuard cannot be constructed for a slot that was already InProgress
    2826            0 :                 unreachable!()
    2827              :             }
    2828            0 :             None => true,
    2829              :         }
    2830            1 :     }
    2831              : 
    2832              :     /// The guard holder is done with the old value of the slot: they are obliged to already
    2833              :     /// shut it down before we reach this point.
    2834            1 :     fn drop_old_value(&mut self) -> Result<(), TenantSlotDropError> {
    2835            1 :         if !self.old_value_is_shutdown() {
    2836            0 :             Err(TenantSlotDropError::NotShutdown)
    2837              :         } else {
    2838            1 :             self.old_value.take();
    2839            1 :             Ok(())
    2840              :         }
    2841            1 :     }
    2842              : }
    2843              : 
    2844              : impl<'a> Drop for SlotGuard<'a> {
    2845            1 :     fn drop(&mut self) {
    2846            1 :         if self.upserted {
    2847            0 :             return;
    2848            1 :         }
    2849              :         // Our old value is already shutdown, or it never existed: it is safe
    2850              :         // for us to fully release the TenantSlot back into an empty state
    2851              : 
    2852            1 :         let mut locked = self.tenants.write().unwrap();
    2853              : 
    2854            1 :         let m = match &mut *locked {
    2855              :             TenantsMap::Initializing => {
    2856              :                 // There is no map, this should never happen.
    2857            0 :                 return;
    2858              :             }
    2859              :             TenantsMap::ShuttingDown(_) => {
    2860              :                 // When we transition to shutdown, InProgress elements are removed
    2861              :                 // from the map, so we do not need to clean up our InProgress marker.
    2862              :                 // See [`shutdown_all_tenants0`]
    2863            1 :                 return;
    2864              :             }
    2865            0 :             TenantsMap::Open(m) => m,
    2866              :         };
    2867              : 
    2868              :         use std::collections::btree_map::Entry;
    2869            0 :         match m.entry(self.tenant_shard_id) {
    2870            0 :             Entry::Occupied(mut entry) => {
    2871            0 :                 if !matches!(entry.get(), TenantSlot::InProgress(_)) {
    2872            0 :                     METRICS.unexpected_errors.inc();
    2873            0 :                     error!(tenant_shard_id=%self.tenant_shard_id, "Unexpected contents of TenantSlot during drop, this is a bug.  Contents: {:?}", entry.get());
    2874            0 :                 }
    2875              : 
    2876            0 :                 if self.old_value_is_shutdown() {
    2877            0 :                     METRICS.slot_removed(entry.get());
    2878            0 :                     entry.remove();
    2879            0 :                 } else {
    2880            0 :                     let inserting = self.old_value.take().unwrap();
    2881            0 :                     METRICS.slot_inserted(&inserting);
    2882            0 :                     let replaced = entry.insert(inserting);
    2883            0 :                     METRICS.slot_removed(&replaced);
    2884            0 :                 }
    2885              :             }
    2886              :             Entry::Vacant(_) => {
    2887            0 :                 METRICS.unexpected_errors.inc();
    2888            0 :                 error!(
    2889              :                     tenant_shard_id = %self.tenant_shard_id,
    2890            0 :                     "Missing InProgress marker during SlotGuard drop, this is a bug."
    2891              :                 );
    2892              :             }
    2893              :         }
    2894            1 :     }
    2895              : }
    2896              : 
    2897              : enum TenantSlotPeekMode {
    2898              :     /// In Read mode, peek will be permitted to see the slots even if the pageserver is shutting down
    2899              :     Read,
    2900              :     /// In Write mode, trying to peek at a slot while the pageserver is shutting down is an error
    2901              :     Write,
    2902              : }
    2903              : 
    2904            0 : fn tenant_map_peek_slot<'a>(
    2905            0 :     tenants: &'a std::sync::RwLockReadGuard<'a, TenantsMap>,
    2906            0 :     tenant_shard_id: &TenantShardId,
    2907            0 :     mode: TenantSlotPeekMode,
    2908            0 : ) -> Result<Option<&'a TenantSlot>, TenantMapError> {
    2909            0 :     match tenants.deref() {
    2910            0 :         TenantsMap::Initializing => Err(TenantMapError::StillInitializing),
    2911            0 :         TenantsMap::ShuttingDown(m) => match mode {
    2912              :             TenantSlotPeekMode::Read => Ok(Some(
    2913              :                 // When reading in ShuttingDown state, we must translate None results
    2914              :                 // into a ShuttingDown error, because absence of a tenant shard ID in the map
    2915              :                 // isn't a reliable indicator of the tenant being gone: it might have been
    2916              :                 // InProgress when shutdown started, and cleaned up from that state such
    2917              :                 // that it's now no longer in the map.  Callers will have to wait until
    2918              :                 // we next start up to get a proper answer.  This avoids incorrect 404 API responses.
    2919            0 :                 m.get(tenant_shard_id).ok_or(TenantMapError::ShuttingDown)?,
    2920              :             )),
    2921            0 :             TenantSlotPeekMode::Write => Err(TenantMapError::ShuttingDown),
    2922              :         },
    2923            0 :         TenantsMap::Open(m) => Ok(m.get(tenant_shard_id)),
    2924              :     }
    2925            0 : }
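                      : 
                      : // A hypothetical read-path caller of `tenant_map_peek_slot`, to illustrate the
                      : // Read-mode semantics above: an `InProgress` slot means "retry later", and
                      : // map-state errors are surfaced rather than answering with a spurious
                      : // "not found".  `serve_from`, `retry_later` and `not_found` are placeholder
                      : // handlers, not functions defined in this module.
                      : //
                      : //     let locked = self.tenants.read().unwrap();
                      : //     match tenant_map_peek_slot(&locked, &tenant_shard_id, TenantSlotPeekMode::Read)? {
                      : //         Some(TenantSlot::Attached(tenant)) => serve_from(tenant),
                      : //         Some(TenantSlot::InProgress(_)) => retry_later(),
                      : //         Some(TenantSlot::Secondary(_)) | None => not_found(),
                      : //     }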
    2926              : 
    2927              : enum TenantSlotAcquireMode {
    2928              :     /// Acquire the slot irrespective of current state, or whether it already exists
    2929              :     Any,
    2930              :     /// Return an error if trying to acquire a slot and it doesn't already exist
    2931              :     MustExist,
    2932              : }
    2933              : 
    2934              : use http_utils::error::ApiError;
    2935              : use pageserver_api::models::TimelineGcRequest;
    2936              : 
    2937              : use crate::tenant::gc_result::GcResult;
    2938              : 
    2939              : #[cfg(test)]
    2940              : mod tests {
    2941              :     use std::collections::BTreeMap;
    2942              :     use std::sync::Arc;
    2943              : 
    2944              :     use camino::Utf8PathBuf;
    2945              :     use storage_broker::BrokerClientChannel;
    2946              :     use tracing::Instrument;
    2947              : 
    2948              :     use super::super::harness::TenantHarness;
    2949              :     use super::TenantsMap;
    2950              :     use crate::{
    2951              :         basebackup_cache::BasebackupCache,
    2952              :         tenant::{
    2953              :             TenantSharedResources,
    2954              :             mgr::{BackgroundPurges, TenantManager, TenantSlot},
    2955              :         },
    2956              :     };
    2957              : 
    2958              :     #[tokio::test(start_paused = true)]
    2959            1 :     async fn shutdown_awaits_in_progress_tenant() {
    2960              :         // Test that if an InProgress tenant is in the map during shutdown, the shutdown will gracefully
    2961              :         // wait for it to complete before proceeding.
    2962              : 
    2963            1 :         let h = TenantHarness::create("shutdown_awaits_in_progress_tenant")
    2964            1 :             .await
    2965            1 :             .unwrap();
    2966            1 :         let (t, _ctx) = h.load().await;
    2967              : 
    2968              :         // The harness loads the tenant to Active; activation is forced, and nothing is running on the tenant.
    2969              : 
    2970            1 :         let id = t.tenant_shard_id();
    2971              : 
    2972              :         // tenant harness configures the logging and we cannot escape it
    2973            1 :         let span = h.span();
    2974            1 :         let _e = span.enter();
    2975              : 
    2976            1 :         let tenants = BTreeMap::from([(id, TenantSlot::Attached(t.clone()))]);
    2977              : 
    2978              :         // Invoke remove_tenant_from_memory with a cleanup hook that blocks until we manually
    2979              :         // permit it to proceed: that will stick the tenant in InProgress
    2980              : 
    2981            1 :         let (basebackup_cache, _) = BasebackupCache::new(Utf8PathBuf::new(), None);
    2982              : 
    2983            1 :         let tenant_manager = TenantManager {
    2984            1 :             tenants: std::sync::RwLock::new(TenantsMap::Open(tenants)),
    2985            1 :             conf: h.conf,
    2986              :             resources: TenantSharedResources {
    2987            1 :                 broker_client: BrokerClientChannel::connect_lazy("foobar.com")
    2988            1 :                     .await
    2989            1 :                     .unwrap(),
    2990            1 :                 remote_storage: h.remote_storage.clone(),
    2991            1 :                 deletion_queue_client: h.deletion_queue.new_client(),
    2992            1 :                 l0_flush_global_state: crate::l0_flush::L0FlushGlobalState::new(
    2993            1 :                     h.conf.l0_flush.clone(),
    2994              :                 ),
    2995            1 :                 basebackup_cache,
    2996            1 :                 feature_resolver: crate::feature_resolver::FeatureResolver::new_disabled(),
    2997              :             },
    2998            1 :             cancel: tokio_util::sync::CancellationToken::new(),
    2999            1 :             background_purges: BackgroundPurges::default(),
    3000              :         };
    3001              : 
    3002            1 :         let tenant_manager = Arc::new(tenant_manager);
    3003              : 
    3004            1 :         let (until_cleanup_completed, can_complete_cleanup) = utils::completion::channel();
    3005            1 :         let (until_cleanup_started, cleanup_started) = utils::completion::channel();
    3006            1 :         let mut remove_tenant_from_memory_task = {
    3007            1 :             let tenant_manager = tenant_manager.clone();
    3008            1 :             let jh = tokio::spawn({
    3009            1 :                 async move {
    3010            1 :                     let cleanup = async move {
    3011            1 :                         drop(until_cleanup_started);
    3012            1 :                         can_complete_cleanup.wait().await;
    3013            1 :                         anyhow::Ok(())
    3014            1 :                     };
    3015            1 :                     tenant_manager.remove_tenant_from_memory(id, cleanup).await
    3016            1 :                 }
    3017            1 :                 .instrument(h.span())
    3018              :             });
    3019              : 
    3020              :             // now the long cleanup should be in place, with the stopping state
    3021            1 :             cleanup_started.wait().await;
    3022            1 :             jh
    3023              :         };
    3024              : 
    3025            1 :         let mut shutdown_task = {
    3026            1 :             let (until_shutdown_started, shutdown_started) = utils::completion::channel();
    3027              : 
    3028            1 :             let tenant_manager = tenant_manager.clone();
    3029              : 
    3030            1 :             let shutdown_task = tokio::spawn(async move {
    3031            1 :                 drop(until_shutdown_started);
    3032            1 :                 tenant_manager.shutdown_all_tenants0().await;
    3033            1 :             });
    3034              : 
    3035            1 :             shutdown_started.wait().await;
    3036            1 :             shutdown_task
    3037              :         };
    3038              : 
    3039            1 :         let long_time = std::time::Duration::from_secs(15);
    3040            1 :         tokio::select! {
    3041            1 :             _ = &mut shutdown_task => unreachable!("shutdown should block on remove_tenant_from_memory completing"),
    3042            1 :             _ = &mut remove_tenant_from_memory_task => unreachable!("remove_tenant_from_memory_task should not complete until explicitly unblocked"),
    3043            1 :             _ = tokio::time::sleep(long_time) => {},
    3044              :         }
    3045              : 
    3046            1 :         drop(until_cleanup_completed);
    3047              : 
    3048              :         // Now that we allow it to proceed, shutdown should complete immediately
    3049            1 :         remove_tenant_from_memory_task.await.unwrap().unwrap();
    3050            1 :         shutdown_task.await.unwrap();
    3051            1 :     }
    3052              : }
        

Generated by: LCOV version 2.1-beta